python kafka

kafka简介(摘自百度百科)

简介:

kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消费。

特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量[2]  :即使是非常普通的硬件Kafka也可以支持每秒数百万[2]  的消息
支持通过Kafka服务器和消费机集群来分区消息
支持Hadoop并行数据加载

术语:
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker
Consumer
消息消费者,向Kafka broker读取消息的客户端。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

一、安装

在pypi.python.org有很多关于操作kafka的组件,我们选择weight最高的kafka 1.3.5
1、有网的情况下执行如下命令安装:
pip install kafka
easy_install kafka

2、无网的情况下把源码下载下来,上传到需要安装的主机
压缩包:kafka-1.3.5.tar.gz
解压: tar xvf kafka-1.3.5.tar.gz
执行安装命令:   cd kafka-1.3.5   
               python setup.py install
               
如安装报依赖错误,需要把依赖的组件也下载下来,然后进行安装,同样的方法,不赘述!

二、按照官网的样例,先跑一个应用

1、生产者:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=[‘172.21.10.136:9092‘])  #此处ip可以是多个[‘0.0.0.1:9092‘,‘0.0.0.2:9092‘,‘0.0.0.3:9092‘ ]

for i in range(3):
    msg = "msg%d" % i
    producer.send(‘test‘, msg)
producer.close()

2、消费者(简单demo):

from kafka import KafkaConsumer

consumer = KafkaConsumer(‘test‘,
                         bootstrap_servers=[‘172.21.10.136:9092‘])

for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

启动后生产者、消费者可以正常消费。

3、消费者(消费群组)

from kafka import KafkaConsumer

consumer = KafkaConsumer(‘test‘,
                         group_id=‘my-group‘,
                         bootstrap_servers=[‘172.21.10.136:9092‘])

for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力

4、消费者(读取目前最早可读的消息)

from kafka import KafkaConsumer

consumer = KafkaConsumer(‘test‘,
                         auto_offset_reset=‘earliest‘,
                         bootstrap_servers=[‘172.21.10.136:9092‘])

for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
源码定义:{‘smallest‘: ‘earliest‘, ‘largest‘: ‘latest‘}

5、消费者(手动设置偏移量)

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(‘test‘,
                         bootstrap_servers=[‘172.21.10.136:9092‘])

print consumer.partitions_for_topic("test")  #获取test主题的分区信息
print consumer.topics()  #获取主题列表
print consumer.subscription()  #获取当前消费者订阅的主题
print consumer.assignment()  #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=u‘test‘, partition=0), 5)  #重置偏移量,从第5个偏移量消费
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

6、消费者(订阅多个主题)

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=[‘172.21.10.136:9092‘])
consumer.subscribe(topics=(‘test‘,‘test0‘))  #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u‘test‘, partition=0)) #获取当前主题的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

7、消费者(手动拉取消息)

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=[‘172.21.10.136:9092‘])
consumer.subscribe(topics=(‘test‘,‘test0‘))
while True:
    msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
    print msg
    time.sleep(1)

8、消费者(消息挂起与恢复)

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=[‘172.21.10.136:9092‘])
consumer.subscribe(topics=(‘test‘))
consumer.topics()
consumer.pause(TopicPartition(topic=u‘test‘, partition=0))
num = 0
while True:
    print num
    print consumer.paused()   #获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print msg
    time.sleep(2)
    num = num + 1
    if num == 10:
        print "resume..."
        consumer.resume(TopicPartition(topic=u‘test‘, partition=0))
        print "resume......"

pause执行后,consumer不能读取,直到调用resume后恢复。

原文地址:https://www.cnblogs.com/wangbaihan/p/9467892.html

时间: 2024-07-31 13:00:00

python kafka的相关文章

ReferenceError: weakly-referenced object no longer exists Python kafka

Python存入kafka报错,ReferenceError: weakly-referenced object no longer exists. Exception in thread 14: pykafka.OwnedBroker.queue_reader for broker 101: Traceback (most recent call last): File "C:\Python27\lib\threading.py", line 801, in __bootstrap_

python kafka权限校验client.id

kafka集群有权限校验,在连接时需要加入client.id.但pykafka不能配置该选项.搜索了一下,需要使用confluent-kafka 链接: https://blog.csdn.net/lanyang123456/article/details/80639625 #coding:utf-8 from confluent_kafka import Consumer, KafkaError mybroker = "127.0.0.1:9092" #host client_id

Python通过SSH隧道链接Kafka

Python通过SSH隧道链接Kafka 最近有一个需求需要连接Kafka,但是它只允许内网链接,但是有些服务跑在服务器上总没有在我本机调试起来爽,毕竟很多开发工具还是在客户端机器上用的熟练.于是我想到了通过SSH连接Kafka,至于怎么连接可以通过XShell.Proxifier等等,由于个人还是觉得自己写更灵活,所以我是用Python里的sshtunnel写的(有需要后面我也可以分享下),个人喜好啊,你们自行选择. 由于笔者这里的Kafka环境使用Zookeeper做分布式部署,有多个bro

python抓取系统metrics吐给kafka

本篇介绍用python写脚本,抓取系统metrics,然后调用kafka client library把metrics吐给kafka的案例分享.对于用kafka的同学实用性很高. 在运行本实例前需要先下载两个python库到本地 : six和kafka-python cat config_system_metrics.json  { "env": { "site": "cluster", "component": "

storm问题记录(1) python 不断向kafka中写消息,spout做为消费者从kafka中读消息并emit给bolt,但是部分消息没有得到bolt的处理

一.问题背景 Python 写的脚本,不断从txt文件中读取一行数据封装成消息,作为producer发给kafka, storm的spout从kafka中读取这些消息后做一些处理发送给bolt,bolt最后将数据按既定的格式写入到HBASE 二.问题描述 一共14000条左右的数据,加调试信息观察到spout把消息都读到处理并发射了,但是bolt中只处理了一部分(2000多条,还有一万条显然没有处理到),写入HBASE的也只有2000多条,即Bolt读到的那些 出问题时的最后的log: OLT

kafka+docker+python

昨天晚上刚刚才花3小时看完<日志:每个软件工程师都应该知道的有关实时数据的统一概念>. 今天就把kafka在docker容器里运行起来,github上有几个,但都太复杂了. 我自己写个最简单的python的demo体验一下:https://github.com/xuqinghan/docker-kafka 和上周部署taiga相比,kafka不愧是大家手笔,基本无坑,简单记录一下: 首先是docker-compose.yml version: '3.1' services: zoo: imag

Kafka(八)Python生产者和消费者API使用

单线程生产者 #!/usr/bin/env python # -*- coding: utf-8 -*- import random import sys from kafka import KafkaProducer from kafka.client import log import time import json __metaclass__ = type class Producer:     def __init__(self, KafkaServer='127.0.0.1', Ka

Python 基于Python结合pykafka实现kafka生产及消费速率&amp;主题分区偏移实时监控

基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控   By: 授客 QQ:1033553122   1.测试环境 python 3.4 zookeeper-3.4.13.tar.gz 下载地址1: http://zookeeper.apache.org/releases.html#download https://www.apache.org/dyn/closer.cgi/zookeeper/ https://mirrors.tuna.tsinghua.edu

python 发送kafka

python 发送kafka大体有三种方式 1 发送并忘记(不关注是否正常到达,不对返回结果做处理) 1 import pickle 2 import time 3 from kafka import KafkaProducer 4 5 producer = KafkaProducer(bootstrap_servers=['192.168.33.11:9092'], 6 key_serializer=lambda k: pickle.dumps(k), 7 value_serializer=l