PostgreSQL 变态并行拉取单表的方法 - 按块并行(按行号(ctid)并行) + dblink 异步调用
背景
数据同步是一个较为常见的需求,为了提高性能,并行是一个非常简单暴力的手段。但是通常只能做到不同的对象之间并行,对于单个对象,比如单表,能否并行呢?
有一种方法是使用HASH并行,例如按某个或某几个字段,按hash值取模,切分为多组数据,每个进程读取一部分,并行将单表取出。但是这种方法有一个弊端,会导致重复的扫描。例子如下:
《阿里云RDS PostgreSQL OSS 外部表 - (dblink异步调用封装)并行写提速案例》
还有一种方法与之类似,可以使用某个唯一的自增字段,通过BRIN索引或B-TREE索引分段扫描取出。这种方法的好处是避免了HEAP BLOCKS的重复扫描。但是需要有相关性很好的字段以及索引的支撑,否则会有离散扫描,性能也不见得好。
那么还有其他的方法吗?
因为PostgreSQL的数据是堆存储,所以我们可以按BLOCK扫描,每个线程扫描指定的BLOCK分段。目前PG支持指定行号返回,如果需要指定BLOCK返回需要扩展内核功能。
按行号(最好按BLOCK)集合,并行取单表数据
按行号并行扫描,思路:
1、评估表占用了多少数据块
2、评估每个数据快最多有多少条记录
3、生成每个BATCH取数据的行号集合
4、分配任务,每个任务取各自的行号集合
如果支持按BLOCK扫描,思路如下:
1、评估表占了多少个数据块
2、分配任务,每个任务扫描连续的N个数据块
按行号并行的例子
1、创建一个函数,返回每个BATCH处理的连续行号集合,输入4个参数,分别为目标表占用多少数据块,每个数据块最多包含多少条记录,开启多少个并行,返回第几个并行号的行号集合。
create or replace function split_ctid(
v_blocks int, -- 多少个数据块,可通过元数据得到
v_rowsperblock int, -- 每个块内多少行,必须足够大,不要取平均值,可评估得到。 一定要大于或等于包含最多记录的某个数据块。
v_mod int, -- 拆分成多少个并行,必须小于或等于v_blocks 。
v_n int -- 当前进程读第几个MOD, 从0开始 ,到 v_mod-1
) returns tid[] as $$
select array (
select ('('||blkid||','||generate_series(1,v_rowsperblock)||')')::tid from
generate_series
(
(v_blocks/v_mod)*v_n
,
case
when v_n+1=v_mod then greatest(((v_blocks/v_mod)*(v_n+1))-1, v_blocks-1)
else ((v_blocks/v_mod)*(v_n+1))-1
end
) t(blkid)
);
$$ language sql strict immutable;
使用举例:
例如某个表有999个数据块,每个数据块最多10条记录,开启16个并行,获得第0号并行的行号集合。
postgres=# select array_length(split_ctid(999,10,16,0),1);
array_length
--------------
620
(1 row)
获得第1号并行的行号集合。
postgres=# select array_length(split_ctid(999,10,16,1),1);
array_length
--------------
620
(1 row)
获得第15号并行的行号集合。
postgres=# select array_length(split_ctid(999,10,16,15),1);
array_length
--------------
690
(1 row)
2、实际的例子
获得一个表占用了多少个数据块
postgres=# select relpages from pg_class where oid='public.a'::regclass;
relpages
----------
540541
(1 row)
评估每个数据块内有多少记录.
postgres=# select n_live_tup+n_dead_tup from pg_stat_all_tables where relid='public.a'::regclass;
?column?
-----------
100000085
(1 row)
postgres=# select reltuples from pg_class where oid='public.a'::regclass;
reltuples
-----------
1e+08
(1 row)
评估如下,建议乘以一个系数,表示一个BLOCK最多可能有多少条记录(注意,这个而仅仅是评估,所以有误差):
select (greatest(a.id,b.id)/relpages)*1.15 from -- 乘以1.15的放大系数
(select n_live_tup+n_dead_tup as id from pg_stat_all_tables where relid='public.a'::regclass) a,
(select reltuples as id from pg_class where oid='public.a'::regclass) as b,
(select relpages from pg_class where oid='public.a'::regclass) c;
?column?
------------------
212.750006382495
(1 row)
这种方法,并不能保证百分百完全一致。只是很大概率上可以做到一致。当有很多垃圾时,可能导致膨胀,计算得到的每个块内的记录数会变少,就可能导致数据缺失。
然后,我们就可以使用以上方法来并行查询单张表了。
postgres=# show enable_tidscan ;
enable_tidscan
----------------
on
(1 row)
postgres=# select count(*) from a where ctid = any (split_ctid(540541, 212, 64, 0));
count
---------
1562325
(1 row)
Time: 1818.409 ms (00:01.818)
开启64个并行,每个并行的耗时约1.8秒。
如果不使用并行,扫描1亿记录需要耗费7.2秒。
postgres=# select count(*) from a;
count
-----------
100000000
(1 row)
Time: 7271.676 ms (00:07.272)
我们看一下64个并行加起来的记录数是否准确
do language plpgsql $$
declare
v_sum int :=0;
v_tmp int;
begin
for i in 0..63 loop
select count(*) into v_tmp from a where ctid = any (split_ctid(540541, 212, 64, i));
v_sum := v_sum + v_tmp;
end loop;
raise notice 'sum: %', v_sum;
end;
$$;
NOTICE: sum: 100000000
DO
准确无误。
但是请特别注意:
这种方法,并不能保证百分百完全一致。只是很大概率上可以做到一致。当有很多垃圾时,可能导致膨胀,计算得到的每个块内的记录数会变少,就可能导致数据缺失。
并行例子
并行就不用说了,程序端发起。在数据库中使用dblink异步调用可以模拟效果。
create extension dblink;
create or replace function conn(
name, -- dblink名字
text -- 连接串,URL
) returns void as $$
declare
begin
perform dblink_connect($1, $2);
return;
exception when others then
return;
end;
$$ language plpgsql strict;
create or replace function get_cnt(v_blocks int, v_rowsperblk int, v_mod int) returns setof record as $$
declare
begin
for i in 0..(v_mod-1) loop
perform conn('link'||i, 'hostaddr=127.0.0.1 port=1923 user=postgres dbname=postgres');
perform 1 from dblink_get_result('link'||i) as t(cnt int8);
perform dblink_send_query('link'||i, format('select count(*) as cnt from a where ctid = any (split_ctid(%s, %s, %s, %s)) ', v_blocks, v_rowsperblk, v_mod, i));
end loop;
for i in 0..(v_mod-1) loop
return query select * from dblink_get_result('link'||i) as t(cnt int8);
end loop;
end;
$$ language plpgsql strict;
postgres=# select sum(cnt) from get_cnt(540541, 190, 48) t (cnt int8);
sum
-----------
100000000
(1 row)
Time: 4949.961 ms (00:04.950)
小结
目前PostgreSQL还没有把指定某个数据块的数据记录全部取出的扫描方法,需要扩展采样接口或者AM节点,才能实现此功能。再此之前,我们可以使用按行号扫描,将一个BLOCK中的记录全部取出来,不过这个方法有一定的弊端,就是我们并不知道一个BLOCK里面到底有多少条记录,写多了浪费性能,写少了又会导致记录获取缺失。所以最好还是在内核层面实现BLOCK级别的扫描,告诉数据库你要扫描哪个BLOCK的数据,直接OFFSET到对应的BLOCK并将这个BLOCK的记录全部取出。
可以借鉴PostgreSQL采样接口来实现按块扫描。
https://www.postgresql.org/docs/devel/static/tablesample-method.html
select * from tbl where ctid between ? and ?; 从某行扫描到某行,支持跨BLOCK。
select * from tbl where block_id between ? and ?; 从某个BLOCK扫描到某个BLOCK
select * from tbl where ctid = any (array(?)); -- 已支持。扫描指定集合
select * from tbl where block_id = any(array(?)); -- 扫描指定BLOCK集合
本文通过块评估、行评估、以及行扫描的方法,实现了一个比较折中的并行扫描连续BLOCK的方法。
参考
《PostgreSQL VOPS 向量计算 + DBLINK异步并行 - 单实例 10亿 聚合计算跑进2秒》
《PostgreSQL 相似搜索分布式架构设计与实践 - dblink异步调用与多机并行(远程 游标+记录 UDF实例)》
《惊天性能!单RDS PostgreSQL实例 支撑 2000亿 - 实时标签透视案例 (含dblink异步并行调用)》
《阿里云RDS PostgreSQL OSS 外部表 - (dblink异步调用封装)并行写提速案例》
https://www.postgresql.org/docs/devel/static/tablesample-method.html
《PostgreSQL 任意列组合条件 行数估算 实践 - 采样估算》
《秒级任意维度分析1TB级大表 - 通过采样估值满足高效TOP N等统计分析需求》
《PostgreSQL Oracle 兼容性 之 - 数据采样与脱敏》