logstash消费阿里云kafka消息

logstash版本: 5.5.3 及以后
logstash消费阿里云kafka信息并返回到elasticsearch系统

配置信息解析:

        bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"]  #kafka系统的连接地址
        client_id => ‘tt‘   #客户端上传到es时,新增字段
        group_id => "CID-LOG"   #kafka分组的信息
        auto_offset_reset => "latest" #从最新的偏移量开始消费
        consumer_threads => 5
        #decorate_events => true #此属性会将当前topic、offset、group、partition等信息也带到message中
        topics => ["alikafka-cid-log"] #//数组类型,可配置多个topic
        type => "bhy" #//所有插件通用属性,尤其在input里面配置多个数据源时很有用
        security_protocol => "SASL_SSL"  #kafka连接阿里云的协议
        sasl_mechanism => "ONS"          #kafka阿里云的消费机制, logstash中默认的是 GSSAPI
        jaas_path => "/data/logstash/config/kafka_client_jaas.conf"  # ONS登录信息的路径
        ssl_keystore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘  #证书
        ssl_truststore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘#信任证书
        ssl_keystore_password => "xxxx"      #证书密码
        ssl_truststore_password => "xxxx"    #证书密码

关键信息:
logstash使用ONS机制连接kafka时,需要需要用到一些额外的jar包,可以把开发所使用的jar包,都放到 /data/logstash/vendor/jruby/lib/ 下面。

我的配置模板:

input{
      kafka {
        bootstrap_servers => ["kafka-cn-internet.aliyun.com:8080"]
        client_id => ‘tt‘
        group_id => "CID-LOG"
        auto_offset_reset => "latest" #从最新的偏移量开始消费
        consumer_threads => 5
        #decorate_events => true #此属性会将当前topic、offset、group、partition等信息也带到message中
        topics => ["alikafka-cid-log"] #//数组类型,可配置多个topic
        type => "bhy" #//所有插件通用属性,尤其在input里面配置多个数据源时很有用
        security_protocol => "SASL_SSL"
        sasl_mechanism => "ONS"
        jaas_path => "/data/logstash/config/kafka_client_jaas.conf"
        ssl_keystore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘
        ssl_truststore_location => ‘/data/logstash/config/kafka.client.truststore.jks‘
        ssl_keystore_password => "xx"
        ssl_truststore_password => "xx"
      }
}

output {
   elasticsearch {
       hosts => ["es-ip:9200"]
       user => ["xxxx"]
       password => ["xxxx"]
       index => ["services"]
   }
   stdout {
       codec=>plain
   }
}

java参数优化路径:

        config/jvm.options

原文地址:http://blog.51cto.com/zhenfen/2105234

时间: 2024-10-26 20:27:26

logstash消费阿里云kafka消息的相关文章

Kafka自建集群通过MirrorMaker同步数据到阿里云kafka标准版实操

说明:1.本次仅实现了两个topic的数据同步,后续优化会持续更新.....2.自建集群CDH5.8,kafka2.1.0;阿里云集群标准版kafka0.10.x踩坑:1.cdh添加kafka角色实例CMM,应该是不支持SSL连接2.VPC网络接入,不知道购买的阿里云实例有VPC网络,这个是没有SSL加密的连接3.kafka0.10.2的mirrormaker不能连接自建集群4.阿里云控制提示是SSl接入点,实际验证方式需要SASL_SSL5.不懂java,不知道这个是加在什么位置export

【结果很简单,过程很艰辛】记阿里云Ons消息队列服务填坑过程

Maybe 这个问题很简单,因为解决方法是非常简单,但填坑过程会把人逼疯,在阿里云ONS工作人员.同事和朋友的协助下,经过一天的调试和瞎捣鼓,终于解决了这个坑,把问题记下来,也许更多人在碰到类似问题的时候,会开放思路.当然不得不说,Ons的.NET接口还很不完善,甚至没有独立在Windos 2008/2012服务器测试过,希望官方加把力. 1.阿里云ONS介绍 ONS(Open Notification Service)即开放消息服务,是基于阿里开源消息中间件MetaQ(RocketMQ)打造的

logstash 消费数据到kafka异常

报错 :[logstash.outputs.kafka ] Sending batch to Kafka failed. Will retry after a delay. {:batch_size=>1, :failures=>1, :sleep=>0.1} 原因: logstash 日志报错生产数据到 kafka 失败 解决办法: ? ? ? ? ?查看kafka配置,默认单条消息最大为1M,当单条消息长度超过1M时,就会出现发送到broker失败,从而导致消息在producer的队

阿里云消息队列Kafka商业化:支持消息无缝迁移到云上

列Kafka彻底解决了开源产品稳定性不足的痛点,可用性达99.9%,数据可靠性99.999999%,并且支持消息无缝迁移到云上. 7月25日,阿里云宣布正式推出消息队列Kafka,全面融合开源生态.在兼容Apache生态的基础上,阿里云消息队列Kafka彻底解决了开源产品稳定性不足的痛点,可用性达99.9%,数据可靠性99.999999%,并且支持消息无缝迁移到云上. Kafka是一个分布式.高吞吐量.高可扩展性的消息队列服务,广泛用于日志收集.监控数据聚合.流式数据处理.在线和离线分析等大数据

阿里云ONS而微软Azure Service Bus体系结构和功能比较

阿里云ONS而微软Azure Service bus体系结构和功能比较 版权所有所有,转载请注明出处http://blog.csdn.net/yangzhenping.谢谢! 阿里云的开放消息服务: 一.如图所看到的,ProducerID1 的producer 实例有三个,可能是部署在三个机器上的三个进程,也可能是一台机 器上的三个进程. 每一个实例都会发送TopicA 的消息.同理,ProducerID2 与之类似. 二.ConsumerID1 有三个实例,假设是集群消费方式,那么每一个实例消

阿里云ONS和微软Azure Service Bus的架构和特性比较

阿里云ONS和微软Azure Service bus的架构和特性比较 版权所有,转载请注明出处http://blog.csdn.net/yangzhenping,谢谢! 阿里云的开放消息服务: 一.如图所示,ProducerID1 的producer 实例有三个,可能是部署在三个机器上的三个进程,也可能是一台机 器上的三个进程.每个实例都会发送TopicA 的消息.同理,ProducerID2 与之类似. 二.ConsumerID1 有三个实例,如果是集群消费方式,那么每个实例消费TopicA

双11同款!阿里云发布全局事务服务GTS:每秒处理10万笔事务

摘要: 5月30日,阿里云宣布全局事务服务产品GTS正式商用,每秒可处理10万笔事务,将分布式事务这个"贵族技术"变为"平民技术 ",可解决跨数据库.消息.服务的分布式环境下的事务一致性问题,让开发者无需考虑复杂的事务问题,加速微服务落地,效率比传统的XA协议提升了10倍之多. 5月30日,阿里云宣布全局事务服务产品GTS正式商用,每秒可处理10万笔事务,将分布式事务这个"贵族技术"变为"平民技术 ",可解决跨数据库.消息.服

阿里云构建Kafka单机集群环境

简介 在一台ECS阿里云服务器上构建Kafa单个集群环境需要如下的几个步骤: 服务器环境 JDK的安装 ZooKeeper的安装 Kafka的安装 1. 服务器环境 CPU: 1核 内存: 2048 MB (I/O优化) 1Mbps 操作系统 ubuntu14.04 64位 感觉服务器性能还是很好的,当然不是给阿里打广告,汗. 随便向kafka里面发了点数据,性能图如下所示:  2. 安装JDK 想要跑Java程序,就必须安装JDK.JDK版本,本人用的是JDK1.7. 基本操作如下: 从JDK

使用java实现阿里云消息队列简单封装

一.前言 最近公司有使用阿里云消息队列的需求,为了更加方便使用,本人用了几天时间将消息队列封装成api调用方式以方便内部系统的调用,现在已经完成,特此记录其中过程和使用到的相关技术,与君共勉. 现在阿里云提供了两种消息服务:mns服务和ons服务,其中我认为mns是简化版的ons,而且mns的消息消费需要自定义轮询策略的,相比之下,ons的发布与订阅模式功能更加强大(比如相对于mns,ons提供了消息追踪.日志.监控等功能),其api使用起来更加方便,而且听闻阿里内部以后不再对mns进行新的开发