Http 调用netty 服务,服务调用客户端,伪同步响应.ProtoBuf 解决粘包,半包问题.

实际情况是: 公司需要开发一个接口给新产品使用,需求如下

1.有一款硬件设备,客户用usb接上电脑就可以,但是此设备功能比较单一,所以开发一个服务器程序,辅助此设备业务功能

2.解决方案,使用Socket调用此设备

3.增强此设备功能,增加Socket客户端连接到Socket服务端

4.Http请求,同步响应

测试注意:

1.nettyServer 在ubuntu下编码,使用Epoll

2.Http请求的测试最好运行再Linux 下进行,因为Windows 可能会因为并发高的时候占满端口限制,HttpClient或者RestTemplate 请求不了.

3.ProtoBuf 插件无论再Windows,还是linux同样适用,在linux 下,会自动下载 protoc-3.5.1-linux-x86_64.exe

简单的流程如下

解决方案:

1.使用Netty框架

2.使用ProtoBuf,配合Netty 对ProtoBuf解决半包问题

3.Future 实现伪同步响应

4.SpringBoot + jetty

pom.xml 添加ProtoBuf依赖以及插件

 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <grpc.version>1.11.0</grpc.version>
        <protobuf.version>3.5.1</protobuf.version>
    </properties>
  <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>${grpc.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.googlecode.protobuf-java-format</groupId>
            <artifactId>protobuf-java-format</artifactId>
            <version>1.4</version>
        </dependency>

插件

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}}:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

屏蔽Tomcat 使用 Jetty

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>

编写proto:再/src/main中创建文件夹 proto,创建一个Message.proto

文件内容

syntax = "proto3";
option java_package = "com.lzw.netty";
option java_outer_classname = "MessageProto";
message Message {

    int32 type = 1;

    sfixed64 id = 2;

    string msgBody = 3;

    enum Type {
        ACTIVE = 0;
        MESSAGE = 1;
    }

}

生成java 文件

文件目录,挪到自己需要的包下面

服务端代码

/**
 * User: laizhenwei
 * Date: 2018-03-26 Time: 21:46
 * Description:
 */
public class EchoServer {

    //缓存ResponseFuture
    public static Map<Long, ResponseFuture<MessageProto.Message>> responseFutureMap = new HashMap<>();

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {

        EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
        EventLoopGroup workerGroup = new EpollEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup,workerGroup).channel(EpollServerSocketChannel.class)
                .localAddress(new InetSocketAddress(port))
                .childHandler(new MyServerChannelInitializer());
        try {
            ChannelFuture f = bootstrap.bind().sync();
            //清理不可预知而失败的脏数据
            f.channel().eventLoop().scheduleAtFixedRate(() -> {
                long nowTime = System.currentTimeMillis();
                responseFutureMap.entrySet().stream().filter(e -> (nowTime - e.getValue().getBeginTime()) > 60000).map(e -> e.getKey()).forEach(k->responseFutureMap.remove(k));
            }, 300, 300, TimeUnit.SECONDS);
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }
}

ContextHelper缓存ChannelHandlerContext

/**
 * User: laizhenwei
 * Date: 2018-03-26 Time: 21:46
 * Description: 缓存客户端的ChannelHandlerContext
 */
public class ContextHelper {

    private final static Map<String, ChannelHandlerContext> clientMap = new ConcurrentHashMap<>();

    public static Map<String, ChannelHandlerContext> getClientMap() {
        return Collections.unmodifiableMap(clientMap);
    }

    public static ChannelHandlerContext get(String id){
        return clientMap.get(id);
    }

    public static void add(String id, ChannelHandlerContext ctx) {
        clientMap.put(id, ctx);
    }

    public static void remove(String id) {
        clientMap.remove(id);
    }
}

MyServerHandler

/**
 * User: laizhenwei
 * Date: 2018-03-26 Time: 21:46
 * Description:
 */
@Slf4j
@ChannelHandler.Sharable
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProto.Message> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProto.Message msg) {
        String message = msg.getMsgBody();
        if ((MessageProto.Message.Type.ACTIVE_VALUE) == msg.getType()) {
            Attribute<String> attribute = channelHandlerContext.channel().attr(AttributeKey.valueOf("userName"));
            //连接上以后获取消息参数,设置到channelAttr
            String userName = message.split(":")[1];
            attribute.setIfAbsent(userName);
            //缓存channelHandlerContext
            ContextHelper.add(userName, channelHandlerContext);
        } else if (MessageProto.Message.Type.MESSAGE_VALUE == msg.getType()) {
            ResponseFuture<MessageProto.Message> resutl = EchoServer.responseFutureMap.get(msg.getId());
            if (resutl == null)
                log.warn("result is null ! msgId:" + msg.getId());
            MessageProto.Message message1 = MessageProto.Message.newBuilder().setId(msg.getId()).setType(MessageProto.Message.Type.MESSAGE_VALUE).setMsgBody("接收成功!msg:" + message).build();
            resutl.setResponse(message1);
        }
//        System.out.println("Client->Server:" + channelHandlerContext.channel().remoteAddress() + " send " + msg.getMsgBody());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx){
        Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf("userName"));
        ContextHelper.remove(attribute.get());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
    }
}

ChannelInitializer,添加 Netty 支持 ProtoBuf 的拆包处理,以及编码解码

/**
 * User: laizhenwei
 * Date: 2018-03-26 Time: 21:46
 * Description:
 */
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                .addLast(new ProtobufVarint32FrameDecoder())
                .addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()))
                .addLast(new ProtobufVarint32LengthFieldPrepender())
                .addLast(new ProtobufEncoder())
                .addLast(new MyServerHandler());
    }

}

ResponseFuture

@NoArgsConstructor
public class ResponseFuture<T> implements Future<T> {
    // 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。
    private CountDownLatch latch = new CountDownLatch(1);
    // 响应结果
    private T response;
    // Futrue的请求时间,用于计算Future是否超时
    private long beginTime = System.currentTimeMillis();

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        if (response != null)
            return true;
        return false;
    }

    // 获取响应结果,直到有结果才返回。
    @Override
    public T get() throws InterruptedException {
        latch.await();
        return this.response;
    }

    // 获取响应结果,直到有结果或者超过指定时间就返回。
    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException {
        if (latch.await(timeout, unit))
            return this.response;
        return null;
    }

    // 用于设置响应结果,并且做countDown操作,通知请求线程
    public void setResponse(T response) {
        this.response = response;
        latch.countDown();
    }

    public long getBeginTime() {
        return beginTime;
    }
}

ApplicationStartup SpringBoot 完全启动以后,运行Netty服务

/**
 * User: laizhenwei
 * Date: 2018-03-26 Time: 21:46
 * Description:
 */
@Component
public class ApplicationStartup implements CommandLineRunner {

    @Override
    public void run(String... args) throws Exception {
        new EchoServer(5000).start();
    }
}

客户端 EchoClient

/**
 * User: laizhenwei
 * Date: 2018-03-27 Time: 21:50
 * Description:
 */
public class EchoClient {

    private final String host;

    private final int port;

    public EchoClient(String host,int port){
        this.host = host;
        this.port = port;
    }

    public void start(String userName) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host,port))
                .handler(new ChannelInitializer<SocketChannel>(){
                    @Override
                    protected void initChannel(SocketChannel socketChannel){
                        socketChannel.attr(AttributeKey.valueOf("userName")).setIfAbsent(userName);
                        socketChannel.pipeline()
                                .addLast(new ProtobufVarint32FrameDecoder())
                                .addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()))
                                .addLast(new ProtobufVarint32LengthFieldPrepender())
                                .addLast(new ProtobufEncoder())
                                .addLast(new MyClientHandler());
                    }
                });

       try {
           ChannelFuture f = b.connect().sync();
           f.channel().closeFuture().sync();
       }finally {
           group.shutdownGracefully().sync();
       }
    }

    public static void main(String[] args){
        threadRun("Athos");
        threadRun("Nero");
        threadRun("Dante");
        threadRun("Vergil");
        threadRun("lzw");
        threadRun("Churchill");
        threadRun("Peter");
        threadRun("Bob");
    }

    private static void threadRun(String userName){
        new Thread(()-> {
            try {
                new EchoClient("192.168.1.8",5000).start(userName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

}

MyClientHandler

/**
 * User: laizhenwei
 * Date: 2018-04-09 Time: 11:20
 * Description:
 */
@ChannelHandler.Sharable
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProto.Message> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        Attribute<Object> attribute = ctx.channel().attr(AttributeKey.valueOf("userName"));
        String m = "userName:" + attribute.get();
        MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();
        builder.setType(MessageProto.Message.Type.ACTIVE_VALUE).setMsgBody(m);
        ctx.writeAndFlush(builder.build());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProto.Message msg) {
        MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();
        //把接收到的消息写回到服务端
        builder.setId(msg.getId()).setType(MessageProto.Message.Type.MESSAGE_VALUE).setMsgBody(msg.getMsgBody());
        channelHandlerContext.channel().writeAndFlush(builder.build());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

JunitTest

    @Test
    public void testRest() throws InterruptedException {
        final Gson gson = new Gson();
        AtomicLong atomicLong = new AtomicLong(0);

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(512);
        executor.setThreadNamePrefix("Executor-");
        executor.setAllowCoreThreadTimeOut(false);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        String[] userNames = {"Athos", "Nero", "Dante"
                , "Vergil", "lzw", "Churchill"
                , "Peter", "Bob"};

//        String[] userNames = {"Athos"};

        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.setAccept(Arrays.asList(MediaType.APPLICATION_JSON_UTF8));
        httpHeaders.add("connection", "keep-alive");
//        httpHeaders.setConnection("close");
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        long begin = System.nanoTime();
        Arrays.stream(userNames).forEach(userName -> new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                futures.add(CompletableFuture.supplyAsync(() -> {
                    long currentId = atomicLong.getAndIncrement();
                    MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
                    params.add("userName", userName);
                    params.add("msg", "你好啊!" + currentId);
                    HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, httpHeaders);
                    String response = restTemplate.postForObject("http://192.168.91.130:8010/process", httpEntity, String.class);
                    if (response != null) {
                        Map<String, Object> responseMap;
                        responseMap = gson.fromJson(response, HashMap.class);
                        return responseMap.get("msgBody").equals("接收成功!msg:你好啊!" + currentId);
                    }
                    return false;
                }, executor));
            }
        }).start());

        while(futures.size()!=(100000*userNames.length)){
            TimeUnit.MILLISECONDS.sleep(500);
        }

        List<Boolean> result = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());

        System.out.println((System.nanoTime() - begin) / 1000000);

        result.stream().filter(r -> !r).forEach(r -> System.out.println(r));

    }

1.启动NettyServer

2.启动NettyClient

3.启动N个JunitTest windows 启动5个,Linux 启动5个

看看server输出,从请求到响应非常迅速

Client 多个线程也没有看到输出有false,证明伪同步响应成功

原文地址:https://www.cnblogs.com/sweetchildomine/p/8798493.html

时间: 2024-09-29 23:06:37

Http 调用netty 服务,服务调用客户端,伪同步响应.ProtoBuf 解决粘包,半包问题.的相关文章

wcf第三方客户端与wcf服务之间调用入门

Wcf服务与我们的客户端如何建立联系的呢.本文简单记录一下 1.创建我们的wcf服务程序. 第一个wcf服务库是创建我们的wcf库,运行时会单独来托管我们的程序,而非托管在iis下. 第二个wcf服务应用程序则是托管在iis下的. 1.创建我们的第三方客户端.可以理解为应用方公司的程序,可以的网站,桌面程序,甚至控制台.这里以桌面程序(winform)为例. 2.建立两者间联系 3.1.第一种建立两者间联系的方式如下: 直接右键客户端程序(winform)引用,点击添加服务引用 这个服务地址在哪

搭建基于asp.net的wcf服务,ios客户端调用的实现记录

一.写wcf 问题: 1.特定的格式 2.数据绑定 3.加密解密 二.发布到iis 问题: 1.访问权限问题,添加everyone权限 访问网站时:http://localhost/WebbUploadSample/ZipUpload.aspx “/WebbUploadSample”应用程序中的服务器错误. -------------------------------------------------------------------------------- 访问被拒绝. 说明: 访问服

Spring Cloud 入门教程(六): 用声明式REST客户端Feign调用远端HTTP服务

首先简单解释一下什么是声明式实现? 要做一件事, 需要知道三个要素,where, what, how.即在哪里( where)用什么办法(how)做什么(what).什么时候做(when)我们纳入how的范畴. 1)编程式实现: 每一个要素(where,what,how)都需要用具体代码实现来表示.传统的方式一般都是编程式实现,业务开发者需要关心每一处逻辑 2)声明式实现: 只需要声明在哪里(where )做什么(what),而无需关心如何实现(how).Spring的AOP就是一种声明式实现,

Netty实现服务端客户端长连接通讯及心跳检测

通过netty实现服务端与客户端的长连接通讯,及心跳检测.        基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key.每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可.心跳检测通过IdleEvent 事件,定时向服务端放送Ping消息,检测SocketChannel是否终断.         环境JDK1.8 和netty5      

C#.NET 大型企业信息化系统集成快速开发平台 4.2 版本 - 外部服务调用、内部服务调用优化,面向服务化的

现在的信息系统越来越复杂,越来越庞大,不仅需要内部是一个整体,而且还需要提供很多对外的服务调用. 1:别人如何调用最方便?用不同的开发语言调用.2:服务的返回状态是什么样子的?有利于排查问题.3:服务的安全性.可过渡升级性.性能效率要有保障.4:服务也需要有完整的调用日志记录等. 下面是一个服务调用有效性判断的函数代码,供大家参考. 1 //----------------------------------------------------------------- 2 // All Rig

C#调用WebService服务(动态调用)

原文:C#调用WebService服务(动态调用) 1 创建WebService using System; using System.Web.Services; namespace WebService1 { /// <summary> /// Service1 的摘要说明 /// </summary> [WebService(Namespace = "http://tempuri.org/", Description="测试服务")] [

JAVA与.NET的相互调用——通过Web服务实现相互调用

JAVA与.NET是现今世界竞争激烈的两大开发媒体,两者语言有很多相似的地方.而在很多大型的开发项目里面,往往需要使用两种语言进行集成开发.而很多的开发人员都会偏向于其中一种语言,在使用集成开发的时候对另一种语言感觉到畏惧.在这里在下向各位介绍一下,JAVA与.NET相互调用的例子.下面的介绍主要包括三方面:一是通过常用Web服务进行相互调用,二是使用TCP/IP套接字进行相互调用,三是使用Remote实现远程对象相互调用. 在这章里面先为大家介绍一下最简单,最常用的Web服务相互调用方式.首先

分享一个Android和java调用RESTful Web服务的利器Resting

分享一个Android和java调用RESTful Web服务的利器Resting 当我们调用Web服务,往往是最终目标是取HTTP响应,将其转化为将在应用中呈现的值对象.Resting可以用来实现这一功能.Resting,在Java的一个轻量级的REST框架,可用于调用一个RESTful Web服务,并转换成响应来自客户端应用程序定制的Java对象.由于它的简单,resting是适合Android等手持设备. resting目标?暴露简单的get(),post(),put()和delete()

[Python]webservice 学习(1) -- 简单服务和调用

由于项目中需要用到webservice来做接口,于是花点时间先做知识储备. 开始的时候觉着这个webservice就是一个http请求啊,服务端监听,客户端发送xml报文,然后解析下发送了什么内容,返回响应的数据. 这是百度百科对webservice的定义,一般使用wsdl来描述服务. 后来我的误区就是 wsdl的xml  和 用http 请求组成的xml也就是用soap来请求webservice, 这两种xml为啥不一样... 困惑: 看了些资料以后才明白,wsdl就是你发布的webservi