日志服务Python消费组实战(二):实时分发数据

场景目标
使用日志服务的Web-tracking、logtail(文件极简)、syslog等收集上来的日志经常存在各种各样的格式,我们需要针对特定的日志(例如topic)进行一定的分发到特定的logstore中处理和索引,本文主要介绍如何使用消费组实时分发日志到不通的目标日志库中。并且利用消费组的特定,达到自动平衡、负载均衡和高可用性。

基本概念
协同消费库(Consumer Library)是对日志服务中日志进行消费的高级模式,提供了消费组(ConsumerGroup)的概念对消费端进行抽象和管理,和直接使用SDK进行数据读取的区别在于,用户无需关心日志服务的实现细节,只需要专注于业务逻辑,另外,消费者之间的负载均衡、failover等用户也都无需关心。

消费组(Consumer Group) - 一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个logstore中的数据,消费者之间不会重复消费数据。
消费者(Consumer) - 消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同。

在日志服务中,一个logstore下面会有多个shard,协同消费库的功能就是将shard分配给一个消费组下面的消费者,分配方式遵循以下原则:

每个shard只会分配到一个消费者。
一个消费者可以同时拥有多个shard。
新的消费者加入一个消费组,这个消费组下面的shard从属关系会调整,以达到消费负载均衡的目的,但是上面的分配原则不会变,分配过程对用户透明。
协同消费库的另一个功能是保存checkpoint,方便程序故障恢复时能接着从断点继续消费,从而保证数据不会被重复消费。

使用消费组进行实时分发
这里我们描述用Python使用消费组进行编程,实时根据数据的topic进行分发。
注意:本篇文章的相关代码可能会更新,最新版本在这里可以找到:Github样例.

安装
环境

建议程序运行在源日志库同Region下的ECS上,并使用局域网服务入口,这样好处是网络速度最快,其次是读取没有外网费用产生。
强烈推荐PyPy3来运行本程序,而不是使用标准CPython解释器。
日志服务的Python SDK可以如下安装:
pypy3 -m pip install aliyun-log-python-sdk -U
更多SLS Python SDK的使用手册,可以参考这里

程序配置
如下展示如何配置程序:

配置程序日志文件,以便后续测试或者诊断可能的问题(跳过,具体参考样例)。
基本的日志服务连接与消费组的配置选项。
目标Logstore的一些连接信息
请仔细阅读代码中相关注释并根据需要调整选项:

#encoding: utf8
def get_option():
##########################

基本选项

##########################

# 从环境变量中加载SLS参数与选项,根据需要可以配置多个目标
accessKeyId = os.environ.get(‘SLS_AK_ID‘, ‘‘)
accessKey = os.environ.get(‘SLS_AK_KEY‘, ‘‘)
endpoint = os.environ.get(‘SLS_ENDPOINT‘, ‘‘)
project = os.environ.get(‘SLS_PROJECT‘, ‘‘)
logstore = os.environ.get(‘SLS_LOGSTORE‘, ‘‘)
to_endpoint = os.environ.get(‘SLS_ENDPOINT_TO‘, endpoint)
to_project = os.environ.get(‘SLS_PROJECT_TO‘, project)
to_logstore1 = os.environ.get(‘SLS_LOGSTORE_TO1‘, ‘‘)
to_logstore2 = os.environ.get(‘SLS_LOGSTORE_TO2‘, ‘‘)
to_logstore3 = os.environ.get(‘SLS_LOGSTORE_TO3‘, ‘‘)
consumer_group = os.environ.get(‘SLS_CG‘, ‘‘)

# 消费的起点。这个参数在第一次跑程序的时候有效,后续再次运行将从上一次消费的保存点继续。
# 可以使”begin“,”end“,或者特定的ISO时间格式。
cursor_start_time = "2018-12-26 0:0:0"

# 一般不要修改消费者名,尤其是需要并发跑时
consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)

# 构建一个消费组和消费者
option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time)

# bind put_log_raw which is faster
to_client = LogClient(to_endpoint, accessKeyId, accessKey)
put_method1 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore1)
put_method2 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore2)
put_method3 = partial(to_client.put_log_raw, project=to_project, logstore=to_logstore3)

return option, {u‘ngnix‘: put_method1, u‘sql_audit‘: put_method2, u‘click‘: put_method3}

注意,这里使用了functools.partial对put_log_raw进行绑定,以便后续调用方便。

数据消费与分发
如下代码展示如何从SLS拿到数据后根据topic进行转发。

if name == ‘main‘:
option, put_methods = get_copy_option()

def copy_data(shard_id, log_groups):
    for log_group in log_groups.LogGroups:
        # update topic
        if log_group.Topic in put_methods:
            put_methods[log_group.Topic](log_group=log_group)

logger.info("*** start to consume data...")
worker = ConsumerWorker(ConsumerProcessorAdaptor, option, args=(copy_data, ))
worker.start(join=True)

启动
假设程序命名为"dispatch_data.py",可以如下启动:

export SLS_ENDPOINT=<Endpoint of your region>
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_PROJECT=<SLS Project Name>
export SLS_LOGSTORE=<SLS Logstore Name>
export SLS_LOGSTORE_TO1=<SLS To Logstore1 Name>
export SLS_LOGSTORE_TO1=<SLS To Logstore2 Name>
export SLS_LOGSTORE_TO1=<SLS To Logstore3 Name>
export SLS_CG=<消费组名,可以简单命名为"dispatch_data">

pypy3 dispatch_data.py
性能考虑
启动多个消费者
基于消费组的程序可以直接启动多次以便达到并发作用:

nohup pypy3 dispatch_data.py &
nohup pypy3 dispatch_data.py &
nohup pypy3 dispatch_data.py &
...
注意:
所有消费者使用了同一个消费组的名字和不同的消费者名字(因为消费者名以进程ID为后缀)。
因为一个分区(Shard)只能被一个消费者消费,假设一个日志库有10个分区,那么最多有10个消费者同时消费。

性能吞吐
基于测试,在没有带宽限制、接收端速率限制(如Splunk端)的情况下,以推进硬件用pypy3运行上述样例,单个消费者占用大约10%的单核CPU下可以消费达到5 MB/s原始日志的速率。因此,理论上可以达到50 MB/s原始日志每个CPU核,也就是每个CPU核每天可以消费4TB原始日志。

注意: 这个数据依赖带宽、硬件参数和目标Logstore是否能够较快接收数据。

高可用性
消费组会将检测点(check-point)保存在服务器端,当一个消费者停止,另外一个消费者将自动接管并从断点继续消费。

可以在不同机器上启动消费者,这样当一台机器停止或者损坏的清下,其他机器上的消费者可以自动接管并从断点进行消费。

理论上,为了备用,也可以启动大于shard数量的消费者。

其他
限制与约束
每一个日志库(logstore)最多可以配置10个消费组,如果遇到错误ConsumerGroupQuotaExceed则表示遇到限制,建议在控制台端删除一些不用的消费组。

监测
在控制台查看消费组状态
通过云监控查看消费组延迟,并配置报警
Https
如果服务入口(endpoint)配置为https://前缀,如https://cn-beijing.log.aliyuncs.com,程序与SLS的连接将自动使用HTTPS加密。

服务器证书*.aliyuncs.com是GlobalSign签发,默认大多数Linux/Windows的机器会自动信任此证书。如果某些特殊情况,机器不信任此证书,可以参考这里下载并安装此证书。

原文地址:http://blog.51cto.com/14031893/2344323

时间: 2024-11-07 11:00:40

日志服务Python消费组实战(二):实时分发数据的相关文章

Docker最全教程之Python爬网实战(二十一)

原文:Docker最全教程之Python爬网实战(二十一) Python目前是流行度增长最快的主流编程语言,也是第二大最受开发者喜爱的语言(参考Stack Overflow 2019开发者调查报告发布).笔者建议.NET.Java开发人员可以将Python发展为第二语言,一方面Python在某些领域确实非常犀利(爬虫.算法.人工智能等等),另一方面,相信我,Python上手完全没有门槛,你甚至无需购买任何书籍! 由于近期在筹备4.21的长沙开发者大会,耽误了不少时间.不过这次邀请到了腾讯资深技术

Python网络爬虫实战(二)数据解析

上一篇说完了如何爬取一个网页,以及爬取中可能遇到的几个问题.那么接下来我们就需要对已经爬取下来的网页进行解析,从中提取出我们想要的数据. 根据爬取下来的数据,我们需要写不同的解析方式,最常见的一般都是HTML数据,也就是网页的源码,还有一些可能是Json数据,Json数据是一种轻量级的数据交换格式,相对来说容易解析,它的格式如下. { "name": "中国", "province": [{ "name": "黑龙江

云服务器之安全组之二_安全组规则的备份与还原

原文地址 一般来说安全组包含了很多规则,这些规则共同决定了安全组中的ECS实例开放和关闭了哪些大门,重要性不言而喻:当安全组中的规则和加入的ECS实例越来越多,安全组规则的维护愈发困难,不敢删不敢改,牵一发而动全身:同样,随着业务的发展,需要在不同地域部署时,同样的配置要在另一个地域再设置一遍,重复劳动.有没有办法解决这些问题? 克隆安全组(备份) 功能简介 创建出一个新的安全组,其中包含的安全组规则与克隆目标完全一致.支持跨地域克隆,并且可以在克隆时变更网络类型,可用于安全组及其规则的备份,在

2017.08.04 Python网络爬虫之Scrapy爬虫实战二 天气预报的数据存储问题

1.数据存储到JSon:程序阅读一般都是使用更方便的Json或者cvs等待格式,继续讲解Scrapy爬虫的保存方式,也就是继续对pipelines.py文件动手脚 (1)创建pipelines2json.py文件: import timeimport jsonimport codecs class WeatherPipeline(object): def process_item(self, item, spider): today=time.strftime('%Y%m%d',time.loc

Python核心技术与实战——二一|巧用上下文管理器和with语句精简代码

我们在Python中对于with的语句应该是不陌生的,特别是在文件的输入输出操作中,那在具体的使用过程中,是有什么引伸的含义呢?与之密切相关的上下文管理器(context manager)又是什么呢? 什么是上下文管理器 在任何一种编程语言里,文件的输入输出.数据库的建立连接和断开等操作,都是很常见的资源管理操作.但是资源是有限的,在写程序的时候,我们必须保证这些资源在使用后得到释放,不然就容易造成资源泄漏,轻者系统处理缓慢,重则系统崩溃. 我们看一个例子: for i in range(100

python调用zabbix api接口实时展示数据

近日公司准备自已做一个运维管理平台,其中的监控部分,打算调用zabbix api接口来进行展示. 经过思考之后,计划获取如下内容: 1.  获得认证密钥 2.  获取zabbix所有的主机组 3.  获取单个组下的所有主机 4.  获取某个主机下的所有监控项 5.  获取某个监控项的历史数据 6.  获取某个监控项的最新数据 计划最后展示框架如下内容(这只是值方面,其它的会再加): 主机组1 ----主机名1---监控项1----当前值 ---监控项2----当前值 ----主机名2----监控

如何将日志服务的数据秒级同步到表格存储

原文地址 最近在容器服务的官方镜像中,新增了loghub-shipper的镜像,使用该镜像,可以订阅日志服务中的日志库,以秒级的延时将日志数据从日志服务中读出并转换成结构化数据存储在表格存储中,以满足实时在线服务的精确查询需求. 什么是日志服务? 日志服务(Log Service,Log)是针对日志场景的一站式解决方案,解决海量日志数据采集/订阅.转储与查询功能,比如在海量游戏日志收集与分析场景上的应用. 什么是表格存储? 表格存储(TableStore)提供海量NoSQL数据的存储与实时访问服

Spring Cloud构建微服务架构 消息驱动的微服务(消费分区)【Dalston版】

通过上一篇<消息驱动的微服务(消费组)>的学习,我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理.但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一个实例进行消费.这时候我们就需要对消息进行分区处理. 使用消息分区 在Spring Cloud Stream中实现消息分区非常简单,我们可以根据消费组示例做一些配置修改就能实现,具体如下: 在消费者应用SinkReceiver中,我们对配置文件做一些修改,具体如下: spring.c

kafka-Message、日志和索引文件、消费组、rebalance

记录下和kafka相关的Message.日志文件.索引文件.consumer记录消费的offset相关内容,文中很多理解参考文末博文.书籍还有前辈. kafka中的消息 kafka中的消息Message,在V1版本中是如下部分组成,主要关系key和value. (1)key:当需要将消息写入到某个topic下的指定partition分区时,需要给定key的值. (2)value:实际消息内容保存在这里. (3)其他均是消息的元数据,一般不用关心,对用户来说是透明的. 为了保存这些消息数据,kaf