Skip to content

第14篇:服务与集成 · 第1节 API 客户端 — Anthropic Messages 与流式 SSE

Claude Code 完全指南 V2 · 本篇共 8 节。本节聚焦 Messages API 的封装:流式 SSE、超时、重试与降级,以及与上层 Agent 循环的边界。


学习目标

能力项说明
协议描述 Messages API 请求/响应中 messagessystemtoolsstream 等核心字段
SSE解析 event: / data: 帧,增量拼出 assistant 消息与 tool_use
韧性设计指数退避、可重试状态码、熔断与降级(模型/上下文截断)
边界区分「传输层客户端」与「业务编排」(谁负责多轮 tool loop)
观测在日志中安全打点 requestId、latency,避免泄露正文

生活类比:挂号窗口与叫号屏

医院挂号(发请求)后,你不会堵在窗口等医生把病历全写完——而是去大厅看叫号屏逐条刷新SSE 流)。若网络抖动叫号屏卡了,你会过会儿抬头再看重试);若全院系统挂了,护士可能给你纸质临时号降级:换模型或缩短上下文)。API 客户端就是窗口柜员 + 叫号屏协议解析器:保证你拿到有序、完整的就诊序列,而不是一堆乱序纸条。


请求骨架(教学示意)

typescript
// api/messagesTypes.ts — 教学示意,非官方 SDK 逐字段保证
export interface MessagesRequestBody {
  model: string;
  max_tokens: number;
  system?: unknown;
  messages: Array<{
    role: "user" | "assistant";
    content: unknown;
  }>;
  tools?: unknown[];
  tool_choice?: unknown;
  stream?: boolean;
  metadata?: Record<string, string>;
}

流式读取(fetch + ReadableStream)

typescript
// api/streamingClient.ts — 教学示意
export async function* streamSSE(
  response: Response
): AsyncGenerator<{ event: string; data: string }> {
  const reader = response.body!.getReader();
  const dec = new TextDecoder();
  let buf = "";
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buf += dec.decode(value, { stream: true });
    let idx: number;
    while ((idx = buf.indexOf("\n\n")) >= 0) {
      const block = buf.slice(0, idx);
      buf = buf.slice(idx + 2);
      let event = "message";
      let data = "";
      for (const line of block.split("\n")) {
        if (line.startsWith("event:")) event = line.slice(6).trim();
        if (line.startsWith("data:")) data += line.slice(5).trim();
      }
      if (data === "[DONE]") return;
      yield { event, data };
    }
  }
}

聚合流为最终消息(简化)

typescript
export async function collectStreamedMessage(
  resp: Response
): Promise<{ text: string; toolUses: unknown[] }> {
  let text = "";
  const toolUses: unknown[] = [];
  for await (const frame of streamSSE(resp)) {
    const chunk = JSON.parse(frame.data);
    // 伪代码:按 Anthropic SSE 事件类型分支 delta
    if (chunk.type === "content_block_delta") {
      if (chunk.delta?.type === "text_delta") text += chunk.delta.text;
    }
    if (chunk.type === "content_block_start" && chunk.content_block?.type === "tool_use") {
      toolUses.push(chunk.content_block);
    }
  }
  return { text, toolUses };
}

重试与降级策略表

场景策略注意
429 / Rate limitRetry-After 或指数退避尊重 header
5xx有限次重试 + jitter避免惊群
网络 reset重试整请求流式需从头或支持 resume(若 API 提供)
上下文过长降级:摘要、删旧轮、换小模型产品提示用户
401不重试 token刷新 OAuth(见第6节)

Mermaid:单次流式请求

图2:重试状态机


客户端配置表

参数典型值用途
timeoutMs120000首字节超时
maxRetries3可恢复错误
baseUrl官方或代理企业网关
anthropic-version headerAPI 版本服务端路由

与错误分类(第2节)的衔接

传输层应抛出结构化错误kind, status, requestId),由上层统一映射为用户可见文案与 AppState.tools.lastError。不要在底层 console.error 打印完整 prompt。


安全与合规

做法
API Key环境变量或 keychain;禁止写入项目仓库
日志哈希 message id;截断正文
代理 TLS企业根证书信任链校验

小结

API 客户端负责 HTTP/SSE 细节 + 韧性策略,把「一坨字节流」变成「结构化的 assistant 输出 + tool 调用」。重试要分级401 与 429 不同对待;流式解析要保持缓冲半包DONE 语义。


自测

  1. SSE 半包时为何不能按行 split 后立即丢弃 buf
  2. 流式中途连接断开,若无 resume,业务层有哪些选择?
  3. metadata 适合传哪些可观测字段?

与 Agent 循环的边界

层次职责非职责
API Client单次 HTTP/SSE、重试、解析不决定下一轮流是否调用 tool
Agent Loop多轮 messages、tool 结果回填不直接操作 TLS socket
State记录 lastError、会话 id不实现指数退避细节
typescript
// 边界示意:循环调用客户端,而非相反
export async function agentTurn(client: MessagesClient, state: ChatState) {
  const resp = await client.streamOnce(buildRequest(state));
  const parsed = await collectStreamedMessage(resp);
  return applyToolResultsToState(state, parsed);
}

参考阅读(关键词)

可在官方文档中检索:Messages APIserver-sent eventstool usestreaming。具体 URL 以当前版本为准,避免教材链接失效。


下一节02-error-handling.md — 错误分类与处理策略。

本项目仅用于教育学习目的。Claude Code 源码版权归 Anthropic, PBC 所有。