Some Conversational AI Engine interaction scenarios call for a custom large language model (custom LLM). This document explains how to integrate a custom LLM into Agora's Conversational AI Engine.
Agora's Conversational AI Engine interacts with LLM services using the OpenAI API protocol. To integrate a custom LLM, you need to provide an HTTP service compatible with the OpenAI API, capable of handling requests and responses in the OpenAI API format.
This approach enables you to implement additional custom functionalities, including but not limited to:
- Retrieval-Augmented Generation (RAG): Allows the model to retrieve information from a specific knowledge base.
- Multimodal Capabilities: Enables the model to generate output in both text and audio formats.
- Tool Invocation: Allows the model to call external tools.
- Function Calling: Enables the model to return structured data in the form of function calls.
Before you begin, ensure that you have:
- Implemented the basic logic for interacting with a Conversational AI agent by following the REST Quickstart.
- Set up access to a custom LLM service.
- Prepared a vector database or retrieval system if using Retrieval-Augmented Generation (RAG).
Take the following steps to integrate your custom LLM into Agora's Conversational AI Engine.
To integrate successfully with Agora's Conversational AI Engine, your custom LLM service must provide an interface compatible with the OpenAI Chat Completions API. The key requirements include:
- Endpoint: A request-handling endpoint, such as https://your-custom-llm-service/chat/completions.
- Request format: Must accept request parameters adhering to the OpenAI API protocol.
- Response format: Should return OpenAI API-compatible responses and support the Server-Sent Events (SSE) standard for streaming.
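For orientation, a request body that satisfies these requirements is a standard OpenAI chat completion request. The model name and message content below are purely illustrative:

```json
{
  "model": "your-model-name",
  "messages": [
    { "role": "system", "content": "You are a helpful assistant." },
    { "role": "user", "content": "Hello!" }
  ],
  "stream": true
}
```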
The following example demonstrates how to implement an OpenAI API-compliant interface:
```python
import asyncio
import json
import logging
import os
import traceback
from typing import Dict, List, Optional, Union

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel

logger = logging.getLogger(__name__)
app = FastAPI()

# Message content parts, following the OpenAI content-part format
class TextContent(BaseModel):
    type: str = "text"
    text: str

class ImageContent(BaseModel):
    type: str = "image_url"
    image_url: Dict[str, str]

class AudioContent(BaseModel):
    type: str = "input_audio"
    input_audio: Dict[str, str]

class ToolFunction(BaseModel):
    name: str
    description: Optional[str]
    parameters: Optional[Dict]
    strict: bool = False

class Tool(BaseModel):
    type: str = "function"
    function: ToolFunction

class ToolChoice(BaseModel):
    type: str = "function"
    function: Optional[Dict]

class ResponseFormat(BaseModel):
    type: str = "json_schema"
    json_schema: Optional[Dict[str, str]]

class SystemMessage(BaseModel):
    role: str = "system"
    content: Union[str, List[str]]

class UserMessage(BaseModel):
    role: str = "user"
    content: Union[str, List[Union[TextContent, ImageContent, AudioContent]]]

class AssistantMessage(BaseModel):
    role: str = "assistant"
    content: Optional[Union[str, List[TextContent]]] = None
    audio: Optional[Dict[str, str]] = None
    tool_calls: Optional[List[Dict]] = None

class ToolMessage(BaseModel):
    role: str = "tool"
    content: Union[str, List[str]]
    tool_call_id: str

# Define the complete request format
class ChatCompletionRequest(BaseModel):
    context: Optional[Dict] = None  # Context information
    model: Optional[str] = None  # Model name being used
    messages: List[Union[SystemMessage, UserMessage, AssistantMessage, ToolMessage]]  # List of messages
    response_format: Optional[ResponseFormat] = None  # Response format
    modalities: List[str] = ["text"]  # Default modality is text
    audio: Optional[Dict[str, str]] = None  # Assistant's audio response
    tools: Optional[List[Tool]] = None  # List of tools
    tool_choice: Optional[Union[str, ToolChoice]] = "auto"  # Tool selection
    parallel_tool_calls: bool = True  # Whether to call tools in parallel
    stream: bool = True  # Default to streaming response
    stream_options: Optional[Dict] = None  # Streaming options

@app.post("/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    logger.info(f"Received request: {request.model_dump_json()}")
    if not request.stream:
        raise HTTPException(
            status_code=400, detail="chat completions require streaming"
        )
    try:
        client = AsyncOpenAI(api_key=os.getenv("YOUR_LLM_API_KEY"))
        response = await client.chat.completions.create(
            model=request.model,
            messages=request.messages,  # Directly use request messages
            tool_choice=(
                request.tool_choice if request.tools and request.tool_choice else None
            ),
            tools=request.tools if request.tools else None,
            modalities=request.modalities,
            audio=request.audio,
            response_format=request.response_format,
            stream=request.stream,
            stream_options=request.stream_options,
        )

        async def generate():
            try:
                async for chunk in response:
                    logger.debug(f"Received chunk: {chunk}")
                    yield f"data: {json.dumps(chunk.to_dict())}\n\n"
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("Request was cancelled")

        return StreamingResponse(generate(), media_type="text/event-stream")
    except asyncio.CancelledError:
        logger.info("Request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)
```
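To sanity-check the service before wiring it into an agent, you can call it with any OpenAI-compatible client. The following sketch assumes the service above is running locally on port 8000; the model name is a placeholder for whatever your backend expects:

```python
import asyncio
from openai import AsyncOpenAI

async def main():
    # Point the OpenAI SDK at the custom service instead of api.openai.com.
    client = AsyncOpenAI(base_url="http://localhost:8000", api_key="placeholder-key")
    stream = await client.chat.completions.create(
        model="your-model-name",  # placeholder; forwarded to your backend LLM
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
    )
    async for chunk in stream:
        # Print streamed text deltas as they arrive.
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(main())
```

Each chunk arrives as an SSE data: line in the OpenAI streaming format, and the stream ends with data: [DONE].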
When calling the POST method to Start a conversational AI agent, use the LLM configuration to point your agent to the custom service:
_12 "url": "https://your-custom-llm-service/chat/completions",
_12 "content": "You are a helpful assistant."
If accessing your custom LLM service requires identity verification, provide the authentication information in the api_key field.
To integrate advanced features such as Retrieval-Augmented Generation (RAG) and multimodal output, refer to the following sections.
To improve the accuracy and relevance of the agent's responses, use the Retrieval-Augmented Generation (RAG) feature. This feature allows your custom LLM to retrieve information from a specific knowledge base and use the retrieved results as context for generating responses.
The following example simulates the process of retrieving and returning content from a knowledge base and creates the /rag/chat/completions endpoint to incorporate RAG retrieval results when generating responses with the LLM.
```python
import random

# Reuses app, logger, AsyncOpenAI, and ChatCompletionRequest from the example above.

async def perform_rag_retrieval(messages: Optional[List[Dict]]) -> str:
    """Retrieve relevant content from the knowledge base using the RAG model.

    messages: The original message list.
    Returns:
        str: The retrieved text content.
    """
    # TODO: Implement the actual RAG retrieval logic.
    # You can choose the first or last message from the message list as the query,
    # then send the query to the RAG model to retrieve relevant content.
    # Return the retrieval result.
    return "This is relevant content retrieved from the knowledge base."

def refact_messages(context: str, messages: Optional[List[Dict]] = None) -> Optional[List[Dict]]:
    """Modify the message list by adding the retrieved context to the original messages.

    context: The retrieved context.
    messages: The original message list.
    Returns:
        List: The modified message list.
    """
    # TODO: Implement the actual message modification logic.
    # This should add the retrieved context to the original message list.
    return messages

# Random waiting messages.
waiting_messages = [
    "Just a moment, I'm thinking...",
    "Let me think about that for a second...",
    "Good question, let me find out...",
]

@app.post("/rag/chat/completions")
async def create_rag_chat_completion(request: ChatCompletionRequest):
    logger.info(f"Received RAG request: {request.model_dump_json()}")
    if not request.stream:
        raise HTTPException(
            status_code=400, detail="chat completions require streaming"
        )
    try:
        async def generate():
            try:
                # First, send a "please wait" prompt.
                waiting_message = {
                    "id": "waiting_msg",
                    "choices": [
                        {
                            "index": 0,
                            "delta": {
                                "role": "assistant",
                                "content": random.choice(waiting_messages),
                            },
                            "finish_reason": None,
                        }
                    ],
                }
                yield f"data: {json.dumps(waiting_message)}\n\n"

                # Perform RAG retrieval.
                retrieved_context = await perform_rag_retrieval(request.messages)
                refacted_messages = refact_messages(retrieved_context, request.messages)

                # Request LLM completion.
                client = AsyncOpenAI(api_key=os.getenv("<YOUR_LLM_API_KEY>"))
                response = await client.chat.completions.create(
                    model=request.model,
                    messages=refacted_messages,
                    tool_choice=(
                        request.tool_choice
                        if request.tools and request.tool_choice
                        else None
                    ),
                    tools=request.tools if request.tools else None,
                    modalities=request.modalities,
                    audio=request.audio,
                    response_format=request.response_format,
                    stream=True,  # Force streaming.
                    stream_options=request.stream_options,
                )

                async for chunk in response:
                    logger.debug(f"Received RAG chunk: {chunk}")
                    yield f"data: {json.dumps(chunk.to_dict())}\n\n"
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("RAG stream was cancelled")

        return StreamingResponse(generate(), media_type="text/event-stream")
    except asyncio.CancelledError:
        logger.info("RAG request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)
```
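The refact_messages function above is left as a TODO. One minimal way to fill it in, assuming plain dict-style messages, is to prepend the retrieved context as an additional system message so the LLM treats it as grounding information:

```python
from typing import Dict, List, Optional

def refact_messages(context: str, messages: Optional[List[Dict]] = None) -> List[Dict]:
    # Prepend the retrieved context as a system message; the rest of the
    # conversation is passed through unchanged.
    context_message = {
        "role": "system",
        "content": f"Answer using the following retrieved information:\n{context}",
    }
    return [context_message] + list(messages or [])
```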
When calling the POST method to Start a conversational AI agent, simply point the LLM URL to your RAG interface:
_12 "url": "http://your-custom-llm-service/rag/chat/completions",
_12 "content": "Please answer the user's question based on the following retrieved information: ..."
If accessing your custom LLM service requires identity verification, provide the authentication information in the api_key field.
Conversational AI Engine supports LLMs in generating output in multimodal formats, including text and audio. You can create dedicated multimodal interfaces to meet personalized requirements.
The following example demonstrates how to read text and audio files and send them to an LLM to generate an audio response.
```python
import base64
import uuid

import aiofiles

# Reuses app, logger, and ChatCompletionRequest from the examples above.

async def read_text_file(file_path: str) -> str:
    """Read a text file and return its contents.

    file_path: Path to the text file.
    Returns: str: Contents of the text file.
    """
    async with aiofiles.open(file_path, "r") as file:
        content = await file.read()
    return content

async def read_pcm_file(
    file_path: str, sample_rate: int, duration_ms: int
) -> List[bytes]:
    """Read a PCM file and return a list of audio chunks.

    file_path: Path to the PCM file.
    sample_rate: Sample rate of the audio.
    duration_ms: Duration of each audio chunk in milliseconds.
    Returns: List: List of audio chunks.
    """
    async with aiofiles.open(file_path, "rb") as file:
        content = await file.read()
    # 16-bit (2 bytes per sample) mono PCM.
    chunk_size = int(sample_rate * 2 * (duration_ms / 1000))
    return [content[i : i + chunk_size] for i in range(0, len(content), chunk_size)]

@app.post("/audio/chat/completions")
async def create_audio_chat_completion(request: ChatCompletionRequest):
    logger.info(f"Received audio request: {request.model_dump_json()}")
    if not request.stream:
        raise HTTPException(
            status_code=400, detail="chat completions require streaming"
        )

    # Example usage: Reading text and audio files
    # Replace with your actual logic
    text_file_path = "./file.txt"
    pcm_file_path = "./file.pcm"
    sample_rate = 16000  # Example sample rate
    duration_ms = 40  # 40ms audio chunks

    text_content = await read_text_file(text_file_path)
    audio_chunks = await read_pcm_file(pcm_file_path, sample_rate, duration_ms)

    async def generate():
        audio_id = uuid.uuid4().hex
        # Chunks follow the OpenAI streaming format, with audio carried in
        # choices[0].delta.audio. Send the transcript first.
        text_message = {
            "id": uuid.uuid4().hex,
            "choices": [{
                "index": 0,
                "delta": {"audio": {"id": audio_id, "transcript": text_content}},
                "finish_reason": None,
            }],
        }
        yield f"data: {json.dumps(text_message)}\n\n"
        # Then stream the audio as base64-encoded chunks.
        for chunk in audio_chunks:
            audio_message = {
                "id": uuid.uuid4().hex,
                "choices": [{
                    "index": 0,
                    "delta": {"audio": {"id": audio_id, "data": base64.b64encode(chunk).decode("utf-8")}},
                    "finish_reason": None,
                }],
            }
            yield f"data: {json.dumps(audio_message)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
When calling the POST method to Start a conversational AI agent, use the following configuration to specify the output_modalities:
_14 "url": "http://your-custom-llm-service/audio/chat/completions",
_14 "api_key": "your_api_key",
_14 "input_modalities": ["text"],
_14 "output_modalities": ["text", "audio"]
_14 "content": "You are a helpful assistant."
This section contains content that completes the information on this page, or points you to documentation that explains other aspects of this product.
Agora provides an open source Agora Conversational AI Engine server project on GitHub for your reference. Download the project or view the source code for a more complete example.
Custom LLM services must be compatible with the OpenAI Chat Completions API interface standard:
- Request format: Contains parameters such as the model, messages, and tool call configuration.
- Response format: Contains the model-generated response, metadata, and other information.
- Streaming response: Complies with the Server-Sent Events (SSE) specification.
For detailed interface standards, refer to the OpenAI Chat Completions API reference.