目录
  1. 1. 一、API 调用链路
  2. 2. 二、claude.ts — API 核心
    1. 2.1. 2.1 主要导出函数
    2. 2.2. 2.2 请求参数构建
    3. 2.3. 2.3 System Prompt 格式化
  3. 3. 三、Streaming 事件流
    1. 3.1. 3.1 SSE 事件类型
    2. 3.2. 3.2 流式处理过程
  4. 4. 四、认证系统
    1. 4.1. 4.1 认证方式优先级
    2. 4.2. 4.2 OAuth 流程
    3. 4.3. 4.3 API Key 轮换
  5. 5. 五、重试策略
    1. 5.1. 错误分类
  6. 6. 六、Prompt Cache 优化
  7. 7. 七、Token 计数
  8. 8. 八、多提供商支持
  9. 9. 九、API 预连接
  10. 10. 十、Usage 追踪
  11. 11. 涉及源文件
  12. 12. 十一、SSE 协议深度解析
    1. 12.1. 11.1 SSE 事件格式
    2. 12.2. 11.2 事件驱动状态机
    3. 12.3. 11.3 流中断处理
  13. 13. 十二、重试与弹性设计
    1. 13.1. 12.1 指数退避算法
    2. 13.2. 12.2 随机抖动
  14. 14. 十三、Prompt 缓存优化
    1. 14.1. 13.1 缓存的经济学
    2. 14.2. 13.2 缓存策略
  15. 15. 涉及源文件
【Claude Code源码剖析】08-API 通信与 Streaming

⚠️ 学习声明:本文档基于 Claude Code 2.1.88 源码分析整理,仅供个人学习研究使用,不做任何商业用途。

Claude Code 与 Anthropic API 之间的通信层,3420 行的核心模块。


一、API 调用链路

query.ts (Agentic Loop)


services/api/claude.ts (3420 行, API 核心)

├─ 构建请求参数 (system prompt, messages, tools, betas)
├─ 添加认证 (API Key / OAuth / AWS / GCP)
├─ 配置重试策略


@anthropic-ai/sdk (官方 SDK)

├─ Streaming 请求 (SSE)


Anthropic API Server


SSE 事件流 → 解析 → 转换 → yield 给 query.ts

二、claude.ts — API 核心

2.1 主要导出函数

// Streaming API 调用 (主要使用)
export async function* streamClaude(params: {
systemPrompt: SystemPrompt;
messages: Message[];
tools: Tools;
model: string;
maxTokens: number;
thinkingConfig?: ThinkingConfig;
// ...
}): AsyncGenerator<StreamEvent> {
// 1. 构建 BetaMessageStreamParams
// 2. 调用 SDK stream
// 3. 解析并 yield 事件
}

// 获取模型最大输出 token
export function getMaxOutputTokensForModel(model: string): number;

2.2 请求参数构建

function buildRequestParams(config): BetaMessageStreamParams {
return {
model: config.model, // "claude-sonnet-4-20250514"
max_tokens: config.maxTokens, // 16384 (默认)
system: formatSystemPrompt(config.systemPrompt),
messages: normalizeMessagesForAPI(config.messages),
tools: config.tools.map(toolToAPISchema),
stream: true,

// Betas (新功能)
betas: getMergedBetas(config.model),

// 思维链
thinking: config.thinkingConfig ? {
type: 'enabled',
budget_tokens: config.thinkingConfig.budgetTokens,
} : undefined,

// 缓存控制
...getCacheControlParams(config),

// 额外头部
extra_headers: {
...getAttributionHeader(),
...getCLISyspromptPrefix(),
},
};
}

2.3 System Prompt 格式化

// System Prompt 被拆分为多个缓存段,最大化 prompt cache 命中率
function formatSystemPrompt(prompt: SystemPrompt): ContentBlockParam[] {
return [
// Segment 1: 核心指令 (很少变化,高缓存命中)
{
type: 'text',
text: prompt.coreInstructions,
cache_control: { type: 'ephemeral' },
},
// Segment 2: 工具描述 (较少变化)
{
type: 'text',
text: prompt.toolDescriptions,
cache_control: { type: 'ephemeral' },
},
// Segment 3: 环境上下文 (每次会话变化)
{
type: 'text',
text: prompt.environmentContext,
},
];
}

三、Streaming 事件流

3.1 SSE 事件类型

type StreamEvent =
| { type: 'message_start'; message: BetaMessage }
| { type: 'content_block_start'; content_block: ContentBlock }
| { type: 'content_block_delta'; delta: ContentDelta }
| { type: 'content_block_stop' }
| { type: 'message_delta'; delta: { stop_reason: StopReason } }
| { type: 'message_stop' }

// Content Block 类型
type ContentBlock =
| { type: 'text'; text: string }
| { type: 'tool_use'; id: string; name: string; input: unknown }
| { type: 'thinking'; thinking: string } // 思维链
| { type: 'server_tool_use'; ... } // 服务器端工具

// Stop Reason
type StopReason =
| 'end_turn' // 正常结束
| 'tool_use' // 需要执行工具
| 'max_tokens' // 达到最大 token
| 'stop_sequence' // 命中停止序列

3.2 流式处理过程

// 简化的事件处理流程
for await (const event of sdk.stream(params)) {
switch (event.type) {
case 'content_block_start':
if (event.content_block.type === 'text') {
// 开始新的文本块
yield { type: 'text_start' };
} else if (event.content_block.type === 'tool_use') {
// 开始新的工具调用 → 可以提前执行
streamingToolExecutor.addTool(event.content_block);
}
break;

case 'content_block_delta':
if (event.delta.type === 'text_delta') {
// 文本增量 → 实时渲染到终端
yield { type: 'text', text: event.delta.text };
} else if (event.delta.type === 'input_json_delta') {
// 工具输入增量 → 累积 JSON
accumulateToolInput(event.delta.partial_json);
}
break;

case 'message_delta':
if (event.delta.stop_reason === 'tool_use') {
// 需要执行工具 → 收集结果后继续循环
}
break;
}
}

四、认证系统

4.1 认证方式优先级

1. API Key (环境变量 ANTHROPIC_API_KEY)
2. OAuth Token (Anthropic Console 登录)
3. AWS Bedrock (IAM 认证)
4. Google Cloud Vertex AI (GCP 认证)
5. File Descriptor (--api-key-fd 传入)

4.2 OAuth 流程

// services/oauth/client.ts
// 1. 打开浏览器 → Anthropic Console 登录
// 2. 用户授权
// 3. 回调 → 获取 access_token + refresh_token
// 4. 存入 macOS Keychain / 系统安全存储
// 5. access_token 过期 → 自动刷新

// 启动时预取:
startKeychainPrefetch(); // 在 main.tsx 最顶部并行预取

4.3 API Key 轮换

// 当 401 错误时:
// 1. 如果是 OAuth → 尝试 refresh token
// 2. 如果是 API Key → 提示用户更新
// 3. 如果是 AWS/GCP → 尝试重新获取临时凭证

五、重试策略

// services/api/withRetry.ts
async function withRetry<T>(
fn: () => Promise<T>,
config: RetryConfig,
): Promise<T> {
let lastError: Error;

for (let attempt = 0; attempt < config.maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error;
const retryInfo = categorizeRetryableAPIError(error);

if (!retryInfo.retryable) throw error;

// 指数退避
const delay = Math.min(
retryInfo.retryAfter || config.baseDelay * Math.pow(2, attempt),
config.maxDelay
);

await sleep(delay);
}
}

throw lastError;
}

错误分类

状态码 分类 处理
429 Rate Limit 重试 (指数退避)
500 Server Error 重试
502/503 Server Unavailable 重试
401 Unauthorized 刷新 token 后重试
400 (prompt_too_long) 上下文溢出 压缩后重试
400 (其他) 请求错误 不重试
网络错误 Connection Error 重试

六、Prompt Cache 优化

// services/api/promptCacheBreakDetection.ts
// Anthropic API 支持 prompt cache:
// 相同的 system prompt + 相同的消息前缀 → 缓存命中 → 更快更便宜

// Claude Code 的优化策略:
// 1. System prompt 分段,高频部分标记 cache_control
// 2. 消息历史只追加不修改 (append-only)
// 3. 压缩时保留前缀不变
// 4. 检测缓存破裂 → 日志报告

function notifyCompaction(): void {
// 压缩会导致缓存失效,记录这次破裂事件
}

七、Token 计数

// utils/tokens.ts
export function tokenCountWithEstimation(messages: Message[]): number {
// 1. 如果有 API 返回的精确计数 → 使用它
if (lastResponseUsage) {
return lastResponseUsage.input_tokens;
}

// 2. 否则使用估算
// 粗略估算: 1 token ≈ 4 characters (英文)
// 更准确: 使用 tiktoken 或类似库
const totalChars = messages.reduce(
(sum, msg) => sum + getContentText(msg).length,
0
);
return Math.ceil(totalChars / 4);
}

// 上下文窗口配置
export function getContextWindowForModel(model: string): number {
// claude-sonnet-4: 200,000 tokens
// claude-opus-4: 200,000 tokens
// claude-haiku-3.5: 200,000 tokens
// 默认: 200,000 tokens
}

八、多提供商支持

// utils/model/providers.ts
type APIProvider =
| 'anthropic' // 官方 API
| 'bedrock' // AWS Bedrock
| 'vertex' // Google Vertex AI

function getAPIProvider(): APIProvider {
if (process.env.ANTHROPIC_BEDROCK_BASE_URL) return 'bedrock';
if (process.env.ANTHROPIC_VERTEX_BASE_URL) return 'vertex';
return 'anthropic';
}

// 不同提供商的参数差异:
// - Bedrock: 需要 AWS IAM 签名,不同的 model ID 格式
// - Vertex: 需要 GCP access token,不同的 endpoint
// - Anthropic: 标准 API Key

九、API 预连接

// utils/apiPreconnect.ts
// 在 init() 阶段就建立 TCP 连接(不发数据)
// 后续 API 调用直接复用连接,减少首次请求延迟

export function preconnectAnthropicApi(): void {
// DNS 解析 + TCP 握手 + TLS 握手
// 大约节省 100-300ms 的首次请求时间
}

十、Usage 追踪

// services/api/usage.ts + services/api/logging.ts
type NonNullableUsage = {
input_tokens: number;
output_tokens: number;
cache_creation_input_tokens: number;
cache_read_input_tokens: number;
};

// 每次 API 响应后累加
export function accumulateUsage(totalUsage, responseUsage): NonNullableUsage {
return {
input_tokens: totalUsage.input_tokens + responseUsage.input_tokens,
output_tokens: totalUsage.output_tokens + responseUsage.output_tokens,
cache_creation_input_tokens: totalUsage.cache_creation_input_tokens +
(responseUsage.cache_creation_input_tokens || 0),
cache_read_input_tokens: totalUsage.cache_read_input_tokens +
(responseUsage.cache_read_input_tokens || 0),
};
}

涉及源文件


十一、SSE 协议深度解析

11.1 SSE 事件格式

Anthropic Streaming API 使用 Server-Sent Events(SSE, W3C 标准):

event: message_start
data: {"type": "message_start", "message": {"id": "msg_xxx"}}

event: content_block_start
data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text"}}

event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "..."}}

event: content_block_stop
data: {"type": "content_block_stop", "index": 0}

event: message_delta
data: {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {...}}

event: message_stop
data: {"type": "message_stop"}

11.2 事件驱动状态机

IDLE ──message_start──→ RECEIVING

┌──────┼──────┐
text_start │ tool_use_start
▼ │ ▼
TEXT_ACTIVE │ TOOL_USE_ACTIVE
│ │ │
text_stop │ tool_use_stop
└──────┬──────┘

DONE

11.3 流中断处理

场景 策略
网络断开 自动重连 + 幂等请求(仅在未收到 content 时重试)
服务器 5xx 指数退避重试(最多 3 次,baseDelay=1s)
token 超限 compact 后重试
用户 Ctrl+C AbortController.abort() → 保留已完成的部分

十二、重试与弹性设计

12.1 指数退避算法

async function withRetry<T>(fn: () => Promise<T>, opts: RetryOptions): Promise<T> {
let attempt = 0;
while (true) {
try { return await fn(); }
catch (err) {
attempt++;
if (attempt > (opts.maxRetries ?? 3) || !isRetryable(err)) throw err;
const delay = opts.baseDelay * Math.pow(2, attempt-1) + Math.random() * 1000;
await new Promise(r => setTimeout(r, delay));
}
}
}

function isRetryable(err: Error): boolean {
if (err instanceof RateLimitError) return true; // 429
if (err instanceof ServerError) return true; // 5xx
if (err instanceof AuthError) return true; // 刷新 token 后重试
return false;
}

12.2 随机抖动

在指数退避基础上加入随机抖动(Jitter),防止多个客户端同时重试造成惊群效应(Thundering Herd)。这一设计源自 AWS 的 Architecture for Exponential Backoff 最佳实践。


十三、Prompt 缓存优化

13.1 缓存的经济学

Anthropic Prompt Caching 可将 System Prompt 延迟降低 90%:

指标 缓存命中 缓存未命中
TTFT 200-500ms 1-3s
输入 Token 成本 10% 计费 全价
有效期 5 分钟

缓存标记:cache_control: { type: 'ephemeral' } 应用于 System Prompt 和 CLAUDE.md。

13.2 缓存策略

// System Prompt 固定部分 → 标记为可缓存
const systemBlocks = [
{ type: 'text', text: ROLE_DEF + SAFETY_RULES + TOOL_SPEC, cache_control: { type: 'ephemeral' } },
{ type: 'text', text: dynamicContext }, // git status 等动态信息,不缓存
];

cache_read_input_tokens > cache_write_input_tokens * 10 时(即缓存读命中 10 次以上),缓存投资为正收益。


涉及源文件

  • services/api/claude.ts
  • services/api/logging.ts
  • services/api/promptCacheBreakDetection.ts
  • services/api/usage.ts
  • services/api/withRetry.ts
  • services/oauth/client.ts
打赏
  • 微信
  • 支付宝

评论