Kafka与Logstash的数据采集

Kafka与Logstash的数据采集

基于Logstash跑通Kafka还是需要注意很多东西,最重要的就是理解Kafka的原理。

Logstash工作原理

由于Kafka采用解耦的设计思想,并非原始的发布订阅,生产者负责产生消息,直接推送给消费者。而是在中间加入持久化层——broker,生产者把数据存放在broker中,消费者从broker中取数据。这样就带来了几个好处:

  • 1 生产者的负载与消费者的负载解耦
  • 2 消费者按照自己的能力fetch数据
  • 3 消费者可以自定义消费的数量

另外,由于broker采用了主题topic-->分区的思想,使得某个分区内部的顺序可以保证有序性,但是分区间的数据不保证有序性。这样,消费者可以以分区为单位,自定义读取的位置——offset。

Kafka采用zookeeper作为管理,记录了producer到broker的信息,以及consumer与broker中partition的对应关系。因此,生产者可以直接把数据传递给broker,broker通过zookeeper进行leader-->followers的选举管理;消费者通过zookeeper保存读取的位置offset以及读取的topic的partition分区信息。

由于上面的架构设计,使得生产者与broker相连;消费者与zookeeper相连。有了这样的对应关系,就容易部署logstash-->kafka-->logstash的方案了。

接下来,按照下面的步骤就可以实现logstash与kafka的对接了。

启动kafka

启动zookeeper:

$zookeeper/bin/zkServer.sh start

启动kafka:

$kafka/bin/kafka-server-start.sh $kafka/config/server.properties &

创建主题

创建主题:

$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1

查看主题:

$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe

测试环境

执行生产者脚本:

$kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello

执行消费者脚本,查看是否写入:

$kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello

输入测试

input{
    stdin{}
}
output{
    kafka{
        topic_id => "hello"
        bootstrap_servers => "192.168.0.4:9092" # kafka的地址
        batch_size => 5
    }
    stdout{
        codec => rubydebug
    }
}

读取测试

logstash配置文件:

input{
    kafka {
        codec => "plain"
        group_id => "logstash1"
        auto_offset_reset => "smallest"
        reset_beginning => true
        topic_id => "hello"
        #white_list => ["hello"]
        #black_list => nil
        zk_connect => "192.168.0.5:2181" # zookeeper的地址
   }

}
output{
    stdout{
        codec => rubydebug
    }
}

分类: KafkaLogstash

时间: 2024-12-21 07:45:08

Kafka与Logstash的数据采集的相关文章

kafka(logstash) + elasticsearch 构建日志分析处理系统

第一版:logstash + es 第二版:kafka 替换 logstash的方案

ELK实时日志分析平台(elk+kafka+metricbeat)-logstash(四)

1. 安装并测试: 2.  添加配置: 3. 启动检查:

ELK架构下利用Kafka Group实现Logstash的高可用

系统运维的过程中,每一个细节都值得我们关注 下图为我们的基本日志处理架构 所有日志由Rsyslog或者Filebeat收集,然后传输给Kafka,Logstash作为Consumer消费Kafka里边的数据,分别写入Elasticsearch和Hadoop,最后使用Kibana输出到web端供相关人员查看,或者是由Spark接手进入更深层次的分析 在以上整个架构中,核心的几个组件Kafka.Elasticsearch.Hadoop天生支持高可用,唯独Logstash是不支持的,用单个Logsta

ELK5.3+Kafka集群配置

[一]资源准备 # 3台4C*8G, 安装Zookeeper.Kafka.Logstash--Broker(input: filebeat; output: Kafka) 10.101.2.23 10.101.2.24 10.101.2.25 # 2台4C*8G, 安装Logstash--Indexer(input: Kafaka; output: Elasticsearch) 10.101.2.26 10.101.2.27 # 3台8C*16G, 安装Elasticsearch 10.101.

离线部署ELK+kafka日志管理系统

1.简介 对于日志来说,最常见的需求就是收集.查询.显示,正对应logstash.elasticsearch.kibana的功能. ELK日志系统在系统中,主要可解决的问题: 基于日志的数据挖掘 问题排查,上线检查 根据关键字查询日志详情 异常数据自动触发消息通知 服务器监控,应用监控,Bug管理 统计分析,比如接口的调用次数.执行时间.成功率等 性能分析,用户行为分析,安全漏洞分析,时间管理 Logstash: Logstash是一个用来搜集.分析.过滤日志的工具.它支持几乎任何类型的日志,包

Logstash + DataHub + MaxCompute/StreamCompute 进行实时数据分析

Logstash是一款开源日志收集处理框架,有各种不同的input.filter.output插件,用户使用这些插件可以将各种数据源导入到其他系统.logstash-output-datahub插件,实现将数据导入DataHub的功能,通过简单的配置即可完成数据采集和向DataHub的传输任务.结合StreamCompute(Galaxy)用户可以方便的完成流式数据从采集,传输,开发到结果数据存储与展示的整套解决方案.同时,还可以通过创建Collector同步任务将数据同步到MaxCompute

logstash+es+kibana+redis搭建

环境信息: CentOS 6.5 redis 3.0.4 logstash elasticsearch kibana 服务端ip:192.168.0.65 客户端ip:192.168.0.66 关系结构图: 图片引用自:http://www.wklken.me/posts/2015/04/26/elk-for-nginx-log.html 安装redis 首先下载redis wget http://download.redis.io/releases/redis-3.0.4.tar.gz 下载之

ELK + Kafka + Filebeat

ELK + Kafka + Filebeat学习 https://blog.csdn.net/qq_21383435/article/details/79463832 https://blog.csdn.net/xiangyuan1988/article/details/78977471 https://www.jianshu.com/p/f149a76ea5b5 https://blog.csdn.net/qq_21383435/article/category/7486820 ELK + K

zk+kafka+elk

软件版本zookeeper 3.4.10kafka 2.11-0.10.2.0elaticsearch 2.10.16logstash 2.10.2kibana 4.3.1jdk 1.8.0_171服务器准备:10.253.2.5010.253.2.5110.553.2.41本次安装均在linux系统下操作软件均安装在 /usr/local/share/applications/目录下日志输出均在 /tmp/包名一软件安装及配置1.jdk安装 检测服务器的Java版本 Java -version