搞定thrift双向消息

thrift作为脱胎于facebook的rpc框架,各方面都非常优秀。清晰的分层设计,多语言的支持,以及不输protocolbuffer的效率(compact下优于protocolbuffer),都让thrift拥有越来越多的使用者。

作为一个RPC框架,thrift支持的是open->client--rpc-->server->close的短连接模式。在实际应用中,却经常会有客户端建立连接后,等待服务端数据的长连接模式,也可以称为双向连接。通常的方案有三种,可参考http://dongxicheng.org/search-engine/thrift-bidirectional-async-rpc/,文中提到第三种方法会修改源码,而实际操作过程中发现这其实是作者小小的理解错误,实现thrift双向通信并没有这么复杂,经过一番实验,发现只需要如下理解和实现即可轻松实现一个thrift的双向连接。

  1. 双向连接的service必须为oneway,否则会因为recv函数抛出remote close异常
  2. 客户端重用建立client的protocol,开线程使用processor.Process(protocol,protocol)监听服务端callback的消息。
  3. 服务端使用ProcessorFactory,使用TConnectionInfo中的transport作为向客户端发送消息的client的transport

搞定以上三步,即可实现一个thrift双向连接,这里附上实验代码,客户端使用C#(sorry for my pool C#),服务端使用C++

thrift

service HandshakeService{
    oneway void HandShake();
}

service CallbackService{
    oneway void Push(1: string msg);
}

client

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Thrift.Collections;
using Thrift.Protocol;
using Thrift.Server;
using Thrift.Transport;
using System.Threading;
using Thrift;
using System.IO;

namespace ThriftBidirection
{
    class Program
    {
        class CallbackServiceImply : CallbackService.Iface
        {
            int msgCount = 0;
            public void Push(string msg)
            {
                Console.WriteLine("receive msg {0}: {1}", msgCount++, msg);
            }
        }
        //服务处理线程
        static void ProcessThread(TProtocol protocol)
        {
            TProcessor processor = new CallbackService.Processor(new CallbackServiceImply());
            while (true)
            {
                try
                {
                    //////////////////////////////////////////////////////////////////////////
                    ///模仿server行为,同时重用client端protocol
                    ///相当于同时重用一个连接
                    while (processor.Process(protocol, protocol)) { };
                    ///connection lost, return
                    return;
                }
                catch (IOException) //not fatal error, resume
                {
                    continue;
                }
                catch (TException) //fatal error
                {
                    return;
                }
            }
        }
        //服务器状态监听线程
        static void MonitorThread(TTransport trans, Action<string> callback)
        {
            while (true)
            {
                try
                {
                    if (!trans.Peek())
                    {
                        callback("连接中断");
                    }
                    Thread.Sleep(3000);
                }
                catch (Thrift.TException ex)
                {
                    callback(ex.Message);
                    return;
                }
            }
        }

        static void Main(string[] args)
        {
            TTransport transport = new TBufferedTransport(new TSocket("localhost", 5555));
            TProtocol protocol = new TBinaryProtocol(transport);
            HandshakeService.Client client = new HandshakeService.Client(protocol);
            Action<TProtocol> processAction = new Action<TProtocol>(ProcessThread);
            Action<TTransport, Action<string>> monitorAction = new Action<TTransport, Action<string>>(MonitorThread);

            transport.Open();
            processAction.BeginInvoke(protocol, (result) =>
            {
                 processAction.EndInvoke(result);
            }, null);
            monitorAction.BeginInvoke(transport, (msg) =>
            {
                Console.WriteLine("连接中断: {0}", msg);
            }, (result) =>
            {

            }, null);

            for (int i = 0; i < 100; ++i)
            {
                client.HandShake();
                Thread.Sleep(10);
            }
            Console.Read();
            transport.Close();
        }
    }
}

server

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "HandshakeService.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <boost/make_shared.hpp>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include "CallbackService.h"

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace apache::thrift::concurrency;

using boost::make_shared;
using boost::shared_ptr;

class HandshakeServiceHandler : virtual public HandshakeServiceIf {
 public:
  HandshakeServiceHandler(const boost::shared_ptr<TTransport> &trans)
      : m_client(make_shared<TBinaryProtocol>(trans))
  {
      boost::once_flag flag = BOOST_ONCE_INIT;
      m_flag = flag;
  }

  virtual ~HandshakeServiceHandler()
  {
        m_thread->interrupt();
        m_thread->join();
  }

  void CallbackThread()
  {
      while(true)
      {
          try
          {
              m_client.Push("server push msg");
          }
          catch (TException)
          {
              return;
          }
          boost::this_thread::sleep_for(boost::chrono::milliseconds(20));
      }
  }

  void HandShake() {
    // Your implementation goes here
    printf("HandShake\n");
    boost::call_once(boost::bind(&HandshakeServiceHandler::_StartThread, this), m_flag);
  }

  void _StartThread()
  {
    m_thread.reset(new boost::thread(boost::bind(&HandshakeServiceHandler::CallbackThread, this)));
  }

boost::shared_ptr<TTransport> m_trans;
CallbackServiceClient m_client;
shared_ptr<boost::thread> m_thread;
boost::once_flag m_flag;
};

class ProcessorFactoryImply : public TProcessorFactory
{
    virtual boost::shared_ptr<TProcessor> getProcessor(
        const TConnectionInfo& connInfo)
    {
        return make_shared<HandshakeServiceProcessor>(make_shared<HandshakeServiceHandler>(connInfo.transport));
    }
};

int main(int argc, char **argv) {
  int port = 5555;
  shared_ptr<TProcessorFactory> processorFactory(new ProcessorFactoryImply());
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
  shared_ptr<ThreadManager> threadMgr = ThreadManager::newSimpleThreadManager(30);
  boost::shared_ptr<PlatformThreadFactory> threadFactory =
      boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());

  threadMgr->threadFactory(threadFactory);
  threadMgr->start();
  TThreadPoolServer server(processorFactory,serverTransport, transportFactory, protocolFactory, threadMgr);
  server.serve();
  return 0;
}

一个简单的thrift双向通信就实现了。

时间: 2024-08-09 06:35:05

搞定thrift双向消息的相关文章

微信自动回复,Python几行代码就搞定了,消息不在错过

之前写过一篇python-requests获取好友列表的文章,简直花费了好多的时间和精力,又抓包,又找参数,又分析的,简直麻烦透顶,今天突然知道了另外一种捷径,几行代码就可以完成.... 学习Python中有不明白推荐加入交流裙                                           号:735934841                                           群里有志同道合的小伙伴,互帮互助,                     

材料管理框架:一个共通的viewModel搞定所有的分页查询

前言 大家看标题就明白了我想写什么了,在做企业信息化系统中可能大家写的最多的一种页面就是查询页面了.其实每个查询页面,除了条件不太一样,数据不太一样,其它的其实都差不多.所以我就想提取一些共通的东西出来,再写查询时只要引入我共通的东西,再加上极少的代码就能完成.我个人比较崇尚代码简洁干净,有不合理的地方欢迎大家指出. 这篇文章主要介绍两个重点:1.前台viewModel的实现.2.后台服务端如何简洁的处理查询请求. 需求分析 查询页面要有哪些功能呢 1.有条件部输入查询条件(这个不打算做成共通的

[转] Java程序员学C#基本语法两个小时搞定(对比学习)

Java程序员学C#基本语法两个小时搞定(对比学习) 对于学习一门新的语言,关键是学习新语言和以前掌握的语言的区别,但是也不要让以前语言的东西,固定了自己的思维模式,多看一下新的语言的编程思想. 1.引包 using System;java用import2.构造函数和java语法相同3.析构函数  变量和类的对象都有生命周期,生命周期结束,这些变量和对象就要被撤销.  类的对象被撤销时,将自动调用析构函数.一些善后工作可放在析构函数中完成.  析构函数的名字为~类名,无返回类型,也无参数.Per

iOS开发三步搞定百度推送

iOS开发三步搞定百度推送 百度推送很简单,准备工作:在百度云推送平台注册应用,上传证书. 步骤一: 百度云推送平台 http://push.baidu.com/sdk/push_client_sdk_for_ios  在这里下载iOS端SDK包,如下图: 把SDK包里面的下图文件夹拖到你的工程中,如下图,第一步就这么简单. 步骤二: 在工程中AppDelegate.m中的- (BOOL)application:(UIApplication *)application didFinishLaun

Webcast / 技术小视频制作方法——自己动手录制video轻松搞定

Webcast / 技术小视频制作方法——自己动手录制video轻松搞定 http://blog.sina.com.cn/s/blog_67d387490100wdnh.html 最近申请加入MSP的童鞋应该发现了一个新的要求——制作简短的视频!视频的内容要求是与微软技术相关~我们希望通过使用这种方法,简化申请流程,加强对创意.微软相关技术的考察~关于MSP项目以及申请流程的细则近期也会出台,请童鞋们耐心等待~ 首先呢,就跟广大的童鞋们介绍一款简单使用的录屏软件~而通过简单的安装,便可以轻松.便

多key业务,数据库水平切分架构一次搞定

转发自:原创 2017-08-29 58沈剑 架构师之路 数据库水平切分是一个很有意思的话题,不同业务类型,数据库水平切分的方法不同. 本篇将以"订单中心"为例,介绍"多key"类业务,随着数据量的逐步增大,数据库性能显著降低,数据库水平切分相关的架构实践. 一.什么是"多key"类业务 所谓的"多key",是指一条元数据中,有多个属性上存在前台在线查询需求. 订单中心业务分析 订单中心是一个非常常见的"多key&q

一个共通的viewModel搞定所有的分页查询一览及数据导出(easyui + knockoutjs + mvc4.0)

前言 大家看标题就明白了我想写什么了,在做企业信息化系统中可能大家写的最多的一种页面就是查询页面了.其实每个查询页面,除了条件不太一样,数据不太一样,其它的其实都差不多.所以我就想提取一些共通的东西出来,再写查询时只要引入我共通的东西,再加上极少的代码就能完成.我个人比较崇尚代码简洁干净,有不合理的地方欢迎大家指出. 这篇文章主要介绍两个重点:1.前台viewModel的实现.2.后台服务端如何简洁的处理查询请求. 需求分析 查询页面要有哪些功能呢 1.有条件部输入查询条件(这个不打算做成共通的

[转]用GSON 五招之内搞定任何JSON数组

关于GSON的入门级使用,这里就不提了,如有需要可以看这篇博文 <Google Gson的使用方法,实现Json结构的相互转换> ,写的很好,通俗易懂. 我为什么写这篇文章呢?因为前几晚跟好友 xiasuhuei321 探讨了一下GSON解析复杂的JSON的时候,能不能只解析源数据中的数组,甚至只解析数组的某一部分.探讨了二十分钟,得出结论:没用过,不知道. 所以今天特地研究了一下,发现真的So Easy!之前想复杂了,学习的过程中,发现有五种方式分别搞定不同情况的JSON数组,也就是今天说的

bat批处理文件搞定所有系统问题

bat批处理文件搞定所有系统问题 分类: WINDOWS   -----------bat批处理文件搞定所有系统问题---------   一.查漏补缺——给系统功能添把火  我们的操作系统虽然功能强大,但是在某方面的应用上依旧存在欠缺,如:没有定时关机软件.而用bat文件可以解决很多这类问题. 1.关机与重启 我们先做个让电脑在每天指定时间关机的bat,具体方法如下:打开附件中的记事本,然后在里边写入,at 22:00 shutdown -s -f,然后选择“文件→保存”,保存类型选择“所有文