Learning and Installing Flume

Our project team recently needed to collect clickstream logs ourselves, so I studied Flume and got it installed successfully. Here are my notes.

1) Download Flume 1.5

wget http://archive.apache.org/dist/flume/1.5.0.1/apache-flume-1.5.0.1-bin.tar.gz

(The original closer.cgi link points at Apache's mirror-selector page rather than the tarball itself; the archive.apache.org URL above fetches the file directly.)

2) Extract Flume 1.5

tar -zxvf apache-flume-1.5.0.1-bin.tar.gz

3) Configure environment variables

(The JDK is assumed to be installed already.)

export FLUME_HOME=/XXX/XX/apache-flume-1.5.0.1-bin

export PATH=$FLUME_HOME/bin:$PATH
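To make these persistent, append them to your shell profile and reload (profile path is an example); flume-ng should then resolve from PATH:

echo 'export FLUME_HOME=/XXX/XX/apache-flume-1.5.0.1-bin' >> ~/.bashrc
echo 'export PATH=$FLUME_HOME/bin:$PATH' >> ~/.bashrc
source ~/.bashrc
flume-ng version    # should print the Flume 1.5.0.1 build info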

4) Configure the files under conf

4.1) Configure flume-env.sh; the main thing is to set JAVA_HOME.
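For example, start from the bundled template and point it at your JDK (the JDK path below is an example; use your own install path):

cp conf/flume-env.sh.template conf/flume-env.sh
# then in conf/flume-env.sh:
export JAVA_HOME=/usr/java/jdk1.7.0_55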

4.2) Configure log4j.properties

In a test environment, comment out flume.root.logger=INFO,LOGFILE and switch to flume.root.logger=DEBUG,console so that logs are printed to the console.
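That is, in conf/log4j.properties:

#flume.root.logger=INFO,LOGFILE
flume.root.logger=DEBUG,console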

4.3) Configure flume-conf.properties. The file name can be anything you like; just point the run command at the properties file you created.

# agent named a1, with source r1, sink k1, and channel c1

a1.sources = r1

a1.sinks = k1

a1.channels = c1

The source component type is exec, which runs a Linux command:

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/flume/flume/conf/source.txt

(tail -F differs significantly from tail -f: -f follows the original file descriptor and goes silent once the file is rotated or truncated, while -F follows the file name and reopens the file when it reappears. This distinction solved a big problem for us.)
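A quick way to see the difference (file name is hypothetical):

tail -f app.log    # follows the open file descriptor; stops producing output after app.log is rotated or truncated
tail -F app.log    # follows the name; keeps retrying and reopens app.log when it is recreated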

The sink component type is logger:

a1.sinks.k1.type = logger

The channel component type is memory:

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

Wire the source and the sink to the channel:

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

5) Run the agent from the Flume directory

bin/flume-ng agent -n a1 -f test/source-tail-sink-logger.properties --conf conf
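With the agent running, append a line to the watched file; the logger sink should print the event on the console:

echo "hello flume" >> /home/hadoop/flume/flume/conf/source.txt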

That completes a first example. In production we currently have two nodes producing data into MetaQ through a custom MetaQ sink (code below).

Remember to copy the MetaQ-related jars into flume/lib: gecko-1.1.4.jar, metamorphosis-client-1.4.6.2.jar, metamorphosis-commons-1.4.6.2.jar, zkclient-0.3.jar, zookeeper-3.4.3.jar.
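For example, assuming the jars sit in the current directory:

cp gecko-1.1.4.jar metamorphosis-client-1.4.6.2.jar metamorphosis-commons-1.4.6.2.jar zkclient-0.3.jar zookeeper-3.4.3.jar $FLUME_HOME/lib/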

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/flume/flume/conf/source.txt

a1.sinks.k1.type = com.XX.flume.sink.MetaQSink

a1.sinks.k1.sink.zkConnect = 0.0.0.0:2181,0.0.0.0:2181,0.0.0.0:2181

a1.sinks.k1.sink.zkRoot = /meta (this path must be hard-coded as /meta)

a1.sinks.k1.sink.topic = XXXX

a1.sinks.k1.sink.batchSize = 20000

#a1.channels.c1.type = memory

#a1.channels.c1.capacity = 1000000

#a1.channels.c1.transactionCapacity = 100000

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /home/hadoop/flume/flume/channel/checkpoint

a1.channels.c1.dataDirs = /home/hadoop/flume/flume/channel/data

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1
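Unlike the commented-out memory channel above, the file channel persists events to disk, so buffered data survives an agent restart at the cost of some throughput. Make sure the checkpoint and data directories exist and are writable by the Flume user:

mkdir -p /home/hadoop/flume/flume/channel/checkpoint /home/hadoop/flume/flume/channel/data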

The custom sink code:

package com.jd.flume.sink;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Description:
 * <p/>
 *      Sends the data collected by Flume to the MetaQ message queue.
 * <p/>
 * ----------------------------
 * Author: 毛祥溢
 * Email: [email protected]
 * Website: www.maoxiangyi.cn
 */
public class MetaQSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(MetaQSink.class);
    private MessageSessionFactory sessionFactory;
    private MessageProducer producer;

    private String zkConnect;
    private String zkRoot;
    private String topic;
    private int batchSize;
    private int threadNum;
    private ExecutorService executor;

    public MetaQSink() {
    }

    @Override
    public void configure(Context context) {
        this.zkConnect = context.getString("sink.zkConnect");
        this.zkRoot = context.getString("sink.zkRoot");
        this.topic = context.getString("sink.topic");
        this.batchSize = context.getInteger("sink.batchSize", 10000);
        this.threadNum = context.getInteger("sink.threadNum", 50);
        executor = Executors.newCachedThreadPool();

        MetaClientConfig metaClientConfig = new MetaClientConfig();
        ZkUtils.ZKConfig zkConfig = new ZkUtils.ZKConfig();
        zkConfig.zkConnect = zkConnect;
        zkConfig.zkRoot = zkRoot;
        metaClientConfig.setZkConfig(zkConfig);
        try {
            sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
        } catch (MetaClientException e) {
            logger.error("failed to create MetaMessageSessionFactory:", e);
            throw new RuntimeException("MetaQ sink init error", e);
        }
        producer = sessionFactory.createProducer();
        logger.info("zkConnect:" + zkConnect + ", zkRoot:" + zkRoot
                + ", topic:" + topic);
    }

    @Override
    public Status process() throws EventDeliveryException {
        long start = System.currentTimeMillis();
        producer.publish(topic);
        Status result = Status.READY;
        final Channel channel = getChannel();
        final AtomicInteger al = new AtomicInteger(0);
        final CountDownLatch cdl = new CountDownLatch(threadNum);
        for (int t = 0; t < threadNum; t++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {

                    Event event = null;
                    Transaction transaction = null;
                    int i = 0;
                    try {
                        transaction = channel.getTransaction();
                        transaction.begin();
                        boolean startTransaction = false;
                        for (i = 0; i < batchSize; i++) {
                            event = channel.take();
                            if (event != null) {
                                if (i == 0) {
                                    producer.beginTransaction();
                                    startTransaction = true;
                                }
                                final SendResult sendResult = producer
                                        .sendMessage(new Message(topic, event
                                                .getBody()));
                                // check result
                                if (!sendResult.isSuccess()) {
                                    logger.error("Send message failed,error message:"
                                            + sendResult.getErrorMessage());
                                    throw new RuntimeException(
                                            "Send message failed,error message:"
                                                    + sendResult
                                                    .getErrorMessage());
                                } else {
                                    logger.debug("Send message successfully,sent to "
                                            + sendResult.getPartition());
                                }
                            } else {
                                // No event available: end this batch early. The
                                // aggregate count checked after the worker threads
                                // finish decides whether the runner should back off.
                                break;
                            }

                        }
                        if (startTransaction) {
                            producer.commit();
                        }
                        al.addAndGet(i);
                        transaction.commit();
                    } catch (Exception ex) {
                        logger.error("error while processing events, rolling back:", ex);
                        try {
                            producer.rollback();
                        } catch (Exception e) {
                            logger.error("producer rollback failed:", e);
                        }
                        if (transaction != null) {
                            transaction.rollback();
                        }
                    } finally {
                        cdl.countDown();
                        if (transaction != null) {
                            transaction.close();
                        }
                    }
                }
            });
        }
        try {
            cdl.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("interrupted while waiting for sink threads:", e);
        }
        if (al.get() == 0) {
            result = Status.BACKOFF;
        }

        logger.info("metaqSink_new, processed:{}, time:{}ms",
                new Object[] { al.get(), System.currentTimeMillis() - start });
        return result;
    }
}
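To build and deploy the sink, compile it against the Flume core jar and the MetaQ client jars, package it, and drop the resulting jar into flume/lib. A minimal sketch, assuming the source file sits under src/ and the MetaQ jars were already copied into $FLUME_HOME/lib as described above:

mkdir -p classes
javac -cp "$FLUME_HOME/lib/*" -d classes src/com/jd/flume/sink/MetaQSink.java
jar cf metaq-sink.jar -C classes .
cp metaq-sink.jar $FLUME_HOME/lib/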
