阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 数据并行导出到OSS

5 minute read

背景

阿里云RDS PostgreSQL、HybridDB for PostgreSQL提供了一个非常强大的功能,OSS对象存储外部表。

阿里云的RDS PostgreSQL用户可以利用OSS存储冷数据(OSS外部表的形态呈现),实现冷热分离;也可以利用OSS作为数据的中转桥梁,打通其他云端业务,例如HDB FOR PostgreSQL分析型数据库。

oss外部表的用法文档如下。

https://help.aliyun.com/document_detail/44461.html

目前oss外部表支持文本\GZIP等格式。将来还会支持流行的列存格式(ORC,parquet等),扫描下推,并行读写OSS文件等,提升体验。

由于目前RDS PG的版本是9.4,9.4的版本目前不支持并行框架,单个写进程是15MB/s左右。采用gzip压缩格式,可能能提升到20MB/s。

采用并行框架的PostgreSQL 10,可以在写出到OSS时开启并行写,每个WORKER进程 20MB/s,单表导到OSS的速度将得到大幅度的提升(读取也一样支持并行)。

如果RDS PG 9.4的用户需要将大表快速的写出到OSS的话,有什么优化手段呢?

答案是通过PG DBLINK来实现异步并行。

业务背景

用户的订餐、购物、寄送包裹等操作,会产生订单,订单与业务逻辑挂钩,在各个业务系统流转会生成新的状态或属性(每个业务系统产生的数据字段可能都不一样)。

为了对订单数据进行统一管理、准实时数据分析、透视。需要实时的将订单数据在各个业务系统中生成的状态、属性进行合并,输送到分析型数据库HybridDB for PostgreSQL。

数据流

订单信息,从业务系统流入阿里云的流计算平台,从流计算平台实时写入RDS PG,从RDS PG批量写入OSS,从OSS批量合并到HybridDB PG(HybridDB PostgreSQL保存最完整的订单信息,提供分析透视)。

1、从流计算平台到RDS PG。

实时、批量,采用UPSERT的方式,PostgreSQL UPSERT的语法请参考:

《PostgreSQL upsert功能(insert on conflict do)的用法》

我们采用了其中FUNCTION批量upsert的方法。对于PostgreSQL 9.5以及以上版本,可以在function中使用insert into on conflict语法(因为insert into on conflict不支持values (),(),()…()的批量写法)。

2、从RDS PG写入OSS

由于RDS PG 9.4没有内置写OSS并行,当数据量很大的时候,单线程写速度很慢,容易成为瓶颈。

这个是本文的重点,RDS PG 9.4如何采用单表异步并行,写入OSS。(未来PG 10上线,内置了并行,不需要这么麻烦)

3、从OSS合并到HybridDB PostgreSQL

采用三步走的方法:

3.1 oss_tmp1 inner join big_table into tmp2 得到大表(总表)已有订单已有字段属性+订单新状态的数据tmp2。

3.2 delete from big_table using tmp2 删除总表中已剥离出来的tmp2。

3.3 insert into bit_table select * from oss_tmp1 left join tmp2 where tmp2.* is null (union all) tmp2。 将数据汇入总表。

RULE的方式,并不能提升效果

创建4个外部表(4个并行),表名不一样,其他外部参数(bucket, dir)一样,文件名会以表名来命名,所以不用担心写入OSS 同一目录的时候文件重名:    
    
tbl_oss_ext0    
tbl_oss_ext1    
tbl_oss_ext2    
tbl_oss_ext3    

创建一张规则表,与外部表定义一致:

create table tbl_entry (like tbl_oss_ext0);    

创建规则:

create rule r0 as on insert to tbl_entry where mod(order_id, 4)=0 do instead insert into tbl_oss_ext0 values (NEW.*);    
create rule r1 as on insert to tbl_entry where mod(order_id, 4)=1 do instead insert into tbl_oss_ext1 values (NEW.*);    
create rule r2 as on insert to tbl_entry where mod(order_id, 4)=2 do instead insert into tbl_oss_ext2 values (NEW.*);    
create rule r3 as on insert to tbl_entry where mod(order_id, 4)=3 do instead insert into tbl_oss_ext3 values (NEW.*);    

写入规则表,数据将重定向到4个外部表。

insert into tbl_entry select * from stream_table;    

因为只使用了一个进程在做这件事情,所以这种方法并不是真正的并行。

所以采用DBLINK异步调用,实现真正的并行。

https://www.postgresql.org/docs/10/static/dblink.html

基于DBLINK异步调用的并行设计

1、前端直接写PG的分区表(可选)

例如前端将数据直接写入到16个分区,导出时,每个分区表对应一个OSS外部表,从而实现16的并行度。

分区表有两种写法:

PG内置分区(可以使用继承+触发器、规则,pg_pathman,内置分区语法 等)。

业务层逻辑分区,业务层确定数据写入哪个分区。

这两种方法,方法1更灵活,但是性能会受到一定的影响(pg_pathman几乎没有影响,建议选择pg_pathman)。

如果不写分区表,单表开启并行的话,可以使用取模的方法来并行,会带来一定的重复扫描本地表的成本(每个并行都需要扫描所有记录,而且不建议用索引来分割,因为索引扫描速度也好不到哪里去)。

并行例子如下(以取模法为例)

1、建立断开连接的函数,目的是不抛异常。

create or replace function dis_conn(name) returns void as $$  
declare  
begin  
  perform dblink_disconnect($1);  
  return;  
exception when others then  
  return;  
end;  
$$ language plpgsql strict;  

2、使用do来编写并行逻辑,也可以将这部分写成plpgsql函数,随用户的喜好。

封装成plpgsql函数,可以简化用户的调用。

do language plpgsql $$  
declare  
begin  
  -- 1、断开已有连接  
  perform dis_conn('外部表名_1');  
  perform dis_conn('外部表名_2');  
  perform dis_conn('外部表名_3');  
  perform dis_conn('外部表名_4');  
  
  -- 2、清空状态表。  
  delete from t_result;  
  
  -- 3、写入状态表初始记录  
  insert into t_result values ('外部表名_1'),('外部表名_2'),('外部表名_3'),('外部表名_4');  
  
  -- 4、打开dblink连接。建立本地DBLINK连接(并设置连接指纹)    
  --    使用application_name来设置连接指纹。       
  perform dblink_connect('外部表名_1','hostaddr=127.0.0.1 port='||current_setting('port')||' dbname=postgres user=xxx password=pwd application_name=外部表名_1');    
  perform dblink_connect('外部表名_2','hostaddr=127.0.0.1 port='||current_setting('port')||' dbname=postgres user=xxx password=pwd application_name=外部表名_2');    
  perform dblink_connect('外部表名_3','hostaddr=127.0.0.1 port='||current_setting('port')||' dbname=postgres user=xxx password=pwd application_name=外部表名_3');    
  perform dblink_connect('外部表名_4','hostaddr=127.0.0.1 port='||current_setting('port')||' dbname=postgres user=xxx password=pwd application_name=外部表名_4');    
  
  -- 5、执行异步SQL,包括并行写OSS的SQL,以及更新写入OSS成功的状态。  
  --   (原子操作,如果写OSS失败,状态将不会被更新)  
  perform dblink_send_query('外部表名_1','begin; insert into 外部表1 select * from tmp where mod(order_id,4)=0; update tbl_result set status='done' where task='外部表名_1'; end;');    
  perform dblink_send_query('外部表名_2','begin; insert into 外部表2 select * from tmp where mod(order_id,4)=1; update tbl_result set status='done' where task='外部表名_2'; end;');    
  perform dblink_send_query('外部表名_3','begin; insert into 外部表3 select * from tmp where mod(order_id,4)=2; update tbl_result set status='done' where task='外部表名_3'; end;');    
  perform dblink_send_query('外部表名_4','begin; insert into 外部表4 select * from tmp where mod(order_id,4)=3; update tbl_result set status='done' where task='外部表名_4'; end;');    
  
  -- 6、断开DBLINK  
  --    为什么要断开DBLINK?  
  --    开启了异步调用的连接,需要get异步调用的结果后,才能继续使用这个连接。或者关闭连接后,重新建立连接即可使用。    
  --    断开在跑异步SQL的DBLINK,不会影响远程异步SQL的正常运行,可以放心关闭。  
  perform dblink_disconnect('外部表名_1');  
  perform dblink_disconnect('外部表名_2');  
  perform dblink_disconnect('外部表名_3');  
  perform dblink_disconnect('外部表名_4');  
end;  
$$;  

4、监控异步任务状态,判断异步任务是否全部结束。

4.1、以下SQL没有记录返回,说明任务跑完。但是是否全部成功,还需要第二步判断。

select * from pg_stat_activity where application_name in ('外部表名_1','外部表名_2','外部表名_3','外部表名4') and state !~ 'idle';    

4.2、通过查询tbl_result,如果status=’done’的记录数不等于线程数,则说明有任务失败。

4.3、任务结束后的处理:

如果任务正常结束:清除tbl_result表。

如果任务异常结束:清除tbl_result表、清除oss dir,重跑任务。

5、DBLINK相关手册

https://www.postgresql.org/docs/10/static/dblink.html

dblink_connect — opens a persistent connection to a remote database    
dblink_connect_u — opens a persistent connection to a remote database, insecurely    
dblink_disconnect — closes a persistent connection to a remote database    
dblink — executes a query in a remote database    
dblink_exec — executes a command in a remote database    
dblink_open — opens a cursor in a remote database    
dblink_fetch — returns rows from an open cursor in a remote database    
dblink_close — closes a cursor in a remote database    
dblink_get_connections — returns the names of all open named dblink connections    
dblink_error_message — gets last error message on the named connection    
dblink_send_query — sends an async query to a remote database    
dblink_is_busy — checks if connection is busy with an async query    
dblink_get_notify — retrieve async notifications on a connection    
dblink_get_result — gets an async query result    
dblink_cancel_query — cancels any active query on the named connection    
dblink_get_pkey — returns the positions and field names of a relation's primary key fields    
dblink_build_sql_insert — builds an INSERT statement using a local tuple, replacing the primary key field values with alternative supplied values    
dblink_build_sql_delete — builds a DELETE statement using supplied values for primary key field values    
dblink_build_sql_update — builds an UPDATE statement using a local tuple, replacing the primary key field values with alternative supplied values    

6、达到的效果

开启40个并行,26GB的数据,140秒,达到190MB/s的写出速度。

7、采用hash取模的方法,将单表写出变成了并行,而如果使用写入分区表、或者采用数据库分区表的模式,那么性在扫描的CPU开销IO开销方面还可以大幅降低,例子

性能对比:

直接扫描单表,耗时429毫秒。

-- 假设32个并行,每个并行只读取其中的1/32数据。  
postgres=# create table tbl2 as select * from tbl1 where mod(id,32)=1;  
SELECT 2572288  
  
-- 直接扫描分区表429毫秒  
postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl2;  
                                                        QUERY PLAN                                                          
--------------------------------------------------------------------------------------------------------------------------  
 Seq Scan on public.tbl2  (cost=0.00..218291.97 rows=18256497 width=36) (actual time=0.010..309.304 rows=2572288 loops=1)  
   Output: id, info  
   Buffers: shared hit=35727  
 Planning time: 0.038 ms  
 Execution time: 428.955 ms  
(5 rows)  

采样HASH过滤,扫描全表,耗时9800毫秒。差异非常大。

-- 采用hash取模的方式,扫描全表,并使用CPU过滤当前并发WORKER工作线程不需要的数据,需要10秒左右。差了20倍。  
postgres=# explain (analyze,verbose,timing,costs,buffers) select * from tbl1 where mod(id,32)=1;  
                                                        QUERY PLAN                                                           
---------------------------------------------------------------------------------------------------------------------------  
 Seq Scan on public.tbl1  (cost=0.00..1952916.52 rows=271713 width=416) (actual time=0.014..9665.796 rows=2572288 loops=1)  
   Output: id, info  
   Filter: (mod(tbl1.id, 32) = 1)  
   Rows Removed by Filter: 79347712  
   Buffers: shared hit=1137778  
 Planning time: 0.050 ms  
 Execution time: 9801.723 ms  
(7 rows)  

其他例子

postgres=# create table t(id int, var varbit);
CREATE TABLE
postgres=# insert into t select generate_series(1,10000000), bit '1111111111';
INSERT 0 10000000

create or replace function dis_conn(name) returns void as $$  
declare  
begin  
  perform dblink_disconnect($1);  
  return;  
exception when others then  
  return;  
end;  
$$ language plpgsql strict;  


do language plpgsql $$  
declare  
begin  

  for i in 1..32 loop
    perform dis_conn('link'||i);
  end loop;

  create table if not exists tmp1 (id int, state int, s_time timestamp, end_time timestamp);
  delete from tmp1;
 
  
  for i in 1..32 loop
    perform dblink_connect('link'||i, 'hostaddr=127.0.0.1'::text);    
  end loop;


  for i in 1..32 loop
    perform dblink_send_query('link'||i, format('
      begin; 
      insert into tmp1 values (%s, 0, clock_timestamp()); 
      update t set var=repeat(''1'',8192)::varbit where mod(id,32)=%s-1; 
      update tmp1 set state=1,end_time=clock_timestamp() where id=%s;
      end;
      ', i, i, i) 
      );    
  end loop;

  for i in 1..32 loop
    perform dblink_disconnect('link'||i);  
  end loop;

end;  
$$;  



postgres=# select * from tmp1;
 id | state |           s_time           |          end_time          
----+-------+----------------------------+----------------------------
  2 |     1 | 2017-11-24 22:22:20.905842 | 2017-11-24 22:22:43.407478
  1 |     1 | 2017-11-24 22:22:20.905842 | 2017-11-24 22:22:43.002223
  4 |     1 | 2017-11-24 22:22:20.906027 | 2017-11-24 22:22:43.068313
  5 |     1 | 2017-11-24 22:22:20.906058 | 2017-11-24 22:22:43.335777
  3 |     1 | 2017-11-24 22:22:20.906069 | 2017-11-24 22:22:43.225404
  6 |     1 | 2017-11-24 22:22:20.906035 | 2017-11-24 22:22:43.149979
  9 |     1 | 2017-11-24 22:22:20.906171 | 2017-11-24 22:22:43.142008
 14 |     1 | 2017-11-24 22:22:20.906291 | 2017-11-24 22:22:43.055062
 10 |     1 | 2017-11-24 22:22:20.906285 | 2017-11-24 22:22:42.921642
 11 |     1 | 2017-11-24 22:22:20.906288 | 2017-11-24 22:22:43.177545
 15 |     1 | 2017-11-24 22:22:20.90638  | 2017-11-24 22:22:43.213801
 13 |     1 | 2017-11-24 22:22:20.906417 | 2017-11-24 22:22:43.230308
  7 |     1 | 2017-11-24 22:22:20.906379 | 2017-11-24 22:22:43.335494
  8 |     1 | 2017-11-24 22:22:20.90646  | 2017-11-24 22:22:43.258312
 12 |     1 | 2017-11-24 22:22:20.906483 | 2017-11-24 22:22:42.616339
 16 |     1 | 2017-11-24 22:22:20.90677  | 2017-11-24 22:22:43.092587
(16 rows)


vacuum verbose t;

云端相关产品

阿里云 RDS PostgreSQL

阿里云 HybridDB for PostgreSQL

相关案例

《打造云端流计算、在线业务、数据分析的业务数据闭环 - 阿里云RDS、HybridDB for PostgreSQL最佳实践》

小结

目前阿里云RDS PostgreSQL、HybridDB PostgreSQL oss外部表支持文本\GZIP等格式。将来还会支持流行的列存格式(ORC,parquet等),扫描下推,并行读写OSS文件等,提升体验。

由于目前RDS PG的版本是9.4,9.4的版本目前不支持并行框架,单个写进程是15MB/s左右。采用gzip压缩格式,可能能提升到20MB/s。

采用并行框架的PostgreSQL 10,可以在写出到OSS时开启并行写,每个WORKER进程 20MB/s,单表导到OSS的速度将得到大幅度的提升(读取也一样支持并行)。

如果RDS PG 9.4的用户需要将大表快速的写出到OSS的话,通过PG DBLINK来实现异步并行。

开启40个并行,26GB的数据,140秒,达到190MB/s的写出速度。

如果前端或PG内部采用分区表,而不是取模的方式,那么输出速度还可以再提高。(因为扫描更快了)。

将这个功能封装成UDF,便于用户调用:

https://www.atatech.org/articles/98990

Flag Counter

digoal’s 大量PostgreSQL文章入口