kafak-python函数使用详解

  • Consumer的非线程安全
  • Kafka只保证消息不漏,即at lease once,而不保证消息不重。关键点:假如consumer挂了重启,那它将从committed offset位置(告诉server的消费的位置点)开始重新消费,而不是consume offset位置(真正的消费位置点)。这也就意味着有可能重复消费(自己消费到了某个位置,而后在告诉服务器这个位置时,发送失败)
  • kafka可以重置commit吗?给服务器指定任意值为最后消费位置,下次消费从这个指定的位置开始消费。可以,使用commit函数,下文有讲。但是需要注意:修改偏移量不会改变当前会话,在新连接里生效
  • subscribe表示订阅topic,从kafka记录的offset开始消费。assign表示从指定的offset开始消费。
  • kafka自动会从上次没有消费的地方开始消费
  • 使用kafak自带的脚本查看偏移量:./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test --offsets
  • 使用了subscribe,就不能使用assign
  • 提交:更新分区的当前位置称为提交,当前版本(0.10.1.1)用topic ___consumer_offsets 保存提交的偏移量
  • 偏移量:消费者在Kafka追踪到消息在分区里的位置
  • 消费者在崩溃或者有新的消费者加入群组,就会触发再均衡。这时需要读取最后一次偏移量,然后从偏移量指定的地方继续处理。提交的偏移量小于真实的偏移量,消息会被重复处理。大于真实的偏移量,消息会丢失。
from kafka.structs import TopicPartition,OffsetAndMetadata
configs = {
            ‘bootstrap_servers‘: ‘10.57.19.60‘,
            ‘enable_auto_commit‘: False,
            ‘group_id‘: ‘test‘,
            ‘api_version‘: (0, 8, 2),
            ‘ssl_check_hostname‘: False,
            ‘consumer_timeout_ms‘: 3000,  # 若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待
            # ‘ssl_certfile‘: ssl_certfile,
            # ‘security_protocol‘: ‘SSL‘,
            # ‘ssl_cafile‘: ssl_cafile
        }
topics=(‘test‘, )
# 注意指定分区将会失去故障转移/负载均衡的支持,当然也没有了自动分配分区的功能(因为已经人为指定了嘛)
topic_partition = TopicPartition(topic=‘test‘,partition=0)
#
consumer = KafkaConsumer(**configs)
# 参数必须是列表,表示订阅的topic/partition列表
consumer.assign([topic_partition])
# 获取分给当前用户的topic/partition信息
consumer.assignment()
# 提交偏移量:可以告知服务器当前偏移量,也可以设置偏移量
consumer.commit({TopicPartition(topic=‘test‘, partition=0): OffsetAndMetadata(offset=280, metadata=‘‘)})
# 异步提交
consumer.commit_async()
# 获取服务器的最后确认的偏移量,即最新数据开始读取的地方
consumer.committed(TopicPartition(topic=‘test‘, partition=0))
# 获取服务器当前最新的偏移量,读到这个偏移量后,所有数据都读取完了
consumer.highwater(TopicPartition(topic=‘test‘, partition=0))
# 获取消费的性能
consumer.metrics()
# 获取某个topic的partition信息
consumer.partitions_for_topic(topic)
# 获取下一条数据开始读取的偏移量,即从这个便宜量开始继续读取数据
consumer.position(TopicPartition(topic=‘test‘, partition=0))
# 从指定偏移量位置开始读取数据
consumer.seek(TopicPartition(topic=‘test‘, partition=0), 283)
# 从头开始读取数据
consumer.seek_to_beginning()
# 从最后开始读取数据
consumer.seek_to_end()
# 订阅topic,可以订阅多个,可以使用正则表达式匹配多个
consumer.subscribe()
# 获取订阅的信息,无法获取使用assign分配的topic/partition信息
consumer.subscription()
# 获取当前用户授权的topic信息
consumer.topics()
# 取消消息的订阅
consumer.unsubscribe()# 一起消费多条消息,最多等待时间timeout_ms,最多消费max_recordsconsumer.poll(self, timeout_ms=0, max_records=None)

# 获取指定分区第一个偏移量
consumer.beginning_offsets([topic_partition])
# 获取指定分区最后一个偏移量,最新的偏移量
consumer.end_offsets([topic_partition])
# 关闭连接
consumer.close()
#
#consumer.seek(topic_partition,284)
for message in consumer:
    print(message)

重复消费是如何产生的?

消费者设置为自动提交偏移量时,需要同时设置自动提交偏移量的时间间隔。如果消费完若干消息后,还没有到自动提交偏移量的时间时,应用挂了,则系统记录的偏移量还是之前的值,那么刚才消费的若干消息,会在应用重连之后重新消费

如何保证不会重复消费?

消费段记录下发送给服务器的偏移量,获取最新数据时再判断这个偏移量是否正确

生产的消息队列长度,会堆积吗?

消费的信息队列长度,会堆积吗?

生产者速度大于消费者速度怎么处理?

kafka 认证与授权机制

Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三种认证机制。目前支持以下安全措施:

  • clients 与 brokers 认证
  • brokers 与 zookeeper认证
  • 数据传输加密  between  brokers and clients, between brokers, or between brokers and tools using SSL
  • 授权clients read/write

kafka偏移量的相关配置

enable.auto.commit

true(默认):自动提交偏移量,可以通过配置 auto.commit.interval.ms属性来控制提交偏移量的频率。(基于时间间隔)

false:手动控制偏移量。可以在程序逻辑必要的时候提交偏移量,而不是基于时间隔。此时可以进行同步,异步,同步异步组合(参考相应api)。

auto.offset.reset

无法读取偏移量时候读取消息的设置

latest(默认):从最新记录读取数据。

earliest:从起始位置读取数据

参考:

1、https://zhuanlan.zhihu.com/p/33238750

2、https://help.aliyun.com/document_detail/68331.html

3、https://blog.csdn.net/xiaoguozi0218/article/details/80513849

4、https://zhuanlan.zhihu.com/p/38330574

5、https://blog.csdn.net/ZhongGuoZhiChuang/article/details/79550570

6、https://help.aliyun.com/document_detail/67233.html

原文地址:https://www.cnblogs.com/shengulong/p/10211681.html

时间: 2024-10-23 10:08:16

kafak-python函数使用详解的相关文章

Python函数参数详解

  Python函数定义时参数灵活,使用不同参数的组合不仅可以简化调用者的代码,还可以处理复杂的参数.函数的参数除了有必选参数外,还可以使用默认参数,可变参数,关键字参数和命名关键字参数. 位置参数 定义一个计算x^2的函数,以及一个计算x^n的函数 def calc1(x): return x * x def calc2(x, n): s = 1 for i in range(n): s *= x return s 对于这两个函数,其参数都是位置参数,同时也是必选参数,调用函数时实参需和形参一

Python学习入门教程,字符串函数扩充详解

因有用户反映,在基础文章对字符串函数的讲解太过少,故写一篇文章详细讲解一下常用字符串函数.本文章是对:程序员带你十天快速入门Python,玩转电脑软件开发(三)中字符串函数的详解与扩充. 如果您想学习并参与本教程的完善与写作.请在下方讨论区,回复相关问题.一起完善本文章教程的书写. Python字符串常用函数. 声明字符串变量: str = ‘关注做全栈攻城狮,写代码也要读书,爱全栈,更爱生活.’ 下面所有字符串函数函数,是对变量str进行操作: 求字符串长度: 函数使用: 运行结果: 值得注意

Python中dict详解

yangyzh Python中dict详解 python3.0以上,print函数应为print(),不存在dict.iteritems()这个函数. 在python中写中文注释会报错,这时只要在头部加上# coding=gbk即可 #字典的添加.删除.修改操作dict = {"a" : "apple", "b" : "banana", "g" : "grape", "o&qu

Python字符编码详解(转)

1. 字符编码简介 1.1. ASCII ASCII(American Standard Code for Information Interchange),是一种单字节的编码.计算机世界里一开始只有英文,而单字节可以表示256个不同的字符,可以表示所有的英文字符和许多的控制符号.不过ASCII只用到了其中的一半(\x80以下),这也是MBCS得以实现的基础. 1.2. MBCS 然而计算机世界里很快就有了其他语言,单字节的ASCII已无法满足需求.后来每个语言就制定了一套自己的编码,由于单字节

Python 字符串方法详解

Python 字符串方法详解 本文最初发表于赖勇浩(恋花蝶)的博客(http://blog.csdn.net/lanphaday),如蒙转载,敬请保留全文完整,切勿去除本声明和作者信息. 在编程中,几乎90% 以上的代码都是关于整数或字符串操作,所以与整数一样,Python 的字符串实现也使用了许多拿优化技术,使得字符串的性能达到极致.与 C++ 标准库(STL)中的 std::string 不同,python 字符串集合了许多字符串相关的算法,以方法成员的方式提供接口,使用起来非常方便. 字符

Python之print详解

Python之print详解 http://www.jb51.net/article/55768.htm print的一些基本用法,在前面的讲述中也涉及一些,本讲是在复习的基础上,尽量再多点内容. eval() 在print干事情之前,先看看这个东东.不是没有用,因为说不定某些时候要用到. 复制代码 代码如下: >>> help(eval)      #这个是一招鲜,凡是不理解怎么用,就用这个看文档 Help on built-in function eval in module __b

转载:唐磊的个人博客《python中decorator详解》【转注:深入浅出清晰明了】

转载请注明来源:唐磊的个人博客<python中decorator详解> 前面写python的AOP解决方案时提到了decorator,这篇文章就详细的来整理下python的装饰器--decorator. python中的函数即objects 一步一步来,先了解下python中的函数. def shout(word='hello,world'):     return word.capitalize() + '!'print shout()#输出:Hello,world!#跟其他对象一样,你同样

windows上安装Anaconda和python的教程详解

一提到数字图像处理编程,可能大多数人就会想到matlab,但matlab也有自身的缺点: 1.不开源,价格贵 2.软件容量大.一般3G以上,高版本甚至达5G以上. 3.只能做研究,不易转化成软件. 因此,我们这里使用Python这个脚本语言来进行数字图像处理. 要使用Python,必须先安装python,一般是2.7版本以上,不管是在windows系统,还是Linux系统,安装都是非常简单的. 要使用python进行各种开发和科学计算,还需要安装对应的包.这和matlab非常相似,只是matla

【python进阶】详解元类及其应用2

前言 在上一篇文章[python进阶]详解元类及其应用1中,我们提到了关于元类的一些前置知识,介绍了类对象,动态创建类,使用type创建类,这一节我们将继续接着上文来讲~~~ 5.使?type创建带有?法的类 最终你会希望为你的类增加?法.只需要定义?个有着恰当签名的函数并将 其作为属性赋值就可以了.添加实例?法 In [14]: def echo_bar(self):#定义了一个普通的函数 ...: print(self.bar) ...: In [15]: FooChild = type('

使用Python操作Redis详解

之前的五天,过了个愉快的周末,然后将公司AbaseDump的调度部分代码看懂并且在此之上完成了OnlyDump的功能代码,代码不可以公开,今天完工,明天测试,晚上来总结一下这几天学到的一点应用. 使用Python操作Redis详解 ------------------------------------------------------------------------------------------------- 一. 文档说明 本文档仅介绍Redis在Python中的使用,Redis