C++ Thrift Client 与 Flume Thrift Source 对接

项目需要C++代码与flume对接,进而将日志写入HDFS。
flume原生为java代码,原先的解决方案是通过JNI调用flume java方法。
但是由于一来对jni的调用效率的担心,二来C++调用JNI需要照顾local reference和GC的问题,被搞得头痛了。
一怒之下,重写代码了,使用C++与远端的JAVA Flume对接。

在协议的选择上,AVRO C++虽然也有apache的开源项目,但是目前只支持读写文件,而不能使用RPC。
故使用了thrift与远端Flume thrift source通信。
以下是一些实现的具体方法:

Flume thrift 协议准备:

1. 安装thrift,安装flume。
2. 下载flume的source包,解压。
3. 在解压目录flume-ng-sdk/src/main/thrift下,存在文件flume.thrift。
    这是flume的thrift协议规则文件,执行命令:thrift -r --gen cpp ./flume.thrift
    在./gen-cpp/目录下会生成一些.h .cpp文件,之后C++代码中需要调用其中的方法进行RPC通信。

注意:如果是在windows/visual studio下开发,由于ERROR与关键词冲突,需要对flume.thrift修改。

enum Status {
    OK,
    FAILED,
    FERROR, //原先是ERROR,修改为FERROR。
    UNKNOWN
    }

如果是linux下工作,则无需改动任何代码。

C++ 代码实现:

1. include一些头文件,以及使用thrift的名字空间

1 #include <thrift/protocol/TBinaryProtocol.h>
2 #include <thrift/protocol/TCompactProtocol.h>
3 #include <thrift/transport/TSocket.h>
4 #include <thrift/transport/TTransportUtils.h>
5
6 using namespace std;
7 using namespace apache::thrift;
8 using namespace apache::thrift::protocol;
9 using namespace apache::thrift::transport;

2. 创建thrift的socket, transport, protocol, client.

1 class ThriftClient{
2     private:
3     /* Thrift protocol needings... */
4     boost::shared_ptr<TTransport> socket;
5     boost::shared_ptr<TTransport> transport;
6     boost::shared_ptr<TProtocol> protocol;
7     ThriftSourceProtocolClient* pClient;
8 }
1     ThriftClient::ThriftClient(std::string inIpAddress, std::string inPort):
2         socket(new TSocket(inIpAddress.c_str(), atoi(inPort.c_str()))),
3         transport(new TFramedTransport(socket)),
4         protocol(new TCompactProtocol(transport))
5     {
6         pClient = new ThriftSourceProtocolClient(protocol);
7     }

3. 与远端flume thrift source通信:

 1       bool ThriftClient::sendEvent(const Event* event)
 2       {
 3           //build the head
 4           std::map<std::string, std::string>  headers;
 5           std::ostringstream timeBuffer;
 6           timeBuffer << event->timestamp <<"000";
 7           headers.insert(std::make_pair("timestamp", timeBuffer.str()));
 8           headers.insert(std::make_pair("appId", appId));
 9           //build the body
10           std::ostringstream  osBody;
11           osBody << *event;
12           std::string sBody = osBody.str();
13
14           ThriftFlumeEvent tfEvent;
15           tfEvent.__set_body(sBody);
16           tfEvent.__set_headers(headers);
17           if(!transport->isOpen())
18           {
19               transport->open();
20           }
21           Status::type res=pClient->append(tfEvent);
22           if(res == Status::OK)
23           {
24               return true;
25           }
26           else
27           {
28               printf("WARNING: send event via thrift failed, return code:%d\n",res);
29               return false;
30           }
31       }

其他注意点:

1. append方法用来发送一条event:

Status::type ThriftSourceProtocolClient::append(const ThriftFlumeEvent& event)

同时还有一个方法appendBatch用来一次发送多个event:

Status::type ThriftSourceProtocolClient::appendBatch(const std::vector<ThriftFlumeEvent> & events)

2. 无论是append还是appendBatch方法,都是阻塞方法。

3. 可能可以通过send_append,send_appendBatch来发送无需确认成功的event(未测试)。

void ThriftSourceProtocolClient::send_append(const ThriftFlumeEvent& event)
void ThriftSourceProtocolClient::send_appendBatch(const std::vector<ThriftFlumeEvent> & events)

4. Thrift只提供协议RPC功能,并没有提供flume的channel功能,以及多个source的情况下的load balance功能。这些都需要自己实现。

关于性能:

测试环境: vmware+ubuntu,i3-4150 CPU, 配置1G内存,双核CPU。

在本机配置两个flume thrift source(load balance),网络回环。

在每条event大约50字符的情况下,可以达到16000条每秒的吞吐量,此时CPU被耗尽(两个flume thrift source大约占用30%CPU)。

时间: 2024-08-28 03:00:56

C++ Thrift Client 与 Flume Thrift Source 对接的相关文章

高可用的池化 Thrift Client 实现(源码分享)

本文将分享一个高可用的池化 Thrift Client 及其源码实现,欢迎阅读源码(Github)并使用,同时欢迎提出宝贵的意见和建议,本人将持续完善. 本文的主要目标读者是对 Thrift 有一定了解并使用的童鞋,如对 Thrift 的基础知识了解不多或者想重温一下基础知识,推荐先阅读本站文章<和 Thrift 的一场美丽邂逅>. 下面进入正题. 为什么我们需要这么一个组件? 我们知道,Thrift 是一个 RPC 框架体系,可以非常方便的进行跨语言 RPC 服务的开发和调用.然而,它并没有

创建Thrift Server和Thrift Client

1.创建Server package cn.horace.thrift.server; import cn.horace.thrift.idl.IUserService; import cn.horace.thrift.rpc.IUserServiceImpl; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apac

Flume -- 初识flume、source和sink

Flume – 初识flume.source和sink 目录基本概念常用源 Source常用sink 基本概念 ? 什么叫flume? 分布式,可靠的大量日志收集.聚合和移动工具. ? events 事件,是一行数据的字节数据,是flume发送文件的基本单位. ? flume配置文件 重命名flume-env.sh.template为flume-env.sh,并添加[export JAVA_HOME=/soft/jdk] ? flume的Agent source //从哪儿读数据. 负责监控并收

Flume Avro Source 远程连接拒绝的解决办法

昨天做了一个Java连接虚拟机,实现Flume Avro Source 的远程连接,确报了一个这样的错,经过了一晚上,终于找到了解决的方案. 我来给大家分享一下! 报错如下: Exception in thread "main" org.apache.flume.FlumeException:NettyAvroRpcClient{ host:xxxx,port:xxxx}:RPC connection error 解决的办法是: 把配置文件中a1.sources.r1.bind必须设置

Thrift RPC实战(三) thrift序列化揭秘

本文主要讲解Thrift的序列化机制, 看看thrift作为数据交换格式是如何工作的? 1.构造应用场景: 1). 首先我们先来定义下thrift的简单结构. 1 2 3 4 5 namespace java com.yangyang.thrift.api struct Pair { ? ? 1: required string key ? ? 2: required string value } required修饰符你肯定能猜测到它的意义, 但是你是否有没有这样的疑惑, "1",

Flume学习之路 (二)Flume的Source类型

一.概述 官方文档介绍:http://flume.apache.org/FlumeUserGuide.html#flume-sources 二.Flume Sources 描述 2.1 Avro Source 2.1.1 介绍 Avro端口监听并接收来自外部的Avro客户流的事件.当内置Avro去Sinks另一个配对Flume代理,它就可以创建分层采集的拓扑结构.官网说的比较绕,当然我的翻译也很弱,其实就是flume可以多级代理,然后代理与代理之间用Avro去连接.==字体加粗的属性必须进行设置

Flume的Source、Sink总结,及常用使用场景

数据源Source RPC异构流数据交换 Avro Source Thrift Source 文件或目录变化监听 Exec Source Spooling Directory Source Taildir Source MQ或队列订阅数据持续监听 JMS Source SSL and JMS Source Kafka Source Network类数据交换 NetCat TCP Source NetCat UDP Source HTTP Source Syslog Sources Syslog

flume的source, channel, sink 列表

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息.主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载

Thrift教程初级篇——thrift安装环境变量配置第一个实例

前言: 因为项目需要跨语言,c++客户端,web服务端,远程调用等需求,所以用到了RPC框架Thrift,刚开始有点虚,第一次接触RPC框架,后来没想到Thrift开发方便上手快,而且性能和稳定性也不错,项目也顺利完成.所以给各位小白们,"科普"一下如何使用Thrift完成自己的远程调用. 1.什么是RPC:                      平时开发的服务,大多都是本地调用,如果说需要依赖他人服务了,而且他人的服务在远端,那怎么调用呢? RPC能够游刃有余的解决这样的问题.首