java netty socket库和自定义C#socket库利用protobuf进行通信完整实例

之前的文章讲述了socket通信的一些基本知识,已经本人自定义的C#版本的socket、和java netty 库的二次封装,但是没有真正的发表测试用例。

本文只是为了讲解利用protobuf 进行C# 和 java的通信。以及完整的实例代码

java 代码 svn 地址,本人开发工具是NetBeans 8.0.2 使用 maven 项目编译

http://code.taobao.org/svn/flynetwork_csharp/trunk/BlogTest

c# 代码 svn 地址 使用的是 vs 2013 .net 4.5

http://code.taobao.org/svn/flynetwork_csharp/trunk/Flynetwork/BlogTest

本文着重以C# socket作为服务器端,java netty作为socket的客户端进行访问通信

首先附上proto的message文件

package Sz.Test.ProtoMessage;

//登陆消息
message TestMessage {

   //消息枚举
    enum Proto_Login {

        ResTip                          = 101201;//服务器推送提示

        ReqLogin                        = 101102;//客户端申请登陆

        ReqChat                        = 101103;//客户端申请聊天消息
        ResChat                        = 101203;//服务器推送聊天消息

    }

    //服务器推送提示 ResTip
    message ResTipMessage {
        required string msg                     = 1;//提示内容
    }

    //客户端申请登陆 ReqLogin
    message ReqLoginMessage {

        required string userName                = 1;//登陆用户名
        required string userPwd                 = 2;//登陆密码

    } 

    //客户端申请登陆 ReqChat
    message ReqChatMessage {
        required string msg                     = 1;//提示内容
    }

    //客户端申请登陆 ResChat
    message ResChatMessage {
        required string msg                     = 1;//提示内容
    }
}

本人编译工具自带生产消息,和对应的handler

先把proto文件编译生产后,放到哪里,然后创建服务器监听代码

上一篇文章讲到由于java和C#默认网络端绪不一样,java是标准端绪大端序,C#使用的小端序。

1             MarshalEndian.JN = MarshalEndian.JavaOrNet.Java;
2             Sz.Network.SocketPool.ListenersBox.Instance.SetParams(new MessagePool(), typeof(MarshalEndian));
3             Sz.Network.SocketPool.ListenersBox.Instance.Start("tcp:*:9527");

所以在我开启服务器监听的时候设置解码器和编码器的解析风格为java

然后建立一个文件chat文件夹用于存放handler文件就是刚才工具生成 目录下的 ExcelSource\protobuf\net\Handler

这一系列文件

 1 if (message.MsgID == (int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ReqLogin)
 2             {
 3                 //构建消息
 4                 Sz.Test.ProtoMessage.TestMessage.ReqLoginMessage loginmessage = new Test.ProtoMessage.TestMessage.ReqLoginMessage();
 5                 object msg = DeSerialize(message.MsgBuffer, loginmessage);
 6                 //构建handler
 7                 Test.ProtoMessage.ReqLoginHandler handler = new Test.ProtoMessage.ReqLoginHandler();
 8                 handler.Session = client;
 9                 handler.Message = loginmessage;
10                 //把handler交给 登录 线程处理
11                 ThreadManager.Instance.AddTask(ServerManager.LoginThreadID, handler);
12             }
13             else if (message.MsgID == (int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ReqChat)
14             {
15                 //构建消息
16                 Sz.Test.ProtoMessage.TestMessage.ReqChatMessage loginmessage = new Test.ProtoMessage.TestMessage.ReqChatMessage();
17                 object msg = DeSerialize(message.MsgBuffer, loginmessage);
18                 //构建handler
19                 Test.ProtoMessage.ReqChatHandler handler = new Test.ProtoMessage.ReqChatHandler();
20                 handler.Session = client;
21                 handler.Message = loginmessage;
22                 //把handler交给 聊天 线程处理
23                 ThreadManager.Instance.AddTask(ServerManager.ChatThreadID, handler);
24             }

收到消息后的处理判断传入的消息id是什么类型的,然后对应反序列化byte[]数组为消息

最后把消息和生成handler移交到对应的线程处理

登录的消息全部交给 LoginThread 线程 去处理 ,这样在真实的运行环境下,能保证单点登录问题;

聊天消息全部交给 ChatThread 线程 去处理 这样的好处是,聊天与登录无关;

收到登录消息的处理

 1     public class ReqLoginHandler : TcpHandler
 2     {
 3         public override void Run()
 4         {
 5
 6             var message = (Sz.Test.ProtoMessage.TestMessage.ReqLoginMessage)this.Message;
 7             Sz.Test.ProtoMessage.TestMessage.ResTipMessage tip = new TestMessage.ResTipMessage();
 8             if (message.userName == "admin" && message.userPwd == "admin")
 9             {
10                 Logger.Debug("收到登录消息 登录完成");
11                 tip.msg = "登录完成";
12             }
13             else
14             {
15                 Logger.Debug("收到登录消息 用户名或者密码错误");
16                 tip.msg = "用户名或者密码错误";
17             }
18             byte[] buffer = MessagePool.Serialize(tip);
19             this.Session.SendMsg(new Network.SocketPool.SocketMessage((int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ResTip, buffer));
20         }
21     }

收到聊天消息的处理

 1     public class ReqChatHandler : TcpHandler
 2     {
 3         public override void Run()
 4         {
 5             var message = (Sz.Test.ProtoMessage.TestMessage.ReqChatMessage)this.Message;
 6             Logger.Debug("收到来自客户端聊天消息:" + message.msg);
 7             Sz.Test.ProtoMessage.TestMessage.ResChatMessage chat = new TestMessage.ResChatMessage();
 8             chat.msg = "服务器广播:" + message.msg;
 9             byte[] buffer = MessagePool.Serialize(chat);
10             this.Session.SendMsg(new Network.SocketPool.SocketMessage((int)Sz.Test.ProtoMessage.TestMessage.Proto_Login.ResChat, buffer));
11         }
12     }

接下来我们构建

java版本基于netty 二次封装的socket客户端

 1 package sz.network.socketpool.nettypool;
 2
 3 import Sz.Test.ProtoMessage.Test.TestMessage;
 4 import com.google.protobuf.InvalidProtocolBufferException;
 5 import io.netty.channel.ChannelHandlerContext;
 6 import java.io.BufferedReader;
 7 import java.io.IOException;
 8 import java.io.InputStreamReader;
 9 import java.util.logging.Level;
10 import org.apache.log4j.Logger;
11
12 /**
13  *
14  * @author Administrator
15  */
16 public class TestClient {
17
18     static final Logger log = Logger.getLogger(TestClient.class);
19     static NettyTcpClient client = null;
20
21     public static void main(String[] args) {
22         client = new NettyTcpClient("127.0.0.1", 9527, true, new NettyMessageHandler() {
23
24             @Override
25             public void channelActive(ChannelHandlerContext session) {
26                 log.info("连接服务器成功:");
27                 //构建错误的登录消息
28                 TestMessage.ReqLoginMessage.Builder newBuilder = TestMessage.ReqLoginMessage.newBuilder();
29                 newBuilder.setUserName("a");
30                 newBuilder.setUserPwd("a");
31                 //发送消息
32                 TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqLogin_VALUE, newBuilder.build().toByteArray()));
33
34                 //构建正确的登录消息
35                 TestMessage.ReqLoginMessage.Builder newBuilder1 = TestMessage.ReqLoginMessage.newBuilder();
36                 newBuilder1.setUserName("admin");
37                 newBuilder1.setUserPwd("admin");
38                 TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqLogin_VALUE, newBuilder1.build().toByteArray()));
39             }
40
41             @Override
42             public void readMessage(NettyMessageBean msg) {
43                 try {
44                     if (msg.getMsgid() == TestMessage.Proto_Login.ResTip_VALUE) {
45                         TestMessage.ResTipMessage tipmessage = TestMessage.ResTipMessage.parseFrom(msg.getMsgbuffer());
46                         log.info("收到提示信息:" + tipmessage.getMsg());
47                     } else if (msg.getMsgid() == TestMessage.Proto_Login.ResChat_VALUE) {
48                         TestMessage.ResChatMessage tipmessage = TestMessage.ResChatMessage.parseFrom(msg.getMsgbuffer());
49                         log.info("收到聊天消息:" + tipmessage.getMsg());
50                     }
51                 } catch (InvalidProtocolBufferException ex) {
52                     log.error("收到消息:" + msg.getMsgid() + " 解析出错:" + ex);
53                 }
54             }
55
56             @Override
57             public void closeSession(ChannelHandlerContext session) {
58                 log.info("连接关闭或者连接不成功:");
59             }
60
61             @Override
62             public void exceptionCaught(ChannelHandlerContext session, Throwable cause) {
63                 log.info("错误:" + cause.toString());
64             }
65         });
66         client.Connect();
67
68         BufferedReader strin = new BufferedReader(new InputStreamReader(System.in));
69         while (true) {
70             try {
71                 String str = strin.readLine();
72                 //构建聊天消息
73                 TestMessage.ReqChatMessage.Builder chatmessage = TestMessage.ReqChatMessage.newBuilder();
74                 chatmessage.setMsg(str);
75                 TestClient.client.sendMsg(new NettyMessageBean(TestMessage.Proto_Login.ReqChat_VALUE, chatmessage.build().toByteArray()));
76             } catch (IOException ex) {
77             }
78         }
79
80     }
81
82 }

接下来我们看看效果

我设置了断线重连功能,我们来测试一下,把服务器关闭

可以看到没3秒向服务器发起一次请求;

知道服务器再次开启链接成功

完整的通信示例演示就完了;

代码我不在上传了,请各位使用svn下载好么????

需要注意的是,消息的解码器和编码器,一定要双方都遵守你自己的契约。比如我在编码消息格式的时候先写入消息包的长度,然后跟上消息的id,再是消息的内容

所以解码的时候,先读取一个消息长度,在读取一个消息id,如果本次收到的消息字节数不够长度那么留存起来以用于下一次收到字节数组追加后再一起解析。

这样就能解决粘包的问题。

附上C#版本的解析器

  1 using System;
  2 using System.Collections.Generic;
  3 using System.IO;
  4 using System.Linq;
  5 using System.Text;
  6 using System.Threading.Tasks;
  7
  8 /**
  9  *
 10  * @author 失足程序员
 11  * @Blog http://www.cnblogs.com/ty408/
 12  * @mail [email protected]
 13  * @phone 13882122019
 14  *
 15  */
 16 namespace Sz.Network.SocketPool
 17 {
 18     public class MarshalEndian : IMarshalEndian
 19     {
 20
 21         public enum JavaOrNet
 22         {
 23             Java,
 24             Net,
 25         }
 26
 27         public MarshalEndian()
 28         {
 29
 30         }
 31
 32         public static JavaOrNet JN = JavaOrNet.Net;
 33
 34         /// <summary>
 35         /// 读取大端序的int
 36         /// </summary>
 37         /// <param name="value"></param>
 38         public int ReadInt(byte[] intbytes)
 39         {
 40             Array.Reverse(intbytes);
 41             return BitConverter.ToInt32(intbytes, 0);
 42         }
 43
 44         /// <summary>
 45         /// 写入大端序的int
 46         /// </summary>
 47         /// <param name="value"></param>
 48         public byte[] WriterInt(int value)
 49         {
 50             byte[] bs = BitConverter.GetBytes(value);
 51             Array.Reverse(bs);
 52             return bs;
 53         }
 54
 55         //用于存储剩余未解析的字节数
 56         private List<byte> _LBuff = new List<byte>(2);
 57
 58         //字节数常量一个消息id4个字节
 59         const long ConstLenght = 4L;
 60
 61         public void Dispose()
 62         {
 63             this.Dispose(true);
 64             GC.SuppressFinalize(this);
 65         }
 66
 67         protected virtual void Dispose(bool flag1)
 68         {
 69             if (flag1)
 70             {
 71                 IDisposable disposable = this._LBuff as IDisposable;
 72                 if (disposable != null) { disposable.Dispose(); }
 73             }
 74         }
 75
 76         public byte[] Encoder(SocketMessage msg)
 77         {
 78             MemoryStream ms = new MemoryStream();
 79             BinaryWriter bw = new BinaryWriter(ms, UTF8Encoding.Default);
 80             byte[] msgBuffer = msg.MsgBuffer;
 81
 82             if (msgBuffer != null)
 83             {
 84                 switch (JN)
 85                 {
 86                     case JavaOrNet.Java:
 87                         bw.Write(WriterInt(msgBuffer.Length + 4));
 88                         bw.Write(WriterInt(msg.MsgID));
 89                         break;
 90                     case JavaOrNet.Net:
 91                         bw.Write((Int32)(msgBuffer.Length + 4));
 92                         bw.Write(msg.MsgID);
 93                         break;
 94                 }
 95
 96                 bw.Write(msgBuffer);
 97             }
 98             else
 99             {
100                 switch (JN)
101                 {
102                     case JavaOrNet.Java:
103                         bw.Write(WriterInt(0));
104                         break;
105                     case JavaOrNet.Net:
106                         bw.Write((Int32)0);
107                         break;
108                 }
109             }
110             bw.Close();
111             ms.Close();
112             bw.Dispose();
113             ms.Dispose();
114             return ms.ToArray();
115         }
116
117         public List<SocketMessage> Decoder(byte[] buff, int len)
118         {
119             //拷贝本次的有效字节
120             byte[] _b = new byte[len];
121             Array.Copy(buff, 0, _b, 0, _b.Length);
122             buff = _b;
123             if (this._LBuff.Count > 0)
124             {
125                 //拷贝之前遗留的字节
126                 this._LBuff.AddRange(_b);
127                 buff = this._LBuff.ToArray();
128                 this._LBuff.Clear();
129                 this._LBuff = new List<byte>(2);
130             }
131             List<SocketMessage> list = new List<SocketMessage>();
132             MemoryStream ms = new MemoryStream(buff);
133             BinaryReader buffers = new BinaryReader(ms, UTF8Encoding.Default);
134             try
135             {
136                 byte[] _buff;
137             Label_0073:
138                 //判断本次解析的字节是否满足常量字节数
139                 if ((buffers.BaseStream.Length - buffers.BaseStream.Position) < ConstLenght)
140                 {
141                     _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position));
142                     this._LBuff.AddRange(_buff);
143                 }
144                 else
145                 {
146                     long offset = 0;
147                     switch (JN)
148                     {
149                         case JavaOrNet.Java:
150                             offset = ReadInt(buffers.ReadBytes(4));
151                             break;
152                         case JavaOrNet.Net:
153                             offset = buffers.ReadInt32();
154                             break;
155                     }
156
157                     //剩余字节数大于本次需要读取的字节数
158                     if (offset <= (buffers.BaseStream.Length - buffers.BaseStream.Position))
159                     {
160                         int msgID = 0;
161                         switch (JN)
162                         {
163                             case JavaOrNet.Java:
164                                 msgID = ReadInt(buffers.ReadBytes(4));
165                                 break;
166                             case JavaOrNet.Net:
167                                 msgID = buffers.ReadInt32();
168                                 break;
169                         }
170                         _buff = buffers.ReadBytes((int)(offset - 4));
171                         list.Add(new SocketMessage(msgID, _buff));
172                         goto Label_0073;
173                     }
174                     else
175                     {
176                         //剩余字节数刚好小于本次读取的字节数 存起来,等待接受剩余字节数一起解析
177                         buffers.BaseStream.Seek(ConstLenght, SeekOrigin.Current);
178                         _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position));
179                         this._LBuff.AddRange(_buff);
180                     }
181                 }
182             }
183             catch { }
184             finally
185             {
186                 buffers.Close();
187                 if (buffers != null) { buffers.Dispose(); }
188                 ms.Close();
189                 if (ms != null) { ms.Dispose(); }
190             }
191             return list;
192         }
193     }
194 }

谢谢观赏~!

时间: 2024-11-06 23:39:11

java netty socket库和自定义C#socket库利用protobuf进行通信完整实例的相关文章

Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)

转载请注明出处:http://blog.csdn.net/anxpp/article/details/51512200,谢谢! 本文会从传统的BIO到NIO再到AIO自浅至深介绍,并附上完整的代码讲解. 下面代码中会使用这样一个例子:客户端发送一段算式的字符串到服务器,服务器计算后返回结果到客户端. 代码的所有说明,都直接作为注释,嵌入到代码中,看代码时就能更容易理解,代码中会用到一个计算结果的工具类,见文章代码部分. 相关的基础知识文章推荐: Linux 网络 I/O 模型简介(图文) Jav

(转)Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)

原文出自:http://blog.csdn.net/anxpp/article/details/51512200 1.BIO编程 1.1.传统的BIO编程 网络编程的基本模型是C/S模型,即两个进程间的通信. 服务端提供IP和监听端口,客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信. 传统的同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口:Socket负责发起连接操作.连接成功后,双方通过输入和输出流进

ClientAbortException: java.net.SocketException: Software caused connection abort: socket write erro

1.错误描述 ClientAbortException: java.net.SocketException: Software caused connection abort: socket write error at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:407) at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteCh

Caused by: java.net.SocketException: Software caused connection abort: socket write error

1.错误描述 [ERROR:]2015-10-16 22:28:39,964 [异常拦截] exception.ExceptionHandler ClientAbortException: java.net.SocketException: Software caused connection abort: socket write error at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.ja

Java从零开始学四十五(Socket编程基础)

一.网络编程中两个主要的问题 一个是如何准确的定位网络上一台或多台主机,另一个就是找到主机后如何可靠高效的进行数据传输. 在TCP/IP协议中IP层主要负责网络主机的定位,数据传输的路由,由IP地址可以唯一地确定Internet上的一台主机. 而TCP层则提供面向应用的可靠(tcp)的或非可靠(UDP)的数据传输机制,这是网络编程的主要对象,一般不需要关心IP层是如何处理数据的. 目前较为流行的网络编程模型是客户机/服务器(C/S)结构.即通信双方一方作为服务器等待客户提出请求并予以响应.客户则

eclipse的TestNG报错java.net.SocketException: Software caused connection abort: socket write error&quot;

最近整了一个自动化项目,需要用到testng,安装的最新版本6.11.0老是提示错误java.net.SocketException: Software caused connection abort: socket write error" 百度了半天终于解决了,具体方法如下: 1,卸载eclipse中原安装的6.11.0版本 2,安装指定的版本6.8.22: 输入指定的版本地址安装.http://beust.com/eclipse-old/eclipse_6.8.22.20150507032

java中所有的通信都是socket

java中所有的通信都是socket 我目前知道的java几种通信方式: 1.socket+serverSocket通信. 2.NIO 服务端与客户端使用的还是Socket. 3.servlet :是由tomcat这种web容器解析的, 过程:tomcat的serverSocket接收到客户端的请求后根据URL和web.xml映射到servlet. 4. Hessian 使用的是servlet方式: 1 <servlet> 2 <servlet-name>remote-servic

JAVA与网络开发(TCP:Socket、ServerSocket;UDP:DatagramSocket、DatagramPacket;多线程的C/S通讯、RMI开发概述)

通过TCP建立可靠通讯信道 1)为了对应TCP协议里的客户端和服务器端,Socket包提供了Socket类和ServerSocket类. 2)Socket类构造函数及相关方法 Public Socket(); public Socket(InetAddress address,int port);//本机IP和端口 public Socket(Striing host,int port);//本机IP和端口 public void connect(SocketAddress endpoint);

testNG java.net.SocketException: Software caused connection abort: socket write error

执行用例报错,提示 java.net.SocketException: Software caused connection abort: socket write error java.net.SocketException: Software caused connection abort: socket write error at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutpu