コンテンツにスキップ

SSEストリーミング

Server-Sent Events (SSE) により、AIレスポンスをリアルタイムでストリーミングできます。完全なレスポンスを待つのではなく、テキストが生成されるたびに表示することで、より良いユーザー体験を提供します。

概要

stream: trueでメッセージを送信すると、APIは単一のJSONレスポンスではなく、イベントのストリームを返します。各イベントには、エージェントの処理とレスポンスに関する増分データが含まれます。

ストリーミングの有効化

ストリーミングを有効にするには、メッセージリクエストでstream: trueを設定します:

curl -X POST "https://api.codeer.ai/api/v1/chats/12345/messages" \
  -H "x-api-key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{
    "message": "こんにちは!",
    "stream": true,
    "agent_id": "550e8400-e29b-41d4-a716-446655440000"
  }'

イベント形式

イベントはSSE仕様に従います:

event: <event_type>
data: <json_payload>

各イベントには共通フィールドが含まれます:

フィールド 説明
type イベントタイプ
response_id このレスポンスの一意の識別子
chat_id チャットセッションID

イベントタイプ

response.created

LLMが初期化され、処理の準備ができたときに発行されます。

{
  "type": "response.created",
  "response_id": "abc123",
  "chat_id": 12345,
  "agent_id": "550e8400-e29b-41d4-a716-446655440000",
  "model": "gpt-4"
}

response.chat.title.updated

チャットタイトルが自動生成されたときに発行されます(通常は最初のメッセージの後)。

{
  "type": "response.chat.title.updated",
  "response_id": "abc123",
  "chat_id": 12345,
  "name": "営業時間についての質問"
}

response.reasoning_step.start

エージェントが推論ステップを開始したときに発行されます(例:ナレッジベースの検索、ツールの呼び出し)。

{
  "type": "response.reasoning_step.start",
  "response_id": "abc123",
  "chat_id": 12345,
  "step": {
    "id": "step_abc123",
    "content": "営業時間についてナレッジベースを検索中",
    "type": "consultant_retrieve_context_source",
    "args": {
      "query": "営業時間"
    },
    "timestamp": "2024-01-15T10:30:00Z"
  }
}
ステップタイプ 説明
consultant_search_web Web検索
consultant_fetch_web_content URLからコンテンツを取得
consultant_retrieve_context_source ナレッジベースを検索
consultant_call_agent 別のエージェントを呼び出し
consultant_request_form フォーム入力をリクエスト
consultant_lookup 履歴/添付ファイルを検索

response.reasoning_step.end

推論ステップが完了したときに発行されます。

{
  "type": "response.reasoning_step.end",
  "response_id": "abc123",
  "chat_id": 12345,
  "step": {
    "id": "step_abc123",
    "type": "consultant_retrieve_context_source",
    "result": {
      "success": true,
      "data": "3件の関連ドキュメントが見つかりました..."
    },
    "timestamp": "2024-01-15T10:30:01Z",
    "token_usage": {
      "total_prompt_tokens": 150,
      "total_completion_tokens": 45,
      "total_tokens": 195,
      "total_calls": 1
    }
  }
}

response.output_text.delta

生成されたテキストの各チャンクに対して発行されます。これらを連結して完全なレスポンスを構築します。

{
  "type": "response.output_text.delta",
  "response_id": "abc123",
  "chat_id": 12345,
  "delta": "営業時間は"
}

response.output_text.completed

レスポンスが完全に生成されたときに発行されます。完全なテキストとトークン使用量が含まれます。

{
  "type": "response.output_text.completed",
  "response_id": "abc123",
  "chat_id": 12345,
  "final_text": "営業時間は月曜日から金曜日の午前9時から午後6時までです。",
  "usage": {
    "total_prompt_tokens": 250,
    "total_completion_tokens": 85,
    "total_tokens": 335,
    "total_calls": 1
  }
}

response.error

処理中にエラーが発生したときに発行されます。

{
  "type": "response.error",
  "response_id": "abc123",
  "chat_id": 12345,
  "message": "リクエストの処理に失敗しました",
  "code": 10005
}

エラーコードの値

codeフィールドにはエラーコード(HTTPステータスではない)が含まれます。一般的な値:

  • 10005 (SYS_SERVER_ERROR) - 内部サーバーエラー
  • 10006 (SYS_BAD_REQUEST) - 無効なリクエスト

完全なリストはエラーコードを参照してください。

ストリームの終了

ストリームは特別なメッセージで終了します:

data: [DONE]

接続を適切に閉じるために、必ずこのマーカーを処理してください。

タイムアウト処理

デフォルトのストリームタイムアウトは90秒です。この期間内にイベントを受信しない場合、接続はタイムアウトエラーで閉じられます。長時間実行される操作の場合は、クライアントが適切に再接続を処理するようにしてください。

クライアント実装

JavaScript(ブラウザ)

ネイティブfetch APIをストリーミングで使用:

const url = 'https://api.codeer.ai/api/v1/chats/12345/messages';

// EventSourceはPOSTをサポートしていないため、fetchをストリーミングで使用
const response = await fetch(url, {
  method: 'POST',
  headers: {
    'x-api-key': 'YOUR_API_KEY',
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    message: 'こんにちは!',
    stream: true,
    agent_id: 'your-agent-id'
  })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() || '';

  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = line.slice(6);
      if (data === '[DONE]') {
        console.log('ストリーム完了');
        return;
      }
      try {
        const event = JSON.parse(data);
        handleEvent(event);
      } catch (e) {
        // JSONではない、イベント名の行かもしれない
      }
    }
  }
}

function handleEvent(event) {
  switch (event.type) {
    case 'response.output_text.delta':
      // テキストをUIに追加
      document.getElementById('output').textContent += event.delta;
      break;
    case 'response.output_text.completed':
      console.log('最終レスポンス:', event.final_text);
      break;
    case 'response.error':
      console.error('エラー:', event.message);
      break;
  }
}

Python

requestsライブラリを使用:

import requests
import json

def stream_chat(chat_id: int, message: str, agent_id: str, api_key: str):
    url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    payload = {
        "message": message,
        "stream": True,
        "agent_id": agent_id
    }

    with requests.post(url, headers=headers, json=payload, stream=True) as response:
        response.raise_for_status()

        for line in response.iter_lines():
            if not line:
                continue

            line = line.decode('utf-8')

            if line.startswith('data: '):
                data = line[6:]
                if data == '[DONE]':
                    print("\nストリーム完了")
                    break

                try:
                    event = json.loads(data)
                    handle_event(event)
                except json.JSONDecodeError:
                    pass

def handle_event(event: dict):
    event_type = event.get('type')

    if event_type == 'response.output_text.delta':
        print(event.get('delta', ''), end='', flush=True)
    elif event_type == 'response.output_text.completed':
        print(f"\n\nトークン使用量: {event.get('usage')}")
    elif event_type == 'response.error':
        print(f"\nエラー: {event.get('message')}")

# 使用方法
stream_chat(
    chat_id=12345,
    message="営業時間は何時ですか?",
    agent_id="your-agent-id",
    api_key="your-api-key"
)

Python(非同期)

aiohttpを使用:

import aiohttp
import asyncio
import json

async def stream_chat_async(chat_id: int, message: str, agent_id: str, api_key: str):
    url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    payload = {
        "message": message,
        "stream": True,
        "agent_id": agent_id
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=payload) as response:
            async for line in response.content:
                line = line.decode('utf-8').strip()
                if not line:
                    continue

                if line.startswith('data: '):
                    data = line[6:]
                    if data == '[DONE]':
                        break

                    try:
                        event = json.loads(data)
                        if event.get('type') == 'response.output_text.delta':
                            print(event.get('delta', ''), end='', flush=True)
                    except json.JSONDecodeError:
                        pass

# 使用方法
asyncio.run(stream_chat_async(
    chat_id=12345,
    message="こんにちは!",
    agent_id="your-agent-id",
    api_key="your-api-key"
))

ベストプラクティス

  1. バッファ処理:SSEデータはイベントの境界に揃わないチャンクで到着する場合があります。常に受信データをバッファリングし、完全なイベントを解析してください。

  2. エラー回復:ネットワーク障害に対する再接続ロジックを実装してください。処理を再開できるように、最後に受信したイベントを保存してください。

  3. UI更新:過度な再レンダリングを避けるために、UI更新をバッチ処理してください。スムーズなテキスト表示のためにrequestAnimationFrameの使用を検討してください。

  4. クリーンアップ:ユーザーがページを離れたりリクエストをキャンセルしたときは、常に接続を適切に閉じてください。

  5. タイムアウト処理:サーバーの90秒タイムアウトに一致するか、それを超えるクライアント側のタイムアウト処理を実装してください。

関連項目