storm kafkaSpout 踩坑问题记录! offset问题!

整合kafka和storm例子网上很多,自行查找

问题描述:

  kafka是之前早就搭建好的,新建的storm集群要消费kafka的主题,由于kafka中已经记录了很多消息,storm消费时从最开始消费

问题解决:

  下面是摘自官网的一段话:

How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures

As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by setting KafkaConfig.startOffsetTime as follows:

  1. kafka.api.OffsetRequest.EarliestTime(): read from the beginning of the topic (i.e. from the oldest messages onwards)
  2. kafka.api.OffsetRequest.LatestTime(): read from the end of the topic (i.e. any new messsages that are being written to the topic)
  3. A Unix timestamp aka seconds since the epoch (e.g. via System.currentTimeMillis()): see How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? in the Kafka FAQ

As the topology runs the Kafka spout keeps track of the offsets it has read and emitted by storing state information under the ZooKeeper path SpoutConfig.zkRoot+ "/" + SpoutConfig.id. In the case of failures it recovers from the last written offset in ZooKeeper.

Important: When re-deploying a topology make sure that the settings for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.

This means that when a topology has run once the setting KafkaConfig.startOffsetTime will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime as described above.

  这段话的包含的内容大概有,通过SpoutConfig对象的startOffsetTime字段设置消费进度,默认值是kafka.api.OffsetRequest.EarliestTime(),也就是从最早的消息开始消费,如果想从最新的消息开始消费需要手动设置成kafka.api.OffsetRequest.LatestTime()。另外还有一个问题是,这个字段只会在第一次消费消息时起作用,之后消费的offset是从zookeeper中记录的offset开始的

  如果想要当前的topology的消费进度接着上一个topology的消费进度继续消费,那么不要修改SpoutConfig对象的id。换言之,如果你第一次已经从最早的消息开始消费了,那么如果不换id的话,它就要从最早的消息一直消费到最新的消息,这个时候如果想要跳过中间的消息直接从最新的消息开始消费,那么修改SpoutConfig对象的id就可以了

  下面是SpoutConfig对象的一些字段的含义,其实是继承的KafkaConfig的字段,可看源码

  public int fetchSizeBytes = 1024 * 1024; //发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小
    public int socketTimeoutMs = 10000;//与Kafka broker的连接的socket超时时间
    public int fetchMaxWait = 10000;   //当服务器没有新消息时,消费者会等待这些时间
    public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小
    public MultiScheme scheme = new RawMultiScheme();//从Kafka中取出的byte[],该如何反序列化
    public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset
    public long maxOffsetBehind = Long.MAX_VALUE;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使用startOffsetTime    public int metricsTimeBucketSizeInSecs = 60;//多长时间统计一次metrics
时间: 2024-12-20 04:19:31

storm kafkaSpout 踩坑问题记录! offset问题!的相关文章

lubuntu踩坑全记录

为了降低系统占用,毕业之后一直用lubuntu不用ubuntu...操作其实差不多,就是lubuntu有一些小坑坑:P 本文是我的踩坑全记录.长期更新. 表现:修改开机程序,改完重启进不了系统了.输入密码后,仍回到登录界面,无法进入系统. 原因:我也不知道,可能系统自己绕死了吧:) 坑:我当前只有一个root用户,没有保存快照,开机看不到GNU GRUB,所以也无法以救援模式进入系统. 解决: 我首先用系统盘片进入了lubuntu安装界面,看help信息 F4说: 这就是让我们自己动手丰衣足食.

Windows Server 2012搭建SQL Server Always On踩坑全记录

Windows Server 2012搭建SQL Server Always On踩坑全记录 环境信息: Windows Server 2012 R2 Sql Server 2012 整个搭建集群的过程主要分为3步: 1.配置预控,并把机器加域. 2.配置windows故障转移集群(Windows Failover Cluster) 3.配置SQL Server Always On High Availability Group(AG) 踩坑全记录 预控配置 比较好配置,需要注意的是非预控机器在

学习(踩坑)记录——新建工程

配置:CCS8.3 + CC3200 + SDK1.3.0 踩了一下午的坑,才完整的建立了一个可以用工程.为了方面说明,后面直接使用SDK中的PWM例程中的文件. 1.红色框框为需要注意的地方 2.右键工程,选择添加文件,然后定位到SDK中的pwm例程中,选中三个文件,如下图: 然后点击打开,ccs会提示复制还是链接到工程中,如下图 这里需要注意: (1)添加这个main文件之前需要删除创建工程时自动创建的main文件. (2)如果是添加库文件的话,因为库文件我们一般不会去改动,所以选择link

安装VMWare tools,踩坑。记录一下

对于没钱买服务器的小白,只能靠安装虚拟机来勉强度日子.最近比较闲,想实现一下docker的自动化部署,在制作java8镜像时出了个问题.就是怎么在虚拟机上实现与本机文件夹的共享. 坑了2天.记录一下,环境 VMware® Workstation 12 Pro + CentOS 1. 首先是 CD/DVD(SATA) 这里的话 选择VMan安装目录的 linux.iso (C:\Program Files (x86)\VMware\VMware Workstation\linux.iso) 这里选

JavaScript两数相加(踩坑)记录

Adding two numbers concatenates them instead of calculating the sum JavaScript里两个变量 var a = 2: var b = 1: var c = "(" + a + b +")"; alert(c); 结果输出:(21) 正确写法: var c = + a  +  + b: var d = "(" + c +")";

unionId突然不能获取的踩坑记录

昨天(2016-2-2日),突然发现系统的一个微信接口使用不了了.后来经查发现,是在网页授权获取用户基本信息的时候,unionid获取失败导致的. 在网页授权获取用户基本信息的介绍中(http://mp.weixin.qq.com/wiki/17/c0f37d5704f0b64713d5d2c37b468d75.html),unionid可以在第二步(https://api.weixin.qq.com/sns/oauth2/access_token)和第四步的2个接口中获得.其中,第四步的接口是

IDFA踩坑记录

IDFA踩坑记录: 1.iOS10.0 以下,即使打开"限制广告跟踪",依然可以读取idfa: 2.打开"限制广告跟踪",然后再关闭"限制广告跟踪",idfa会改变: 3.越狱机器安装开发证书打的包,读取的idfa正常: 4.越狱机器安装本地打的发布证书的包,读取idfa异常,第一次打开app读取的是一个错误的idfa,第二次打开会变成另外一个错误的idfa: 5.越狱机器安装Testflight的包,读取idfa异常,第一次打开app读取的是一

微信小程序之蓝牙 BLE 踩坑记录

前言 前段时间接手了一个微信小程序的开发,主要使用了小程序在今年 3 月开放的蓝牙 API ,此过程踩坑无数,特此记录一下跳坑过程.顺便开了另一个相关的小项目,欢迎 start 和 fork: BLE_MiniProgram API简介 微信小程序目前有蓝牙 API 共 18 个,其中操作蓝牙适配器的共有 4 个,分别是 wx.openBluetoothAdapter 初始化蓝牙适配器 wx.closeBluetoothAdapter 关闭蓝牙模块 wx.getBluetoothAdapterS

记录某项目中的踩坑与解决(持续更新)

前言 最近参加了某个比赛, 我所选的赛题就是个类似知乎这样的安卓app,由于着手近一个月了,踩了不少坑,之前没怎么记录,估计事后也会忘记干净. 因此特开一帖,在此记录下相关的坑. 记录 写完某个Recyclerview的item布局和相关适配器后, 然后展示的时候, 发现显示出来的Item数量小于List中绑定的数据量: 首先是检查了一遍适配器中的逻辑,发现没啥问题.然后在onCreateViewHolder中打log,发现只创建了两次就没了(而实际应该创建6次). 最后百度才发现是外面的Scr