菜单

流式传输和实时 API

相关源文件

本文档涵盖了OpenAI Python库的流式和实时通信功能。它解释了两个不同但相关的系统:用于从服务器到客户端的单向数据流的服务器发送事件 (SSE) 流式传输,以及用于双向 WebSocket 通信的实时API。有关这些系统基础的客户端 HTTP 处理的信息,请参阅HTTP 客户端基础。有关响应处理和解析的详细信息,请参阅请求和响应处理

概述

该库提供了两种实时通信方式

通信类型协议用例实现
SSE 流式传输带服务器发送事件的 HTTP单向流式响应Stream[_T], AsyncStream[_T]
实时APIWebSocket双向实时通信RealtimeConnection, AsyncRealtimeConnection

来源: src/openai/_streaming.py22-223 src/openai/resources/beta/realtime/realtime.py65-175 src/openai/_base_client.py361-370

服务器发送事件 (SSE) 流式传输

SSE 流式传输使客户端能够通过单个 HTTP 连接从服务器接收连续的数据流。这主要用于流式传输聊天补全和其他长期运行的 API 响应。

核心流式传输类

流式传输实现围绕两个泛型类进行,这两个类处理同步和异步迭代

来源: src/openai/_streaming.py22-44 src/openai/_streaming.py123-147 src/openai/_streaming.py225-264

SSE 解码器实现

SSEDecoder 类根据 HTML5 规范处理服务器发送事件的解析

组件目的关键方法
SSEDecoder解析 SSE 流iter_bytes(), aiter_bytes(), decode()
ServerSentEvent单个事件数据event, data, id, retry, json()
SSEBytesDecoder协议接口运行时可检查协议

解码器处理原始字节并产生结构化事件,处理多行数据、事件类型和连接元数据

来源: src/openai/_streaming.py266-369 src/openai/_streaming.py371-380

流处理管道

来源: src/openai/_streaming.py52-102 src/openai/_streaming.py154-203 src/openai/_streaming.py288-320

流生命周期管理

StreamAsyncStream 都实现了上下文管理器协议,以进行适当的资源清理

  • 开始: 初始化解码器和响应迭代器
  • 处理: 解析事件并生成类型化数据
  • 结束: 关闭 HTTP 响应并释放连接
  • 错误处理: 解析并将流式传输错误引发为 APIError

来源: src/openai/_streaming.py103-121 src/openai/_streaming.py205-223

实时 API (WebSocket)

实时 API 通过 WebSocket 连接提供双向通信,从而实现低延迟、多模态的对话体验,并具备原生语音到语音功能。

连接架构

来源: src/openai/resources/beta/realtime/realtime.py411-486 src/openai/resources/beta/realtime/realtime.py229-304 src/openai/resources/beta/realtime/realtime.py488-632

连接建立

连接过程在标准 OpenAI 和 Azure 部署之间有所不同

客户端类型URL 构建身份验证
OpenAIwss:// + base_url + /realtime请求头中的 Bearer 令牌
Azure带部署参数的自定义终结点api-key 或 Azure AD 令牌

连接管理器处理 URL 准备、身份验证和 WebSocket 配置

来源: src/openai/resources/beta/realtime/realtime.py395-402 src/openai/lib/azure.py334-357 src/openai/lib/azure.py610-633

事件处理

来源:src/openai/resources/beta/realtime/realtime.py284-303 src/openai/resources/beta/realtime/realtime.py466-485 src/openai/resources/beta/realtime/realtime.py264-282

特定资源 API

Realtime API 将功能组织成专门的资源类

  • Session:配置和会话管理
  • Response:响应生成和控制
  • Input Audio Buffer:入站音频流管理
  • Conversation:对话历史和条目管理
  • Output Audio Buffer:出站音频流控制
  • Transcription Session:语音转文本处理

每个资源都提供通过 WebSocket 连接发送适当的 RealtimeClientEvent 消息的方法。

来源:src/openai/resources/beta/realtime/realtime.py244-249 src/openai/resources/beta/realtime/realtime.py426-431

与基础客户端的集成

流式传输和实时功能都与基础客户端的基础设施集成,以进行身份验证、错误处理和连接管理。

流式集成点

来源:src/openai/_base_client.py597-599 src/openai/_base_client.py472-474 src/openai/_base_client.py600-623

Azure 客户端实时配置

Azure 客户端需要对实时连接进行特殊处理,包括特定于部署的 URL 和身份验证

来源:src/openai/lib/azure.py334-357 src/openai/lib/azure.py610-633

错误处理和连接管理

流式错误处理

SSE 流处理事件数据中的错误

  • 内联错误:具有 error 字段的事件会触发 APIError
  • 基于事件的错误:具有 event: "error" 的事件会被处理为错误
  • 连接错误:网络问题会作为 HTTP 客户端异常传播

来源:src/openai/_streaming.py64-96 src/openai/_streaming.py166-197

实时连接管理

WebSocket 连接需要仔细的生命周期管理

  • 上下文管理器:退出时自动清理连接
  • 手动控制:提供直接的 .enter().close() 方法
  • 连接状态:监控连接状态并处理断开连接
  • 异常安全性:在取消或发生错误时进行适当清理

连接管理器可确保正确的资源清理,并处理 ConnectionClosedOK 等 WebSocket 特定异常。

来源:src/openai/resources/beta/realtime/realtime.py254-262 src/openai/resources/beta/realtime/realtime.py404-408 src/openai/resources/beta/realtime/realtime.py342-393