netty+mqtt

package io.mqtt.server;

import io.mqtt.tool.ConfigService;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Log4JLoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class Server {
    private static final InternalLogger logger = InternalLoggerFactory
            .getInstance(Server.class);

private int port;
    //    private final int port = ConfigService.getIntProperty("tcp.port", 1883);
    private final int httpPort = ConfigService
            .getIntProperty("http.port", 8080);

private List<Channel> channels = new ArrayList<Channel>();
    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workerGroup = new NioEventLoopGroup();

public Server(int port) {
        this.port = port;
    }

private ServerBootstrap getDefaultServerBootstrap() {
        ServerBootstrap server = new ServerBootstrap();
        server.group(bossGroup, workerGroup)
                .option(ChannelOption.SO_BACKLOG, 1000)
                .option(ChannelOption.TCP_NODELAY, true)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        return server;
    }

public ChannelFuture run() throws Exception {
        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());

Channel channle = getDefaultServerBootstrap()
                .childHandler(new TcpChannelInitializer()).bind(port).sync()
                .channel();
        channels.add(channle);

logger.info("mqtt.io tcp server started at port " + port + ‘.‘);

ChannelFuture future = getDefaultServerBootstrap().childHandler(
                new HttpChannelInitializer()).bind(httpPort);

Channel httpChannel = future.sync().channel();
        channels.add(httpChannel);

logger.info("mqtt.io websocket server started at port " + httpPort
                + ‘.‘);

return future;
    }

public void destroy() {
        logger.info("destroy mqtt.io server ...");
        for (Channel channel : channels) {
            channel.close();
        }
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

public static void main(String[] args) throws Exception {
//        for (int i = 0; i < 5; i++) {
            new ServerThread(65432 + (0 * 2)).start();
//        }
    }

}

package io.mqtt.handler;

import io.mqtt.processer.ConnectProcesser;
import io.mqtt.processer.DisConnectProcesser;
import io.mqtt.processer.PingReqProcesser;
import io.mqtt.processer.PublishProcesser;
import io.mqtt.processer.SubscribeProcesser;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import io.mqtt.processer.*;
import org.meqantt.message.ConnAckMessage;
import org.meqantt.message.ConnAckMessage.ConnectionStatus;
import org.meqantt.message.DisconnectMessage;
import org.meqantt.message.Message;
import org.meqantt.message.Message.Type;
import org.meqantt.message.PingRespMessage;

public class MqttMessageHandler extends ChannelInboundHandlerAdapter {
    private static PingRespMessage PINGRESP = new PingRespMessage();

private static final Map<Message.Type, Processer> processers;
    static {
        Map<Message.Type, Processer> map = new HashMap<Message.Type, Processer>(
                6);

map.put(Type.CONNECT,  new ConnectProcesser());
        map.put(Type.PUBLISH,  new PublishProcesser());
        map.put(Type.SUBSCRIBE, (Processer) new SubscribeProcesser());
        map.put(Type.UNSUBSCRIBE, (Processer) new UnsubscribeProcesser());
        map.put(Type.PINGREQ, new PingReqProcesser());
        map.put(Type.DISCONNECT, (Processer) new DisConnectProcesser());

processers = Collections.unmodifiableMap(map);
    }

@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e)
            throws Exception {
        try {
            if (e.getCause() instanceof ReadTimeoutException) {
                ctx.write(PINGRESP).addListener(
                        ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                ctx.channel().close();
            }
        } catch (Throwable t) {
            t.printStackTrace();
            ctx.channel().close();
        }

e.printStackTrace();
    }

@Override
    public void channelRead(ChannelHandlerContext ctx, Object obj)
            throws Exception {
        //MQTT MESSAGE
        Message msg = (Message) obj;
        // server收到clinet 的MQTT数据包,并获取MQTT的消息类型
        Processer p = processers.get(msg.getType());
        if (p == null) {
            return;
        }
        //根据特定消息类型解析消息包
        Message rmsg = p.proc(msg, ctx);
        if (rmsg == null) {
            return;
        }
        //根据消息处理结果,向clinet做出回应
        if (rmsg instanceof ConnAckMessage
                && ((ConnAckMessage) rmsg).getStatus() != ConnectionStatus.ACCEPTED) {
            ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE);
        } else if (rmsg instanceof DisconnectMessage) {
            ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE);
        } else {
            ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    }

@Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

//client

package com.test.client;

import org.eclipse.paho.client.mqttv3.*;

public class SubscribeMessage implements MqttCallback {

private MqttClient client;

public SubscribeMessage() {
    }

public static void main(String[] args) {
//        String tcpUrl = "tcp://127.0.0.1:1883";
//        String clientId = "sub-msg/client1";
//        String topicName = "sub/client1";
//
//        new SubscribeMessage().doDemo(tcpUrl, clientId, topicName);
//        for (int j = 0; j < 5; j++) {
            for (int i = 0; i < 10000; i++) {
        new SubscribeThread("client_" + 0 + i, "tcp://127.0.0.1:" + (65432 + 0 * 2)).start();
            }
//        }

}

public void doDemo(String tcpUrl, String clientId, String topicName) {
        try {
            client = new MqttClient(tcpUrl, clientId);
            MqttConnectOptions mqcConf = new MqttConnectOptions();
            mqcConf.setConnectionTimeout(300);
            mqcConf.setKeepAliveInterval(1000);
            client.connect(mqcConf);
            client.setCallback(this);
            client.subscribe(topicName);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

public void connectionLost(Throwable cause) {
        cause.printStackTrace();
    }

public void messageArrived(String topic, MqttMessage message)
            throws Exception {
        System.out.println("[GOT PUBLISH MESSAGE] : " + message);
    }

public void deliveryComplete(IMqttDeliveryToken token) {
    }
}

时间: 2024-10-20 21:36:09

netty+mqtt的相关文章

tcp 高性能服务, netty,mqtt

1. io 线程不要有比较长的服务. 全部异步化. [1] netty 权威指南上只是说业务复杂时派发到业务线程池种. 共用的线程池最好都轻量. 多层线程池后, 下层的可以进行隔离. 这个是 mqtt 的一大特点. 避免某个任务太重,影响了其他亲量级服务. 2. new socket 可以传入域名,但是只取第一个 ip,链接 3. 物联网 的特点是大量的长连接,等待消息推送. 所以首先要不同的机器去承接连接. 故客户端需要知道去连接的 ip. 目前没有http 协议支持域名,短连接.  请求/响

lyzhangxiang - LAN8720+SIM800工业物联网RTU

http://www.amobbs.com/thread-5660865-1-1.html RTU/DTU这种产品好像2000年左右就有了,淘宝上搜索一下基本上200左右的很多,这也是一个做烂了的东西了.工作之后接触硬件相关的工作就很少了,不过看到各种硬件的板子还是有点小兴奋,看来还是很喜欢折腾这些的,自己画个图焊接个板子,真的还挺开心.所以打算自己弄一点硬件玩玩纯粹当爱好了.关于这个东西需要有什么功能,还没有想好不过支持MQTT协议是一定要的,后台自己也能写Java还算熟悉,采用NETTY +

Netty实现高性能IOT服务器(Groza)之手撕MQTT协议篇上

前言 诞生及优势 MQTT由Andy Stanford-Clark(IBM)和Arlen Nipper(Eurotech,现为Cirrus Link)于1999年开发,用于监测穿越沙漠的石油管道.目标是拥有一个带宽有效且使用很少电池电量的协议,因为这些设备是通过卫星链路连接的,当时这种设备非常昂贵. 与HTTP及其请求/响应范例相比,该协议使用发布/订阅体系结构.发布/订阅是事件驱动的,可以将消息推送到客户端.中央通信点是MQTT代理,它负责调度发送者和合法接收者之间的所有消息.向代理发布消息的

基于Netty的IdleStateHandler实现Mqtt心跳

基于Netty的IdleStateHandler实现Mqtt心跳 IdleStateHandler解析 最近研究jetlinks编写的基于Netty的mqtt-client(https://github.com/jetlinks/netty-mqtt-client),总结若干知识点. Netty中,实现心跳机制较为简单,主要依赖于IdleStateHandler判断channel的读写超时. /** * Creates a new instance firing {@link IdleState

【netty】Netty系列之Netty百万级推送服务设计要点

1. 背景 1.1. 话题来源 最近很多从事移动互联网和物联网开发的同学给我发邮件或者微博私信我,咨询推送服务相关的问题.问题五花八门,在帮助大家答疑解惑的过程中,我也对问题进行了总结,大概可以归纳为如下几类: Netty是否可以做推送服务器? 如果使用Netty开发推送服务,一个服务器最多可以支撑多少个客户端? 使用Netty开发推送服务遇到的各种技术问题. 由于咨询者众多,关注点也比较集中,我希望通过本文的案例分析和对推送服务设计要点的总结,帮助大家在实际工作中少走弯路. 1.2. 推送服务

Netty版本升级血泪史之线程篇

1. 背景 1.1. Netty 3.X系列版本现状 根据对Netty社区部分用户的调查,结合Netty在其它开源项目中的使用情况,我们可以看出目前Netty商用的主流版本集中在3.X和4.X上,其中以Netty 3.X系列版本使用最为广泛. Netty社区非常活跃,3.X系列版本从2011年2月7日发布的netty-3.2.4 Final版本到2014年12月17日发布的netty-3.10.0 Final版本,版本跨度达3年多,期间共推出了61个Final版本. 1.2. 升级还是坚守老版本

Netty系列之Netty百万级推送服务设计要点

原文:http://www.infoq.com/cn/articles/netty-million-level-push-service-design-points 1. 背景 1.1. 话题来源 最近很多从事移动互联网和物联网开发的同学给我发邮件或者微博私信我,咨询推送服务相关的问题.问题五花八门,在帮助大家答疑解惑的过程中,我也对问题进行了总结,大概可以归纳为如下几类: Netty是否可以做推送服务器? 如果使用Netty开发推送服务,一个服务器最多可以支撑多少个客户端? 使用Netty开发

Netty系列之Netty百万级推送服务设计要点(转)

1. 背景 1.1. 话题来源 最近很多从事移动互联网和物联网开发的同学给我发邮件或者微博私信我,咨询推送服务相关的问题.问题五花八门,在帮助大家答疑解惑的过程中,我也对问题进行了总结,大概可以归纳为如下几类: Netty是否可以做推送服务器? 如果使用Netty开发推送服务,一个服务器最多可以支撑多少个客户端? 使用Netty开发推送服务遇到的各种技术问题. 由于咨询者众多,关注点也比较集中,我希望通过本文的案例分析和对推送服务设计要点的总结,帮助大家在实际工作中少走弯路. 1.2. 推送服务

Android 基于Netty的消息推送方案之Hello World(一)

消息推送方案(轮询.长连接) 轮询 轮询:比较简单的,最容易理解和实现的就是客户端去服务器上拉信息,信息的及时性要求越高则拉信息的频率越高.客户端拉信息的触发可以是一些事件,也可以是一个定时器,不断地去查询服务器.所以这个方案的弊端也是显而易见的,在轮询的频率较高时,服务器端的压力很大,通讯的流量也很大,并且大部分时间都是做的无用功. 长连接 长连接:客户端和服务端维持一个长连接,服务端在有信息推送的时候,借助这个连接把信息发送到客户端.这个方案的优点是信息推送的及时性很高,基本是实时的,并且除