开发者

Go语言中gPRC的使用

目录
  • 核心概念理解
    • 1. Protobuf 与服务定义
    • 2. 代码生成
    • 3. 服务端实现
    • 4. 服务启动
    • 5. 客户端调用
  • 与标准库 RPC 的区别
    • 客户端异步调用
      • gPRC流
        • 1. 为什么需要 gRPC 流?
        • 2. 定义流服务
        • 3. 流接口详解
        • 4. 服务端实现
        • 5. 客户端实现
        • 6. 流的工作模式
        • 7. 流的特性
        • 8. 适用场景
        • 9. 注意事项
      • gRPC发布-订阅
        • 本地发布-订阅实现
      • 基于gRPC的远程发布-订阅系统
        • 1. 定义服务接口
        • 2. 服务端实现
        • 3. 客户端发布消息
        • 4. 客户端订阅消息

      核心概念理解

      1. Protobuf 与服务定义

      gRPC 使用 Protobuf 来定义服务接口和消息格式。在你的例子中,hello.proto 文件定义了一个名为 HelloService 的服务,它包含一个方法 Hello,该方法接收一个 String 消息并返回一个 String 消息。

      service HelloService {
        rpc Hello (String) returns (String);
      }
      

      这类似于定义一个函数接口,但它是跨网络调用的。

      2. 代码生成

      通过 Protobuf 编译器和 gRPC 插件,你可以生成客户端和服务器的代码:

      protoc --go_out=plugins=grpc:. hello.proto
      

      这行命令会生成:

      • 客户端代码:包含如何调用远程服务的方法
      • 服务器代码:包含如何实现服务的接口定义

      3. 服务端实现

      服务端需要实现 HelloServiceServer 接口:

      type HelloServiceImpl struct{}
      
      func (p *HelloServiceImpl) Hello(ctx context.Context, args *String) (*String, error) {
          reply := &String{Value: "hello:" + args.GetValue()}
          return reply, nil
      }
      

      这里的 Hello 方法是服务的具体实现,它接收一个字符串参数,添加 “hello:” 前缀后返回。

      4. 服务启动

      服务端需要启动 gRPC 服务器并注册服务:

      func main() {
          grpcServer := grpc.NewServer()
          RegisterHelloServiceServer(grpcServer, new(HelloServiceImpl))
      
          lis, err := net.Listen("tcp", ":1234")
          if err != nil {
              log.Fatal(err)
          }
          grpcServer.Serve(lis)
      }
      

      这部分代码:

      • 创建 gRPC 服务器
      • 注册服务实现
      • 监听 TCP 端口
      • 启动服务

      5. 客户端调用

      客户端需要连接到服务端并调用服务:

      func main() {
          conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
          if err != nil {
              log.Fatal(err)
          }
          defer conn.Close()
      
          client := NewHelloServiceClient(conn)
          reply, err := client.Hello(context.Background(), &String{Value: "hello"})
          if err != nil {
              log.Fatal(err)
          }
          fmt.Println(reply.GetValue())
      }
      

      这部分代码:

      • 连接到服务端
      • 创建客户端
      • 调用远程方法
      • 处理返回结果

      与标准库 RPC 的区别

      • 协议:gRPC 使用 HTTP/2,而标准库 RPC 使用自定义协议
      • 序列化:gRPC 使用 Protobuf (二进制),标准库 RPC 使用 jsON 或 Gob
      • 性能:gRPC 通常更快,尤其是在高并发场景
      • 跨语言支持:gRPC 支持多种语言,标准库 RPC 主要支持 Go
      • 流模式:gRPC 支持双向流,标准库 RPC 不支持

      客户端异步调用

      你提到 gRPC 不直接支持异步调用,但可以通过 Goroutine 实现:

      // 客户端异步调用示例
      go func() {
          reply, err := client.Hello(context.Background(), &String{Value: "async"})
          if err != nil {
              log.Println("Error:", err)
              return
          }
          fmt.Println("Async reply:", reply.GetValue())
      }()
      

      gPRC流

      1. 为什么需要 gRPC 流?

      传统 RPC 调用是"请求-响应"模式:

      • 客户端发送一个请求
      • 服务器处理请求并返回一个响应
      • 整个过程是同步的,不适合大数据传输或实时通信

      gRPC 流解决了这些问题,它允许:

      • 客户端和服务器之间持续交换数据
      • 支持四种流模式:
        • 客户端流(单向)
        • 服务器流(单向)
        • 双向流
        • 一元(普通 RPC,无流)

      2. 定义流服务

      .proto 文件中,使用 stream 关键字定义流:js

      service HelloService {
        // 普通 RPC(一元)
        rpc Hello (String) returns (String);
        
        // 双向流 RPC
        rpc Channel (stream String) returns (stream String);
      }
      

      这里的 Channel 方法支持双向流:

      • 客户端可以持续发送 String 消息
      • 服务器可以持续返回 String 消息

      3. 流接口详解

      生成的代码中,流接口包含 SendRecv 方法:

      // 服务端流接口
      type HelloService_ChannelServer interface {
          Send(*String) error       // 向客户端发送消息
          Recv() (*String, error)   // 从客户端接收消息
          grpc.ServerStream
      }
      
      // 客户端流接口
      type HelloService_ChannelClient interface {
          Send(*String) error       /http://www.devze.com/ 向服务器发送消息
          Recvandroid() (*String, error)   // 从服务器接收消息
          grpc.ClientStream
      }
      

      这些方法允许双向、异步的数据交换。

      4. 服务端实现

      服务端实现 Channel 方法:

      func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
          for {
           编程   // 接收客户端消息
              args, err := stream.Recv()
              if err != nil {
                  if err == io.EOF {
                      return nil // 客户端关闭流
                  }
                  return err // 处理其他错误
              }
      
              // 处理消息并返回响应
              reply := &String{Value: "hello:" + args.GetValue()}
              err = stream.Send(reply)
              if err != nil {
                  return err
              }
          }
      }
      

      关键点:

      • stream.Recv() 从客户端接收消息
      • stream.Send() 向客户端发送消息
      • io.EOF 表示客户端关闭了流

      5. 客户端实现

      客户端需要启动两个 Goroutine 分别处理发送和接收:

      // 创建流
      stream, err := client.Channel(context.Background())
      if err != nil {
          log.Fatal(err)
      }
      
      // 发送消息的 Goroutine
      go func() {
          for {
              if err := stream.Send(&String{Value: "hi"}); err != nil {
                  log.Fatal(err)
              }
              time.Sleep(time.Second) // 每秒发送一次
          }
      }()
      
      // 接收消息的主循环
      for {
          reply, err := stream.Recv()
          if err != nil {
              if err == io.EOF {
                  break // 服务器关闭流
              }
              log.Fatal(err)
          }
          fmt.Println("Received:", reply.GetValue())
      }
      

      关键点:

      • 发送和接收是完全独立的
      • 客户端和服务器可以随时发送数据
      • io.EOF 表示服务器关闭了流

      6. 流的工作模式

      gRPC 流有四种工作模式:

      1. 一元 RPC(无流)

      rpc UnaryMethod(Request) returns (Response) {}
      
      • 客户端发送一个请求,服务器返回一个响应

      2. 服务器流

      rpc ServerStreamingMethod(Request) returns (stream Response) {}
      
      • 客户端发送一个请求
      • 服务器返回一个数据流
      • 示例:实时数据推送

      3. 客户端流

      rpc ClientStreamingMethod(stream Request) returns (Response) {}
      
      • 客户端发送一个数据流
      • 服务器返回一个响应
      • 示例:大数据上传

      4. 双向流

      rpc BidirectionalStreamingMethod(stream Request) returns (stream Response) {}
      
      • 客户端和服务器可以同时发送和接收数据流
      • 示例:实时聊天、多人游戏

      7. 流的特性

      1. 全双工通信:客户端和服务器可以同时发送和接收数据
      2. 异步处理:发送和接收操作不阻塞其他操作
      3. 高效传输:基于 HTTP/2 的多路复用,单个连接支持多个流
      4. 大数据支持:适合传输大文件或持续数据流
      5. 实时性:数据可以立即传输,无需等待整个消息完成

      8. 适用场景

      • 实时数据推送(股票行情、实时通知)
      • 大数据上传/下载
      • 实时聊天系统
      • 监控系统(持续数据流)
      • 游戏服务器(低延迟通信)

      9. 注意事项

      1. 错误处理:流可能因网络问题或服务器关闭而中断,需要适当处理错误
      2. 资源管理:流使用完后需要关闭,避免资源泄漏
      3. 并发控制:多个 Goroutine 访问同一个流时需要考虑同步问题
      4. 消息顺序:在双向流中,发送和接收的顺序可能不同步

      gRPC发布-订阅

      发布-订阅(PubSub)模式是一种消息传递模式,其中发送者(发布者)不会直接将消息发送给特定的接收者(订阅者),而是将消息分类发布到主题中。订阅者可以订阅一个或多个主题,只接收他们感兴趣的消息。这种模式实现了发布者和订阅者之间的解耦,非常适合构建分布式系统。

      本地发布-订阅实现

      首先,我们看一下基于 moby/moby/pkg/pubsub 包的本地实现:

      import (
          "github.com/moby/moby/pkg/pubsub"
      )
      
      func main() {
          // 创建发布者,设置超时和队列大小
          p := pubsub.Newpublisher(100*time.Millisecond, 10)
      
          // 订阅golang主题
          golang := p.SubscribeTopic(func(v interface{}) bool {
              if key, ok := v.(string); ok {
                  return strings.HASPrefix(key, "golang:")
              }
              return false
          })
          
          // 订阅docker主题
          docker := p.SubscribeTopic(func(v interface{}) bool {
              if key, ok := v.(string); ok {
                  return strings.HasPrefix(key, "docker:")
              }
              return false
          })
      
          // 发布消息
          go p.Publish("hi")
          go p.Publish("golang: https://golang.org")
          go p.Publish("docker: https://www.docker.com/")
          
          // 接收消息
          go func() {
              fmt.Println("golang topic:", <-golang)
          }()
          go func() {
              fmt.Println("docker topic:", <-docker)
          }()
      
          <-make(chan bool) // 保持主程序运行
      }
      

      关键点:

      • pubsub.NewPublisher 创建一个发布者,负责管理主题和分发消息
      • SubscribeTopic 使用谓词函数过滤感兴趣的主题
      • 发布者发布消息时,所有订阅该主题的订阅者都会收到通知
      • 这是一个本地实现,无法跨网络工作

      基于gRPC的远程发布-订阅系统

      现在,我们将使用gRPC扩展这个系统,使其能够跨网络工作。

      1. 定义服务接口

      service PubsubService {
        rpc Publish (String) returns (String);       // 发布消息
        rpc Subscribe (String) returns (stream String); // 订阅主题
      }
      

      这里:

      • Publish 是一个普通的RPC方法,用于发布消息
      • Subscribe 是一个服务端流方法,允许客户端持续接收消息

      2. 服务端实现

      type PubsubService struct {
          pub *pubsub.Publisher
      }
      
      func NewPubsubService() *PubsubService {
          return &PubsubService{
              pub: pubsub.NewPublisher(100*time.Millisecond, 10),
          }
      }
      
      // 发布消息
      func (p *PubsubService) Publish(ctx context.Context, arg *String) (*String, error) {
          p.pub.Publish(arg.GetValue())
          return &String{}, nil
      }
      
      // 订阅主题
      func (p *PubsubService) Subscribe(arg *String, stream PubsubService_SubscribeServer) error {
          // 订阅特定主题
          ch := p.pub.SubscribeTopic(func(v interface{}) bool {
              if key, ok := v.(string); ok {
                  return strings.HasPrefix(key, arg.GetValue())
              }
              return false
          })
      
          // 将接收到的消息通过流发送给客户端
          for v := range ch {
              if err := stream.Send(&String{Value: v.(string)}); err != nil {
                  return err
              }
          }
      
          return nil
      }
      

      关键点:

      • 服务端维护一个 pubsub.Publisher 实例
      • Publish 方法将消息发布到本地发布者
      • Subscribe 方法创建一个主题订阅,并通过gRPC流将消息发送给客户端

      3. 客户端发布消息

      func main() {
          conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
          if err != nil {
              log.Fatal(err)
          }
          defer conn.Close()
      
          client := NewPubsubServiceClient(conn)
      
          // 发布golang主题消息
          _, err = client.Publish(context.Background(), &String{Value: "golang: hello Go"})
          if err != nil {
              log.Fatal(err)
          }
          
          // 发布docker主题消息
          _, err = client.Publish(context.Background(), &String{Value: "docker: hello Docker"})
          if err != nil {
              log.Fatal(err)
          }
      }
      

      4. 客户端订阅消息

      func main() {
          conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
          if err != nil {
              log.Fatal(err)
          }
          defer conn.Close()
      
          client := NewPubsubServiceClient(conn)
          
          // 订阅golang主题
          stream, err := client.Subscribe(context.Background(), &String{Value: "golang:"})
      android    if err != nil {
              log.Fatal(err)
          }
      
          // 持续接收消息
          for {
              reply, err := stream.Recv()
              if err != nil {
                  if err == io.EOF {
                      break
                  }
                  log.Fatal(err)
              }
      
              fmt.Println(reply.GetValue())
          }
      }
      

      工作原理详解

      服务端

      • 创建一个本地的 pubsub.Publisher 实例
      • Publish 方法将接收到的消息发布到本地发布者
      • Subscribe 方法创建一个主题订阅,并通过gRPC流将消息实时发送给客户端

      客户端发布

      • 调用远程的 Publish 方法
      • 消息通过gRPC传输到服务端
      • 服务端将消息发布到本地主题

      客户端订阅

      • 调用远程的 Subscribe 方法,建立一个服务端流
      • 服务端为该客户端创建一个主题订阅
      • 当有匹配的消息发布时,服务端通过流将消息发送给客户端
      • 客户端持续接收并处理这些消息

      优势与应用场景

      这种基于gRPC的发布-订阅系统具有以下优势:

      • 跨语言支持:gRPC支持多种语言,可以构建多语言混合的分布式系统
      • 高性能:基于HTTP/2和Protobuf,性能优越
      • 实时通信:使用流特性实现实时消息推送
      • 解耦:发布者和订阅者不需要直接交互
      • 扩展性:可以轻松扩展为支持多个主题和大量订阅者

      典型应用场景包括:

      • 实时数据推送(如股票行情、新闻更新)
      • 微服务间的事件驱动通信
      • 实时聊天系统
      • 监控和警报系统

      通过这种方式,你可以构建一个跨网络的、高效的发布-订阅系统,充分利用gRPC的流特性和类型安全优势。

      到此这篇关于Go语言中gPRC的使用的文章就介绍到这了,更多相关Go语言 gPRC内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜