pipeline-filter模式变体之尾循环

pipeline-filter作为一种处理数据的模式(见【POSA】卷4)可以将应用的任务划分为几个自我完备的数据处理步骤,并连接到一个数据管道。本文介绍一种不太常见的pipeline-filter的变体——尾循环的pipeline-filter,当然这也是在特定的需求场景下才会出现的。

首先,我们来看一个常见的pipeline-filter的模式图:

模式的思路比较简单明了,就是对数据的处理步骤进行拆分。然后以统一的编程接口加上递归的方式,将它们串在一起。

最近在写的一个Message broker中处理消息通信的时候也采用了这种模式来切分消息处理步骤。在发送消息的时候这种模式使用得非常的顺畅,因此很自然得在接收消息的时候同样采用了这种模式。我们可以先来简单来看一个发送消息的时候整个pipeline就是像下面这样:

关于名词的说明:本文中谈到的handler可以类比为filter,提到的handler-chain可以类比为pipeline,只是叫法不同。

如果不去多想,接收消息的pipeline也应该跟发送消息类似,只是produce-handler变成了consume-handler。但在代码实现的时候,rabbitmq-java-client的实现方式使得这种模式的运用有些受阻。正常情况下,我们的理解是一个消息的处理或一个消息集合的处理,会穿过一个pipeline,但官方提供的java-client对于接收消息的实现是socket-blocking以等待消息推送到client的(push)方式。因为官方client的这种实现方式,使得外部封装的做法最好是将socket-blocking搬迁到一个独立的EventLoop-Thread上,然后获取到消息之后,解封送并以事件的方式对外提供消息,而客户端在该事件中实现自己的处理逻辑即可,也就是说是一种异步接收的方式,仔细想想也确实应该是这种push的方式。

由此可见在接收消息时的pipeline还是很不同于发送消息的。对接收消息而言,filter分成两个部分,第一部分的多个filter只执行一次(主要是在真正开始接收消息之前,处理一些前置任务,比如权限检查,参数验证等);第二部分的多个filter却要不断得在另外一个EventLoop-Thread上循环执行(因为这些filter涉及到对接收到的message进行处理)。所以,在接收消息时的示意图为:

其中,下面框起来的两个handler是在EventLoop-Thread上循环执行的。

显然,上面用于produce的那种pipeline-filter不能应对这种变化(既无法跨线程也无法在就其中的几个进行循环)。而此时不可能独立得为consume单独实现一套新的pipeline-filter(因为在pipeline-filter的基础设施上,我们已经将produce,consume以及request,response、publish,subscribe等都抽象为消息传输(carry))。因此,我们只能在同一套运行机制上,找到一种办法来满足所有的消息传输方式。

我们的做法是,实现一个“过渡handler”(此处的handler即为filter,只是取名不同而已),并实现handle方法。该handler用于将后续的逻辑过渡到一个独立的EventLoop-Thread上并启动EventLoop-Thread(把传递给当前handler的上下文以及chain对象传递到事件处理线程上去),其后的所有handler都在该线程上循环执行。

其实现代码段如下:

public void handle(@NotNull MessageContext context,
                       @NotNull IHandlerChain chain) {
        if (!context.isSync()) {
            ReceiveEventLoop eventLoop = new ReceiveEventLoop();
            eventLoop.setChain(chain);
            eventLoop.setContext(context);
            eventLoop.setChannelDestroyer(context.getDestroyer());
            eventLoop.setCurrentConsumer((QueueingConsumer) context.getOtherParams().get("consumer"));
            context.setReceiveEventLoop(eventLoop);

            //repeat current handler
            ((MessageCarryHandlerChain) chain).setEnableRepeatBeforeNextHandler(true);

            eventLoop.startEventLoop();
        } else {
            chain.handle(context);
        }
    }

如上,ReceiveEventLoop即为一个独立的EventLoop-Thread,启动之后,对于pipeline发起的线程而言,它启动的本次调用链(handle方法的递归调用)已经结束。因此主线程将会从该调用的触发点向下继续执行

而后续的filter在eventloop线程上独立运行:

public void run() {
    try {
        while (true) {
            QueueingConsumer.Delivery delivery = this.currentConsumer.nextDelivery();

            AMQP.BasicProperties properties = delivery.getProperties();
            byte[] msgBody = delivery.getBody();

            context.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);

            //.....

            this.context.setConsumedMsg(msg);
            this.chain.handle(this.context);
        }
    } catch (InterruptedException e) {
        logger.info("[run] close the consumer‘s message handler!");
    } catch (IOException e) {
        logger.error("[run] occurs a IOException : " + e.getMessage());
        this.shutdown();
    } catch (ConsumerCancelledException e) {
        logger.info("[run] the consumer has been canceled ");
        this.shutdown();
    } catch (Exception e) {
        logger.error("[run] occurs a Exception : " + e.getMessage());
        this.shutdown();
    }

    logger.info("******** thread id " + this.getThreadID() + " quit from message receiver ********");
}

有一个问题:我们必须拿到对EventLoop-Thread的控制权(外部可以在任意时刻关闭该eventloop),而获取其控制权的关键代码就是上上段代码中的:

context.setReceiveEventLoop(eventLoop);

我们将当前EventLoop的实例包装成一个Thread类型的属性,并为其开放了相应的开、关方法,将其控制权丢给外部:

public void startEventLoop() {
      this.currentThread.start();
}

public void shutdown() {
      this.channelDestroyer.destroy(context.getChannel());
      this.currentThread.interrupt();
}

然后在主线程发起接收消息的方法最后会返回一个IReceiverCloser接口的实例,在其接口方法close中调用shutdown,来对其进行关闭:

//launch pipeline
        carry(ctx);

        return new IReceiverCloser() {
            @Override
            public void close() {
                synchronized (this) {
                    if (ctx.getReceiveEventLoop().isAlive()) {
                        ctx.getReceiveEventLoop().shutdown();
                    }
                }
            }
        };

另一个问题,handler-chain是如何知道从某个handler之后转入eventloop线程会开始循环执行?是如何实现的?它来自于第一段代码中的如下这句代码:

((MessageCarryHandlerChain) chain).setEnableRepeatBeforeNextHandler(true);

这句代码会在进入下一个handler之前设置一个“循环执行”的标志。下面看看,我们是如何来改造handlerchain来达到这个目的的,在MessageCarryHandlerChain(它实现了IHandlerChain接口)的实现中,有如下四个变量:

    private List<AbstractHandler> handlerChain;
    private int     pos          = 0;
    private boolean enableRepeat = false;
    private int     repeatPos    = -1;

  • handlerChain:解析并顺序存储每个handler
  • pos:用来记录当前已经执行到哪个handler
  • enableRepeat:用来标识是否启用重复执行handler
  • repeatPos:用于记录重复执行的handler的起始位置

在设置启用enableRepeat属性的时候,会记录当前的位置状态:

public void setEnableRepeatBeforeNextHandler(boolean enableRepeat) {
        this.enableRepeat = enableRepeat;
        if (this.enableRepeat) {
            this.repeatPos = this.pos;
        } else {
            this.repeatPos = Integer.MIN_VALUE;
        }
    }

MessageCarryChain的handle实现,这也是handler串接的核心:

public void handle(MessageContext context) {
        if (this.repeatPos != Integer.MIN_VALUE) {
            if (this.pos < handlerChain.size()) {
                AbstractHandler currentHandler = handlerChain.get(pos++);
                currentHandler.handle(context, this);
            } else if (this.enableRepeat) {
                this.pos = this.repeatPos;
            }
        }
    }

在处理第一条到来的消息时,对应到上面while(true)中的最后一句:

this.chain.handle(this.context);

调用会进入MessageCarryChain的handle方法,并执行:

if (this.pos < handlerChain.size()) {

}

判断分支,在其中会触发下一个handler的handle方法,并一直执行下去直到判断条件不成立之后会执行else逻辑,将之前保存的起始循环的handler的位置置为新的handler的位置。

于是,当下一次,事件循环在上面while(true)中收到新的消息时,会再次执行上面的if判断分支(因为在接收上一条消息时,已将pos重置了,所以If判断条件又重新成立)并以循环位置的handler作为起始,直到走到handlerChain中的最后一个之后,又将当前位置的pos重置为repeatPos保存的位置(注意repeatPos在第一次被设置之后不再改变),如此周而复始。也就在表面上形成了上图所示的尾循环。

为什么说只是表面上形成了呢。因为在表述中,我故意“忽略”了这样一个现实——pipeline-filter模式根本上还是利用了递归来实现的,递归会有个还原点,用于“保护现场”之后再“还原现场”。因此,上面MessageCarryChain中的handle代码段中的:

currentHandler.handle(context, this);

对于每个被执行的handler都是还原点,当第一轮handler执行完成(调用完这句:this.pos = this.repeatPos;)都会在还原点层层回退(执行还原点之后的该方法内部的剩余代码)。因此,在收到第二个消息时,其实是触发了新一轮的handler-chain执行流程,只是因为pos在之前被置为chain中的循环起始位置,而不是从0开始而已。但对于后面尾循环的handler而言,它们递归调用的本质没有改变,所以其实只是看起来在尾部“循环”而已。

说明:其实如果你回顾pipeline-filter模式的本质,它们是用来处理数据的。我这里不管是发送还是接收消息兼顾处理了消息之外的一些逻辑。如果这里只处理消息,其实是可以不用跨线程以及尾循环的。我只是利用了这种模式,将消息通信的各个环节进行拆分,组合,重用从而不可回避得遇到了这个问题,如果回到纯粹的pipeline-filter模式,是不需要这么做的。

如果我没表述清楚的,请直接看代码:Messagebus-Consume

时间: 2025-01-16 22:12:34

pipeline-filter模式变体之尾循环的相关文章

Android Studio下项目构建的Gradle配置及打包应用变体

Gradle简介 ??Gradle是一个自动化构建工具,采用Groovy的Domain Specific Language(领域特定语言)来描述和控制构建逻辑.具有语法简洁.可读性强.配置灵活等特点.基于Intellij IDEA社区版本开发的Android Studio天生支持Gradle构建程序.Groovy是一种基于JVM的敏捷开发语言,结合了Phthon.Ruby和Smalltalk的许多强大特性.同时,Groovy代码既能够与java代码很好地结合,也能够用于扩展现有的代码. Grad

(转) 干货 | 图解LSTM神经网络架构及其11种变体(附论文)

干货 | 图解LSTM神经网络架构及其11种变体(附论文) 2016-10-02 机器之心 选自FastML 作者:Zygmunt Z. 机器之心编译  参与:老红.李亚洲 就像雨季后非洲大草原许多野生溪流分化成的湖泊和水洼,深度学习已经分化成了各种不同的专门架构. 并且,每个架构都会有一个图解,这里将详细介绍它们. 神经网络在概念上很简单,并且它们十分动人.在层级上,有着一堆同质化的元素和统一的单位,并且它们之间还存在在一系列的加权连接.这就是神经网络的所有,至少从理论上来说是这样.然而,时间

5.2 vector: 创建和使用vector的各种变体

Clojure练习-5.组合数据类型 "用100个函数来操作一个数据结构比10个函数操作10个数据结构要好很多.基于一个统一的数据结构,我们可以构建出一系列的小函数,这些小函数又可以组合起来形成一个强大的系统.而为不同的数据结构编写不同的函数,在根本上就削减了复用的可能." -- [ Alan Perlis ] Clojure练习-5组合数据类型 组合数据类型 2 vector 创建和使用vector的各种变体 已知 解1 测试 解2 测试 5. 组合数据类型 5.2 vector:

单例模式变体

/**饿汉模式*/ public class EagerSingleton { private static EagerSingleton eagerSingleton=new EagerSingleton(); private EagerSingleton() { super(); } public static EagerSingleton getInstance(){ return eagerSingleton; }} /* *懒汉模式 *优点: 不使用就不创建, 节省内存空间.  * 缺

[转]Delphi 变体类型(Variant)的介绍(流与变体类型的相互转换、变体类型常用的函数)

Delphi 变体类型(Variant)的介绍(流与变体类型的相互转换.变体类型常用的函数) 一.变体类型常用的函数介绍: Variant: 一种可以拥有各种数据类型: 也可以告诉目前存储的数据是什么类型(通过使用VarType函数): 可以给相同的Variant分配不同的数据类型,只要Variant包含数字值就可以执行算法: variant数组只不过是variant型的数组,不必包含同类型的数据: 1.  VarArrayOf函数:更快捷的创建一维变体数组,可以创建全异的数值数组: funct

Delphi 变体类型(Variant)的介绍(流与变体类型的相互转换、变体类型常用的函数)

来源:http://blog.csdn.net/xiongmao000738/article/details/6863988 一.变体类型常用的函数介绍: Variant: 一种可以拥有各种数据类型: 也可以告诉目前存储的数据是什么类型(通过使用VarType函数): 可以给相同的Variant分配不同的数据类型,只要Variant包含数字值就可以执行算法: variant数组只不过是variant型的数组,不必包含同类型的数据: 1.  VarArrayOf函数:更快捷的创建一维变体数组,可以

构建 build variants 构建变体

官方文档 配置构建变体 Configure build variants 此页面以配置构建概览[Configure your build overview]为基础,向您介绍如何配置构建变体,以便从同一个项目中创建应用的不同版本,以及如何正确地管理依赖项并签署配置[ properly manage your dependencies and signing configurations]. 每个构建变体都代表您可以为应用构建的一个不同版本.例如,您可能希望构建应用的免费版本(只提供有限的内容)和付

从头认识java-18.2 基本的线程机制(6)-使用构造器或者内部类来实现多线程的编码变体

这一章节我们来讨论一下使用构造器或者内部类来实现多线程的编码变体. 1.基础实现 package com.ray.ch17; public class Test { public static void main(String[] args) { Thread thread1 = new ExtendsThread(); thread1.start(); Thread thread2 = new Thread(new ImplRunnable()); thread2.start(); } } c

变体(协变与逆变)

变体的引入是为了提高泛型类型的变量在赋值时可以对类型进行兼容性转换,以扩展泛型的灵活性.下面看个例子: public delegate void DoWork<T>(T arg); ........ DoWork<A> del1=delegate(A arg){//.......}; DoWork<B> del2=del1; B bb=new B(); del2(bb); 其中A ,B是两个类,B类继承A类,即:public class A{.....}