kafka+flume-ng+hdfs 整合

Kafka 

   由LinkedIn于2010年12月(https://thenewstack.io/streaming-data-at-linkedin-apache-kafka-reaches-1-1-trillion-messages-per-day/)开源出来一个消息的发布/订阅系统,用scala实现;版本从0.05到现在0.10.2.0(2017-02-25)

  

   系统中,生产者(producer)主动向集群某个topic发送(push)消息(message);消费者(consumer)以组(group)为单位订阅topic,当消费者启动消费程序之后,如果集群中有未消费完的或者新的消息,则实时的拉取(pull)消息到消费者本地处理。

   集群是以topic为单位管理和存储消息的。既然是集群,就利用集群的优势;将topic分成多个分片(partition),生产者发送的消息存储在各个分片上。对应的可能是不同节点的不同本地磁盘。每个分片可以设置多个副本(replicated)用来确保数据的容错性。

   而且每个分片上的数据都是前后有序的;对应的就是后面的消息追加的文件中去;这种场景就能够利用磁盘顺序读写的特性。

   

   当然消费者消费也是有序消费的;偏移量(offset)从小到大顺序消费。

   

   整体效果就是

   

   磁盘顺序读写 内存随机读写

特点

  分布式:由zookeeper管理,可以启动多个broker-server;以集群的方式给生产环境提供稳定的服务。

  容错性:大部分分布式都具有的。

      1.只要有一个正常的broker-server,集群就能正常运行。

      2.可以设置为Topic的partition设置副本,确保就算一台机器的磁盘坏了;也不影响数据消费

  负载问题:

      1.生产者发送消息可以指定规则,发送到不同的partition上。

      2.topic中所有partition选取一个对外提供服务的leader;如果leader宕掉了,从后选中选取下一个。

  可扩展性:新增broker非常方便。

生产者样例代码

 1 import java.util.Properties
 2 import kafka.producer.ProducerConfig
 3 import kafka.javaapi.producer.Producer
 4 import java.util.Random
 5 import java.util.Date
 6 import kafka.producer.KeyedMessage
 7 import kafka.producer.Partitioner
 8 import kafka.utils.VerifiableProperties
 9 import org.apache.log4j.PropertyConfigurator
10 import java.util.concurrent.TimeUnit
11 import java.text.SimpleDateFormat
12 import org.tx.common.BIConstant
13
14 /**
15  * @date    2015-10-27 16:54:19
16  */
17 object FirstKafkaProducer {
18
19   PropertyConfigurator.configure("etc/log4j.properties");
20
21   def main(args: Array[String]): Unit = {
22
23
24 //    val Array(interval,records) = args
25     val (interval,records) = (1,1)
26     val props = new Properties()
27 //        props.put("metadata.broker.list", "own:9092,own:9093,own:9094")
28         props.put("metadata.broker.list", "hdpc1-dn003:9092")
29         props.put("serializer.class", "kafka.serializer.StringEncoder")
30         props.put("partitioner.class", "org.henry.scala.scalamvn.SimplePartitioner")
31 //        props.put("request.required.acks", "-1")
32     val config = new ProducerConfig(props);
33     val producer = new Producer[String, String](config)
34     val sdf = new SimpleDateFormat(BIConstant.DATE_SDF)
35
36     for (i <- 1 to records.toInt) {
37       val rnd = new Random();
38       val runtime = new Date();
39       val ip = rnd.nextInt(255).toString();
40 //      val msg = runtime + ",www.example.com," + ip;
41       val msg = "1001|2|2|"+runtime.getTime
42       println(" *** "+msg)
43       val data = new KeyedMessage[String, String]("mytopic", ip, msg)
44       TimeUnit.SECONDS.sleep(interval.toInt)
45       producer.send(data)
46     }
47     producer.close
48   }
49 }
50
51 class SimplePartitioner(props:VerifiableProperties) extends Partitioner {
52
53   override def partition(key: Any, numPartitions: Int): Int = {
54     var partition = 0;
55     val stringKey =  key.toString();
56     val offset = stringKey.lastIndexOf(‘.‘);
57     if (offset > 0) {
58        partition = Integer.parseInt( stringKey.substring(offset+1)) % numPartitions;
59     }
60     partition;
61   }
62 }

消费者样例代码(https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

 1 package com.test.groups;
 2
 3 import kafka.consumer.ConsumerIterator;
 4 import kafka.consumer.KafkaStream;
 5
 6 public class ConsumerTest implements Runnable {
 7     private KafkaStream m_stream;
 8     private int m_threadNumber;
 9
10     public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
11         m_threadNumber = a_threadNumber;
12         m_stream = a_stream;
13     }
14
15     public void run() {
16         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
17         while (it.hasNext())
18             System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
19         System.out.println("Shutting down Thread: " + m_threadNumber);
20     }
21 }

flume-ng

   由Cloudera于2010年5月开源出来,在2010年7月加入Cloudera Hadoop的发行版本CDH3b2(http://blog.cloudera.com/blog/2010/07/whats-new-in-cdh3b2-flume/)中,用Java开发的。

   开始是flume-OG,一直到2011年10月,最后版本是0.94.0。后面对核心组件/代码架构的进行里程碑式的重构,就有了flume-ng;并开源到apache。

   flume OG:agent采集数据,然后发送到collector;collector汇集后存储到目的地。

   

   flume NG最小单位的架构;只有agent角色,分为三个步骤来接收和传输数据

     Source(数据采集): 默认实现了从不同数据源接受数据。如Avro/Kafka/Netcat/Http等等;也可以根据具体需求扩展实现source

     Channel(数据临时存储的地方): 接受source的数据,可选择持久化到数据库或者本地磁盘;确保sink处理完数据后,删除;保证数据完整性。

     Sink(数据存储目的地):数据存储介质的实现。可选择HDFS/kafka/local file等等。

   在整个流程中,接收到的每条数据被封装成Event来进行传递和处理的。

     

   

特点

   1.部署,配置简单方便;通用

   2.中间对数据做了临时存储,确保数据不丢失

整合

   那么将这几个有关联的开源组件为我所用;而且还要考虑到后期开发调试方便,版本管理;部署到生产环境下的时候操作简单,可维护性好;能够监控JVM状态。就需要进行自动化部署的改造,而且公司有成熟可靠的解决方案。那么一切就顺理成章了。

   整体思路是:使用maven将项目依赖的jar包/启动脚本,简单的配置打包成tar包。具体的配置项执行启动脚本,调用Main方法后从公共的配置中心加载。

   在这次改造中,主要的任务有两大块。

     1.理解,使用现在成熟的自动化部署步骤、过程。

     2.确保功能正常的情况下,将原flume的conf下面配置文件搬移到配置中心的事情。

   基本流程是:

     将代码提交到Git进行管理,使用jekins获取Git代码打成tar包。tar包里面基本包含bin,conf,lib目录。bin下面存放任务启动停止脚本,conf下面存放简单的配置参数,lib下面存放项目依赖的jar和自身jar包。发布到私服;

     服务器下载tar包,解压;启动bin下面start.sh脚本启动应用。应用启动时会将配套的日志、JMX监控服务注册,启动;再从配置中心获取详细的参数配置;启动目标程序。

   

    以本次flume的使用场景作为例子,具体做了哪些事。

    由于我们的数据源选取为Kafka,存储介质是HDFS。所以数据传输流就是kafka(source)-->local file(channel)-->HDFS(sink)

    而原生态的flume启动入口是使用脚本调用org.apache.flume.node.Application(Main方法入口程序),所以改造的切入点就是这里。

    调用node.Application时,额外传入参数

-Djava-container=com.appleframework.flume.node.Application
-Dconfig-factory=com.appleframework.config.PropertyConfigurerFactory

    其中PropertyConfigurerFactory是读取、加载system.properties指向配置中心的详细配置参数的作用。Application主要是增加从公共配置中加载参数的方法。其他地方保持统一。其中deploy.group=flume-demo,deploy.dataId=hdfs,deploy.env=dev这三个参数来识别一组配置信息。

    更改的代码详情如下

    node.Application

 1 public class Application {
 2
 3     ...
 4     public static void main(String[] args) {
 5
 6
 7     ...
 8     boolean isZkConfigured = false;
 9
10             Options options = new Options();
11
12             Option option = new Option("n", "name", true, "the name of this agent");
13             option.setRequired(true);
14             options.addOption(option);
15
16             option = new Option("f", "conf-file", true, "specify a config file (required if -z missing)");
17             option.setRequired(false);
18             options.addOption(option);
19
20             option = new Option(null, "no-reload-conf", false, "do not reload config file if changed");
21             options.addOption(option);
22
23             // Options for Zookeeper
24             option = new Option("z", "zkConnString", true, "specify the ZooKeeper connection to use (required if -f missing)");
25             option.setRequired(false);
26             options.addOption(option);
27
28             option = new Option("p", "zkBasePath", true, "specify the base path in ZooKeeper for agent configs");
29             option.setRequired(false);
30             options.addOption(option);
31
32             option = new Option("h", "help", false, "display help text");
33             options.addOption(option);
34
35             option = new Option("e", "env", true, "the environment of this app");
36             option.setRequired(false);
37             options.addOption(option);
38
39                         Component.init(args);
40     ...
41     }
42     ...
43 }

    Component

  1 package com.appleframework.flume.node;
  2
  3 import java.io.File;
  4 import java.lang.management.ManagementFactory;
  5 import java.text.SimpleDateFormat;
  6 import java.util.ArrayList;
  7 import java.util.Date;
  8 import java.util.Hashtable;
  9 import java.util.List;
 10 import java.util.Properties;
 11
 12 import javax.management.MBeanServer;
 13 import javax.management.ObjectName;
 14
 15 import org.apache.log4j.Logger;
 16
 17 import com.appleframework.boot.config.ConfigContainer;
 18 import com.appleframework.boot.core.CommandOption;
 19 import com.appleframework.boot.core.Container;
 20 import com.appleframework.boot.core.log4j.Log4jContainer;
 21 import com.appleframework.boot.core.log4j.LoggingConfig;
 22 import com.appleframework.boot.core.monitor.MonitorConfig;
 23 import com.appleframework.boot.core.monitor.MonitorContainer;
 24 import com.appleframework.config.core.PropertyConfigurer;
 25
 26 public class Component {
 27
 28     private static Logger logger = Logger.getLogger(Component.class);
 29
 30     static void init(String[] args) {
 31         //处理启动参数
 32         CommandOption.parser(args);
 33
 34         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 35
 36         final List<Container> containers = new ArrayList<Container>();
 37         containers.add(new Log4jContainer());
 38         containers.add(new MonitorContainer());
 39
 40         String configContainer = System.getProperty("config-factory");
 41         if (null != configContainer) {
 42             containers.add(new ConfigContainer(configContainer));
 43         }
 44
 45         for (Container container : containers) {
 46             container.start();
 47             try {
 48
 49                 Hashtable<String, String> properties = new Hashtable<String, String>();
 50
 51                 properties.put(Container.TYPE_KEY, Container.DEFAULT_TYPE);
 52                 properties.put(Container.ID_KEY, container.getType());
 53
 54                 ObjectName oname = ObjectName.getInstance("com.appleframework", properties);
 55                 Object mbean = null;
 56                 if(container instanceof Log4jContainer) {
 57                     mbean = new LoggingConfig();
 58                 }
 59                 else if(container instanceof MonitorContainer) {
 60                     mbean = new MonitorConfig();
 61                 }
 62                 else {
 63                     logger.error("The Error Container :" + container.getName());
 64                 }
 65
 66                 if (mbs.isRegistered(oname)) {
 67                     mbs.unregisterMBean(oname);
 68                 }
 69                 mbs.registerMBean(mbean, oname);
 70             } catch (Exception e) {
 71                 logger.error("注册JMX服务出错:" + e.getMessage(), e);
 72             }
 73             logger.warn("服务 " + container.getType() + " 启动!");
 74         }
 75
 76         logger.warn(new SimpleDateFormat("[yyyy-MM-dd HH:mm:ss]").format(new Date()) + " 所有服务启动成功!");
 77
 78         extHadoopConfToLocal();
 79     }
 80
 81     /**
 82      * 读取配置中心的hdfs配置到本地,然后动态加载到classpath
 83      */
 84     static void extHadoopConfToLocal() {
 85
 86         System.setProperty("HADOOP_USER_NAME","hdfs");
 87         String abc = PropertyConfigurer.getString("abc");
 88         logger.info("load conf from center ["+abc+"]");
 89         String dir = System.getProperty("user.dir");
 90         File file = new File(dir + "/conf/core-site.xml");
 91
 92         // write core-site.xml to local if not exists
 93         if (!file.exists()) {
 94             Properties centerProps = PropertyConfigurer.getProps();
 95             Properties hdfsProps = new Properties();
 96             for (String key : centerProps.keySet().toArray(new String[0])) {
 97                 if (HDFSUtil.startWith(key)) {
 98                     hdfsProps.put(key, centerProps.get(key));
 99                 }
100             }
101             try {
102                 HDFSUtil.writerXMLToLocal(file, hdfsProps);
103             } catch (Throwable t) {
104                 logger.error("write hdfs conf to local errors["+hdfsProps+"]", t);
105             }
106         }
107
108         //load dynamically to classpath
109         ExtClasspathLoader.loadResourceDir(file.getParent());
110     }
111 }

    HDFSUtil

 1 package com.appleframework.flume.node;
 2
 3 import java.io.File;
 4 import java.io.FileNotFoundException;
 5 import java.io.FileOutputStream;
 6 import java.io.UnsupportedEncodingException;
 7 import java.lang.reflect.Method;
 8 import java.net.URL;
 9 import java.net.URLClassLoader;
10 import java.util.HashSet;
11 import java.util.Properties;
12 import java.util.Set;
13
14 import org.dom4j.Document;
15 import org.dom4j.DocumentHelper;
16 import org.dom4j.Element;
17 import org.dom4j.io.OutputFormat;
18 import org.dom4j.io.XMLWriter;
19
20 /**
21  * @author tjuhenryli<[email protected]>
22  * @date 2017-02-23 10:31:14
23  *
24  **/
25
26 public class HDFSUtil {
27     //DFS_HA_NAMENODES_KEY_PREFIX DFS_NAMENODE_RPC_ADDRESS_KEY DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX
28     public static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS";
29     public static final String DFS_NAMESERVICES = "dfs.nameservices";
30     public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
31     public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
32     public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
33     public static Set<String> CACHE = new HashSet<String>();
34     static {
35         CACHE.add(FS_DEFAULT_NAME_KEY);
36         CACHE.add(DFS_NAMESERVICES);
37         CACHE.add(DFS_HA_NAMENODES_KEY_PREFIX);
38         CACHE.add(DFS_NAMENODE_RPC_ADDRESS_KEY);
39         CACHE.add(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX);
40     }
41
42     public static boolean startWith(String content) {
43         for (String key : CACHE)
44             if (content.startsWith(key)) return true;
45         return false;
46     }
47
48     public static void writerXMLToLocal(File file,Properties props) throws Throwable {
49         Element root = DocumentHelper.createElement("configuration");
50         Document document = DocumentHelper.createDocument(root);
51
52         for (String key : props.keySet().toArray(new String[0])) {
53             Element property = root.addElement("property");
54             Element name = property.addElement("name");
55             Element value = property.addElement("value");
56             name.setText(key);
57             value.setText(props.getProperty(key));
58         }
59
60         OutputFormat format = new OutputFormat("    ", true);// 设置缩进为4个空格,并且另起一行为true
61         if (file.exists()) return;
62         else if (!file.getParentFile().exists()) file.getParentFile().mkdirs();
63         XMLWriter xmlWriter = new XMLWriter(new FileOutputStream(file), format);
64         xmlWriter.write(document);
65     }
66
67 }

    ExtClasspathLoader

  1 package com.appleframework.flume.node;
  2
  3 import java.io.File;
  4 import java.lang.reflect.Method;
  5 import java.net.URL;
  6 import java.net.URLClassLoader;
  7 import java.util.ArrayList;
  8 import java.util.List;
  9
 10 /**
 11  * 根据properties中配置的路径把jar和配置文件加载到classpath中。
 12  *  @author  jnbzwm
 13  *
 14   */
 15 public   final   class  ExtClasspathLoader  {
 16      /**  URLClassLoader的addURL方法  */
 17      private   static  Method addURL  =  initAddMethod();
 18
 19      private   static  URLClassLoader classloader  =  (URLClassLoader) ClassLoader.getSystemClassLoader();
 20
 21      /**
 22      * 初始化addUrl 方法.
 23      *  @return  可访问addUrl方法的Method对象
 24       */
 25      private   static  Method initAddMethod()  {
 26          try   {
 27             Method add  =  URLClassLoader.class.getDeclaredMethod( "addURL" ,  new  Class[]  { URL.class  } );
 28             add.setAccessible( true );
 29              return  add;
 30         }
 31          catch  (Exception e)  {
 32              throw   new  RuntimeException(e);
 33         }
 34     }
 35
 36      /**
 37      * 加载jar classpath。
 38       */
 39 //     public   static   void  loadClasspath()  {
 40 //        List < String >  files  =  getJarFiles();
 41 //         for  (String f : files)  {
 42 //            loadClasspath(f);
 43 //        }
 44 //
 45 //        List < String >  resFiles  =  getResFiles();
 46 //
 47 //         for  (String r : resFiles)  {
 48 //            loadResourceDir(r);
 49 //        }
 50 //    }
 51
 52      public   static   void  loadClasspath(String filepath)  {
 53         File file  =   new  File(filepath);
 54         loopFiles(file);
 55     }
 56
 57      public   static   void  loadResourceDir(String filepath)  {
 58         File file  =   new  File(filepath);
 59         loopDirs(file);
 60     }
 61
 62      /**
 63      * 循环遍历目录,找出所有的资源路径。
 64      *  @param  file 当前遍历文件
 65       */
 66      private   static   void  loopDirs(File file)  {
 67          //  资源文件只加载路径
 68          if  (file.isDirectory())  {
 69             addURL(file);
 70             File[] tmps  =  file.listFiles();
 71              for  (File tmp : tmps)  {
 72                 loopDirs(tmp);
 73             }
 74         }
 75     }
 76
 77      /**
 78      * 循环遍历目录,找出所有的jar包。
 79      *  @param  file 当前遍历文件
 80       */
 81      private   static   void  loopFiles(File file)  {
 82          if  (file.isDirectory())  {
 83             File[] tmps  =  file.listFiles();
 84              for  (File tmp : tmps)  {
 85                 loopFiles(tmp);
 86             }
 87         }
 88          else   {
 89              if  (file.getAbsolutePath().endsWith( " .jar " )  ||  file.getAbsolutePath().endsWith( " .zip " ))  {
 90                 addURL(file);
 91             }
 92         }
 93     }
 94
 95      /**
 96      * 通过filepath加载文件到classpath。
 97      *  @param  filePath 文件路径
 98      *  @return  URL
 99      *  @throws  Exception 异常
100       */
101      private   static   void  addURL(File file)  {
102          try   {
103             addURL.invoke(classloader,  new  Object[]  { file.toURI().toURL() } );
104         }
105          catch  (Exception e)  {
106         }
107     }
108
109      /**
110      * 从配置文件中得到配置的需要加载到classpath里的路径集合。
111      *  @return
112       */
113      private   static  List < String >  getJarFiles()  {
114          //  TODO 从properties文件中读取配置信息略
115          return   new ArrayList<String>() ;
116     }
117
118      /**
119      * 从配置文件中得到配置的需要加载classpath里的资源路径集合
120      *  @return
121       */
122      private   static  List < String >  getResFiles()  {
123          // TODO 从properties文件中读取配置信息略
124          List<String> files = new ArrayList<String>();
125          files.add("etc");
126          return   files ;
127     }
128
129      public   static   void  main(String[] args)  {
130     }
131 }

    

      

时间: 2024-10-10 13:37:34

kafka+flume-ng+hdfs 整合的相关文章

Flume 学习笔记之 Flume NG+Kafka整合

Flume NG集群+Kafka集群整合: 修改Flume配置文件(flume-kafka-server.conf),让Sink连上Kafka hadoop1: #set Agent name a1.sources = r1 a1.channels = c1 a1.sinks = k1 #set channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacit

【Flume】flume ng中HDFS sink设置按天滚动,0点滚动文件,修改源码实现

HDFS sink里有个属性hdfs.rollInterval=86400,这个属性你设置了24小时滚动一次,它的确就到了24小时才滚动,但是我们的需求往往是到了0点就滚动文件了,因为离线的job因为都会放在夜里执行. 如果flume是早上9点启动的,那么要到明天早上9点,hdfs的文件才会关闭,难道job要等到9点后才执行,这显然不合适,所以通过修改源码使其能够在0点滚动文件. 首先添加一个属性,可配置为day,hour,min private String timeRollerFlag; t

Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了

高可用Hadoop平台-Flume NG实战图解篇

1.概述 今天补充一篇关于Flume的博客,前面在讲解高可用的Hadoop平台的时候遗漏了这篇,本篇博客为大家讲述以下内容: Flume NG简述 单点Flume NG搭建.运行 高可用Flume NG搭建 Failover测试 截图预览 下面开始今天的博客介绍. 2.Flume NG简述 Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.轻量,配置简单,适用于各种日志收集,并支持Failover和负载均衡.并且它拥有非常丰富的组件.Fl

Flume NG 学习笔记(一)简介

一.简介 Flume是一个分布式.可靠.高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力. Flume在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG(next generation),0.9.x的称为Flume OG(originalgeneration). 对于OG版本, Flume NG (1.x.x)的主要变化如下: 1.sources和sinks 使用chann

Flume NG 学习笔记(五)Sinks和Channel配置

一.HDFS Sink Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中.主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景. 目前,它支持HDFS的文本和序列文件格式,以及支持两个文件类型的压缩.支持将所用的时间.数据大小.事件的数量为操作参数,对HDFS文件进行关闭(关闭当前文件,并创建一个新的).它还可以对事源的机器名(hostname)及时间属性分离数据,即通过时间戳将数据分布到对应的文件路径. HDFS目录路径可

【Flume NG用户指南】(2)构造

作者:周邦涛(Timen) Email:[email protected] 转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/28277575 上一篇请參考[Flume NG用户指南](1)设置 3. 配置 前边的文章已经介绍过了,Flume Agent配置是从一个具有分层属性的Java属性文件格式的文件里读取的. 3.1 定义数据流 要在一个Flume Agent中定义数据流,你须要通过一个Channel将Source和Sin

【Flume NG用户指南】(2)配置

作者:周邦涛(Timen) Email:[email protected] 转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/28277575 上一篇请参考[Flume NG用户指南](1)设置 3. 配置 前边的文章已经介绍过了,Flume Agent配置是从一个具有分层属性的Java属性文件格式的文件中读取的. 3.1 定义数据流 要在一个Flume Agent中定义数据流,你需要通过一个Channel将Source和Sin

分布式实时日志系统(二) 环境搭建之 flume 集群搭建/flume ng资料

最近公司业务数据量越来越大,以前的基于消息队列的日志系统越来越难以满足目前的业务量,表现为消息积压,日志延迟,日志存储日期过短,所以,我们开始着手要重新设计这块,业界已经有了比较成熟的流程,即基于流式处理,采用 flume 收集日志,发送到 kafka 队列做缓冲,storm 分布式实时框架进行消费处理,短期数据落地到 hbase.mongo中,长期数据进入 hadoop 中存储. 接下来打算将这其间所遇到的问题.学习到的知识记录整理下,作为备忘,作为分享,带给需要的人. 学习flume ng的