SSEストリーミング
Server-Sent Events (SSE) により、AIレスポンスをリアルタイムでストリーミングできます。完全なレスポンスを待つのではなく、テキストが生成されるたびに表示することで、より良いユーザー体験を提供します。
概要
stream: trueでメッセージを送信すると、APIは単一のJSONレスポンスではなく、イベントのストリームを返します。各イベントには、エージェントの処理とレスポンスに関する増分データが含まれます。
ストリーミングの有効化
ストリーミングを有効にするには、メッセージリクエストでstream: trueを設定します:
curl -X POST "https://api.codeer.ai/api/v1/chats/12345/messages" \
-H "x-api-key: YOUR_API_KEY" \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{
"message": "こんにちは!",
"stream": true,
"agent_id": "550e8400-e29b-41d4-a716-446655440000"
}'
イベント形式
イベントはSSE仕様に従います:
event: <event_type>
data: <json_payload>
各イベントには共通フィールドが含まれます:
| フィールド | 説明 |
|---|---|
type |
イベントタイプ |
response_id |
このレスポンスの一意の識別子 |
chat_id |
チャットセッションID |
イベントタイプ
response.created
LLMが初期化され、処理の準備ができたときに発行されます。
{
"type": "response.created",
"response_id": "abc123",
"chat_id": 12345,
"agent_id": "550e8400-e29b-41d4-a716-446655440000",
"model": "gpt-4"
}
response.chat.title.updated
チャットタイトルが自動生成されたときに発行されます(通常は最初のメッセージの後)。
{
"type": "response.chat.title.updated",
"response_id": "abc123",
"chat_id": 12345,
"name": "営業時間についての質問"
}
response.reasoning_step.start
エージェントが推論ステップを開始したときに発行されます(例:ナレッジベースの検索、ツールの呼び出し)。
{
"type": "response.reasoning_step.start",
"response_id": "abc123",
"chat_id": 12345,
"step": {
"id": "step_abc123",
"content": "営業時間についてナレッジベースを検索中",
"type": "consultant_retrieve_context_source",
"args": {
"query": "営業時間"
},
"timestamp": "2024-01-15T10:30:00Z"
}
}
| ステップタイプ | 説明 |
|---|---|
consultant_search_web |
Web検索 |
consultant_fetch_web_content |
URLからコンテンツを取得 |
consultant_retrieve_context_source |
ナレッジベースを検索 |
consultant_call_agent |
別のエージェントを呼び出し |
consultant_request_form |
フォーム入力をリクエスト |
consultant_lookup |
履歴/添付ファイルを検索 |
response.reasoning_step.end
推論ステップが完了したときに発行されます。
{
"type": "response.reasoning_step.end",
"response_id": "abc123",
"chat_id": 12345,
"step": {
"id": "step_abc123",
"type": "consultant_retrieve_context_source",
"result": {
"success": true,
"data": "3件の関連ドキュメントが見つかりました..."
},
"timestamp": "2024-01-15T10:30:01Z",
"token_usage": {
"total_prompt_tokens": 150,
"total_completion_tokens": 45,
"total_tokens": 195,
"total_calls": 1
}
}
}
response.output_text.delta
生成されたテキストの各チャンクに対して発行されます。これらを連結して完全なレスポンスを構築します。
{
"type": "response.output_text.delta",
"response_id": "abc123",
"chat_id": 12345,
"delta": "営業時間は"
}
response.output_text.completed
レスポンスが完全に生成されたときに発行されます。完全なテキストとトークン使用量が含まれます。
{
"type": "response.output_text.completed",
"response_id": "abc123",
"chat_id": 12345,
"final_text": "営業時間は月曜日から金曜日の午前9時から午後6時までです。",
"usage": {
"total_prompt_tokens": 250,
"total_completion_tokens": 85,
"total_tokens": 335,
"total_calls": 1
}
}
response.error
処理中にエラーが発生したときに発行されます。
{
"type": "response.error",
"response_id": "abc123",
"chat_id": 12345,
"message": "リクエストの処理に失敗しました",
"code": 10005
}
エラーコードの値
codeフィールドにはエラーコード(HTTPステータスではない)が含まれます。一般的な値:
10005(SYS_SERVER_ERROR) - 内部サーバーエラー10006(SYS_BAD_REQUEST) - 無効なリクエスト
完全なリストはエラーコードを参照してください。
ストリームの終了
ストリームは特別なメッセージで終了します:
data: [DONE]
接続を適切に閉じるために、必ずこのマーカーを処理してください。
タイムアウト処理
デフォルトのストリームタイムアウトは90秒です。この期間内にイベントを受信しない場合、接続はタイムアウトエラーで閉じられます。長時間実行される操作の場合は、クライアントが適切に再接続を処理するようにしてください。
クライアント実装
JavaScript(ブラウザ)
ネイティブfetch APIをストリーミングで使用:
const url = 'https://api.codeer.ai/api/v1/chats/12345/messages';
// EventSourceはPOSTをサポートしていないため、fetchをストリーミングで使用
const response = await fetch(url, {
method: 'POST',
headers: {
'x-api-key': 'YOUR_API_KEY',
'Content-Type': 'application/json',
},
body: JSON.stringify({
message: 'こんにちは!',
stream: true,
agent_id: 'your-agent-id'
})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
console.log('ストリーム完了');
return;
}
try {
const event = JSON.parse(data);
handleEvent(event);
} catch (e) {
// JSONではない、イベント名の行かもしれない
}
}
}
}
function handleEvent(event) {
switch (event.type) {
case 'response.output_text.delta':
// テキストをUIに追加
document.getElementById('output').textContent += event.delta;
break;
case 'response.output_text.completed':
console.log('最終レスポンス:', event.final_text);
break;
case 'response.error':
console.error('エラー:', event.message);
break;
}
}
Python
requestsライブラリを使用:
import requests
import json
def stream_chat(chat_id: int, message: str, agent_id: str, api_key: str):
url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
headers = {
"x-api-key": api_key,
"Content-Type": "application/json",
"Accept": "text/event-stream"
}
payload = {
"message": message,
"stream": True,
"agent_id": agent_id
}
with requests.post(url, headers=headers, json=payload, stream=True) as response:
response.raise_for_status()
for line in response.iter_lines():
if not line:
continue
line = line.decode('utf-8')
if line.startswith('data: '):
data = line[6:]
if data == '[DONE]':
print("\nストリーム完了")
break
try:
event = json.loads(data)
handle_event(event)
except json.JSONDecodeError:
pass
def handle_event(event: dict):
event_type = event.get('type')
if event_type == 'response.output_text.delta':
print(event.get('delta', ''), end='', flush=True)
elif event_type == 'response.output_text.completed':
print(f"\n\nトークン使用量: {event.get('usage')}")
elif event_type == 'response.error':
print(f"\nエラー: {event.get('message')}")
# 使用方法
stream_chat(
chat_id=12345,
message="営業時間は何時ですか?",
agent_id="your-agent-id",
api_key="your-api-key"
)
Python(非同期)
aiohttpを使用:
import aiohttp
import asyncio
import json
async def stream_chat_async(chat_id: int, message: str, agent_id: str, api_key: str):
url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
headers = {
"x-api-key": api_key,
"Content-Type": "application/json",
"Accept": "text/event-stream"
}
payload = {
"message": message,
"stream": True,
"agent_id": agent_id
}
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json=payload) as response:
async for line in response.content:
line = line.decode('utf-8').strip()
if not line:
continue
if line.startswith('data: '):
data = line[6:]
if data == '[DONE]':
break
try:
event = json.loads(data)
if event.get('type') == 'response.output_text.delta':
print(event.get('delta', ''), end='', flush=True)
except json.JSONDecodeError:
pass
# 使用方法
asyncio.run(stream_chat_async(
chat_id=12345,
message="こんにちは!",
agent_id="your-agent-id",
api_key="your-api-key"
))
ベストプラクティス
-
バッファ処理:SSEデータはイベントの境界に揃わないチャンクで到着する場合があります。常に受信データをバッファリングし、完全なイベントを解析してください。
-
エラー回復:ネットワーク障害に対する再接続ロジックを実装してください。処理を再開できるように、最後に受信したイベントを保存してください。
-
UI更新:過度な再レンダリングを避けるために、UI更新をバッチ処理してください。スムーズなテキスト表示のためにrequestAnimationFrameの使用を検討してください。
-
クリーンアップ:ユーザーがページを離れたりリクエストをキャンセルしたときは、常に接続を適切に閉じてください。
-
タイムアウト処理:サーバーの90秒タイムアウトに一致するか、それを超えるクライアント側のタイムアウト処理を実装してください。