打造云端流计算、在线业务、数据分析的业务数据闭环 - 阿里云RDS、HybridDB for PostgreSQL最佳实践

3 minute read

背景

水的流动汇成江河大海,孕育生命,形成大自然生态。数据流动,推进社会进步,拓展业务边界。

《从人类河流文明 洞察 数据流动的重要性》

以某淘系业务案例展开,看看用户如何利用阿里云RDS PostgreSQL,HybridDB for PostgreSQL,海量对象存储OSS,打造一个从流计算到在线业务,再到数据分析和挖掘的业务,发挥数据的价值,拓展业务的边界。

业务简介

一个电商业务通常会涉及 商家、门店、物流、用户、支付渠道、贷款渠道、商品、平台、小二、广告商、厂家、分销商、店主、店员、监管员、税务、质检等等角色。

这些对象的活动会产生大量的 浏览、订单、投诉、退款、纠纷等数据。

平台业务的目标:

根据这些对象的数据,实时的进行分析,实时的舆情展示,实时的找出需要主动服务的对象等。属于一个智能化的服务运营平台。

架构

要实现“智能化的服务运营平台”的目标,我们需要将各个业务线产生的相关行为、字典化数据准实时汇总起来,进行统一的实时分析。

pic

1、数据源

数据源是来自各个业务线的数据库或实时日志、消息队列、关系数据库、流计算平台等。

2、实时分析数据库

实时分析型数据库,选择阿里云HybridDB for PostgreSQL是一款基于Greenplum开源版本GPDB打造的分布式实时分析型数据库。支持PB级的数据量,支持行列混合存储,支持压缩,支持更新。

已有互联网、金融、国家安全等行业大量用户案例。

OLAP语法支持全面。阿里云HDB在开源基础上还增加了JSON,估值类型HLL的支持。同时内核层面进行了优化,增加了LLVM,SORTKEY,METADATA,OSS外部表,MADLIB机器学习库 等特性,大幅提升分析型SQL的性能。

3、调度平台

HDB for PostgreSQL是一个分析型的数据库,为分析场景设计,虽然支持单条INSERT的写入,但是性能最好的依旧是并行的数据灌入。比如使用阿里云HybridDB for PostgreSQL的OSS外部表功能,从阿里云OSS对象存储并行的导入数据。

由于数据来源多,写入并发高,所以建议的做法是通过调度的方法,准实时的将数据合并然后并行写入到HybridDB for PostgreSQL

同时对于一些状态数据,实际上业务仅需最新的一条,所以单条INSERT也会增加数据的写入量。因此我们会增加一个实时合并数据的数据库。

4、实时合并数据库

实时合并数据库的设计目标:

可以接受高并发的写入,同时支持数据合并,支持直接将数据写入OSS对象存储。

在阿里云中对应的是RDS PostgreSQL产品。它可以支持非常高并发的数据写入(单实例可以达到几百万行/s的写入速度),同时支持将数据并发的写入OSS对象存储。从而实现数据的合并,最终合并到HDB for PostgreSQL的目标。

5、OSS对象存储

阿里云OSS对象存储,与RDS PostgreSQL, HybridDB for PostgreSQL结合,可以实现用户数据的冷热分离,数据中转等功能(作为数据中转时,每个HybridDB for PostgreSQL 目前已支持每个数据节点30MB/s的速度,如果有128个实例,速度就达到了4GB/s)。

OSS海量存储,海量带宽,廉价。

在阿里集团、阿里公有云用户群中已经有大量成熟的案例,通过OSS实现数据库的冷热分离,数据高速中转。

6、BI运营平台

业务方围绕HybridDB for PostgreSQL数据库打造的一个BI平台。运算交给HybridDB for PostgreSQL。

7、其他数据平台(例如MQ(kafka))

通过RDS PG提供的ali_decode插件,可以实时的从WAL(重做日志)将数据提取出来,并写入到对应的目标接收平台,如Kafka,实现数据的实时分发。

《PostgreSQL 最佳实践 - 逻辑增量复制(MySQL <-> PgSQL <-> PgSQL)》

实时合并与调度实施细节

RDS PostgreSQL

RDS PostgreSQL 负责接收实时业务数据,合并业务数据,并将数据写入OSS。

为了实现高效的数据切换和清理(因为作为数据合并的数据库,不需要保留历史数据,只管合并和导出到OSS就好了)。

同时RDB PostgreSQL也可以作为业务方的OLTP业务库来使用。功能强大且稳定性好。

1、需要创建两张表,应用程序往其中的一张表写,通过调度系统来实现导出到OSS和切换表名的动作。

create table feed_log (id int, c1 int, c2 int, info text, crt_time timestamp);  
  
create index idx_feed_log on feed_log (id, crt_time desc);  
  
create table feed_log_shadow (id int, c1 int, c2 int, info text, crt_time timestamp);  
  
create index idx_feed_log_shadow on feed_log (id, crt_time desc);  

2、来源数据写入feed_log

insert into feed_log values (?,?,?,?,?);  

3、调度(将feed_log数据写入OSS)

3.1 切换表名。

begin;  
set lock_timeout='2s';  
alter table feed_log_shadow rename to tmp_feed_log_shadow;  
alter table feed_log rename to feed_log_shadow;  
alter table tmp_feed_log_shadow rename to feed_log;  
end;  

3.2 将feed_log的数据,通过RDS PostgreSQL OSS_EXT外部表接口导入到OSS

begin;  
set lock_timeout='2s';  
lock table feed_log_shadow in ACCESS EXCLUSIVE mode;   
  
drop table if exists oss_ext;  
  
-- 创建外部表语法请参考   
-- https://help.aliyun.com/document_detail/44461.html   
-- https://help.aliyun.com/document_detail/35457.html  
  
create oss_ext ....;       
  
-- 使用窗口查询,将合并后的数据写入外部表,每个PK,取最后一条记录。(取最后一条是业务需求,请读者按自己的需求来)  
insert into oss_ext   
  select id,c1,c2,info,crt_time from   
    (select row_number() over(partition by id order by crt_time desc) as rn, * from feed_log_shadow) t   
  where rn=1;  
  
-- 清除已导入oss的记录  
truncate feed_log_shadow;  
end;  

RDS PostgreSQL的调度步骤结束,接下来就是将OSS的数据合并到HybridDB for PostgreSQL。

HybridDB for PostgreSQL

创建HybridDB for PostgreSQL的全量表,但是业务上的需求是ID唯一。

我们选择ID为分布键,crt_time为分区键,但是分区键的VALUE可能变更,所以后面合并的时候需要注意方法。

create table feed_uniq (id int, c1 int, c2 int, info text, crt_time timestamp)   
with (APPENDONLY=true, BLOCKSIZE=2097152,ORIENTATION=column)   
distributed by (id)   
partition by range (crt_time)   
(   
  start (date '2017-07-01') inclusive   
  end (date '2017-08-01') exclusive   
  every (interval '1 day')   
) ;  

创建HybridDB for PostgreSQL OSS外部表语法请参考

https://help.aliyun.com/document_detail/44461.html

https://help.aliyun.com/document_detail/35457.html

-- 创建HybridDB for PostgreSQL外部表,指向RDS PG导出的OSS对应的目录  
-- 仅需创建一次  
create writeable external table oss_ext ....;    

3.3 导入(合并,存在则更新,不存在则插入)到HybridDB for PostgreSQL

begin;  
  
-- 合并来自上游的OSS文件到HDB的feed_uniq已有记录  
  
-- update(由于可能涉及到分区字段的跨分区更新,所以拆分成两个步骤delete, insert)  
  
-- delete  
delete from feed_uniq using oss_ext where feed_uniq.id=oss_ext.id;  
  
-- insert(包括更新和实际的新增数据)  
  
insert into feed_uniq (...) select * from oss_ext;  
  
-- 清除oss bucket  
调用OSS API清除对应的oss bucket  

附加知识点,如果目标端是PG,可以用insert xxx on conflict xxx do xxx;的合并语法。如果是9.5以前的版本,

1、可以使用函数合并的方法例如:

do language plpgsql $$
declare
  x tmp;
begin
  for x in select * from tmp 
  loop
    update old_tbl set xx=xx where pk=x.pk;
    if not found then
      insert into old_tbl values (x.*);
    end if;
  end loop;
  exception when others then
    return;
end;
$$;

2、RULE法

实际上PostgreSQL很早就支持RULE语法,可以在RULE中创建规则,存在则更新,不存在则插入。

postgres=# create table d(id int primary key, info text, crt_time timestamp);

postgres=# create rule r1 as on insert to d where (exists (select 1 from d where d.id=NEW.id)) do instead update d set info=NEW.info,crt_time=NEW.crt_time where id=NEW.id;

postgres=# insert into d values (1,'test',now());
INSERT 0 1
postgres=# select * from d;
 id | info |          crt_time          
----+------+----------------------------
  1 | test | 2017-08-10 14:12:20.053353
(1 row)
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# select * from d;
 id |  info   |          crt_time          
----+---------+----------------------------
  1 | test123 | 2017-08-10 14:12:26.964074
(1 row)

存在则不插入(忽略),不存在则更新。实现幂等写入(断点续传写入不出问题)。

postgres=# create table d(id int primary key, info text, crt_time timestamp);

postgres=# create rule r1 as on insert to d where (exists (select 1 from d where d.id=NEW.id)) do instead nothing;
CREATE RULE
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (0,'test123',now());
INSERT 0 1

注意,HDB for PG请不要使用以上2种方法MERGE,因为HDB的所有DML都是2PC的,单条单条的处理性能不好,如果使用了列存则有一个更加严重的问题。

HDB的列存是每次事务结束记录列存BLOCK级别偏移量作为事务结束标记,需要调用系统的FSYNC接口进行持久化,一个事务不管多大,凡是事务结束时,每个列对应的数据文件的最后一个追加的BLOCK是需要被冻结的,下次事务就会使用新追加的BLOCK。

由于HDB的列存储持久化机制的问题,如果我们使用类似PostgreSQL的insert on conflict或function合并的方法,会导致非常严重的性能问题。

建议HDB for PG的数据合并,采用三步走的方法。

1、需要合并的数据写入临时表。

2、采用delete from xx using tmp where xx.pk=tmp.pk;删除重复数据。

3、采用insert into xx select * from tmp;写入。

以上三步可以在事务中完成。

调度系统

将以上的调度事务,写入调度平台,设置好依赖关系,就可以实现增量、准实时的数据写入到HybridDB for PostgreSQL了。

《使用D2工作流在ODPS和HybridDB for PG(Greenplum)间自动同步数据》

阿里云HybridDB for PostgreSQL的内核进化

HybridDB for PostgreSQL作为一款支持冷热分离、线性扩展的分析型数据库,除了具备最基本的数据分析能力,阿里云数据库内核团队还对其进行了功能、性能方面的扩展?

1、OSS 海量外部存储。

通过OSS海量存储,打通了云端所有的数据源,同时通过OSS支持冷热数据分离。

2、HLL估值插件。

在分析场景中,估值计算是一个非常常见的需求,通过HLL估值插件,HDB PG支持了海量数据的高效估值查询。

3、MADlib机器学习插件。

通过MADlib机器学习插件,用户可以使用pivotalR和R语言连接HDB PG,实现R的并行计算。

4、开放plpython编程接口。

用户可以在HDB PG中编写python程序,实现复杂的业务处理逻辑或UDF。

5、PostGIS插件

支持地理位置海量数据的挖掘,例如时间、空间维度的数据挖掘。

6、JSON

采用JSON类型,支持更加灵活数据来源的数据挖掘。

7、SORT KEY

通过SORT KEY,用户可以在不建索引的情况下,实现高效的数据过滤和检索。特别适合分析业务中的历史静态数据。对常用的查询列或排序列进行CLUSTER操作后,通过METASCAN,可以在没有索引的情况下,实现比全表扫描快千倍的性能提升。

8、LLVM

静态编译,用户在对大量数据进行访问或运算时,降低数据库内核层面的开销。(类似电池充电或放电时的内阻开销),内耗降低后,性能有3到5倍的提升。

9、向量计算。(开发中)

利用CPU的向量计算指令,批量处理数据,有10倍左右的性能提升。

10、METASCAN

结合SORT KEY,STATS等元信息,实现页级、存储级的WHERE,PROJECTION等下推。从而减少计算层的数据接收量或处理开销。

11、数据写入支持三角模式,开放式驱动包。(开发中)

利用三角模式,用户可以直接写数据到HDB PG的数据节点,减少MASTER节点的开销,消除直接写入的瓶颈。(现有技术通过OSS消除写入瓶颈)

12、支持云生态圈。包括 ETL云服务、BI云服务 等。

简化用户的开发成本,利用云生态,打造智能的企业数据BI平台,作为企业大数据分析的运算和存储的核心引擎。

13、内置高可用、备份调度。扩容、缩容 一键完成。降低用户的使用成本。

达到的效果

通过这个架构,用户实现了流计算、在线业务、数据分析的业务数据闭环。

将分析时间从天的频率提升到了分钟级别。

小结

利用阿里云的云生态,RDS PostgreSQL,HybridDB for PostgreSQL,对象存储OSS,QuickBI,流计算平台,消息队列,中间件服务等一系列云服务,帮助企业打造智能的企业数据BI平台,HybridDB for PostgreSQL也企业大数据实时分析运算和存储的核心引擎。

实现了企业在云端从流计算、在线业务、到数据实时分析的业务数据闭环。

参考

《从人类河流文明 洞察 数据流动的重要性》

阿里云 RDS PostgreSQL

阿里云 HybridDB for PostgreSQL

《Greenplum 性能评估公式 - 阿里云HybridDB for PostgreSQL最佳实践》

Flag Counter

digoal’s 大量PostgreSQL文章入口