kafka 消费?

  • kafka消费中的问题及解决方法:
    • 情况1:
      • 问题:脚本读取kafka 数据,写入到数据库,有时候出现MySQL server has gone away,导致脚本死掉。再次启动,这过程中的kafka数据丢失。
      • 原因:MySQL server has gone away 出现可能是连接超时,可能超过每秒请求上限…这些异常是小概率事件,难以避免。git kafka 的demo脚本是实时监听的脚本, 简单明了,没有再去针对kafka偏移量研究;但是一旦断掉, 过程中的kafka数据即丢失。
      • 解决思路MySQL server has gone away无法避免,try catch 即可,异常消息存入日志中。通过数据补偿脚本把日志中的数据存入数据库,做到正常  异常的数据最终都能录入数据库。且脚本也不会异常死掉。脚本其他原因死掉(如服务器宕机),通过集群控制风险。
    • 情况2:
      • 问题:kafka消费时做数据检验,数据处理。代码看的很混乱, 且一旦业务调整或者有bug意味着要停止消费后重启。风险高,维护困难。
      • 解决思路:解藕。kafka消费脚本只负责把数据写入数据库,标识初始状态。业务处理脚本集中处理数据。
  • kafka 消费时序图

原文地址:https://www.cnblogs.com/adair123/p/8203294.html

时间: 2024-10-08 11:07:44

kafka 消费?的相关文章

分享一些 Kafka 消费数据的小经验

前言 之前写过一篇<从源码分析如何优雅的使用 Kafka 生产者> ,有生产者自然也就有消费者. 建议对 Kakfa 还比较陌生的朋友可以先看看. 就我的使用经验来说,大部分情况都是处于数据下游的消费者角色.也用 Kafka 消费过日均过亿的消息(不得不佩服 Kakfa 的设计),本文将借助我使用 Kakfa 消费数据的经验来聊聊如何高效的消费数据. 单线程消费 以之前生产者中的代码为例,事先准备好了一个 Topic:data-push,3个分区. 先往里边发送 100 条消息,没有自定义路由

SSM(十六) 曲线救国-Kafka消费异常

最近线上遇到一个问题:在消费kafka消息的时候如果长时间(大概半天到一天的时间)队列里没有消息就可能再也消费不了.针对这个问题我们反复调试多次.线下模拟,调整代码,但貌似还是没有找到原因.但是只要重启消费进程就又可以继续消费. 解决方案 由于线上业务非常依赖kafka的消费,但一时半会也没有找到原因,所以最后只能想一个临时的替换方案: 基于重启就可以消费这个特点,我们在每次消费的时候都记下当前的时间点,当这个时间点在十分钟之内都没有更新我们就认为当前队列中没有消息了,就需要重启下消费进程. 既

监控Kafka消费进度

使用Kafka作为消息中间件消费数据时,监控Kafka消费的进度很重要.其中,在监控消费进度的过程中,主要关注消费Lag. 常用监控Kafka消费进度的方法有三种,分别是使用Kafka自带的命令行工具.使用Kafka Consumer API和Kafka自带的JMX监控指标,这里介绍前两种方法. 注: 内网IP:10.12.100.126 10.12.100.127 10.12.100.128 外网IP:47.90.133.76 47.90.133.77 47.90.133.78 用户名:ser

Kafka消费组(consumer group)

一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少.最近Kafka社区邮件组已经在讨论是否应该正式使用新版本consumer替换老版本,笔者也觉得时机成熟了,于是写下这篇文章讨论并总结一下新版本consumer的些许设计理念,希望能把consumer这点事说清楚,从而对广大使用者有所帮助. 在开始之前,我想花一点时间先来明确一些概念和术语,这会极大地方便我们下面的讨论.另外请原谅这文章有点长,毕竟要讨论的东西很多,虽然已然删除了很多太过细节的东

Kafka - 消费接口分析

1.概述 在 Kafka 中,官方对外提供了两种消费 API,一种是高等级消费 API,另一种是低等级的消费 API.在 <高级消费 API>一文中,介绍了其高级消费的 API 实现.今天给大家介绍另一种消费 API. 2.内容 在使用过 Kafka 的高级消费 API 后,我们知道它是一种高度抽象的消费 API,使用起来简单,方便,但是对于某些特殊的需求我们可能要用到第二种更加底层的 API.那么,我们首先需要知道低级消费 API 的作用.它能帮助我们去做那些事情: 一个消息进行多次读取 在

使用flume抓取tomcat的日志文件下沉到kafka消费

Tomcat生产日志 Flume抓取日志下沉到kafka中 将写好的web项目打包成war包,eclise直接导出export,IDEA 在artifact中添加新的artifact-achieve项,选择web项目所在目录,再build即可 在Linux的Tomcat  Webapp目录下放入war包,在bin下启动Tomcat时,war包会自动解压,然后从浏览器访问,注意是http://bigdata2:8080/WebAnalysis_war/ 的形式 host:8080/+项目文件名 配

消费滚动滴log日志文件(flume监听,kafka消费,zookeeper协同)

第一步:数据源 手写程序实现自动生成如下格式的日志文件: 15837312345,13737312345,2017-01-09 08:09:10,0360 打包放到服务器,使用如下命令执行,模拟持续不断的日志文件: java -cp ct_producter-1.0-SNAPSHOT.jar producter.ProductLog ./awen.tsv 第二步:监听log.tsv日志 使用Flume监控滚动的awen.tsv日志,编写flume # Name the components on

消息队列之kafka(消费语义)

1. 消费语义的介绍    at last once:至少消费一次(对一条消息有可能多次消费,有可能会造成重复消费数据)     原因:Proudcer产生数据的时候,已经写入在broker中,但是由于broker的网络异常,没有返回ACK,这时Producer,认为数据没有写入成功,此时producer会再次写入,相当于一条数据,被写入了多次.   at most once:最多消费一次,对于消息,有可能消费一次,有可能一次也消费不了    原因:producer在产生数据的时候,有可能写数据

Kafka在高并发的情况下,如何避免消息丢失和消息重复?kafka消费怎么保证数据消费一次?数据的一致性和统一性?数据的完整性?

1.kafka在高并发的情况下,如何避免消息丢失和消息重复? 消息丢失解决方案: 首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功 消息重复解决方案: 消息可以使用唯一id标识 生产者(ack=all 代表至少成功发送一次) 消费者 (offset手动提交,业务逻辑成功处理后,提交offset) 落表(主键或者唯一索引的方式,避免重复数据) 业务逻辑处理(选择唯一主键存储到R