(5) storm-kafka source walkthrough: KafkaSpout

This post starts walking through the KafkaSpout source code.
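Before diving into the source, a short usage sketch helps frame what the classes below are for. The ZooKeeper address, topic, zkRoot, and spout id here are placeholder values, not anything taken from the source:

// Hypothetical wiring of a KafkaSpout into a topology (placeholder names, sketch only)
BrokerHosts hosts = new ZkHosts("zkhost:2181");                    // ZooKeeper used by the Kafka cluster
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my_topic",       // topic to read
        "/kafka_spout", "my-spout-id");                            // zkRoot and consumer id for offset storage
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());  // how raw messages become tuples
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 4);   // 4 parallel spout tasks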

First, some initialization is done in the open method:

........................

        _state = new ZkState(stateConf);

        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

        // using TransactionalState like this is a hack
        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
        if (_spoutConfig.hosts instanceof StaticHosts) {
            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        } else {
            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        }
............

Some code before and after has been omitted, and the metric-related parts are not covered for now. The main work here is initializing the ZooKeeper connection (ZkState) and mapping Kafka partitions to their brokers (initializing DynamicPartitionConnections). The DynamicPartitionConnections constructor takes a brokerReader; since we use ZkHosts, a look at the KafkaUtils code shows that a ZkBrokerReader is used. Here is its code:

public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
		try {
			reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
			cachedBrokers = reader.getBrokerInfo();
			lastRefreshTimeMs = System.currentTimeMillis();
			refreshMillis = hosts.refreshFreqSecs * 1000L;
		} catch (java.net.SocketTimeoutException e) {
			LOG.warn("Failed to update brokers", e);
		}

	}
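As an aside, the choice between ZkBrokerReader and StaticBrokerReader is made in KafkaUtils.makeBrokerReader. A simplified sketch, paraphrased from the storm-kafka source (details may differ between versions):

// storm.kafka.KafkaUtils (simplified sketch)
public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
    if (conf.hosts instanceof StaticHosts) {
        // fixed broker list supplied by the user
        return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
    } else {
        // broker/partition metadata discovered from ZooKeeper
        return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
    }
}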

Back in the ZkBrokerReader constructor, the refreshMillis field controls how often the partition information cached from ZooKeeper is refreshed:

//ZkBrokerReader
	@Override
	public GlobalPartitionInformation getCurrentBrokers() {
		long currTime = System.currentTimeMillis();
		if (currTime > lastRefreshTimeMs + refreshMillis) { // refresh once more than refreshMillis has elapsed since the last update
			try {
				LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
				cachedBrokers = reader.getBrokerInfo();
				lastRefreshTimeMs = currTime;
			} catch (java.net.SocketTimeoutException e) {
				LOG.warn("Failed to update brokers", e);
			}
		}
		return cachedBrokers;
	}
	// Below is the DynamicBrokersReader method that gets called
	/**
     * Get all partitions with their current leaders
     */
    public GlobalPartitionInformation getBrokerInfo() throws SocketTimeoutException {
      GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
        try {
            int numPartitionsForTopic = getNumPartitions();
            String brokerInfoPath = brokerPath();
            for (int partition = 0; partition < numPartitionsForTopic; partition++) {
                int leader = getLeaderFor(partition);
                String path = brokerInfoPath + "/" + leader;
                try {
                    byte[] brokerData = _curator.getData().forPath(path);
                    Broker hp = getBrokerHost(brokerData);
                    globalPartitionInformation.addPartition(partition, hp);
                } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                    LOG.error("Node {} does not exist ", path);
                }
            }
        } catch (SocketTimeoutException e) {
					throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
        return globalPartitionInformation;
    }

GlobalPartitionInformation is an Iterable class that holds the mapping between partitions and brokers. DynamicPartitionConnections maintains the relationship between Kafka consumers and partitions, i.e. which partitions each consumer reads; this ConnectionInfo is initialized and updated in storm.kafka.ZkCoordinator. One point worth mentioning is that KafkaSpout consumes from Kafka through the low-level SimpleConsumer, one per connection:

//storm.kafka.DynamicPartitionConnections
	static class ConnectionInfo {
        SimpleConsumer consumer;
        Set<Integer> partitions = new HashSet();

        public ConnectionInfo(SimpleConsumer consumer) {
            this.consumer = consumer;
        }
    }
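When a PartitionManager is created for a partition (see ZkCoordinator below), it registers that partition with DynamicPartitionConnections, which lazily creates one SimpleConsumer per broker and records which partitions it serves. A simplified sketch of register, paraphrased from the source (exact config fields may differ between versions):

// storm.kafka.DynamicPartitionConnections (simplified sketch)
public SimpleConsumer register(Broker host, int partition) {
    if (!_connections.containsKey(host)) {
        // first partition on this broker: open a SimpleConsumer for it
        _connections.put(host, new ConnectionInfo(
                new SimpleConsumer(host.host, host.port,
                        _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
    }
    ConnectionInfo info = _connections.get(host);
    info.partitions.add(partition);
    return info.consumer;
}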

Next, look at the ZkCoordinator class, starting with its constructor:

//storm.kafka.ZkCoordinator
	public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
        _spoutConfig = spoutConfig;
        _connections = connections;
        _taskIndex = taskIndex;
        _totalTasks = totalTasks;
        _topologyInstanceId = topologyInstanceId;
        _stormConf = stormConf;
        _state = state;
        ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
        _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
        _reader = reader;
    }

_refreshFreqMs controls how often the partition information in ZooKeeper is synced to the local cache. KafkaSpout's nextTuple method calls ZkCoordinator's getMyManagedPartitions on every invocation, and that method refreshes the partition information at the interval given by _refreshFreqMs:

//storm.kafka.ZkCoordinator
	@Override
    public List<PartitionManager> getMyManagedPartitions() {
        if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
            refresh();
            _lastRefreshTime = System.currentTimeMillis();
        }
        return _cachedList;
    }

    @Override
    public void refresh() {
        try {
            LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
            GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);

            Set<Partition> curr = _managers.keySet();
            Set<Partition> newPartitions = new HashSet<Partition>(mine);
            newPartitions.removeAll(curr);

            Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
            deletedPartitions.removeAll(mine);

            LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());

            for (Partition id : deletedPartitions) {
                PartitionManager man = _managers.remove(id);
                man.close();
            }
            LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());

            for (Partition id : newPartitions) {
                PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
                _managers.put(id, man);
            }

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        _cachedList = new ArrayList<PartitionManager>(_managers.values());
        LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
    }

The algorithm that assigns partitions to each consumer task is KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex).

Its main job is to take the number of parallel tasks and compare it with the available partitions to determine which partitions a given consumer task is responsible for reading; a simplified sketch of the assignment is shown below.
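The sketch is paraphrased from KafkaUtils (the real method also validates its arguments, warns when there are fewer partitions than tasks, and logs the resulting mapping):

// storm.kafka.KafkaUtils (simplified sketch)
public static List<Partition> calculatePartitionsForTask(
        GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
    List<Partition> partitions = partitionInformation.getOrderedPartitions();
    List<Partition> taskPartitions = new ArrayList<Partition>();
    // round-robin: task i takes partitions i, i + totalTasks, i + 2 * totalTasks, ...
    for (int i = taskIndex; i < partitions.size(); i += totalTasks) {
        taskPartitions.add(partitions.get(i));
    }
    return taskPartitions;
}

For example, with 3 tasks and 6 partitions, task 0 reads partitions 0 and 3, task 1 reads 1 and 4, and task 2 reads 2 and 5.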

With initialization in KafkaSpout complete, the spout starts fetching and emitting data; look at the nextTuple method:

// storm.kafka.KafkaSpout
	@Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {

            try {
                // in case the number of managers decreased
                _currPartitionIndex = _currPartitionIndex % managers.size();
                EmitState state = managers.get(_currPartitionIndex).next(_collector);
                if (state != EmitState.EMITTED_MORE_LEFT) {
                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
                }
                if (state != EmitState.NO_EMITTED) {
                    break;
                }
            } catch (FailedFetchException e) {
                LOG.warn("Fetch failed", e);
                _coordinator.refresh();
            }
        }

        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }

As the code above shows, all the real work is done in PartitionManager: it reads the messages and then emits them. The main logic is in PartitionManager's next method:

//returns false if it's reached the end of current batch
    public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

If the _waitingToEmit list is empty, fill() is called to fetch messages, which are then emitted one at a time. After emitting one message the loop breaks and returns EMITTED_MORE_LEFT (or EMITTED_END once the buffer is drained) to KafkaSpout.nextTuple, which uses the returned state to decide whether the message buffer fetched for this partition has been fully emitted; once it has, the spout moves on to read and emit from the next partition.
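For reference, EmitState is a small enum defined inside KafkaSpout with three values:

// storm.kafka.KafkaSpout
static enum EmitState {
    EMITTED_MORE_LEFT,  // a tuple was emitted and more messages remain in the buffer
    EMITTED_END,        // a tuple was emitted and the buffer is now drained
    NO_EMITTED          // nothing was emitted for this partition
}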

One thing to note: the spout does not wait until all pending messages of a partition have been emitted before committing the offset to ZooKeeper. Instead, after emitting it checks whether the commit interval (the stateUpdateIntervalMs configured at startup) has elapsed. In the author's view this is done to make failure handling easier to control.

KafkaSpout delegates its ack, fail, and commit operations entirely to PartitionManager; see the code:

@Override
    public void ack(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.ack(id.offset);
        }
    }

    @Override
    public void fail(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.fail(id.offset);
        }
    }

    @Override
    public void deactivate() {
        commit();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(_spoutConfig.scheme.getOutputFields());
    }

    private void commit() {
        _lastUpdateMs = System.currentTimeMillis();
        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
            manager.commit();
        }
    }
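As a preview of what manager.commit() does before the follow-up post on PartitionManager: it writes the last fully processed offset, together with topology, partition, broker, and topic information, as JSON under the spout's zkRoot. A simplified sketch, paraphrased from the source (field names and structure may differ between versions):

// storm.kafka.PartitionManager (simplified sketch)
public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (lastCompletedOffset != _committedTo) {
        Map<Object, Object> data = ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                                                 "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                                               "port", _partition.host.port))
                .put("topic", _spoutConfig.topic)
                .build();
        _state.writeJSON(committedPath(), data);  // committedPath() is zkRoot + "/" + spout id + "/" + partition id
        _committedTo = lastCompletedOffset;
    }
}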

So PartitionManager is the core of KafkaSpout. It is very late, already past 3 a.m., so an analysis of PartitionManager will follow in a later post. Good night.

