netty同时实现http与socket

(1)启动类

package test;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * netty服务器启动类
 * @author songyan
 *
 */
public class HttpProxyServer {

    public static void main(String[] args) throws Exception {
        int LOCAL_PORT = (args.length > 0) ? Integer.parseInt(args[0]) : 5688;// 代理的端口号
        System.out.println("Proxying on port " + LOCAL_PORT);

        // 主从线程组模型
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            // 创建核心类
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

                    // 添加助手类
                    .childHandler(new ServerInitialzer()).bind(LOCAL_PORT).sync().channel().closeFuture().sync();

        } finally {

            // 关闭主从线程
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

(2)初始化类

package test;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 *
 * @author songyan
 * 通用的初始化类
 */
public class ServerInitialzer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        //netty是基于http的,所以要添加http编码器
        pipeline.addLast(new HttpServerCodec());
        //对写大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        //设置单次请求的文件大小上限
        pipeline.addLast(new HttpObjectAggregator(1024*1024*10));
        //websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        //自定义的路由
        pipeline.addLast(new HttpHandler());

    }

}

(3)自定义路由

package test;

import java.time.LocalDateTime;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 自定义的路由 既可以实现http又可以实现socket
 *
 * @author songyan
 *
 */
public class HttpHandler extends SimpleChannelInboundHandler<Object> {
    // 用于记录和管理所有客户端的channle
    private Channel outboundChannel;
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 打开链接
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("websocket::::::::::: active");
        super.channelActive(ctx);
    }

    /**
     * 获取客户端的channle,添加到ChannelGroup中
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("websocket::::::::::: add");
        clients.add(ctx.channel());
    }

    /**
     * 从ChannelGroup中移除channel
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("websocket::::::::::: Removed");
    }

    /**
     * 销毁channel
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("websocket::::::::::: destroyed");
        if (clients != null) {
            closeOnFlush(outboundChannel);
        }
    }

    /**
     * 关闭释放channel
     * @param ch
     */
    static void closeOnFlush(Channel ch) {
        if (ch != null && ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    /**
     * 异常捕获
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.println("出错了");
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 路由
     * 对http,websocket单独处理
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {// 如果是HTTP请求,进行HTTP操作
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {// 如果是Websocket请求,则进行websocket操作
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * 对http请求的处理
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, final FullHttpRequest msg) {
        final Channel inboundChannel = ctx.channel();
        String host = msg.headers().get("Host");
        int port = 80;

        String pattern = "(http://|https://)?([^:]+)(:[\\d]+)?";
        Pattern r = Pattern.compile(pattern);
        Matcher m = r.matcher(host);
        if (m.find()) {
            host = m.group(2);
            port = (m.group(3) == null) ? 80 : Integer.parseInt(m.group(3).substring(1));
        }

        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop()) // use inboundChannel thread
                .channel(ctx.channel().getClass()).handler(new BackendHandlerInitializer(inboundChannel));

        ChannelFuture f = b.connect("127.0.0.1", 8015);
        outboundChannel = f.channel();
        msg.retain();
        ChannelFuture channelFuture = f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    outboundChannel.writeAndFlush(msg);
                } else {
                    inboundChannel.close();
                }
            }
        });
    }

    /**
     * 对socket请求的处理
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) {
        // 获取客户端传输过来的消息
        String content = msg.toString();
        System.out.println("websocket:::  接受到的数据:" + content);
        clients.writeAndFlush(new TextWebSocketFrame("[服务器在]" + LocalDateTime.now() + "接受到消息, 消息为:" + content));

    }

}

原文地址:https://www.cnblogs.com/excellencesy/p/11241063.html

时间: 2024-07-31 18:56:55

netty同时实现http与socket的相关文章

Netty教程在线观看

<物联网核心技术之Netty入门到精通课程> 课程观看地址:http://www.xuetuwuyou.com/course/14 课程大纲: 第一节:Netty入门到精通--物联网行业介绍及前景分析 第二节:Netty入门到精通--网络编程基本概念. 第三节:Netty入门到精通--Socket服务端和客户端编程 第四节:Netty入门到精通--UDP通信 第五节:Netty入门到精通--URL编程以及物联网常用协议TCP UDP HTTP 对比 第六节:Netty入门到精通--Netty介

ElasticSearch的基本用法与集群搭建

ElasticSearch的基本用法与集群搭建 一.简介 ElasticSearch和Solr都是基于Lucene的搜索引擎,不过ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持需要ZooKeeper的支持. 这里有一个详细的ElasticSearch和Solr的对比:http://solr-vs-elasticsearch.com/ 二.基本用法 Elasticsearch集群可以包含多个索引(indices),每一个索

Jetsever开源项目学习(二)架构学习—— Jetserver internal details and how it works.

Jet Server - architecture and internal details This section outlines how data flows through jet server, the important classes and interfaces and also which ones need to be extended by users to write their game logic, receive events and so on. If you

C#高性能TCP服务的多种实现方式

轉載:http://www.cnblogs.com/gaochundong/p/csharp_tcp_service_models.html 本篇文章的主旨是使用 .NET/C# 实现 TCP 高性能服务的不同方式,包括但不限于如下内容: APM 方式,即 Asynchronous Programming Model TAP 方式,即 Task-based Asynchronous Pattern SAEA 方式,即 SocketAsyncEventArgs RIO 方式,即 Registere

NIO+Netty5各种RPC架构实战演练课程|Netty5视频教程

NIO+Netty5各种RPC架构实战演练课程 课程观看地址:http://www.xuetuwuyou.com/course/52 课程出自学途无忧网:http://www.xuetuwuyou.com/ 课程介绍 一.课程使用到的软件及版本: jdk1.7+netty5+spring4+maven3+protobuf+thrift-0.9.3 二.课程设计到的技术点: nio,netty,protobuf,thrift,maven 三.学完课程之后,您可以: ①熟练掌握netty基础技术知识

Java架构的路上必学知识点,你又知道多少?(转)

我把它分为了五大专题 工程化专题 工程化专题 git git安装使用 git日常使用:fetch/pull/push/revert/rebase git分支管理git flow Jenkins多套环境(test/pre/production)系统自动化发布 Jenkins自动发布到远程主机 MavenMaven私服搭建setting.xml文件剖析pom.xml详解Maven实用插件教学(静态代码检查.生成可执行jar包)profile使用 源码分析 源码分析 Spring源码分析 Spring

C# 高性能 TCP 服务的多种实现方式

哎~~ 想想大部分园友应该对 "高性能" 字样更感兴趣,为了吸引眼球所以标题中一定要突出,其实我更喜欢的标题是<猴赛雷,C# 编写 TCP 服务的花样姿势!>. 本篇文章的主旨是使用 .NET/C# 实现 TCP 高性能服务的不同方式,包括但不限于如下内容: APM 方式,即 Asynchronous Programming Model TAP 方式,即 Task-based Asynchronous Pattern SAEA 方式,即 SocketAsyncEventAr

NIO+Netty5各种RPC架构实战演练

课程目录:任务1:课程版权声明任务2: 学习须知任务3: nio简单介绍任务4: Nio基本组件ByteBuffer任务5: ServerSocket任务6: selector与serversocketchannel的使用任务7: SocketChannelSelector任务8: aio编程任务9: netty服务器的快速实现任务10: netty客户端架构实现任务11: netty客户端模拟高并发以及netty5线程模型任务12: netty的编码解码任务13: protobuf简单讲解任务

Elasticsearch Java API简介

加入依赖 我本地的Elasticsearch的版本是2.1.0,因此加入相应的maven依赖 <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>2.1.0</version> </dependency> 创建Client Elasticsearch Client分为N