本文档介绍了处理 OpenAI API 速率限制和实现稳健错误处理模式的最佳实践。它解释了速率限制机制、错误类型、重试策略以及实时和批量处理场景的优化技术。有关通用 API 使用模式,请参阅聊天补全和流式传输。有关并行请求处理优化,请参阅并行 API 请求处理。
OpenAI API 强制执行速率限制,以确保公平访问、防止滥用并维护服务质量。API 对每分钟请求数 (RPM) 和每分钟令牌数 (TPM) 都强制执行限制,使用层会根据支付历史和使用模式自动增加。
超出速率限制时遇到的主要错误是 openai.RateLimitError,它包含指示超出限制类型的特定错误代码和消息
| 错误类型 | 错误代码 | 描述 |
|---|---|---|
| 速率限制 | 429 | 每分钟请求数或每分钟令牌数超出限制 |
| 配额超出 | insufficient_quota | 月度使用配额或信用额度已达上限 |
来源:examples/How_to_handle_rate_limits.ipynb67-84
来源:examples/How_to_handle_rate_limits.ipynb343-350
处理速率限制最有效的策略是实现带有抖动的指数退避。此方法会自动重试失败的请求,并逐渐增加延迟。
Tenacity 库实现
@retry 装饰器与 wait_random_exponentialwait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6)completion_with_backoff(**kwargs)Backoff 库实现
@backoff.on_exception 装饰器backoff.expo, openai.RateLimitError, max_time=60, max_tries=6completions_with_backoff(**kwargs)来源:examples/How_to_handle_rate_limits.ipynb137-213
对于没有第三方依赖的环境,可以使用重试装饰器实现手动退避
主要参数
initial_delay: float = 1exponential_base: float = 2jitter: bool = Truemax_retries: int = 10errors: tuple = (openai.RateLimitError,)来源:examples/How_to_handle_rate_limits.ipynb246-296
当主要模型遇到速率限制时,回退到备用模型可以保持服务可用性
实施注意事项
来源:examples/How_to_handle_rate_limits.ipynb302-337
设置适当的 max_tokens 值可防止令牌使用量过高估算
来源:examples/How_to_handle_rate_limits.ipynb343-374
对于高吞吐量批量处理,有几种策略可以在遵守速率限制的同时最大限度地提高效率。
主动延迟而非被动退避,可以防止触及速率限制
在单个请求中组合多个提示可减少 RPM 使用,同时最大限度地利用 TPM
主要实现细节
client.beta.chat.completions.parse() 与 response_format=StoryResponseclass StoryResponse(BaseModel)来源:examples/How_to_handle_rate_limits.ipynb441-552
对于大规模批量处理,api_request_parallel_processor.py 脚本提供了一个全面的解决方案
主要功能
来源:examples/How_to_handle_rate_limits.ipynb559-571
| 模式 | 用例 | 实现 | 权衡 |
|---|---|---|---|
| 指数退避 | 实时请求 | @retry, @backoff.on_exception, 手动装饰器 | 增加延迟,保证最终成功 |
| 模型回退 | 高可用性 | try/except 与模型切换 | 质量/成本差异,共享限制 |
| 主动延迟 | 批量处理 | time.sleep(60/rate_limit) | 可预测吞吐量,利用率欠佳 |
| 请求批处理 | 高 TPM 利用率 | 带有多个提示的结构化输出 | 复杂解析,延迟增加 |
| 并行处理 | 大规模批处理 | api_request_parallel_processor.py | 复杂实现,最大吞吐量 |