Storm-kafka【接口实现】4-1:ZKCoordinator: ZK协调器

阅读背景:您需要对Zk,Kafka有基础的了解

本章主题:详尽的梳理ZkCoordinator的过程

package com.mixbox.storm.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;

import java.util.*;

import static com.mixbox.storm.kafka.KafkaUtils.taskId;

/**
 * 
 * 
 * ZKCoordinator 协调器
 * 
 * @author Yin Shuai
 */

public class ZkCoordinator implements PartitionCoordinator {
	public static final Logger LOG = LoggerFactory
			.getLogger(ZkCoordinator.class);

	SpoutConfig _spoutConfig;

	int _taskIndex;

	int _totalTasks;

	String _topologyInstanceId;

	// 每一个分区对应着一个分区管理器
	Map<Partition, PartitionManager> _managers = new HashMap();

	//缓存的List
	List<PartitionManager> _cachedList;

	//上次刷新的时间
	Long _lastRefreshTime = null;

	//刷新频率 毫秒
	int _refreshFreqMs;

	//动态分区连接
	DynamicPartitionConnections _connections;

	//动态BrokersReader
	DynamicBrokersReader _reader;

	ZkState _state;

	Map _stormConf;

	/**
	 * 
	 * @param connections
	 *            动态的 分区连接
	 * @param stormConf
	 *            Storm的配置文件
	 * @param spoutConfig
	 *            Storm sput的配置文件
	 * @param state
	 *            对于ZKState的连接
	 * @param taskIndex
	 *            任务
	 * @param totalTasks
	 *            总共的任务
	 * @param topologyInstanceId
	 *            拓扑的实例ID
	 */
	public ZkCoordinator(DynamicPartitionConnections connections,
			Map stormConf, SpoutConfig spoutConfig, ZkState state,
			int taskIndex, int totalTasks, String topologyInstanceId) {
		this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks,
				topologyInstanceId, buildReader(stormConf, spoutConfig));
	}

	public ZkCoordinator(DynamicPartitionConnections connections,
			Map stormConf, SpoutConfig spoutConfig, ZkState state,
			int taskIndex, int totalTasks, String topologyInstanceId,
			DynamicBrokersReader reader) {
		_spoutConfig = spoutConfig;
		_connections = connections;
		_taskIndex = taskIndex;
		_totalTasks = totalTasks;
		_topologyInstanceId = topologyInstanceId;
		_stormConf = stormConf;
		_state = state;
		ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
		_refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
		_reader = reader;
	}

	/**
	 * @param stormConf
	 * @param spoutConfig
	 * @return
	 */
	private static DynamicBrokersReader buildReader(Map stormConf,
			SpoutConfig spoutConfig) {
		ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
		return new DynamicBrokersReader(stormConf, hosts.brokerZkStr,
				hosts.brokerZkPath, spoutConfig.topic);
	}

	@Override
	public List<PartitionManager> getMyManagedPartitions() {
		if (_lastRefreshTime == null
				|| (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
			refresh();
			_lastRefreshTime = System.currentTimeMillis();
		}
		return _cachedList;
	}

	/**
	 * 简单的刷新的行为
	 * 
	 */
	void refresh() {
		try {

			LOG.info(taskId(_taskIndex, _totalTasks)
					+ "Refreshing partition manager connections");

			// 拿到所有的分区信息
			GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();

			// 拿到自己任务的所有分区
			List<Partition> mine = KafkaUtils.calculatePartitionsForTask(
					brokerInfo, _totalTasks, _taskIndex);

			// 拿到当前任务的分区
			Set<Partition> curr = _managers.keySet();

			// 构造一个集合
			Set<Partition> newPartitions = new HashSet<Partition>(mine);

			// 在new分区中,移除掉所有 自己拥有的分区
			newPartitions.removeAll(curr);

			// 要删除的分区
			Set<Partition> deletedPartitions = new HashSet<Partition>(curr);

			//
			deletedPartitions.removeAll(mine);

			LOG.info(taskId(_taskIndex, _totalTasks)
					+ "Deleted partition managers: "
					+ deletedPartitions.toString());

			for (Partition id : deletedPartitions) {
				PartitionManager man = _managers.remove(id);
				man.close();
			}
			LOG.info(taskId(_taskIndex, _totalTasks)
					+ "New partition managers: " + newPartitions.toString());

			for (Partition id : newPartitions) {
				PartitionManager man = new PartitionManager(_connections,
						_topologyInstanceId, _state, _stormConf, _spoutConfig,
						id);
				_managers.put(id, man);
			}

		} catch (Exception e) {
			throw new RuntimeException(e);
		}
		_cachedList = new ArrayList<PartitionManager>(_managers.values());
		LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
	}

	@Override
	public PartitionManager getManager(Partition partition) {
		return _managers.get(partition);
	}
}

1 : 首先 ZKCoorDinator 实现 PartitionCoordinator的接口

package com.mixbox.storm.kafka;

import java.util.List;

/**
 * @author Yin Shuai
 */
public interface PartitionCoordinator {

	/**
	 * 拿到我管理的分区列表  List{PartitionManager}
	 * @return
	 */
	List<PartitionManager> getMyManagedPartitions();

	/**
	 * @param 依据制定的分区partition,去getManager
	 * @return
	 */
	PartitionManager getManager(Partition partition);
}

第一个方法拿到所有的   PartitionManager

第二个方法依据特定的   Partition去得到一个分区管理器

对于PartitionManager 请参看本空间的另外一篇博文:

Storm-kafka【接口实现】4-2:PartitionManager: 分区管理器

Storm-kafka【接口实现】4-1:ZKCoordinator: ZK协调器

时间: 2024-08-03 02:45:32

Storm-kafka【接口实现】4-1:ZKCoordinator: ZK协调器的相关文章

Kafka 核心组件之协调器

1.消费者与消费者组 假设某 topic 有4个分区,消费者组中只有一个消费者,那么这个消费者将消费全部 partition 中的数据. 如果消费者组中有两个消费者,那么每个消费者消费两个 partition. 如果消费者组中有4个消费者,那么每个消费者消费一个partition. 如果消费者组中有5个消费者,那么有一个消费者就是空闲的. 注意:在同一个消费者组中,不要让消费者的数量大于分区的数量 多个消费者组之间不会互相影响. 2.协调器 在 kafka-0.10 版本,Kafka 在服务端引

storm+kafka:WordCount程序

简单的输入输出做完了,来点复杂点儿的场景:从某个topic定于消息,然后根据空格分词,统计单词数量,然后将当前输入的单词数量推送到另一个topic.  首先规划需要用到的类:  从KafkaSpout接收数据并进行处理的backtype.storm.spout.Scheme子类: 数据切分bolt:SplitSentenceBolt: 计数bolt:WordCountBolt: 报表bolt:ReportBolt: topology定义:WordCountTopology: 最后再加一个原样显示

Storm流计算之项目篇(Storm+Kafka+HBase+Highcharts+JQuery,含3个完整实际项目)

1.1.课程的背景 Storm是什么? 为什么学习Storm? Storm是Twitter开源的分布式实时大数据处理框架,被业界称为实时版Hadoop. 随着越来越多的场景对Hadoop的MapReduce高延迟无法容忍,比如网站统计.推荐系统.预警系统.金融系统(高频交易.股票)等等, 大数据实时处理解决方案(流计算)的应用日趋广泛,目前已是分布式技术领域最新爆发点,而Storm更是流计算技术中的佼佼者和主流. 按照storm作者的说法,Storm对于实时计算的意义类似于Hadoop对于批处理

基于storm,kafka,mysql的实时统计系统

公司对客户开放多个系统,运营人员想要了解客户使用各个系统的情况,在此之前,数据平台团队已经建设好了统一的Kafka消息通道. 为了保证架构能够满足业务可能的扩张后的性能要求,选用storm来处理各个应用系统上传到kafka中的埋点数据并在Mysql中汇聚. 埋点数据上报的格式为json,会上报类似如下的数据 { "account": "001", "accountName": "旺财宝", "subaccount&q

Storm Bolt接口

      Bolt是Topology中数据处理的基本单元,也是Storm针对处理过程的编程单元.Topology中所有的处理都是在这些bolt中完成的. Bolt可以将数据项发送至多个数据流(Stream).编程人员首先可以使用OutputFieldsDeclarer类的declareStream()方法声明多个流,指定数据将要发送到的流,然后使用SpoutOutputCollector的emit方法将数据发送(原生spout). Storm为Bolt提供的编程抽象,以接口的形式,面向接口的编

黑马12期大数据教程(hadoop,storm,kafka,hbase,hive,sqoop)

课程目录:weekend110-第1天 01-hadoop职位需求状况 02-hadoop课程安排 03-hadoop应用场景 04-hadoop对海量数据处理的解决思路 05-hadoop版本选择和伪分布式安装 06-hadoop版本选择和伪分布式安装2 07-hdfs&mapreduce测试 08-hdfs的实现机制初始 09-hdfs的shell操作 10-hadoop集群搭建的无密登陆配置weekend110-第2天 01-NN元数据管理机制 02-NN工作机制2 03-DN工作原理 0

JA16-大型分布式综合项目实战Spring+zookeeper+mycat+storm+kafka+nio+netty分布式存储云计算视频课程

新年伊始,学习要趁早,点滴记录,学习就是进步! 不要到处找了,抓紧提升自己. 对于学习有困难不知道如何提升自己可以加扣:1225462853 获取资料. 下载地址:https://pan.baidu.com/s/1o9rZpj0 谢谢大家的支持,我会努力给大家分享高质量教程 原文地址:https://www.cnblogs.com/sunnyppl/p/8412484.html

JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler

EvenScheduler同DefaultScheduler一样,同样实现了IScheduler接口, 由下面代码可以看出: (ns backtype.storm.scheduler.EvenScheduler (:use [backtype.storm util log config]) (:require [clojure.set :as set]) (:import [backtype.storm.scheduler IScheduler Topologies Cluster Topolo

当接口被调用时使用Spring拦截器注入运行时数据

开发背景 使用CXF暴露与调用接口,为了方便追踪错误,所以想要在接口被调用时将一些运行时数据记录起来,所以就想到了拦截器. CXF自带拦截器,但是据我初步了解,自带的拦截器都是经过封装,用来打印日志什么的,好像没有提供给开发者定制功能的方式(没有深入了解,如果有说错请方便斧正) 流程 其实Spring的拦截器使用挺方便的,实现org.aopalliance.intercept.MethodInterceptor接口中的invoke方法,在方法中实现想要的逻辑.然后在Spring配置文件中注入它就