lume NG 学习笔记(九)Flune Client 开发

文章内容还是来自官网http://flume.apache.org/FlumeDeveloperGuide.html

由于在实际工作中,数据的生产方式极具多样性,Flume 虽然包含了一些内置的机制来采集数据,但是更多的时候用户更希望能将应用程序和flume直接相通。所以这边运行用户开发应用程序,通过IPC或者RPC连接flume并往flume发送数据。

一、RPC client interface

Flume的RpcClient实现了Flume的RPC机制。用户的应用程序可以很简单的调用Flume Client SDK的append(Event) 或者appendBatch(List<Event>) 方法发送数据,不用担心底层信息交换的细节。用户可以提供所需的event通过直接实现Event接口,例如可以使用简单的方便的实现SimpleEvent类或者使用EventBuilder的writeBody()静态辅助方法。

自Flume 1.4.0起,Avro是默认的RPC协议。NettyAvroRpcClient和ThriftRpcClient实现了RpcClient接口。实现中我们需要知道我们将要连接的目标flume agent的host和port用于创建client实例,然后使用RpcClient发送数据到flume agent。

官网给了一个Avro RPCclients的例子,这边直接拿来做实际测试例子。

这里我们把client.init("host.example.org",41414);

改成 client.init("192.168.233.128",50000);  与我们的主机对接

[java] view plain copy

  1. import org.apache.flume.Event;
  2. import org.apache.flume.EventDeliveryException;
  3. import org.apache.flume.api.RpcClient;
  4. import org.apache.flume.api.RpcClientFactory;
  5. import org.apache.flume.event.EventBuilder;
  6. import java.nio.charset.Charset;
  7. public class MyApp {
  8. public static voidmain(String[] args) {
  9. MyRpcClientFacade client = new MyRpcClientFacade();
  10. // Initializeclient with the remote Flume agent‘s host and port
  11. //client.init("host.example.org",41414);
  12. client.init("192.168.233.128",50000);
  13. // Send 10events to the remote Flume agent. That agent should be
  14. // configured tolisten with an AvroSource.
  15. String sampleData = "Hello Flume!";
  16. for (int i =0; i < 10; i++) {
  17. client.sendDataToFlume(sampleData);
  18. }
  19. client.cleanUp();
  20. }
  21. }
  22. class MyRpcClientFacade {
  23. private RpcClient client;
  24. private String hostname;
  25. private int port;
  26. public void init(String hostname, int port) {
  27. // Setup the RPCconnection
  28. this.hostname = hostname;
  29. this.port = port;
  30. this.client = RpcClientFactory.getDefaultInstance(hostname, port);
  31. // Use thefollowing method to create a thrift client (instead of the above line):
  32. // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  33. }
  34. public void sendDataToFlume(String data) {
  35. // Create aFlume Event object that encapsulates the sample data
  36. Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
  37. // Send theevent
  38. try {
  39. client.append(event);
  40. } catch (EventDeliveryException e) {
  41. // clean up andrecreate the client
  42. client.close();
  43. client = null;
  44. client = RpcClientFactory.getDefaultInstance(hostname, port);
  45. // Use thefollowing method to create a thrift client (instead of the above line):
  46. // this.client =RpcClientFactory.getThriftInstance(hostname, port);
  47. }
  48. }
  49. public void cleanUp() {
  50. // Close the RPCconnection
  51. client.close();
  52. }
  53. }

这边代码不解释了,主要是将HelloFlume 发送10遍给flume,同时记得将flume 安装主目录下的lib 文件都添加进项目,才能正常运行程序。

下面是代理配置:

[html] view plain copy

  1. #配置文件:avro_client_case20.conf
  2. # Name the components on this agent
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = avro
  8. a1.sources.r1.port = 50000
  9. a1.sources.r1.host = 192.168.233.128
  10. a1.sources.r1.channels = c1
  11. # Describe the sink
  12. a1.sinks.k1.channel = c1
  13. a1.sinks.k1.type = logger
  14. # Use a channel which buffers events inmemory
  15. a1.channels.c1.type = memory
  16. a1.channels.c1.capacity = 1000
  17. a1.channels.c1.transactionCapacity = 100

这里要注意下,之前说了,在接收端需要AvroSource或者Thrift Source来监听接口。所以配置代理的时候要把a1.sources.r1.type 写成avro或者thrift

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

在eclipse 里运行Java程序,当然也可以打包后在服务器上运行JAVA程序。

#在启动源发送的代理终端查看console输出


可以看到10条数据正常发送。

这里要说明下,开发代码中client.append(event)不仅仅可以发送一条数据,也可以发送一个List(string) 的数据信息,也就是批量发送。这边就不做演示了。

二、Failover Client

这个类包封装了Avro RPCclient的类默认提供故障处理能力。hosts采用空格分开host:port所代表的flume agent,构成一个故障处理组。这Failover RPC Client目前不支持thrift。如果当前选择的host agent有问题,这个failover client会自动负载到组中下一个host中。

下面是官网开发例子:

[java] view plain copy

  1. // Setup properties for the failover
  2. Properties props = new Properties();
  3. props.put("client.type", "default_failover");
  4. // List of hosts (space-separated list of user-chosen host aliases)
  5. props.put("hosts", "h1 h2 h3");
  6. // host/port pair for each host alias
  7. String host1 = "host1.example.org:41414";
  8. String host2 = "host2.example.org:41414";
  9. String host3 = "host3.example.org:41414";
  10. props.put("hosts.h1", host1);
  11. props.put("hosts.h2", host2);
  12. props.put("hosts.h3", host3);
  13. // create the client with failover properties
  14. RpcClient client = RpcClientFactory.getInstance(props);

下面是测试的开发例子

[java] view plain copy

  1. import org.apache.flume.Event;
  2. import org.apache.flume.EventDeliveryException;
  3. import org.apache.flume.api.RpcClient;
  4. import org.apache.flume.api.RpcClientFactory;
  5. import org.apache.flume.event.EventBuilder;
  6. import java.nio.charset.Charset;
  7. import java.util.Properties;
  8. public class Failover_Client {
  9. public static void main(String[] args) {
  10. MyRpcClientFacade2 client = new MyRpcClientFacade2();
  11. // Initialize client with the remote Flume agent‘s host and port
  12. client.init();
  13. // Send 10 events to the remote Flume agent. That agent should be
  14. // configured to listen with an AvroSource.
  15. String sampleData = "Hello Flume!";
  16. for (int i = 0; i < 10; i++) {
  17. client.sendDataToFlume(sampleData);
  18. }
  19. client.cleanUp();
  20. }
  21. }
  22. class MyRpcClientFacade2 {
  23. private RpcClient client;
  24. private String hostname;
  25. private int port;
  26. public void init() {
  27. // Setup the RPC connection
  28. // Use the following method to create a thrift client (instead of the above line):
  29. // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  30. // Setup properties for the failover
  31. Properties props = new Properties();
  32. props.put("client.type", "default_failover");
  33. // List of hosts (space-separated list of user-chosen host aliases)
  34. props.put("hosts", "h1 h2 h3");
  35. // host/port pair for each host alias
  36. String host1 = "192.168.233.128:50000";
  37. String host2 = "192.168.233.128:50001";
  38. String host3 = "192.168.233.128:50002";
  39. props.put("hosts.h1", host1);
  40. props.put("hosts.h2", host2);
  41. props.put("hosts.h3", host3);
  42. // create the client with failover properties
  43. client = RpcClientFactory.getInstance(props);
  44. }
  45. public void sendDataToFlume(String data) {
  46. // Create a Flume Event object that encapsulates the sample data
  47. Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
  48. // Send the event
  49. try {
  50. client.append(event);
  51. } catch (EventDeliveryException e) {
  52. // clean up and recreate the client
  53. client.close();
  54. client = null;
  55. client = RpcClientFactory.getDefaultInstance(hostname, port);
  56. // Use the following method to create a thrift client (instead of the above line):
  57. // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  58. }
  59. }
  60. public void cleanUp() {
  61. // Close the RPC connection
  62. client.close();
  63. }
  64. }

这边代码设三个host用于故障转移,这里偷懒,用同一个主机的3个端口模拟。代码还是将Hello Flume 发送10遍给第一个flume代理,当第一个代理故障的时候,则发送给第二个代理,以顺序进行故障转移。

下面是代理配置沿用之前的那个,并对配置文件进行拷贝,

cp avro_client_case20.conf avro_client_case21.conf

cp avro_client_case20.conf avro_client_case22.conf

分别修改avro_client_case21.conf与avro_client_case22.conf中的

a1.sources.r1.port= 50001 与a1.sources.r1.port = 50002

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

在eclipse 里运行JAVA程序Failover_Client.java,当然也可以打包后在服务器上运行JAVA程序。

#在启动源发送的3个代理终端查看console输出

我们可以看到第一个代理终端收到了,数据而其他2个终端没有数据。

然后我们把第一个终端的进程关掉,再运行一遍client程序,然后会发现这个时候是发生到第二个终端中。当第二个终端也关闭的时候,再发送数据,则是发送到最后一个终端。这里我们可以看到,故障转移的代理主机转移是采用顺序序列的。

三、LoadBalancing RPC client

Flume Client SDK也支持在多个host之间使用负载均衡的Rpc Client。这种类型的client带有一个通过空格分隔的host:port主机列表并构成了一个负载均衡组。这个client可以指定一个负载均衡的策略,既可以随机的选择一个配置的host,也可以循环选择一个host。当然你也可以自己编写一个类实现LoadBalancingRpcClient$HostSelector接口以至于用户可以使用自己编写的选择顺序。在这种情况下,用户自定义的类需要被指定为host-selector属性的值。LoadBalancing RPC Client当前不支持thrift。

如果开启了backoff,那么client失败将被放入黑名单中,只有过了被指定的超时之间之后这个被选择的失败的主机才会从黑名单中被排除。当超时到了,如果主机还是没有反应,那么这被认为是一个连续的失败并且超时时间会成倍的增长,以避免可能陷入对反应迟钝主机的长时间等待中。

这backoff的最大超时时间可以通过maxBackoff属性来配置,单位是毫秒。在默认情况下maxBackoff的值是30秒(在orderSelector类里面指定)。

下面是官网例子

[java] view plain copy

  1. // Setup properties for the load balancing
  2. Properties props = new Properties();
  3. props.put("client.type", "default_loadbalance");
  4. // List of hosts (space-separated list of user-chosen host aliases)
  5. props.put("hosts", "h1 h2 h3");
  6. // host/port pair for each host alias
  7. String host1 = "host1.example.org:41414";
  8. String host2 = "host2.example.org:41414";
  9. String host3 = "host3.example.org:41414";
  10. props.put("hosts.h1", host1);
  11. props.put("hosts.h2", host2);
  12. props.put("hosts.h3", host3);
  13. props.put("host-selector", "random"); // For random host selection
  14. // props.put("host-selector", "round_robin"); // For round-robin host
  15. //                                            // selection
  16. props.put("backoff", "true"); // Disabled by default.
  17. props.put("maxBackoff", "10000"); // Defaults 0, which effectively
  18. // becomes 30000 ms
  19. // Create the client with load balancing properties
  20. RpcClient client = RpcClientFactory.getInstance(props);

下面是测试的开发例子

[java] view plain copy

  1. import java.nio.charset.Charset;
  2. import org.apache.flume.Event;
  3. import org.apache.flume.EventDeliveryException;
  4. import org.apache.flume.api.RpcClient;
  5. import org.apache.flume.api.RpcClientFactory;
  6. import org.apache.flume.event.EventBuilder;
  7. import java.util.Properties;
  8. public class Load_Client {
  9. public static void main(String[] args) {
  10. MyRpcClientFacade3 client = new MyRpcClientFacade3();
  11. // Initialize client with the remote Flume agent‘s host and port
  12. client.init();
  13. // Send 10 events to the remote Flume agent. That agent should be
  14. // configured to listen with an AvroSource.
  15. String sampleData = "Flume Load_Client";
  16. for (int i = 0; i < 10; i++) {
  17. client.sendDataToFlume(sampleData);
  18. }
  19. client.cleanUp();
  20. }
  21. }
  22. class MyRpcClientFacade3{
  23. private RpcClient client;
  24. private String hostname;
  25. private int port;
  26. public void init() {
  27. Properties props = new Properties();
  28. props.put("client.type", "default_loadbalance");
  29. // List of hosts (space-separated list of user-chosen host aliases)
  30. props.put("hosts", "h1 h2 h3");
  31. // host/port pair for each host alias
  32. String host1 = "192.168.233.128:50000";
  33. String host2 = "192.168.233.128:50001";
  34. String host3 = "192.168.233.128:50002";
  35. props.put("hosts.h1", host1);
  36. props.put("hosts.h2", host2);
  37. props.put("hosts.h3", host3);
  38. props.put("host-selector", "random"); // For random host selection
  39. // props.put("host-selector", "round_robin"); // For round-robin host
  40. //                                                    // selection
  41. props.put("backoff", "true"); // Disabled by default.
  42. props.put("maxBackoff", "10000"); // Defaults 0, which effectively
  43. // becomes 30000 ms
  44. // Create the client with load balancing properties
  45. client = RpcClientFactory.getInstance(props);
  46. }
  47. public void sendDataToFlume(String data) {
  48. // Create a Flume Event object that encapsulates the sample data
  49. Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
  50. // Send the event
  51. try {
  52. client.append(event);
  53. } catch (EventDeliveryException e) {
  54. // clean up and recreate the client
  55. client.close();
  56. client = null;
  57. client = RpcClientFactory.getDefaultInstance(hostname, port);
  58. // Use the following method to create a thrift client (instead of the above line):
  59. // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  60. }
  61. }
  62. public void cleanUp() {
  63. // Close the RPC connection
  64. client.close();
  65. }
  66. }

这里采用随机的负载均衡props.put("host-selector","random") 。测试的时候沿用之前的3个接受代理配置avro_client_case20.conf、avro_client_case21.conf和avro_client_case22.conf,并将他们起起来。

#敲命令

flume-ng agent -c conf -f conf/avro_client_case20.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf-n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

在eclipse 里运行JAVA程序Failover_Client.java,当然也可以打包后在服务器上运行JAVA程序。

#在启动源发送的3个代理终端查看console输出

下面是Host1,收到了2条数据

下面是Host2,收到了2条数据

下面是Host3,收到了6条数据。

可以看到我们开发例子中,host-selector选择的是随机,因此程序也是随机发送数据。下面我们测试轮询round_robin选项。

程序里我们修改这句

//props.put("host-selector","random"); // For random host selection

props.put("host-selector", "round_robin");// Forround-robin host

再运行Java 程序

下面是Host1,收到了4条数据

下面是Host2,收到了3条数据

同样Host3,收到了3条数据,这边就不放图了。轮询就是按照顺序放图。

时间: 2024-10-25 06:08:23

lume NG 学习笔记(九)Flune Client 开发的相关文章

初探swift语言的学习笔记九(OC与Swift混编)

swift 语言出来后,可能新的项目直接使用swift来开发,但可能在过程中会遇到一些情况,某些已用OC写好的类或封装好的模块,不想再在swift 中再写一次,哪就使用混编.这个在IOS8中是允许的. 先中简单的入手,先研究在同一个工程目录下混合使用的情况. 为了演示.先准备两个类 第一个是swift语言写的类,文件名为 act.swift import Foundation class Act : NSObject { func hasAct(tag:Int) -> String { swit

Android:日常学习笔记(8)———探究UI开发(5)

Android:日常学习笔记(8)---探究UI开发(5) ListView控件的使用 ListView的简单用法 public class MainActivity extends AppCompatActivity { private String[] data={"Apple","Banana","Orange","Watermelon","Pear","Grape","

APUE 学习笔记(九) 高级I/O

1. 非阻塞I/O 低速系统调用时可能会使进程永远阻塞的一类系统调用,包括以下调用: (1)某些文件类型你(网络socket套接字.终端设备.管道)暂无可使用数据,则读操作可能会使调用者永远阻塞 (2)如果数据不能立即被(1)中文件类型接受,则写操作会使调用者永远阻塞 (3)某些进程间通信函数 非阻塞I/O使我们可以调用open.read.write这样的I/O操作,并使这些操作不会永远阻塞,如果这种操作不能完成,则调用立即出错返回 对于一个给定的文件有两种方法对其指定非阻塞I/O: (1)调用

cocos2dx游戏开发——微信打飞机学习笔记(一)——开发准备

一.环境的搭建 1.Windows开发准备: (1)软件下载及安装 •下载Cocos2d-x 最新版本:http://www.cocos2d-x.org/download 或者从Cocos2d-x GitHub主页中克隆Develop分支:https://github.com/cocos2d/cocos2d-x •配置Python 2.7 环境:http://www.python.org/download/releases/ •建议IDE:Visual Studio 2013 •运行cocos2

python学习笔记九——文件与目录

1.python进行文件读写的函数是open或file类 mode:r  只读 r+   读写 w  写入,先删除原文件,再重新写入,如果文件没有则创建 w+  读写,先删除原文件,再重新写入,如果文件没有则创建(可写入和输出) a  写入,在文件末尾追加新的内容,文件不存在则创建 a+  读写,在文件末尾追加新的内容,文件不存在则创建 b  打开二进制文件,可与r,w,a,+结合使用 U  支持所有的换行符号,"\r","\n","\r\n"

Android:日常学习笔记(8)———探究UI开发(2)

Android:日常学习笔记(8)---探究UI开发(2) 对话框 说明: 对话框是提示用户作出决定或输入额外信息的小窗口. 对话框不会填充屏幕,通常用于需要用户采取行动才能继续执行的模式事件. 提示: Dialog 类是对话框的基类,但您应该避免直接实例化 Dialog,而是使用下列子类之一: AlertDialog此对话框可显示标题.最多三个按钮.可选择项列表或自定义布局. DatePickerDialog 或 TimePickerDialog此对话框带有允许用户选择日期或时间的预定义 UI

angular学习笔记(九)-css类和样式3

再来看一个选择li列表的例子: 点击li中的任意项,被点击的li高亮显示: <!DOCTYPE html> <html ng-app> <head> <title>6.3css类和样式</title> <meta charset="utf-8"> <script src="../angular.js"></script> <script src="scri

angular学习笔记(九)-css类和样式2

在上一个例子中,元素的类名使用拼接的方法,这样,类名中就不得不带有true或false,并且不易维护,所以,angular使用ng-class属性来控制元素的类名: 我们来看一个小例子,点击error按钮,顶部提示错误框,点击warning按钮,顶部提示警告框. 错误框的类名是.err,警告框的类名是.warn: <!DOCTYPE html> <html ng-app> <head> <title>6.2css类和样式</title> <

jQuery整理笔记九----功能性表格开发

示例中用到的一些图片.插件.样式文件等下载地址:点我进入下载 过去在开发过程中关于table方面的jquery应用仅仅是局限于使用jquery操作table增加一行.删除一列等等操作.今天整理的跟过去用的不一样. 1.uiTableFilter uiTableFilter是一款表格数据行过滤插件,使用很简单,具体用法如下: $.uiTableFilter(table,phrase)  该函数包含两个参数,其中第一个参数为jQuery对象,即为jQuery方法匹配的表格,或者也可以是jQuery匹