架构设计:系统间通信(43)——自己动手设计ESB(4)

==============================

接上文《架构设计:系统间通信(42)——自己动手设计ESB(3)

5、Borker Server选择

在本文之前的三篇文章中,我们介绍了自行设计的ESB中间件的顶层设计、介绍了主控服务如何对多个ESB-Brokers动态节点进行日志采集和监控、还介绍了ESB-Broker节点如何进行动态路由定义的加载管理。这篇文章我们主要讨论关于ESB-Client的一些关键设计。

在我们自行设计的ESB中间件中为了保证ESB服务不会成为整个软件系统的瓶颈,我们为ESB服务设计了多个ESB-Broker Server节点共同运行的方式。多个ESB-Client的请求将按照一定的规则被分配到这些ESB-Broker节点上进行处理,并且这些ESB-Client节点将根据为它们服务的ESB-Brokers集群的即时状态动态切换对应的ESB-Broker节点。那么我们要讨论的问题就是:ESB-Client为什么需要这些基础特性以及怎样实现这些基础特性?

我们先来讨论一下最可能的ESB-Client的存在形式。虽然我们知道ESB服务中存在多个ESB-Broker是为了能让多个ESB-Client的请求尽可能平均分配到这些ESB-Broker上,但是一个ESB-Client就代表一个参与ESB集成的业务系统吗?答案是:肯定不是,各个业务系统为了保证处理性能其构建结构也肯定是多个处理节点。也就是说一个业务系统可能包含了多个ESB-Client节点。如下图所示:

上图所示业务系统A和业务系统B都分别有三个运行节点,至于这三个运行节点是否还存在协同动作,以完成某些特别的业务请求处理,我们这里就不讨论了,因为这是业务系统内部的事情并不会影响ESB中间件的设计。这些业务系统的节点在启动后都根据ESB服务运行模块中已有的ESB-Broker节点的状态,选择一个ESB-Broker节点为自己提供服务响应服务。这是我们在正式系统上常见的运行状况,这也从另一个侧面说明为什么在业务系统的顶层设计中,ESB中间件服务容易成为性能瓶颈——并不是每个业务系统只有一个ESB-Client节点请求到ESB-Brokers运行集群

在业务高峰期,运维团队通过主控模块观察到目前拥有三个ESB-Brokers运行节点的模块已经达到性能峰值,于是紧接着启动了一个新的ESB-Broker节点“Broker Server4”。如果这时ESB中间件并没有任何方法对现有的ESB-Client节点和ESB-Broker节点的服务关系进行重新分配,就算再启动多少个新的ESB-Broker节点也不可能减轻ESB服务的运行压力

并且为了保证能够快速缓解ESB-Brokers模块的性能压力,我们所设计的分配方案应该是全动态的。全动态的意思是,只要ESB-Brokers模块发生了变化,所有的ESB-Client就会立刻收到通知并立即为自己重新选择一个新的ESB-Broker Server节点。这里所说的“发生变化”,不止包括ESB-Brokers模块添加了新的ESB-Broker Server节点的情况,也包括前者移除了某个已存在的ESB-Broker Server节点的情况,还包括某个ESB-Broker Server节点虽然没有从ESB-Brokers模块移除,但本身已发生了故障暂时无法提供服务的情况。

5-1、解决过程

从以上描述的硬性设计要求来看,使用zookeeper进行各ESB-Client节点和ESB-Broker节点的状态协调就是一个可行的方案。为此,我们需要对之前文章中已经提到的zookeeper端数据存储结构进行调整。请看如下示意图:

上图中名为“active_System”的Path Node是原有的数据结构,用于描述业务系统信息的。而红圈范围内为新增的数据结构,用于描述当前活动的ESB-Broker节点。请注意这些节点都是临时节点,即是说当ESB-Broker节点完全停止动作后,zookeeper上关于这个ESB-Broker节点的Path Node信息会自动被删除。而原来已经被记录在zookeeper服务端的业务系统信息,被全部移动到“active_System”这个Path Node下面,这样做的原因是避免ESB-Broker将“active_Brokers”误识别为一个业务系统信息。

完成zookeeper上数据结构的调整后,我们就可以对ESB-Client、ESB-Broker和zookeeper三者的协调、调用关系进行细化了:

上图是《架构设计:系统间通信(40)——自己动手设计ESB(1)》这边文章中“ESB顶层设计”描述部分的处理过程细化。详细描述了ESB-Client、ESB-Broker和zookeeper三者的协调处理过程:

  1. 当某个ESB-Broker Server节点完成启动过程后(即完成上文“4-3”小节所描述的过程),需要在zookeeper服务端的“active_Brokers”Path Node下创建一个代表自己存在且工作正常的临时节点。注意一定是临时节点(EPHEMERAL类型的),这样保证当前的ESB-Broker Server节点完全工作时这个临时节点会自动脱落,并将事件通知到所有ESB-Client(上图中所示的第7步或者第9步)。这个处理步骤也代表着前文已经介绍过的ESB-Broker Server节点启动过程需要做相应调整。
  2. 当某个ESB-Client节点启动时,会通过zookeeper的客户端连接到ZK服务端查询当前“active_Brokers”Path Node下已经存在的所有ESB-Broker Server节点信息。这里需要注意,这些ESB-Broker Server节点信息并不一定都是可以使用的,这是因为当ESB-Broker Server节点异常终止时,zookeeper服务端上对应的临时节点可能不会立即被删除,另外还有种可能就是虽然某个ESB-Broker Server节点工作是正常的,但是ESB-Broker Server节点上提供给ESB-Client节点访问的端口却被防火墙阻挡了。
  3. 这些ESB-Broker Server节点的[Data]部分将记录这个节点的工作状态。但是在这一步骤zookeeper服务端会把当前存在的所有ESB-Broker Server节点信息都返回给ESB-Client。后者将基于“取余”的方式,根据当前的ESB-Broker Server节点信息选择一个为自己服务的节点(本文称为“主要的选择过程”)。这个过程很重要,因为选择过程中可能选择到一个“不可用”ESB-Broker Server节点。下文将会详细讲解这个“主要的选择过程”。
  4. 接下来ESB-Client就可以正常访问以上步骤中选择的“可用”ESB-Broker Server节点了。并且在ESB-Client节点启动过程完成的最后一步,ESB-Client节点还会建立针对“active_Brokers”Path Node的监听,以便接收该节点下直接子节点的变化事件。
  5. 当这个ESB-Client节点确定了可以访问的ESB-Broker Server节点,且前者并没有收到任何关于ESB-Broker集群变化的事件通知,则ESB-Client节点和ESB-Broker Server节点的对应关系就不会发生变化。但当ESB-Client节点对ESB-Broker Server节点的访问一旦受阻,则ESB-Client节点就会启动重试过程。
  6. 如果重试过程全部失败,则这个ESB-Client节点将暂时阻断本身对ESB服务的所有请求(直到这个ESB-Client节点对应的ESB-Broker Server信息被更新)。接着ESB-Client节点会主动到zookeeper服务端修改这个ESB-Broker Server节点的状态为“不可用”,这样所有ESB-Client节点都会收到这个事件。
  7. 就如同上一步骤提到的那样当某个ESB-Broker Server节点的可用性发生变化时(变化可能是切换为“可用”状态,也可能是切换为“不可用”状态),所有的ESB-Client节点都将接收到这个事件(包括触发这个事件的ESB-Client节点)。接着,它们都会重新进行ESB-Broker Server节点的选择。
  8. 以上步骤5至步骤7,是由ESB-Client节点发现故障的ESB-Broker Server节点后主动触发“重新进行ESB-Broker Server节点选择”的过程。当正式系统的运维人员主动正常终止某个ESB-Broker Server节点的运行时,这个ESB-Broker Server节点在zookeeper服务上对应的临时节点就会被立即删除,也会触发“重新进行ESB-Broker Server节点选择”的过程。
  9. 由以上情况所触发的ESB-Broker Server节点变化事件,在ESB-Client端的处理过程是可以重用的。

5-2、修改ESB-Broker Server的启动过程

针对上一小节提到的第一个操作步骤,我们需要对之前已经给出的ESB-Broker Server上的BootStartup类进行修改。主要是在其完成正常启动过程后,将要进入“camelContextOperateQueue.take()”队列等待之前,通过ESB-Broker Server上的Curator组件在zookeeper服务端的“active_Brokers”Path Node下创建一个代表本ESB-Broker的临时节点。代码片段如下所示,请注意由于之前已经给出了BootStartup类中的主要代码片段,所以这里只给出在代码中增加的那一部分代码:

/**
 * 我们自己设计的ESB-Broker Server应用程序的主启动器<br>
 * 这个启动器将在启动过程中统一协调zookeeper操作模块和CamelContext上下文。
 * @author yinwenjie
 */
public class BootStartupV2 {
    ......
    /**
     * 主要的启动过程在这里
     */
    @SuppressWarnings("unchecked")
    protected void start() throws Exception {
        /*
         * 启动顺序为:
         *
         * ......
         *
         * 4.1、在active_Brokers节点下,增加代表这个Broker Server节点的临时节点(新增的步骤)
         *
         * ......
         * */

         ......

        // ========================
        // 这里的代码在zookeeper服务上添加代表该Broker Server的临时节点
        // 注意检查“active_Brokers” Path Node的存在性
        // ========================
        // 如果条件成立,说明需要永久创建active_Brokers
        if(zkClient.checkExists().forPath("/active_Brokers") == null) {
            zkClient.create().forPath("/active_Brokers", null);
        }

        // 这里取得一个ESB-Broker的有效ip,以便提供给ESB-Client使用
        // IPChoiceUtils是一个工具类,用于选择一个可用的IP注册到ZK。
        String sourceIp = IPChoiceUtils.getSourceIp();
        // 如果条件成立,说明没有取到任何IP。终止运行!
        if(sourceIp == null) {
            System.exit(-1);
        }

        // 建立临时节点
        zkClient.create().withMode(CreateMode.EPHEMERAL)
        .forPath("/active_Brokers/Broker_" + sourceIp, String.valueOf(true).getBytes());

        ......

    }
    ......
}

以上代码片段中没有交代清楚的就是“如何选择一个IP提供给ESB-Client”。一般情况下,一个物理服务器会有多个IP,例如值为127.0.0.1的回环IP、值为192.168.1.100的内网IP。那么到底将哪一个IP提供给ESB-Client,以便后者进行服务请求呢?这里有两种最直接的处理方式:第一种是ESB-Broker Server提供一个配置文件,并在配置文件中由开发人员/运维人员指定一个IP。这种方式实现方式比较简单,只需要在代码中读取这个配置文件就可以了,但是这种方式的缺点也比较明显:开发人员/运维人员需要为每一个ESB-Broker节点都维护一个配置文件,增加了一定的维护工作量和管理难度。

另一种处理方式是由ESB-Broker Server在启动时自动选择一个IP。这种方式的优点是,在一般情况下不需要开发人员/运维人员专门维护一个调用IP的配置文件,减少了工作人员的部署和维护工作量。但是由于物理服务器可能存在于一个比较复杂的局域网内,所以这种方式不一定能保证程序所选择的IP是ESB-Client最佳的调用IP,甚至可能出现错选的情况。试想一下这样的情况,一个物理服务器有两张网卡,分别指定了两个内网IP地址。但是其中一个内网IP一共走了三层交换机另一个内网IP只经过一层交换机。所以,正式系统中最好的方式是综合以上两种方式:以程序自动识别为主,以人工配置指定为辅。这里给出“IPChoiceUtils”工具类的核心代码:


......

/**
 * IP选择工具,用于在本机多个IP中,选择一个提供给ESB-Client进行访问的IP
 * @author yinwenjie
 */
public class IPChoiceUtils {

    /**
     * 日志
     */
    private static final Log LOGGER = LogFactory.getLog(InterfaceLister.class);
    /**
     * 内网IP范围
     */
    private static final String LOCALIP_RANGES[] = new String[] {
        "10.0.0.0-10.255.255.255",  "172.16.0.0-172.131.255.255" , "192.168.0.0-192.168.255.255"
    };
    /**
     * A、B、C类IP范围,不包括广播IP,但是包括内网IP范围
     */
    private static final String REMOTEIP_RANGES[] = new String[] {
        "1.0.0.1-126.255.255.254" , "128.0.0.1-191.255.255.254" , "192.0.0.1-223.255.255.254"
    };

    /**
     * 该方法用于在本机多个IP中,选择一个提供给ESB-Client进行访问的IP
     * @return
     */
    public static String getSourceIp() {
        Enumeration<NetworkInterface> interfaces = null;
        try {
            interfaces = NetworkInterface.getNetworkInterfaces();
        } catch(SocketException e) {
            LOGGER.error(e.getMessage() , e);
            return null;
        }
        // 首先拿到本机可用的IPV4地址。
        List<String> allIPs = new ArrayList<String>();
        while (interfaces.hasMoreElements()) {
            NetworkInterface networkInterface = interfaces.nextElement();

            // 注意,一个网络设备可能绑定了多个IP,也可能一个IP都没有
            // 还有可能是IPV6的格式
            // 就算是IPV4的格式,也可能是loop形式的ip
            Enumeration<InetAddress> inetAddresss = networkInterface.getInetAddresses();
            while(inetAddresss.hasMoreElements()) {
                InetAddress inetAddress = inetAddresss.nextElement();
                if(!(inetAddress instanceof Inet4Address)
                    || inetAddress.isLoopbackAddress()
                    || inetAddress.isMulticastAddress()) {
                    continue;
                }
                String ip = inetAddress.getHostAddress();
                allIPs.add(ip);
            }
        }  

        // 开始从可用的IP中优先选择一个IP
        // 选择规则为:首先选择一个低位的内网IP。如果内网IP无效,则依次优先选择A类、B类、C类IP
        // 开发人员可以视自己的需求对选择规则进行更改
        for (String sourceIP : allIPs) {
            for (String localip_range : LOCALIP_RANGES) {
                if(ipExistsInRange(sourceIP , localip_range)) {
                    LOGGER.info("======选择到IP:" + sourceIP);
                    return sourceIP;
                }
            }
        }
        for (String sourceIP : allIPs) {
            for (String remoteip_range : REMOTEIP_RANGES) {
                if(ipExistsInRange(sourceIP , remoteip_range)) {
                    LOGGER.info("======选择到IP:" + sourceIP);
                    return sourceIP;
                }
            }
        }
        return null;
    }

    /**
     * 判断给定的IP是否在一个范围内
     * @param ip
     * @param ipSection
     * @return 如果是在一个范围内,则返回true;其它情况返回false
     */
    private static boolean ipExistsInRange(String ip,String ipSection) {
        String[] ipSections = ipSection.split("\\-");
        String beginIP = ipSections[0];
        String endIP = ipSections[1];
        return getIp2long(beginIP) <= getIp2long(ip) && getIp2long(ip) <= getIp2long(endIP);
    }

    /**
     * 这个私有方法将字符串形式的ip,转换为一个对应的长整型。
     * 为什么?请参见IP结构的基础知识
     * 以便进行比较
     * @param ip
     * @return
     */
    private static long getIp2long(String ip) {
        String[] ips = ip.split("\\.");
        long ip2long = 0L;
        for (int i = 0; i < 4; ++i) {
            ip2long = ip2long << 8 | Integer.parseInt(ips[i]);
        }
        return ip2long;
    }
}

5-3、ESB-Client的启动过程

讨论完ESB-Broker Server的变化调整后,我们再来讨论一下ESB-Client为了选择并保持自己和ESB-Broker节点稳定的服务请求需要做的主要工作。由于ESB-Client代表的业务系统在启动时一般都需要完成和业务关联相对紧密的数据初始化工作,所以各种ESB-Client的启动过程是不尽相同的。例如,物流系统在启动时可能需要完成省-市-县地域信息的加载;CRM系统在启动时需要将管理员的基本信息加载到内存;财务系统在启动时需要首先和支付系统建立长连接……那么我们这里要讨论的ESB-Client启动过程,专指ESB-Client代表的业务系统在完成业务相关的启动过程后进行的和ESB中间件相关的启动步骤

ESB-Client主要通过被动监听zookeeper服务下所注册的ESB-Broker临时子节点变化的方式,来知晓ESB-Brokers集群的服务状态变化。这些变化包括:新增了ESB-Broker节点、减少了ESB-Broker节点和某个ESB-Broker节点的状态发生了变化。

当ESB-Client完成了业务部分的启动过程后面,就开始为自己选择可用的ESB-Broker节点。通过主动到zookeeper服务查询当前所有的ESB-Broker节点信息,ESB-Client将可在本地生成一个列表。然后根据这个列表进行取余选择(也可以是加权选择算法)。但是,经过第一次选择得到的ESB-Broker节点并不一定是可用的,该节点的Data区域可能已被其它ESB-Client标识为“false”(即状态失效)。如果遇到这种情况,ESB-Client将通过一个所有ESB-Broker都默认提供的测试地址,重新检查一次该节点的可用性。如果测试成功则ESB-Client会更改ESB-Broker节点的可用性为“true”;如果测试失败,则ESB-Client将把取余的基数+1并重新进行ESB-Broker的选择。

===============================

(接下文)

时间: 2024-10-10 21:00:25

架构设计:系统间通信(43)——自己动手设计ESB(4)的相关文章

架构设计:系统间通信(44)——自己动手设计ESB(5)

(接上文<架构设计:系统间通信(43)--自己动手设计ESB(4)>) 5-4.ESB-Client端的ActiveBrokerContext 本小节开始,我们将按照前文介绍的ESB-Client的核心步骤,一点一点的给出ESB-Client端和ESB-Broker进行交互的核心代码.为了方便在ESB-Client端进行ESB-Broker的交互,我们设计了一个ActiveBrokerContext类.读者可以将这个类理解为"ESB-Broker交互上下文",在ESB-Cl

架构设计:系统间通信(40)——自己动手设计ESB(1)

1.概述 在我开始构思这几篇关于"自己动手设计ESB中间件"的文章时,曾有好几次动过放弃的念头.原因倒不是因为对冗长的文章产生了惰性,而是ESB中所涉及到的技术知识和需要突破的设计难点实在是比较多,再冗长的几篇博文甚至无法对它们全部进行概述,另外如果在思路上稍微有一点差池就会误导读者.一个可以稳定使用的ESB中间件凝聚了一个团队很多参与者的心血,一个人肯定是无法完成这些工作的.但是笔者思索再三,还是下决心将这这即便文章完成,因为这是对本专题从第19篇文章到第39篇文章中所介绍的知识点的

架构设计:系统间通信(22)——提高ActiveMQ工作性能(上)

接上文<架构设计:系统间通信(21)--ActiveMQ的安装与使用> 3.ActiveMQ性能优化思路 上篇文章中的两节内容,主要介绍消息中间件ActiveMQ的安装和基本使用.从上篇文章给出的安装配置和示例代码来看,我们既没有修改ActivieMQ服务节点的任何配置,也没有采用任何的集群方案.这种情况只适合各位读者熟悉ActiveMQ的工作原理和基本操作,但是如果要将ActivieMQ应用在生产环境下,上文中介绍的运行方式远远没有挖掘出它的潜在性能. 根据这个系列文章所陈述的中心思想,系统

架构设计:系统间通信(29)——Kafka及场景应用(中2)

接上文:<架构设计:系统间通信(28)--Kafka及场景应用(中1)> 4-3.复制功能 我们在上文中已经讨论了Kafka使用分区的概念存储消息,一个topic可以有多个分区它们分布在整个Kafka集群的多个Broker服务节点中,并且一条消息只会按照消息生产者的要求进入topic的某一个分区.那么问题来了:如果某个分区中的消息在被消费端Pull之前,承载该分区的Broker服务节点就因为各种异常原因崩溃了,那么在这个Broker重新启动前,消费者就无法收到消息了. 为了解决这个问题,Apa

架构设计:系统间通信(37)——Apache Camel快速入门(中)

========================== (接上文<架构设计:系统间通信(36)--Apache Camel快速入门(上)>) (补上文:Endpoint重要的漏讲内容) 3-1-2.特殊的Endpoint Direct Endpoint Direct用于在两个编排好的路由间实现Exchange消息的连接,上一个路由中由最后一个元素处理完的Exchange对象,将被发送至由Direct连接的下一个路由起始位置(http://camel.apache.org/direct.html)

架构设计:系统间通信(32)——其他消息中间件及场景应用(下2)

(接上文<架构设计:系统间通信(31)--其他消息中间件及场景应用(下1)>) 5-3.解决方案二:改进半侵入式方案 5-3-1.解决方法一的问题所在 方案一并不是最好的半侵入式方案,却容易理解架构师的设计意图:至少做到业务级隔离.方案一最大的优点在于日志采集逻辑和业务处理逻辑彼此隔离,当业务逻辑发生变化的时候,并不会影响日志采集逻辑. 但是我们能为方案一列举的问题却可以远远多于方案一的优点: 需要为不同开发语言分别提供客户端API包.上文中我们介绍的示例使用JAVA语言,于是 事件/日志采集

架构设计:系统间通信(33)——其他消息中间件及场景应用(下3)

=================================== (接上文:<架构设计:系统间通信(32)--其他消息中间件及场景应用(下2)>) 5-7.解决方案三:非侵入式方案 以上两种方案中为了让业务系统能够集成日志采集功能,我们或多或少需要在业务系统端编写一些代码.虽然通过一些代码结构的设计,可以减少甚至完全隔离这些代码和业务代码的耦合度,但是毕竟需要业务开发团队花费精力对这些代码进行维护,业务系统部署时业务对这些代码的配置信息做相应的调整. 这里我们再为读者介绍一种非侵入式的日

架构设计:系统间通信(20)——MQ:消息协议(下)

(接上文<架构设计:系统间通信(19)--MQ:消息协议(上)>) 上篇文章中我们重点讨论了"协议"的重要性,并为各位读者介绍了Stomp协议和XMPP协议.这两种协议是消息队列中两种不同使用场景下的典型代表.本文主要接续上文的篇幅,继续讨论消息队列中另一种典型协议:AMQP协议. 3-3.AMQP协议 AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议).目前AMQP协议的版本为 Version 1.0,这个协议标准

架构设计:系统间通信(15)——服务治理与Dubbo 上篇

1.上篇中"自定义服务治理框架"的问题 在之前的文章中(<架构设计:系统间通信(13)--RPC实例Apache Thrift 下篇(1)>.<架构设计:系统间通信(14)--RPC实例Apache Thrift 下篇(2)>),我们基于服务治理的基本原理,自己实现了一个基于zookeeper + thrift的服务治理框架.但实际上前文中我们自行设计的服务治理框架除了演示基本原理外,并没有多大的实际使用价值,因为还有很多硬性需求是没有实现的: 访问权限:在整个