架构设计:系统间通信(29)——Kafka及场景应用(中2)

接上文:《架构设计:系统间通信(28)——Kafka及场景应用(中1)

4-3、复制功能

我们在上文中已经讨论了Kafka使用分区的概念存储消息,一个topic可以有多个分区它们分布在整个Kafka集群的多个Broker服务节点中,并且一条消息只会按照消息生产者的要求进入topic的某一个分区。那么问题来了:如果某个分区中的消息在被消费端Pull之前,承载该分区的Broker服务节点就因为各种异常原因崩溃了,那么在这个Broker重新启动前,消费者就无法收到消息了。

为了解决这个问题,Apache Kafka在V 0.8+版本中加入了复制功能:让topic下的每一个分区存储到多个Broker服务节点上,并由Zookeeper统一管理它们的状态。

请注意Kafka中Partition(分区)和replication(复制)是两个完全不同的概念,很多读者容易将这两个概念混淆——虽然它们都和“如何存储消息”这件事情有关:前者是说将若干条消息按照一定的规则分别存放在不同的区域,一条消息只存入一个区域(且Topic下多个分区可以存在于同一个Broker上);后者是说,为了保证消息在被消费前不会丢失,需要将某一个区域中的消息集合复制出多个副本(同一个分区的多个副本不能存放在同一个Broker上)。

Kafka将分区的多个副本分为两种角色:Leader和Follower,Leader Broker是主要服务节点,消息只会从消息生产者发送给Leader Broker,消息消费者也只会从Leader Broker中Pull消息。Follower Broker为副本服务节点,正常情况下不会公布给生产者或者消费者直接进行操作。Follower Broker服务节点将会主动从Leader Broker上Pull消息。

在这种工作机制下,Follower和Leader的消息复制过程由于Follower服务节点的性能、压力、网络等原因,它们和Leader服务节点会有一个消息差异性。当这个差异性扩大到一定的范围,Leader节点就会认为这个Follower节点再也跟不上自己的节奏,导致的结果就是Leader节点会将这个Follower节点移出“待同步副本集”ISR(in-sync replicas),不再关注这个Follower节点的同步问题。

只有当ISR中所有分区副本全部完成了某一条消息的同步过程,这条消息才算真正完成了“记录”操作。只有这样的消息才会发送给消息消费者。至于这个真正完成“记录”操作的通知是否能返回给消息生产者,完全取决于消息生产者采用的acks模式(后文会讲到)。

现在我们可以回过头看看上文中4-1-3-5小节给出的“查看Topic状态”命令以及命令结果:

# 脚本命令范例
kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2

# 显示的结果
Topic:my_topic2 PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: my_topic2        Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: my_topic2        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: my_topic2        Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: my_topic2        Partition: 3    Leader: 1       Replicas: 1,2   Isr: 1,2

以上命令行用于显示指定topic名称的基本状态信息。Partition表示分区号,Replicas表示所有副本的所在位置的Broker.id信息,Isr表示当前状态正常可以进行消息复制的副本所在位置的Broker.id信息。

那么从命令结果来看,名叫“my_topic2”的topic一共有4个数据分区,每一个分区有两个副本。其中:0号分区的Leader Broker服务节点的id为2,0号分区的两个副本分别在id为2和id为1的Broker服务节点上,且id为2和id为1的Broker上的副本状态都是正常的;同理,1号分区的Leader Broker服务节点的id为1,1号分区的两个副本分别在id为2和id为1的Broker服务节点上,且id为2和id为1的Broker上的副本状态都是正常的。。。

4-4、Kafka原理:生产者

请注意之前我们给出的Kafka集群方案的示意图,在图中消息生产者并没有连接到zookeeper协调服务,而是直接和多个Kafka Server Brokers建立了连接。和其他种类的消息队列的设计不同,在整个Kafka方案中消息生产者(Producer)会有很多重要规则的决定权,例如:

  • 消费生产者(Producer)可以决定向指定的Topic的哪一个分区(Partition)发送消息。而不是由Broker来决定。
  • 消息生产者(Producer)可以决定消息达到Kafka Broker后,Producer对消息的一致性关注到什么样的级别,又或者根本不关心消息在Broker上的一致性问题。
  • 消息生产者(Producer)可以决定是以同步方式(sync)还是异步方式(aSync)向Broker Server List发送消息。
  • 在异步方式下,消费生产者(Producer)还可以决定以什么样的间隔(周期)向Broker Server List发送消息。
  • 随机选定Broker Server List中某一个服务节点,读取当前Topic下的分区和复制表信息,并保存在本地Pool中的工作也是由消息生产者(Producer)主动完成。
  • 另外,Kafka中的消息生产者没有类似ActiveMQ中那样的事务机制(可参见文章《架构设计:系统间通信(23)——提高ActiveMQ工作性能(中)》)。这样的设计和Kafka主要的业务场景有关——用来收集各种操作日志。这样的场景对消息的可靠性要求并不高:漏掉一两条日志并不影响后端大数据平台对日志数据的分析结果;而且这样的设计大量简化了Broker的设计结构:它不需要像ActiveMQ那样专门为达成传输但还未进行commit的消息专门创建存储区域“transaction store”,并在进行了commit或者rollback操作后进行标记。这种处理机制是Apache Kafka高效性能的又一种保障。
  • Kafka中的多个消息生产者(Producer)并不需要ZooKeeper服务中的任何信息为它们协调发送过程,因为没有什么可协调的。生产者唯一需要知道的Topic有多少个分区以及每个分区,分别存在于哪些Broker上的信息都是来源于对某一个Broker的直接查询。所以Kafka集群中只剩下了Broker和Consumer需要进行协调(这个问题会在后文中进行详细讨论)。
  • 这是分布式系统建设思想中一个重要的原则——不可滥用协调装置:完成同一件工作时,协调N个参与角色要比协调N-1个参与角色耗费更多的时间和性能;所以,只协调需要协调的角色,只通知需要通知的事件,只为协调过程存储必要的数据。我在后续的写作中,会专门为读者详细介绍Kafka中消息生产者的实现过程,这里面有很多设计思想可以在各位的实际工作中借鉴。

4-4-1、基本使用

下面的代码使用Kafka的Java Client API演示消息生产者的使用。这里我们使用的Kafka Java Client API的版本是V0.8.2.2,您可以直接引入Maven的官方库依赖即可:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

以下是Kafka消息生产者的代码,之前我们已经通过Kafka的命令脚本创建了一个拥有4个分区的Topic——my_topic2:

package kafkaTQ;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * kafka消息生产者演示,
 * @author yinwenjie
 */
public class KafkaProducer {

    public static void main(String[] args) throws RuntimeException {
        Properties props = new Properties();
        // 指定kafka节点列表,不需要由zookeeper进行协调
        // 并且连接的目的也不是为了发送消息,而是为了在这些节点列表中选取一个,来获取topic的分区状况
        props.put("metadata.broker.list", "192.168.61.138:9092");
        // 使用这个属性可以指定“将消息送到topic的哪一个partition中”,如果业务规则比较复杂的话可以指定分区控制器
        // 不过开发者最好要清楚topic有多少个分区,这样才好进行多线程(负载均衡)发送
        //props.put("partitioner.class", "kafkaTQ.PartitionerController");
        // 可以通过这个参数控制是异步发送还是同步发送(默认为“同步”)
        //props.put("producer.type", "async");
        // 可以通过这个属性控制复制过程的一致性规则
        //props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // 创建消费者
        Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);

        // 由于我们为topic创建了四个partition,所以将数据分别发往这四个分区
        for (Integer partitionIndex = 0; ; partitionIndex++) {
            Date time = new Date();
            // 创建和发送消息,可以指定这条消息的key,producer根据这个key来决定这条消息发送到哪个parition中
            // 另外一个可以决定parition的方式是实现kafka.producer.Partitioner接口
            String messageContext_Value = "this message from producer 由producer指的partitionIndex:[" + partitionIndex % 4 + "]" + time.getTime();
            System.out.println(messageContext_Value);
            byte[] messageContext = messageContext_Value.getBytes();
            byte[] key = partitionIndex.toString().getBytes();

            // 这是消息对象,请注意第二个参数和第三个参数,下一小节将会进行详细介绍
            KeyedMessage<byte[], byte[]> message = new KeyedMessage<byte[], byte[]>("my_topic2", key , partitionIndex % 4 ,  messageContext);
            producer.send(message);

            // 休息0.5秒钟,循环发
            synchronized (KafkaProducer.class) {
                try {
                    KafkaProducer.class.wait(500);
                } catch (InterruptedException e) {
                    e.printStackTrace(System.out);
                }
            }
        }
    }
}

4-4-2、生产者指定分区

开发人员可以在消息生产者端指定发送的消息将要传送到Topic下的哪一个分区(partition),但前提条件是开发人员清楚这个Topic有多少个分区,否则开发人员就不知道怎么编写代码了。当然开发人员也可以完全忽略决定分区的规则,这时将由消费者端携带的一个默认规则决定。

开发人员可以有两种方式进行分区指定:第一种方法是以上代码片段中演示的那样,在创建消息对象KeyedMessage时,指定方法中partKey/key的值;另一种方式是重新实现kafka.producer.Partitioner接口,以便覆盖掉默认实现。

使用KeyedMessage类构造消息对象时,可以指定4个参数,他们分别是:topic名称、消息Key、分区Key和message消息内容。topic名称和message消息内容很容易理解,但是怎样理解消息Key和分区Key呢?以下是KeyedMessage类的源代码(Scala语言):

package kafka.producer

/**
 * A topic, key, and value.
 * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
 */
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")

  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

  def this(topic: String, key: K, message: V) = this(topic, key, key, message)

  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null
  }

  def hasKey = key != null
}

KeyedMessage类的构造函数中有一个局部变量:partitionKey,在KeyedMessage类的首行注释中,对该变量进行了一个说明:

If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.

从源码中可以看出,partitionKey优先使用partKey作为分区依据,如果partKey没有被赋值,则使用key作为分区依据。所以在使用KeyedMessage类的构造函数时,partKey和key您只需要指定其中的一个就完全够了。

您还可以实现kafka.producer.Partitioner接口,并在创建消费者对象时进行指定,以便实现分区的指定(如果不进行指定,默认的实现类为“kafka.producer.DefaultPartitioner”)。代码片段如下:

package kafkaTQ;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerController implements Partitioner {

    /**
     * 必须要有这个构造函数
     * @param vp
     */
    public PartitionerController(VerifiableProperties vp) {

    }

    /* (non-Javadoc)
     * @see kafka.producer.Partitioner#partition(java.lang.Object, int)
     */
    @Override
    public int partition(Object parKey, int partition) {
        /*
         * 在这里您可以根据自身的业务过程重新运算一个分区,并进行返回。
         */
        Integer parKeyValue = (Integer)parKey;
        return parKeyValue;
    }
}

需要实现的partition方法中,第一个参数是您在创建消息时所传递的partyKey(这是的partyKey不一定传入Integer),第二个参数是send方法根据自身内部机制决定的目标分区。

4-4-3、同步和异步发送

消息生产这还可以决定是以同步方式向Broker发送消息还是以异步方式向Broker发送消息。只需要使用生产者配置中的“producer.type”属性进行指定。当该属性值为“sync”时,表示使用同步发送的方式;当该属性值为“async”时,表示使用异步发送方式。

在异步发送方式下,开发人员调用send方法发送消息时,这个消息并不会立即被发送到topic指定的Leader partition所在的Broker,而是会存储在本地的一个缓冲区域(一定注意是客户端本地)。当缓冲区的状态满足最长等待时间或者最大数据量条数时,消息会以一个设置值批量发送给Broker。如下图所示:

缓存区的数据按照batch.num.messages设置的数值被一批一批的发送给目标Broker(默认为200条),如果消息的滞留时间超过了queue.buffering.max.ms设置的值(单位毫秒,默认值为5000)就算没有达到batch.num.messages的数值,消息也会被发送。

如果由于Broker的原因导致消息发送缓慢,这时在本地待发送消息缓存区中的消息就有可能达到

queue.buffering.max.messages设置的缓存区允许存储的最大消息数量,一旦达到这个数量消息生产者端再次调用send方法的时候,send方法所在线程就会被阻塞,直到缓存区有足够的空间能够放下新的数据为止。

4-4-4、强一致性复制和弱一致性复制

Kafka中的消息生产者还可以配置发送的消息在Broker端以哪种方式进行副本复制:强一致性复制还是弱一致性复制,又或者不关注消息的一致性。(在分布式系统中强一致性、弱一致性和最终一致性是一个非常关键的知识点,它们是CAP原则重要的实践,我将会在“存储”专题中进行对它们的定义和主流的实现方式进行讲解)

在Kafka的实现中,强一致性复制是指当Leader Partition收到消息后,将在所有Follower partition完成这条消息的复制后才认为消息处理成功,并向消息生产者返回ack信息;弱一致性复制是指当Leader partition收到消息后,只要Leader Broker自己完成了消息的存储就认为消息处理成立,并向消息生产者返回ack信息(复制过程随后由Broker节点自行完成);

您可以通过消息生产者配置中的“request.required.acks”属性来设置消息的复制性要求。在官方文档中,对于这个属性的解释是:

acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect . The offset given back for each record will always be set to -1.

当acks设置为0时,生产者端不会等待Server Broker回执任何的ACK确认信息。只是将要发送的消息交给网络层。这种情况下,消息是否真的到达了Server Broker,实际上生产者端并不知道。由于生产者端并不等待Server Broker回执任何的ACK确认信息,那么消息一旦传输失败(例如,等待超时的情况)“重试”过程就无从谈起了。由于生产者端在这种情况下发送的消息,很可能Server Broker还没来得及处理,甚至更有可能Server Broker都没有接收到,所以Server Broker也无法告知生产者这条消息在分区中的偏移位置。

acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

当acks设置为1时,生产者发送消息将等待这个分区的Leader Server Broker 完成它本地的消息记录操作,但不会等待这个分区下其它Follower Server Brokers的操作。在这种情况下,虽然Leader Server Broker对消息的处理成功了,也返回了ACK信息给生产者端,但是在进行副本复制时,还是可能失败。

acks=all (#注:原文如此,实际上属性值为“-1”)This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.

当acks设置为“all”时,消息生产者发送消息时将会等待目标分区的Leader Server Broker以及所有的Follower Server Brokers全部处理完,才会得到ACK确认信息。这样的处理逻辑下牺牲了一部分性能,但是消息存储可靠性是最高的。

4-4-5、生产者需要查询zookeeper?

在2013年2月2日,Kafka的主要参与者Neha Narkhede发表了一篇讲解Kafka Replication过程的技术文档(算是官方文档)《Kafka Replication》,在这篇文档的Synchronous replication-write章节Neha Narkhede这样描述了“写”过程前的准备工作:

To publish a message to a partition, the client first finds the leader of the partition from Zookeeper and sends the message to the leader。

这句话的大意是:为了发送消息到一个分区,客户端首先要通过zookeeper查询到这个分区的Leader Broker在哪个位置,并且向这个Leader Broker发送信息。国内一些译文由此也形成了相关的中文描述。这显然与本文中提到的“生产者不需要连接zookeeper进行任何协调操作”的描述完全矛盾

4-4-5-1、进行实验

这里冲突的重点在于“生产者在发送消息时,是直接连接到了zookeeper服务查询相关信息,还是连接到某一个已知的Broker查询现信息?”

那么我们只能以实验的形式实际验证一下消息生产者在创建、发送消息的过程中是否需要连接zookeeper。实际上笔者通过阅读0.8.2.2版本的JAVA Client For Producer API 部分的的源码,真没有发现Producer直接连接zookeeper的证据(主要的类位置包括:kafka.producer.OldProducer、kafka.producer.ProducerPool、kafka.producer.SyncProducer、org.apache.kafka.clients.producer.internals.Sender和org.apache.kafka.clients.producer.KafkaProducer)。但是这显然不具有太大的说服力,毕竟很可能出现漏读代码的情况。

验证实验基于之前我们已经搭建的Apacke Kafka集群环境,192.168.61.140服务器上运行着一个standalone模式的zookeeper服务。在实验中,我们使用192.168.61.140服务器上自带的防火墙,设置只有两个Kafka Broker服务节点(139和138)能够访问zookeeper上的2181端口。并在这种情况下观察消息生产者的工作情况(以及相同topic下的消费者是能正常收到生产者发送的消息)。如下图所示:

  • 设置192.168.61.140上的防火墙,只允许192.168.61.139和192.168.61.138访问其2181端口:
[[email protected] ~]# service iptables status
Table: filter
Chain INPUT (policy ACCEPT)
num  target     prot opt source               destination
1    ACCEPT     tcp  --  192.168.61.138       0.0.0.0/0           tcp dpt:2181
2    ACCEPT     tcp  --  192.168.61.139       0.0.0.0/0           tcp dpt:2181
3    ACCEPT     icmp --  0.0.0.0/0            0.0.0.0/0
4    REJECT     all  --  0.0.0.0/0            0.0.0.0/0           reject-with icmp-host-prohibited 

Chain FORWARD (policy ACCEPT)
num  target     prot opt source               destination
1    REJECT     all  --  0.0.0.0/0            0.0.0.0/0           reject-with icmp-host-prohibited 

Chain OUTPUT (policy ACCEPT)
num  target     prot opt source               destination

在全端口开放ICMP协议,只是为了能够使用ping命令进行检查。

  • 接下来启动140上的zookeeper服务,并且验证一下在140分别开启和关闭防火墙的情况下,producer所在的服务节点是否能够连接到zookeeper
# 在140上启动zookeeper,并且确定它以standalone模式运行
[[email protected] ~]# zkServer.sh start
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

[[email protected] ~]# zkServer.sh status
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone

首先测试在140节点开启防火墙的情况下,producer所在的192.168.61.1服务节点是否能顺利连接到2181端口(使用telnet命令):

telnet 192.168.61.140 2181

Trying 192.168.61.140...
telnet: connect to address 192.168.61.140: Connection refused

然后关闭140上的防火墙,再使用同样的telnet命令进行测试:

telnet 192.168.61.140 2181

Trying 192.168.61.140...
Connected to 192.168.61.140.
Escape character is ‘^]‘.

可以看到140启动防火墙后,192.168.61.1服务节点不能连接到140服务的2181端口。这说明我们设置的实验前提的确起到了限制192.168.61.1节点访问140节点上zookeeper服务的作用。

接下来我们重新开启140上防火墙,启动140上的zookeeper服务,启动139和138上的Kafka Broker服务,让整个Kafka Broker集群工作起来。正式开始进行实验:

// 生产者的测试代码就采用4-4-1小节中我们给出的代码样例。
// 很显然在140开启防火墙,producer无法连接zookeeper的情况下
// producer也能够正常工作。以下是producer程序打印的运行信息

this message from producer 由producer指的partitionIndex:[0]1462439421320
this message from producer 由producer指的partitionIndex:[1]1462439429482
this message from producer 由producer指的partitionIndex:[2]1462439437655
this message from producer 由producer指的partitionIndex:[3]1462439441987
  • 本编文章已经介绍过,消息能够发送出去不一定代表Kafka Broker集群工作正常。在消息复制、回执ACK等环节还是可能引起错误。只有消息消费者收到了消息,才能认为整个消息接受、处理、发送过程都是成功的

所以为了确认这些发送出去的消息能够被消费者接收到,在进行producer测试工作的同时,我们在138节点上,使用kafka-console-consumer运行了一个对应的消费者以便接收数据(不能在138节点或者139节点以外运行consumer,因为无法连接zookeeper服务节点的2181端口)。以下是消费者接收到的信息:

[[email protected] ~]# kafka-console-consumer.sh --zookeeper 192.168.61.140:2181 --topic my_topic2
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
this message from producer 由producer指的partitionIndex:[0]1462439421320
this message from producer 由producer指的partitionIndex:[1]1462439429482
this message from producer 由producer指的partitionIndex:[2]1462439437655
this message from producer 由producer指的partitionIndex:[3]1462439441987

4-4-5-2、结果分析

从以上小节的实验情况,我们看到的结果是:Producer所在的服务节点192.168.61.1,在不能访问192.168.61.140节点上zookeeper服务的情况下,包括Producer在内的整个Kafak集群能够正常工作,消费者端能够正常消费数据。那么问题来了,作为Kafka的主要参与者Neha Narkhede在这样的官方文档中是不太可能出现这样的低级错误的,那么是什么原因呢?当然如果真要说到出错,那么笔者自己出错的可能性倒是要高得多。笔者认为造成这种冲突的原因可能有以下几种:

  • 这篇文章是在2013年2月份发布的,那时候主流的Kafka版本是V0.7.X。但是笔者在实际工作中并没有使用任何V0.7.X版本,所以对V0.7.X版本中是否需要生产者连接zookeeper并没有确定的答案。
  • 在Neha Narkhede的这段话中,Client并不是指代的消息生产者,而是泛指的使用zookeeper服务的各种客户端角色。如果是这样的话,那么Client最有可能指代的就是Server Broker。
  • 以上实验中,并没有限制住producer直接访问zookeeper的所有情况。防火墙功能可能出现了问题,又或者producer自行通过一个隐藏的端口(例如:9999)访问到了zookeeper。
  • 笔者还没有考虑到的其它可能性。欢迎各位读者留言讨论。

========================================

(接下文)

时间: 2024-07-31 10:30:22

架构设计:系统间通信(29)——Kafka及场景应用(中2)的相关文章

架构设计:系统间通信(30)——Kafka及场景应用(中3)

接上文:<架构设计:系统间通信(29)--Kafka及场景应用(中2)> 4-5.Kafka原理:消费者 作为Apache Kafka消息队列,它的性能指标相当一部分取决于消费者们的性能--只要消息能被快速消费掉不在Broker端形成拥堵,整个Apache Kafka就不会出现性能瓶颈问题. 4-5-1.基本使用 我们首先使用Kafka Client For JAVA API为各位读者演示一下最简单的Kafka消费者端的使用.以下示例代码可以和上文中所给出的生产者代码相对应,形成一个完整的消息

架构设计:系统间通信(28)——Kafka及场景应用(中1)

(接上文<架构设计:系统间通信(27)--其他消息中间件及场景应用(上)>) 在本月初的写作计划中,我本来只打算粗略介绍一下Kafka(同样是因为进度原因).但是,最近有很多朋友要求我详细讲讲Kafka的设计和使用,另外两年前我在研究Kafka准备将其应用到生产环境时,由于没有仔细理解Kafka的设计结构所导致的问题最后也还没有进行交代.所以我决定即使耽误一些时间,也要将Kafka的原理和使用场景给读者详细讨论讨论.这样,也算是对两年来自己学习和使用Kafka的一个总结. 4.Kafka及特性

架构设计:系统间通信(36)——Apache Camel快速入门(上)

1.本专题主旨 1-1.关于技术组件 在这个专题中,我们介绍了相当数量技术组件:Flume.Kafka.ActiveMQ.Rabbitmq.Zookeeper.Thrift .Netty.DUBBO等等,还包括本文要进行介绍的Apache Camel.有的技术组件讲得比较深入,有的技术组件则是点到为止.于是一些读者朋友发来信息向我提到,这个专题的文章感觉就像一个技术名词的大杂烩,并不清楚作者的想要通过这个专题表达什么思想. 提出这个质疑的朋友不在少数,所以我觉得有必要进行一个统一的说明.这个专题

架构设计:系统间通信(32)——其他消息中间件及场景应用(下2)

(接上文<架构设计:系统间通信(31)--其他消息中间件及场景应用(下1)>) 5-3.解决方案二:改进半侵入式方案 5-3-1.解决方法一的问题所在 方案一并不是最好的半侵入式方案,却容易理解架构师的设计意图:至少做到业务级隔离.方案一最大的优点在于日志采集逻辑和业务处理逻辑彼此隔离,当业务逻辑发生变化的时候,并不会影响日志采集逻辑. 但是我们能为方案一列举的问题却可以远远多于方案一的优点: 需要为不同开发语言分别提供客户端API包.上文中我们介绍的示例使用JAVA语言,于是 事件/日志采集

架构设计:系统间通信(33)——其他消息中间件及场景应用(下3)

=================================== (接上文:<架构设计:系统间通信(32)--其他消息中间件及场景应用(下2)>) 5-7.解决方案三:非侵入式方案 以上两种方案中为了让业务系统能够集成日志采集功能,我们或多或少需要在业务系统端编写一些代码.虽然通过一些代码结构的设计,可以减少甚至完全隔离这些代码和业务代码的耦合度,但是毕竟需要业务开发团队花费精力对这些代码进行维护,业务系统部署时业务对这些代码的配置信息做相应的调整. 这里我们再为读者介绍一种非侵入式的日

架构设计:系统间通信(26)——ActiveMQ集群方案(下)

(接上文<架构设计:系统间通信(26)--ActiveMQ集群方案(上)>) 3.ActiveMQ热备方案 ActiveMQ热备方案,主要保证ActiveMQ的高可用性.这种方案并不像上节中我们主要讨论的ActiveMQ高性能方案那样,同时有多个节点都处于工作状态,也就是说这种方案并不提高ActiveMQ集群的性能:而是从集群中的多个节点选择一个,让其处于工作状态,集群中其它节点则处于待命状态.当主要的工作节点由于各种异常情况停止服务时,保证处于待命的节点能够无缝接替其工作. 3-1.Acti

架构设计:系统间通信(45)——阶段性问题记录

到此为止 <架构设计:系统间通信>专题就暂时告一段落了.这边文章笔者用于暂时记录这个专题中还需要补充的内容,并在后续的整理中足一补上: 退避算法和退避规则,以及其应用场景 系统间通信的性能指标.还需要进行说明. 典型的RPC框架,还需要增加Apache Avro的介绍 关于Netty的过滤器和执行器的执行顺序问题 RESET知识点的讲解和区别对比 对apache thrift IDL的描述好像有点问题,特别是在默认值上面. 关于RPC接口泛化的问题,还要结合dubbo的设计进行一下说明 补重要

架构设计:系统间通信(31)——其他消息中间件及场景应用(下1)

接上文:<架构设计:系统间通信(30)--Kafka及场景应用(中3)> 5.场景应用--电商平台:浏览记录收集功能 事件/日志收集系统是大中型软件不得不面对的话题.目前第三方业务系统对 事件/日志收集系统 的集成思路主要有两大类:侵入式收集方案和非侵入式收集方案.侵入式收集方案,是指任何需要使用事件/日志收集系统的第三方系统,都需要做有针对的编码工作,这个编码工作或者是新增代码用于调用 事件/日志收集系统 提供的客户端API,又或者是修改已有的代码,以便适应事件/日志收集系统的调用特性. 侵

架构设计:系统间通信(40)——自己动手设计ESB(1)

1.概述 在我开始构思这几篇关于"自己动手设计ESB中间件"的文章时,曾有好几次动过放弃的念头.原因倒不是因为对冗长的文章产生了惰性,而是ESB中所涉及到的技术知识和需要突破的设计难点实在是比较多,再冗长的几篇博文甚至无法对它们全部进行概述,另外如果在思路上稍微有一点差池就会误导读者.一个可以稳定使用的ESB中间件凝聚了一个团队很多参与者的心血,一个人肯定是无法完成这些工作的.但是笔者思索再三,还是下决心将这这即便文章完成,因为这是对本专题从第19篇文章到第39篇文章中所介绍的知识点的