SSE Streaming

Server-Sent Events (SSE) enable real-time streaming of AI responses, providing a better user experience by displaying text as it's generated rather than waiting for the complete response.

Overview

When you send a message with stream: true, the API returns a stream of events instead of a single JSON response. Each event contains incremental data about the agent's processing and response.

Enabling Streaming

To enable streaming, set stream: true in your message request:

curl -X POST "https://api.codeer.ai/api/v1/chats/12345/messages" \
  -H "x-api-key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{
    "message": "Hello!",
    "stream": true,
    "agent_id": "550e8400-e29b-41d4-a716-446655440000"
  }'

Event Format

Events follow the SSE specification:

event: <event_type>
data: <json_payload>

Each event includes common fields:

Field        Description
type         The event type
response_id  Unique identifier for this response
chat_id      The chat session ID

Event Types

response.created

Emitted when the LLM is initialized and ready to process.

{
  "type": "response.created",
  "response_id": "abc123",
  "chat_id": 12345,
  "agent_id": "550e8400-e29b-41d4-a716-446655440000",
  "model": "gpt-4"
}

response.chat.title.updated

Emitted when the chat title is auto-generated (typically after the first message).

{
  "type": "response.chat.title.updated",
  "response_id": "abc123",
  "chat_id": 12345,
  "name": "Question about business hours"
}

response.reasoning_step.start

Emitted when the agent begins a reasoning step (e.g., searching knowledge base, calling a tool).

{
  "type": "response.reasoning_step.start",
  "response_id": "abc123",
  "chat_id": 12345,
  "step": {
    "id": "step_abc123",
    "content": "Searching knowledge base for business hours",
    "type": "consultant_retrieve_context_source",
    "args": {
      "query": "business hours"
    },
    "timestamp": "2024-01-15T10:30:00Z"
  }
}

Step Type                           Description
consultant_search_web               Searching the web
consultant_fetch_web_content        Fetching content from a URL
consultant_retrieve_context_source  Searching knowledge base
consultant_call_agent               Calling another agent
consultant_request_form             Requesting form input
consultant_lookup                   Looking up history/attachments
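For UI purposes, the step types above can be mapped to short status labels shown while the agent works. A minimal sketch (the labels and the `step_status` helper are illustrative, not part of the API):

```python
# Map reasoning-step types (from the table above) to status labels for a UI.
STEP_LABELS = {
    "consultant_search_web": "Searching the web...",
    "consultant_fetch_web_content": "Fetching a page...",
    "consultant_retrieve_context_source": "Searching the knowledge base...",
    "consultant_call_agent": "Consulting another agent...",
    "consultant_request_form": "Waiting for form input...",
    "consultant_lookup": "Looking up history...",
}

def step_status(event: dict) -> str:
    """Return a status line for a response.reasoning_step.start event."""
    step = event.get("step", {})
    # Fall back to the step's own content string for unknown types.
    return STEP_LABELS.get(step.get("type"), step.get("content", "Working..."))
```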

response.reasoning_step.end

Emitted when a reasoning step completes.

{
  "type": "response.reasoning_step.end",
  "response_id": "abc123",
  "chat_id": 12345,
  "step": {
    "id": "step_abc123",
    "type": "consultant_retrieve_context_source",
    "result": {
      "success": true,
      "data": "Found 3 relevant documents..."
    },
    "timestamp": "2024-01-15T10:30:01Z",
    "token_usage": {
      "total_prompt_tokens": 150,
      "total_completion_tokens": 45,
      "total_tokens": 195,
      "total_calls": 1
    }
  }
}

response.output_text.delta

Emitted for each chunk of generated text. Concatenate these to build the full response.

{
  "type": "response.output_text.delta",
  "response_id": "abc123",
  "chat_id": 12345,
  "delta": "Our business hours are "
}
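Rebuilding the full text is a matter of concatenating the `delta` fields in arrival order. A minimal sketch (`accumulate_deltas` is a hypothetical helper):

```python
def accumulate_deltas(events):
    """Concatenate the delta fields of output_text.delta events in order."""
    parts = []
    for event in events:
        if event.get("type") == "response.output_text.delta":
            parts.append(event.get("delta", ""))
    return "".join(parts)
```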

response.output_text.completed

Emitted when the response is fully generated. Contains the complete text and token usage.

{
  "type": "response.output_text.completed",
  "response_id": "abc123",
  "chat_id": 12345,
  "final_text": "Our business hours are Monday to Friday, 9 AM to 6 PM EST.",
  "usage": {
    "total_prompt_tokens": 250,
    "total_completion_tokens": 85,
    "total_tokens": 335,
    "total_calls": 1
  }
}

response.error

Emitted when an error occurs during processing.

{
  "type": "response.error",
  "response_id": "abc123",
  "chat_id": 12345,
  "message": "Failed to process request",
  "code": 10005
}

Error Code Values

The code field contains an application-level error code, not an HTTP status code. Common values:

  • 10005 (SYS_SERVER_ERROR) - Internal server error
  • 10006 (SYS_BAD_REQUEST) - Invalid request

See Error Codes for the full list.
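One way to use the code field is to decide whether a retry makes sense. A sketch under the two codes listed above (the retry policy and `should_retry` helper are illustrative):

```python
# Error codes from the list above.
SYS_SERVER_ERROR = 10005  # transient server-side failure
SYS_BAD_REQUEST = 10006   # malformed request; retrying won't help

def should_retry(event: dict) -> bool:
    """Retry server-side failures; treat bad requests as permanent."""
    return event.get("code") == SYS_SERVER_ERROR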

Stream Termination

The stream ends with a special message:

data: [DONE]

Always handle this marker to properly close your connection.

Timeout Handling

The default stream timeout is 90 seconds. If no events are received within this period, the connection will close with a timeout error. For long-running operations, ensure your client handles reconnection appropriately.
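Client-side, one approach is to track when the last event arrived and treat the stream as dead once that exceeds the server's window. A minimal sketch (`is_stalled` and the grace period are hypothetical, not part of the API):

```python
# Server-side stream timeout from the docs; the grace period is illustrative.
STREAM_TIMEOUT_SECONDS = 90

def is_stalled(last_event_at: float, now: float, grace: float = 5.0) -> bool:
    """True once no event has arrived within the server timeout plus a grace period."""
    return now - last_event_at > STREAM_TIMEOUT_SECONDS + grace
```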

Client Implementations

JavaScript (Browser)

The native EventSource API only supports GET requests, so use fetch with a streamed response body instead:

// EventSource doesn't support POST, so read the fetch body stream directly
async function streamChat() {
  const response = await fetch('https://api.codeer.ai/api/v1/chats/12345/messages', {
    method: 'POST',
    headers: {
      'x-api-key': 'YOUR_API_KEY',
      'Content-Type': 'application/json',
      'Accept': 'text/event-stream',
    },
    body: JSON.stringify({
      message: 'Hello!',
      stream: true,
      agent_id: 'your-agent-id'
    })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Buffer chunks: they may split mid-line, so only parse complete lines
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || '';

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6);
        if (data === '[DONE]') {
          console.log('Stream completed');
          return;
        }
        try {
          const event = JSON.parse(data);
          handleEvent(event);
        } catch (e) {
          // Ignore non-JSON lines (e.g. comments or partial payloads)
        }
      }
    }
  }
}

function handleEvent(event) {
  switch (event.type) {
    case 'response.output_text.delta':
      // Append text to UI
      document.getElementById('output').textContent += event.delta;
      break;
    case 'response.output_text.completed':
      console.log('Final response:', event.final_text);
      break;
    case 'response.error':
      console.error('Error:', event.message);
      break;
  }
}

Python

Using the requests library:

import requests
import json

def stream_chat(chat_id: int, message: str, agent_id: str, api_key: str):
    url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    payload = {
        "message": message,
        "stream": True,
        "agent_id": agent_id
    }

    with requests.post(url, headers=headers, json=payload, stream=True) as response:
        response.raise_for_status()

        for line in response.iter_lines():
            if not line:
                continue

            line = line.decode('utf-8')

            if line.startswith('data: '):
                data = line[6:]
                if data == '[DONE]':
                    print("\nStream completed")
                    break

                try:
                    event = json.loads(data)
                    handle_event(event)
                except json.JSONDecodeError:
                    pass

def handle_event(event: dict):
    event_type = event.get('type')

    if event_type == 'response.output_text.delta':
        print(event.get('delta', ''), end='', flush=True)
    elif event_type == 'response.output_text.completed':
        print(f"\n\nTokens used: {event.get('usage')}")
    elif event_type == 'response.error':
        print(f"\nError: {event.get('message')}")

# Usage
stream_chat(
    chat_id=12345,
    message="What are your business hours?",
    agent_id="your-agent-id",
    api_key="your-api-key"
)

Python (Async)

Using aiohttp:

import aiohttp
import asyncio
import json

async def stream_chat_async(chat_id: int, message: str, agent_id: str, api_key: str):
    url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    payload = {
        "message": message,
        "stream": True,
        "agent_id": agent_id
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=payload) as response:
            response.raise_for_status()
            async for line in response.content:
                line = line.decode('utf-8').strip()
                if not line:
                    continue

                if line.startswith('data: '):
                    data = line[6:]
                    if data == '[DONE]':
                        break

                    try:
                        event = json.loads(data)
                        if event.get('type') == 'response.output_text.delta':
                            print(event.get('delta', ''), end='', flush=True)
                    except json.JSONDecodeError:
                        pass

# Usage
asyncio.run(stream_chat_async(
    chat_id=12345,
    message="Hello!",
    agent_id="your-agent-id",
    api_key="your-api-key"
))

Best Practices

  1. Buffer handling: SSE data may arrive in chunks that don't align with event boundaries. Always buffer incoming data and parse complete events.

  2. Error recovery: Implement reconnection logic for network failures. Store the last received event to potentially resume processing.

  3. UI updates: Batch UI updates to avoid excessive re-renders. Consider using requestAnimationFrame for smooth text display.

  4. Cleanup: Always close connections properly when the user navigates away or cancels the request.

  5. Timeout handling: Implement client-side timeout handling that matches or exceeds the server's 90-second timeout.
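Practice 1 above can be sketched as a small generator that turns arbitrary text chunks into complete `data:` payloads (`iter_sse_data` is a hypothetical helper, not part of the API):

```python
def iter_sse_data(chunks):
    """Yield complete `data:` payloads from an iterable of text chunks.

    Chunks may split mid-line or mid-event, so we buffer until a full
    line is available. Yields raw payload strings, including the
    terminal "[DONE]" marker, for the caller to dispatch.
    """
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            line = line.strip()
            if line.startswith("data: "):
                yield line[6:]
```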

See Also