MessageReceiver

class MessageReceiver
    {
        private RelayEngine<MessageCollection> _MessageRelayEngine;
        private string _Hostname;
        private int _MessageDispatchServerPort;
        private string _SessionId;
        private TcpClient _Client;
        private bool _Stopped = false;
        private Thread _Worker = null;
        private ulong _LastMessageSequence = 0;

        internal MessageReceiver(RelayEngine<MessageCollection> messageRelayEngine)
        {
            this._MessageRelayEngine = messageRelayEngine;
        }

        internal void Start(string hostname, int messageDispatchServerPort, string sessionId)
        {
            this.Stop();

            this._Stopped = false;

            this._Hostname = hostname;
            this._MessageDispatchServerPort = messageDispatchServerPort;
            this._SessionId = sessionId;

            this._Worker = new Thread(ConnectServerAndReceiveMessage);
            this._Worker.IsBackground = true;
            this._Worker.Start();
        }

        internal void SetSessionId(string newSessionId)
        {
            this._SessionId = newSessionId;
        }

        private void ConnectServerAndReceiveMessage(object state)
        {
            byte[] header = new byte[4];
            int defaultBufferLen = 1024 * 64;
            byte[] defaultBuffer = new byte[defaultBufferLen];

            while (!this._Stopped)
            {
                this._Client = null;
                NetworkStream stream = null;

                try
                {
                    Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage to connect {0}:{1}", _Hostname, _MessageDispatchServerPort);

                    this._Client = new TcpClient();
                    this._Client.Connect(_Hostname, _MessageDispatchServerPort);
                    stream = this._Client.GetStream();

                    byte[] sessionData = ASCIIEncoding.ASCII.GetBytes(this._SessionId);
                    byte[] sessionDataLen = new byte[2] { (byte)(sessionData.Length >> 8), (byte)sessionData.Length };
                    stream.Write(sessionDataLen, 0, sessionDataLen.Length);
                    stream.Write(sessionData, 0, sessionData.Length);
                }
                catch (Exception ex)
                {
                    Logger.TraceEvent(TraceEventType.Error, "MessageReceiver.ConnectServerAndReceiveMessage error:\r\n{0}", ex);
                    this.CloseConectionSilently();
                    continue;
                }

                while (true)
                {
                    Array.Clear(header, 0, header.Length);
                    if (!this.ReadAll(stream, header))
                    {
                        Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage read header data failed");
                        this.CloseConectionSilently();
                        break;//reconnect
                    }

                    int dataLength = ((int)header[0] << 24) + ((int)header[1] << 16) + ((int)header[2] << 8) + header[3];
                    byte[] buffer = dataLength <= defaultBufferLen ? defaultBuffer : new byte[dataLength];

                    if (!this.ReadAll(stream, buffer, dataLength))
                    {
                        Logger.TraceEvent(TraceEventType.Information, "MessageReceiver.ConnectServerAndReceiveMessage read message data failed");
                        this.CloseConectionSilently();
                        break;//reconnect
                    }

                    try
                    {
                        MessageCollection message = CompressHelper.FromByteArray<MessageCollection>(buffer, dataLength);
                        if (message.Sequence != this._LastMessageSequence)
                        {
                            Logger.TraceEvent(TraceEventType.Warning, "MessageReceiver.ConnectServerAndReceiveMessage got message with worng sequence {0}, excepted sequece is {1}",
                                message.Sequence, this._LastMessageSequence);
                        }
                        this._LastMessageSequence++;

                        this._MessageRelayEngine.AddItem(message);
                        ConsoleClient.Instance.RefreshLastMsgTime();
                    }
                    catch (Exception ex)
                    {
                        Logger.TraceEvent(TraceEventType.Error, "MessageReceiver.ConnectServerAndReceiveMessage add message to engine error:\r\n{0}", ex);
                    }
                }
            }
        }

        private void CloseConectionSilently()
        {
            try
            {
                if (this._Client != null)
                {
                    this._Client.Close();
                    this._Client = null;
                }
            }
            catch (Exception ex)
            {
                Logger.TraceEvent(TraceEventType.Error, "MessageReceiver CloseConectionSilently error:\r\n{0}", ex);
            }
        }

        private bool ReadAll(NetworkStream stream, byte[] buffer, int? dataLength = null)
        {
            try
            {
                int offset = 0;
                int len = dataLength.HasValue ? dataLength.Value : buffer.Length;

                while (len > 0)
                {
                    if (!stream.DataAvailable)
                    {
                        Thread.Sleep(100);
                        continue;
                    }
                    int readLength = stream.Read(buffer, offset, len);
                    if (readLength == 0)
                    {
                        return false;
                    }
                    else
                    {
                        offset += readLength;
                        len -= offset;
                    }
                }
                return true;
            }
            catch (Exception ex)
            {
                Logger.TraceEvent(TraceEventType.Warning, "MessageReceiver.ReadAll error:\r\n{0}", ex.ToString());
                return false;
            }
        }

        internal void Stop()
        {
            this._Stopped = true;
            this._LastMessageSequence = 0;
            this.CloseConectionSilently();
            if (this._Worker != null)
            {
                this._Worker.Join(1000);
                this._Worker = null;
            }
        }
    }
时间: 2024-10-11 23:14:23

MessageReceiver的相关文章

yate学习--yatengine.h--class YATE_API MessageReceiver : public GenObject

请声明出处: MessageReceiver,这个类是一个消息接受的基类: /** * A multiple message receiver to be invoked by a message relay * 多个消息接收器调用消息传递 * @short A multiple message receiver * @short 多个消息接收器 */ class YATE_API MessageReceiver : public GenObject { public: /** * This m

Data Model for Message Receiver

1. Physical Data Model 2. SQL Statements drop database MessageReceiver go /*==============================================================*/ /* Database: MessageReceiver */ /*==============================================================*/ create dat

在Spring下集成ActiveMQ

1.参考文献 Spring集成ActiveMQ配置 Spring JMS异步发收消息 ActiveMQ 2.环境 在前面的一篇ActiveMQ入门实例中我们实现了消息的异步传送,这篇博文将如何在spring环境下集成ActiveMQ.如果要在spring下集成ActiveMQ,那么就需要将如下jar包导入项目: 本文有两篇参考文献,因此有两个实例,项目结构如下图所示: 3.实例1 信息发送者:HelloSender.java package edu.sjtu.erplab.springactiv

Android接收和发送短信

每一部手机都具有短信接收和发送功能,下面我们通过代码来实现接收和发送短信功能. 一.接收短信 1.创建内部广播接收器类,接收系统发出的短信广播 2.从获得的内容中解析出短信发送者和短信内容 3.在Activity中注册广播 4.添加接收短信权限 下面放上具体的代码 activity_main.xml文件用于显示短信发送者号码和显示短信内容 <?xml version="1.0" encoding="utf-8"?> <RelativeLayout

Axis2实现 web service接口开发 + 客户端调用

一. 新建一个web项目, 1.打开axis2.war包,将conf,lib,modules三个文件夹复制到项目的WEB-INF文件夹下,再在WEB-INF目录下新建一个services文件夹,然后在services文件下新建一个文件夹(任意取名): 再新建META-INF文件夹,最后再新增services.xml,接口信息就写在这里面. 具体路径:WEB-INF/services/myservice/META-INF/services.xml 2.配置 web.xml .加载axis2 和 a

Axis2身份验证-采用Module方式验证Soap Header实现

1.创建UserCheckModule类 import org.apache.axis2.AxisFault; import org.apache.axis2.context.ConfigurationContext; import org.apache.axis2.description.AxisDescription; import org.apache.axis2.description.AxisModule; import org.apache.axis2.modules.Module;

安卓学习之接收、发送短信

短信接收 android中当手机接收到一条短信后,会发送android.provider.Telephony.SMS_RECEIVED 的广播,这条广播中携带有与短信相关的所有数据.每个应用程序都可以在广播接收器里对他监听. 简单的短信接收程序: protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main

Axis2之wsdl2java工具

本章主要介绍axis2的wsdl2java工具的使用. Axis2提供了一个wsdl2java命令可以根据WSDL文件自动产生调用WebService的代码.wsdl2java命令可以在<Axis2安装目录>下的bin目录中找到.在使用wsdl2java命令之前需要设置AXIS2_HOME环境变量,该变量值是<Axis2安装目录>.以下以Windows平台上使用为例. 进入Windows控制台,执行如下命令来生成WebService的客户端代码: %AXIS2_HOME%\bin\

每天进步一点----- 收发短信

package com.example.smstest; import android.app.Activity; import android.app.PendingIntent; import android.content.BroadcastReceiver; import android.content.Context; import android.content.Intent; import android.content.IntentFilter; import android.o