KafkaSpout 浅析

最近在使用storm做一个实时计算的项目,Spout需要从 KAFKA 集群中读取数据,为了提高开发效率,直接使用了Storm提供的KAFKA插件。今天抽空看了一下KafkaSpout的源码,记录下心得体会。

KafkaSpout基于kafka.javaapi.consumer.SimpleConsumer实现了consumer客户端的功能,包括 partition的分配,消费状态的维护(offset)。同时KafkaSpout使用了storm的可靠API,并实现了spout的ack 和 fail机制。KafkaSpout的基本处理流程如下:

1. 建立zookeeper客户端,在zookeeper zk_root + "/topics/" + _topic + "/partitions" 路径下获取到partition列表
2. 针对每个partition 到路径Zk_root + "/topics/" + _topic + "/partitions"+"/" + partition_id + "/state"下面获取到leader partition 所在的broker id
3. 到/broker/ids/broker id 路径下获取broker的host 和 port 信息,并保存到Map中Partition_id –-> learder broker
4. 获取spout的任务个数和当前任务的index,然后再根据partition的个数来分配当前spout 所消费的partition列表
5. 针对所消费的每个broker建立一个SimpleConsumer对象用来从kafka上获取数据
6. 提交当前partition的消费信息到zookeeper上面保存

下面对几个关键点进行下分析:

一、partition 的分配策略

1. 在KafkaSpout中获取spout的task的个数,也就是consumer的个数,代码如下:

int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();

2. 在KafkaSpout中获取当前spout的 task index,注意,task index和task id是不同的,task id是当前spout在整个topology中的id,而task index是当前spout在组件中的id,取值范围为[0, spout_task_number-1],代码如下:

_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);

3. 获取partiton与leader partition所在broker的映射关系,代码的调用顺序如下:

ZkCoordinator:

GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();

DynamicBrokersReader:

  /**
 * Get all partitions with their current leaders
 */
public GlobalPartitionInformation getBrokerInfo() throws SocketTimeoutException {
  GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
    try {
        int numPartitionsForTopic = getNumPartitions();
        String brokerInfoPath = brokerPath();
        for (int partition = 0; partition < numPartitionsForTopic; partition++) {
            int leader = getLeaderFor(partition);
            String path = brokerInfoPath + "/" + leader;
            try {
                byte[] brokerData = _curator.getData().forPath(path);
                Broker hp = getBrokerHost(brokerData);
                globalPartitionInformation.addPartition(partition, hp);
            } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                LOG.error("Node {} does not exist ", path);
            }
        }
    } catch (SocketTimeoutException e) {
			throw e;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
    return globalPartitionInformation;
}

4. 获取当前spout消费的partition

KafkaUtils:

    public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
        //获取所有的排序后的partition列表
        List<Partition> partitions = partitionInformation.getOrderedPartitions();
        int numPartitions = partitions.size();
        if (numPartitions < totalTasks) {
            LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
        }
        List<Partition> taskPartitions = new ArrayList<Partition>();
        //此处是核心分配算法,举个例子来说明分配策略
        //假设spout的并发度是3,当前spout的task index 是 1,总的partition的个数为5,那么当前spout消费的partition id为1,4
        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
            Partition taskPartition = partitions.get(i);
            taskPartitions.add(taskPartition);
        }
        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
        return taskPartitions;
    }

二、partition的更新策略

如果出现broker宕机,spout挂掉的情况,那么spout是要重新分配parition的,KafkaSpout并没有监听zookeeper上broker、partition和其他spout的状态,所以当有异常发生的时候KafkaSpout并不知道的,它采用了两种方法来更新partition的分配。

1. 定时更新

根据ZkHosts中的refreshFreqSecs字段来定时更新partition列表,我们可以通过修改配置来更改定时刷新的间隔。每一次调用kafkaspout的nextTuple方法时,都会首先调用ZkCoordinator的getMyManagedPartitions方法来获取当前spout消费的partition列表

  public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();

 //getMyManagedPartitions方法中会判断是否已经到了该刷新的时间,如果到了就重新分配partition
  public List<PartitionManager> getMyManagedPartitions() {
  if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
      refresh();
      _lastRefreshTime = System.currentTimeMillis();
  }
  return _cachedList;
}

2.异常更新

当调用kafkaspout的nextTuple方法出现异常时,强制更新当前spout的partition消费列表

    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {

            try {

                EmitState state = managers.get(_currPartitionIndex).next(_collector);

            } catch (FailedFetchException e) {
                _coordinator.refresh();
            }
        }

三、消费状态的维护

1.首先要分析一下当spout启动的时候是怎么获取初始offset的。在每个spout获取到消费的partition列表时,会针对每个partition来创建PartitionManager对象,下面看一下PartitionManager的初始化过程:

 public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
    _partition = id;
    _connections = connections;
    _spoutConfig = spoutConfig;
    _topologyInstanceId = topologyInstanceId;
    //到连接池里注册partition和partition leader所在的broker host,如果连接池里有该broker的连接,则直接返回该连接、
    //如果连接池里没有,则建立broker的连接,并返回连接
    _consumer = connections.register(id.host, id.partition);
    _state = state;
    _stormConf = stormConf;
    numberAcked = numberFailed = 0;

    String jsonTopologyId = null;
    Long jsonOffset = null;
    //获取zookeeper上offset的提交路径
    String path = committedPath();
    try {
    //从提交路径上读取信息,提取记录的该partition的消费offset
    //如果zookeeper上没有该路径则表示当前topic没有被spout消费过
        Map<Object, Object> json = _state.readJSON(path);
        LOG.info("Read partition information from: " + path +  "  --> " + json );
        if (json != null) {
            jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
            jsonOffset = (Long) json.get("offset");
        }
    } catch (Throwable e) {
        LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
    }

    //从broker上获取当前partition的offset,默认为获取最新的offset,如果用户配置forceFromStart(KafkaConfig),则获取该partition最早的offset,
    //也就是consume from beginning
    Long currentOffset = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);

    //情况1: 如果从zookeeper上没有获取topology和消费信息,则直接用从broker上获取到的offset
    if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
        _committedTo = currentOffset;
        LOG.info("No partition information found, using configuration to determine offset");
    //情况2: 获取到的topology id 不一致 或者用户要求从新获取数据的时候,则从kafka上获取offset
    //可以和情况1 合并,在KafkaUtils.getOffset已经判断过forceFromStart,此处无需再次判断
    } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
        _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
        LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
    }
    //情况3: 使用zookeeper上保留的offset进行消费
    else {
        _committedTo = jsonOffset;
        LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
    }
    //如果上次消费的offset已经过了保质期,则直接消费新数据
    if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
        LOG.info("Last commit offset from zookeeper: " + _committedTo);
        _committedTo = currentOffset;
        LOG.info("Commit offset " + _committedTo + " is more than " +
                spoutConfig.maxOffsetBehind + " behind, resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
    }

    LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
    _emittedToOffset = _committedTo;

}

2. 然后看一下partition消费offset是怎么保存和维护的

PartitionManager 中的 _emittedToOffset用来保存当前消费的offset,在每一次获取到消息的时候都会更新这个值

 private void fill() {

              if (!had_failed || failed.contains(cur_offset)) {
                  numMessages += 1;
                  _pending.add(cur_offset);
                  _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
                  //更新_emittedToOffset
                  _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                  if (had_failed) {
                      failed.remove(cur_offset);
                  }
              }
          }
          _fetchAPIMessageCount.incrBy(numMessages);
      }
  }

3.提交offset到zookeeper

offset的提交是周期性的,提交的周期可在SpoutConfig中的stateUpdateIntervalMs中来配置。每次调用kafkaspout的nextTuple方法后都会判断是否需要提交offset

    public void nextTuple() {
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }

如果需要提交则调用kafkaspout的commit方法,使用轮巡的方式提交每个partition的消费状况

  private void commit() {
    _lastUpdateMs = System.currentTimeMillis();
    for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
        manager.commit();
    }
}

具体的提交是委托PartitionManager来完成的

 public void commit() {
 //获取当前要提交的offset,如果有pending的offset的话,就说明还有一些消息没有完成处理,则提交pending消息的最小的offset
 //如果没有pending的消息,则提交当前消费的offset
    long lastCompletedOffset = lastCompletedOffset();
    //用来判断是否有新的offset需要提交
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);

        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}

四、kafkaspout ack 和 fail的处理

1. 首先还是说说kafkaspout消息的发送

当调用kafkaspout的nextTuple方法时,kafkaspout委托PartitionManager next方法来发送数据

public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {

        try {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
}

public EmitState next(SpoutOutputCollector collector) {
//判断等待队列是否为空,如果为空则调用fill方法从broker上取数据进行填充
    if (_waitingToEmit.isEmpty()) {
        fill();
    }
    while (true) {
        MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
        if (toEmit == null) {
            return EmitState.NO_EMITTED;
        }
        //对kafka的消息进行解码
        Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
        if (tups != null) {
            for (List<Object> tup : tups) {
            //如果tuple不为null,则发送该tuple,messageID为new KafkaMessageId(_partition, toEmit.offset)
            //这样在ack 或者 fail的时候才能根据_partition找到相应的PartitionManager
                collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
            }
            break;
        } else {
            ack(toEmit.offset);
        }
    }
    if (!_waitingToEmit.isEmpty()) {
        return EmitState.EMITTED_MORE_LEFT;
    } else {
        return EmitState.EMITTED_END;
    }
}

2. 在PartitionManager会维护一个pending 列表,用来保存已经发送但是没有被成功处理的消息,一个failed列表,用来保存已经失败的消息
3. 当一个消息成功处理时会调用spout的ack方法,kafkaspout会根据message id中包含的partition id 来委托相应的PartitionManager来处理

    public void ack(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.ack(id.offset);
        }
    }
    //PartitionManager 接收到ack消息后,会判断pending的最早的一条消息是否已经过质保,如果过质保,则清除队列中所有过保的消息
    //如果没有过保的消息,则在pending队列中移除当前消息
        public void ack(Long offset) {
        if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
            // Too many things pending!
            _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
        }
        _pending.remove(offset);
        numberAcked++;
    }

4. 当一条消息处理失败时,会调用spout的fail方法,同样,kafkaspout会根据message id中包含的partition id 来委托相应的PartitionManager来处理

  public void fail(Object msgId) {
      KafkaMessageId id = (KafkaMessageId) msgId;
      PartitionManager m = _coordinator.getManager(id.partition);
      if (m != null) {
          m.fail(id.offset);
      }
  }
  //PartitionManager接收到fail消息,会判断失败的消息是否已经过保,如果过保则忽略掉
      public void fail(Long offset) {
      if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
          LOG.info(
                  "Skipping failed tuple at offset=" + offset +
                          " because it‘s more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
                          " behind _emittedToOffset=" + _emittedToOffset
          );
      }
 //如果在保质期内,则加入failed列表,如果没有成功响应的消息,并且失败的消息个数已经超过保质期个数,则认为没有消息成功,系统有问题,丢异常
      else {
          LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
          failed.add(offset);
          numberFailed++;
          if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
              throw new RuntimeException("Too many tuple failures");
          }
      }
  }

 //对于failed的消息会进行重发
  private void fill() {
      //如果有失败的消息,则获取第一个的offset
      final boolean had_failed = !failed.isEmpty();
      if (had_failed) {
          offset = failed.first();
      } else {
          offset = _emittedToOffset;
      }

      ByteBufferMessageSet msgs = null;
      try {
          msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
      } catch (UpdateOffsetException e) {
          _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
          LOG.warn("Using new offset: {}", _emittedToOffset);
          // fetch failed, so don‘t update the metrics
          return;
      }
      if (msgs != null) {
          int numMessages = 0;

          for (MessageAndOffset msg : msgs) {
              final Long cur_offset = msg.offset();
              if (cur_offset < offset) {
                  // Skip any old offsets.
                  continue;
              }
              //如果该消息在failed列表中,则重新发送,并将其从failed列表中删除
              if (!had_failed || failed.contains(cur_offset)) {
                  numMessages += 1;
                  _pending.add(cur_offset);
                  _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
                  _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                  if (had_failed) {
                      failed.remove(cur_offset);
                  }
              }
          }
          _fetchAPIMessageCount.incrBy(numMessages);
      }
  }
时间: 2024-11-05 20:47:11

KafkaSpout 浅析的相关文章

Python之encode与decode浅析

 Python之encode与decode浅析 在 python 源代码文件中,如果你有用到非ASCII字符,则需要在文件头部进行字符编码的声明,声明如下: # code: UTF-8 因为python 只检查 #.coding 和编码字符串,为了美观等原因可以如下写法: #-*-coding:utf-8-*- 常见编码介绍: GB2312编码:适用于汉字处理.汉字通信等系统之间的信息交换. GBK编码:是汉字编码标准之一,是在 GB2312-80 标准基础上的内码扩展规范,使用了双字节编码.

浅析PHP的开源产品二次开发的基本要求

浅析PHP的开源产品二次开发的基本要求 第一, 基本要求:HTML(必须要非常熟悉),PHP(能看懂代码,能写一些小系统,如:留言板,小型CMS),Mysql(至少会一种数据库),Javascript(能看懂,能改现成的一些代码),Div+Css(能进行界面的调整,明白CSS是怎么使用的) 第二, 熟悉开源产品的使用,比如 Dedecms,你要知道怎么登录,怎么新建栏目,怎么添加文章,模板标签的使用方法,模型的概念和使用方法等等一些功能 第三, 要熟悉这个开源产品的数据库结构,还要理解里面核心文

word-break|overflow-wrap|word-wrap——CSS英文断句浅析

---恢复内容开始--- word-break|overflow-wrap|word-wrap--CSS英文断句浅析 一 问题引入 今天在再次学习 overflow 属性的时候,查看效果时,看到如下结果,内容在 div 中国换行了,可是两个 P 元素的内容并没有换行,搜索一番没有找到系统的答案,截图到群里请教大神,才知道是英文断句的问题,但是还是不太明白.之前没有遇到这种情况,为了彻底搞清楚,英文断句,又开始学习英文断句到底是怎么回事. 二 换行 每种语言里都有换行,就中文而言,我们最小语言单位

浅析vanish

浅析 VANISH --一种cache 第一部分:理解vanish的准备工作 1.对CDN的小剖析 CDN  content  delivery  network  内容分发(推送)网络,是在现有的Internet中增加一层新的网络架构,将网络内容发布到最接近用户的网络边缘(边缘服务器),使用户最近取得所需内容,解决网络拥挤状态,提高用户访问网站的速度. CDN网络架构主要有两部分组成,中心和边缘两部分,中心指CDN网管中心和DNS重定向解析中心,负责全局负载均衡.边缘主要指异地节点,CDN分发

健康,home? [java的内存浅析]

健康,home? [java的内存浅析] 摘要: 原创出处: http://www.cnblogs.com/Alandre/ 泥沙砖瓦浆木匠 希望转载,保留摘要,谢谢! 乐观上上,how can other kno u,u r yourself!I must be strong and carry on. -泥沙砖瓦浆木匠 一.闲谈下 201407月记着那时候身体垮了下来,呵呵.想说,对自己的说,也是对大家的负责吧.那时候胸疼胸闷,然后几乎累垮了,我还坚持了一星期,那一星期真的迷迷糊糊.完全不能

Mysql查询优化器浅析

--Mysql查询优化器浅析 -----------------------------2014/06/11 1 定义 Mysql查询优化器的工作是为查询语句选择合适的执行路径.查询优化器的代码一般是经常变动的,这和存储引擎不太一样.因此,需要理解最新版本的查询优化器是如何组织的,请参考相应的源代码.整体而言,优化器有很多相同性,对mysql一个版本的优化器做到整体掌握,理解起mysql新版本以及其他数据库的优化器都是类似的. 优化器会对查询语句进行转化,转化等价的查询语句.举个例子,优化器会将

Volley框架源码浅析(一)

尊重原创http://blog.csdn.net/yuanzeyao/article/details/25837897 从今天开始,我打算为大家呈现关于Volley框架的源码分析的文章,Volley框架是Google在2013年发布的,主要用于实现频繁而且粒度比较细小的Http请求,在此之前Android中进行Http请求通常是使用HttpUrlConnection和HttpClient进行,但是使用起来非常麻烦,而且效率比较地下,我想谷歌正式基于此种原因发布了Volley框架,其实出了Voll

C语言中文件打开模式(r/w/a/r+/w+/a+/rb/wb/ab/rb+/wb+/ab+)浅析

C语言文件打开模式浅析 在C语言的文件操作语法中,打开文件文件有以下12种模式,如下图: 打开模式  只可以读   只可以写  读写兼备 文本模式 r w a r+ w+ a+ 二进制模式 rb wb ab  rb+ (r+b)   wb+ (w+b)   ab+ (a+b)  其中,二进制模式与文本模式操作相似,只不过是以二进制流的形式读写而已,下面以文本模式为例分析: 1."r" 模式: 1.1 打开文件进行“只读”操作,即只能从文件读取内容. 1.2 若欲操作的文件不存在,则打开

浅析STM32之usbh_def.H

[温故而知新]类似文章浅析USB HID ReportDesc (HID报告描述符) 现在将en.stm32cubef1\STM32Cube_FW_F1_V1.4.0\Middlewares\ST\STM32_USB_Host_Library\Core\Inc\usbh_def.H /** ****************************************************************************** * @file usbh_def.h * @aut