kafka高吞吐量的分布式发布订阅的消息队列系统

一:kafka介绍
kafka(官网地址:http://kafka.apache.org)是一种高吞吐量的分布式发布订阅的消息队列系统,具有高性能和高吞吐率。

1.1 术语介绍
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
主题:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
分区:Partition是物理上的概念,每个Topic包含一个或多个Partition.(一般为kafka节点数cpu的总核数)
Producer
生产者,负责发布消息到Kafka broker
Consumer
消费者:从Kafka broker读取消息的客户端。
Consumer Group
消费者组:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
1.2 基本特性
可扩展性
在不需要下线的情况下进行扩容
数据流分区(partition)存储在多个机器上
高性能
单个broker就能服务上千客户端
单个broker每秒种读/写可达每秒几百兆字节
多个brokers组成的集群将达到非常强的吞吐能力
性能稳定,无论数据多大
Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。
1.3 消息格式
一个topic对应一种消息格式,因此消息用topic分类
一个topic代表的消息有1个或者多个patition(s)组成
一个partition应该存放在一到多个server上,如果只有一个server,就没有冗余备份,是单机而不是集群;如果有多个server,一个server为leader(领导者),其他servers为followers(跟随者),leader需要接受读写请求,followers仅作冗余备份,leader出现故障,会自动选举一个follower作为leader,保证服务不中断;每个server都可能扮演一些partitions的leader和其它partitions的follower角色,这样整个集群就会达到负载均衡的效果
消息按顺序存放;消息顺序不可变;只能追加消息,不能插入;每个消息都有一个offset,用作消息ID, 在一个partition中唯一;offset有consumer保存和管理,因此读取顺序实际上是完全有consumer决定的,不一定是线性的;消息有超时日期,过期则删除
1.4 原理解析
producer创建一个topic时,可以指定该topic为几个partition(默认是1,配置num.partitions),然后会把partition分配到每个broker上,分配的算法是:a个broker,第b个partition分配到b%a的broker上,可以指定有每个partition有几分副本Replication,副本的分配策略为:第c个副本存储在第(b+c)%a的broker上。一个partition在每个broker上是一个文件夹,文件夹中文件的命名方式为:topic名称+有序序号。每个partition中文件是一个个的segment,segment file由.index和.log文件组成。两个文件的命名规则是,上一个segmentfile的最后一个offset。这样,可以快速的删除old文件。

producer往kafka里push数据,会自动的push到所有的分区上,消息是否push成功有几种情况:1,接收到partition的ack就算成功,2全部副本都写成功才算成功;数据可以存储多久,默认是两天;producer的数据会先存到缓存中,等大小或时间达到阈值时,flush到磁盘,consumer只能读到磁盘中的数据。

consumer从kafka里poll数据,poll到一定配置大小的数据放到内存中处理。每个group里的consumer共同消费全部的消息,不同group里的数据不能消费同样的数据,即每个group消费一组数据。

consumer的数量和partition的数量相等时消费的效率最高。这样,kafka可以横向的扩充broker数量和partitions;数据顺序写入磁盘;producer和consumer异步

二:环境搭建(windows)
2.1 安装zookeeper
kafka需要用到zookeeper,所以需要先安装zookeeper

到官网下载最新版zookeeper,http://www.apache.org/dyn/closer.cgi/zookeeper/
解压到指定路径
复制conf目录下zoo_sample.cfg,粘贴改名为zoo.cfg,修改zoo.cfg中的dataDir的值为E:/data/zookeeper,并添加一行dataLogDir=E:/log/zookeeper
修改系统环境变量,在Path后添加 ;E:\zookeeper\zookeeper-3.4.10\bin
运行cmd命令窗口,输入zkServer回车,启动
2.2 安装kafka
到官网下载最新版kafka,http://kafka.apache.org/downloads
解压到指定路径,如:E:\kafka_2.12-0.10.2.0
修改E:\kafka_2.12-0.10.2.0\config目录下的server.properties中 log.dirs的值为E:/log/kafka
添加系统环境变量,在Path后添加 ;E:\kafka_2.12-0.10.2.0\bin\windows
启动kafka,在cmd命令行用cd命令切换到kafka根目录E:\kafka_2.12-0.10.2.0,输入命令
.\bin\windows\kafka-server-start.bat .\config\server.properties
出现started (kafka.server.KafkaServer)字样表示启动成功
运行cmd命令行,创建一个topic,命令如下:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
再打开一个cmd,创建一个Producer,命令如下:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
再打开一个cmd,创建一个Customer,命令如下:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
在Producer窗口下输入信息进行测试 ,每输入一行回车后消息马上就会出现在Customer中,表明kafka已经安装测试成功
三:基于.net的常用类库
基于.net实现kafka的消息队列应用,常用的类库有kafka-net,Confluent.Kafka,官网推荐使用Confluent.Kafka,本文也是基于该库的实现,使用版本预发行版1.0.0-beta,创建控制台应用程序。

四:应用–生产者
生产者将数据发布到指定的主题,一般生产环境下的负载均衡,服务代理会有多个,BootstrapServers属性则为以逗号隔开的多个代理地址

/// <summary>
/// 生产者
/// </summary>
public static void Produce()
{
var config = new ProducerConfig { BootstrapServers = "localhost:9092" }
Action<DeliveryReportResult<Null, string>> handler = r =>
Console.WriteLine(!r.Error.IsError
? $"Delivered message to {r.TopicPartitionOffset}"
: $"Delivery Error: {r.Error.Reason}");

using (var producer = new Producer<Null, string>(config))
{
// 错误日志监视
producer.OnError += (_, msg) => { Console.WriteLine($"Producer_Erro信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };

for (int i = 0; i < 5; i++)
{
// 异步发送消息到主题
producer.BeginProduce("MyTopic", new Message<Null, string> { Value = i.ToString() }, handler);
}
// 3后 Flush到磁盘
producer.Flush(TimeSpan.FromSeconds(3));
}
}
五:应用–消费者
消费者使用消费者组名称标记自己,并且发布到主题的每个记录被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在单独的进程中,也可以在不同的机器

如果所有消费者实例具有相同的消费者组,则记录将有效地在消费者实例上进行负载平衡。

如果所有消费者实例具有不同的消费者组,则每个记录将广播到所有消费者进程

上图为两个服务器Kafka群集,托管四个分区(P0-P3),包含两个消费者组。消费者组A有两个消费者实例,B组有四个消费者实例。

默认EnableAutoCommit 是自动提交,只要从队列取出消息,偏移量自动移到后一位,无论消息后续处理成功与否,该条消息都会消失,所以为免除处理失败的数据丢失,消费者方可设置该属性为false,后面进行手动commint()提交偏移

/// <summary>
/// 消费者
/// </summary>
public static void Consumer()
{
var conf = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetResetType.Earliest,
EnableAutoCommit = false // 设置非自动偏移,业务逻辑完成后手动处理偏移,防止数据丢失
};

using (var consumer = new Consumer<Ignore, string>(conf))
{
// 订阅topic
consumer.Subscribe("MyTopic");
// 错误日志监视
consumer.OnError += (_, msg) => { Console.WriteLine($"Consumer_Error信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };

while (true)
{
try
{
var consume = consumer.Consume();
string receiveMsg = consume.Value;
Console.WriteLine($"Consumed message ‘{receiveMsg}‘ at: ‘{consume.TopicPartitionOffset}‘.");
// 开始我的业务逻辑
...
// 业务结束
if(成功)
{
consumer.Commit(new List<TopicPartitionOffset>() { consume.TopicPartitionOffset }); //手动提交偏移
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Consumer_Error occured: {e.Error.Reason}");
}
}
}
}
执行结果

常见数据问题处理
重复消费最常见的原因:re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
去重问题:消息可以使用唯一id标识
保证不丢失消息:
生产者(ack= -1 或 all 代表至少成功发送一次)
消费者 (offset手动提交,业务逻辑成功处理后,提交offset)
保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)
Kafka 可视化调试
借助可视化客户端工具 kafka tool
具体使用可参考:https://www.cnblogs.com/frankdeng/p/9452982.html
————————————————
版权声明:本文为CSDN博主「神韵凌天」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_31176861/article/details/82853730

基于Confluent.Kafka实现的Kafka客户端操作类使用详解

一、引言

  有段时间没有写东西了,当然不是没得写,还有MongoDB的系列没有写完呢,那个系列还要继续。今天正好是周末,有点时间,来写新东西吧。最近公司用了Kafka做为消息的中间件,最开始写的那个版本不是很好,我就要来优化它,所以就抽了一些时间来研究Kafka。很多概念性的东西就不写了,今天主要是上干货,主要是代码,今天就把Kafka的消费者和生产者的代码贴出来,以供大家参考,当然这个是代码样板,最后我也会把地址贴出来。以后有时间我会把我自己实现的Kafka消息的生产者和消费者的代码贴出来。好了,话不多说,言归正传。

  说明一点,如果想调试这里的代码,必须引入Confluent.Kafka这个dll才可以,直接在Visual Studio 项目的 Nuget 里面可以查找,直接安装就可以了。

二、消息的生产者(Kafka消息的Producer)

  大多数的消息中间件都包含三个部分,一个是消息的生产者,一个是存放消息的队列,另外一个就是消息的消费者,我们就按着这个顺序,我就先把消息生产者的代码写出来。直接上代码,其实不是很难,里面有很多备注,只要有基本的概念理解起来还是很容易的。

     第一个版本,同步版本!

 1 using System;
 2 using System.IO;
 3 using System.Text;
 4 using System.Collections.Generic;
 5 using Confluent.Kafka;
 6 using Confluent.Kafka.Serialization;
 7
 8
 9 namespace Confluent.Kafka.Examples.Producer
10 {
11     public class Program
12     {
13         public static void Main(string[] args)
14         {
15             if (args.Length != 2)
16             {
17                 Console.WriteLine("Usage: .. brokerList topicName");
18                 return;
19             }
20
21             string brokerList = args[0];
22             string topicName = args[1];
23
24             var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };
25
26             using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
27             {
28                 var cancelled = false;
29                 Console.CancelKeyPress += (_, e) => {
30                     e.Cancel = true; // 阻止进程退出
31                     cancelled = true;
32                 };
33
34                 while (!cancelled)
35                 {
36                     Console.Write("> ");
37
38                     string text;
39                     try
40                     {
41                         text = Console.ReadLine();
42                     }
43                     catch (IOException)
44                     {
45                         // IO 异常抛出的时候设置此值ConsoleCancelEventArgs.Cancel == true.
46                         break;
47                     }
48                     if (text == null)
49                     {
50                         break;
51                     }
52
53                     string key = null;
54                     string val = text;
55
56                     // 如果指定了键和值,则拆分行.
57                     int index = text.IndexOf(" ");
58                     if (index != -1)
59                     {
60                         key = text.Substring(0, index);
61                         val = text.Substring(index + 1);
62                     }
63
64                     // 在下面的异步生产请求上调用.Result会导致它阻塞,直到它完成。 通常,您应该避免同步生成,因为这会对吞吐量产生巨大影响。对于这个交互式控制台的例子,这是我们想要的。
65                     var deliveryReport = producer.ProduceAsync(topicName, key, val).Result;
66                     Console.WriteLine(
67                         deliveryReport.Error.Code == ErrorCode.NoError
68                             ? "delivered to: "+deliveryReport.TopicPartitionOffset
69                             : "failed to deliver message: "+deliveryReport.Error.Reason
70                     );
71                 }
72
73                 // 由于我们是同步的生产消息,此时不会有消息在传输并且也不需要等待消息到达的确认应答, 销毁生产者之前我们是不需要调用 producer.Flush 方法, 就像正常使用一样。
74             }
75         }
76     }
77 }

    第二个版本,异步版本,推荐使用

 1 using System;
 2 using System.IO;
 3 using System.Text;
 4 using System.Collections.Generic;
 5 using Confluent.Kafka;
 6 using Confluent.Kafka.Serialization;
 7
 8
 9 namespace Confluent.Kafka.Examples.Producer
10 {
11     public class Program
12     {
13         public static void Main(string[] args)
14         {
15             if (args.Length != 2)
16             {
17                 Console.WriteLine("Usage: .. brokerList topicName");
18                 return;
19             }
20
21             string brokerList = args[0];
22             string topicName = args[1];
23             string message="我就是要传输的消息内容";
24
25             //这是以异步方式生产消息的代码实例
26             var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };
27             using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
28             {
29                 var deliveryReport = producer.ProduceAsync(topicName, null, message);
30                 deliveryReport.ContinueWith(task =>
31                 {
32                     Console.WriteLine("Producer: "+producer.Name+"\r\nTopic: "+topicName+"\r\nPartition: "+task.Result.Partition+"\r\nOffset: "+task.Result.Offset);
33                 });
34
35                 producer.Flush(TimeSpan.FromSeconds(10));
36            }
37         }
38     }
39 }

  好了,上面给出了两个版本的消息生产者的代码,一个是同步版本,第二个是异步版本的,推荐使用异步版本的代码实现。

三、消息的消费者(Kafka消息的Consumer)

  在消息的生产者中已经说明了消息中间件的三个部分,第一个是消息的生产者,没有消息的生产者,就没有消息的消费者了,巧妇难为无米之炊吧。在上一节我们已经写了消息生产者的代码,这一节,我们主要来贴出消息消费者的代码。代码同样很简单,注释也很全。

  1 using System;
  2 using System.Collections.Generic;
  3 using System.Linq;
  4 using System.Text;
  5 using Confluent.Kafka.Serialization;
  6
  7
  8 /// <summary>
  9 ///     演示如何使用Consumer客户端.
 10 /// </summary>
 11 namespace Confluent.Kafka.Examples.Consumer
 12 {
 13     public class Program
 14     {
 15         /// <summary>
 16         //      在这个例子中:
 17         ///         - offsets 是自动提交的。
 18         ///         - consumer.Poll / OnMessage 是用于消息消费的。
 19         ///         - 没有为轮询循环创建(Poll)二外的线程,当然可以创建
 20         /// </summary>
 21         public static void Run_Poll(string brokerList, List<string> topics)
 22         {
 23             var config = new Dictionary<string, object>
 24             {
 25                 { "bootstrap.servers", brokerList },
 26                 { "group.id", "csharp-consumer" },
 27                 { "enable.auto.commit", true },  // 默认值
 28                 { "auto.commit.interval.ms", 5000 },
 29                 { "statistics.interval.ms", 60000 },
 30                 { "session.timeout.ms", 6000 },
 31                 { "auto.offset.reset", "smallest" }
 32             };
 33
 34             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
 35             {
 36                 // 注意: 所有事件处理程序的执行都是在主线程中执行的,就是同步的。
 37
 38                 //当成功消费了消息就会触发该事件
 39                 consumer.OnMessage += (_, msg) => Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value);
 40
 41                 consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic "+end.Topic+" partition "+end.Partition+", next message will be at offset "+end.Offset);
 42
 43                 //当然发生了严重错误,比如,连接丢失或者Kafka服务器无效就会触发该事件
 44                 consumer.OnError += (_, error) => Console.WriteLine("Error: "+error);
 45
 46                 //当反序列化有错误,或者消费的过程中发生了错误,即error != NoError,就会触发该事件
 47                 consumer.OnConsumeError += (_, msg)
 48                     => Console.WriteLine("Error consuming from topic/partition/offset "+msg.Topic+"/"+msg.Partition+"/"+msg.Offset+": "+msg.Error);
 49
 50                 //成功提交了Offsets会触发该事件
 51                 consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets: "+commit.Error : "Successfully committed offsets: "+commit.Offsets);
 52
 53                 // 当消费者被分配一组新的分区时触发该事件
 54                 consumer.OnPartitionsAssigned += (_, partitions) =>
 55                 {
 56                     Console.WriteLine("Assigned partitions:"+partitions+"  "+member id: "+consumer.MemberId);
 57                     // 如果您未向OnPartitionsAssigned事件添加处理程序,则会自动执行以下.Assign调用。 如果你这样做,你必须明确地调用.Assign以便消费者开始消费消息。
 58                     //开始从分区中消息消息
 59                     consumer.Assign(partitions);
 60                 };
 61
 62                 // 当消费者的当前分区集已被撤销时引发该事件。
 63                 consumer.OnPartitionsRevoked += (_, partitions) =>
 64                 {
 65                     Console.WriteLine("Revoked partitions:"+partitions);
 66                     // 如果您未向OnPartitionsRevoked事件添加处理程序,则下面的.Unassign调用会自动发生。 如果你这样做了,你必须明确地调用.Usessign以便消费者停止从它先前分配的分区中消费消息。
 67
 68                     //停止从分区中消费消息
 69                     consumer.Unassign();
 70                 };
 71
 72                 consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: "+json);
 73
 74                 consumer.Subscribe(topics);
 75
 76                 Console.WriteLine("Subscribed to:"+consumer.Subscription);
 77
 78                 var cancelled = false;
 79                 Console.CancelKeyPress += (_, e) => {
 80                     e.Cancel = true;  // 组织进程退出
 81                     cancelled = true;
 82                 };
 83
 84                 Console.WriteLine("Ctrl-C to exit.");
 85                 while (!cancelled)
 86                 {
 87                     consumer.Poll(TimeSpan.FromMilliseconds(100));
 88                 }
 89             }
 90         }
 91
 92         /// <summary>
 93         ///     在这实例中
 94         ///         - offsets 是手动提交的。
 95         ///         - consumer.Consume方法用于消费消息
 96         ///             (所有其他事件仍由事件处理程序处理)
 97         ///         -没有为了 轮询(消耗)循环 创建额外的线程。
 98         /// </summary>
 99         public static void Run_Consume(string brokerList, List<string> topics)
100         {
101             var config = new Dictionary<string, object>
102             {
103                 { "bootstrap.servers", brokerList },
104                 { "group.id", "csharp-consumer" },
105                 { "enable.auto.commit", false },
106                 { "statistics.interval.ms", 60000 },
107                 { "session.timeout.ms", 6000 },
108                 { "auto.offset.reset", "smallest" }
109             };
110
111             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
112             {
113                 // 注意:所有事件处理都是在主线程中处理的,也就是说同步的
114
115                 consumer.OnPartitionEOF += (_, end)
116                     => Console.WriteLine("Reached end of topic "+end.Topic+" partition "+end.Partition+", next message will be at offset "+end.Offset);
117
118                 consumer.OnError += (_, error)=> Console.WriteLine("Error: "+error);
119
120                 // 当反序列化有错误,或者消费的过程中发生了错误,即error != NoError,就会触发该事件
121                 consumer.OnConsumeError += (_, error)=> Console.WriteLine("Consume error: "+error);
122
123                 // 当消费者被分配一组新的分区时触发该事件
124                 consumer.OnPartitionsAssigned += (_, partitions) =>
125                 {
126                     Console.WriteLine("Assigned partitions:"+partitions+"  "+member id: "+consumer.MemberId);
127                     // 如果您未向OnPartitionsAssigned事件添加处理程序,则会自动执行以下.Assign调用。 如果你这样做,你必须明确地调用.Assign以便消费者开始消费消息。
128                     //开始从分区中消息消息
129                     consumer.Assign(partitions);
130                 };
131
132                 // 当消费者的当前分区集已被撤销时引发该事件。
133                 consumer.OnPartitionsRevoked += (_, partitions) =>
134                 {
135                     Console.WriteLine("Revoked partitions:"+partitions);
136                     // 如果您未向OnPartitionsRevoked事件添加处理程序,则下面的.Unassign调用会自动发生。 如果你这样做了,你必须明确地调用.Usessign以便消费者停止从它先前分配的分区中消费消息。
137
138                     //停止从分区中消费消息
139                     consumer.Unassign();
140                 };
141
142                 consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: "+json);
143
144                 consumer.Subscribe(topics);
145
146                 Console.WriteLine("Started consumer, Ctrl-C to stop consuming");
147
148                 var cancelled = false;
149                 Console.CancelKeyPress += (_, e) => {
150                     e.Cancel = true; // 防止进程退出
151                     cancelled = true;
152                 };
153
154                 while (!cancelled)
155                 {
156                     if (!consumer.Consume(out Message<Ignore, string> msg, TimeSpan.FromMilliseconds(100)))
157                     {
158                         continue;
159                     }
160
161                     Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value);
162
163                     if (msg.Offset % 5 == 0)
164                     {
165                         var committedOffsets = consumer.CommitAsync(msg).Result;
166                         Console.WriteLine("Committed offset: "+committedOffsets);
167                     }
168                 }
169             }
170         }
171
172         /// <summary>
173         ///     在这个例子中
174         ///         - 消费者组功能(即.Subscribe +offset提交)不被使用。
175         ///         - 将消费者手动分配给分区,并始终从特定偏移量(0)开始消耗。
176         /// </summary>
177         public static void Run_ManualAssign(string brokerList, List<string> topics)
178         {
179             var config = new Dictionary<string, object>
180             {
181                 // 即使您不打算使用任何使用者组功能,也必须在创建使用者时指定group.id属性。
182                 { "group.id", new Guid().ToString() },
183                 { "bootstrap.servers", brokerList },
184                 // 即使消费者没有订阅该组,也可以将分区偏移量提交给一个组。 在这个例子中,自动提交被禁用以防止发生这种情况。
185                 { "enable.auto.commit", false }
186             };
187
188             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
189             {
190                 //总是从0开始消费
191                 consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());
192
193                 // 引发严重错误,例如 连接失败或所有Kafka服务器失效。
194                 consumer.OnError += (_, error) => Console.WriteLine("Error: "+error);
195
196                 // 这个事件是由于在反序列化出现错误,或者在消息消息的时候出现错误,也就是 error != NoError 的时候引发该事件
197                 consumer.OnConsumeError += (_, error) => Console.WriteLine("Consume error: "+error);
198
199                 while (true)
200                 {
201                     if (consumer.Consume(out Message<Ignore, string> msg, TimeSpan.FromSeconds(1)))
202                     {
203                         Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value);
204                     }
205                 }
206             }
207         }
208
209         private static void PrintUsage()=> Console.WriteLine("Usage: .. <poll|consume|manual> <broker,broker,..> <topic> [topic..]");
210
211         public static void Main(string[] args)
212         {
213             if (args.Length < 3)
214             {
215                 PrintUsage();
216                 return;
217             }
218
219             var mode = args[0];
220             var brokerList = args[1];
221             var topics = args.Skip(2).ToList();
222
223             switch (mode)
224             {
225                 case "poll":
226                     Run_Poll(brokerList, topics);
227                     break;
228                 case "consume":
229                     Run_Consume(brokerList, topics);
230                     break;
231                 case "manual":
232                     Run_ManualAssign(brokerList, topics);
233                     break;
234                 default:
235                     PrintUsage();
236                     break;
237             }
238         }
239     }
240 }

  以上代码也有两个版本,第一个版本是自动提交Offset,第二个版本是人工提交Offset,但是代码没有分开写,只是不同的版本用了不同的方法。

四、结束

  好了,今天就写到这里了,这是一个引子,所有代码都是真实有效的,我已经全部测试过,所以大家可以放心使用或者改造成自己的消息的生产者和消息消费者的实现。原文的地址如下,https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples    ,内容差不多。不忘初心,继续努力吧。

原文地址:https://www.cnblogs.com/Leo_wl/p/11776028.html

时间: 2024-10-12 21:16:46

kafka高吞吐量的分布式发布订阅的消息队列系统的相关文章

高吞吐量的分布式发布订阅消息系统Kafka--安装及测试

一.Kafka概述 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素. 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决. 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案.Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费. 二.Kafka相关

一脸懵逼学习KafKa集群的安装搭建--(一种高吞吐量的分布式发布订阅消息系统)

1:KafKa的官方网址:http://kafka.apache.org/ 开发流程图,如: 2:KafKa的基础知识: 2.1:kafka是一个分布式的消息缓存系统2.2:kafka集群中的服务器都叫做broker2.3:kafka有两类客户端,一类叫producer(消息生产者),一类叫做consumer(消息消费者),客户端和broker服务器之间采用tcp协议连接2.4:kafka中不同业务系统的消息可以通过topic进行区分,而且每一个消息topic都会被分区,以分担消息读写的负载2.

高吞吐量的分布式发布订阅消息系统Kafka-- 管理工具 Kafka Manager

一.概述 Kafka在雅虎内部被很多团队使用,媒体团队用它做实时分析流水线,可以处理高达20Gbps(压缩数据)的峰值带宽. 为了简化开发者和服务工程师维护Kafka集群的工作,构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager.这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况.它支持管理多个集群.选择副本.副本重新分配以及创建Topic.同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具. 该软件

高吞吐量的分布式发布订阅消息系统Kafka--spring-integration-kafka的应用

一.概述 Spring Integration Kafka 是基于 Apache Kafka 和Spring Integration来集成Kafka,对开发配置提供了方便. 二.配置 1.spring-kafka-consumer.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"

RabbitMQ实例教程:发布/订阅者消息队列

消息交换机(Exchange) RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列. 相反的,生产者只能发送消息给交换机(Exchange).交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中.交换机必须清楚的知道消息如何处理它收到的每一条消息.是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义. 交换机的类型有:direct,topic,head

Kafka logo分布式发布订阅消息系统 Kafka

kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息. 支持通过kafka服务器和消费机集群来分区消息. 支持Hadoop并行数据加载. 卡夫卡的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素.

Kafka是分布式发布-订阅消息系统

https://www.biaodianfu.com/kafka.html Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务.它主要用于处理活跃的流式数据. 在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转.传统的企业消息系统并不是非常适合大规模的数据处理.为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志

分布式发布订阅消息系统 Kafka 架构设计[转]

分布式发布订阅消息系统 Kafka 架构设计 转自:http://www.oschina.net/translate/kafka-design 我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部

分布式发布订阅消息系统 Kafka 架构设计

我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部分.活动数据包括页面访问量(page view).被查看内容方面的信息以及搜索情况等内容.这种数据通常的处理方式是先把各种活动以日志的形式写入某