Custom LLM

In some Conversational AI Engine scenarios, your use case may require a custom large language model (Custom LLM). This document explains how to integrate a custom LLM into Agora's Conversational AI Engine.

Understand the tech

Agora's Conversational AI Engine interacts with LLM services using the OpenAI API protocol. To integrate a custom LLM, you need to provide an HTTP service compatible with the OpenAI API, capable of handling requests and responses in the OpenAI API format.

This approach enables you to implement additional custom functionalities, including but not limited to:

  • Retrieval-Augmented Generation (RAG): Allows the model to retrieve information from a specific knowledge base.
  • Multimodal Capabilities: Enables the model to generate output in both text and audio formats.
  • Tool Invocation: Allows the model to call external tools.
  • Function Calling: Enables the model to return structured data in the form of function calls, as illustrated in the sketch after this list.
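
For example, when the model decides to call a function, your service streams back a chunk whose delta carries a tool_calls entry. The following is a minimal sketch of one such chunk, using a hypothetical get_weather function; the field names follow the OpenAI Chat Completions streaming format.

# Minimal sketch of a streamed chunk carrying a function call.
# The get_weather function and its arguments are hypothetical.
tool_call_chunk = {
    "id": "chatcmpl-123",
    "object": "chat.completion.chunk",
    "choices": [
        {
            "index": 0,
            "delta": {
                "tool_calls": [
                    {
                        "index": 0,
                        "id": "call_abc123",
                        "type": "function",
                        "function": {
                            "name": "get_weather",
                            "arguments": '{"city": "Shanghai"}',
                        },
                    }
                ]
            },
            "finish_reason": None,
        }
    ],
}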

Prerequisites

Before you begin, ensure that you have:

  • Implemented the basic logic for interacting with a Conversational AI agent by following the REST Quickstart.
  • Set up access to a custom LLM service.
  • Prepared a vector database or retrieval system if using Retrieval-Augmented Generation (RAG).

Implementation

Take the following steps to integrate your custom LLM into Agora's Conversational AI Engine.

Create an OpenAI API-compatible service

To integrate successfully with Agora's Conversational AI Engine, your custom LLM service must provide an interface compatible with the OpenAI Chat Completions API. The key requirements include:

  • Endpoint: A request-handling endpoint, such as https://your-custom-llm-service/chat/completions.
  • Request format: Must accept request parameters adhering to the OpenAI API protocol.
  • Response format: Should return OpenAI API-compatible responses and support the Server-Sent Events (SSE) standard for streaming.

The following example demonstrates how to implement an OpenAI API-compliant interface:


# Imports and application setup used by this example.
import asyncio
import json
import logging
import os
import traceback
from typing import Dict, List, Optional, Union

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel, HttpUrl

logger = logging.getLogger(__name__)
app = FastAPI()


class TextContent(BaseModel):
    type: str = "text"
    text: str


class ImageContent(BaseModel):
    type: str = "image"
    image_url: HttpUrl


class AudioContent(BaseModel):
    type: str = "input_audio"
    input_audio: Dict[str, str]


class ToolFunction(BaseModel):
    name: str
    description: Optional[str]
    parameters: Optional[Dict]
    strict: bool = False


class Tool(BaseModel):
    type: str = "function"
    function: ToolFunction


class ToolChoice(BaseModel):
    type: str = "function"
    function: Optional[Dict]


class ResponseFormat(BaseModel):
    type: str = "json_schema"
    json_schema: Optional[Dict[str, str]]


class SystemMessage(BaseModel):
    role: str = "system"
    content: Union[str, List[str]]


class UserMessage(BaseModel):
    role: str = "user"
    content: Union[str, List[Union[TextContent, ImageContent, AudioContent]]]


class AssistantMessage(BaseModel):
    role: str = "assistant"
    content: Union[str, List[TextContent]] = None
    audio: Optional[Dict[str, str]] = None
    tool_calls: Optional[List[Dict]] = None


class ToolMessage(BaseModel):
    role: str = "tool"
    content: Union[str, List[str]]
    tool_call_id: str


# Define the complete request format
class ChatCompletionRequest(BaseModel):
    context: Optional[Dict] = None  # Context information
    model: Optional[str] = None  # Model name being used
    messages: List[Union[SystemMessage, UserMessage, AssistantMessage, ToolMessage]]  # List of messages
    response_format: Optional[ResponseFormat] = None  # Response format
    modalities: List[str] = ["text"]  # Default modality is text
    audio: Optional[Dict[str, str]] = None  # Assistant's audio response
    tools: Optional[List[Tool]] = None  # List of tools
    tool_choice: Optional[Union[str, ToolChoice]] = "auto"  # Tool selection
    parallel_tool_calls: bool = True  # Whether to call tools in parallel
    stream: bool = True  # Default to streaming response
    stream_options: Optional[Dict] = None  # Streaming options


@app.post("/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    try:
        logger.info(f"Received request: {request.model_dump_json()}")

        if not request.stream:
            raise HTTPException(
                status_code=400, detail="chat completions require streaming"
            )

        client = AsyncOpenAI(api_key=os.getenv("YOUR_LLM_API_KEY"))
        response = await client.chat.completions.create(
            model=request.model,
            messages=request.messages,  # Directly use request messages
            tool_choice=(
                request.tool_choice if request.tools and request.tool_choice else None
            ),
            tools=request.tools if request.tools else None,
            modalities=request.modalities,
            audio=request.audio,
            response_format=request.response_format,
            stream=request.stream,
            stream_options=request.stream_options,
        )

        async def generate():
            try:
                async for chunk in response:
                    logger.debug(f"Received chunk: {chunk}")
                    yield f"data: {json.dumps(chunk.to_dict())}\n\n"
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("Request was cancelled")
                raise

        return StreamingResponse(generate(), media_type="text/event-stream")
    except asyncio.CancelledError:
        logger.info("Request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)
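
Before connecting the service to Agora, you can verify that it streams correctly with a small SSE client. The following sketch assumes the service above is running locally on port 8000 and uses httpx purely for illustration; adjust the URL and model name to match your deployment.

import asyncio

import httpx


async def test_stream():
    # Hypothetical local deployment of the service defined above.
    url = "http://localhost:8000/chat/completions"
    payload = {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": "Hello!"}],
        "stream": True,
    }
    async with httpx.AsyncClient(timeout=30) as client:
        async with client.stream("POST", url, json=payload) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                # Each event arrives as a "data: {...}" line; the stream
                # ends with "data: [DONE]".
                if line.startswith("data: "):
                    print(line)


asyncio.run(test_stream())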

When calling the POST method to Start a conversational AI agent, use the LLM configuration to point your agent to the custom service:


{
  "llm": {
    "url": "https://your-custom-llm-service/chat/completions",
    "api_key": "",
    "system_messages": [
      {
        "role": "system",
        "content": "You are a helpful assistant."
      }
    ]
  }
}

info

If accessing your custom LLM service requires identity verification, provide the authentication information in the api_key field.
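
If you set api_key, your service should also validate the incoming credential. The sketch below assumes the key is forwarded as a Bearer token in the Authorization header and compares it against a hypothetical EXPECTED_API_KEY environment variable; confirm the exact header format against the requests your service actually receives.

import os

from fastapi import Depends, Header, HTTPException


def verify_api_key(authorization: str = Header(default="")) -> None:
    # Assumption: the configured api_key arrives as "Authorization: Bearer <key>".
    expected = os.getenv("EXPECTED_API_KEY", "")
    if expected and authorization != f"Bearer {expected}":
        raise HTTPException(status_code=401, detail="Invalid or missing API key")


# Example usage on the endpoint defined earlier:
# @app.post("/chat/completions", dependencies=[Depends(verify_api_key)])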

Advanced features

To integrate advanced features such as Retrieval-Augmented Generation (RAG) and multimodal output, refer to the following sections.

Retrieval-Augmented Generation

To improve the accuracy and relevance of the agent's responses, use the Retrieval-Augmented Generation (RAG) feature. This feature allows your custom LLM to retrieve information from a specific knowledge base and use the retrieved results as context for generating responses.

The following example simulates retrieval from a knowledge base and creates a /rag/chat/completions endpoint that incorporates the retrieval results as context when generating responses with the LLM.


# Additional import used by this endpoint; the remaining imports and app
# setup are shared with the example above.
import random


async def perform_rag_retrieval(messages: Optional[Dict]) -> str:
    """
    Retrieve relevant content from the knowledge base using the RAG model.

    Args:
        messages: The original message list.

    Returns:
        str: The retrieved text content.
    """
    # TODO: Implement the actual RAG retrieval logic.
    # You can choose the first or last message from the message list as the query,
    # then send the query to the RAG model to retrieve relevant content.

    # Return the retrieval result.
    return "This is relevant content retrieved from the knowledge base."


def refact_messages(context: str, messages: Optional[Dict] = None) -> Optional[Dict]:
    """
    Modify the message list by adding the retrieved context to the original messages.

    Args:
        context: The retrieved context.
        messages: The original message list.

    Returns:
        List: The modified message list.
    """
    # TODO: Implement the actual message modification logic.
    # This should add the retrieved context to the original message list.
    return messages


# Random waiting messages.
waiting_messages = [
    "Just a moment, I'm thinking...",
    "Let me think about that for a second...",
    "Good question, let me find out...",
]


@app.post("/rag/chat/completions")
async def create_rag_chat_completion(request: ChatCompletionRequest):
    try:
        logger.info(f"Received RAG request: {request.model_dump_json()}")

        if not request.stream:
            raise HTTPException(
                status_code=400, detail="chat completions require streaming"
            )

        async def generate():
            # First, send a "please wait" prompt.
            waiting_message = {
                "id": "waiting_msg",
                "choices": [
                    {
                        "index": 0,
                        "delta": {
                            "role": "assistant",
                            "content": random.choice(waiting_messages),
                        },
                        "finish_reason": None,
                    }
                ],
            }
            yield f"data: {json.dumps(waiting_message)}\n\n"

            # Perform RAG retrieval.
            retrieved_context = await perform_rag_retrieval(request.messages)

            # Modify messages.
            refacted_messages = refact_messages(retrieved_context, request.messages)

            # Request LLM completion.
            client = AsyncOpenAI(api_key=os.getenv("YOUR_LLM_API_KEY"))
            response = await client.chat.completions.create(
                model=request.model,
                messages=refacted_messages,
                tool_choice=(
                    request.tool_choice
                    if request.tools and request.tool_choice
                    else None
                ),
                tools=request.tools if request.tools else None,
                modalities=request.modalities,
                audio=request.audio,
                response_format=request.response_format,
                stream=True,  # Force streaming.
                stream_options=request.stream_options,
            )

            try:
                async for chunk in response:
                    logger.debug(f"Received RAG chunk: {chunk}")
                    yield f"data: {json.dumps(chunk.to_dict())}\n\n"
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("RAG stream was cancelled")
                raise

        return StreamingResponse(generate(), media_type="text/event-stream")

    except asyncio.CancelledError:
        logger.info("RAG request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)
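
The refact_messages helper above is left as a TODO. One possible approach, sketched below, is to prepend the retrieved context as an extra system message so that the model treats it as grounding material; this is only an illustration, not the required implementation.

def refact_messages(context: str, messages: Optional[List] = None) -> List:
    # One possible implementation: prepend the retrieved context as a
    # system message so the model grounds its answer on it.
    grounding = {
        "role": "system",
        "content": f"Answer using the following retrieved information:\n{context}",
    }
    return [grounding] + list(messages or [])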

When calling the POST method to Start a conversational AI agent, point the LLM URL to your RAG endpoint:


{
  "llm": {
    "url": "http://your-custom-llm-service/rag/chat/completions",
    "api_key": "",
    "system_messages": [
      {
        "role": "system",
        "content": "Please answer the user's question based on the following retrieved information: ..."
      }
    ]
  }
}

info

If accessing your custom LLM service requires identity verification, provide the authentication information in the api_key field.

Multimodal capabilities

Conversational AI Engine supports LLMs in generating output in multimodal formats, including text and audio. You can create dedicated multimodal interfaces to meet personalized requirements.

The following example demonstrates how to read text and audio files and stream them back as an audio response in the OpenAI format.


# Additional imports used by this endpoint; the remaining imports and app
# setup are shared with the examples above.
import base64
import uuid

import aiofiles


async def read_text_file(file_path: str) -> str:
    """
    Read a text file and return its contents.

    Args:
        file_path: Path to the text file.

    Returns:
        str: Contents of the text file.
    """
    async with aiofiles.open(file_path, "r") as file:
        content = await file.read()
    return content


async def read_pcm_file(
    file_path: str, sample_rate: int, duration_ms: int
) -> List[bytes]:
    """
    Read a PCM file and return a list of audio chunks.

    Args:
        file_path: Path to the PCM file.
        sample_rate: Sample rate of the audio.
        duration_ms: Duration of each audio chunk in milliseconds.

    Returns:
        List: List of audio chunks.
    """
    async with aiofiles.open(file_path, "rb") as file:
        content = await file.read()

    # 16-bit mono PCM: bytes per chunk = sample_rate * 2 bytes * chunk duration in seconds.
    chunk_size = int(sample_rate * 2 * (duration_ms / 1000))
    return [content[i : i + chunk_size] for i in range(0, len(content), chunk_size)]


@app.post("/audio/chat/completions")
async def create_audio_chat_completion(request: ChatCompletionRequest):
    try:
        logger.info(f"Received audio request: {request.model_dump_json()}")

        if not request.stream:
            raise HTTPException(
                status_code=400, detail="chat completions require streaming"
            )

        # Example usage: Reading text and audio files
        # Replace with your actual logic
        text_file_path = "./file.txt"
        pcm_file_path = "./file.pcm"
        sample_rate = 16000  # Example sample rate
        duration_ms = 40  # 40ms audio chunks

        text_content = await read_text_file(text_file_path)
        audio_chunks = await read_pcm_file(pcm_file_path, sample_rate, duration_ms)

        async def generate():
            try:
                # Send text content
                audio_id = uuid.uuid4().hex
                text_message = {
                    "id": uuid.uuid4().hex,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {
                                "audio": {
                                    "id": audio_id,
                                    "transcript": text_content,
                                },
                            },
                            "finish_reason": None,
                        }
                    ],
                }
                yield f"data: {json.dumps(text_message)}\n\n"

                # Send audio chunks
                for chunk in audio_chunks:
                    audio_message = {
                        "id": uuid.uuid4().hex,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {
                                    "audio": {
                                        "id": audio_id,
                                        "data": base64.b64encode(chunk).decode("utf-8"),
                                    },
                                },
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(audio_message)}\n\n"

                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                logger.info("Audio stream was cancelled")
                raise

        return StreamingResponse(generate(), media_type="text/event-stream")

    except asyncio.CancelledError:
        logger.info("Audio request was cancelled")
        raise HTTPException(status_code=499, detail="Request was cancelled")
    except Exception as e:
        traceback_str = "".join(traceback.format_tb(e.__traceback__))
        error_message = f"{str(e)}\n{traceback_str}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)

When calling the POST method to Start a conversational AI agent, use the following configuration to specify the output_modalities:


{
  "llm": {
    "url": "http://your-custom-llm-service/audio/chat/completions",
    "api_key": "your_api_key",
    "input_modalities": ["text"],
    "output_modalities": ["text", "audio"],
    "system_messages": [
      {
        "role": "system",
        "content": "You are a helpful assistant."
      }
    ]
  }
}

Reference

This section contains content that completes the information on this page, or points you to documentation that explains other aspects of this product.

Sample project

Agora provides an open source Agora Conversational AI Engine server project on GitHub for your reference. Download the project or view the source code for a more complete example.

Interface standards

Custom LLM services must be compatible with the OpenAI Chat Completions API interface standard:

  • Request format: Contains parameters such as the model, messages, and tool call configuration.
  • Response format: Contains the model-generated response, metadata, and other information.
  • Streaming response: Complies with the Server-Sent Events (SSE) specification (see the sketch after this list).
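
For reference, each streamed chunk is delivered as an SSE data line, and the stream ends with a [DONE] sentinel. The payload below is illustrative only.

# Illustrative SSE wire format: one text delta followed by stream termination.
sse_events = (
    'data: {"id": "chatcmpl-123", "object": "chat.completion.chunk", '
    '"choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": null}]}\n\n'
    "data: [DONE]\n\n"
)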

For detailed interface standards, refer to the OpenAI Chat Completions API documentation.