Custom LLM

Your Conversational AI Engine use case may require a custom large language model (Custom LLM). This document explains how to integrate a custom LLM into Agora's Conversational AI Engine.

Understand the tech

Agora's Conversational AI Engine interacts with LLM services using the OpenAI API protocol. To integrate a custom LLM, you need to provide an HTTP service compatible with the OpenAI API, capable of handling requests and responses in the OpenAI API format.

This approach enables you to implement additional custom functionalities, including but not limited to:

  • Retrieval-Augmented Generation (RAG): Allows the model to retrieve information from a specific knowledge base.
  • Multimodal Capabilities: Enables the model to generate output in both text and audio formats.
  • Tool Invocation: Allows the model to call external tools.
  • Function Calling: Enables the model to return structured data in the form of function calls.
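
Whichever of these you implement, the exchange itself follows the OpenAI Chat Completions shape: the engine sends a JSON request body and your service streams back chunks over SSE. A minimal sketch, with all field values illustrative:

# Illustrative only: the shape of an incoming request body and of one streamed chunk.
example_request = {
    "model": "your-model-name",  # whatever model identifier your service honors
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello!"},
    ],
    "stream": True,
}

# Each SSE line is "data: <json>"; the stream ends with "data: [DONE]".
example_chunk = {
    "id": "chatcmpl-123",
    "object": "chat.completion.chunk",
    "choices": [
        {"index": 0, "delta": {"role": "assistant", "content": "Hi"}, "finish_reason": None}
    ],
}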

Prerequisites

Before you begin, ensure that you have:

  • Implemented the basic logic for interacting with a Conversational AI agent by following the REST Quickstart.
  • Set up access to a custom LLM service.
  • Prepared a vector database or retrieval system if using Retrieval-Augmented Generation (RAG).

Implementation

Take the following steps to integrate your custom LLM into Agora's Conversational AI Engine.

Create an OpenAI API-compatible service

To integrate successfully with Agora's Conversational AI Engine, your custom LLM service must provide an interface compatible with the OpenAI Chat Completions API. The key requirements include:

  • Endpoint: A request-handling endpoint, such as https://your-custom-llm-service/chat/completions.
  • Request format: Must accept request parameters adhering to the OpenAI API protocol.
  • Response format: Should return OpenAI API-compatible responses and support the Server-Sent Events (SSE) standard for streaming.

The following example demonstrates how to implement an OpenAI API-compliant interface:

# Imports assumed by this example (a FastAPI service wrapping the OpenAI Python SDK):
import asyncio
import json
import logging
import os
import traceback
from typing import Dict, List, Optional, Union

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel, HttpUrl

app = FastAPI()
logger = logging.getLogger(__name__)


class TextContent(BaseModel):
    type: str = "text"
    text: str


class ImageContent(BaseModel):
    type: str = "image"
    image_url: HttpUrl


class AudioContent(BaseModel):
    type: str = "input_audio"
    input_audio: Dict[str, str]


class ToolFunction(BaseModel):
    name: str
    description: Optional[str]
    parameters: Optional[Dict]
    strict: bool = False


class Tool(BaseModel):
    type: str = "function"
    function: ToolFunction


class ToolChoice(BaseModel):
    type: str = "function"
    function: Optional[Dict]


class ResponseFormat(BaseModel):
    type: str = "json_schema"
    json_schema: Optional[Dict[str, str]]


class SystemMessage(BaseModel):
    role: str = "system"
    content: Union[str, List[str]]


class UserMessage(BaseModel):
    role: str = "user"
    content: Union[str, List[Union[TextContent, ImageContent, AudioContent]]]


class AssistantMessage(BaseModel):
    role: str = "assistant"
    content: Union[str, List[TextContent]] = None
    audio: Optional[Dict[str, str]] = None
    tool_calls: Optional[List[Dict]] = None


class ToolMessage(BaseModel):
    role: str = "tool"
    content: Union[str, List[str]]
    tool_call_id: str


# Define the complete request format
class ChatCompletionRequest(BaseModel):
    context: Optional[Dict] = None  # Context information
    model: Optional[str] = None  # Model name being used
    messages: List[Union[SystemMessage, UserMessage, AssistantMessage, ToolMessage]]  # List of messages
    response_format: Optional[ResponseFormat] = None  # Response format
    modalities: List[str] = ["text"]  # Default modality is text
    audio: Optional[Dict[str, str]] = None  # Assistant's audio response
    tools: Optional[List[Tool]] = None  # List of tools
    tool_choice: Optional[Union[str, ToolChoice]] = "auto"  # Tool selection
    parallel_tool_calls: bool = True  # Whether to call tools in parallel
    stream: bool = True  # Default to streaming response
    stream_options: Optional[Dict] = None  # Streaming options


@app.post("/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    try:
        logger.info(f"Received request: {request.model_dump_json()}")

        if not request.stream:
            raise HTTPException(
                status_code=400, detail="chat completions require streaming"
            )

        client = AsyncOpenAI(api_key=os.getenv("YOUR_LLM_API_KEY"))
        response = await client.chat.completions.create(
            model=request.model,
            messages=request.messages,  # Directly use request messages
            tool_choice=(
                request.tool_choice if request.tools and request.tool_choice else None
            ),
            tools=request.tools if request.tools else None,
            modalities=request.modalities,
            audio=request.audio,
            response_format=request.response_format,
            stream=request.stream,
            stream_options=request.stream_options,
        )

        async def generate():
            try:
                async for chunk in response:
                    logger.debug(f"Received chunk: {chunk}")
                    yield f"data: {json.dumps(chunk.to_dict())}\n\n"
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("Request was cancelled")
                raise

        return StreamingResponse(generate(), media_type="text/event-stream")
    except asyncio.CancelledError:
        logger.info("Request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)

When calling the POST method to Start a conversational AI agent, use the LLM configuration to point your agent to the custom service:


{
  "llm": {
    "url": "https://your-custom-llm-service/chat/completions",
    "api_key": "",
    "system_messages": [
      {
        "role": "system",
        "content": "You are a helpful assistant."
      }
    ]
  }
}

info

If accessing your custom LLM service requires identity verification, provide the authentication information in the api_key field.
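
For context, here is a hedged sketch of how the llm block above might be sent when starting an agent. The endpoint path, credential header, agent name, and the other required properties (channel, token, ASR/TTS settings, and so on) are placeholders; take the actual values and full body structure from the REST Quickstart.

import requests

# Placeholder values; the real endpoint, credentials, and required fields come
# from the REST Quickstart.
start_url = "https://api.agora.io/api/conversational-ai-agent/v2/projects/<your_app_id>/join"
headers = {"Authorization": "Basic <base64_credentials>"}

body = {
    "name": "custom-llm-agent",  # illustrative agent name
    "properties": {
        # ... channel, token, ASR/TTS configuration per the REST Quickstart ...
        "llm": {
            "url": "https://your-custom-llm-service/chat/completions",
            "api_key": "<your_llm_api_key>",
            "system_messages": [
                {"role": "system", "content": "You are a helpful assistant."}
            ],
        },
    },
}

response = requests.post(start_url, headers=headers, json=body)
print(response.status_code, response.text)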

Advanced features

To integrate advanced features such as Retrieval-Augmented Generation and multimodal output, refer to the following sections.

Retrieval-Augmented Generation

To improve the accuracy and relevance of the agent's responses, use the Retrieval-Augmented Generation (RAG) feature. This feature allows your custom LLM to retrieve information from a specific knowledge base and use the retrieved results as context for generating responses.

The following example simulates retrieval from a knowledge base and exposes a /rag/chat/completions endpoint that incorporates the retrieval results when generating responses with the LLM.

# Builds on the imports and ChatCompletionRequest model from the previous example, plus:
import random


async def perform_rag_retrieval(messages: Optional[List]) -> str:
    """
    Retrieve relevant content from the knowledge base using the RAG model.

    Args:
        messages: The original message list.

    Returns:
        str: The retrieved text content.
    """
    # TODO: Implement the actual RAG retrieval logic.
    # You can choose the first or last message from the message list as the query,
    # then send the query to the RAG model to retrieve relevant content.
    # Return the retrieval result.
    return "This is relevant content retrieved from the knowledge base."


def refact_messages(context: str, messages: Optional[List] = None) -> Optional[List]:
    """
    Modify the message list by adding the retrieved context to the original messages.

    Args:
        context: The retrieved context.
        messages: The original message list.

    Returns:
        List: The modified message list.
    """
    # TODO: Implement the actual message modification logic.
    # This should add the retrieved context to the original message list.
    return messages


# Random waiting messages.
waiting_messages = [
    "Just a moment, I'm thinking...",
    "Let me think about that for a second...",
    "Good question, let me find out...",
]


@app.post("/rag/chat/completions")
async def create_rag_chat_completion(request: ChatCompletionRequest):
    try:
        logger.info(f"Received RAG request: {request.model_dump_json()}")

        if not request.stream:
            raise HTTPException(
                status_code=400, detail="chat completions require streaming"
            )

        async def generate():
            # First, send a "please wait" prompt.
            waiting_message = {
                "id": "waiting_msg",
                "choices": [
                    {
                        "index": 0,
                        "delta": {
                            "role": "assistant",
                            "content": random.choice(waiting_messages),
                        },
                        "finish_reason": None,
                    }
                ],
            }
            yield f"data: {json.dumps(waiting_message)}\n\n"

            # Perform RAG retrieval.
            retrieved_context = await perform_rag_retrieval(request.messages)
            # Modify messages.
            refacted_messages = refact_messages(retrieved_context, request.messages)

            # Request LLM completion.
            client = AsyncOpenAI(api_key=os.getenv("YOUR_LLM_API_KEY"))
            response = await client.chat.completions.create(
                model=request.model,
                messages=refacted_messages,
                tool_choice=(
                    request.tool_choice
                    if request.tools and request.tool_choice
                    else None
                ),
                tools=request.tools if request.tools else None,
                modalities=request.modalities,
                audio=request.audio,
                response_format=request.response_format,
                stream=True,  # Force streaming.
                stream_options=request.stream_options,
            )

            try:
                async for chunk in response:
                    logger.debug(f"Received RAG chunk: {chunk}")
                    yield f"data: {json.dumps(chunk.to_dict())}\n\n"
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("RAG stream was cancelled")
                raise

        return StreamingResponse(generate(), media_type="text/event-stream")
    except asyncio.CancelledError:
        logger.info("RAG request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)
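
The two TODOs above are where your retrieval system plugs in. The sketch below shows one way to fill them, assuming a hypothetical asynchronous vector_store.search() helper and using the latest user message as the query; adapt it to your own retriever and message handling.

# Sketch only: vector_store and doc.text are hypothetical stand-ins for your retriever.
async def perform_rag_retrieval(messages: Optional[List]) -> str:
    # Use the most recent user message as the retrieval query.
    query = ""
    for message in reversed(messages or []):
        role = message.get("role") if isinstance(message, dict) else getattr(message, "role", None)
        if role == "user":
            content = message.get("content") if isinstance(message, dict) else message.content
            query = content if isinstance(content, str) else str(content)
            break
    docs = await vector_store.search(query, top_k=3)  # hypothetical retriever call
    return "\n".join(doc.text for doc in docs)


def refact_messages(context: str, messages: Optional[List] = None) -> Optional[List]:
    # Prepend the retrieved context as an additional system message.
    context_message = {
        "role": "system",
        "content": f"Answer the user's question using the following retrieved information:\n{context}",
    }
    return [context_message] + list(messages or [])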

When calling the POST method to Start a conversational AI agent, simply point the LLM URL to your RAG interface:


{
  "llm": {
    "url": "http://your-custom-llm-service/rag/chat/completions",
    "api_key": "",
    "system_messages": [
      {
        "role": "system",
        "content": "Please answer the user's question based on the following retrieved information: ..."
      }
    ]
  }
}

info

If accessing your custom LLM service requires identity verification, provide the authentication information in the api_key field.

Multimodal capabilities

Conversational AI Engine supports LLM output in multiple modalities, including text and audio. You can create dedicated multimodal endpoints to meet your specific requirements.

The following example demonstrates how to read text and audio files and send them to an LLM to generate an audio response.

# Builds on the imports and ChatCompletionRequest model from the first example, plus:
import base64
import uuid

import aiofiles


async def read_text_file(file_path: str) -> str:
    """
    Read a text file and return its content.

    Args:
        file_path: Path to the text file.

    Returns:
        str: Content of the text file.
    """
    async with aiofiles.open(file_path, "r") as file:
        content = await file.read()
    return content


async def read_pcm_file(
    file_path: str, sample_rate: int, duration_ms: int
) -> List[bytes]:
    """
    Read a PCM file and return a list of audio chunks.

    Args:
        file_path: Path to the PCM file.
        sample_rate: Sampling rate of the audio.
        duration_ms: Duration of each audio chunk in milliseconds.

    Returns:
        List: List of audio chunks.
    """
    async with aiofiles.open(file_path, "rb") as file:
        content = await file.read()
    chunk_size = int(sample_rate * 2 * (duration_ms / 1000))
    return [content[i : i + chunk_size] for i in range(0, len(content), chunk_size)]


@app.post("/audio/chat/completions")
async def create_audio_chat_completion(request: ChatCompletionRequest):
    try:
        logger.info(f"Received audio request: {request.model_dump_json()}")

        if not request.stream:
            raise HTTPException(
                status_code=400, detail="chat completions require streaming"
            )

        # Example usage, reading text and audio files.
        # Please replace with your actual logic.
        text_file_path = "./file.txt"
        pcm_file_path = "./file.pcm"
        sample_rate = 16000  # Example sampling rate
        duration_ms = 40  # 40ms audio chunk

        text_content = await read_text_file(text_file_path)
        audio_chunks = await read_pcm_file(pcm_file_path, sample_rate, duration_ms)

        async def generate():
            try:
                # Send text content
                audio_id = uuid.uuid4().hex
                text_message = {
                    "id": uuid.uuid4().hex,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {
                                "audio": {
                                    "id": audio_id,
                                    "transcript": text_content,
                                },
                            },
                            "finish_reason": None,
                        }
                    ],
                }
                yield f"data: {json.dumps(text_message)}\n\n"

                # Send audio chunks
                for chunk in audio_chunks:
                    audio_message = {
                        "id": uuid.uuid4().hex,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {
                                    "audio": {
                                        "id": audio_id,
                                        "data": base64.b64encode(chunk).decode("utf-8"),
                                    },
                                },
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(audio_message)}\n\n"
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("Audio stream was cancelled")
                raise

        return StreamingResponse(generate(), media_type="text/event-stream")
    except asyncio.CancelledError:
        logger.info("Audio request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)

When calling the POST method to Start a conversational AI agent, use the following configuration to specify the output_modalities:


{
  "llm": {
    "url": "http://your-custom-llm-service/audio/chat/completions",
    "api_key": "your_api_key",
    "input_modalities": ["text"],
    "output_modalities": ["text", "audio"],
    "system_messages": [
      {
        "role": "system",
        "content": "You are a helpful assistant."
      }
    ]
  }
}

Reference

This section contains content that completes the information on this page, or points you to documentation that explains other aspects of this product.

Sample project

Agora provides open source sample projects on GitHub for your reference. Download the project or view the source code for a more complete example.

Interface standards

Custom LLM services must be compatible with the OpenAI Chat Completions API interface standard:

  • Request format: Contains parameters such as the model, messages, and tool call configuration.
  • Response format: Contains the model-generated response, metadata, and other information.
  • Streaming response: Complies with the Server-Sent Events (SSE) specification (see the sketch below).
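
As a small illustration of the streaming requirement, each event on the wire is framed the same way the handlers above emit it:

# One SSE event: a "data:" prefix, the JSON chunk, and a blank line.
def frame_sse_event(chunk_json: str) -> str:
    return f"data: {chunk_json}\n\n"


# The stream is terminated with a sentinel event.
stream_terminator = "data: [DONE]\n\n"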

For detailed interface standards, refer to the OpenAI Chat Completions API documentation.