跳轉到

SSE 串流

Server-Sent Events (SSE) 能夠即時串流 AI 回應,透過在文字生成時即時顯示,而非等待完整回應,提供更好的使用者體驗。

概述

當你以 stream: true 發送訊息時,API 會回傳一連串事件而非單一 JSON 回應。每個事件包含 Agent 處理和回應的增量資料。

啟用串流

要啟用串流,在訊息請求中設定 stream: true

curl -X POST "https://api.codeer.ai/api/v1/chats/12345/messages" \
  -H "x-api-key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{
    "message": "你好!",
    "stream": true,
    "agent_id": "550e8400-e29b-41d4-a716-446655440000"
  }'

事件格式

事件遵循 SSE 規格:

event: <event_type>
data: <json_payload>

每個事件包含共通欄位:

欄位 說明
type 事件類型
response_id 此回應的唯一識別碼
chat_id 聊天會話 ID

事件類型

response.created

當 LLM 初始化完成並準備處理時發出。

{
  "type": "response.created",
  "response_id": "abc123",
  "chat_id": 12345,
  "agent_id": "550e8400-e29b-41d4-a716-446655440000",
  "model": "gpt-4"
}

response.chat.title.updated

當聊天標題自動產生時發出(通常在第一則訊息後)。

{
  "type": "response.chat.title.updated",
  "response_id": "abc123",
  "chat_id": 12345,
  "name": "關於營業時間的問題"
}

response.reasoning_step.start

當 Agent 開始推理步驟時發出(例如:搜尋知識庫、呼叫工具)。

{
  "type": "response.reasoning_step.start",
  "response_id": "abc123",
  "chat_id": 12345,
  "step": {
    "id": "step_abc123",
    "content": "正在搜尋營業時間的知識庫",
    "type": "consultant_retrieve_context_source",
    "args": {
      "query": "營業時間"
    },
    "timestamp": "2024-01-15T10:30:00Z"
  }
}
步驟類型 說明
consultant_search_web 搜尋網頁
consultant_fetch_web_content 從 URL 取得內容
consultant_retrieve_context_source 搜尋知識庫
consultant_call_agent 呼叫其他 Agent
consultant_request_form 請求表單輸入
consultant_lookup 查詢歷史記錄/附件

response.reasoning_step.end

當推理步驟完成時發出。

{
  "type": "response.reasoning_step.end",
  "response_id": "abc123",
  "chat_id": 12345,
  "step": {
    "id": "step_abc123",
    "type": "consultant_retrieve_context_source",
    "result": {
      "success": true,
      "data": "找到 3 個相關文件..."
    },
    "timestamp": "2024-01-15T10:30:01Z",
    "token_usage": {
      "total_prompt_tokens": 150,
      "total_completion_tokens": 45,
      "total_tokens": 195,
      "total_calls": 1
    }
  }
}

response.output_text.delta

每個生成的文字片段都會發出。將這些串接起來以建立完整回應。

{
  "type": "response.output_text.delta",
  "response_id": "abc123",
  "chat_id": 12345,
  "delta": "我們的營業時間是"
}

response.output_text.completed

當回應完全生成時發出。包含完整文字和 token 使用量。

{
  "type": "response.output_text.completed",
  "response_id": "abc123",
  "chat_id": 12345,
  "final_text": "我們的營業時間是週一至週五,上午 9 點到下午 6 點。",
  "usage": {
    "total_prompt_tokens": 250,
    "total_completion_tokens": 85,
    "total_tokens": 335,
    "total_calls": 1
  }
}

response.error

當處理過程中發生錯誤時發出。

{
  "type": "response.error",
  "response_id": "abc123",
  "chat_id": 12345,
  "message": "處理請求失敗",
  "code": 10005
}

錯誤代碼值

code 欄位包含錯誤代碼(不是 HTTP 狀態碼)。常見的值:

  • 10005 (SYS_SERVER_ERROR) - 內部伺服器錯誤
  • 10006 (SYS_BAD_REQUEST) - 無效的請求

完整清單請參閱錯誤代碼

串流結束

串流以特殊訊息結束:

data: [DONE]

務必處理此標記以正確關閉連線。

逾時處理

預設串流逾時為 90 秒。如果在此期間內沒有收到任何事件,連線將會關閉並顯示逾時錯誤。對於長時間執行的操作,請確保你的用戶端適當處理重新連線。

用戶端實作

JavaScript(瀏覽器)

使用原生 fetch API 搭配串流:

const url = 'https://api.codeer.ai/api/v1/chats/12345/messages';

// EventSource 不支援 POST,使用 fetch 搭配串流
const response = await fetch(url, {
  method: 'POST',
  headers: {
    'x-api-key': 'YOUR_API_KEY',
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    message: '你好!',
    stream: true,
    agent_id: 'your-agent-id'
  })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() || '';

  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = line.slice(6);
      if (data === '[DONE]') {
        console.log('串流完成');
        return;
      }
      try {
        const event = JSON.parse(data);
        handleEvent(event);
      } catch (e) {
        // 不是 JSON,可能是事件名稱行
      }
    }
  }
}

function handleEvent(event) {
  switch (event.type) {
    case 'response.output_text.delta':
      // 將文字附加到 UI
      document.getElementById('output').textContent += event.delta;
      break;
    case 'response.output_text.completed':
      console.log('最終回應:', event.final_text);
      break;
    case 'response.error':
      console.error('錯誤:', event.message);
      break;
  }
}

Python

使用 requests 函式庫:

import requests
import json

def stream_chat(chat_id: int, message: str, agent_id: str, api_key: str):
    url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    payload = {
        "message": message,
        "stream": True,
        "agent_id": agent_id
    }

    with requests.post(url, headers=headers, json=payload, stream=True) as response:
        response.raise_for_status()

        for line in response.iter_lines():
            if not line:
                continue

            line = line.decode('utf-8')

            if line.startswith('data: '):
                data = line[6:]
                if data == '[DONE]':
                    print("\n串流完成")
                    break

                try:
                    event = json.loads(data)
                    handle_event(event)
                except json.JSONDecodeError:
                    pass

def handle_event(event: dict):
    event_type = event.get('type')

    if event_type == 'response.output_text.delta':
        print(event.get('delta', ''), end='', flush=True)
    elif event_type == 'response.output_text.completed':
        print(f"\n\nToken 使用量: {event.get('usage')}")
    elif event_type == 'response.error':
        print(f"\n錯誤: {event.get('message')}")

# 使用方式
stream_chat(
    chat_id=12345,
    message="你們的營業時間是?",
    agent_id="your-agent-id",
    api_key="your-api-key"
)

Python(非同步)

使用 aiohttp

import aiohttp
import asyncio
import json

async def stream_chat_async(chat_id: int, message: str, agent_id: str, api_key: str):
    url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    payload = {
        "message": message,
        "stream": True,
        "agent_id": agent_id
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=payload) as response:
            async for line in response.content:
                line = line.decode('utf-8').strip()
                if not line:
                    continue

                if line.startswith('data: '):
                    data = line[6:]
                    if data == '[DONE]':
                        break

                    try:
                        event = json.loads(data)
                        if event.get('type') == 'response.output_text.delta':
                            print(event.get('delta', ''), end='', flush=True)
                    except json.JSONDecodeError:
                        pass

# 使用方式
asyncio.run(stream_chat_async(
    chat_id=12345,
    message="你好!",
    agent_id="your-agent-id",
    api_key="your-api-key"
))

最佳實踐

  1. 緩衝處理:SSE 資料可能以不符合事件邊界的片段到達。務必緩衝傳入資料並解析完整事件。

  2. 錯誤恢復:為網路故障實作重新連線邏輯。儲存最後收到的事件以便可能恢復處理。

  3. UI 更新:批次處理 UI 更新以避免過度重新渲染。考慮使用 requestAnimationFrame 來實現平滑的文字顯示。

  4. 清理:當用戶離開頁面或取消請求時,務必正確關閉連線。

  5. 逾時處理:實作符合或超過伺服器 90 秒逾時的用戶端逾時處理。

另請參閱