storm事件管理器EventManager源码分析-event.clj

storm事件管理器定义在event.clj中,主要功能就是通过独立线程执行"事件处理函数"。我们可以将"事件处理函数"添加到EventManager的阻塞队列中,EventManager的事件处理线程不断地从阻塞队列中获取"事件处理函数"并执行。

EventManager协议

协议就是一组函数定义的集合,协议中函数的第一个参数必须为实现该协议的实例本身,类似于java中实例方法的第一个参数为this;协议类似于java中的接口。

(defprotocol EventManager
 (add [this event-fn])
 (waiting? [this])
 (shutdown [this]))

event-manager函数

(defn event-manager
 "Creates a thread to respond to events. Any error will cause process to halt"
 ;; daemon?表示是否将事件处理线程设置成守护线程
 [daemon?]
 ;; added表示已添加的"事件处理函数"的个数
 (let [added (atom 0)
     ;; processed表示已处理的"事件处理函数"的个数
       processed (atom 0)
     ;; queue绑定事件管理器的阻塞队列LinkedBlockingQueue
       ^LinkedBlockingQueue queue (LinkedBlockingQueue.)
     ;; 设置事件管理器的状态为"running"
       running (atom true)
     ;; 创建事件处理线程。Clojure函数实现了Runnable和Callable接口,所以可以将Clojure函数作为参数传递给java.lang.Thread类的构造函数
       runner (Thread.
              ;; 事件处理线程循环检查事件处理器的状态是否是"running",如果是,就从阻塞队列中获取"事件处理函数",并执行;然后将processed加1
                (fn []
                  (try-cause
                    (while @running
                      (let [r (.take queue)]
                        (r)
                        (swap! processed inc)))
                    (catch InterruptedException t
                      (log-message "Event manager interrupted"))
                    (catch Throwable t
                      (log-error t "Error when processing event")
                      (exit-process! 20 "Error when processing an event")))))]
   (.setDaemon runner daemon?)
   ;; 启动事件处理线程
   (.start runner)
   ;; 返回一个实现了EventManager协议的实例
   (reify
     EventManager
     ;; add函数将"事件处理函数"添加到事件处理器的阻塞队列中
     (add
       [this event-fn]
       ;; should keep track of total added and processed to know if this is finished yet
       (when-not @running
         (throw (RuntimeException. "Cannot add events to a shutdown event manager")))
       (swap! added inc)
       (.put queue event-fn))
     ;; waiting?判断事件处理线程是否处于等待状态
     (waiting?
       [this]
       (or (Time/isThreadWaiting runner)
           (= @processed @added)))
     ;; 关闭事件管理器
     (shutdown
       [this]
       (reset! running false)
       (.interrupt runner)
       (.join runner)))))

时间: 2024-08-03 15:25:40

storm事件管理器EventManager源码分析-event.clj的相关文章

storm启动supervisor源码分析-supervisor.clj

supervisor是storm集群重要组成部分,supervisor主要负责管理各个"工作节点".supervisor与zookeeper进行通信,通过zookeeper的"watch机制"可以感知到是否有新的任务需要认领或哪些任务被重新分配.我们可以通用执行bin/storm supervisor >/dev/null 2>&1 &来启动supervisor.bin/storm是一个python脚本,在这个脚本中定义了一个superv

Android的软件包管理服务PackageManagerService源码分析

Android系统下的apk程序都是通过名为PackageManagerService的包管理服务来管理的.PacketManagerService是安卓系统的一个重要服务,由SystemServer启动,主要实现apk程序包的解析,安装,更新,移动,卸载等服务.不管是系统apk(/system/app),还是我们手工安装上去的,系统所有的apk都是由其管理的. 以android 4.0.4的源码为例,android4.0.4/frameworks/base/services/java/com/

supervisor启动worker源码分析-worker.clj

supervisor通过调用sync-processes函数来启动worker,关于sync-processes函数的详细分析请参见"storm启动supervisor源码分析-supervisor.clj".sync-processes函数代码片段如下: sync-processes函数代码片段 ;; sync-processes函数用于管理workers, 比如处理不正常的worker或dead worker, 并创建新的workers;; supervisor标识supervis

storm操作zookeeper源码分析-cluster.clj

storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中).backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState.clojure中的protocol可以看成java中的接口,封装了一组方法.ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协

storm启动nimbus源码分析-nimbus.clj

nimbus是storm集群的"控制器",是storm集群的重要组成部分.我们可以通用执行bin/storm nimbus >/dev/null 2>&1 &来启动nimbus.bin/storm是一个python脚本,在这个脚本中定义了一个nimbus函数: nimbus函数 def nimbus(klass="backtype.storm.daemon.nimbus"):    """Syntax: [s

Zepto源码分析-event模块

源码注释 // Zepto.js // (c) 2010-2015 Thomas Fuchs // Zepto.js may be freely distributed under the MIT license. ;(function($){ var _zid = 1, undefined, slice = Array.prototype.slice, isFunction = $.isFunction, isString = function(obj){ return typeof obj

Mybatis Interceptor 拦截器原理 源码分析

Mybatis采用责任链模式,通过动态代理组织多个拦截器(插件),通过这些拦截器可以改变Mybatis的默认行为(诸如SQL重写之类的),由于插件会深入到Mybatis的核心,因此在编写自己的插件前最好了解下它的原理,以便写出安全高效的插件. 代理链的生成 Mybatis支持对Executor.StatementHandler.PameterHandler和ResultSetHandler进行拦截,也就是说会对这4种对象进行代理. 通过查看Configuration类的源代码我们可以看到,每次都

jQuery源码分析--event事件绑定(上)

上文提到,jquery的事件绑定有bind(),delegate()和one()以及live()方式.我用的jQuery2.1.3版本,live()已经被废弃了. bind(),delegate()和one()的内部源码. //7491行 bind: function( types, data, fn ) { return this.on( types, null, data, fn ); }, //7498行 delegate: function( selector, types, data,

spark内存管理器--MemoryManager源码解析

MemoryManager内存管理器 内存管理器可以说是spark内核中最重要的基础模块之一,shuffle时的排序,rdd缓存,展开内存,广播变量,Task运行结果的存储等等,凡是需要使用内存的地方都需要向内存管理器定额申请.我认为内存管理器的主要作用是为了尽可能减小内存溢出的同时提高内存利用率.旧版本的spark的内存管理是静态内存管理器StaticMemoryManager,而新版本(应该是从1.6之后吧,记不清了)则改成了统一内存管理器UnifiedMemoryManager,同一内存管