KafkaSpout之PartitionManager

KafkaSpout的核心逻辑都是由PartitionManager来实现的。

但是这个类实现时候需要考虑的东西有些多,0.92至0.93,至当前(2015.3.14)的master一直在变化。在这里,先分析一下最近的发布版0.93里的逻辑。也提出一些问题,希望以后Apache Storm会把这个类实现地更完美一些。


PartitionManager的主要功能

PartitionManager用来管理单个Partition。提供持久化读取进度、读取消息功能,并提供Storm的spout需要实现的nextTuple, fail, ack等功能。


实现PartitionManager需要考虑的问题

有一些问题是设计PartitionManager时必须考虑的,先把他们提一下,然后看下0.93版PartitionManager的实现。

关于批量读取消息以及缓存消息

由于Kafka的实现细节(为了高吞吐而设计的log格式,通讯协议),Kafka的SimpleConsumer每次读取消息是会去读取一批,而不能指定响应想要包含的具体的offset,并且由于消息按批压缩,使得每次读取的响应包含的offset可能比想要的最小的offset还要小(但不会更大)。所以,对于PartitoinManager来说,在内部构造一个缓存,保存每次读取得到的一批message是一种自然而且高效的方式。

允许有超过一个message处于pendding(已发送但没有ack)状态?

如果在发射一个message的tuple之后,就开始等待。那么ack、fail、commit的逻辑就会很简单。但这样消息的处理效率会被极大的降低,且不说还可能使得下游bolt的一些task没事可做。所以一定得允许多个message正在被blot处理,也就是需要有pendding messages的集合。

有了pendding的messages集合,ack, fail, commit的逻辑就变得比较复杂,且需要做出一些折衷。

  1. 当有message对应的tuple失败时,如何在处理其它正常的消息时,特殊处理失败的这些message?
  2. 如果有message产生的tuple在多次重复后仍然失败,应该怎么做?丢弃它吗?
  3. 在Zookeeper中应该记录什么信息?
  4. 如果下游的bolt处理的进度太慢怎么办?如何衡量处理速度是否达到需求?


PartitionManager的具体实现

在Zookeeper中记录信息的意义:

下面是PartitionManager的commit方法的主要部分:

Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker", ImmutableMap.of("host", _partition.host.host,
                            "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();
 _state.writeJSON(committedPath(), data);

_committedTo = lastCompletedOffset;
  • topology.id 记录了这个topology实例的id。当PartitionManager的构造函数被调用时,它会从Zookeeper里获取topology.id,以判断当前的task是否跟记录zookeeper里信息的是一个topology实例,如果不是,说明这是一个新提交的topology,这时,会判断是否设置了forceFromStart,如果是同一个topology实例,就不理会forceFromStart
  • topology.name topology的名字,这个目前没用到。
  • offset 在这个offset之前(不包括它)的所有消息都已经处理完成。
  • partition  partition id
  • broker 此partition的leader
  • topic partition所属的topic。注意,在PartitionManager初始化时,它并没有判断这个spout task的topic跟记录里的一致。所以,如果两个topology, 有同样的SpoutConfig.id,但是不同的topic,会引发混乱。

另外,这个JSON数据写的路径committedPath也是很重要的。PartitionManager初始化时,会从这个committedPath路径读取信息。

    private String committedPath() {
        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
    }

所以,如果spoutConfig.id配置得不当,KafkaSpout就无法获取正确的进度记录。

另外,在所有记录里,最重要的就是offset这个记录。它的意义,使得PartitionManager不得不做出很多权衡。

PartitionManager用到的集合和记录

    Long _emittedToOffset;
    SortedSet<Long> _pending = new TreeSet<Long>();
    SortedSet<Long> failed = new TreeSet<Long>();
    Long _committedTo;
    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
    long numberFailed, numberAcked;
  • _pending 所有己读取,但还没有被ack的消息,都在这里。
  • failed 所有己经认定为failed的tuple来自的message的offset都在这里
  • _waitingToEmit 所有己经被读取,但是还没经过“解析,emit tuple"步骤的消息都在这。
  • _emittedToOffset offset小于它的消息都已经被读取了
  • _comittedTo  所有offset小于它的消息都已被ack,或者由于某些原因不再需要被处理。

当PartitionManager的next方法被调用以emit新tuple时,它只会从_waitingToEmit取消息。那么failed里的消息如何被再重试呢?原因在于_waitingToEmit为空时,next方法会调用fill方法,而fill方法会考虑到failed集合内的元素,不过是一种比较粗放的做法。

fill方法

fill方法的主要逻辑依次分为三个部分:

  1. 判断该从哪个offset开始,从Kafka抓取消息
  2. 抓取消息,处理offset out of range 异常
  3. 把抓取到的消息放到_waitingToEmit集合中,同时与failed集合与pendding集合交互。

第一部分:

        final boolean had_failed = !failed.isEmpty();
        // Are there failed tuples? If so, fetch those first.
        if (had_failed) {
            offset = failed.first();
        } else {
            offset = _emittedToOffset;
        }

这段代码里,offset即是将要从Kafka里抓取消息的offset。当failed集合不为空时,就用failed集合的最小的offset做为下次要抓取的offset。Kafka的FetchRequest每次会从Kafka中获取一批消息。所以,如果有消息fail,而此failed消息之后的消息已被ack,那么fill方法会重新获取这些已被ack的消息,从而使得这部分消息最终被重复处理。

如果没有failed消息,fill方法就会从之前读取过的最大的offset继续抓取。在知道了从何处抓取之后,开始真正的抓取过程:

     try {
            msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
        } catch (UpdateOffsetException e) { //如果是offset "out of range", 并且设置了useStartOffsetTimeIfOffsetOutOfRange
            _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
            LOG.warn("Using new offset: {}", _emittedToOffset);
            // fetch failed, so don‘t update the metrics
            return;
        }

出现了UpdateOffsetException代表出现了这种情况:想要抓取的offset不在Kafka能提供的offset所在的范围之内,并且已经在config里设置了useStartOffsetTimeIfOffsetOutOfRange为true。想要抓取的offset不在Kafka提供的范围可能有几种原因:这部分消息被Kafka的log retention功能给删除了;leader变更,使得部分消息丢失(如果没有设置ack为-1的话);以及其它异常。这时候,fill方法会调用KafkaUtils的getOffset方法,不过这个方法有些不符合useStartOffsetTimeIfOffsetOutOfRange的意思,即它并不是一定会从startOffsetTime配置中配置的offsetTime开始读。

    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        if ( config.forceFromStart ) {
            startOffsetTime = config.startOffsetTime;
        }
        return getOffset(consumer, topic, partition, startOffsetTime);
    }

可以看出,如果没有设置forceFromStart,那么这个方法返回的offset将会是当前最大的offset,而忽略KafkaConfig中startOffsetTime的配置,使得PartitionManager直接跳到最新的消息开始处理。这样乍一看莫名其妙,但是试想,如果startOffsetTime对应的offset也out of range呢,这样KafkaSpout就陷入了死循环。而LatestOffsetTime()是始终存在的。但是,这样做而没有单独的配置,也没有日志记录说明这种权衡,会给用户带来麻烦。

在获取fetch到消息以后,获取的消息集可能会包含了各种例外情况,需要细致处理:

            for (MessageAndOffset msg : msgs) {
                final Long cur_offset = msg.offset();
                if (cur_offset < offset) {
                    // Skip any old offsets.
                    continue;
                }
                if (!had_failed || failed.contains(cur_offset)) {
                    numMessages += 1;
                    _pending.add(cur_offset);//_pending表示已经读取而未被ack的消息
                    _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
                    _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                    if (had_failed) {//说明此消息在failed集合里。从failed集合里remove掉它,因为它被重新加入了_waitingToEmit集合,将要被重新处理。
                        failed.remove(cur_offset);//
                    }
                }
  • 首先,需要考虑到FetchRequest指定的是返回集中最小的offset A,但是,实际上Kafka只保证返回的消息集中包括了offset为A的消息,这个消息集中也可能包括了offset比A更小的消息(由于压缩)。所以,fill方法首先要skip掉这些offset更小的消息
  • 如果failed集合为空,fill方法就把得到的消息集中所有offset大于A的消息加入_waitingToEmit集合,同时加入_pending集合。然后把_emittedToOffset设为当前读取过的最大的offset。
  • 如果读取到的消息在failed集合中,它在把这条消息加入_waitingToEmit集合与_pending集合后,还要把它从failed集合中去掉,否则这条消息就会永远在failed集合里。只有在fill方法中,failed集合中的元素才可能被移除,加入到_waitingToEmit集合,使它有机会被重新emit。

通过对fill方法的分析可以看到,如果一个消息始终fail,除非在PartitionManager的其它方法中把它移除,否则它会使PartitionManager的处理进度停止。下面将要看到,在fail和ack方法中,这样一直fail的消息还是有机会被丢弃的,但这取决于你的配置,而这些配置是很微妙的。

ack方法

ack方法的主要功能是把消息从_pending集合中去掉,表示这个消息处理完成。从_pending集合去除,PartitionManager才能获取正确的处理进度的信息,以更新Zookeeper里的记录。但是,它还有别的作用。

    public void ack(Long offset) {
        if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
            // Too many things pending! 已读取但未确认的消息太多了,就把此次确认的offset - maxOffsetBehind之前的清除了
            _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
        }
        _pending.remove(offset);//从_pending集合中移除它,表示这个消息已被处理
        numberAcked++;
    }

当一个offset被ack时,ack方法会把所有小于offset - _spoutConfig.maxOffsetBehind的消息从_pending中移除。也就是说,即使这些被移除的消息失败了,也认为他们处理成功,使得在Zookeeper中记录的进度忽略这些被移除的消息。所以,假如task重启,那么这些失败但被移除出_pending集合的消息就不会被再处理。那么,这些失败了的消息,当Storm的acker发现它们处理失败,会发生什么呢?这由fail方法决定。

fail方法

 public void fail(Long offset) {
        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
            LOG.info(
                    "Skipping failed tuple at offset=" + offset +
                            " because it‘s more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
                            " behind _emittedToOffset=" + _emittedToOffset
            );
        } else {
            LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
            failed.add(offset);
            numberFailed++;
            if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
                throw new RuntimeException("Too many tuple failures");
            }
        }
    }

当一个消息对应的tuple被fail时,fail方法会首先判断这个消息是否落后太多。如果它的offset小于(当前读取的最大offset - maxOffsetBehind), 那么就不把它加到failed集合里,使得它很可能不会被重新处理。如果不落后太多,就把它加到failed集合里,使得它可以被重新处理。如果没有消息ack,并且总的failed次数大于maxOffsetBehind,就抛出异常,代表PartitionManager工作出错,而这种情况只有在处理第一批消息并且这批消息的个数大于maxOffsetBehind时才可能发生。这样,有可能在某些情况下,使得PartitionManager卡住,但不会有异常。而且用numberFailed与spoutConfig.maxOffsetBehind比较,有些令人莫名其秒。

commit方法

commit方法被调用时,会调用lastCompletedOffset方法获取当前PartitionManager处理的进度,并且将这个进度持久化。这个“进度”,是说在此之前的所有消息都已被ack,或“不需要ack”, 总之,是说这些消息已处理完毕。

    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }

在此,体现了_pending的作用。_pend中最小的元素,代表之前的元素都已处理完成。如果_pending为空,说明所有已被读取的元素都已处理完毕。

陷阱

failed方法,使得PartitonManager的有些行为非常隐晦。结合ack、fill和commit方法,可能会出现以下特殊情况,这些情况和KafkaConfig.maxOffBehind配置,及KafkaConfig.useStartOffsetTimeIfOffsetOutOfRange配置、KafkaConfig.fetchSizeBytes配置相关。

  1. maxOffsetBehind设置得较小,而fetchSizeBytes相对较大,使得maxOffsetBehind小于一次fetch得到的消息总数。设这批fetch得到的消息的offset范围为[a, b],那么所有小于(b - maxOffsetBehind)的offset的消息,即使处理失败,也不会被重试。设这样失败,但不会被重试的消息中的某个的offset为X, 那么如果某个大于( X + maxOffsetBehind)的消息被ack时,offset为X的这个消息会被从_pending集合中移除。但是如果所有大于(X + maxOffsetBehind)的消息都被fail了,而在(_emmittedToOffset与_emittedToOffset - maxOffsetBehind之间) 有消息failed了,那么failed集合中不会包括X,但会包括比X的offset大的元素,X不会被重试,但X会一直停留在_pending集合,造成commit无法更新实际进度,并且带来内存泄漏
  2. 如果maxOffsetBehind比较大,就可能有failed的消息永远不会被忽略,而会一直重试,直到它成功。而fill方法使得在它成功之前,PartitionManager无法处理后续的消息。这样后续的blot必须保证正确地ack消息,否则这个partition就会卡在fail的这个消息上。
  3. 如果把useStartOffsetTimeIfOffsetOutOfRange设为true,同时forceFromStart设为false, 而startOffsetTime不为LatestTime,那么PartitonManager想要获取的消息out of range时,它会直接跳到LatestTime开始处理消息,而不会从startOffsetTime开始。这可能发生在Kafka进行了log retention之后。
  4. 如果有消息fail,那么这条消息之后的消息也可能被跟着重试,所以后边bolt的处理必须考虑到这种情况。
  5. 如果一条消息被拆成多个tuple发送,那么只要其中有一个tuple处理失败,这条消息产生的所有tuple就可能被重新发送。

总之,当前PartitionManager的实现还有很多需要改进之处,而且有些情况容易给用户带来困扰。

PS:关于KafkaSpout的进度,有个开源的工具可用:Capillary , 它用来对比当前KafkaSpout的处理进度和Kafka消息的LatestOffset。使用Play Framework和Scala开发。俺在此基础上做了些修改和补充,加入了Kafka集群的监控信息,特别是每个partition的leader,LatestOffset等信息,也加入图表用来显示未处理消息的累积情况,以及邮件报警。希望能有感兴趣的人一起把它的功能做得更完整一些。

时间: 2024-10-05 09:10:15

KafkaSpout之PartitionManager的相关文章

KafkaSpout 浅析

最近在使用storm做一个实时计算的项目,Spout需要从 KAFKA 集群中读取数据,为了提高开发效率,直接使用了Storm提供的KAFKA插件.今天抽空看了一下KafkaSpout的源码,记录下心得体会. KafkaSpout基于kafka.javaapi.consumer.SimpleConsumer实现了consumer客户端的功能,包括 partition的分配,消费状态的维护(offset).同时KafkaSpout使用了storm的可靠API,并实现了spout的ack 和 fai

(六)storm-kafka源码走读之PartitionManager

PartitionManager算是storm-kafka的核心类了,现在开始简单分析一下.还是先声明一下,metric部分这里不做分析. PartitionManager主要负责的是消息的发送.容错处理,所以PartitionManager会有三个集合 _pending: 尚未发送的message的offset集合, 是个TreeSet<Long>() failed : 发送失败的offset 集合,是个TreeSet<Long>() _waitingToEmit: 存放待发射的

(五)storm-kafka源码走读之KafkaSpout

现在开始介绍KafkaSpout源码了. 开始时,早open方法中做一些初始化, ........................ _state = new ZkState(stateConf); _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig)); // using TransactionalState like this is

KafkaSpout分析:配置

public KafkaSpout(SpoutConfig spoutConf) { _spoutConfig = spoutConf;} SpoutConfig继承自KafkaConfig.由于SpoutConfig和KafkaConfig所有的instance field全是public, 因此在使用构造方法后,可以直接设置各个域的值. public class SpoutConfig extends KafkaConfig implements Serializable { public

storm kafkaSpout 踩坑问题记录! offset问题!

整合kafka和storm例子网上很多,自行查找 问题描述: kafka是之前早就搭建好的,新建的storm集群要消费kafka的主题,由于kafka中已经记录了很多消息,storm消费时从最开始消费 问题解决: 下面是摘自官网的一段话: How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures As shown in the above KafkaConfig properties, you

(三)storm-kafka源码走读之如何构建一个KafkaSpout

上一节介绍了config的相关信息,这一节说下,这些参数分别是什么,在zookeeper中的存放路径是怎样的,之前QQ群里有很多不知道该怎么传入正确的参数来new 一个kafkaSpout,其主要还是参数传递正确就可. 看SpoutConfig的构造函数 public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); this.zkRoot = zkRoot;

kafkaspot在ack机制下如何保证内存不溢

新浪微博:intsmaze刘洋洋哥. storm框架中的kafkaspout类实现的是BaseRichSpout,它里面已经重写了fail和ack方法,所以我们的bolt必须实现ack机制,就可以保证消息的重新发送:如果不实现ack机制,那么kafkaspout就无法得到消息的处理响应,就会在超时以后再次发送消息,导致消息的重复发送. 但是回想一下我们自己写一个spout类实现BaseRichSpout并让他具备消息重发,那么我们是会在我们的spout类里面定义一个map集合,并以msgId作为

(二)storm-kafka源码走读之Config相关类走读

Config就是配置相关信息,下面是KafkaConfig的源码及小弟的相关注释,有错误的地方还望指出 public class KafkaConfig implements Serializable { /** 一个借口,实现类有ZkHosts,和StatisHosts **/ public final BrokerHosts hosts; public final String topic; // kafka topic name public final String clientId;

(四)storm-kafka源码走读之自定义Scheme

本文原创,转载请注明出处: 使用KafkaSpout需要子类实现Scheme,storm-kafka实现了StringScheme,KeyValueStringScheme等等,大家可以用. 这些Scheme主要负责从消息流中解析出所需要的数据. public interface Scheme extends Serializable { public List<Object> deserialize(byte[] ser); public Fields getOutputFields();