MaxCompute(原ODPS) 事件(Event)机制

摘要: 免费开通大数据服务:https://www.aliyun.com/product/odps 转自habai 什么是 MaxCompute事件机制 MaxCompute event 用于监控表和实例等MaxCompute资源(目前只用于监控表)。

免费开通大数据服务:https://www.aliyun.com/product/odps

什么是MaxCompute

大数据计算服务(MaxCompute,原名ODPS)是一种快速、完全托管的TB/PB级数据仓库解决方案。MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全。

什么是 MaxCompute事件机制

MaxCompute event 用于监控表和实例等MaxCompute资源(目前只用于监控表)。当表状态发生变化时,MaxCompute 会向预先注册(订阅)的 uri 发送信息。Event通知只有在订阅Event之后才能收到。每个project中可以订阅一个或多个Event。Event是用户的数据,同表数据一样,创建或修改时都需要有这个Project的操作权限。关于Event的Restful Api,在文章里有介绍。

为什么需要 MaxCompute 事件机制

考虑以下场景:当一个用户 A 关心某一个表 T 的操作(创建/删除/插入/修改/...)时,如果表 T 不是用户 A 创建的,那么用户 A 可以采用什么方法感知这个操作?一个方法是主动轮询这个表是否做了某个操作,但是缺点是不言而喻的。另一个方法是,注册一个回调,当表被操作时,被动接受通知。用这种方法可以使用户逻辑不必轮询和等待对表的操作。

MaxCompute Event机制就是第二种方法的实现。

在实际的生产中,对以上应用场景有大量的需求,并已经形成了对MaxCompute Event丰富的应用,例如:

  • 数据地图: 订阅了一些 project 的 Event,并根据 Event 通知展示这些 project 中表的元数据。
  • 跨集群复制: 监听 Event 通知以复制相应的表。
  • 蚂蚁金服: 依赖事件通知机制进行工作流管理,统计,授权等工作。 事实上,每个 project 都有大量用户订阅了所属project的表以及其它project表的事件通知。

MaxCompute 事件机制是怎样实现的

本节首先将 MaxCompute 事件机制 作为一个黑盒,从用户的角度介绍其功能和使用方法。而后以此为切入点,深入剖析 MaxCompute 事件机制的内部机理。最后,提出一些对当前事件机制的思考。

订阅(注册)一个事件 & 事件通知

在网络编程中,为了减轻多线程的压力,往往使用事件通知驱动的异步编程。如,libevent[2]。使用这个库编写一个服务器程序,可以这样做:

void on_accept(int sock, short event, void* arg);

int main(int argc, char* argv[])

{

// create socket s

struct sockadddr_in addrin;

int s = socket(AF_INET, SOCK_STREAM, 0);

BOOL bReuseaddr=TRUE;

setsockopt(s, SOL_SOCKET ,SO_REUSEADDR, (const char*)&bReuseaddr, sizeof(BOOL));

memset(&addrin, 0, sizeof(addrin));

addrin.sin_family = AF_INET;

addrin.sin_port = htons(PORT);

addrin.sin_addr.s_addr = INADDR_ANY;

bind(s, (struct sockaddr*)&addrin, sizeof(struct sockaddr));

listen(s, BACKLOG);

// 创建事件池 event base

struct event_base* eb = event_base_new();

// 创建事件 & 绑定回调

struct event e;

event_set(&e, s, EV_READ|EV_PERSIST, on_accept, NULL);

// 注册事件

event_base_set(eb, &e);

event_add(&e, NULL);

// 启动事件派发

event_base_dispatch(eb);

return 0;

}

抽取出上面事件通知逻辑的主线:创建事件池,创建一个 event 并绑定回调函数, 把 event 注册到事件池并启动事件派发器。

在这个过程中,事件生产者是socket(严格说是绑定在这个socket上的事件多路复用接口,如epoll),事件中转者是libevent中的事件池(event base)和事件派发器,事件消费者是事件处理回调函数。

同样的过程适用于MaxCompute event。事件池和派发器不需要用户创建。用户首先创建一个事件,然后绑定回调处理逻辑,最后把事件注册到事件池。代码如下:

public class TestOdpsEvent {

/**

* 创建事件方法

*/

static Event buildEvent(String eventName, String tableName, String callbackUri, String comment) throws URISyntaxException {

Event event = new Event();

event.setName(eventName);   // 指定事件名称

event.setComment(comment);    // 事件注释

event.setType(Event.SourceType.TABLE);   // 指定事件类型,目前支持 TABLE

Event.Config config = new Event.Config();

config.setName("source");

config.setValue(tableName);   // 指定事件源(即 表名). "*" 表示所有表.

event.setConfig(config);

event.setUri(new URI(callbackUri));   // 指定了一个回调地址

return event;

}

public static void main(String[] args) throws OdpsException, InterruptedException, URISyntaxException {

Account account = new AliyunAccount("xxxxxx", "xxxxxx");

Odps odps = new Odps(account);

String odps_endpoint = "http://xxxx/xxx";

odps.setEndpoint(odps_endpoint);

odps.setDefaultProject("project1");

InternalOdps iodps = new InternalOdps(odps);

// 创建事件 & 绑定回调

String callbackUri = "http://xxx.xxx.xxx.xxx:xxxx/xxxxx"; // this is different from odps_endpoint

Event e = buildEvent("table_create_event_1", "table1", callbackUri, "this is a test event");

// 注册事件

iodps.events().create(e);

// 查看已创建事件

Iterator<Event> events = iodps.events().iterator();

while(events.hasNext()) {

Event event1 = events.next();

System.out.println("Event found: " + event1.getName());

System.out.println("Event uri: " + event1.getUri());

// iodps.events().delete(event1.getName()); // 删除事件

}

}

}

在上面的代码中,指定了一个回调地址。当表发生变化时,就会通知这个回调地址。用户根据在这个回调地址接收到事件通知,使用相应的处理逻辑处理。事件回调地址作为事件处理逻辑入口,支持多种协议,包括但不限于kuafu, http, https等。与libevent不同的是,MaxCompute event的生产者,中转者和消费者可以位于不同网络区域。在用户注册事件之后,MaxCompute event机制会在该事件发生后立即通知用户注册的回调地址。

剖析 MaxCompute 事件机制

图3-1的三个部分分别表示了注册事件,转发通知,删除事件的过程。MessageService是 MaxCompute 内部消息服务,作用是转发事件通知到用户注册的回调地址。为方便理解,把 Create topic, Create subscription, Add endpoint 看作注册事件在消息服务层的三个操作。事件机制在消息服务层具体的实现将在后边介绍。

图3-1: 事件创建,转发,删除

在图3-1注册事件的过程中,用户的请求由 OdpsWorker 的 createEventHandler 处理。createEventHandler 依次检查相应的 MessageService topic,subscription,endpoint 是否存在,如果不存在,创建。

在图3-1删除事件的过程相对简单,用户的请求由 OdpsWorker 的 deleteEventHandler 处理。deleteEventHandler 直接删除相应的 MessageService subscription。

在图3-1转发事件通知的过程中,事件的生产者主要是ddl task(事实上,由于历史原因,还有HiveServer,CREATETABLE 事件从这里发出),当执行对表的meta相关的操作时,就会触发ddl task。如drop table, add partition, insert into partition等。ddl task 会发送相应操作的事件通知。事件通知发送给事件中转者——消息服务。消息服务将这个事件通知发送给相应事件绑定的回调地址。

消息服务作为事件中转者,主要完成如下功能:

1)作为事件池维护不同事件和回调地址的对应关系(一个事件对应一个或多个回调地址)

2)作为事件派发器根据事件通知匹配相应事件并将该事件通知转发到对应事件的各个回调地址。

目前不同的事件依据两个属性区分:project名和事件源。事件源目前是表名。在ddl task发出的事件通知中,包含了这两个关键信息。消息服务根据这两个信息匹配相应事件。在介绍消息服务匹配的实现方式时,首先需要了解 MaxCompute消息服务 的基本概念(为便于理解,本文简化了消息服务的一些概念,如隐藏了partition概念。在文章[3]中,具体介绍了消息服务的设计和实现)。如图3-2:

图3-2: 消息服务基本概念

MaxCompute消息服务包含四个基本概念:topic, subscription, filter, endpoint。消息服务使用了典型的发布订阅模型。用户可以创建topic。创建一个或多个subscription(包含一个或多个endpoint)订阅这个topic。消息发布者向topic发送消息。该消息被转发到该topic的所有filter匹配的subscription的所有的endpoint。其中,topic的创建者,subscription的创建者,消息的发送者,以及消息的接收者可以是不同的用户。在创建subscription时,需要指定filter matcher。在消息发送时需要指定filter。当某条消息发送到某个topic时,消息中的filter需要和这个topic的各个subscription的filter matcher匹配,如果匹配成功,将这个消息的一个副本发送给这个subscription的所有endpoint,否则不发送给它们,然后继续匹配其他的subscription。filter和filter_matcher的示例和匹配规则如下:

filter_matcher filter is matched
"" "k=v" yes. If filter_matcher is "", it will match forever.
"k=v" "k=v" yes
"k=v" "k=v1" no
"k1=v" "k=v" no
"k=v1|v2" "k=v1" yes
"k=v1|v2" "k=v2" yes
"k=v1|v2" "k=v1|v2" no. filter's value is 'v1|v2', not 'v1' or 'v2'.
"k1=v1,k2=v2" "k1=v1,k2=v2" yes
"k1=v1" "k1=v1,k2=v2" yes
"k1=v1,k2=v2" "k1=v1" no
"k=v" "" no
"" "" yes. If filter is "", filter_matcher will never hit except its value is ""

消息服务的这个机制,可以实现上述事件中转者的功能。将一个事件表达为一个subscription,将一个事件通知表达为一条消息,每个endpoint记录一个回调地址。每个project对应一个topic,用filter区分事件源。当一个事件通知产生之后,会被发送到产生通知的project所在的topic上。然后,经过匹配,转发所有的endpoint对应回调地址上。事件通知消息体示例如下:

<?xml version="1.0" encoding="UTF-8"?>

<Notification>

<Account>[email protected]</Account>

<Project>a_2_test_event</Project>

<SourceType>Table</SourceType>

<SourceName>backup_partition</SourceName>

<Reason>CREATETABLE</Reason>

<TimeStamp>Sun, 18 Sep 2016 14:21:32 GMT</TimeStamp>

<Properties/>

<OdpsMessagerId>1</OdpsMessagerId>

<OdpsMessagerTime>1474208492</OdpsMessagerTime>

</Notification>

<?xml version="1.0" encoding="UTF-8"?>

<Notification>

<Account>[email protected]</Account>

<Project>a_2_test_event</Project>

<SourceType>Table</SourceType>

<SourceName>backup_partition</SourceName>

<Reason>ADDPARTITION</Reason>

<TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>

<Properties>

<Property>

<Name>Name</Name>

<Value>ds=ds1/pt=pt1</Value>

</Property>

</Properties>

<OdpsMessagerId>4</OdpsMessagerId>

<OdpsMessagerTime>1474289142</OdpsMessagerTime>

</Notification>

当用户订阅了 Project a_2_test_event 的 Table "backup_partition" 的事件后,当发生对这个表的 CREATETABLE 和 ADDPARTITION 操作后,会接收到上面的两个事件通知。每个事件通知是一个 xml 格式的消息。SourceType 表示订阅的是表的事件通知还是其它类型资源的事件通知(目前只支持表)。SourceName 表示订阅的表的名字。Reason 表示在该表上发生的操作,上例中分别是创建表的操作和增加分区的操作(在 附录 中列举了更多的操作类型)。Properties 中会有一些附加的通知属性,常用来指出操作发生在表的哪个 parition 上。OdpsMessagerId 在一个 Project 的所有表中是唯一的。OdpsMessagerTime 是这条通知产生的时刻。

在MaxCompute线上服务环境中,每一个project p1,就会对应一个名为 SQL_p1的topic(因为历史原因hardcode了前缀SQL_,不过前缀是什么无所谓,只要可以区分事件机制的topic和其它应用中的topic就好),这个 topic 在第一次注册事件的时候自动创建(也可以在创建project时手动创建)。p1的所有事件通知都会发到这个topic上。这个topic在其对应的project删除时被删除。

MaxCompute消息服务为事件机制提供了对事件通知的持久化,保序,failover的功能,尽最大努力保证消息不丢,但是依然不能保证绝对不丢。下面分析事件机制可能出现的丢消息情况:事件生产者失败,消息服务失败,事件接收者失败,消息服务热升级。

1) 事件生产者失败:在事件通知到达消息服务之前,存在事件通知生产者失败的可能。具体的消息丢失概率取决于事件生产者的持久化,failover能力以及重试机制。

2) 消息服务失败:消息服务失败包括两种情况:消息到达消息服务前失败和消息到达之后失败。如果消息到达之前失败,那么消息服务提供的message client会重试3次,每次间隔5毫秒。如果事件成功地发送到消息服务,消息会首先被持久化。在消息服务中的一条消息只有满足下列两种情况才会被删除:a. 消息发送成功;b. 消息发送失败且超过重试次数(目前重试3600次,每次间隔60秒)。可以看到,事件(消息)丢失最大的风险在于发送到达消息服务之前的一段时间。

3) 事件接收者失败:如果接收者失败,且在获得事件通知之后,处理事件通知之前,消息服务不提供接口使接收者重获这条消息。当然对于这个问题,还有另一种解决方法,就是使用类似kafka的消息服务模型作为中转者,可以保证事件通知更强大的可靠性。kafka模型[4]不会主动推送消息,仅仅对消息做持久化以实现高吞吐和高可靠。消息订阅者需要给定消息id的范围从某个topic的partition拉消息。当订阅者失败,希望重新获得历史的消息时,只要给定消息id的范围,如果这个范围内的消息没有过期,就可以被重新获得。但是kafka模型使用拉消息的模式不具备完整的事件派发器功能,也就不能支持现在odps需要的异步事件通知编程方式。而事实上,MaxCompute消息服务设计的出发点,就是MaxCompute事件通知机制(在没有消息服务之前,MaxCompute worker履行着消息服务的职责)。

4)消息服务热升级:虽然说是热升级,但是新老服务之间切换也是需要时间的。在线上这个时间最夸张的一次达到了4个多小时(所有topic全部切换完成的时间间隔)。而在切换的过程中,新老消息服务中处于切换中间状态的 topic 是拒绝服务的(切换完成的 topic 可以服务)。

总之,MaxCompute事件通知机制提供了一定程度的高可用保证,但是还没有把丢消息的概率降低为0,其最大风险在于消息服务不服务。而此时,消息服务上某个 topic 丢失消息的数量和该 topic 不服务的时间成正比。

总结

MaxCompute事件机制给用户监听资源的变化带来了很大的便利。它借鉴了通用的事件异步编程模型,提供了友好的用户接口,支持了线上数据地图,跨集群复制等众多服务。但是,依然有不足之处,例如:

1) 事件监听(订阅/注册)的粒度粗且不可定制:我们曾经接到用户的需求,想监听一个表的 CREATETABLE 事件,但是现有的机制只支持监听到表的级别,这样用户就不得不自己过滤这个表的各种事件。

2) 事件机制的可靠性需要进一步提高:曾经出现过热升级切换消息服务4个小时的情况,原因是其中一个 topic 向某个 endpoint 发送消息卡在了发送那里一直无法退出,造成该 topic 上丢失大量消息。

3) 消息服务的 生产者qps(生产环境接收消息1000-2000),消费者 qps(消费极限qps未测过,因其取决于) 与 开源消息服务如 kafka 生产者 qps (50,000),消费者 qps (22,000) 依然有一定差距[4]。

对于当前希望使用MaxCompute消息服务的用户,最好确保满足以下条件:

1) 允许丢失少量的消息通知,因为的确存在小概率丢消息的可能;

2) 事件处理系统具有一定的事件处理能力,接受事件qps最好可以达到500以上。

3) 不用的事件(回调uri发不通且不再使用)请删除,否则会在消息服务中留下永久性垃圾,造成消息在pangu中大量堆积,因为消息服务无法判断用户的事件是否需要删除!!!

解决上述的问题目前依然有一些挑战,但是我们会不断改进和完善事件机制的各项功能,减小事件丢失率,细化事件订阅粒度,优化用户体验。

参考资料

[1] Event restful api

[2] Libevent: http://libevent.org

[3] Odps Message Service

[4] Kreps J, Narkhede N, Rao J. Kafka: A distributed messaging system for log processing[C]//Proceedings of the NetDB. 2011: 1-7.

附录

MaxCompute事件机制事件类型列表

在触发 DDL 时,Event 会向预先注册的 url 发送 POST 请求,消息体格式如下

<?xml version="1.0" encoding="UTF-8"?>

<Notification>

<Account>[email protected]</Account>

<Project>a_2_test_event</Project>

<SourceType>Table</SourceType>

<SourceName>backup_partition</SourceName>

<Reason>ADDPARTITION</Reason>

<TimeStamp>Mon, 19 Sep 2016 12:45:42 GMT</TimeStamp>

<Properties>

<Property>

<Name>Name</Name>

<Value>ds=ds1/pt=pt1</Value>

</Property>

</Properties>

<OdpsMessagerId>4</OdpsMessagerId>

<OdpsMessagerTime>1474289142</OdpsMessagerTime>

</Notification>

其中:

Reason 可能取值 事件生产者
CREATETABLE hiveserver
DROPTABLE ddltask
ALTERTABLE ddltask
ADDPARTITION ddltask
DROPPARTITION ddltask
ALTERPARTITION ddltask
INSERTOVERWRITETABLE ddltask
INSERTINTOTABLE ddltask
INSERTOVERWRITEPARTITION ddltask
INSERTINTOPARTITION ddltask
MERGETABLE ddltask
MERGEPARTITION ddltask
ALTERVOLUMEPARTITION ddltask
ADDVOLUMEPARTITION ddltask
  • SourceType 可能取值:Table

使用限制

1) 目前只有 Project Owner 可以创建 event,无法授权给其他人创建 event

2) 接收 post 信息的 url 应返回 http code 200,server 端 post 时并不支持如 302 这样的跳转。

原文链接

原文地址:http://blog.51cto.com/13679539/2108129

时间: 2024-09-29 01:30:27

MaxCompute(原ODPS) 事件(Event)机制的相关文章

在Unity中使用事件/委托机制(event/delegate)进行GameObject之

欢迎来到unity学习.unity培训.unity企业培训教育专区,这里有很多U3D资源.U3D培训视频.U3D教程.U3D常见问题.U3D项目源码,[狗刨学习网]unity极致学院,致力于打造业内unity3d培训.学习第一品牌. 一对多的观察者模式机制有什么缺点? 如果你对如何在Unity中使用事件/委托机制还不太了解,建议您查看我的前一篇文章:[Unity3D技巧]在Unity中使用事件/委托机制(event/delegate)进行GameObject之间的通信 在前一篇博客里面,我们写到

【Unity3D技巧】在Unity中使用事件/委托机制(event/delegate)进行GameObject之间的通信

作者:王选易,出处:http://www.cnblogs.com/neverdie/ 欢迎转载,也请保留这段声明.如果你喜欢这篇文章,请点[推荐].谢谢! 引子 在前面两篇文章: [Unity3D基础教程]给初学者看的Unity教程(四):通过制作Flappy Bird了解Native 2D中的RigidBody2D和Collider2D [Unity3D基础教程]给初学者看的Unity教程(三):通过制作Flappy Bird了解Native 2D中的Sprite,Animation 我们了解

在Unity中使用事件/委托机制(event/delegate)进行GameObject之间的通信

欢迎来到unity学习.unity培训.unity企业培训教育专区,这里有很多U3D资源.U3D培训视频.U3D教程.U3D常见问题.U3D项目源码,[狗刨学习网]unity极致学院,致力于打造业内unity3d培训.学习第一品牌. 引子 在前面两篇文章: 我们了解了2D中的Sprite,Animation,RigidBody和Collider,在继续开发游戏的过程中,我们会遇到这样的问题,如何处理GameObject之间的相互调用,比如说在FlappyBird中我们在小鸟撞倒管子的时候,要把这

Anroid View事件响应机制和ViewGroup的事件响应分发机制

注:低版本的源码内容比高版本的源码简单,分析起来方便,但是高版本源码更为严密. View的事件响应机制 涉及2个方法dispatchTouchEvent和onTouchEvent 1.View的dispatchTouchEvent方法(事件传递到View,View的这个方法就自动执行.) dispatchTouchEvent返回true,响应事件:返回false,不响应事件. public boolean dispatchTouchEvent(MotionEvent event) { ... L

ViewGroup的事件分发机制

我们用手指去触摸Android手机屏幕,就会产生一个触摸事件,但是这个触摸事件在底层是怎么分发的呢?这个我还真不知道,这里涉及到操作硬件 (手机屏幕)方面的知识,也就是Linux内核方面的知识,我也没有了解过这方面的东西,所以我们可能就往上层来分析分析,我们知道Android中负责 与用户交互,与用户操作紧密相关的四大组件之一是Activity, 所以我们有理由相信Activity中存在分发事件的方法,这个方法就是dispatchTouchEvent(),我们先看其源码吧 [java] view

Android开发-分析ViewGroup、View的事件分发机制、结合职责链模式

介绍 上一篇博客职责链/责任链模式(Chain of Responsibility)分析理解和在Android的应用 介绍了职责链模式,作为理解View事件分发机制的基础. 套用职责链模式的结构分析,当我们的手指在屏幕上点击或者滑动,就是一个事件,每个显示在屏幕上的View或者ViewGroup就是职责对象,它们通过Android中视图层级组织关系,层层传递事件,直到有职责对象处理消耗事件,或者没有职责对象处理导致事件消失. 关键概念介绍 要理解有关View的事件分发,先要看几个关键概念 Mot

从ViewPager嵌套RecyclerView再嵌套RecyclerView看安卓事件分发机制

这两天伟大的PM下了一个需求,在一个竖滑列表里实现一个横向滑动的列表,没错,又是这种常见但是又经常被具有着强烈责任心和职业操守程序员所嗤之以鼻的效果,废话不多说,先上图: 实现的方式很多,因为项目中已经ViewPager+RV实现基本框架,所以现我也选择再添加一个RV实现相应的效果. 不过在写代码之前,先预估一下这个效果所有的坑. VP是横向滑动的,RV是竖向滑动的,那么现在再添加一个横向滑动的RV,肯定会有滑动冲突,主要表现在 VP和横向滑动RV 的冲突,因为两者都是横向滑动的,肯定有冲突,无

Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)

1 背景 还记得前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>中关于透过源码继续进阶实例验证模块中存在的点击Button却触发了LinearLayout的事件疑惑吗?当时说了,在那一篇咱们只讨论View的触摸事件派发机制,这个疑惑留在了这一篇解释,也就是ViewGroup的事件派发机制. PS:阅读本篇前建议先查看前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>,这一篇承接上一篇. 关于View与ViewGroup的区别在前一篇的A

Android的Touch事件分发机制简单探析

前言 Android中关于触摸事件的分发传递是一个很值得研究的东西.曾不见你引入了一个ListView的滑动功能,ListView就不听你手指的指唤来滚动了:也不知道为啥Button设置了onClick和onTouch,其中谁会先响应:或许你会问onTouch和onTouchEvent有什么区别,又该如何使用?这里一切的一切,只要你了解了事件分发机制,你会发现,解释这都不是事儿! 相关Touch事件的方法 1.public boolean dispatchTouchEvent(MotionEve