菜单

服务器发送事件 (SSE)

相关源文件

服务器发送事件 (SSE) 是一种服务器推送技术,它允许服务器通过 HTTP 连接实时向客户端发送更新。与 WebSockets(在 WebSockets 中介绍)不同,SSE 是一种更简单的单向通信通道,专门用于服务器需要持续向客户端推送数据的场景。

NestJS 中的 SSE 架构

下图说明了 NestJS 中服务器发送事件的实现方式。

来源

核心组件

SseStream 类

SseStream 类是负责将消息对象转换为格式正确的 SSE 事件的核心组件。它继承自 Node.js 的 Transform 流类。

主要职责

  • 为 SSE 连接设置合适的 HTTP 标头
  • 将消息对象转换为 SSE 文本格式
  • 管理消息 ID
  • 处理流式传输中的背压

来源

SSE 请求-响应流程

以下序列图展示了 NestJS 如何处理 SSE 请求。

来源

实现细节

SSE 检测机制

NestJS 使用元数据反射通过 reflectSse 方法来检测 SSE 处理器。

这在 createHandleResponseFn 方法中使用,以识别 SSE 端点并为其设置特殊处理。

来源

SSE 响应处理

检测到 SSE 端点后,会创建一个特殊的响应处理器。

该处理器委托给 RouterResponseController 中的 sse 方法。

来源

SSE 流处理

RouterResponseController 中的 sse 方法处理 SSE 响应。

  1. 验证结果是否为 Observable
  2. 创建一个新的 SseStream
  3. 订阅 Observable
  4. 将发出的值转换为 SSE 格式
  5. 处理错误和连接关闭

来源

SseStream 实现

SseStream 类负责正确的 SSE 格式化。

pipe 方法设置 SSE 的基本标头。

来源

在 NestJS 中使用 SSE

基本 SSE 控制器

要在 NestJS 中创建 SSE 端点:

  1. 创建一个控制器方法
  2. 应用 @Sse() 装饰器
  3. 返回一个发出 MessageEvent 对象的 Observable。

示例

客户端使用

在浏览器中消费 SSE 事件

错误处理

NestJS 通过 RxJS 的 catchError 操作符处理 SSE 流中的错误。

  1. 捕获 Observable 中的错误
  2. 将错误转换为类型为 'error' 的 SSE 事件
  3. 继续流(返回 EMPTY)
  4. 记录错误处理过程中发生的任何错误

如果在流本身发生错误,连接将优雅地终止。

来源

MessageEvent 结构

NestJS 遵循 W3C EventSource 规范。MessageEvent 包含:

属性类型描述
datastring ⎮ object事件负载(对象会自动 JSON 字符串化)
类型字符串可选的事件类型,用于客户端事件处理
id字符串可选的事件 ID,用于恢复连接
retry数字可选的重连时间(毫秒)

来源

自动资源管理

NestJS 会自动处理资源清理。

  1. 当客户端断开连接时,订阅会被取消。
  2. 当 Observable 完成时,响应将被终止。
  3. 如果响应已结束,则跳过 SSE 设置。

这可以防止内存泄漏并确保正确的资源管理。

来源

验证

NestJS 验证处理器是否返回了一个 Observable;否则,它会抛出一个 ReferenceError

来源

结论

服务器发送事件为 Web 应用程序实现实时更新提供了一种高效的方式。NestJS 对 SSE 的内置支持通过与 RxJS Observables 集成并处理 SSE 连接的复杂方面,简化了实现。

对于双向通信需求,请参考 WebSockets,它提供了更具交互性的功能,但复杂性也更高。