gRPC

gRPC 是一种现代、开源、高性能的 RPC 框架,可以在任何环境中运行。它可以通过对负载均衡、跟踪、健康检查和身份验证的可插拔支持,有效地连接数据中心内和跨数据中心的服务。

与许多 RPC 系统一样,gRPC 基于定义可远程调用的函数(方法)来定义服务的概念。对于每个方法,您定义参数和返回类型。服务、参数和返回类型使用 Google 的开源语言无关的 协议缓冲区 机制在 .proto 文件中定义。

通过 gRPC 传输器,Nest 使用 .proto 文件动态绑定客户端和服务器,使实现远程过程调用变得容易,自动序列化和反序列化结构化数据。

安装

要开始构建基于 gRPC 的微服务,首先安装所需的包:

$ npm i --save @grpc/grpc-js @grpc/proto-loader

概述

与其他 Nest 微服务传输层实现一样,您使用传递给 createMicroservice() 方法的选项对象的 transport 属性选择 gRPC 传输器机制。在以下示例中,我们将设置一个英雄服务。options 属性提供有关该服务的元数据;其属性在 下面 描述。

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.GRPC,
  options: {
    package: 'hero',
    protoPath: join(__dirname, 'hero/hero.proto'),
  },
});
提示

join() 函数从 path 包导入;Transport 枚举从 @nestjs/microservices 包导入。

nest-cli.json 文件中,我们添加 assets 属性,允许我们分发非 TypeScript 文件,以及 watchAssets - 开启监视所有非 TypeScript 资产。在我们的例子中,我们希望 .proto 文件被自动复制到 dist 文件夹。

{
  "compilerOptions": {
    "assets": ["**/*.proto"],
    "watchAssets": true
  }
}

选项

gRPC 传输器选项对象公开以下描述的属性。

package Protobuf 包名(与 .proto 文件中的 package 设置匹配)。必填
protoPath .proto 文件的绝对(或相对于根目录)路径。必填
url 连接 URL。格式为 ip address/dns name:port 的字符串(例如,Docker 服务器的 '0.0.0.0:50051'),定义传输器建立连接的地址/端口。可选。默认为 'localhost:5000'
protoLoader 用于加载 .proto 文件的实用程序的 NPM 包名。可选。默认为 '@grpc/proto-loader'
loader @grpc/proto-loader 选项。这些提供对 .proto 文件行为的详细控制。可选。请参阅 此处 了解更多详情
credentials 服务器凭证。可选。在此处阅读更多

示例 gRPC 服务

让我们定义我们的示例 gRPC 服务,称为 HeroesService。在上面的 options 对象中,protoPath 属性设置了 .proto 定义文件 hero.proto 的路径。hero.proto 文件使用 协议缓冲区 构建。它看起来像这样:

// hero/hero.proto
syntax = "proto3";

package hero;

service HeroesService {
  rpc FindOne (HeroById) returns (Hero) {}
}

message HeroById {
  int32 id = 1;
}

message Hero {
  int32 id = 1;
  string name = 2;
}

我们的 HeroesService 公开了一个 FindOne() 方法。此方法期望类型为 HeroById 的输入参数,并返回 Hero 消息(协议缓冲区使用 message 元素来定义参数类型和返回类型)。

接下来,我们需要实现该服务。要定义满足此定义的处理程序,我们在控制器中使用 @GrpcMethod() 装饰器,如下所示。此装饰器提供将方法声明为 gRPC 服务方法所需的元数据。

提示

前面微服务章节中介绍的 @MessagePattern() 装饰器 (了解更多) 不用于基于 gRPC 的微服务。@GrpcMethod() 装饰器有效地取代了基于 gRPC 的微服务的位置。

@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService', 'FindOne')
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    return items.find(({ id }) => id === data.id);
  }
}
提示

@GrpcMethod() 装饰器从 @nestjs/microservices 包导入,而 MetadataServerUnaryCallgrpc 包导入。

上面显示的装饰器接受两个参数。第一个是服务名称(例如,'HeroesService'),对应于 hero.proto 中的 HeroesService 服务定义。第二个(字符串 'FindOne')对应于 hero.proto 文件中 HeroesService 中定义的 FindOne() rpc 方法。

findOne() 处理程序方法接受三个参数,从调用者传递的 data,存储 gRPC 请求元数据的 metadata 和用于获取 GrpcCall 对象属性(如 sendMetadata 用于向客户端发送元数据)的 call

两个 @GrpcMethod() 装饰器参数都是可选的。如果没有第二个参数(例如,'FindOne'),Nest 将根据将处理程序名称转换为大驼峰命名法(例如,findOne 处理程序与 FindOne rpc 调用定义相关联)自动将 .proto 文件 rpc 方法与处理程序关联。如下所示。

@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService')
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    return items.find(({ id }) => id === data.id);
  }
}

您也可以省略第一个 @GrpcMethod() 参数。在这种情况下,Nest 会根据定义处理程序的 名称自动将处理程序与 proto 定义文件中的服务定义相关联。例如,在以下代码中,HeroesService 类基于名称 'HeroesService' 的匹配,将其处理程序方法与 hero.proto 文件中的 HeroesService 服务定义相关联。

@Controller()
export class HeroesService {
  @GrpcMethod()
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    return items.find(({ id }) => id === data.id);
  }
}

客户端

Nest 应用程序可以作为 gRPC 客户端,使用 .proto 文件中定义的服务。您通过 ClientGrpc 对象访问远程服务。您可以通过多种方式获取 ClientGrpc 对象。

首选技术是导入 ClientsModule。使用 register() 方法将 .proto 文件中定义的服务包绑定到注入令牌,并配置服务。name 属性是注入令牌。对于 gRPC 服务,使用 transport: Transport.GRPCoptions 属性是一个对象,具有与 上面 描述的相同属性。

imports: [
  ClientsModule.register([
    {
      name: 'HERO_PACKAGE',
      transport: Transport.GRPC,
      options: {
        package: 'hero',
        protoPath: join(__dirname, 'hero/hero.proto'),
      },
    },
  ]),
];
提示

register() 方法接受对象数组。通过提供逗号分隔的注册对象列表来注册多个包。

注册后,我们可以使用 @Inject() 注入配置的 ClientGrpc 对象。然后我们使用 ClientGrpc 对象的 getService() 方法来检索服务实例,如下所示。

@Injectable()
export class AppService implements OnModuleInit {
  private heroesService: HeroesService;

  constructor(@Inject('HERO_PACKAGE') private client: ClientGrpc) {}

  onModuleInit() {
    this.heroesService = this.client.getService<HeroesService>('HeroesService');
  }

  getHero(): Observable<string> {
    return this.heroesService.findOne({ id: 1 });
  }
}

:::error 警告 除非在 proto 加载器配置 (options.loader.keepcase 在微服务传输器配置中) 中设置 keepCase 选项为 true,否则 gRPC 客户端不会发送名称中包含下划线 _ 的字段。 :::

请注意,与其他微服务传输方法中使用的技术相比有一个小差异。我们使用 ClientGrpc 类而不是 ClientProxy 类,后者提供 getService() 方法。getService() 泛型方法接受服务名称作为参数并返回其实例(如果可用)。

或者,您可以使用 @Client() 装饰器来实例化 ClientGrpc 对象,如下所示:

@Injectable()
export class AppService implements OnModuleInit {
  @Client({
    transport: Transport.GRPC,
    options: {
      package: 'hero',
      protoPath: join(__dirname, 'hero/hero.proto'),
    },
  })
  client: ClientGrpc;

  private heroesService: HeroesService;

  onModuleInit() {
    this.heroesService = this.client.getService<HeroesService>('HeroesService');
  }

  getHero(): Observable<string> {
    return this.heroesService.findOne({ id: 1 });
  }
}

最后,对于更复杂的场景,我们可以使用 此处 描述的 ClientProxyFactory 类注入动态配置的客户端。

无论哪种情况,我们最终都会获得对 HeroesService 代理对象的引用,该对象公开与 .proto 文件中定义的相同方法集。现在,当我们访问此代理对象(即 heroesService)时,gRPC 系统会自动序列化请求,将其转发到远程系统,返回响应,并反序列化响应。由于 gRPC 使我们免受这些网络通信细节的影响,heroesService 看起来和行为就像本地提供者一样。

请注意,所有服务方法都是 小驼峰命名法(为了遵循语言的自然约定)。因此,例如,虽然我们的 .proto 文件 HeroesService 定义包含 FindOne() 函数,但 heroesService 实例将提供 findOne() 方法。

interface HeroesService {
  findOne(data: { id: number }): Observable<any>;
}

消息处理程序还能够返回 Observable,在这种情况下,结果值将被发出,直到流完成。

@Get()
call(): Observable<any> {
  return this.heroesService.findOne({ id: 1 });
}

要发送 gRPC 元数据(连同请求),您可以传递第二个参数,如下所示:

call(): Observable<any> {
  const metadata = new Metadata();
  metadata.add('Set-Cookie', 'yummy_cookie=choco');

  return this.heroesService.findOne({ id: 1 }, metadata);
}
提示

Metadata 类从 grpc 包导入。

请注意,这需要更新我们之前定义的 HeroesService 接口。

示例

一个工作示例可在 此处 获得。

gRPC 反射

gRPC 服务器反射规范 是一个标准,允许 gRPC 客户端请求服务器公开的 API 详情,类似于为 REST API 公开 OpenAPI 文档。这可以使使用 grpc-ui 或 postman 等开发人员调试工具变得更加容易。

要向服务器添加 gRPC 反射支持,首先安装所需的实现包:

$ npm i --save @grpc/reflection

然后可以使用 gRPC 服务器选项中的 onLoadPackageDefinition 钩子将其挂钩到 gRPC 服务器,如下所示:

import { ReflectionService } from '@grpc/reflection';

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  options: {
    onLoadPackageDefinition: (pkg, server) => {
      new ReflectionService(pkg).addToServer(server);
    },
  },
});

现在您的服务器将使用反射规范响应请求 API 详情的消息。

gRPC 流式传输

gRPC 本身支持长期活动连接,通常称为 streams。流对于聊天、观察或块数据传输等情况很有用。在官方文档 此处 中找到更多详细信息。

Nest 以两种可能的方式支持 GRPC 流处理程序:

  • RxJS Subject + Observable 处理程序:可用于在 Controller 方法内直接编写响应或将其传递给 Subject/Observable 消费者
  • 纯 GRPC 调用流处理程序:可用于传递给处理 Node 标准 Duplex 流处理程序的其余调度的执行器

流式传输示例

让我们定义一个新的示例 gRPC 服务,称为 HelloServicehello.proto 文件使用 协议缓冲区 构建。它看起来像这样:

// hello/hello.proto
syntax = "proto3";

package hello;

service HelloService {
  rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
  rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
}

message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}
提示

LotsOfGreetings 方法可以使用 @GrpcMethod 装饰器简单实现(如上面的示例),因为返回的流可以发出多个值。

基于此 .proto 文件,让我们定义 HelloService 接口:

interface HelloService {
  bidiHello(upstream: Observable<HelloRequest>): Observable<HelloResponse>;
  lotsOfGreetings(
    upstream: Observable<HelloRequest>,
  ): Observable<HelloResponse>;
}

interface HelloRequest {
  greeting: string;
}

interface HelloResponse {
  reply: string;
}
提示

proto 接口可以由 ts-proto 包自动生成,了解更多 此处

主题策略

@GrpcStreamMethod() 装饰器将函数参数作为 RxJS Observable 提供。因此,我们可以接收和处理多个消息。

@GrpcStreamMethod()
bidiHello(messages: Observable<any>, metadata: Metadata, call: ServerDuplexStream<any, any>): Observable<any> {
  const subject = new Subject();

  const onNext = message => {
    console.log(message);
    subject.next({
      reply: 'Hello, world!'
    });
  };
  const onComplete = () => subject.complete();
  messages.subscribe({
    next: onNext,
    complete: onComplete,
  });

  return subject.asObservable();
}
警告

为了支持与 @GrpcStreamMethod() 装饰器的全双工交互,控制器方法必须返回 RxJS Observable

提示

MetadataServerUnaryCall 类/接口从 grpc 包导入。

根据服务定义(在 .proto 文件中),BidiHello 方法应该向服务流式传输请求。要从客户端向流发送多个异步消息,我们利用 RxJS ReplaySubject 类。

const helloService = this.client.getService<HelloService>('HelloService');
const helloRequest$ = new ReplaySubject<HelloRequest>();

helloRequest$.next({ greeting: 'Hello (1)!' });
helloRequest$.next({ greeting: 'Hello (2)!' });
helloRequest$.complete();

return helloService.bidiHello(helloRequest$);

在上面的示例中,我们向流写入了两条消息(next() 调用)并通知服务我们已完成发送数据(complete() 调用)。

调用流处理程序

当方法返回值定义为 stream 时,@GrpcStreamCall() 装饰器将函数参数作为 grpc.ServerDuplexStream 提供,该参数支持标准方法,如 .on('data', callback).write(message).cancel()。有关可用方法的完整文档,请参阅 此处

或者,当方法返回值不是 stream 时,@GrpcStreamCall() 装饰器提供两个函数参数,分别是 grpc.ServerReadableStream(更多信息 此处)和 callback

让我们开始实现应该支持全双工交互的 BidiHello

@GrpcStreamCall()
bidiHello(requestStream: any) {
  requestStream.on('data', message => {
    console.log(message);
    requestStream.write({
      reply: 'Hello, world!'
    });
  });
}
提示

此装饰器不需要提供任何特定的返回参数。预计流将类似于任何其他标准流类型进行处理。

在上面的示例中,我们使用 write() 方法将对象写入响应流。作为第二个参数传递给 .on() 方法的回调将在我们的服务每次接收到新的数据块时被调用。

让我们实现 LotsOfGreetings 方法。

@GrpcStreamCall()
lotsOfGreetings(requestStream: any, callback: (err: unknown, value: HelloResponse) => void) {
  requestStream.on('data', message => {
    console.log(message);
  });
  requestStream.on('end', () => callback(null, { reply: 'Hello, world!' }));
}

这里我们使用 callback 函数在 requestStream 处理完成后发送响应。

健康检查

在 Kubernetes 等编排器中运行 gRPC 应用程序时,您可能需要知道它是否正在运行且处于健康状态。gRPC 健康检查规范 是一个标准,允许 gRPC 客户端公开其健康状态,以允许编排器相应地采取行动。

要添加 gRPC 健康检查支持,首先安装 grpc-node 包:

$ npm i --save grpc-health-check

然后可以使用 gRPC 服务选项中的 onLoadPackageDefinition 钩子将其挂钩到 gRPC 服务,如下所示。请注意,protoPath 需要同时具有健康检查和 hero 包。

import { HealthImplementation, protoPath as healthCheckProtoPath } from 'grpc-health-check';

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  options: {
    protoPath: [
      healthCheckProtoPath,
      protoPath: join(__dirname, 'hero/hero.proto'),
    ],
    onLoadPackageDefinition: (pkg, server) => {
      const healthImpl = new HealthImplementation({
        '': 'UNKNOWN',
      });

      healthImpl.addToServer(server);
      healthImpl.setStatus('', 'SERVING');
    },
  },
});
提示

gRPC 健康探针 是一个有用的 CLI,用于在容器化环境中测试 gRPC 健康检查。

gRPC 元数据

元数据是关于特定 RPC 调用的信息,形式为键值对列表,其中键是字符串,值通常是字符串,但也可以是二进制数据。元数据对 gRPC 本身是不透明的 - 它让客户端向服务器提供与调用相关的信息,反之亦然。元数据可能包括身份验证令牌、请求标识符和用于监控目的的标签,以及数据信息,如数据集中的记录数。

要在 @GrpcMethod() 处理程序中读取元数据,请使用第二个参数(metadata),该参数的类型为 Metadata(从 grpc 包导入)。

要从处理程序发送回元数据,请使用 ServerUnaryCall#sendMetadata() 方法(第三个处理程序参数)。

@Controller()
export class HeroesService {
  @GrpcMethod()
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const serverMetadata = new Metadata();
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];

    serverMetadata.add('Set-Cookie', 'yummy_cookie=choco');
    call.sendMetadata(serverMetadata);

    return items.find(({ id }) => id === data.id);
  }
}

同样,要在使用 @GrpcStreamMethod() 处理程序(主题策略)注释的处理程序中读取元数据,请使用第二个参数(metadata),该参数的类型为 Metadata(从 grpc 包导入)。

要从处理程序发送回元数据,请使用 ServerDuplexStream#sendMetadata() 方法(第三个处理程序参数)。

要在 调用流处理程序(使用 @GrpcStreamCall() 装饰器注释的处理程序)中读取元数据,请在 requestStream 引用上监听 metadata 事件,如下所示:

requestStream.on('metadata', (metadata: Metadata) => {
  const meta = metadata.get('X-Meta');
});