python 发送kafka

python 发送kafka大体有三种方式

1 发送并忘记(不关注是否正常到达,不对返回结果做处理)

1 import pickle
2 import time
3 from kafka import KafkaProducer
4
5 producer = KafkaProducer(bootstrap_servers=[‘192.168.33.11:9092‘],
6                          key_serializer=lambda k: pickle.dumps(k),
7                          value_serializer=lambda v: pickle.dumps(v))
8
9 start_time = time.time()
10 for i in range(0, 10000):
11     print(‘------{}---------‘.format(i))
12     future = producer.send(‘test_topic‘, key=‘num‘, value=i, partition=0)
13
14 # 将缓冲区的全部消息push到broker当中
15 producer.flush()
16 producer.close()
17
18 end_time = time.time()
19 time_counts = end_time - start_time
20 print(time_counts)

2  同步发送(通过get方法等待Kafka的响应,判断消息是否发送成功)

1 import pickle
2 import time
3 from kafka import KafkaProducer
4 from kafka.errors import kafka_errors
5
6 producer = KafkaProducer(
7     bootstrap_servers=[‘192.168.33.11:9092‘],
8     key_serializer=lambda k: pickle.dumps(k),
9     value_serializer=lambda v: pickle.dumps(v)
10 )
11
12 start_time = time.time()
13 for i in range(0, 10000):
14     print(‘------{}---------‘.format(i))
15     future = producer.send(topic="test_topic", key="num", value=i)
16     # 同步阻塞,通过调用get()方法进而保证一定程序是有序的.
17     try:
18         record_metadata = future.get(timeout=10)
19         # print(record_metadata.topic)
20         # print(record_metadata.partition)
21         # print(record_metadata.offset)
22     except kafka_errors as e:
23         print(str(e))
24
25 end_time = time.time()
26 time_counts = end_time - start_time
27 print(time_counts)

3  异步发送+回调函数(消息以异步的方式发送,通过回调函数返回消息发送成功/失败)

1 import pickle
2 import time
3 from kafka import KafkaProducer
4
5 producer = KafkaProducer(
6     bootstrap_servers=[‘192.168.33.11:9092‘],
7     key_serializer=lambda k: pickle.dumps(k),
8     value_serializer=lambda v: pickle.dumps(v)
9 )
10
11
12 def on_send_success(*args, **kwargs):
13     """
14     发送成功的回调函数
15     :param args:
16     :param kwargs:
17     :return:
18     """
19     return args
20
21
22 def on_send_error(*args, **kwargs):
23     """
24     发送失败的回调函数
25     :param args:
26     :param kwargs:
27     :return:
28     """
29
30     return args
31
32
33 start_time = time.time()
34 for i in range(0, 10000):
35     print(‘------{}---------‘.format(i))
36     # 如果成功,传进record_metadata,如果失败,传进Exception.
37     producer.send(
38         topic="test_topic", key="num", value=i
39     ).add_callback(on_send_success).add_errback(on_send_error)
40
41 producer.flush()
42 producer.close()
43
44 end_time = time.time()
45 time_counts = end_time - start_time
46 print(time_counts)

 除此之外,还能发送压缩数据流

def gzip_compress(msg_str):
    try:
        buf = StringIO.StringIO()
        with gzip.GzipFile(mode=‘wb‘, fileobj=buf) as f:
            f.write(msg_str)
        return buf.getvalue()
    except BaseException, e:
        print ("Gzip压缩错误" + e)

def gzip_uncompress(c_data):
    try:
        buf = StringIO.StringIO(c_data)
        with gzip.GzipFile(mode=‘rb‘, fileobj=buf) as f:
            return f.read()
    except BaseException, e:
        print ("Gzip解压错误" + e)

def send_kafka(topic_name, msg, key=None):
    if key is not None:
        producer = KafkaProducer(bootstrap_servers=["fdw8.fengjr.inc:9092","fdw9.fengjr.inc:9092","fdw10.fengjr.inc:9092"],
                                 key_serializer=gzip_compress, value_serializer=gzip_compress)
        r = producer.send(topic_name, value=msg, key=key)
    else:
        producer = KafkaProducer(bootstrap_servers=["fdw8.fengjr.inc:9092","fdw9.fengjr.inc:9092","fdw10.fengjr.inc:9092"],
                                 value_serializer=gzip_compress)
        r = producer.send(topic_name, value=msg)
    # producer.flush(timeout=5)
    producer.close(timeout=5)
    return r

  

原文地址:https://www.cnblogs.com/tigerzhouv587/p/11232398.html

时间: 2024-07-31 13:00:03

python 发送kafka的相关文章

使用python操作kafka

使用python操作kafka目前比较常用的库是kafka-python库 安装kafka-python pip3 install kafka-python 生产者 producer_test.py from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='192.168.0.121:9092') # 连接kafka msg = "Hello World".encode('utf-8') #

[Python] 发送email的几种方式

python发送email还是比较简单的,可以通过登录邮件服务来发送,linux下也可以使用调用sendmail命令来发送,还可以使用本地或者是远程的smtp服务来发送邮件,不管是单个,群发,还是抄送都比较容易实现. 先把几个最简单的发送邮件方式记录下,像html邮件,附件等也是支持的,需要时查文档即可 1 登录邮件服务 #!/usr/bin/env python # -*- coding: utf-8 -*- #python2.7x #send_simple_email_by_account.

python 发送post和get请求

摘自:http://blog.163.com/[email protected]/blog/static/132229655201231085444250/ 测试用CGI,名字为test.py,放在apache的cgi-bin目录下:#!/usr/bin/pythonimport cgidef main():     print "Content-type: text/html\n"    form = cgi.FieldStorage()    if form.has_key(&qu

使用python发送简单的邮件

from:http://blog.csdn.net/zhaoweikid/article/details/125898 前些时间,论坛上有人讨论怎么用python发送需要认证的邮件,我在我的FreeBSD在telnet到163的的smtp服务器,分析了一下,就用python写个了发送邮件的程序,感觉有点粗糙,但还算能工作.import smtplib, base64 class SimpleSendMail:    def __init__(self, smtp_server, from_add

python 发送html邮件

简单的python发送html邮件代码,如下: #!/usr/bin/env python #-*- coding:utf-8 -*- import smtplib from email.header import Header from email.MIMEText import MIMEText from email.mime.multipart import MIMEMultipart ####################################################

【转】解决Maxwell发送Kafka消息数据倾斜问题

最近用Maxwell解析MySQL的Binlog,发送到Kafka进行处理,测试的时候发现一个问题,就是Kafka的Offset严重倾斜,三个partition,其中一个的offset已经快200万了,另外两个offset才不到两百.Kafka数据倾斜的问题一般是由于生产者使用的Partition接口实现类对分区处理的问题,一般是对key做hash之后,对分区数取模.当出现数据倾斜时,小量任务耗时远高于其它任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势(参考Apache Kaf

python发送微信

申请企业微信 使用python发送信息到企业微信,同时支持python2与python3环境,需要先申请一个企业微信,然后创建应用,获取以下三个信息 企业IP.Agentid.Secret 网信为创建的应用名称 脚本描述 将以上三个信息替换到脚本中,主要是 class WeiXin(object):部分,其他的辅助性工具类,收集的一些常用脚本可不用关注 #!/usr/bin/env python #coding=utf-8 ''' Created on 2018年2月8日 @author: ro

使用Python发送、订阅消息

使用Python发送.订阅消息 使用插件 paho-mqtt 官方文档:http://shaocheng.li/post/blog/2017-05-23 Paho 是一个开源的 MQTT 客户端项目,提供多种语言的 MQTT 客户端实现,包括 C.C++.C#.Java.Python.JavaScript 等,完全支持 MQTT v3.1 和 v3.1.1 .Paho Python Client 是它的 Python 语言版本,支持 Python 2.7 和 3.x .更多特性可以查看 http

Python 发送电子邮件

首先创建一个包含邮箱地址.密码.收件人的字典 import smtplib from email.mime.text import MIMEText from email.header import Header email = {'sender': '*****@126.com', 'password': '***', 'recipient': [***[email protected]','***@qq.com']} smtpServer ='smtp.126.com' smtpObj =