菜单

执行系统

相关源文件

ComfyUI 中的执行系统是将 Web UI 中设计的流程图转化为实际生成图像的核心处理管道。该系统负责工作流程的整个生命周期,从验证和调度到执行和结果交付。有关内存管理的具体信息,请参阅 内存管理

概述

执行系统负责

  1. 接收和验证来自 Web 界面的工作流图
  2. 管理执行队列和优先级
  3. 根据依赖关系以正确的顺序高效执行节点
  4. 管理缓存以避免冗余计算
  5. 将进度和结果传回前端
  6. 优雅地处理错误和中断

核心执行系统架构

来源:server.py150-219 server.py628-669 execution.py438-555 main.py151-212

架构组件

PromptServer

PromptServer 类是处理客户端交互的主要 HTTP/WebSocket 服务器。它管理 Web 界面、API 端点和实时通信。

关键实现细节

组件方法/属性目的
服务器初始化__init__(loop)使用中间件设置 aiohttp 应用
WebSocket 处理程序websocket_handler()管理实时客户端连接
提交提示post_prompt()验证并排队工作流
队列管理self.prompt_queue用于工作流排序的 PromptQueue 实例

服务器在初始化期间创建一个提示队列

关键路由

  • 通过 post_prompt() 进行工作流提交和验证的 POST /prompt -
  • 通过 websocket_handler() 进行 WebSocket 连接的 GET /ws -
  • 通过 get_queue() 获取队列状态的 GET /queue -
  • 通过 post_interrupt() 中断执行的 POST /interrupt -

来源:server.py150-163 server.py194-219 server.py628-669 server.py685-688

PromptQueue

PromptQueue 管理工作流执行顺序和状态。它作为一个优先级队列运行,数字越低表示优先级越高。

prompt_worker 中的队列操作

队列项结构

  • 优先级数字(数字越小,优先级越高) item[0] -
  • 唯一的 prompt_id item[1] -
  • 工作流提示字典 item[2] -
  • 其他数据(client_id 等) item[3] -
  • 要执行的输出 item[4] -

队列管理

  • 可以通过 delete_queue_item() 删除工作流
  • 可以通过 wipe_queue() 清空队列
  • unload_modelsfree_memory 这样的状态标志控制资源管理

来源:main.py169-183 server.py671-683 server.py690-699

PromptExecutor

PromptExecutor 类负责协调工作流执行。它负责管理缓存、节点执行顺序和与前端的通信。

PromptExecutor.execute() 流程

关键方法

  • 主要的执行入口点 execute(prompt, prompt_id, extra_data, execute_outputs) -
  • 向前端发送状态更新 add_message(event, data, broadcast) -
  • 错误处理 handle_execution_error(prompt_id, prompt, current_outputs, executed, error, ex) -
  • 重新初始化缓存和状态 reset() -

来源:execution.py438-555 execution.py459-485

DynamicPrompt 和 ExecutionList

  • DynamicPrompt:表示工作流图,并允许在执行期间进行修改
  • ExecutionList:根据节点依赖关系确定执行节点的最佳顺序

来源:comfy_execution/graph.py execution.py487-498

节点执行

单个节点的执行由主要的 execute() 函数处理,该函数处理工作流图中的单个节点。

execute() 函数流程

关键函数

  • 主要的节点执行入口点 execute(server, dynprompt, caches, current_item, extra_data, executed, prompt_id, execution_list, pending_subgraph_results) -
  • 收集节点输入 get_input_data(inputs, class_def, unique_id, outputs, dynprompt, extra_data) -
  • 执行节点函数并处理输出 get_output_data(obj, input_data_all, execution_block_cb, pre_execute_cb) -
  • 处理列表处理和执行 _map_node_over_list(obj, input_data_all, func, allow_interrupt, execution_block_cb, pre_execute_cb) -

来源:execution.py270-436 execution.py108-151 execution.py220-260 execution.py155-197

执行流程

当用户从 Web 界面提交工作流时,会发生以下过程

来源:server.py625-666 main.py158-218 execution.py434-583

1. 工作流提交

工作流通过 HTTP POST 请求提交到 /prompt 端点,以 JSON 对象的形式提交给服务器。

服务器接收此请求并验证工作流,以确保所有节点和连接都有效。

来源:server.py625-666

2. 排队和工作线程

经过验证的工作流被放入 PromptQueue 中,带有优先级编号和唯一的 prompt_idprompt_worker 线程会持续检查队列中的新项目。

来源:main.py158-218

3. PromptExecutor 执行

PromptExecutor 负责工作流的实际执行。

  1. 创建工作流的 DynamicPrompt 表示。
  2. 根据配置的缓存类型初始化缓存(输出、UI、对象)。
  3. 构建一个 ExecutionList 来确定节点执行顺序
  4. 按确定的顺序执行每个节点
  5. 处理错误并将状态更新发送给客户端

来源: execution.py434-583

4. 节点执行逻辑

每个节点的执行包含几个步骤

  1. 检查节点的输出是否已缓存
  2. 如果未缓存,则从其他节点的输出收集节点的输入
  3. 获取或创建节点对象实例
  4. 使用收集到的输入执行节点的 FUNCTION 方法
  5. 处理输出并将其缓存以供其他节点使用
  6. 将 UI 更新发送给客户端

来源: execution.py266-431

缓存系统

CacheSet 类管理三种缓存,以避免重复计算来优化工作流执行。

CacheSet 架构

缓存类型

缓存类型由命令行参数确定,并在 prompt_worker 中配置

缓存类型CLI 参数实现行为
CacheType.CLASSIC--cache-classic (默认)HierarchicalCache激进缓存,尽快转储数据
CacheType.LRU--cache-lru NLRUCache保留 N 个最近使用的结果
CacheType.DEPENDENCY_AWARE--cache-noneDependencyAwareCache仅在后代需要数据时缓存

缓存选择逻辑

缓存方法

  • cache.set_prompt(dynamic_prompt, prompt_keys, is_changed_cache) - 初始化工作流的缓存
  • cache.get(node_id) - 检索缓存结果
  • cache.set(node_id, data) - 存储结果
  • cache.clean_unused() - 删除未使用的条目

来源: execution.py69-106 main.py153-159 comfy/cli_args.py103-106

错误处理

执行系统拥有强大的错误处理机制,可确保错误被正确捕获、记录并报告给前端。

execute() 函数通过不同的异常类型实现不同的行为,从而实现全面的错误处理。

异常处理流程

  1. InterruptProcessingException - 执行被手动中断

    • 创建仅包含 node_id 的错误详细信息
    • 返回 ExecutionResult.FAILURE
  2. OOM_EXCEPTION (内存不足) - GPU 内存耗尽

    • 日志记录“Got an OOM, unloading all loaded models”(内存不足,正在卸载所有已加载模型)
    • 调用 comfy.model_management.unload_all_models()
    • 在响应中包含完整的错误详细信息
  3. General Exceptions - 节点执行过程中的任何其他错误

    • 通过 traceback.format_exc() 记录完整的堆栈跟踪
    • 通过 format_value() 格式化输入数据以进行调试
    • 包含详细的错误信息

错误详情结构

PromptExecutor.handle_execution_error() 方法随后根据异常类型确定是发送 execution_interrupted 事件还是 execution_error 事件到前端。

来源: execution.py400-432 execution.py459-485 comfy/model_management.py428-430

与前端通信

通过 WebSocket,使用 PromptServer.send_sync() 方法和各种事件类型与前端进行实时通信。

WebSocket 事件类型

事件类型发送自目的消息结构
statuswebsocket_handler()初始队列状态{"status": queue_info, "sid": client_id}
executingexecute() 函数节点执行开始/结束{"node": node_id, "display_node": display_id, "prompt_id": prompt_id}
executedexecute() 函数节点完成{"node": node_id, "display_node": display_id, "output": ui_output, "prompt_id": prompt_id}
execution_startPromptExecutor.execute()工作流开始{"prompt_id": prompt_id}
execution_cachedPromptExecutor.execute()找到缓存的节点{"nodes": cached_nodes, "prompt_id": prompt_id}
execution_successPromptExecutor.execute()工作流完成{"prompt_id": prompt_id}
execution_errorhandle_execution_error()节点执行错误包含完整堆栈跟踪的错误详细信息
execution_interruptedhandle_execution_error()手动中断{"prompt_id": prompt_id, "node_id": node_id, "executed": executed_list}
progress进度钩子执行进度{"value": current, "max": total, "prompt_id": prompt_id, "node": node_id}

进度钩子实现

二进制事件

  • BinaryEventTypes.UNENCODED_PREVIEW_IMAGE - 生成过程中的实时预览图像

来源: server.py194-219 execution.py278-282 execution.py360-361 execution.py496 execution.py511-513 execution.py538 main.py223-232 server.py39-42

结论

ComfyUI 的执行系统是管理整个工作流生命周期的核心组件。它负责工作流的验证、调度、执行和结果传递,同时高效地管理资源并向用户提供实时反馈。这种架构使 ComfyUI 能够以最佳性能和可靠性处理复杂的人工智能图像生成工作流。

来源: server.py execution.py main.py comfy/model_management.py