前面讲到过,MassTransit的消费三种方式,Consumer方式就是其中的一种,IConsumer<T>是该方式的约定接口。
一般来说,每个Exchange对应一个Consumer,即常规代码如下
public class SomeConsumer : IConsumer<ClassA> { public Task Consume(ConsumeContext<ClassA> context) { throw new NotImplementedException(); } }
当一个Consumer用于同一类消息(这类消息都有一个共同的基类)时,那么我们一般会通过泛型(继承)来实现Consumer
public class GenericConsumer<T> : IConsumer<T> where T : class, ICommand//这里ICommand就是所有接收到的ExChange的基类,这里就不讨论广播或者发送时可以指定发送的是Exchange,实际是其它类型的消息 {//所有消息都基于同一个基类时,可以通过泛型来实现Consumer public Task Consume(ConsumeContext<T> context) { //do some thing //以ICommand为基础做一些操作 throw new NotImplementedException(); } }
这时候你可以通过下面的代码来接收所有Exchange为ICommand的消息
configure.ReceiveEndpoint(host, "allcommand", e => { e.Consumer(() => new GenericConsumer<ICommand>());//当然这么接收的前提是消息发送方均指定了Exchange为ICommand,比如rbBus.Publish<ICommand>(new { Value = text }); });
这时候,还有一种情况,虽然不常见,但的确有的情况,那就是这些消息彼此之间没有任何关系,但他们都要做一些共同的事情,这时候泛型肯定是走不通(当然你说改代码让所有消息都继承同一个基类也可以,但这里不考虑这种方式),而第一种一个个声明的方式虽然能实现,但要写N个Consumer实现,接收部分也要写N个e.Consumer委托,这肯定不是个最好的实现,那么该如何做呢?好吧,多继承该登场了,相关代码如下
public class CommonConsumer : IConsumer<ClassA>, IConsumer<ClassB>, IConsumer<Class1> {//ClassA和ClassB没有任何关系,他们仅仅是某些属性相同,这里假设他们都有Id和Value属性 //Class1和ClassA、ClassB连属性名称都不一样 public async Task Consume(ConsumeContext<ClassB> context) { await this.DoSomeThing(context.Message); } public async Task Consume(ConsumeContext<ClassA> context) { await this.DoSomeThing(context.Message); } public async Task Consume(ConsumeContext<Class1> context) { dynamic message = context.Message;//假设Class1并不具备id和value,它有的是cid和cvalue await this.DoSomeThing(new { Id = message.CId, Value = message.CValue }); } private async Task DoSomeThing(dynamic message) { //do something //假定都有id,value属性,然后就可以通过DLR对message.Id和message.Value做一些处理 } }
而接收代码也是简单的如下
configure.ReceiveEndpoint(host, "allcommand", e => { e.Consumer(() => new CommonConsumer()); });
注意这时消息发送方在进行消息推送时,还是按各自的Exchange进行推送,这与第二种的泛型Consumer不一样
时间: 2024-10-31 11:08:35