打造云端流计算、在线业务、数据分析的业务数据闭环 - 阿里云RDS、HybridDB for PostgreSQL最佳实践
背景
水的流动汇成江河大海,孕育生命,形成大自然生态。数据流动,推进社会进步,拓展业务边界。
以某淘系业务案例展开,看看用户如何利用阿里云RDS PostgreSQL,HybridDB for PostgreSQL,海量对象存储OSS,打造一个从流计算到在线业务,再到数据分析和挖掘的业务,发挥数据的价值,拓展业务的边界。
业务简介
一个电商业务通常会涉及 商家、门店、物流、用户、支付渠道、贷款渠道、商品、平台、小二、广告商、厂家、分销商、店主、店员、监管员、税务、质检等等角色。
这些对象的活动会产生大量的 浏览、订单、投诉、退款、纠纷等数据。
平台业务的目标:
根据这些对象的数据,实时的进行分析,实时的舆情展示,实时的找出需要主动服务的对象等。属于一个智能化的服务运营平台。
架构
要实现“智能化的服务运营平台”的目标,我们需要将各个业务线产生的相关行为、字典化数据准实时汇总起来,进行统一的实时分析。
1、数据源
数据源是来自各个业务线的数据库或实时日志、消息队列、关系数据库、流计算平台等。
2、实时分析数据库
实时分析型数据库,选择阿里云HybridDB for PostgreSQL是一款基于Greenplum开源版本GPDB打造的分布式实时分析型数据库。支持PB级的数据量,支持行列混合存储,支持压缩,支持更新。
已有互联网、金融、国家安全等行业大量用户案例。
OLAP语法支持全面。阿里云HDB在开源基础上还增加了JSON,估值类型HLL的支持。同时内核层面进行了优化,增加了LLVM,SORTKEY,METADATA,OSS外部表,MADLIB机器学习库 等特性,大幅提升分析型SQL的性能。
3、调度平台
HDB for PostgreSQL是一个分析型的数据库,为分析场景设计,虽然支持单条INSERT的写入,但是性能最好的依旧是并行的数据灌入。比如使用阿里云HybridDB for PostgreSQL的OSS外部表功能,从阿里云OSS对象存储并行的导入数据。
由于数据来源多,写入并发高,所以建议的做法是通过调度的方法,准实时的将数据合并然后并行写入到HybridDB for PostgreSQL
同时对于一些状态数据,实际上业务仅需最新的一条,所以单条INSERT也会增加数据的写入量。因此我们会增加一个实时合并数据的数据库。
4、实时合并数据库
实时合并数据库的设计目标:
可以接受高并发的写入,同时支持数据合并,支持直接将数据写入OSS对象存储。
在阿里云中对应的是RDS PostgreSQL产品。它可以支持非常高并发的数据写入(单实例可以达到几百万行/s的写入速度),同时支持将数据并发的写入OSS对象存储。从而实现数据的合并,最终合并到HDB for PostgreSQL的目标。
5、OSS对象存储
阿里云OSS对象存储,与RDS PostgreSQL, HybridDB for PostgreSQL结合,可以实现用户数据的冷热分离,数据中转等功能(作为数据中转时,每个HybridDB for PostgreSQL 目前已支持每个数据节点30MB/s的速度,如果有128个实例,速度就达到了4GB/s)。
OSS海量存储,海量带宽,廉价。
在阿里集团、阿里公有云用户群中已经有大量成熟的案例,通过OSS实现数据库的冷热分离,数据高速中转。
6、BI运营平台
业务方围绕HybridDB for PostgreSQL数据库打造的一个BI平台。运算交给HybridDB for PostgreSQL。
7、其他数据平台(例如MQ(kafka))
通过RDS PG提供的ali_decode插件,可以实时的从WAL(重做日志)将数据提取出来,并写入到对应的目标接收平台,如Kafka,实现数据的实时分发。
《PostgreSQL 最佳实践 - 逻辑增量复制(MySQL <-> PgSQL <-> PgSQL)》
实时合并与调度实施细节
RDS PostgreSQL
RDS PostgreSQL 负责接收实时业务数据,合并业务数据,并将数据写入OSS。
为了实现高效的数据切换和清理(因为作为数据合并的数据库,不需要保留历史数据,只管合并和导出到OSS就好了)。
同时RDB PostgreSQL也可以作为业务方的OLTP业务库来使用。功能强大且稳定性好。
1、需要创建两张表,应用程序往其中的一张表写,通过调度系统来实现导出到OSS和切换表名的动作。
create table feed_log (id int, c1 int, c2 int, info text, crt_time timestamp);
create index idx_feed_log on feed_log (id, crt_time desc);
create table feed_log_shadow (id int, c1 int, c2 int, info text, crt_time timestamp);
create index idx_feed_log_shadow on feed_log (id, crt_time desc);
2、来源数据写入feed_log
insert into feed_log values (?,?,?,?,?);
3、调度(将feed_log数据写入OSS)
3.1 切换表名。
begin;
set lock_timeout='2s';
alter table feed_log_shadow rename to tmp_feed_log_shadow;
alter table feed_log rename to feed_log_shadow;
alter table tmp_feed_log_shadow rename to feed_log;
end;
3.2 将feed_log的数据,通过RDS PostgreSQL OSS_EXT外部表接口导入到OSS
begin;
set lock_timeout='2s';
lock table feed_log_shadow in ACCESS EXCLUSIVE mode;
drop table if exists oss_ext;
-- 创建外部表语法请参考
-- https://help.aliyun.com/document_detail/44461.html
-- https://help.aliyun.com/document_detail/35457.html
create oss_ext ....;
-- 使用窗口查询,将合并后的数据写入外部表,每个PK,取最后一条记录。(取最后一条是业务需求,请读者按自己的需求来)
insert into oss_ext
select id,c1,c2,info,crt_time from
(select row_number() over(partition by id order by crt_time desc) as rn, * from feed_log_shadow) t
where rn=1;
-- 清除已导入oss的记录
truncate feed_log_shadow;
end;
RDS PostgreSQL的调度步骤结束,接下来就是将OSS的数据合并到HybridDB for PostgreSQL。
HybridDB for PostgreSQL
创建HybridDB for PostgreSQL的全量表,但是业务上的需求是ID唯一。
我们选择ID为分布键,crt_time为分区键,但是分区键的VALUE可能变更,所以后面合并的时候需要注意方法。
create table feed_uniq (id int, c1 int, c2 int, info text, crt_time timestamp)
with (APPENDONLY=true, BLOCKSIZE=2097152,ORIENTATION=column)
distributed by (id)
partition by range (crt_time)
(
start (date '2017-07-01') inclusive
end (date '2017-08-01') exclusive
every (interval '1 day')
) ;
创建HybridDB for PostgreSQL OSS外部表语法请参考
https://help.aliyun.com/document_detail/44461.html
https://help.aliyun.com/document_detail/35457.html
-- 创建HybridDB for PostgreSQL外部表,指向RDS PG导出的OSS对应的目录
-- 仅需创建一次
create writeable external table oss_ext ....;
3.3 导入(合并,存在则更新,不存在则插入)到HybridDB for PostgreSQL
begin;
-- 合并来自上游的OSS文件到HDB的feed_uniq已有记录
-- update(由于可能涉及到分区字段的跨分区更新,所以拆分成两个步骤delete, insert)
-- delete
delete from feed_uniq using oss_ext where feed_uniq.id=oss_ext.id;
-- insert(包括更新和实际的新增数据)
insert into feed_uniq (...) select * from oss_ext;
-- 清除oss bucket
调用OSS API清除对应的oss bucket
附加知识点,如果目标端是PG,可以用insert xxx on conflict xxx do xxx;的合并语法。如果是9.5以前的版本,
1、可以使用函数合并的方法例如:
do language plpgsql $$
declare
x tmp;
begin
for x in select * from tmp
loop
update old_tbl set xx=xx where pk=x.pk;
if not found then
insert into old_tbl values (x.*);
end if;
end loop;
exception when others then
return;
end;
$$;
2、RULE法
实际上PostgreSQL很早就支持RULE语法,可以在RULE中创建规则,存在则更新,不存在则插入。
postgres=# create table d(id int primary key, info text, crt_time timestamp);
postgres=# create rule r1 as on insert to d where (exists (select 1 from d where d.id=NEW.id)) do instead update d set info=NEW.info,crt_time=NEW.crt_time where id=NEW.id;
postgres=# insert into d values (1,'test',now());
INSERT 0 1
postgres=# select * from d;
id | info | crt_time
----+------+----------------------------
1 | test | 2017-08-10 14:12:20.053353
(1 row)
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# select * from d;
id | info | crt_time
----+---------+----------------------------
1 | test123 | 2017-08-10 14:12:26.964074
(1 row)
存在则不插入(忽略),不存在则更新。实现幂等写入(断点续传写入不出问题)。
postgres=# create table d(id int primary key, info text, crt_time timestamp);
postgres=# create rule r1 as on insert to d where (exists (select 1 from d where d.id=NEW.id)) do instead nothing;
CREATE RULE
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (0,'test123',now());
INSERT 0 1
注意,HDB for PG请不要使用以上2种方法MERGE,因为HDB的所有DML都是2PC的,单条单条的处理性能不好,如果使用了列存则有一个更加严重的问题。
HDB的列存是每次事务结束记录列存BLOCK级别偏移量作为事务结束标记,需要调用系统的FSYNC接口进行持久化,一个事务不管多大,凡是事务结束时,每个列对应的数据文件的最后一个追加的BLOCK是需要被冻结的,下次事务就会使用新追加的BLOCK。
由于HDB的列存储持久化机制的问题,如果我们使用类似PostgreSQL的insert on conflict或function合并的方法,会导致非常严重的性能问题。
建议HDB for PG的数据合并,采用三步走的方法。
1、需要合并的数据写入临时表。
2、采用delete from xx using tmp where xx.pk=tmp.pk;删除重复数据。
3、采用insert into xx select * from tmp;写入。
以上三步可以在事务中完成。
调度系统
将以上的调度事务,写入调度平台,设置好依赖关系,就可以实现增量、准实时的数据写入到HybridDB for PostgreSQL了。
《使用D2工作流在ODPS和HybridDB for PG(Greenplum)间自动同步数据》
阿里云HybridDB for PostgreSQL的内核进化
HybridDB for PostgreSQL作为一款支持冷热分离、线性扩展的分析型数据库,除了具备最基本的数据分析能力,阿里云数据库内核团队还对其进行了功能、性能方面的扩展?
1、OSS 海量外部存储。
通过OSS海量存储,打通了云端所有的数据源,同时通过OSS支持冷热数据分离。
2、HLL估值插件。
在分析场景中,估值计算是一个非常常见的需求,通过HLL估值插件,HDB PG支持了海量数据的高效估值查询。
3、MADlib机器学习插件。
通过MADlib机器学习插件,用户可以使用pivotalR和R语言连接HDB PG,实现R的并行计算。
4、开放plpython编程接口。
用户可以在HDB PG中编写python程序,实现复杂的业务处理逻辑或UDF。
5、PostGIS插件
支持地理位置海量数据的挖掘,例如时间、空间维度的数据挖掘。
6、JSON
采用JSON类型,支持更加灵活数据来源的数据挖掘。
7、SORT KEY
通过SORT KEY,用户可以在不建索引的情况下,实现高效的数据过滤和检索。特别适合分析业务中的历史静态数据。对常用的查询列或排序列进行CLUSTER操作后,通过METASCAN,可以在没有索引的情况下,实现比全表扫描快千倍的性能提升。
8、LLVM
静态编译,用户在对大量数据进行访问或运算时,降低数据库内核层面的开销。(类似电池充电或放电时的内阻开销),内耗降低后,性能有3到5倍的提升。
9、向量计算。(开发中)
利用CPU的向量计算指令,批量处理数据,有10倍左右的性能提升。
10、METASCAN
结合SORT KEY,STATS等元信息,实现页级、存储级的WHERE,PROJECTION等下推。从而减少计算层的数据接收量或处理开销。
11、数据写入支持三角模式,开放式驱动包。(开发中)
利用三角模式,用户可以直接写数据到HDB PG的数据节点,减少MASTER节点的开销,消除直接写入的瓶颈。(现有技术通过OSS消除写入瓶颈)
12、支持云生态圈。包括 ETL云服务、BI云服务 等。
简化用户的开发成本,利用云生态,打造智能的企业数据BI平台,作为企业大数据分析的运算和存储的核心引擎。
13、内置高可用、备份调度。扩容、缩容 一键完成。降低用户的使用成本。
达到的效果
通过这个架构,用户实现了流计算、在线业务、数据分析的业务数据闭环。
将分析时间从天的频率提升到了分钟级别。
小结
利用阿里云的云生态,RDS PostgreSQL,HybridDB for PostgreSQL,对象存储OSS,QuickBI,流计算平台,消息队列,中间件服务等一系列云服务,帮助企业打造智能的企业数据BI平台,HybridDB for PostgreSQL也企业大数据实时分析运算和存储的核心引擎。
实现了企业在云端从流计算、在线业务、到数据实时分析的业务数据闭环。
参考
《Greenplum 性能评估公式 - 阿里云HybridDB for PostgreSQL最佳实践》