logo

OpenAI API Streaming

Streaming 让你可以实时接收 AI 的输出,而不是等待完整回复。这对于提升用户体验和构建聊天应用至关重要。

把它想成“边写边显示的打字机模式”:
用户不用等全文生成完才看到结果,体感会从“卡住了”变成“系统在工作”。

建议优先使用 Responses API 的 stream 机制;本文示例以 Chat Completions 为主,方便老项目迁移时对照。

为什么使用流式响应?

普通模式 vs 流式模式

普通模式:
用户发送 → [等待 3-5 秒] → 一次性返回完整回复

流式模式:
用户发送 → 立即开始返回 → 逐字显示 → 完成

优势

特性普通模式流式模式
首字延迟3-5 秒< 1 秒
用户体验等待感强即时反馈
长回复可能超时稳定传输
适用场景API 调用聊天界面

读者导向:生产化落地顺序

  1. 先在后端跑通最小 streaming(能稳定吐字)。
  2. 再接前端增量渲染(含取消请求与重连处理)。
  3. 最后补监控(首字延迟、完成率、异常中断率)。

Python 实现

基础流式

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "讲一个简短的故事"}],
    stream=True  # 启用流式
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

使用上下文管理器

with client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True
) as stream:
    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        print(content, end="", flush=True)

收集完整响应

def stream_chat(messages: list) -> str:
    """流式输出并返回完整响应"""
    client = OpenAI()
    full_response = ""

    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=messages,
        stream=True
    )

    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        print(content, end="", flush=True)
        full_response += content

    print()  # 换行
    return full_response

# 使用
response = stream_chat([
    {"role": "user", "content": "用三句话介绍 Python"}
])
print(f"\n完整响应长度: {len(response)}")

Node.js 实现

基础流式

import OpenAI from 'openai';

const client = new OpenAI();

async function streamChat() {
  const stream = await client.chat.completions.create({
    model: 'gpt-5.2',
    messages: [{ role: 'user', content: '讲一个简短的故事' }],
    stream: true
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    process.stdout.write(content);
  }
}

streamChat();

使用 Stream Helper

import OpenAI from 'openai';

const client = new OpenAI();

async function streamWithHelper() {
  const stream = client.beta.chat.completions.stream({
    model: 'gpt-5.2',
    messages: [{ role: 'user', content: 'Hello' }]
  });

  // 事件监听
  stream.on('content', (delta, snapshot) => {
    process.stdout.write(delta);
  });

  // 获取最终结果
  const finalResponse = await stream.finalChatCompletion();
  console.log('\n完成:', finalResponse.choices[0].message.content);
}

Web 应用集成

FastAPI + SSE

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

async def generate_stream(prompt: str):
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    for chunk in stream:
        content = chunk.choices[0].delta.content or ""
        if content:
            yield f"data: {content}\n\n"

    yield "data: [DONE]\n\n"

@app.get("/chat")
async def chat(prompt: str):
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream"
    )

前端接收 (React)

async function streamChat(prompt: string, onChunk: (text: string) => void) {
  const response = await fetch(`/chat?prompt=${encodeURIComponent(prompt)}`);
  const reader = response.body?.getReader();
  const decoder = new TextDecoder();

  while (reader) {
    const { done, value } = await reader.read();
    if (done) break;

    const text = decoder.decode(value);
    const lines = text.split('\n');

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const content = line.slice(6);
        if (content !== '[DONE]') {
          onChunk(content);
        }
      }
    }
  }
}

// React 组件中使用
function ChatComponent() {
  const [response, setResponse] = useState('');

  const handleSend = async () => {
    setResponse('');
    await streamChat('你的问题', (chunk) => {
      setResponse(prev => prev + chunk);
    });
  };

  return (
    <div>
      <button onClick={handleSend}>发送</button>
      <div>{response}</div>
    </div>
  );
}

Next.js API Route

// app/api/chat/route.ts
import { OpenAI } from 'openai';

const client = new OpenAI();

export async function POST(req: Request) {
  const { messages } = await req.json();

  const stream = await client.chat.completions.create({
    model: 'gpt-5.2',
    messages,
    stream: true
  });

  const encoder = new TextEncoder();
  const readable = new ReadableStream({
    async start(controller) {
      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content || '';
        controller.enqueue(encoder.encode(content));
      }
      controller.close();
    }
  });

  return new Response(readable, {
    headers: { 'Content-Type': 'text/plain; charset=utf-8' }
  });
}

Chunk 数据结构

# 每个 chunk 的结构
{
    "id": "chatcmpl-xxx",
    "object": "chat.completion.chunk",
    "created": 1234567890,
    "model": "gpt-5.2",
    "choices": [{
        "index": 0,
        "delta": {
            "content": "你"  # 增量内容
        },
        "finish_reason": null  # 最后一个为 "stop"
    }]
}

处理完成状态

for chunk in stream:
    delta = chunk.choices[0].delta
    finish_reason = chunk.choices[0].finish_reason

    if delta.content:
        print(delta.content, end="")

    if finish_reason == "stop":
        print("\n[完成]")
    elif finish_reason == "length":
        print("\n[达到长度限制]")

常见坑

  • 只处理增量内容,不处理 finish_reason 导致状态机错乱
  • 前端直接拼接 chunk,不做断线重试和去重
  • 没有超时与取消控制,长请求占满连接资源

流式 + Function Calling

stream = client.chat.completions.create(
    model="gpt-5.2",
    messages=[{"role": "user", "content": "北京天气怎么样?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "parameters": {...}
        }
    }],
    stream=True
)

tool_calls = []
for chunk in stream:
    delta = chunk.choices[0].delta

    # 处理工具调用
    if delta.tool_calls:
        for tc in delta.tool_calls:
            if tc.index >= len(tool_calls):
                tool_calls.append({"name": "", "arguments": ""})
            if tc.function.name:
                tool_calls[tc.index]["name"] = tc.function.name
            if tc.function.arguments:
                tool_calls[tc.index]["arguments"] += tc.function.arguments

    # 处理文本内容
    if delta.content:
        print(delta.content, end="")

print(f"\n工具调用: {tool_calls}")

错误处理

from openai import APIError, APIConnectionError

try:
    stream = client.chat.completions.create(
        model="gpt-5.2",
        messages=[...],
        stream=True
    )

    for chunk in stream:
        # 处理 chunk
        pass

except APIConnectionError:
    print("连接失败,请检查网络")
except APIError as e:
    print(f"API 错误: {e}")
except Exception as e:
    print(f"未知错误: {e}")

性能优化

设置超时

from openai import OpenAI
import httpx

client = OpenAI(
    timeout=httpx.Timeout(60.0, connect=5.0)
)

重试机制

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def stream_with_retry(messages):
    return client.chat.completions.create(
        model="gpt-5.2",
        messages=messages,
        stream=True
    )

下一步


提示:流式响应是构建聊天应用的核心技术,务必掌握。

OpenAI API 开发指南
AI Engineer

OpenAI API 开发指南

OpenAI API 是最广泛使用的 AI API 之一,提供 GPT-4、DALL-E、Whisper 等模型的访问。

OpenAI API 开发指南流式响应

OpenAI API Streaming

Streaming 让你可以实时接收 AI 的输出,而不是等待完整回复。这对于提升用户体验和构建聊天应用至关重要。

把它想成“边写边显示的打字机模式”:
用户不用等全文生成完才看到结果,体感会从“卡住了”变成“系统在工作”。

建议优先使用 Responses API 的 stream 机制;本文示例以 Chat Completions 为主,方便老项目迁移时对照。

#为什么使用流式响应?

#普通模式 vs 流式模式

普通模式:
用户发送 → [等待 3-5 秒] → 一次性返回完整回复

流式模式:
用户发送 → 立即开始返回 → 逐字显示 → 完成

#优势

特性普通模式流式模式
首字延迟3-5 秒< 1 秒
用户体验等待感强即时反馈
长回复可能超时稳定传输
适用场景API 调用聊天界面

#读者导向:生产化落地顺序

  1. 先在后端跑通最小 streaming(能稳定吐字)。
  2. 再接前端增量渲染(含取消请求与重连处理)。
  3. 最后补监控(首字延迟、完成率、异常中断率)。

#Python 实现

#基础流式

python
from openai import OpenAI client = OpenAI() stream = client.chat.completions.create( model="gpt-5.2", messages=[{"role": "user", "content": "讲一个简短的故事"}], stream=True # 启用流式 ) for chunk in stream: if chunk.choices[0].delta.content: print(chunk.choices[0].delta.content, end="", flush=True)

#使用上下文管理器

python
with client.chat.completions.create( model="gpt-5.2", messages=[{"role": "user", "content": "Hello"}], stream=True ) as stream: for chunk in stream: content = chunk.choices[0].delta.content or "" print(content, end="", flush=True)

#收集完整响应

python
def stream_chat(messages: list) -> str: """流式输出并返回完整响应""" client = OpenAI() full_response = "" stream = client.chat.completions.create( model="gpt-5.2", messages=messages, stream=True ) for chunk in stream: content = chunk.choices[0].delta.content or "" print(content, end="", flush=True) full_response += content print() # 换行 return full_response # 使用 response = stream_chat([ {"role": "user", "content": "用三句话介绍 Python"} ]) print(f"\n完整响应长度: {len(response)}")

#Node.js 实现

#基础流式

typescript
import OpenAI from 'openai'; const client = new OpenAI(); async function streamChat() { const stream = await client.chat.completions.create({ model: 'gpt-5.2', messages: [{ role: 'user', content: '讲一个简短的故事' }], stream: true }); for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content || ''; process.stdout.write(content); } } streamChat();

#使用 Stream Helper

typescript
import OpenAI from 'openai'; const client = new OpenAI(); async function streamWithHelper() { const stream = client.beta.chat.completions.stream({ model: 'gpt-5.2', messages: [{ role: 'user', content: 'Hello' }] }); // 事件监听 stream.on('content', (delta, snapshot) => { process.stdout.write(delta); }); // 获取最终结果 const finalResponse = await stream.finalChatCompletion(); console.log('\n完成:', finalResponse.choices[0].message.content); }

#Web 应用集成

#FastAPI + SSE

python
from fastapi import FastAPI from fastapi.responses import StreamingResponse from openai import OpenAI app = FastAPI() client = OpenAI() async def generate_stream(prompt: str): stream = client.chat.completions.create( model="gpt-5.2", messages=[{"role": "user", "content": prompt}], stream=True ) for chunk in stream: content = chunk.choices[0].delta.content or "" if content: yield f"data: {content}\n\n" yield "data: [DONE]\n\n" @app.get("/chat") async def chat(prompt: str): return StreamingResponse( generate_stream(prompt), media_type="text/event-stream" )

#前端接收 (React)

typescript
async function streamChat(prompt: string, onChunk: (text: string) => void) { const response = await fetch(`/chat?prompt=${encodeURIComponent(prompt)}`); const reader = response.body?.getReader(); const decoder = new TextDecoder(); while (reader) { const { done, value } = await reader.read(); if (done) break; const text = decoder.decode(value); const lines = text.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const content = line.slice(6); if (content !== '[DONE]') { onChunk(content); } } } } } // React 组件中使用 function ChatComponent() { const [response, setResponse] = useState(''); const handleSend = async () => { setResponse(''); await streamChat('你的问题', (chunk) => { setResponse(prev => prev + chunk); }); }; return ( <div> <button onClick={handleSend}>发送</button> <div>{response}</div> </div> ); }

#Next.js API Route

typescript
// app/api/chat/route.ts import { OpenAI } from 'openai'; const client = new OpenAI(); export async function POST(req: Request) { const { messages } = await req.json(); const stream = await client.chat.completions.create({ model: 'gpt-5.2', messages, stream: true }); const encoder = new TextEncoder(); const readable = new ReadableStream({ async start(controller) { for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content || ''; controller.enqueue(encoder.encode(content)); } controller.close(); } }); return new Response(readable, { headers: { 'Content-Type': 'text/plain; charset=utf-8' } }); }

#Chunk 数据结构

python
# 每个 chunk 的结构 { "id": "chatcmpl-xxx", "object": "chat.completion.chunk", "created": 1234567890, "model": "gpt-5.2", "choices": [{ "index": 0, "delta": { "content": "你" # 增量内容 }, "finish_reason": null # 最后一个为 "stop" }] }

#处理完成状态

python
for chunk in stream: delta = chunk.choices[0].delta finish_reason = chunk.choices[0].finish_reason if delta.content: print(delta.content, end="") if finish_reason == "stop": print("\n[完成]") elif finish_reason == "length": print("\n[达到长度限制]")

#常见坑

  • 只处理增量内容,不处理 finish_reason 导致状态机错乱
  • 前端直接拼接 chunk,不做断线重试和去重
  • 没有超时与取消控制,长请求占满连接资源

#流式 + Function Calling

python
stream = client.chat.completions.create( model="gpt-5.2", messages=[{"role": "user", "content": "北京天气怎么样?"}], tools=[{ "type": "function", "function": { "name": "get_weather", "parameters": {...} } }], stream=True ) tool_calls = [] for chunk in stream: delta = chunk.choices[0].delta # 处理工具调用 if delta.tool_calls: for tc in delta.tool_calls: if tc.index >= len(tool_calls): tool_calls.append({"name": "", "arguments": ""}) if tc.function.name: tool_calls[tc.index]["name"] = tc.function.name if tc.function.arguments: tool_calls[tc.index]["arguments"] += tc.function.arguments # 处理文本内容 if delta.content: print(delta.content, end="") print(f"\n工具调用: {tool_calls}")

#错误处理

python
from openai import APIError, APIConnectionError try: stream = client.chat.completions.create( model="gpt-5.2", messages=[...], stream=True ) for chunk in stream: # 处理 chunk pass except APIConnectionError: print("连接失败,请检查网络") except APIError as e: print(f"API 错误: {e}") except Exception as e: print(f"未知错误: {e}")

#性能优化

#设置超时

python
from openai import OpenAI import httpx client = OpenAI( timeout=httpx.Timeout(60.0, connect=5.0) )

#重试机制

python
from tenacity import retry, stop_after_attempt, wait_exponential @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10) ) def stream_with_retry(messages): return client.chat.completions.create( model="gpt-5.2", messages=messages, stream=True )

#下一步


提示:流式响应是构建聊天应用的核心技术,务必掌握。

System Design

系统设计必备:核心概念 + 经典案例

快速掌握取舍与设计套路,备战系统设计面试。

进入 System Design →

相关路线图