コンテンツにスキップ

SSEストリーミング

Server-Sent Events (SSE) により、AIレスポンスをリアルタイムでストリーミングできます。完全なレスポンスを待つのではなく、テキストが生成されるたびに表示することで、より良いユーザー体験を提供します。

概要

stream: trueでメッセージを送信すると、APIは単一のJSONレスポンスではなく、イベントのストリームを返します。各イベントには、エージェントの処理とレスポンスに関する増分データが含まれます。

ストリーミングの有効化

ストリーミングを有効にするには、メッセージリクエストでstream: trueを設定します:

curl -N -X POST "https://api.codeer.ai/api/v1/chats/12345/messages" \
  -H "x-api-key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{
    "message": "こんにちは!",
    "stream": true,
    "agent_id": "550e8400-e29b-41d4-a716-446655440000"
  }'

Tool Tag の表示

  • include_tool_tags のデフォルト値は false です。
  • 未指定または false の場合、ストリーミングテキストイベントの <tool ...>...</tool> ブロックは削除されます。
  • 生のtoolブロックが必要な内部クライアントは、include_tool_tags=true を明示的に指定してください。

イベント形式

イベントはSSE仕様に従います:

event: <event_type>
data: <json_payload>

各イベントには共通フィールドが含まれます:

フィールド 説明
type イベントタイプ
response_id このレスポンスの一意の識別子
chat_id チャットセッションID

イベントタイプ

response.created

LLMが初期化され、処理の準備ができたときに発行されます。

{
  "type": "response.created",
  "response_id": "abc123",
  "chat_id": 12345,
  "agent_id": "550e8400-e29b-41d4-a716-446655440000",
  "model": "gpt-4",
  "conversation_group_id": "group_abc",
  "user_conversation_id": 1001
}

response.chat.title.updated

チャットタイトルが自動生成されたときに発行されます(通常は最初のメッセージの後)。

{
  "type": "response.chat.title.updated",
  "response_id": "abc123",
  "chat_id": 12345,
  "name": "営業時間についての質問"
}

response.reasoning_step.start

エージェントが推論ステップを開始したときに発行されます(例:ナレッジベースの検索、ツールの呼び出し)。

{
  "type": "response.reasoning_step.start",
  "response_id": "abc123",
  "chat_id": 12345,
  "step": {
    "id": "step_abc123",
    "content": "営業時間についてナレッジベースを検索中",
    "tool_name": "retrieve_context_objs",
    "args": {
      "query": "営業時間"
    },
    "timestamp": "2024-01-15T10:30:00Z"
  }
}
tool_name 説明
search_web Web検索
fetch_web_content URL からコンテンツを取得
retrieve_context_objs 関連するナレッジオブジェクトを選択
get_context_obj_lines 選択したナレッジから必要な行を読む
call_agent 別のエージェントを呼び出す
request_form フォーム入力をリクエスト
memory ユーザーメモリを書き込む
http_request 設定済み HTTP エンドポイントを呼び出す
lookup_history / lookup_attachments 履歴または添付ファイルを検索する
generate_image 画像を生成する

response.reasoning_step.end

推論ステップが完了したときに発行されます。

{
  "type": "response.reasoning_step.end",
  "response_id": "abc123",
  "chat_id": 12345,
  "step": {
    "id": "step_abc123",
    "tool_name": "retrieve_context_objs",
    "result": {
      "success": true,
      "data": "3件の関連ドキュメントが見つかりました..."
    },
    "timestamp": "2024-01-15T10:30:01Z",
    "token_usage": {
      "total_prompt_tokens": 150,
      "total_completion_tokens": 45,
      "total_tokens": 195,
      "total_calls": 1
    }
  }
}

response.interaction_request

Agent が続行前にクライアント側の操作を必要とするときに発行されます。もっとも一般的なのはフォーム入力要求です。

{
  "type": "response.interaction_request",
  "response_id": "abc123",
  "chat_id": 12345,
  "history_id": 12345,
  "conversation_group_id": "group_abc",
  "form_request_id": "form-uuid-here",
  "form_schema": {
    "title": "連絡フォーム",
    "fields": [
      {
        "name": "email",
        "label": "Email",
        "type": "text"
      }
    ]
  }
}

このイベントを受け取ったら、クライアントはフォームなどの UI を表示し、送信または拒否を行ったうえで必要に応じて会話を再開します。

response.output_text.delta

生成されたテキストの各チャンクに対して発行されます。これらを連結して完全なレスポンスを構築します。

{
  "type": "response.output_text.delta",
  "response_id": "abc123",
  "chat_id": 12345,
  "delta": "営業時間は"
}

include_tool_tags=false の場合、<tool ...>...</tool> のみを含むチャンクは削除され、空のdeltaイベントは送信されません。

response.output_text.completed

レスポンスが完全に生成されたときに発行されます。完全なテキストとトークン使用量が含まれます。

{
  "type": "response.output_text.completed",
  "response_id": "abc123",
  "chat_id": 12345,
  "final_text": "営業時間は月曜日から金曜日の午前9時から午後6時までです。",
  "usage": {
    "total_prompt_tokens": 250,
    "total_completion_tokens": 85,
    "total_tokens": 335,
    "total_calls": 1
  }
}

include_tool_tags=false の場合、final_text<tool ...>...</tool> ブロックを削除した状態で返されます。

response.error

処理中にエラーが発生したときに発行されます。

{
  "type": "response.error",
  "response_id": "abc123",
  "chat_id": 12345,
  "message": "リクエストの処理に失敗しました",
  "code": 10005
}

エラーコードの値

codeフィールドにはエラーコード(HTTPステータスではない)が含まれます。一般的な値:

  • 10005 (SYS_SERVER_ERROR) - 内部サーバーエラー
  • 10006 (SYS_BAD_REQUEST) - 無効なリクエスト

完全なリストはエラーコードを参照してください。

response.interaction_request

assistant run が意図的に中断され、ユーザー操作が必要なときに発行されます。

このイベントは interaction_type による判別可能ユニオン(discriminated union)を使います:

  • form:構造化入力を収集し、resume_form_request_id で再開。
  • payment:外部チェックアウトを開始し、非同期の取引状態を追跡。

フォームインタラクション例

{
  "type": "response.interaction_request",
  "response_id": "abc123",
  "chat_id": 12345,
  "interaction_type": "form",
  "conversation_group_id": "cvg-xxx",
  "form_request_id": "3c8e...",
  "form_schema": {
    "id": "contact_form",
    "title": "Contact Info",
    "fields": [
      {
        "name": "email",
        "label": "Email",
        "type": "shortText",
        "required": true
      }
    ]
  }
}

支払いインタラクション例

{
  "type": "response.interaction_request",
  "response_id": "abc123",
  "chat_id": 12345,
  "interaction_type": "payment",
  "conversation_group_id": "cvg-xxx",
  "payment": {
    "payment_request_id": "23db...",
    "checkout_url": "https://api.example.com/api/v1/payments/checkout/<token>",
    "merchant_order_no": "CDR202603...",
    "amount_twd": 1200,
    "currency": "TWD",
    "status": "pending",
    "item_desc": "Consultation fee"
  }
}

支払いステータスのライフサイクル

支払いインタラクションのステータスは通常、次の遷移をたどります:

  • pending:リクエスト作成済み、チェックアウト未開始(キャンセル可能)。
  • processing:チェックアウト開始済み(例:チェックアウトページを開いた後)。キャンセル不可。
  • succeeded / failed:決済結果の終端状態。
  • cancelledpending の間にユーザーがキャンセル。
  • expired:期限内に完了せず失効。

追加の挙動:

  • キャンセル操作は現在ステータスが pending の場合のみ受け付けます。
  • バックエンドの定期ジョブが、長時間 pending/processing のままのリクエストを expired に更新します。

payment を独立した interaction type にする理由

payment は通常の form の一種ではありません。主な理由は次の通りです:

  • ライフサイクルが異なる:外部チェックアウト + webhook/polling を含み、1回送信で完了しない。
  • payload 契約が異なる:checkout_url、取引識別子、ステータスを必要とする。
  • UI 操作が異なる:決済ページを開く、ステータス更新、終端状態の表示が必要。

イベント入口は interaction_request に統一し、interaction_type で型安全に分岐します。

支払いデータの永続化

interaction_type="payment" では、バックエンドが支払いサマリーを会話履歴として永続化します。少なくとも以下を含みます:

  • 注文番号(merchant_order_no
  • タイトル(item_desc
  • 金額(amount_twd, currency
  • ステータス(pending/processing/succeeded/failed/cancelled/expired/...

その後のコールバック(例:決済ゲートウェイの notify webhook)でも同じサマリーが更新され、履歴再読み込みや会話継続時に文脈を維持できます。

ストリームの終了

ストリームは特別なメッセージで終了します:

data: [DONE]

接続を適切に閉じるために、必ずこのマーカーを処理してください。

タイムアウト処理

デフォルトのストリームタイムアウトは180秒です。この期間内にイベントを受信しない場合、サーバーは response.error を送信してからストリームを閉じます。長時間実行される操作では、クライアント側でタイムアウトと再接続を扱えるようにしてください。

クライアント実装

JavaScript(ブラウザ)

ネイティブfetch APIをストリーミングで使用:

const url = 'https://api.codeer.ai/api/v1/chats/12345/messages';

// EventSourceはPOSTをサポートしていないため、fetchをストリーミングで使用
const response = await fetch(url, {
  method: 'POST',
  headers: {
    'x-api-key': 'YOUR_API_KEY',
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    message: 'こんにちは!',
    stream: true,
    agent_id: 'your-agent-id'
  })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() || '';

  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = line.slice(6);
      if (data === '[DONE]') {
        console.log('ストリーム完了');
        return;
      }
      try {
        const event = JSON.parse(data);
        handleEvent(event);
      } catch (e) {
        // JSONではない、イベント名の行かもしれない
      }
    }
  }
}

function handleEvent(event) {
  switch (event.type) {
    case 'response.output_text.delta':
      // テキストをUIに追加
      document.getElementById('output').textContent += event.delta;
      break;
    case 'response.interaction_request':
      // フォームなどの操作 UI を表示
      openInteractionUi(event);
      break;
    case 'response.output_text.completed':
      console.log('最終レスポンス:', event.final_text);
      break;
    case 'response.error':
      console.error('エラー:', event.message);
      break;
  }
}

Python

requestsライブラリを使用:

import requests
import json

def stream_chat(chat_id: int, message: str, agent_id: str, api_key: str):
    url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    payload = {
        "message": message,
        "stream": True,
        "agent_id": agent_id
    }

    with requests.post(url, headers=headers, json=payload, stream=True) as response:
        response.raise_for_status()

        for line in response.iter_lines():
            if not line:
                continue

            line = line.decode('utf-8')

            if line.startswith('data: '):
                data = line[6:]
                if data == '[DONE]':
                    print("\nストリーム完了")
                    break

                try:
                    event = json.loads(data)
                    handle_event(event)
                except json.JSONDecodeError:
                    pass

def handle_event(event: dict):
    event_type = event.get('type')

    if event_type == 'response.output_text.delta':
        print(event.get('delta', ''), end='', flush=True)
    elif event_type == 'response.interaction_request':
        print(f"\n追加操作が必要です: {event}")
    elif event_type == 'response.output_text.completed':
        print(f"\n\nトークン使用量: {event.get('usage')}")
    elif event_type == 'response.error':
        print(f"\nエラー: {event.get('message')}")

# 使用方法
stream_chat(
    chat_id=12345,
    message="営業時間は何時ですか?",
    agent_id="your-agent-id",
    api_key="your-api-key"
)

Python(非同期)

aiohttpを使用:

import aiohttp
import asyncio
import json

async def stream_chat_async(chat_id: int, message: str, agent_id: str, api_key: str):
    url = f"https://api.codeer.ai/api/v1/chats/{chat_id}/messages"
    headers = {
        "x-api-key": api_key,
        "Content-Type": "application/json",
        "Accept": "text/event-stream"
    }
    payload = {
        "message": message,
        "stream": True,
        "agent_id": agent_id
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=payload) as response:
            async for line in response.content:
                line = line.decode('utf-8').strip()
                if not line:
                    continue

                if line.startswith('data: '):
                    data = line[6:]
                    if data == '[DONE]':
                        break

                    try:
                        event = json.loads(data)
                        if event.get('type') == 'response.output_text.delta':
                            print(event.get('delta', ''), end='', flush=True)
                    except json.JSONDecodeError:
                        pass

# 使用方法
asyncio.run(stream_chat_async(
    chat_id=12345,
    message="こんにちは!",
    agent_id="your-agent-id",
    api_key="your-api-key"
))

ベストプラクティス

  1. バッファ処理:SSEデータはイベントの境界に揃わないチャンクで到着する場合があります。常に受信データをバッファリングし、完全なイベントを解析してください。

  2. エラー回復:ネットワーク障害に対する再接続ロジックを実装してください。処理を再開できるように、最後に受信したイベントを保存してください。

  3. UI更新:過度な再レンダリングを避けるために、UI更新をバッチ処理してください。スムーズなテキスト表示のためにrequestAnimationFrameの使用を検討してください。

  4. クリーンアップ:ユーザーがページを離れたりリクエストをキャンセルしたときは、常に接続を適切に閉じてください。

  5. タイムアウト処理:サーバーの180秒タイムアウトに一致するか、それを超えるクライアント側のタイムアウト処理を実装してください。

関連項目