MQTT---HiveMQ源码详解(二)结构与启动


MQTT交流群:221405150


目录结构

在官网中也有更详细的介绍,下面我只对目录结构做一个简单介绍即可,感兴趣的朋友可以参考官网文档.http://www.hivemq.com/docs/hivemq/latest/#installation



bin

包含hivemq.jar以及一些启动脚本

conf

包含config.xml、logback.xml以及plugin的配置文件

examples是一些示例组网场景的示例配置

data

metadata存放版本信息(加密过)

persistence存放着所有持久化信息的文件、以及备份文件。包含client_session_subscriptions、client_sessions、outgoing_message_flow、incomming_message_flow、publish_payloads、queued_messages、retained_messages等。

diagnostics

存放着诊断模式下诊断信息,包括系统信息、网络接口信息、jvm信息、插件信息等等。方便开发者排查问题。

license

存放hivemq授权license文件。

log

存放日志

plugins

第三方插件目录


启动

既然它是一个java程序,那么我们就从它的main方法开始我们的hivemq源码之路。

main


public class HiveMQServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(HiveMQServer.class);
    private final NettyServer nettyServer;
    private final ClusterConfigurationService clusterConfigurationService;
    private final PluginBrokerCallbackHandler pluginBrokerCallbackHandler;
    private final PluginInformationStore pluginInformationStore;
    private final Provider<ClusterJoiner> clusterJoinerProvider;

    @Inject
    HiveMQServer(NettyServer nettyServer,
                 ClusterConfigurationService clusterConfigurationService,
                 PluginBrokerCallbackHandler pluginBrokerCallbackHandler,
                 PluginInformationStore pluginInformationStore,
                 Provider<ClusterJoiner> clusterJoinerProvider) {
        this.nettyServer = nettyServer;
        this.clusterConfigurationService = clusterConfigurationService;
        this.pluginBrokerCallbackHandler = pluginBrokerCallbackHandler;
        this.pluginInformationStore = pluginInformationStore;
        this.clusterJoinerProvider = clusterJoinerProvider;
    }

    public void start() throws InterruptedException, ExecutionException {
        //启动netty server
        this.nettyServer.start().sync();
        //通知OnBrokerStart事件
        fireOnBrokerStart();
        //加入cluster
        joinCluster();
        //启动对应承载在netty上的Listener,并打印出这些Listener启动结果信息。请参考Linstener配置请参考http://www.hivemq.com/docs/hivemq/latest/#configuration-chapter
        ListenableFuture<List<ListenerStartResult>> startFuture = this.nettyServer.startListeners();
        List<ListenerStartResult> startResults = startFuture.get();
        new ListenerStartResultLogger(startResults).log();
    }

    private void joinCluster() {
        //根据配置确定是否加入cluster
        if (!this.clusterConfigurationService.isEnabled()) {
            return;
        }
        try {
        //使用ClusterJoiner类进行连接jgroup,组成cluster。
            ClusterJoiner clusterJoiner = this.clusterJoinerProvider.get();
            ListenableFuture<Void> future = clusterJoiner.join();
            future.get();
        } catch (Exception e) {
            if (e.getCause() instanceof DuplicateOrInvalidLicenseException) {
                LOGGER.error("Found duplicate or invalid license file in the cluster. Shutting down HiveMQ");
            } else if (e.getCause() instanceof DifferentConfigurationException) {
                LOGGER.error("The configuration of this HiveMQ instance is different form the other instances in the cluster. Shutting down HiveMQ");
            } else {
                LOGGER.error("Could not join cluster. Shutting down HiveMQ.", e);
            }
            if (e.getCause() instanceof UnrecoverableException) {
                throw ((UnrecoverableException) e.getCause());
            }
            throw new UnrecoverableException(false);
        }
    }

    //通知对应plugin broker已经启动
    private void fireOnBrokerStart() {
        LOGGER.trace("Calling all OnBrokerStart Callbacks");
        printPluginInformations();
        this.pluginBrokerCallbackHandler.onStart();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        LOGGER.info("Starting HiveMQ Server");
        long startTime = System.nanoTime();
        //初始化SystemInformation,可以通过环境变量来分别设置conf、plugins、log、license等目录。
        //请参考hivemq spi SystemInformation
        LOGGER.trace("Initializing HiveMQ home directory");
        HiveMQSystemInformation systemInformation = new HiveMQSystemInformation(true);
        //创建MetricRegistry
        //请参考开源框架Metrics
        LOGGER.trace("Creating MetricRegistry");
        MetricRegistry metricRegistry = new MetricRegistry();
        //增加统计Listener
        metricRegistry.addListener(new StatisticsListener());
        //初始化日志
        LOGGER.trace("Initializing Logging");
        LogConfigurator.init(systemInformation.getConfigFolder(), metricRegistry);
        //增加未处理异常拦截,并对其进行优雅处理
        LOGGER.trace("Initializing Exception handlers");
        RecoverableExceptionHandler.init();
        //初始化ConfigurationService,并读取conf/config.xml文件,加载用户配置
        //请参考hivemq spi ConfigurationService,
        LOGGER.trace("Initializing configuration");
        HiveMQConfigurationService hiveMQConfigurationService = HiveMQConfigurationServiceFactory.create(systemInformation);
        //创建Clusterid提供者。
        ClusterIdProducer clusterIdProducer = new ClusterIdProducer();
        if (hiveMQConfigurationService.clusterConfiguration().isEnabled()) {
            LOGGER.info("This node‘s cluster-ID is {}", clusterIdProducer.get());
        }
        //根据原有版本,判断是否需要做持久化数据的migration,如需要进行migration,因为可以配置每个数据的使用策略(file/memory),所以每个数据分别进行migration
        LOGGER.trace("Checking for migrations");
        Map<MigrationType, Set<String>> neededMigrations = Migrations.getNeededMigrations(systemInformation);
        Injector injector = null;
        if (neededMigrations.size() > 0) {
            LOGGER.warn("HiveMQ has been updated, migrating persistent data to new version !");
            neededMigrations.keySet().forEach(type -> LOGGER.debug("{} needs to be migrated", type));
            //因为migration也是依赖guice来做容器,所以migration也会创建一个injector
            injector = Bootstrap.createInjector(systemInformation, hiveMQConfigurationService, clusterIdProducer);
            Migrations.start(injector, neededMigrations);
        }
        //升级完成,将升级的最新版本信息,持久化到文件中,以便下次启动进行判断
        Migrations.finish(systemInformation, hiveMQConfigurationService);
        //初始化guice
        LOGGER.trace("Initializing Guice");
        injector = Bootstrap.createInjector(systemInformation, metricRegistry, hiveMQConfigurationService, clusterIdProducer, injector);
        //从guice中获得HiveMQServer实例,并启动它
        HiveMQServer server = injector.getInstance(HiveMQServer.class);
        server.start();
        //对EXodus日志级别做修改
        LogConfigurator.addXodusLogModificator();
        LOGGER.info("Started HiveMQ in {}ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
        //hivemq版本升级检查器,会连接hivemq官网判断是否有新版本升级。可以在配置文件中设置不检查
        UpdateChecker updateChecker = injector.getInstance(UpdateChecker.class);
        updateChecker.start();
    }

    //根据加载出来的所有plugin打印plugin信息
    //请参考hivemq spi @Information
    private void printPluginInformations() {
        Set<PluginInformation> pluginInformations = this.pluginInformationStore.getPluginInformations();
        pluginInformations.forEach(pluginInformation ->
                LOGGER.info("Loaded Plugin {} - v{}", pluginInformation.getName(), pluginInformation.getVersion())
        );
    }
}

Bootstrap & Guice Modules

它是采用Guice作为di框架,那么我们就从Bootstrap开始看它包含了哪些Module以及简单介绍下这些Module主要是注入哪些对应处理代码。


public class Bootstrap {
    private static final Logger LOGGER = LoggerFactory.getLogger(Bootstrap.class);

    public static Injector createInjector(SystemInformation systemInformation, MetricRegistry metricRegistry, HiveMQConfigurationService hiveMQConfigurationService, ClusterIdProducer clusterIdProducer, Injector injector) {
    //根据系统变量判断是否开启诊断模式
        if (!Boolean.parseBoolean(System.getProperty("diagnosticMode"))) {
            LOGGER.trace("Turning Guice stack traces off");
            System.setProperty("guice_include_stack_traces", "OFF");
        }
        //加载所有PluginModule
        //请参考hivemq spi PluginModule
        //后续会专门讲解plugin是如何加载的
        List<PluginModule> pluginModules = new PluginBootstrap().create(systemInformation.getPluginFolder());
        ImmutableList.Builder<AbstractModule> builder = ImmutableList.builder();
        builder.add(
        //系统信息
                new SystemInformationModule(systemInformation),
           //注册cache的生命周期范围
                new ScopeModule(),
                //增加@PostConstruct、@PreDestroy注解处理
                new LifecycleModule(),
                //配置的Module
                new ConfigurationModule(hiveMQConfigurationService, clusterIdProducer),
                //netty所有handler、以及listenser等module
                new NettyModule(),
                //内部module
                new InternalModule(),
                //plugin callback module,主要处理plugin注册cabllback后回调
                new PluginCallbackModule(),
                //为方法增加cache的module
                new MethodCacheModule(),
                //持久化module
                new PersistenceModule(injector),
                //统计的module
                new MetricModule(metricRegistry),
                //流量监控module
                new TrafficShapingModule(),
                //cluster module
                new ClusterModule(),
                //plugin提供service的module
                new ServiceModule(pluginModules),
                //license的解析、验证、限制module
                new LicensingModule(),
                //更新hivemq程序的module
                new UpdateModule(),
                //诊断模式module
                new DiagnosticModule());
        builder.addAll(pluginModules);
        return Guice.createInjector(Stage.PRODUCTION, builder.build());
    }

//创建数据升级的Injector,这个较上面的module加载的少点而已。
    public static Injector createInjector(SystemInformation systemInformation,
                                          HiveMQConfigurationService hiveMQConfigurationService,
                                          ClusterIdProducer clusterIdProducer) {
        ImmutableList.Builder<AbstractModule> builder = ImmutableList.builder();
        builder.add(
                new SystemInformationModule(systemInformation),
                new ConfigurationModule(hiveMQConfigurationService, clusterIdProducer),
                new BridgeModule(),
                new ScopeModule(),
                new LifecycleModule());
        return Guice.createInjector(Stage.PRODUCTION, builder.build());
    }
}
时间: 2024-10-20 22:00:32

MQTT---HiveMQ源码详解(二)结构与启动的相关文章

深入Java基础(四)--哈希表(1)HashMap应用及源码详解

继续深入Java基础系列.今天是研究下哈希表,毕竟我们很多应用层的查找存储框架都是哈希作为它的根数据结构进行封装的嘛. 本系列: (1)深入Java基础(一)--基本数据类型及其包装类 (2)深入Java基础(二)--字符串家族 (3)深入Java基础(三)–集合(1)集合父类以及父接口源码及理解 (4)深入Java基础(三)–集合(2)ArrayList和其继承树源码解析以及其注意事项 文章结构:(1)哈希概述及HashMap应用:(2)HashMap源码分析:(3)再次总结关键点 一.哈希概

Shiro 登录认证源码详解

Shiro 登录认证源码详解 Apache Shiro 是一个强大且灵活的 Java 开源安全框架,拥有登录认证.授权管理.企业级会话管理和加密等功能,相比 Spring Security 来说要更加的简单. 本文主要介绍 Shiro 的登录认证(Authentication)功能,主要从 Shiro 设计的角度去看这个登录认证的过程. 一.Shiro 总览 首先,我们思考整个认证过程的业务逻辑: 获取用户输入的用户名,密码: 从服务器数据源中获取相应的用户名和密码: 判断密码是否匹配,决定是否

Guava Cache源码详解

目录 一.引子 二.使用方法 2.1 CacheBuilder有3种失效重载模式 2.2 测试验证 三.源码剖析 3.1 简介 3.2 源码剖析 四.总结 优点: 缺点: 正文 回到顶部 一.引子 缓存有很多种解决方案,常见的是: 1.存储在内存中 : 内存缓存顾名思义直接存储在JVM内存中,JVM宕机那么内存丢失,读写速度快,但受内存大小的限制,且有丢失数据风险. 2.存储在磁盘中: 即从内存落地并序列化写入磁盘的缓存,持久化在磁盘,读写需要IO效率低,但是安全. 3.内存+磁盘组合方式:这种

Java concurrent AQS 源码详解

一.引言 AQS(同步阻塞队列)是concurrent包下锁机制实现的基础,相信大家在读完本篇博客后会对AQS框架有一个较为清晰的认识 这篇博客主要针对AbstractQueuedSynchronizer的源码进行分析,大致分为三个部分: 静态内部类Node的解析 重要常量以及字段的解析 重要方法的源码详解. 所有的分析仅基于个人的理解,若有不正之处,请谅解和批评指正,不胜感激!!! 二.Node解析 AQS在内部维护了一个同步阻塞队列,下面简称sync queue,该队列的元素即静态内部类No

butterknife源码详解

butterknife源码详解 作为Android开发者,大家肯定都知道大名鼎鼎的butterknife.它大大的提高了开发效率,虽然在很早之前就开始使用它了,但是只知道是通过注解的方式实现的,却一直没有仔细的学习下大牛的代码.最近在学习运行时注解,决定今天来系统的分析下butterknife的实现原理. 如果你之前不了解Annotation,那强烈建议你先看注解使用. 废多看图: 从图中可以很直观的看出它的module结构,以及使用示例代码. 它的目录和我们在注解使用这篇文章中介绍的一样,大体

Android ArrayMap源码详解

尊重原创,转载请标明出处    http://blog.csdn.net/abcdef314159 分析源码之前先来介绍一下ArrayMap的存储结构,ArrayMap数据的存储不同于HashMap和SparseArray,在上一篇<Android SparseArray源码详解>中我们讲到SparseArray是以纯数组的形式存储的,一个数组存储的是key值一个数组存储的是value值,今天我们分析的ArrayMap和SparseArray有点类似,他也是以纯数组的形式存储,不过不同的是他的

《GIS软件ShapMap源码详解及应用》概述

我喜欢GIS二次开发,即使有的人看不起:我不懂开源GIS,只会点商业的GIS,有的人更加瞧不起.我认为,我不能改变现实这个环境,但可以创造一些价值.找到一本<GIS软件ShapMap源码详解及应用>来学习,我倒要看看开源GIS是什么样子. 当前GIS软件有商业GIS系统及开源GIS系统之分.GIS商用软件功能强 大,有完善的技术支持,提供封装好的.功能强大的类库,基于商用GIS库进 行的二次开发效率高.难度低.资源丰富.但对于小型GIS开发人员,商用 GIS价格过高,对于GIS学习者来说,由于

Android编程之Fragment动画加载方法源码详解

上次谈到了Fragment动画加载的异常问题,今天再聊聊它的动画加载loadAnimation的实现源代码: Animation loadAnimation(Fragment fragment, int transit, boolean enter, int transitionStyle) { 接下来具体看一下里面的源码部分,我将一部分一部分的讲解,首先是: Animation animObj = fragment.onCreateAnimation(transit, enter, fragm

Spring IOC源码详解之容器依赖注入

Spring IOC源码详解之容器依赖注入 上一篇博客中介绍了IOC容器的初始化,通过源码分析大致了解了IOC容器初始化的一些知识,先简单回顾下上篇的内容 载入bean定义文件的过程,这个过程是通过BeanDefinitionReader来完成的,其中通过 loadBeanDefinition()来对定义文件进行解析和根据Spring定义的bean规则进行处理 - 事实上和Spring定义的bean规则相关的处理是在BeanDefinitionParserDelegate中完成的,完成这个处理需