Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core

前置条件:
《Dapr运用》
《Dapr 运用之 Java gRPC 调用篇》
《Dapr 运用之集成 Asp.Net Core Grpc 调用篇》


  1. 搭建 RabbitMQ

    • Docker 搭建 RabbitMQ 服务

      docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
    • 创建 rabbiqmq.yaml
      apiVersion: dapr.io/v1alpha1
      kind: Component
      metadata:
      name: messagebus
      spec:
      type: pubsub.rabbitmq
      metadata:
      - name: host
          value: "amqp://localhost:5672" # Required. Example: "rabbitmq.default.svc.cluster.local:5672"
      - name: consumerID
          value: "61415901178272324029" # Required. Any unique ID. Example: "myConsumerID"
      - name: durable
          value: "true" # Optional. Default: "false"
      - name: deletedWhenUnused
          value: "false" # Optional. Default: "false"
      - name: autoAck
          value: "false" # Optional. Default: "false"
      - name: deliveryMode
          value: "2" # Optional. Default: "0". Values between 0 - 2.
      - name: requeueInFailure
          value: "true" # Optional. Default: "false".
  2. 改造 StorageService.Api

    目的:把 StorageService 从 Grpc 客户端改造为 Grpc 服务端,并 Sub Storage.Reduce 主题,完成减库存操作。

    • 删除 Storage 中无用的代码 StorageController.cs
    • 修改 Program.cs 中的 CreateHostBuilder 代码为
      public static IHostBuilder CreateHostBuilder(string[] args)
      {
          return Host.CreateDefaultBuilder(args)
              .ConfigureWebHostDefaults(webBuilder =>
              {
                  webBuilder.ConfigureKestrel(options =>
                  {
                      options.Listen(IPAddress.Loopback, 5003, listenOptions =>
                      {
                          listenOptions.Protocols = HttpProtocols.Http2;
                      });
                  });
                  webBuilder.UseStartup<Startup>();
              });
      }
    • 添加 DaprClientService
      public sealed class DaprClientService : DaprClient.DaprClientBase
      {
          public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context)
          {
              var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();
              topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");
              return Task.FromResult(topicSubscriptionsEnvelope);
          }
      }

      Dapr 运行时将调用此方法获取 StorageServcie 关注的主题列表

    • 修改 Startup.cs
       /// <summary>
      /// This method gets called by the runtime. Use this method to add services to the container.
      /// </summary>
      /// <param name="services">Services.</param>
      public void ConfigureServices(IServiceCollection services)
      {
          services.AddGrpc();
          services.AddDbContextPool<StorageContext>(options => { options.UseMySql(Configuration.GetConnectionString("MysqlConnection")); });
      }
      /// <summary>
      /// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
      /// </summary>
      /// <param name="app">app.</param>
      /// <param name="env">env.</param>
      public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
      {
          if (env.IsDevelopment())
          {
              app.UseDeveloperExceptionPage();
          }
      
          app.UseRouting();
      
          app.UseEndpoints(endpoints =>
          {
              endpoints.MapSubscribeHandler();
              endpoints.MapGrpcService<DaprClientService>();
          });
      }
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除 redis_messagebus.yaml 文件
    • 启动 StorageService 服务
      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
  3. 使用 Java 开发一个 Order 服务端,Order 服务提供的功能为
    • 下单
    • 查看订单详情
    • 获取订单列表

    在当前上下文中着重处理的是下单功能,以及下单成功后 Java 服务端将发布一个事件到 Storage.Reduce 主题,即减少库存。

    • 创建 CreateOrder.proto 文件

      syntax = "proto3";
      
      package daprexamples;
      
      option java_outer_classname = "CreateOrderProtos";
      option java_package = "generate.protos";
      
      service OrderService {
          rpc CreateOrder (CreateOrderRequest) returns (CreateOrderResponse);
          rpc RetrieveOrder(RetrieveOrderRequest) returns(RetrieveOrderResponse);
          rpc GetOrderList(GetOrderListRequest) returns(GetOrderListResponse);
      }
      
      message CreateOrderRequest {
          string ProductID = 1; //Product ID
          int32 Amount=2; //Product Amount
          string CustomerID=3; //Customer ID
      }
      
      message CreateOrderResponse {
          bool Succeed = 1; //Create Order Result,true:success,false:fail
      }
      
      message RetrieveOrderRequest{
          string OrderID=1;
      }
      
      message RetrieveOrderResponse{
          Order Order=1;
      }
      
      message GetOrderListRequest{
          string CustomerID=1;
      }
      
      message GetOrderListResponse{
          repeated Order Orders=1;
      }
      
      message Order{
          string ID=1;
          string ProductID=2;
          int32 Amount=3;
          string CustomerID=4;
      }
    • 使用 protoc 生成 Java 代码
      protoc -I=C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples --java_out=C:\Users\JR\DaprDemos\java\examples\src\main\java  C:\Users\JR\DaprDemos\java\examples\src\main\protos\examples\CreateOrder.proto
    • 引用 MyBatis 做为 Mapper 工具
    • 修改 HelloWorldService.java 文件,提取 GrpcHelloWorldDaprService.java 到单独的包中,在此文件中添加 createOrder()getOrderList()retrieveOrder() 三个函数的实现
    • 复制 rabbimq.yaml 文件到 components 文件夹中,删除原有 redis_messagebus.yaml 文件
    • 启动 OrderService 服务
      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
  4. 创建 Golang Grpc 客户端,该客户端需要完成创建订单 Grpc 调用,订单创建成功发布扣除库存事件
    • 引用 CreateOrder.proto 文件,并生成 CreateOrder.pb.go 文件

      如未安装 protoc-gen-gogo ,通过一下命令获取并安装

      go get github.com/gogo/protobuf/gogoproto

      安装 protoc-gen-gogo

      go install github.com/gogo/protobuf/gogoproto

      根据 proto 文件生成代码

      protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\CreateOrder.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
    • 客户端代码,创建订单
      ...
      
       response, err := client.InvokeService(context.Background(), &pb.InvokeServiceEnvelope{
          Id:     "OrderService",
          Data:   createOrderRequestData,
          Method: "createOrder",
          })
          if err != nil {
              fmt.Println(err)
              return
          }
      
      ...
    • 添加 DataToPublish.proto 文件,此文件作为事件发布数据结构
      syntax = "proto3";
      
      package daprexamples;
      
      option java_outer_classname = "DataToPublishProtos";
      option java_package = "generate.protos";
      
      message StorageReduceData {
          string ProductID = 1;
          int32 Amount=2;
      }
    • 生成 DataToPublish 代码
       protoc -I C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\DataToPublish.proto --go_out=plugins=grpc:C:\Users\JR\DaprDemos\golang\shoppingCartForJava\protos\daprexamples\
    • 修改 main.go 代码,根据 createOrder 结果判断是否要发布信息到消息队列
      ...
      
      createOrderResponse := &daprexamples.CreateOrderResponse{}
      
      if err := proto.Unmarshal(response.Data.Value, createOrderResponse); err != nil {
          fmt.Println(err)
          return
      }
      fmt.Println(createOrderResponse.Succeed)
      
      if !createOrderResponse.Succeed {
          //下单失败
          return
      }
      
      storageReduceData := &daprexamples.StorageReduceData{
          ProductID: createOrderRequest.ProductID,
          Amount:    createOrderRequest.Amount,
      }
      storageReduceDataData, err := jsoniter.ConfigFastest.Marshal(storageReduceData) //ptypes.MarshalAny(storageReduceData)
      if err != nil {
          fmt.Println(err)
          return
      }
      
      _, err = client.PublishEvent(context.Background(), &pb.PublishEventEnvelope{
          Topic: "Storage.Reduce",
          Data:  &any.Any{Value: storageReduceDataData},
      })
      
      fmt.Println(storageReduceDataData)
      
      if err != nil {
          fmt.Println(err)
      } else {
          fmt.Println("Published message!")
      }
      ...

      注意: 发送数据前,使用 jsoniter 转换数据为 json 字符串,原因是如果直接传输 Grpc 流,当前版本(0.3.x) Dapr runtime 打包数据时使用 Json 打包,解包使用 String ,导致数据不一致。

    • 复制 rabbimq.yaml 文件到 components 文件夹,删除原有 redis_messagebus.yaml 文件
    • 启动 golang Grpc 客户端
       dapr run --app-id client go run main.go

      输出

      == APP == true
      == APP == Published message!
  5. RabbitMQ
    • 在浏览器中输入 http://localhost:15672/ ,账号和密码均为 guest
    • 查看 Connections ,有3个连接
      • 这个3个连接来自配置了 messagebus.yaml 组件的三个服务
    • 查看 Exchanges
      Name            Type    Features    Message rate in Message rate out
      (AMQP default)  direct  D
      Storage.Reduce  fanout  D
      amq.direct      direct  D
      amq.fanout      fanout  D
      ...

      着重看 Storage.Reduce ,可以看出 Dapr 运行时创建了一个 fanout 类型的 Exchange ,这表明该 Exhange 中的数据是广播的。

    • 查看 Queues

      Dapr 运行时创建了 storageService-Storage.Reduce ,该 Queue 绑定了 Storage.Reduce Exchange ,所以可以收到 Storage.Reduce 的广播数据。

  6. DotNet Core StorageService.Api 改造以完成 Sub 事件
    • 打开 DaprClientService.cs 文件,更改内容为

      public sealed class DaprClientService : DaprClient.DaprClientBase
      {
          private readonly StorageContext _storageContext;
      
          public DaprClientService(StorageContext storageContext)
          {
              _storageContext = storageContext;
          }
      
          public override Task<GetTopicSubscriptionsEnvelope> GetTopicSubscriptions(Empty request, ServerCallContext context)
          {
              var topicSubscriptionsEnvelope = new GetTopicSubscriptionsEnvelope();
              topicSubscriptionsEnvelope.Topics.Add("Storage.Reduce");
              return Task.FromResult(topicSubscriptionsEnvelope);
          }
      
          public override async Task<Empty> OnTopicEvent(CloudEventEnvelope request, ServerCallContext context)
          {
              if (request.Topic.Equals("Storage.Reduce"))
              {
                  StorageReduceData storageReduceData = StorageReduceData.Parser.ParseJson(request.Data.Value.ToStringUtf8());
                  Console.WriteLine("ProductID:" + storageReduceData.ProductID);
                  Console.WriteLine("Amount:" + storageReduceData.Amount);
                  await HandlerStorageReduce(storageReduceData);
              }
              return new Empty();
          }
      
          private async Task HandlerStorageReduce(StorageReduceData storageReduceData)
          {
              Guid productID = Guid.Parse(storageReduceData.ProductID);
              Storage storageFromDb = await _storageContext.Storage.FirstOrDefaultAsync(q => q.ProductID.Equals(productID));
              if (storageFromDb == null)
              {
                  return;
              }
      
              if (storageFromDb.Amount < storageReduceData.Amount)
              {
                  return;
              }
      
              storageFromDb.Amount -= storageReduceData.Amount;
              Console.WriteLine(storageFromDb.Amount);
              await _storageContext.SaveChangesAsync();
          }
    • 说明
      • 添加 GetTopicSubscriptions() 将完成对主题的关注

        • 当应用停止时,RabbitMQ 中的 Queue 自动删除
        • 添加 OnTopicEvent() 重写,此方法将完成对 Sub 主题的事件处理
      • HandlerStorageReduce 用于减少库存
  7. 启动 DotNet Core StorageService.Api Grpc 服务,启动 Java OrderService Grpc 服务,启动 Go Grpc 客户端
    • DotNet Core

      dapr run --app-id storageService --app-port 5003 --protocol grpc dotnet run
    • Java
      dapr run --app-id OrderService --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=server.HelloWorldService -Dexec.args="-p 5000"
    • go
      dapr run --app-id client  go run main.go

      go grpc 输出为

      == APP == true
      == APP == Published message!

    查看 MySql Storage 数据库,对应产品库存减少 20

至此,通过 Dapr runtime 完成了 Go 和 Java 之间的 Grpc 调用,并通过 RabbitMQ 组件完成了 Pub/Sub

源码地址

原文地址:https://www.cnblogs.com/Zhang-Xiang/p/12106577.html

时间: 2024-08-28 04:20:19

Dapr Pub/Sub 集成 RabbitMQ 、Golang、Java、DotNet Core的相关文章

RabbitMQ与java、Spring结合实例详细讲解

林炳文Evankaka原创作品.转载请注明出处http://blog.csdn.net/evankaka 摘要:本文介绍了rabbitMq,提供了如何在Ubuntu下安装RabbitMQ 服务的方法.最好以RabbitMQ与java.Spring结合的两个实例来演示如何使用RabbitMQ. 本文工程免费下载 一.rabbitMQ简介 1.1.rabbitMQ的优点(适用范围)1. 基于erlang语言开发具有高可用高并发的优点,适合集群服务器.2. 健壮.稳定.易用.跨平台.支持多种语言.文档

Laravel 集成 RabbitMQ 消息队列

目录 消息队列 RabbitMQ docker 部署 RabbitMQ 操作步骤 访问管理界面 Laravel 集成 RabbitMQ Laravel 5.2 Laravel 5.5 消息队列 消息(Message)是指在应用间传送的数据.可以只包含文本字符串,也可以嵌入对象. 消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递. 消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的

RabbitMQ(3) Java客户端使用

RabbitMQ针对不同的开发语言(java,python,c/++,Go等等),提供了丰富对客户端,方便使用.就Java而言,可供使用的客户端有RabbitMQ Java client. RabbitMQ JMS client.apache的camel-rabbitmq.以及Banyan等.在Spring中,也可以使用Spring AMQP.Spring Cloud Data Flow方便对集成RabbitMQ. 实际开发使用中,RabbitMQ Java client和Spring AMQP

Spring Boot系列——7步集成RabbitMQ

RabbitMQ是一种我们经常使用的消息中间件,通过RabbitMQ可以帮助我们实现异步.削峰的目的. 今天这篇,我们来看看Spring Boot是如何集成RabbitMQ,发送消息和消费消息的.同时我们介绍下死信队列. 集成RabbitMQ 集成RabbitMQ只需要如下几步即可 1.添加maven依赖 <!--rabbitmq--> <dependency> ? ? <groupId>org.springframework.boot</groupId>

rabbitMQ第五篇:Spring集成RabbitMQ

前面几篇讲解了如何使用rabbitMq,这一篇主要讲解spring集成rabbitmq. 首先引入配置文件org.springframework.amqp,如下 <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.6.0.RELEASE</version> <

rabbitmq的java简单实现

1,安装rabbitmq.我的是ubuntu14.04,在官网上面下载最新的安装文件http://www.rabbitmq.com/install-debian.html 2.安装完之后  启动rabbitmq, sudo rabbitmq-server 3.下载jar包 4.最简单的hello world的实现 Sender类 package com.lubby.test; import java.io.IOException; import com.rabbitmq.client.Chann

RabbitMQ第四篇:Spring集成RabbitMQ

前面几篇讲解了如何使用rabbitMq,这一篇主要讲解spring集成rabbitmq. 首先引入配置文件org.springframework.amqp,如下 <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.6.0.RELEASE</version> <

springBoot(24):集成rabbitmq

注意:springboot支持的amqp规范的中间件只有rabbitmq 第一步:添加依赖 <!-- amqp --> <dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 第二步:配置application.yml sp

RabbitMQ简单Java示例——生产者和消费者

添加Maven依赖: 使用rabbitmq-client的最新Maven坐标: <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.3.0</ver