
Custom LLM

Your Conversational AI Engine use case may require a custom large language model (custom LLM). This document explains how to integrate a custom LLM into Agora's Conversational AI Engine.

Understand the tech

Agora's Conversational AI Engine interacts with LLM services using the OpenAI API protocol. To integrate a custom LLM, you need to provide an HTTP service compatible with the OpenAI API, capable of handling requests and responses in the OpenAI API format.

This approach enables you to implement additional custom functionalities, including but not limited to:

  • Retrieval-Augmented Generation (RAG): Allows the model to retrieve information from a specific knowledge base.
  • Multimodal Capabilities: Enables the model to generate output in both text and audio formats.
  • Tool Invocation: Allows the model to call external tools.
  • Function Calling: Enables the model to return structured data in the form of function calls.

Prerequisites

  • Implemented the basic logic for interacting with a Conversational AI agent by following the REST Quickstart.
  • Set up access to a custom LLM service.
  • Prepared a vector database or retrieval system if using Retrieval-Augmented Generation (RAG).

Implementation

Take the following steps to integrate your custom LLM into Agora's Conversational AI Engine.

Create an OpenAI API-compatible service

To integrate successfully with Agora's Conversational AI Engine, your custom LLM service must provide an interface compatible with the OpenAI Chat Completions API. The key requirements include:

  • Endpoint: A request-handling endpoint, such as https://your-custom-llm-service/chat/completions.
  • Request format: Must accept request parameters adhering to the OpenAI API protocol.
  • Response format: Should return OpenAI API-compatible responses and support the Server-Sent Events (SSE) standard for streaming.
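To make the streaming contract concrete, the sketch below shows how a client such as the engine can parse an OpenAI-style SSE stream: each event is a `data:` line carrying a JSON chunk, and `data: [DONE]` marks the end of the stream. The sample payloads are illustrative, not actual engine traffic.

```python
import json


def parse_sse_stream(lines):
    """Parse OpenAI-style SSE lines into chunk dicts, stopping at [DONE]."""
    chunks = []
    for line in lines:
        line = line.strip()
        if not line.startswith("data: "):
            continue  # Skip blank keep-alive lines and comments
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break  # End-of-stream sentinel
        chunks.append(json.loads(payload))
    return chunks


# Illustrative stream: two content deltas followed by the end sentinel
stream = [
    'data: {"id": "1", "choices": [{"index": 0, "delta": {"content": "Hel"}}]}',
    "",
    'data: {"id": "1", "choices": [{"index": 0, "delta": {"content": "lo"}}]}',
    "",
    "data: [DONE]",
]
text = "".join(c["choices"][0]["delta"]["content"] for c in parse_sse_stream(stream))
print(text)  # Hello
```

Your service must produce a stream that a parser like this can consume: one JSON chunk per `data:` line, terminated by `data: [DONE]`.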

The following example demonstrates how to implement an OpenAI API-compliant interface:

```python
import asyncio
import json
import logging
import os
import traceback
from typing import Dict, List, Optional, Union

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel, HttpUrl

logger = logging.getLogger(__name__)
app = FastAPI()


class TextContent(BaseModel):
    type: str = "text"
    text: str


class ImageContent(BaseModel):
    type: str = "image"
    image_url: HttpUrl


class AudioContent(BaseModel):
    type: str = "input_audio"
    input_audio: Dict[str, str]


class ToolFunction(BaseModel):
    name: str
    description: Optional[str]
    parameters: Optional[Dict]
    strict: bool = False


class Tool(BaseModel):
    type: str = "function"
    function: ToolFunction


class ToolChoice(BaseModel):
    type: str = "function"
    function: Optional[Dict]


class ResponseFormat(BaseModel):
    type: str = "json_schema"
    json_schema: Optional[Dict[str, str]]


class SystemMessage(BaseModel):
    role: str = "system"
    content: Union[str, List[str]]


class UserMessage(BaseModel):
    role: str = "user"
    content: Union[str, List[Union[TextContent, ImageContent, AudioContent]]]


class AssistantMessage(BaseModel):
    role: str = "assistant"
    content: Optional[Union[str, List[TextContent]]] = None
    audio: Optional[Dict[str, str]] = None
    tool_calls: Optional[List[Dict]] = None


class ToolMessage(BaseModel):
    role: str = "tool"
    content: Union[str, List[str]]
    tool_call_id: str


# Define the complete request format
class ChatCompletionRequest(BaseModel):
    context: Optional[Dict] = None  # Context information
    model: Optional[str] = None  # Model name being used
    messages: List[
        Union[SystemMessage, UserMessage, AssistantMessage, ToolMessage]
    ]  # List of messages
    response_format: Optional[ResponseFormat] = None  # Response format
    modalities: List[str] = ["text"]  # Default modality is text
    audio: Optional[Dict[str, str]] = None  # Assistant's audio response
    tools: Optional[List[Tool]] = None  # List of tools
    tool_choice: Optional[Union[str, ToolChoice]] = "auto"  # Tool selection
    parallel_tool_calls: bool = True  # Whether to call tools in parallel
    stream: bool = True  # Default to streaming response
    stream_options: Optional[Dict] = None  # Streaming options


@app.post("/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    try:
        logger.info(f"Received request: {request.model_dump_json()}")
        # Reject non-streaming requests before calling the upstream LLM
        if not request.stream:
            raise HTTPException(
                status_code=400, detail="chat completions require streaming"
            )
        client = AsyncOpenAI(api_key=os.getenv("YOUR_LLM_API_KEY"))
        response = await client.chat.completions.create(
            model=request.model,
            messages=request.messages,  # Directly use request messages
            tool_choice=(
                request.tool_choice if request.tools and request.tool_choice else None
            ),
            tools=request.tools if request.tools else None,
            modalities=request.modalities,
            audio=request.audio,
            response_format=request.response_format,
            stream=request.stream,
            stream_options=request.stream_options,
        )

        async def generate():
            try:
                async for chunk in response:
                    logger.debug(f"Received chunk: {chunk}")
                    yield f"data: {json.dumps(chunk.to_dict())}\n\n"
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("Request was cancelled")
                raise

        return StreamingResponse(generate(), media_type="text/event-stream")
    except asyncio.CancelledError:
        logger.info("Request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)
```


Configure Agora's Conversational AI Engine

When you Start a conversational AI agent, use the llm configuration to point your agent to the custom service:

info

If accessing your custom LLM service requires identity verification, provide the authentication information in the api_key field.


```json
{
  "llm": {
    "url": "https://your-custom-llm-service/chat/completions",
    "api_key": "",
    "system_messages": [
      {
        "role": "system",
        "content": "You are a helpful assistant."
      }
    ]
  }
}
```
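For context, this llm object is one part of the larger request body sent when starting an agent. A minimal sketch of assembling that body is shown below; the surrounding name and properties fields follow the pattern used in the REST Quickstart, and all values here are placeholders you must replace with your own.

```python
import json


def build_start_payload(channel: str, llm_url: str, llm_api_key: str) -> dict:
    """Assemble an agent-start request body containing the custom LLM config.

    The "name"/"properties" wrapper follows the REST Quickstart pattern;
    check the Start API reference for the fields your project requires.
    """
    return {
        "name": "custom-llm-agent",  # Placeholder agent name
        "properties": {
            "channel": channel,  # RTC channel the agent joins
            "llm": {
                "url": llm_url,
                "api_key": llm_api_key,
                "system_messages": [
                    {"role": "system", "content": "You are a helpful assistant."}
                ],
            },
        },
    }


payload = build_start_payload(
    "demo_channel", "https://your-custom-llm-service/chat/completions", ""
)
print(json.dumps(payload, indent=2))
```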

Advanced features

To integrate advanced features such as Retrieval-Augmented Generation and generating outputs in multimodal forms, refer to the following sections.

Retrieval-Augmented Generation

To improve the accuracy and relevance of the agent's responses, use the Retrieval-Augmented Generation (RAG) feature. This feature allows your custom LLM to retrieve information from a specific knowledge base and use the retrieved results as context for generating responses.

The following example simulates the process of retrieving and returning content from a knowledge base and creates the /rag/chat/completions endpoint to incorporate RAG retrieval results when generating responses with the LLM.

```python
import random
# Plus the imports, app, logger, and ChatCompletionRequest model
# from the /chat/completions example above.


async def perform_rag_retrieval(messages: Optional[List]) -> str:
    """
    Retrieve relevant content from the knowledge base using the RAG model.

    Args:
        messages: The original message list.

    Returns:
        str: The retrieved text content.
    """
    # TODO: Implement the actual RAG retrieval logic.
    # You can choose the first or last message from the message list as the query,
    # then send the query to the RAG model to retrieve relevant content.
    # Return the retrieval result.
    return "This is relevant content retrieved from the knowledge base."


def refact_messages(context: str, messages: Optional[List] = None) -> Optional[List]:
    """
    Modify the message list by adding the retrieved context to the original messages.

    Args:
        context: The retrieved context.
        messages: The original message list.

    Returns:
        List: The modified message list.
    """
    # TODO: Implement the actual message modification logic.
    # This should add the retrieved context to the original message list.
    return messages


# Random waiting messages.
waiting_messages = [
    "Just a moment, I'm thinking...",
    "Let me think about that for a second...",
    "Good question, let me find out...",
]


@app.post("/rag/chat/completions")
async def create_rag_chat_completion(request: ChatCompletionRequest):
    try:
        logger.info(f"Received RAG request: {request.model_dump_json()}")
        if not request.stream:
            raise HTTPException(
                status_code=400, detail="chat completions require streaming"
            )

        async def generate():
            # First, send a "please wait" prompt.
            waiting_message = {
                "id": "waiting_msg",
                "choices": [
                    {
                        "index": 0,
                        "delta": {
                            "role": "assistant",
                            "content": random.choice(waiting_messages),
                        },
                        "finish_reason": None,
                    }
                ],
            }
            yield f"data: {json.dumps(waiting_message)}\n\n"

            # Perform RAG retrieval.
            retrieved_context = await perform_rag_retrieval(request.messages)
            # Modify messages.
            refacted_messages = refact_messages(retrieved_context, request.messages)

            # Request LLM completion.
            client = AsyncOpenAI(api_key=os.getenv("YOUR_LLM_API_KEY"))
            response = await client.chat.completions.create(
                model=request.model,
                messages=refacted_messages,
                tool_choice=(
                    request.tool_choice
                    if request.tools and request.tool_choice
                    else None
                ),
                tools=request.tools if request.tools else None,
                modalities=request.modalities,
                audio=request.audio,
                response_format=request.response_format,
                stream=True,  # Force streaming.
                stream_options=request.stream_options,
            )
            try:
                async for chunk in response:
                    logger.debug(f"Received RAG chunk: {chunk}")
                    yield f"data: {json.dumps(chunk.to_dict())}\n\n"
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("RAG stream was cancelled")
                raise

        return StreamingResponse(generate(), media_type="text/event-stream")
    except asyncio.CancelledError:
        logger.info("RAG request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)
```
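The refact_messages stub leaves the injection strategy open. One plausible approach, an assumption rather than the only valid design, is to prepend the retrieved context as an extra system message so it frames the whole conversation:

```python
from typing import List, Optional


def refact_messages_with_context(
    context: str, messages: Optional[List[dict]] = None
) -> List[dict]:
    """Prepend retrieved context as a system message ahead of the conversation."""
    rag_message = {
        "role": "system",
        "content": (
            "Answer using the following retrieved information when relevant:\n"
            + context
        ),
    }
    return [rag_message] + list(messages or [])


messages = [{"role": "user", "content": "What are your opening hours?"}]
merged = refact_messages_with_context("We are open 9am-5pm on weekdays.", messages)
print(merged[0]["role"])  # system
```

Other designs, such as appending the context to the latest user message, work too; what matters is that the LLM receives the retrieved text alongside the original conversation.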

When calling the POST method to Start a conversational AI agent, simply point the LLM URL to your RAG interface:


```json
{
  "llm": {
    "url": "http://your-custom-llm-service/rag/chat/completions",
    "api_key": "",
    "system_messages": [
      {
        "role": "system",
        "content": "Please answer the user's question based on the following retrieved information: ..."
      }
    ]
  }
}
```

info

If accessing your custom LLM service requires identity verification, provide the authentication information in the api_key field.

Multimodal capabilities

Conversational AI Engine supports LLMs in generating output in multimodal formats, including text and audio. You can create dedicated multimodal interfaces to meet personalized requirements. For more information about using audio output, see Audio output mode.

The following example demonstrates how to read text and audio files and send them to an LLM to generate an audio response.

```python
import base64
import uuid

import aiofiles
# Plus the imports, app, logger, and ChatCompletionRequest model
# from the /chat/completions example above.


async def read_text_file(file_path: str) -> str:
    """
    Read a text file and return its content.

    Args:
        file_path: Path to the text file.

    Returns:
        str: Content of the text file.
    """
    async with aiofiles.open(file_path, "r") as file:
        content = await file.read()
    return content


async def read_pcm_file(
    file_path: str, sample_rate: int, duration_ms: int
) -> List[bytes]:
    """
    Read a PCM file and return a list of audio chunks.

    Args:
        file_path: Path to the PCM file.
        sample_rate: Sampling rate of the audio.
        duration_ms: Duration of each audio chunk in milliseconds.

    Returns:
        List: List of audio chunks.
    """
    async with aiofiles.open(file_path, "rb") as file:
        content = await file.read()
    # 16-bit PCM: 2 bytes per sample
    chunk_size = int(sample_rate * 2 * (duration_ms / 1000))
    return [content[i : i + chunk_size] for i in range(0, len(content), chunk_size)]


@app.post("/audio/chat/completions")
async def create_audio_chat_completion(request: ChatCompletionRequest):
    try:
        logger.info(f"Received audio request: {request.model_dump_json()}")
        if not request.stream:
            raise HTTPException(
                status_code=400, detail="chat completions require streaming"
            )

        # Example usage, reading text and audio files
        # Please replace with your actual logic
        text_file_path = "./file.txt"
        pcm_file_path = "./file.pcm"
        sample_rate = 16000  # Example sampling rate
        duration_ms = 40  # 40ms audio chunk
        text_content = await read_text_file(text_file_path)
        audio_chunks = await read_pcm_file(pcm_file_path, sample_rate, duration_ms)

        async def generate():
            try:
                # Send the transcript first
                audio_id = uuid.uuid4().hex
                text_message = {
                    "id": uuid.uuid4().hex,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {
                                "audio": {
                                    "id": audio_id,
                                    "transcript": text_content,
                                },
                            },
                            "finish_reason": None,
                        }
                    ],
                }
                yield f"data: {json.dumps(text_message)}\n\n"

                # Send audio chunks
                for chunk in audio_chunks:
                    audio_message = {
                        "id": uuid.uuid4().hex,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {
                                    "audio": {
                                        "id": audio_id,
                                        "data": base64.b64encode(chunk).decode("utf-8"),
                                    },
                                },
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(audio_message)}\n\n"
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("Audio stream was cancelled")
                raise

        return StreamingResponse(generate(), media_type="text/event-stream")
    except asyncio.CancelledError:
        logger.info("Audio request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)
```
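The chunk size used when splitting the PCM file follows directly from the audio math: for 16-bit mono PCM, bytes per chunk = sample rate × 2 bytes per sample × (duration in ms / 1000). A small self-contained illustration, with example rates and durations:

```python
def pcm_chunk_size(sample_rate: int, duration_ms: int, bytes_per_sample: int = 2) -> int:
    """Bytes needed for one chunk of 16-bit mono PCM audio."""
    return int(sample_rate * bytes_per_sample * (duration_ms / 1000))


def split_pcm(content: bytes, sample_rate: int, duration_ms: int) -> list:
    """Split raw PCM bytes into fixed-duration chunks (last chunk may be shorter)."""
    size = pcm_chunk_size(sample_rate, duration_ms)
    return [content[i : i + size] for i in range(0, len(content), size)]


# 16 kHz, 40 ms chunks -> 16000 * 2 * 0.04 = 1280 bytes per chunk
print(pcm_chunk_size(16000, 40))  # 1280

# One second of silence (32000 bytes) splits into 25 chunks of 40 ms
chunks = split_pcm(b"\x00" * 32000, 16000, 40)
print(len(chunks))  # 25
```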

When calling the POST method to Start a conversational AI agent, use the following configuration to specify the output_modalities:


```json
{
  "llm": {
    "url": "http://your-custom-llm-service/audio/chat/completions",
    "api_key": "your_api_key",
    "input_modalities": ["text"],
    "output_modalities": ["text", "audio"],
    "system_messages": [
      {
        "role": "system",
        "content": "You are a helpful assistant."
      }
    ]
  }
}
```

Control agent behavior using first-packet metadata

In streaming response (SSE) scenarios, a single LLM response is segmented into multiple chunks for transmission. Agora's Conversational AI Engine supports processing a special first-chunk response whose object is chat.completion.custom_metadata, and uses the metadata field in that response to control agent behaviors such as whether the response can be interrupted by the user and updating TTS parameters in real time.

To use these features, modify your custom LLM service to output a special first-chunk response that meets the Conversational AI Engine's requirements. The engine processes this metadata chunk before handling the rest of the response.

Configure LLM response interruption

The Conversational AI Engine supports letting the custom LLM determine whether a response can be interrupted by the user. This is particularly useful for long-running tasks, such as querying a database or calling an external API, where the agent must remain responsive while the task completes in the background.

When the custom LLM initiates a long-running task, it sends a custom_metadata packet to the Conversational AI Engine with interruptable set to false, followed by a chat.completion.chunk response containing a filler phrase, for example "Just a moment, I'm processing your request". The Conversational AI Engine passes the filler phrase to TTS and plays it to the user while the task completes in the background. Setting interruptable to false prevents user speech from interrupting the TTS broadcast of the filler phrase.

In your custom LLM service, output a first-chunk response with the following structure:


```json
{
  "id": "response-id",
  "object": "chat.completion.custom_metadata",
  "choices": [],
  "metadata": {
    "interruptable": false
  }
}
```

The interruptable field configures whether the TTS broadcast of this LLM response can be interrupted by user speech. Set it to false to prevent interruption, true to allow it, or omit it to leave the current interruption mode unchanged.
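Putting the pieces together, a long-running tool call can emit the metadata chunk first, then the filler phrase, and finally the real answer once the task completes. The sketch below shows one plausible shape for such an SSE event sequence; the helper names and filler text are illustrative, not part of the engine's API.

```python
import json


def make_metadata_chunk(response_id: str, interruptable: bool) -> str:
    """Build the special first-chunk SSE event that sets interruption behavior."""
    chunk = {
        "id": response_id,
        "object": "chat.completion.custom_metadata",
        "choices": [],
        "metadata": {"interruptable": interruptable},
    }
    return f"data: {json.dumps(chunk)}\n\n"


def make_content_chunk(response_id: str, text: str) -> str:
    """Build a normal chat.completion.chunk-style SSE event carrying text."""
    chunk = {
        "id": response_id,
        "object": "chat.completion.chunk",
        "choices": [
            {"index": 0, "delta": {"content": text}, "finish_reason": None}
        ],
    }
    return f"data: {json.dumps(chunk)}\n\n"


def long_task_events(response_id: str, answer: str):
    """Yield SSE events: non-interruptible metadata, filler, then the answer."""
    yield make_metadata_chunk(response_id, interruptable=False)
    yield make_content_chunk(response_id, "Just a moment, I'm processing your request.")
    yield make_content_chunk(response_id, answer)
    yield "data: [DONE]\n\n"


events = list(long_task_events("resp-1", "Here is the result of your query."))
print(len(events))  # 4
```

Only the first event carries metadata; the filler and answer are ordinary chunks, matching the requirement that TTS content appear from the second chunk onward.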

caution

The Conversational AI Engine only processes the first chunk response where object is chat.completion.custom_metadata and ignores the content in choices. Subsequent responses are treated as normal responses and do not support metadata configuration. Ensure that the content requiring TTS broadcast is generated in the second and subsequent responses.

ASR behavior during non-interruptible responses

Setting interruptable to false does not disable automatic speech recognition (ASR). The engine continues to capture and transcribe user speech during TTS playback, but holds the transcript and does not process it until TTS finishes. This means:

  • If a user speaks during a non-interruptible TTS broadcast, their speech is not lost. It is queued and processed after playback ends.
  • For long-running tasks, you are responsible for managing the agent's state outside the response lifecycle. Consider how your application should handle queued speech: for example, whether to process it normally, discard it, or use it to cancel the background task.

Update TTS parameters in real time

The Conversational AI Engine supports real-time TTS parameter updates during a conversation, enabling a more immersive interactive experience. This feature is useful when the agent needs to adjust its voice dynamically based on user input, for example when a user requests a different voice or when the LLM detects a change in the user's mood and adjusts volume, pitch, or speech rate accordingly.

When the LLM determines that TTS parameters need to be updated, modify your custom LLM service to output a first-chunk response with the following structure:


```json
{
  "id": "response-id",
  "object": "chat.completion.custom_metadata",
  "choices": [],
  "metadata": {
    "tts_params": {
      "params": {
        "voice_type": "female_1",
        "rate": 1.1
      }
    }
  }
}
```

The tts_params.params field specifies the TTS parameters to update. For the available parameters, refer to your TTS vendor's official documentation.
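As with the interruption flag, this chunk can be generated programmatically in your service. A minimal sketch follows; the parameter names voice_type and rate are examples whose validity depends on your TTS vendor.

```python
import json


def make_tts_update_chunk(response_id: str, params: dict) -> str:
    """Build the first-chunk SSE event that updates TTS parameters mid-conversation."""
    chunk = {
        "id": response_id,
        "object": "chat.completion.custom_metadata",
        "choices": [],
        "metadata": {"tts_params": {"params": params}},
    }
    return f"data: {json.dumps(chunk)}\n\n"


# Example: switch voice and slightly speed up speech (vendor-specific parameters)
event = make_tts_update_chunk("resp-2", {"voice_type": "female_1", "rate": 1.1})
print(event.startswith("data: "))  # True
```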

Reference

This section contains content that completes the information on this page, or points you to documentation that explains other aspects of this product.

Sample project

Agora provides open source sample projects on GitHub for your reference. Download the project or view the source code for a more complete example.

Interface standards

Custom LLM services must be compatible with the OpenAI Chat Completions API interface standard:

  • Request format: Contains parameters such as the model, messages, and tool call configuration.
  • Response format: Contains the model-generated response, metadata, and other information.
  • Streaming response: Complies with the Server-Sent Events (SSE) specification.

For the detailed interface standard, refer to the OpenAI Chat Completions API documentation.