Logstash之四:logstash接收kafka数据

3、kafka+logstash整合
logstash1.5以后已经集成了对kafka的支持扩展,可以在conf配置中直接使用

vim /etc/logstash/conf.d/pay.conf

input {
kafka{
zk_connect => "your zookeeper address:2181"
group_id => "logstash"
topic_id => "pay-replicated"
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
output {
# stdout{ codec=> rubydebug }
redis {
host => ["your redis address:6380"]
batch => true
key => "logstash-nginx-pay-replicated"
data_type => "list"
}
}

重启logstash
service logstash restart

时间: 2024-10-24 16:01:32

Logstash之四:logstash接收kafka数据的相关文章

spark streaming 接收 kafka 数据java代码WordCount示例

1. 首先启动zookeeper 2. 启动kafka 3. 核心代码 生产者生产消息的java代码,生成要统计的单词 package streaming; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MyProducer { pu

java spark-streaming接收TCP/Kafka数据

本文将展示 1.如何使用spark-streaming接入TCP数据并进行过滤: 2.如何使用spark-streaming接入TCP数据并进行wordcount: 内容如下: 1.使用maven,先解决pom依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1

Spark Streaming 读取 Kafka 数据的两种方式

在Spark1.3之前,默认的Spark接收Kafka数据的方式是基于Receiver的,在这之后的版本里,推出了Direct Approach,现在整理一下两种方式的异同. 1. Receiver-based Approach val kafkaStream = KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 2. Direct Approach (No Receivers) v

logstash解析嵌套json格式数据

logstash解析嵌套json格式数据 1.源文件 1.原日志文件为 2019-10-28 09:49:44:947 [http-nio-8080-exec-23] INFO [siftLog][qewrw123ffwer2323fdsafd] - logTime:2019-10-28 09:49:25.833-receiveTime:2019-10-28 09:49:44.044-{"area":"","frontInitTime":0,&q

logstash消费阿里云kafka消息

logstash版本: 5.5.3 及以后logstash消费阿里云kafka信息并返回到elasticsearch系统 配置信息解析: bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"] #kafka系统的连接地址 client_id => 'tt' #客户端上传到es时,新增字段 group_id => "CID-LOG" #kafka分组的信息 auto_offset_rese

kafka数据可靠性深度解读

Kafka起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spark等都支持与Kafka集成. 1 概述 Kafka与传统消息系统相比,有以下不同: 它被设计为一个分布式系统,易于向外扩展: 它同时为发布和订阅提供高吞吐量: 它支持多订阅者,当失败时能自动平衡消费者: 它将消息持久化到磁盘,因此可用于批量消费,例如ETL以及实

Kafka数据辅助和Failover

数据辅助与Failover CAP理论(它具有一致性.可用性.分区容忍性) CAP理论:分布式系统中,一致性.可用性.分区容忍性最多只可同时满足两个.一般分区容忍性都要求有保障,因此很多时候在可用性与一致性之间做权衡. 一致性方案 1.Master-slave >RDBMS的读写分离即为典型的Master-slave方案 >同步复制可保证强一致性但会影响可用性(等master确保将数据复制给全部的slave,slave才返回结果) >异步复制可提供高可用性但会降低一致性 2.WNR &g

kafka数据祸福和failover

k CAP帽子理论. consistency:一致性 Availability:可用性 partition tolerance:分区容忍型 CA :mysql oracle(抛弃了网络分区) CP:hbase redis mongodb(抛弃了可用性) AP:cassandra simpleDB(抛弃了强一致性,采用弱一致性或者最终一致性,不定时一致性) 一致性的方案 master-slave(hadoop) WNR 读取后还得判断哪个数据是最新的.常用做法(版本号或者时间戳) 平时读取数据是从

spark streaming 接收kafka消息之二 -- 运行在driver端的receiver

先从源码来深入理解一下 DirectKafkaInputDStream 的将 kafka 作为输入流时,如何确保 exactly-once 语义. val stream: InputDStream[(String, String, Long)] = KafkaUtils.createDirectStream [String, String, StringDecoder, StringDecoder, (String, String, Long)]( ssc, kafkaParams, fromO