logo

Claude API 流式响应

流式响应让你可以实时接收 Claude 的输出,提升用户体验。

你可以把它当作“实时旁白模式”:
模型边想边说,用户更容易感知系统正在处理,而不是页面卡死。

基础用法

Python 流式

import anthropic

client = anthropic.Anthropic()

# Stream the response token-by-token instead of waiting for the full message.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "讲一个简短的故事"}],
) as stream:
    # text_stream yields plain text deltas as they arrive from the API.
    for chunk in stream.text_stream:
        print(chunk, end="", flush=True)

使用事件处理

# Iterate raw stream events instead of the text-only helper.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
) as stream:
    for ev in stream:
        if ev.type == "content_block_delta":
            # Incremental text for the current content block.
            print(ev.delta.text, end="")
        elif ev.type == "message_stop":
            # The whole message has finished streaming.
            print("\n[完成]")

获取完整响应

# Stream for latency, then collect the fully assembled final message.
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
) as stream:
    # Consumes the remaining stream and returns the complete Message.
    response = stream.get_final_message()

usage = response.usage
print(response.content[0].text)
print(f"输入 tokens: {usage.input_tokens}")
print(f"输出 tokens: {usage.output_tokens}")

读者导向:上线路径

  1. 先用 text_stream 跑通最小可用流式输出。
  2. 再升级为事件级处理,覆盖 stop reason 与 usage 上报。
  3. 最后补取消、重连、超时控制,保证前后端状态一致。

Node.js 流式

基础流式

import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

// Stream a completion and echo each text delta to stdout as it arrives.
async function streamChat() {
  const stream = client.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: '讲一个简短的故事' }]
  });

  // Fired once per text delta.
  stream.on('text', (text) => {
    process.stdout.write(text);
  });

  // Wait for the stream to finish before reporting completion.
  // (The resolved Message itself is not needed here, so it is not bound.)
  await stream.finalMessage();
  console.log('\n完成');
}

streamChat();

使用 for await

// Consume the raw event stream with an async iterator instead of callbacks.
async function streamWithIterator() {
  const stream = await client.messages.create({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Hello' }],
    stream: true
  });

  for await (const event of stream) {
    // Only text deltas carry printable content; skip every other event type.
    if (event.type !== 'content_block_delta') continue;
    if (event.delta.type !== 'text_delta') continue;
    process.stdout.write(event.delta.text);
  }
}

事件类型

Claude 流式 API 发送的事件类型:

# 消息开始
{"type": "message_start", "message": {...}}

# 内容块开始
{"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}

# 内容增量
{"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "你好"}}

# 内容块结束
{"type": "content_block_stop", "index": 0}

# 消息增量(usage 信息)
{"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 50}}

# 消息结束
{"type": "message_stop"}

完整事件处理

def handle_stream(messages):
    """Stream a completion for *messages*, logging every event type.

    Prints text deltas inline and reports lifecycle events: message and
    content-block start/stop, the final stop reason, and token usage.
    """
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=messages
    ) as stream:
        for event in stream:
            kind = event.type
            if kind == "message_start":
                print(f"开始生成 (model: {event.message.model})")
            elif kind == "content_block_start":
                print(f"内容块 {event.index} 开始")
            elif kind == "content_block_delta":
                # Only text deltas are printable here.
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="")
            elif kind == "content_block_stop":
                print(f"\n内容块 {event.index} 结束")
            elif kind == "message_delta":
                # Carries the stop reason and cumulative output-token usage.
                print(f"\n停止原因: {event.delta.stop_reason}")
                print(f"输出 tokens: {event.usage.output_tokens}")
            elif kind == "message_stop":
                print("消息完成")

Web 应用集成

FastAPI + SSE

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
# Use the async client: the sync client's blocking iteration inside an
# async generator would stall the event loop for every concurrent request.
client = anthropic.AsyncAnthropic()

async def generate_stream(prompt: str):
    """Yield Claude's reply to *prompt* as Server-Sent Events.

    Emits one ``data:`` frame per text delta, then a final ``[DONE]`` frame
    so the client knows the stream ended normally.
    """
    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            yield f"data: {text}\n\n"

    yield "data: [DONE]\n\n"

@app.get("/chat")
async def chat(prompt: str):
    """SSE endpoint: streams the model's answer to *prompt*."""
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream"
    )

Next.js API Route

// app/api/chat/route.ts
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

// POST /api/chat — proxy a streaming completion as a plain-text body.
export async function POST(req: Request) {
  const { messages } = await req.json();

  const stream = await client.messages.create({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages,
    stream: true
  });

  const encoder = new TextEncoder();
  const readable = new ReadableStream({
    async start(controller) {
      try {
        for await (const event of stream) {
          // Stop pumping as soon as the client disconnects, so the
          // upstream stream is not consumed for a dead connection.
          if (req.signal.aborted) break;
          if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
            controller.enqueue(encoder.encode(event.delta.text));
          }
        }
        controller.close();
      } catch (err) {
        // Surface upstream failures instead of hanging the response.
        controller.error(err);
      }
    }
  });

  return new Response(readable, {
    headers: { 'Content-Type': 'text/plain; charset=utf-8' }
  });
}

前端接收 (React)

// POST the conversation to /api/chat and forward each streamed chunk
// to the caller through onChunk.
async function streamChat(
  messages: Array<{role: string; content: string}>,
  onChunk: (text: string) => void
) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages })
  });

  const reader = response.body?.getReader();
  if (!reader) return;

  const decoder = new TextDecoder();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    onChunk(decoder.decode(value));
  }
}

// React component
function ChatComponent() {
  const [response, setResponse] = useState('');

  // Clear the transcript, then append chunks as they stream in.
  const handleSend = async (prompt: string) => {
    setResponse('');
    await streamChat(
      [{ role: 'user', content: prompt }],
      (chunk) => setResponse(prev => prev + chunk)
    );
  };

  return <div>{response}</div>;
}

流式 + Tool Use

# Tool-use streams deliver the tool's arguments as input_json_delta events.
weather_tool = {
    "name": "get_weather",
    "description": "获取天气",
    "input_schema": {
        "type": "object",
        "properties": {
            "city": {"type": "string"}
        },
        "required": ["city"]
    }
}

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=[weather_tool],
    messages=[{"role": "user", "content": "北京天气怎么样?"}]
) as stream:
    for event in stream:
        if event.type != "content_block_delta":
            continue
        delta = event.delta
        if delta.type == "text_delta":
            print(f"文本: {delta.text}")
        elif delta.type == "input_json_delta":
            # Partial JSON fragments of the tool call's arguments.
            print(f"工具参数: {delta.partial_json}")

错误处理

import anthropic

# Catch SDK exceptions from most specific to most general.
try:
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    ) as stream:
        for piece in stream.text_stream:
            print(piece, end="")
except anthropic.APIConnectionError:
    # Network-level failure while reaching the API.
    print("连接失败")
except anthropic.RateLimitError:
    # HTTP 429 — back off and retry later.
    print("请求过于频繁")
except anthropic.APIStatusError as e:
    # Any other non-2xx API response.
    print(f"API 错误: {e}")

常见坑

  • 前端只拼文本,不跟踪事件类型导致 UI 状态错位
  • 服务端没做请求中断处理,连接泄露
  • 忽略 usage 统计,难以定位成本异常

性能优化

设置超时

client = anthropic.Anthropic(
    timeout=60.0  # 60-second request timeout
)

重试机制

client = anthropic.Anthropic(
    max_retries=3  # retry failed requests up to 3 times
)

与 OpenAI 流式的区别

| 特性 | Claude API | OpenAI API |
| --- | --- | --- |
| 流式方法 | `messages.stream()` | `stream=True` |
| 事件结构 | 细粒度事件类型 | 统一 chunk 格式 |
| 上下文管理器 | 支持 with 语句 | 需要手动处理 |
| 最终消息 | `get_final_message()` | 需要手动组装 |

下一步


提示:Claude 的流式 API 使用上下文管理器,更加 Pythonic。

Claude API 开发指南
AI Engineer

Claude API 开发指南

Anthropic Claude API 提供了强大的 AI 模型访问,以安全性和准确性著称,适合企业级应用。

Claude API 开发指南 › 流式响应

Claude API 流式响应

流式响应让你可以实时接收 Claude 的输出,提升用户体验。

你可以把它当作“实时旁白模式”:
模型边想边说,用户更容易感知系统正在处理,而不是页面卡死。

#基础用法

#Python 流式

python
import anthropic client = anthropic.Anthropic() with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "讲一个简短的故事"}] ) as stream: for text in stream.text_stream: print(text, end="", flush=True)

#使用事件处理

python
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "Hello"}] ) as stream: for event in stream: if event.type == "content_block_delta": print(event.delta.text, end="") elif event.type == "message_stop": print("\n[完成]")

#获取完整响应

python
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "Hello"}] ) as stream: response = stream.get_final_message() print(response.content[0].text) print(f"输入 tokens: {response.usage.input_tokens}") print(f"输出 tokens: {response.usage.output_tokens}")

#读者导向:上线路径

  1. 先用 text_stream 跑通最小可用流式输出。
  2. 再升级为事件级处理,覆盖 stop reason 与 usage 上报。
  3. 最后补取消、重连、超时控制,保证前后端状态一致。

#Node.js 流式

#基础流式

typescript
import Anthropic from '@anthropic-ai/sdk'; const client = new Anthropic(); async function streamChat() { const stream = client.messages.stream({ model: 'claude-sonnet-4-20250514', max_tokens: 1024, messages: [{ role: 'user', content: '讲一个简短的故事' }] }); stream.on('text', (text) => { process.stdout.write(text); }); const finalMessage = await stream.finalMessage(); console.log('\n完成'); } streamChat();

#使用 for await

typescript
async function streamWithIterator() { const stream = await client.messages.create({ model: 'claude-sonnet-4-20250514', max_tokens: 1024, messages: [{ role: 'user', content: 'Hello' }], stream: true }); for await (const event of stream) { if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') { process.stdout.write(event.delta.text); } } }

#事件类型

Claude 流式 API 发送的事件类型:

python
# 消息开始 {"type": "message_start", "message": {...}} # 内容块开始 {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}} # 内容增量 {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "你好"}} # 内容块结束 {"type": "content_block_stop", "index": 0} # 消息增量(usage 信息) {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 50}} # 消息结束 {"type": "message_stop"}

#完整事件处理

python
def handle_stream(messages): with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=messages ) as stream: for event in stream: match event.type: case "message_start": print(f"开始生成 (model: {event.message.model})") case "content_block_start": print(f"内容块 {event.index} 开始") case "content_block_delta": if event.delta.type == "text_delta": print(event.delta.text, end="") case "content_block_stop": print(f"\n内容块 {event.index} 结束") case "message_delta": print(f"\n停止原因: {event.delta.stop_reason}") print(f"输出 tokens: {event.usage.output_tokens}") case "message_stop": print("消息完成")

#Web 应用集成

#FastAPI + SSE

python
from fastapi import FastAPI from fastapi.responses import StreamingResponse import anthropic app = FastAPI() client = anthropic.Anthropic() async def generate_stream(prompt: str): with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": prompt}] ) as stream: for text in stream.text_stream: yield f"data: {text}\n\n" yield "data: [DONE]\n\n" @app.get("/chat") async def chat(prompt: str): return StreamingResponse( generate_stream(prompt), media_type="text/event-stream" )

#Next.js API Route

typescript
// app/api/chat/route.ts import Anthropic from '@anthropic-ai/sdk'; const client = new Anthropic(); export async function POST(req: Request) { const { messages } = await req.json(); const stream = await client.messages.create({ model: 'claude-sonnet-4-20250514', max_tokens: 1024, messages, stream: true }); const encoder = new TextEncoder(); const readable = new ReadableStream({ async start(controller) { for await (const event of stream) { if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') { controller.enqueue(encoder.encode(event.delta.text)); } } controller.close(); } }); return new Response(readable, { headers: { 'Content-Type': 'text/plain; charset=utf-8' } }); }

#前端接收 (React)

typescript
async function streamChat( messages: Array<{role: string; content: string}>, onChunk: (text: string) => void ) { const response = await fetch('/api/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages }) }); const reader = response.body?.getReader(); const decoder = new TextDecoder(); while (reader) { const { done, value } = await reader.read(); if (done) break; onChunk(decoder.decode(value)); } } // React 组件 function ChatComponent() { const [response, setResponse] = useState(''); const handleSend = async (prompt: string) => { setResponse(''); await streamChat( [{ role: 'user', content: prompt }], (chunk) => setResponse(prev => prev + chunk) ); }; return <div>{response}</div>; }

#流式 + Tool Use

python
with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, tools=[{ "name": "get_weather", "description": "获取天气", "input_schema": { "type": "object", "properties": { "city": {"type": "string"} }, "required": ["city"] } }], messages=[{"role": "user", "content": "北京天气怎么样?"}] ) as stream: for event in stream: if event.type == "content_block_delta": if event.delta.type == "text_delta": print(f"文本: {event.delta.text}") elif event.delta.type == "input_json_delta": print(f"工具参数: {event.delta.partial_json}")

#错误处理

python
import anthropic try: with client.messages.stream( model="claude-sonnet-4-20250514", max_tokens=1024, messages=[{"role": "user", "content": "Hello"}] ) as stream: for text in stream.text_stream: print(text, end="") except anthropic.APIConnectionError: print("连接失败") except anthropic.RateLimitError: print("请求过于频繁") except anthropic.APIStatusError as e: print(f"API 错误: {e}")

#常见坑

  • 前端只拼文本,不跟踪事件类型导致 UI 状态错位
  • 服务端没做请求中断处理,连接泄露
  • 忽略 usage 统计,难以定位成本异常

#性能优化

#设置超时

python
client = anthropic.Anthropic( timeout=60.0 # 60 秒超时 )

#重试机制

python
client = anthropic.Anthropic( max_retries=3 # 最多重试 3 次 )

#与 OpenAI 流式的区别

| 特性 | Claude API | OpenAI API |
| --- | --- | --- |
| 流式方法 | `messages.stream()` | `stream=True` |
| 事件结构 | 细粒度事件类型 | 统一 chunk 格式 |
| 上下文管理器 | 支持 with 语句 | 需要手动处理 |
| 最终消息 | `get_final_message()` | 需要手动组装 |

#下一步


提示:Claude 的流式 API 使用上下文管理器,更加 Pythonic。

System Design

系统设计必备:核心概念 + 经典案例

快速掌握取舍与设计套路,备战系统设计面试。

进入 System Design →

相关路线图