kafka.network.AbstractServerThread中的线程协作机制

这个虚类是kafka.network.Acceptor和kafka.network.Processor的父类,提供了一个抽象的Sever线程。

它的有趣之处在于为子类的启动和停止提供了线程间的协作机制。

当子类的shutdown方法被调用时,子类可以得知自己被停止,在子类做了适当的处理和清理后,调用自己的shutdownComplete方法,使得对子类shutdown方法的调用从阻塞状态返回,从而使调用线程得知子类的对象已经恰当的停止。

即,在另一个线程中要关闭一个AbstractServerThread,可以执行它shutdown方法,当此方法从阻塞中返回,代表它已经恰当的关闭。

同样,对子类的awaitStartup方法调用也会阻塞,直到子类确认自己完全启动,这个方法调用才会返回。

这些功能是通过对CountdownLatch和AtomicBoolean的使用来实现的。

?





1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

private[kafka] abstract
class AbstractServerThread extends
Runnable with Logging {

  protected
val selector = Selector.open();

  private
val startupLatch = new
CountDownLatch(1)

  private
val shutdownLatch = new
CountDownLatch(1)

  private
val alive = new
AtomicBoolean(false)

  /**

   * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete

   */

  def shutdown(): Unit = {

    alive.set(false)

    selector.wakeup()

    shutdownLatch.await

  }

  /**

   * Wait for the thread to completely start up

   */

  def awaitStartup(): Unit = startupLatch.await

  /**

   * Record that the thread startup is complete

   */

  protected
def startupComplete() = {

    alive.set(true)

    startupLatch.countDown

  }

  /**

   * Record that the thread shutdown is complete

   */

  protected
def shutdownComplete() = shutdownLatch.countDown

  /**

   * Is the server still running?

   */

  protected
def isRunning = alive.get

  

  /**

   * Wakeup the thread for selection.

   */

  def wakeup() = selector.wakeup()

  

}

由于它代表了一个Server线程,在其内部使用了java.nio的Selector。所以在shutdown时,需要调用Selector的wakeup方法,使得对Selector的select方法的调用从阻塞中返回。 

继承它的子类必须对isRunning进行判断,来确定自己是否已经被要求关闭。以及在处理关闭请求后,调用shutdownComplete()来确认已完闭完成。

由于Acceptor和Processor的实现太长,这里写了一个例子模拟它们

?





1

2

3

4

5

6

7

8

9

10

11

private
class Processor extends
AbstractServerThread {

  override def run() {

    while(isRunning) {

      println("processor is running")

      //执行一些操作

      Thread.sleep(1000)

    }

    shutdownComplete()

  }

}

  在工作循环中判断isRunning作为退出循环的条件。然后执行shutdownComplete, 这时对Processor
的shutdown方法的调用才会返回。

时间: 2024-10-05 16:25:09

kafka.network.AbstractServerThread中的线程协作机制的相关文章

Java 线程中协作机制

一.生产者.消费者协作机制: 生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从列队上取数据或任务,如果队列长度有限,在队列满的时候,生产者等待,而在队列为空的时候,消费者等待. /** * 使用两个栈 实现队列 * * 生产者,消费者协作模式: 共享变量是一个阻塞队列,当队列满了生产者wait(),当队列为空消费者wait(); * * 阻塞队列有: * 接口:BlockingQueue.BlockingDeque 双端队列 * 基数数组的实现类:ArrayB

浅谈利用同步机制解决Java中的线程安全问题

我们知道大多数程序都不会是单线程程序,单线程程序的功能非常有限,我们假设一下所有的程序都是单线程程序,那么会带来怎样的结果呢?假如淘宝是单线程程序,一直都只能一个一个用户去访问,你要在网上买东西还得等着前面千百万人挑选购买,最后心仪的商品下架或者售空......假如饿了吗是单线程程序,那么一个用户得等前面全国千万个用户点完之后才能进行点餐,那饿了吗就该倒闭了不是吗?以上两个简单的例子,就说明一个程序能进行多线程并发访问的重要性,今天就让我们去了解一下Java中多线程并发访问这个方向吧. **第一

JAVA学习篇--ThreadLocal,Java中特殊的线程绑定机制

在DRP项目中,我们使用了ThreadLocal来创建Connection连接,避免了一直以参数的形式将Connection向下传递(传递connection的目的是由于jdbc事务要求确保使用同一个connection连接).那么ThreadLocal是如果做到的呢?它和同步锁的不同在哪里? 是什么: 对于ThreadLocal看英文单词我们很容易理解为一个线程的本地实现,但是它并不是一个Thread,而是threadlocalvariable(线程局部变量).也许把它命名为ThreadLoc

Java-ThreadLocal,Java中特殊的线程绑定机制

在DRP项目中,我们使用了ThreadLocal来创建Connection连接,避免了一直以参数的形式将Connection向下传递(传递connection的目的是由于jdbc事务要求确保使用同一个connection连接).那么ThreadLocal是如果做到的呢?它和同步锁的不同在哪里? 是什么: 对于ThreadLocal看英文单词我们很容易理解为一个线程的本地实现,但是它并不是一个Thread,而是threadlocalvariable(线程局部变量).也许把它命名为ThreadLoc

Java并发编程:Java中的锁和线程同步机制

锁的基础知识 锁的类型 锁从宏观上分类,只分为两种:悲观锁与乐观锁. 乐观锁 乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,采取在写时先读出当前版本号,然后加锁操作(比较跟上一次的版本号,如果一样则更新),如果失败则要重复读-比较-写的操作.Java中的乐观锁基本都是通过CAS操作实现的,CAS是一种更新的原子操作,比较当前值跟传入值是否一样,一样则更新,否则失败. 悲观

jstack Dump日志文件中的线程状态

jstack Dump 日志文件中的线程状态 dump 文件里,值得关注的线程状态有: 死锁,Deadlock(重点关注)  执行中,Runnable 等待资源,Waiting on condition(重点关注) 等待获取监视器,Waiting on monitor entry(重点关注) 暂停,Suspended 对象等待中,Object.wait() 或 TIMED_WAITING 阻塞,Blocked(重点关注)   停止,Parked 下面我们先从第一个例子开始分析,然后再列出不同线程

C#中的线程(一)入门 转载

文章系参考转载,英文原文网址请参考:http://www.albahari.com/threading/ 转载:http://www.cnblogs.com/miniwiki/archive/2010/06/18/1760540.html#1.1 作者 Joseph Albahari,  翻译 Swanky Wu 中文翻译作者把原文放在了"google 协作"上面,GFW屏蔽,不能访问和查看,因此我根据译文和英文原版整理转载到园子里面. 本系列文章可以算是一本很出色的C#线程手册,思路

C#中的线程(四)高级话题

C#中的线程(四)高级话题 Keywords:C# 线程Source:http://www.albahari.com/threading/Author: Joe AlbahariTranslator: Swanky WuPublished: http://www.cnblogs.com/txw1958/Download:http://www.albahari.info/threading/threading.pdf 第四部分:高级话题 非阻止同步 早些时候,我们讨论了非常简单的赋值和 更新一个字

CUDA学习日志:线程协作与例程

接触CUDA的时间并不长,最开始是在cuda-convnet的代码中接触CUDA代码,当时确实看的比较痛苦.最近得空,在图书馆借了本<GPU高性能编程 CUDA实战>来看看,同时也整理一些博客来加强学习效果. Jeremy Lin 在上篇博文中,我们已经用CUDA C编写了一个程序,知道了如何编写在GPU上并行执行的代码.但是对于并行编程来说,最重要的一个方面就是,并行执行的各个部分如何通过相互协作来解决问题.只有在极少数情况下,各个处理器才不需要了解其他处理器的执行状态而彼此独立地计算出结果