如何在同一个Java进程中连接多个RocketMQ服务器

前言

我们都知道,RocketMQ在代码级别对连接服务器进行了限制,基本上可以理解为一个JVM进程中只能连接一个NameServer,但实际应用场景中,我们可能会在架构设计层面上对RocketMQ进行了职能上的划分,规定了A服务处理A类消息,而B服务处理B类消息,这时我们应该如何解决这个问题呢?

问题的根源

我们从代码层级来分析到底为什么会产生“一个JVM实例只能连接一个NameServer”。
RocketMQ Client有一个核心类MQClientManager,在我们需要使用MQ Client实例的时候,实际上都是通过它的getAndCreateMQClientInstance方法进行创建的;名称比较拗口,同时是GetCreate,这不太符合我们所说的设计单一性原则,但这不是我们讨论的重点,我们看一看这个方法的实现

    public MQClientInstance getAndCreateMQClientInstance(ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = (MQClientInstance)this.factoryTable.get(clientId);
        if (null == instance) {
            instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = (MQClientInstance)this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

代码不复杂,我们可以看到它利用客户的配置信息生成一个固定的clientId,以此去缓存factoryTable中查找,不存在才会创建全新一个实例。
那么,可以理解一个clientID仅能存在一个连接实例了,可这个clientId是怎么产生的呢?继续跟踪看看这段代码

    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }

代码层面上对clientId进行了约定,格式为“[email protected]”格式,当unitName不为空的时候还会在后面加上“@unitName”。

怎么解决?

从代码分析上我们可以知道,为了创建多实例,我们可以

  1. 设定不同的instanceName:

    instanceName从哪来的?

    instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

    从系统属性中读取出来的,也就是一般在JVM启动时设定的。。。
    可以变吗?当然,你可以通过代码去做到,但这么做的话,你会失去让人理解你代码的能力的,哈哈
    这就是为什么多少RocketMQ Client都只能连接一个服务器的原因了,它根本不考虑服务器是谁,仅关心自己,自私的家伙!

  2. 设定不同的unitName

除此之外还有其它解决方案吗?我仔细从网络上翻了一轮,没看到什么好方法,是大家都没这个场景还是有其它好办法解决了呢?欢迎大家讨论~

方法3

在上一篇博文来自平行世界的救赎里面,我做了个工具sandbox,我提供的方法3就是依托于这个工具。
sandbox通过代码隔离的方式,将另一份类定义放入沙箱中运行,从而实现多个实例完全隔离的效果。
MQClientManager通过缓存方式,以clientId作为key值存储到自身实例当中,为了实现多个Client,那么前两种方法的逻辑是修改clientId实现多个实例,而方法3的逻辑则是“既然你的缓存已经有这个key,我就换个缓存”,本质就是“你这个锅不装我,我就换个锅”。

怎么做?

这里我使用一个springboot项目作为演示案例。
通过springboot的Configuration将多个RocketMQ Client进行注册,再定义一个Controller接收不同请求去发送MQ消息,最后加上启动类。

我们先从pom文件中引入包(我没有推上maven仓库,各位可以从github/gitee上下载),代码如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>me.van</groupId>
    <artifactId>rocket-mq-multi-client-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>测试多个rocketmq client共存</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <start-class>me.van.App</start-class>
        <java.version>1.8</java.version>
        <lombok.version>1.14.8</lombok.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
            <groupId>me.van</groupId>
            <artifactId>sandbox</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

此处引入了apache的rocketmq-client组件作为mq客户端,也就是存在前面所说的问题的组件。

启动类

package me.van;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

非常的简单,没什么好介绍的。

配置类

package me.van;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {

    @Bean(autowire = Autowire.BY_NAME, value = "producer")
    MQProducer producer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer();
        initProducer(producer, "a.io:9876;b.io:9876");
        return producer;
    }

    @Bean(autowire = Autowire.BY_NAME, value = "producer_sandbox1")
    MQProducer producerSandbox1() throws MQClientException, SandboxCannotCreateObjectException {
        DefaultMQProducer producer = createProducerInSandbox();
        initProducer(producer, "x.io:9876;y.io:9876");
        return producer;
    }

    @Bean(autowire = Autowire.BY_NAME, value = "producer_sandbox2")
    MQProducer producerSandbox2() throws MQClientException, SandboxCannotCreateObjectException {
        DefaultMQProducer producer = createProducerInSandbox();
        initProducer(producer, "1.io:9876;2.io:9876");
        return producer;
    }

    private DefaultMQProducer createProducerInSandbox() throws SandboxCannotCreateObjectException {
        Sandbox sandbox = new Sandbox("org.apache.rocketmq.client");
        return sandbox.createObject(DefaultMQProducer.class);
    }

    private void initProducer(DefaultMQProducer producer, String namesrvAddr) throws MQClientException {
        producer.setNamesrvAddr(namesrvAddr);
        producer.setProducerGroup("test-group");
        producer.setRetryAnotherBrokerWhenNotStoreOK(true);
        producer.start();
    }
}

这里可以看到,producer对象是直接new 出来的DefaultMQProducer,而producer_sandbox1producer_sandbox2是通过不同的沙箱创建出来的;三个client分别连接到不同的NameServer中,同时其它属性保持一致。

Controller

package me.van;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {
    @Autowired
    MQProducer producer;
    @Autowired
    MQProducer producer_sandbox1;
    @Autowired
    MQProducer producer_sandbox2;

    @GetMapping("/")
    public String hello(){
        return "hello world";
    }

    @GetMapping("/send")
    public String send(String msg){
        if(null == msg) return "msg is null";

        String returnMsg = "";
        Message message = new Message("topic-test-multi-mq-client", msg.getBytes());
        try {
            producer.send(message);
            returnMsg += "原生producer发送完成<br/>";

            producer_sandbox1.send(message);
            returnMsg += "第一个沙箱内producer发送完成<br/>";

            producer_sandbox2.send(message);
            returnMsg += "第二个沙箱内producer发送完成<br/>";
        } catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) {
            returnMsg += "发送过程出现异常:" + e.getMessage();
        }
        return returnMsg;
    }
}

通过send方法同时向三个producer发送消息。

测试一下

运行App,等几秒钟启动完毕,访问http://localhost:8080/send,返回

msg is null

访问,http://localhost:8080/send?msg=test

代码地址

github: https://github.com/vancoo/multi-mq-demo
gitee: https://gitee.com/vancoo/multi-mq-demo

原文地址:https://blog.51cto.com/14478649/2429041

时间: 2024-08-01 12:41:11

如何在同一个Java进程中连接多个RocketMQ服务器的相关文章

jps查看java进程中哪个线程在消耗系统资源

jps或ps -ef|grep java可以看到有哪些java进程,这个不用说了.但值得一提的是jps命令是依赖于/tmp下的某些文件 的. 而某些操作系统,定期会清理掉/tmp下的文件,导致jps无法查看到实际存在的java进程.不过jstat, jstack等命令也同样如此,所以当jps列不出进程的时候,这些命令也都不能用了.不在我们此次讨论范围之内. top -p $pid -H  加上-H这个参数后,会列出有哪些线程.这样就可以看到哪个线程id最消耗系统资源了.看到的线程id是10进制的

分析占用了大量 CPU 处理时间的是Java 进程中哪个线程

下面是详细步骤: 1. 首先确定进程的 ID ,可以使用 jps -v 或者 top 命令直接查看 2. 查看该进程中哪个线程占用大量 CPU,执行 top -H -p [PID] 结果如下: 可以发现编号为 350xx 的共有 9 个线程占用了 100% 的 CPU,好,接下来咱们随便取一个线程 ID ,假设我们想看编号为 35053 这个线程. 首先将 35053 转成 16 进制是 88ED (可以用开源中国在线工具转换) 3. 接下来我们将进程中的所有线程输出到一个文件中,执行:jsta

Win下,通过Jstack截取Java进程中的堆栈信息

在Java软件的使用过程中,有时会莫名的出现奇怪的问题.而这些问题常常无法使用日志信息定位,这时我们就需要通过查看进程内部线程的堆栈调用关系来分析问题出在哪里. 举个例子,当我们在做某个操作时,莫名的会弹出多个警告框,其中有些信息是正常的,有些则不是.对于这些错误的警告信息,我们该如何定位是哪个位置的代码出现了错误弹出的框呢? 我们就需要在弹框以后,去查看软件的各个线程,去查找究竟是哪个线程导致了该问题.可是有时因为环境.时间等问题,我们根本不能拿着IDE去调试(你总不能拿着笔记本到客户那里说,

java程序中连接solr并设置参数等

public List getUrlResults(List<String> strList,String serverUrl,pageResult result)throws MalformedURLException, SolrServerException{ HttpSolrServer server=new HttpSolrServer(serverUrl);//serverUrl 表示要连接的网址 List<String> urls=new ArrayList<St

自定义命令杀死 java 进程 alias kjava

alias kjava='ps -ef|grep ProcessName |awk "{print $2}"|xargs kill -9' 上面脚本放在杀JAVA进程中,会出现一些错误警告提示,如果把 aux 改成 ef ,连同SecureCRT也会断开连接 杀死java 进程 ps -ef|grep java |grep -v grep|cut -c 9-15|xargs kill -9 编辑别名 alias kjava='ps -ef|grep java |grep -v grep

Java单例模式中双重检查锁的问题

单例创建模式是一个通用的编程习语.和多线程一起使用时,必需使用某种类型的同步.在努力创建更有效的代码时,Java 程序员们创建了双重检查锁定习语,将其和单例创建模式一起使用,从而限制同步代码量.然而,由于一些不太常见的 Java 内存模型细节的原因,并不能保证这个双重检查锁定习语有效. 它偶尔会失败,而不是总失败.此外,它失败的原因并不明显,还包含 Java 内存模型的一些隐秘细节.这些事实将导致代码失败,原因是双重检查锁定难于跟踪.在本文余下的部分里,我们将详细介绍双重检查锁定习语,从而理解它

Linux系统下如何优雅地关闭Java进程?

资料出处: http://www.sohu.com/a/329564560_700886 https://www.cnblogs.com/nuccch/p/10903162.html 前言 Linux系统下如何kill掉一个后台Java进程,相信童鞋们都知道如何操作.首先使用ps命令查找该Java进程的进程ID,然后使用kill命令进行杀掉.命令如下: (1)ps查进程ID [[email protected] ~]$ ps -ef | grep Test user 2095020809 0 2

VisualVM连接远程Java进程

jstatd是一个RMI(Remove Method Invocation)的server应用,用于监控jvm的创建和结束,并且提供接口让监控工具(如VisualVM)可以远程连接到本机的jvms .注意是jvms,就是说运行jstatd命令后可以用监控工具监控本用户(运行jstatd命令的用户)所有已经启动的java程序. jstatd的安装.启动.连接 JDK中默认就带 jstatd,如下图,Mac 下 JDK 默认安装在 /Library/Java/JavaVirtualMachines/

Java连接MySQL数据库实现用户名密码的验证方法 Java语句中sql查询语句&#39;&#39; &quot;&quot;作用

//方法一,可以验证登录,但方法不实用.package com.swift; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.List; public class Logi