浅谈PipelineDB系列一: Stream数据是如何写到Continuous View中的

PipelineDB Version:0.9.7

PostgreSQL Version:9.5.3

PipelineDB的数据处理组件:

从上图来看主要就是pipeline_streams,stream_fdw,Continuous View,Transform。

其实就是运用了Postgres的FDW功能来实现的stream功能。

从数据库也能看到这个FDW

pipeline=# \des
                  List of foreign servers
       Name       |      Owner      | Foreign-data wrapper
------------------+-----------------+----------------------
 pipeline_streams | unknown (OID=0) | stream_fdw
(1 row)

数据流转入下图

可以看到数据流转都是通过ZeroMQ来实现的(前面的版本0.8.2之前是通过TupleBuff来实现)

数据插入到Stream后然后调用ForiegnInsert,插入到初始化的IPC里面去,在数据库目录下面有个pipeline/zmq

TransForm其实就是把数据的dest指向了Stream,数据库默认有个pipeline_stream_insert其实这个是个Trigger,把tuple再扔到目标stream里面。

或者你可以自己写UDF,就是写个trigger,数据可以写到表或者别的FDW里面,或者是自己封装的消息队列IPC都没问题,这块自由发挥的空间就比较大。

首先我们来创建个STREAM跟CV

pipeline=# create stream my_stream(x bigint,y bigint,z bigint);
CREATE STREAM
pipeline=# create continuous view v_1 as select x,y,z from my_stream;
CREATE CONTINUOUS VIEW
pipeline=#

插入一条数据:

pipeline=# insert into my_stream(x,y,z) values(1,2,3);
INSERT 0 1
pipeline=# select * from v_1;
 x | y | z
---+---+---
 1 | 2 | 3
(1 row)

pipeline=#

数据插入到CV中了,我们现在来看看PipelineDB是如何插入的。

上面有介绍了Stream就是个FDW。我们来看看他的handler(source:src/backend/pipeline/stream_fdw.c)

/*
 * stream_fdw_handler
 */
Datum
stream_fdw_handler(PG_FUNCTION_ARGS)
{
	FdwRoutine *routine = makeNode(FdwRoutine);

	/* Stream SELECTS (only used by continuous query procs) */
	routine->GetForeignRelSize = GetStreamSize;
	routine->GetForeignPaths = GetStreamPaths;
	routine->GetForeignPlan = GetStreamScanPlan;
	routine->BeginForeignScan = BeginStreamScan;
	routine->IterateForeignScan = IterateStreamScan;
	routine->ReScanForeignScan = ReScanStreamScan;
	routine->EndForeignScan = EndStreamScan;

	/* Streams INSERTs */
	routine->PlanForeignModify = PlanStreamModify;
	routine->BeginForeignModify = BeginStreamModify;
	routine->ExecForeignInsert = ExecStreamInsert;
	routine->EndForeignModify = EndStreamModify;

	routine->ExplainForeignScan = NULL;
	routine->ExplainForeignModify = NULL;

	PG_RETURN_POINTER(routine);
}

主要是关注Streams Inserts这几个函数.

每个worker process启动的时候都会初始化一个recv_id,其实这个就是ZeroMQ的ID

数据会发送到对应的队列里面去,worker process就去这个IPC里面去获取数据

source:src/backend/pipeline/ipc/microbath.c

void
microbatch_send_to_worker(microbatch_t *mb, int worker_id)
{
    ......

			worker_id = rand() % continuous_query_num_workers;
		}
	}

	recv_id = db_meta->db_procs[worker_id].pzmq_id;

	microbatch_send(mb, recv_id, async, db_meta);
	microbatch_reset(mb);
}

首先是获取worker_id 这个是随机获取的一个worker进程。stream数据随机发到一worker process里面去了

recv_id这个就是从初始化的IPC队列获取ID,数据就发送到该队列里面

最后就调用

pzmq_send(recv_id, buf, len, true)

数据就推送到了IPC中了。

(gdb) p	recv_id
$12 = 1404688165
(gdb)

这部分就是数据生产者部分。

下面就是数据消费者CV

数据接受还是通过ZMQ的API来接受的

这个主要是worker process来干活的

srouce:src/backend/pipeline/ipc/pzmq.c&reader.c

(gdb) p *zmq_state->me
$8 = {id = 1404688165, type = 7 ‘\a‘, sock = 0x1139ba0, addr = "ipc:///home/pipeline/db_0.9.7/pipeline/zmq/1404688165.sock", ‘\000‘ <repeats 965 times>}
(gdb)

可以看到这个数据是从1404688165里面获取的 ,并且把IPC的addr也给出来了,这个就是我数据库目录

获取到是个buf,然后unpack,从消息里面获取到对应的Tuple.

获取到了tuple后,然后就找所有的CV跟这个stream相关的target。遍历他们,然后执行CV中对应的SQL。

执行流程跟标准SQL差不多也是初始化执行计划然后ExecutePlan然后endplan 。

数据会到Combiner里面,如果是AGG还会有一系列操作的。

如果数据符合CV的SQL逻辑,那么数据就插入到对应的物理表。

这就是Stream的一个简单的工作原理。

谢谢

时间: 2024-10-26 12:10:04

浅谈PipelineDB系列一: Stream数据是如何写到Continuous View中的的相关文章

《浅谈JavaScript系列》系列技术文章整理收藏

<浅谈JavaScript系列>系列技术文章整理收藏 1浅谈JavaScript中面向对象技术的模拟 2浅谈javascript函数劫持[转自xfocus]第1/3页 3浅谈javascript 面向对象编程 4老鱼 浅谈javascript面向对象编程 5浅谈javascript的数据类型检测 6浅谈Javascript嵌套函数及闭包 7根据一段代码浅谈Javascript闭包 8浅谈JavaScript编程语言的编码规范 9浅谈Javascript面向对象编程 10浅谈JavaScript

浅谈ASP.net处理XML数据

XML是一种可扩展的标记语言,比之之前谈到的html有着很大的灵活性,虽然它只是与HTML仅有一个字母只差,但两者有很大的区别. XML也是标记语言,所以它每个标签必须要闭合,而HTML偶尔忘了闭合也没有多大的影响(这里也不建议大家可以遗漏,好的书写规范还是要有的) 其次,XML作为一种可以说是纯文本吧,它的主要作用并不是可以直接显示在网页上,而是作为一种数据存储或者数据传输的工具而已.但凡提及到数据这个层面,XML便是很重要的,有的数据可以不用到SQL,ORACEL的尽量不要用大型的数据库,这

浅谈数据仓库 (三) 数据漂移处理

前言 我们是一个做传统会员管理CRM的团队,应用数据的级别比较高,过去会存在这样一种现象,T+1 财务报表,运营统计各类报表 的指标偶尔会和商户的自己记录用户消费储值记录会有出入,后来了解到,我们系统中存在很严重的数据漂移问题,这也是dw系统或者ods来说普遍存在的问题,今天我们好好聊下该如何解决数据漂移问题   数据漂移产生的原因 通常我们把从源系统同步进入数据仓库的第一层数据成为ODS层数据,我们公司目前只有ODS一层,虽说只有一层,但是仍然有有一个顽疾:数据漂移,简单来说就是ODS表的同一

浅谈数据结构系列 栈和队列

计算机程序离不开算法和数据结构,在数据结构算法应用中,栈和队列应用你比较广泛,因为两者在数据存放和读取方面效率比较高,本章节重点讲解两者的基本概念和实现. 基本概念 栈:是一种先进后出,后进先出的数据结构,本质上是线性表,只是限制仅允许在表的一段进行插入和删除工作.此端为栈顶,这是在栈中应用很关键的概念.所有数据的处理都是在栈顶进行的,进栈时,栈中元素增加,栈顶上移一位,出栈时栈顶下移一位.应用中比如:洗碗,每次洗干净的碗放在上面-进栈,取碗,从顶上取出一个-出栈:装子弹-进栈,开枪-出栈. 队

浅谈oracle逻辑备份、数据泵备份及冷备份

逻辑备份(数据迁移):以逻辑结构为为单位进行的备份跨用户移动数据跨数据库移动数据库为测试保存原始的数据状态对数据库进行版本升级 逻辑导出的注意事项:exp程序在目录中发现同名文件时会直接覆盖,不提示!!exp无法备份无段的空表执行逻辑导出时一定要注意字符集!最好使用包含中文的小表做测试!!导入时的数据和导出时的数据一模一样,导出之后数据库中表的数据变化全都丢失!! 逻辑导出:所有版本都可用,服务器端和客户端都可用 mkdir -p /home/oracle/expbk SQL> create t

浅谈Android数据库版本升级及数据的迁移

概述 Android开发涉及到的数据库采用的是轻量级的SQLite3,而在实际开发中,在存储一些简单的数据,使用SharedPreferences就足够了,只有在存储数据结构稍微复杂的时候,才会使用数据库来存储.而数据库表的设计往往不是一开始就非常完美,可能在应用版本开发迭代中,表的结构也需要调整,这时候就涉及到数据库升级的问题了. 数据库升级 数据库升级,主要有以下这几种情况: 增加表 删除表 修改表 增加表字段 删除表字段 增加表和删除表问题不大,因为它们都没有涉及到数据的迁移问题,增加表只

浅谈【树】的数据生成

本人水平有限,题解不到为处,请多多谅解 本蒟蒻谢谢大家观看 如何让一份自己写的程序跑一遍随机造的数据,这时我们要用到 数据生成. 本文只介绍树的生成(我是不会告诉你我现在只会树) 题目:传送门 普通树的生成:(随机生成10个数据) 注意:最后第10个数据要手动调试,虽然我也不知道为什么. 以下上半部分是树,下半部分是题目要求. code: 1 #include<iostream> 2 #include<stdlib.h> 3 #include<time.h> 4 inl

浅谈Servlet和Filter的区别以及两者在Struts2和Springmvc中的应用

在javaweb开发中,Servlet和Filter是很重要的两个概念,我们平时进行javaweb开发的时候,会经常和Servlet和Filter打交道,但我们真的了解Servlet和Filter吗? 一.基本概念 Servlet: Servlet 是在WEB服务器上运行的程序.这个词是在 Java applet的环境中创造的,Java applet 是一种当作单独文件跟网页一起发送的小程序,它通常用于在客户端运行,结果得到为用户进行运算或者根据用户互作用定位图形等服务. 服务器上需要一些程序,

浅谈Xcode5和Xcode7在系统创建的文件夹和文件中的区别

*:first-child { margin-top: 0 !important; } body > *:last-child { margin-bottom: 0 !important; } a { color: #4183C4; } a.absent { color: #cc0000; } a.anchor { display: block; padding-left: 30px; margin-left: -30px; cursor: pointer; position: absolute