写作

NestJS WebSocket 实战:从零搭建实时聊天室

NestJS WebSocket 实战:从零搭建实时聊天室

前言

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,广泛应用于实时通信场景——聊天室、在线协作文档、实时通知、游戏等。

NestJS 提供了 Gateway 抽象,将 WebSocket 的底层复杂性封装为类似于 Controller 的声明式 API,使开发者能以熟悉的方式构建实时应用。

本文将以搭建一个实时聊天室为主线,全面介绍 NestJS WebSocket Gateway 的完整 API 和最佳实践。

本文基于 Socket.IO 适配器。NestJS 的 WebSocket 模块是平台无关的,也支持 ws 库。

安装

pnpm add @nestjs/websockets @nestjs/platform-socket.io

Gateway 完整 API 清单

1. @WebSocketGateway() - 类装饰器

标记一个类为 WebSocket Gateway。NestJS 会自动初始化该 Gateway 并管理其生命周期。

@WebSocketGateway()
export class ChatGateway {}

配置选项:

@WebSocketGateway({
  namespace: 'chat',            // 命名空间隔离
  path: '/ws',                  // WebSocket 连接路径
  cors: { origin: '*' },        // CORS 配置
  transports: ['websocket'],    // 传输方式
  pingTimeout: 5000,            // 心跳超时
  pingInterval: 10000,          // 心跳间隔
})
export class ChatGateway {}
选项说明
namespace命名空间,用于业务隔离
pathWebSocket 连接路径
corsCORS 配置
transports传输方式(websocket / polling
pingTimeout / pingInterval心跳超时和间隔

使用场景: 创建实时通信服务入口。可按业务模块划分命名空间,如 chatnotificationslive


2. @WebSocketServer() - 属性装饰器

注入底层 WebSocket Server 实例(Socket.IO 的 ServerNamespace 类型)。NestJS 在 Gateway 初始化后自动赋值。

import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';

@WebSocketGateway()
export class ChatGateway {
  @WebSocketServer()
  server: Server;

  // 向所有客户端广播
  broadcast(message: string) {
    this.server.emit('message', message);
  }
}

使用场景: 向客户端广播消息、管理房间、获取连接的客户端列表。

注意:@WebSocketServer() 装饰的属性不能手动赋值。


3. @SubscribeMessage(eventName) - 方法装饰器

订阅特定客户端事件。事件名与客户端 socket.emit(eventName, data) 匹配时触发。

import { SubscribeMessage } from '@nestjs/websockets';

@WebSocketGateway()
export class ChatGateway {
  @SubscribeMessage('sendMessage')
  handleMessage(client: any, payload: any) {
    return { event: 'receiveMessage', data: payload };
  }
}

使用场景: 处理客户端发来的各类事件——发消息、加入房间、更新状态等。


4. @MessageBody() - 参数装饰器

提取传入消息的载荷数据。支持 Pipe 验证,也可提取嵌套属性。

// 获取完整载荷
@SubscribeMessage('sendMessage')
handleMessage(@MessageBody() data: ChatMessage) {
  this.server.emit('receiveMessage', data);
}

// 提取特定字段
@SubscribeMessage('sendMessage')
handleMessage(@MessageBody('content') content: string) {
  this.server.emit('receiveMessage', { content });
}

// 配合 Pipe 验证
@SubscribeMessage('createMessage')
handleCreate(@MessageBody(new ValidationPipe()) dto: CreateMessageDto) {
  // ...
}

使用场景: 获取客户端发送的消息内容、用户输入数据,配合 Pipe 进行数据验证。


5. @ConnectedSocket() - 参数装饰器

注入当前连接的客户端 Socket 实例,用于向单个客户端发送消息或获取连接信息。

import { ConnectedSocket } from '@nestjs/websockets';
import { Socket } from 'socket.io';

@SubscribeMessage('identify')
handleIdentify(@ConnectedSocket() client: Socket) {
  console.log('Client ID:', client.id);
  client.emit('identified', { userId: client.id });
}

使用场景: 向特定用户发送私信、获取客户端 ID/IP、操作单个连接。


6. @Ack() - 参数装饰器

提取客户端事件中的 ACK 回调函数。用于客户端发送事件后等待服务端响应的场景。

import { Ack } from '@nestjs/websockets';

@SubscribeMessage('saveMessage')
async handleSave(
  @MessageBody() data: ChatMessage,
  @Ack() ack: (response: { id: string }) => void,
) {
  const saved = await this.messageService.save(data);
  ack({ id: saved.id });
}

客户端对应写法:

socket.emit('saveMessage', { content: 'Hello' }, (response) => {
  console.log('Server responded with id:', response.id);
});

使用场景: 客户端需要确认服务端已处理消息(消息发送确认、操作结果返回)。


7. 生命周期接口

NestJS 提供了三个生命周期接口,Gateway 实现后可在特定时机执行代码。

接口方法触发时机
OnGatewayInitafterInit(server)Gateway 初始化后
OnGatewayConnectionhandleConnection(client)客户端连接时
OnGatewayDisconnecthandleDisconnect(client)客户端断开时
import {
  WebSocketGateway,
  WebSocketServer,
  OnGatewayInit,
  OnGatewayConnection,
  OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';

@WebSocketGateway()
export class ChatGateway
  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
  @WebSocketServer() server: Server;
  private logger: Logger = new Logger('ChatGateway');

  afterInit(server: Server) {
    this.logger.log('Chat Gateway initialized');
  }

  handleConnection(client: Socket) {
    this.logger.log(`Client connected: ${client.id}`);
  }

  handleDisconnect(client: Socket) {
    this.logger.log(`Client disconnected: ${client.id}`);
    this.server.emit('userLeft', { userId: client.id });
  }
}

使用场景: 连接管理、在线用户统计、资源清理、初始化日志。


8. 跨切面装饰器

以下装饰器来自 @nestjs/common@nestjs/core,可在 Gateway 级别或方法级别使用:

装饰器说明
@UseGuards()应用 Guard(如 JWT 鉴权)
@UsePipes()应用 Pipe(数据验证/转换)
@UseInterceptors()应用 Interceptor(日志/转换)
@UseFilters()应用 Exception Filter
import { UseGuards, UsePipes, UseFilters, UseInterceptors } from '@nestjs/common';

@WebSocketGateway()
@UseGuards(WsJwtGuard)
export class ChatGateway {
  @SubscribeMessage('sendMessage')
  @UsePipes(new ValidationPipe())
  handleMessage(@MessageBody() dto: SendMessageDto) {
    // ...
  }

  @SubscribeMessage('adminAction')
  @UseGuards(AdminGuard)
  @UseFilters(new WsExceptionFilter())
  handleAdminAction(@ConnectedSocket() client: Socket) {
    // ...
  }
}

9. 工具类

WsException — WebSocket 专用异常

import { WsException } from '@nestjs/websockets';

@SubscribeMessage('getProfile')
async handleGetProfile(
  @MessageBody('userId') userId: string,
) {
  const user = await this.userService.find(userId);
  if (!user) {
    throw new WsException('User not found');
  }
  return user;
}

WsResponse<T> — 标准响应格式

用于返回自定义事件名和数据的场景。

import { WsResponse } from '@nestjs/websockets';

@SubscribeMessage('events')
handleEvent(): WsResponse<string> {
  return { event: 'items', data: 'response data' };
}

BaseWsExceptionFilter — 异常过滤器基类

import { Catch, ArgumentsHost } from '@nestjs/common';
import { BaseWsExceptionFilter, WsException } from '@nestjs/websockets';

@Catch()
export class WsExceptionFilter extends BaseWsExceptionFilter {
  catch(exception: unknown, host: ArgumentsHost) {
    super.catch(exception, host);
  }
}

消息响应方式

Gateway 处理器支持多种响应方式:

方式返回值客户端收到
同步返回return data{ event: '事件名', data }
返回 WsResponsereturn { event: 'xxx', data }自定义事件名
返回 Promisereturn Promise.resolve(data)异步响应
返回 Observablereturn of(data1, data2)多次推送
手动 emitclient.emit(...)完全控制
// 1. 同步返回
@SubscribeMessage('sync')
handleSync() {
  return 'Hello';
}

// 2. 返回 WsResponse(自定义事件名)
@SubscribeMessage('custom')
handleCustom(): WsResponse<string> {
  return { event: 'customEvent', data: 'Hello' };
}

// 3. 返回 Promise(异步)
@SubscribeMessage('async')
async handleAsync(): Promise<string> {
  return await this.service.getData();
}

// 4. 返回 Observable(多次推送)
@SubscribeMessage('stream')
handleStream(): Observable<WsResponse<number>> {
  return interval(1000).pipe(
    map(i => ({ event: 'tick', data: i })),
    take(5),
  );
}

// 5. 手动 emit(完全控制)
@SubscribeMessage('manual')
handleManual(@ConnectedSocket() client: Socket) {
  client.emit('response', { custom: 'data' });
}

实战:搭建实时聊天室

架构设计

客户端 (Socket.IO)

Gateway (鉴权 → 验证 → 处理)

Service (业务逻辑)

广播 / 房间

1. 定义消息 DTO

// src/chat/dto/send-message.dto.ts
import { IsNotEmpty, IsString, MaxLength } from 'class-validator';

export class SendMessageDto {
  @IsNotEmpty()
  @IsString()
  @MaxLength(1000)
  content: string;

  @IsString()
  roomId?: string;
}

2. 创建 WebSocket Guard

HTTP 的 Guard 不能直接用于 WebSocket,需要适配:

// src/auth/ws-jwt.guard.ts
import { CanActivate, ExecutionContext, Injectable } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
import { JwtService } from '@nestjs/jwt';

@Injectable()
export class WsJwtGuard implements CanActivate {
  constructor(private jwtService: JwtService) {}

  canActivate(context: ExecutionContext): boolean {
    const client = context.switchToWs().getClient<{ handshake: { headers: { authorization?: string } } }>();
    const token = client.handshake.headers.authorization?.replace('Bearer ', '');

    if (!token) {
      throw new WsException('Unauthorized');
    }

    try {
      const user = this.jwtService.verify(token);
      client.user = user;
      return true;
    } catch {
      throw new WsException('Invalid token');
    }
  }
}

3. 创建 Exception Filter

// src/chat/filters/ws-exception.filter.ts
import { Catch, ArgumentsHost } from '@nestjs/common';
import { BaseWsExceptionFilter, WsException } from '@nestjs/websockets';
import { Socket } from 'socket.io';

@Catch(WsException)
export class ChatWsExceptionFilter extends BaseWsExceptionFilter {
  catch(exception: WsException, host: ArgumentsHost) {
    const client = host.switchToWs().getClient<Socket>();
    client.emit('error', { message: exception.message });
  }
}

4. 完整 Gateway 实现

// src/chat/chat.gateway.ts
import {
  WebSocketGateway,
  WebSocketServer,
  SubscribeMessage,
  ConnectedSocket,
  MessageBody,
  OnGatewayInit,
  OnGatewayConnection,
  OnGatewayDisconnect,
  WsException,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger, UseGuards, UsePipes, UseFilters, ValidationPipe } from '@nestjs/common';
import { ChatService } from './chat.service';
import { SendMessageDto } from './dto/send-message.dto';
import { WsJwtGuard } from '../auth/ws-jwt.guard';
import { ChatWsExceptionFilter } from './filters/ws-exception.filter';

@WebSocketGateway({
  namespace: 'chat',
  cors: { origin: '*' },
})
@UseGuards(WsJwtGuard)
@UseFilters(ChatWsExceptionFilter)
export class ChatGateway
  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
  @WebSocketServer()
  server: Server;

  private logger: Logger = new Logger('ChatGateway');
  private onlineUsers = new Map<string, string>(); // socketId -> userId

  afterInit() {
    this.logger.log('Chat Gateway initialized');
  }

  handleConnection(client: Socket) {
    const userId = (client as any).user?.sub;
    if (userId) {
      this.onlineUsers.set(client.id, userId);
      this.logger.log(`User ${userId} connected (${client.id})`);
      this.server.emit('userJoined', { userId, socketId: client.id });
    }
  }

  handleDisconnect(client: Socket) {
    const userId = this.onlineUsers.get(client.id);
    if (userId) {
      this.onlineUsers.delete(client.id);
      this.logger.log(`User ${userId} disconnected (${client.id})`);
      this.server.emit('userLeft', { userId });
    }
  }

  @SubscribeMessage('joinRoom')
  handleJoinRoom(
    @ConnectedSocket() client: Socket,
    @MessageBody('roomId') roomId: string,
  ) {
    client.join(roomId);
    this.logger.log(`User joined room: ${roomId}`);
    return { event: 'roomJoined', data: { roomId } };
  }

  @SubscribeMessage('leaveRoom')
  handleLeaveRoom(
    @ConnectedSocket() client: Socket,
    @MessageBody('roomId') roomId: string,
  ) {
    client.leave(roomId);
    this.logger.log(`User left room: ${roomId}`);
    return { event: 'roomLeft', data: { roomId } };
  }

  @SubscribeMessage('sendMessage')
  @UsePipes(new ValidationPipe())
  async handleMessage(
    @ConnectedSocket() client: Socket,
    @MessageBody() dto: SendMessageDto,
  ) {
    const userId = (client as any).user?.sub;
    const roomId = dto.roomId || 'global';

    const message = {
      id: Date.now().toString(),
      content: dto.content,
      userId,
      roomId,
      timestamp: new Date().toISOString(),
    };

    // 保存到数据库
    await this.chatService.saveMessage(message);

    // 广播到房间
    this.server.to(roomId).emit('receiveMessage', message);

    return { event: 'messageSent', data: message };
  }

  @SubscribeMessage('getOnlineUsers')
  handleGetOnlineUsers() {
    return {
      event: 'onlineUsers',
      data: Array.from(this.onlineUsers.values()),
    };
  }
}

5. 注册 Module

// src/chat/chat.module.ts
import { Module } from '@nestjs/common';
import { ChatGateway } from './chat.gateway';
import { ChatService } from './chat.service';

@Module({
  providers: [ChatGateway, ChatService],
})
export class ChatModule {}

6. 客户端连接示例

import { io } from 'socket.io-client';

const socket = io('http://localhost:3000/chat', {
  extraHeaders: {
    Authorization: `Bearer ${token}`,
  },
});

// 加入房间
socket.emit('joinRoom', { roomId: 'room-1' });

// 发送消息
socket.emit('sendMessage', { content: 'Hello!', roomId: 'room-1' });

// 接收消息
socket.on('receiveMessage', (message) => {
  console.log('New message:', message);
});

// 监听在线用户
socket.on('userJoined', (data) => {
  console.log('User joined:', data);
});

// 监听错误
socket.on('error', (err) => {
  console.error('WebSocket error:', err);
});

命名空间(Namespace)

当应用有多个实时通信场景时,可以使用命名空间进行隔离:

// 聊天 Gateway
@WebSocketGateway({ namespace: 'chat' })
export class ChatGateway {}

// 通知 Gateway
@WebSocketGateway({ namespace: 'notifications' })
export class NotificationGateway {}

客户端连接时指定命名空间:

const chatSocket = io('http://localhost:3000/chat');
const notificationSocket = io('http://localhost:3000/notifications');

使用场景: 聊天室、通知推送、实时数据看板等业务隔离。

房间(Rooms)管理

Socket.IO 的 Room 是服务端的逻辑分组,客户端可以加入和离开房间。

// 加入房间
@SubscribeMessage('joinRoom')
handleJoinRoom(
  @ConnectedSocket() client: Socket,
  @MessageBody('roomId') roomId: string,
) {
  client.join(roomId);
}

// 广播到房间(不包括发送者)
this.server.to(roomId).emit('receiveMessage', message);

// 广播到房间(包括发送者)
this.server.in(roomId).emit('broadcast', message);

// 发送给特定 socket
this.server.to(socketId).emit('private', message);

// 发送给多个房间
this.server.to(['room1', 'room2']).emit('multiRoom', message);

使用场景: 聊天室消息隔离、频道订阅、分组推送。

动态 Gateway

对于某些场景,可能需要在运行时动态创建 Gateway。NestJS 支持通过 createWebSocketGateway 方法实现:

import { WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';

// 动态配置端口
const port = process.env.WS_PORT || 3001;
@WebSocketGateway(port)
export class DynamicGateway {}

总结

NestJS 的 WebSocket Gateway 提供了一套完整的实时通信解决方案:

  • 声明式 API — 通过装饰器定义事件处理,与 Controller 风格一致
  • 依赖注入 — 完整支持 DI,可轻松注入 Service
  • 生态集成 — Guard、Pipe、Interceptor、Exception Filter 无缝集成
  • 平台无关 — 支持 Socket.IO 和 ws 两种适配器

关键 API 回顾:

API用途
@WebSocketGateway()标记 Gateway 类,配置命名空间和传输选项
@WebSocketServer()注入 Server 实例用于广播
@SubscribeMessage()订阅客户端事件
@MessageBody()提取消息载荷
@ConnectedSocket()获取客户端 Socket 实例
@Ack()获取 ACK 回调
生命周期接口afterInit / handleConnection / handleDisconnect
WsException抛出 WebSocket 异常

参考

最后更新