SSE 串流
Server-Sent Events (SSE) 能夠即時串流 AI 回應,透過在文字生成時即時顯示,而非等待完整回應,提供更好的使用者體驗。
概述
當你以 stream: true 發送訊息時,API 會回傳一連串事件而非單一 JSON 回應。每個事件包含 Agent 處理和回應的增量資料。
啟用串流
要啟用串流,在訊息請求中設定 stream: true:
curl -X POST "https://api.codeer.ai/api/v1/chats/12345/messages" \
-H "x-api-key: YOUR_API_KEY" \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{
"message": "你好!",
"stream": true,
"agent_id": "550e8400-e29b-41d4-a716-446655440000"
}'
事件格式
事件遵循 SSE 規格:
event: <event_type>
data: <json_payload>
每個事件包含共通欄位:
| 欄位 | 說明 |
|---|---|
type |
事件類型 |
response_id |
此回應的唯一識別碼 |
chat_id |
聊天會話 ID |
事件類型
response.created
當 LLM 初始化完成並準備處理時發出。
{
"type": "response.created",
"response_id": "abc123",
"chat_id": 12345,
"agent_id": "550e8400-e29b-41d4-a716-446655440000",
"model": "gpt-4"
}
response.chat.title.updated
當聊天標題自動產生時發出(通常在第一則訊息後)。
{
"type": "response.chat.title.updated",
"response_id": "abc123",
"chat_id": 12345,
"name": "關於營業時間的問題"
}
response.reasoning_step.start
當 Agent 開始推理步驟時發出(例如:搜尋知識庫、呼叫工具)。
{
"type": "response.reasoning_step.start",
"response_id": "abc123",
"chat_id": 12345,
"step": {
"id": "step_abc123",
"content": "正在搜尋營業時間的知識庫",
"type": "consultant_retrieve_context_source",
"args": {
"query": "營業時間"
},
"timestamp": "2024-01-15T10:30:00Z"
}
}
| 步驟類型 | 說明 |
|---|---|
consultant_search_web |
搜尋網頁 |
consultant_fetch_web_content |
從 URL 取得內容 |
consultant_retrieve_context_source |
搜尋知識庫 |
consultant_call_agent |
呼叫其他 Agent |
consultant_request_form |
請求表單輸入 |
consultant_lookup |
查詢歷史記錄/附件 |
response.reasoning_step.end
當推理步驟完成時發出。
{
"type": "response.reasoning_step.end",
"response_id": "abc123",
"chat_id": 12345,
"step": {
"id": "step_abc123",
"type": "consultant_retrieve_context_source",
"result": {
"success": true,
"data": "找到 3 個相關文件..."
},
"timestamp": "2024-01-15T10:30:01Z",
"token_usage": {
"total_prompt_tokens": 150,
"total_completion_tokens": 45,
"total_tokens": 195,
"total_calls": 1
}
}
}
response.output_text.delta
每個生成的文字片段都會發出。將這些串接起來以建立完整回應。
{
"type": "response.output_text.delta",
"response_id": "abc123",
"chat_id": 12345,
"delta": "我們的營業時間是"
}
response.output_text.completed
當回應完全生成時發出。包含完整文字和 token 使用量。
{
"type": "response.output_text.completed",
"response_id": "abc123",
"chat_id": 12345,
"final_text": "我們的營業時間是週一至週五,上午 9 點到下午 6 點。",
"usage": {
"total_prompt_tokens": 250,
"total_completion_tokens": 85,
"total_tokens": 335,
"total_calls": 1
}
}
response.error
當處理過程中發生錯誤時發出。
{
"type": "response.error",
"response_id": "abc123",
"chat_id": 12345,
"message": "處理請求失敗",
"code": 10005
}
錯誤代碼值
code 欄位包含錯誤代碼(不是 HTTP 狀態碼)。常見的值:
10005(SYS_SERVER_ERROR) - 內部伺服器錯誤10006(SYS_BAD_REQUEST) - 無效的請求
完整清單請參閱錯誤代碼。
串流結束
串流以特殊訊息結束:
data: [DONE]
務必處理此標記以正確關閉連線。
逾時處理
預設串流逾時為 90 秒。如果在此期間內沒有收到任何事件,連線將會關閉並顯示逾時錯誤。對於長時間執行的操作,請確保你的用戶端適當處理重新連線。
用戶端實作
JavaScript(瀏覽器)
使用原生 fetch API 搭配串流:
const url = 'https://api.codeer.ai/api/v1/chats/12345/messages';
// EventSource 不支援 POST,使用 fetch 搭配串流
const response = await fetch(url, {
method: 'POST',
headers: {
'x-api-key': 'YOUR_API_KEY',
'Content-Type': 'application/json',
},
body: JSON.stringify({
message: '你好!',
stream: true,
agent_id: 'your-agent-id'
})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
console.log('串流完成');
return;
}
try {
const event = JSON.parse(data);
handleEvent(event);
} catch (e) {
// 不是 JSON,可能是事件名稱行
}
}
}
}
function handleEvent(event) {
switch (event.type) {
case 'response.output_text.delta':
// 將文字附加到 UI
document.getElementById('output').textContent += event.delta;
break;
case 'response.output_text.completed':
console.log('最終回應:', event.final_text);
break;
case 'response.error':
console.error('錯誤:', event.message);
break;
}
}
Python
使用 requests 函式庫:
import requests
import json
def stream_chat(chat_id: int, message: str, agent_id: str, api_key: str):
url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
headers = {
"x-api-key": api_key,
"Content-Type": "application/json",
"Accept": "text/event-stream"
}
payload = {
"message": message,
"stream": True,
"agent_id": agent_id
}
with requests.post(url, headers=headers, json=payload, stream=True) as response:
response.raise_for_status()
for line in response.iter_lines():
if not line:
continue
line = line.decode('utf-8')
if line.startswith('data: '):
data = line[6:]
if data == '[DONE]':
print("\n串流完成")
break
try:
event = json.loads(data)
handle_event(event)
except json.JSONDecodeError:
pass
def handle_event(event: dict):
event_type = event.get('type')
if event_type == 'response.output_text.delta':
print(event.get('delta', ''), end='', flush=True)
elif event_type == 'response.output_text.completed':
print(f"\n\nToken 使用量: {event.get('usage')}")
elif event_type == 'response.error':
print(f"\n錯誤: {event.get('message')}")
# 使用方式
stream_chat(
chat_id=12345,
message="你們的營業時間是?",
agent_id="your-agent-id",
api_key="your-api-key"
)
Python(非同步)
使用 aiohttp:
import aiohttp
import asyncio
import json
async def stream_chat_async(chat_id: int, message: str, agent_id: str, api_key: str):
url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
headers = {
"x-api-key": api_key,
"Content-Type": "application/json",
"Accept": "text/event-stream"
}
payload = {
"message": message,
"stream": True,
"agent_id": agent_id
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
async for line in response.content:
line = line.decode('utf-8').strip()
if not line:
continue
if line.startswith('data: '):
data = line[6:]
if data == '[DONE]':
break
try:
event = json.loads(data)
if event.get('type') == 'response.output_text.delta':
print(event.get('delta', ''), end='', flush=True)
except json.JSONDecodeError:
pass
# 使用方式
asyncio.run(stream_chat_async(
chat_id=12345,
message="你好!",
agent_id="your-agent-id",
api_key="your-api-key"
))
最佳實踐
-
緩衝處理:SSE 資料可能以不符合事件邊界的片段到達。務必緩衝傳入資料並解析完整事件。
-
錯誤恢復:為網路故障實作重新連線邏輯。儲存最後收到的事件以便可能恢復處理。
-
UI 更新:批次處理 UI 更新以避免過度重新渲染。考慮使用 requestAnimationFrame 來實現平滑的文字顯示。
-
清理:當用戶離開頁面或取消請求時,務必正確關閉連線。
-
逾時處理:實作符合或超過伺服器 90 秒逾時的用戶端逾時處理。