菜单

并行 API 请求处理

相关源文件

本文档解释了如何高效地并行处理大量 OpenAI API 请求,同时遵守速率限制。它涵盖了并行处理系统的实现,该系统可在不超出 API 速率限制的情况下最大化吞吐量。

有关 OpenAI API 的通用性能优化技术的信息,请参阅最佳实践和高级技术

来源:examples/api_request_parallel_processor.py1-17

目的与问题陈述

大规模使用 OpenAI API 时,开发者面临一个严峻的挑战:

  • 顺序处理对于大量工作负载来说太慢(数百万个请求可能需要数天)
  • 不受控制的并行处理将超出速率限制并导致错误
  • 最佳吞吐量需要精心节流的并行请求

本文档解释了并行处理器的实现,该处理器:

  1. 从文件流式传输请求以避免内存问题
  2. 并发发出请求以最大化吞吐量
  3. 限制请求和令牌使用以保持在速率限制内
  4. 自动重试失败的请求
  5. 记录错误以进行诊断

来源:examples/api_request_parallel_processor.py1-17

系统架构

该并行处理器实现为一个使用 asyncio 进行并发处理的 Python 脚本。以下是其高级架构:

来源:examples/api_request_parallel_processor.py73-92 examples/api_request_parallel_processor.py110-272

核心组件

StatusTracker

StatusTracker 类维护脚本进度的元数据。整个过程中只创建一个实例。

来源:examples/api_request_parallel_processor.py277-288

APIRequest

APIRequest 类代表一个单一的 API 请求,包含其输入、输出和元数据。它还包含实际进行 API 调用的方法。

来源:examples/api_request_parallel_processor.py291-363

主处理循环

主处理循环是并行处理器的核心。它管理请求流、跟踪容量并确保遵守速率限制。

来源:examples/api_request_parallel_processor.py164-258

请求流和令牌管理

系统精细管理请求速率和令牌消耗,以保持在 API 限制内。

来源:examples/api_request_parallel_processor.py195-233

令牌计数

系统包含一个复杂的令牌计数机制,用于估算不同 API 端点的令牌使用量

  • 对于聊天补全,它计算消息中的令牌以及估算的补全令牌
  • 对于嵌入,它计算输入令牌
  • 支持单一输入和批量输入

来源:examples/api_request_parallel_processor.py387-443

错误处理与重试

处理器实现了强大的错误处理系统,并支持自动重试

来源:examples/api_request_parallel_processor.py302-363

使用示例

该处理器设计为通过命令行运行,并支持各种配置选项

python examples/api_request_parallel_processor.py \
  --requests_filepath examples/data/example_requests_to_parallel_process.jsonl \
  --save_filepath examples/data/example_requests_to_parallel_process_results.jsonl \
  --request_url https://api.openai.com/v1/embeddings \
  --max_requests_per_minute 1500 \
  --max_tokens_per_minute 6250000 \
  --token_encoding_name cl100k_base \
  --max_attempts 5 \
  --logging_level 20

来源:examples/api_request_parallel_processor.py19-29

输入和输出格式

输入格式

输入文件应为 JSONL 文件,其中每行是一个 JSON 对象,包含 API 参数和一个可选的元数据字段。

来源:examples/api_request_parallel_processor.py31-38

输出格式

输出文件也是一个 JSONL 文件,其中每行包含一个数组:

  1. 原始请求
  2. API 响应(或错误信息)
  3. 原始元数据(如果提供)

来源:examples/api_request_parallel_processor.py39-43

配置参数

参数描述默认
requests_filepathJSONL 请求文件的路径必填
save_filepath保存结果的路径{requests_filename}_results.jsonl
request_urlAPI 端点 URLhttps://api.openai.com/v1/embeddings
api_keyOpenAI API 密钥来自环境变量
max_requests_per_minute目标每分钟请求数1,500 (限制的 50%)
max_tokens_per_minute目标每分钟令牌数125,000 (限制的 50%)
token_encoding_name令牌编码名称cl100k_base
max_attempts最大重试次数5
logging_level日志详细程度20 (INFO)

来源:examples/api_request_parallel_processor.py31-71 examples/api_request_parallel_processor.py458-472

实现细节

异步处理

该处理器使用 Python 的 asyncioaiohttp 库来高效处理并发 API 请求。主处理函数是 process_api_requests_from_file,它协调整个过程。

来源:examples/api_request_parallel_processor.py110-272

速率限制管理

速率限制通过跟踪请求和令牌容量来管理,这些容量会根据配置的限制随时间补充。系统在每次循环迭代中更新可用容量。

来源:examples/api_request_parallel_processor.py195-207

达到速率限制后的冷却

当遇到速率限制错误时,系统会暂停处理一段时间(默认为 15 秒)以避免立即再次达到限制。

来源:examples/api_request_parallel_processor.py243-258

最佳实践

  1. 设置保守限制:将 max_requests_per_minutemax_tokens_per_minute 配置为您实际限制的 50-75%,以留有余量。

  2. 尽可能批量处理请求:如果您遇到请求速率限制,请考虑将多个输入批量处理为单个请求(例如,在一个 API 调用中处理多个嵌入)。

  3. 监控日志:使用日志系统监控速率限制错误,并相应调整您的限制。

  4. 正确转义换行符:创建 JSONL 文件时,请确保内容中的换行符已正确转义(json.dumps() 会自动处理此问题)。

来源:examples/api_request_parallel_processor.py50-58

局限性

  1. 令牌计数功能目前仅支持补全和嵌入请求。其他 API 端点(如编辑、插入、DALL-E)需要额外的实现。

  2. 系统假设速率限制随时间保持不变,但如果您的速率限制动态变化,这可能不成立。

来源:examples/api_request_parallel_processor.py439-443

结论

并行 API 请求处理器提供了一种高效的方法,用于在遵守速率限制的同时处理大量 OpenAI API 请求。通过精心管理并发性、重试和容量,它在不使 API 过载的情况下最大化了吞吐量。

有关更高级的优化技术和最佳实践,请参阅最佳实践和高级技术