PostgreSQL 流式数据处理(聚合、过滤、转换…)系列 - 8
背景
2013年帮朋友做的方案。写了一些列文档来解决当时某个大数据BI平台的异步流式数据处理的功能。
逐步优化,化繁为简。
在业务层面,统计,数据的过滤,数据的清洗,数据的事件触发等。是比较常见的需求。
比如以COUNT就是一个很典型的例子。
在9.2以前全表的count只能通过扫描全表来得到, 即使有pk也必须扫描全表.
9.2版本增加了index only scan的功能, count(*)可以通过仅仅扫描pk就可以得到.
但是如果是一个比较大的表, pk也是很大的, 扫描pk也是个不小的开销.
到了9.6,开始支持并行查询,通过并行,一张1亿的表,COUNT可能只需要几百毫秒。这是一个质的飞跃。(但是还有很多时候用并行并不是最好的)
另外社区也除了一个流式处理的数据库,pipelineDB,但是它的社区版本限制了一个DATABASE只能使用1024个流视图,在编码的地方使用了1BYTE存储CV。
那么回到postgresql数据库本身,有没有办法来优化count全表的操作呢, 如果你的场景真的有必要频繁的count全表, 那么可以尝试一下使用以下方法来优化你的场景.
正文
前面以及写了7篇关于count(*)准实时和实时统计的案例, 有基于触发器的, 有基于xid单线程取数据的.
本文的优化方法与londiste的ticket有点类似, 即数据切片.
将log表切分为多段, 这样的话如果分析线程串行处理速度跟不上的话, 可以并行处理. 有效的解决了基于xid单线程取数据的性能问题.
具体的场景就不介绍了, 前面7篇按顺序读下来就知道了.
1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/
2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/
3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/
4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/
5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/
6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/
7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/
这里只介绍切片函数和取数据函数.
测试表 :
create table log
(
id serial primary key,
xid int8 default txid_current() not null,
c1 int not null,
c2 int not null,
c3 int not null,
c4 text not null,
crt_time timestamp default now()
);
create index idx_log_1 on log(xid);
切片表
create table log_ticket(
id serial8 primary key,
split_xid int8 not null default txid_current(), -- 切片时的xid, 取数据的事务的xid必须大于该xid
split_time timestamp not null default now(),
log_xid_le int8 not null,
log_xid_st int8 not null,
log_xip int8[],
xid_readed boolean not null default false,
xip_readed boolean not null default false
);
切片函数
create or replace function log_spilt(
i_timeout_sec int, -- 上次切片以来超出多少秒, 则新增切片
i_limit int -- 上次切片以来超出多少条, 则新增切片
) returns void as $$
declare
v_advisory_xact_lock int8 := null; -- 串行处理锁.
v_xid_snap txid_snapshot := null; -- 当前事务状态快照
v_xmin int8 := null; -- 当前事务状态快照中未完成的最小事务
v_xmax int8 := null; -- 当前事务状态快照中未分配的最小事务
v_xip int8[] := null; -- 当前事务状态快照中未完成的事务数组
v_log_xid_le int8 := null; -- 大于等于该xid
v_log_xid_st int8 := null; -- 小于该xid
v_split_time timestamp := null; -- 上次切片时间
v_cnt int := null; -- 从上次切片后有多少条log被插入了
v_log_xip int8[] := null; -- 记录切片数据段内未完成的xid
begin
-- 判断i_timeout_sec, i_limit
if ( i_timeout_sec<=0 or i_limit <=0 ) then
raise notice 'please ensure i_timeout_sec > 0 and i_limit > 0 .';
return;
end if;
-- 串行处理, 如果不能获得锁则直接退出. 确保v_advisory_xact_lock全局唯一.
v_advisory_xact_lock := 1;
if not pg_try_advisory_xact_lock(v_advisory_xact_lock) then
raise notice 'Another function is calling, this call will exit.';
return;
end if;
-- 生成 xid snapshot 数据.
v_xid_snap := txid_current_snapshot();
v_xmin := txid_snapshot_xmin(v_xid_snap);
v_xmax := txid_snapshot_xmax(v_xid_snap);
select array_agg(t) into v_xip from txid_snapshot_xip(v_xid_snap) g(t);
perform 1 from log_ticket limit 1;
if found then
select log_xid_st,split_time into v_log_xid_le,v_split_time from log_ticket order by split_xid desc limit 1;
select count(*),(max(xid)+1) into v_cnt,v_log_xid_st from (select xid from log where xid >= v_log_xid_le and xid < v_xmax order by xid limit i_limit) t;
if ( ((now()-v_split_time) >= i_timeout_sec::text::interval or v_cnt>=i_limit) and v_cnt>=1 ) then
select array_agg(i) into v_log_xip from unnest(v_xip) i where i>=v_log_xid_le and i<v_log_xid_st;
insert into log_ticket(log_xid_le,log_xid_st,log_xip,xip_readed)
values (v_log_xid_le,v_log_xid_st,v_log_xip,case when v_log_xip is null then true else false end);
else
raise notice '不满足split条件';
end if;
else
select count(*),min(xid),(max(xid)+1) into v_cnt,v_log_xid_le,v_log_xid_st from (select xid from log where xid < v_xmax order by xid limit i_limit) t;
if ( v_cnt>=i_limit ) then
select array_agg(i) into v_log_xip from unnest(v_xip) i where i>=v_log_xid_le and i<v_log_xid_st;
insert into log_ticket(log_xid_le,log_xid_st,log_xip,xip_readed)
values (v_log_xid_le,v_log_xid_st,v_log_xip,case when v_log_xip is null then true else false end);
else
raise notice '不满足split条件';
end if;
end if;
return;
end;
$$ language plpgsql strict;
取数据函数
create or replace function get_log(i_mod int, i_mod_rem int) returns log[] as $$
declare
v_xid_now int8 := null;
v_log_ticket_pk int8 := null;
v_log_xid_le int8 := null;
v_log_xid_st int8 := null;
v_log_xip int8[] := null;
v_xid_readed boolean := null;
v_xip_readed boolean := null;
v_xid_snap txid_snapshot := null; -- 当前事务状态快照
v_xip int8[] := null; -- 当前事务状态快照中未完成的事务数组
v_log1 log[] := null;
v_log2 log[] := null;
v_result log[] := null; -- 结果
begin
-- mod约束
if (not i_mod>0) or (not (i_mod_rem>=0 and i_mod_rem<i_mod)) then
raise notice 'please ensure i_mod>0 and 0<=i_mod_rem<i_mod .';
return null;
end if;
v_xid_now := txid_current();
-- 生成 xid snapshot 数据.
v_xid_snap := txid_current_snapshot();
select array_agg(t) into v_xip from txid_snapshot_xip(v_xid_snap) g(t);
select id,log_xid_le,log_xid_st,log_xip,xid_readed,xip_readed
into v_log_ticket_pk,v_log_xid_le,v_log_xid_st,v_log_xip,v_xid_readed,v_xip_readed
from log_ticket
where split_xid<v_xid_now
and (not xid_readed or not xip_readed)
and mod(id,i_mod)=i_mod_rem
limit 1
for update;
if found then
if not v_xid_readed then
select array_agg(log) into v_log1 from log where xid>=v_log_xid_le and xid<v_log_xid_st and (not (xid = any(v_log_xip)) or v_log_xip is null);
v_xid_readed := true;
v_result := v_log1;
end if;
if (not v_xip_readed) and v_log_xip is not null then
perform 1 from unnest(v_xip) i where i = any(v_log_xip);
if (not found) or v_xip is null then
select array_agg(log) into v_log2 from log where xid = any(v_log_xip);
v_xip_readed := true;
v_result := array_cat(v_result,v_log2);
end if;
end if;
else
raise notice 'no data in log_ticket with i_mod=% and i_mod_rem=%.', i_mod, i_mod_rem;
return null;
end if;
update log_ticket set xid_readed=v_xid_readed,xip_readed=v_xip_readed where id=v_log_ticket_pk;
return v_result;
end;
$$ language plpgsql strict;
测试
pg92@digoal-PowerEdge-R610-> cat ins.sql
begin;
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
end;
begin;
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
rollback;
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);
pgbench
pg92@digoal-PowerEdge-R610-> pgbench -M prepared -f ./ins.sql -r -n -h $PGDATA -U postgres -T 60 -c 32 -j 2
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 32
number of threads: 2
duration: 60 s
number of transactions actually processed: 84781
tps = 1411.655384 (including connections establishing)
tps = 1412.958688 (excluding connections establishing)
pgbench测试过程中执行select log_spilt(1,100000);
digoal=# select log_spilt(1,10000);
log_spilt
-----------
(1 row)
digoal=# select log_spilt(1,10000);
log_spilt
-----------
(1 row)
digoal=# select log_spilt(1,1000000000);
log_spilt
-----------
(1 row)
digoal=# select log_spilt(1,100000);
log_spilt
-----------
(1 row)
... 略
digoal=# select log_spilt(1,100000000);
log_spilt
-----------
(1 row)
确保所有数据都写入log_ticket了.
digoal=# select min(log_xid_le),max(log_xid_st) from log_ticket;
min | max
-----------+-----------
505354161 | 506624142
(1 row)
digoal=# select min(xid),max(xid) from log;
min | max
-----------+-----------
505354161 | 506624141
(1 row)
查询log_ticket
digoal=# select * from log_ticket;
id | split_xid | split_time | log_xid_le | log_xid_st |
log_xip
| xid_readed | xip_readed
----+-----------+----------------------------+------------+------------+------------------------------------------------------------
------------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------+------------+------------
26 | 506284999 | 2013-05-03 15:05:53.963654 | 505354161 | 505356093 | NULL
| f | t
27 | 506285000 | 2013-05-03 15:05:54.887422 | 505356093 | 505358092 | NULL
| f | t
28 | 506285001 | 2013-05-03 15:05:55.369707 | 505358092 | 505360095 | NULL
| f | t
29 | 506285002 | 2013-05-03 15:05:55.757843 | 505360095 | 505362101 | NULL
| f | t
30 | 506285003 | 2013-05-03 15:05:56.132186 | 505362101 | 505364103 | NULL
| f | t
31 | 506285004 | 2013-05-03 15:05:56.487359 | 505364103 | 505366103 | NULL
| f | t
32 | 506285005 | 2013-05-03 15:05:56.921683 | 505366103 | 505368098 | NULL
| f | t
33 | 506285006 | 2013-05-03 15:05:57.220986 | 505368098 | 505370109 | NULL
| f | t
34 | 506285007 | 2013-05-03 15:05:57.624854 | 505370109 | 505372109 | NULL
| f | t
35 | 506285008 | 2013-05-03 15:05:58.055403 | 505372109 | 505374113 | NULL
| f | t
36 | 506315377 | 2013-05-03 15:07:22.414773 | 505374113 | 505376115 | NULL
| f | t
37 | 506320066 | 2013-05-03 15:07:23.147482 | 505376115 | 505378109 | NULL
| f | t
38 | 506409173 | 2013-05-03 15:07:36.081153 | 505378109 | 506384559 | {506384464,506384481,506384492,506384503,506384507,50638450
9,506384517,506384521,506384522,506384523,506384525,506384529,506384534,506384535,506384537,506384538,506384539,506384541,506384542,
506384543,506384546,506384548,506384552,506384553,506384554,506384556,506384557} | f | f
39 | 506423332 | 2013-05-03 15:07:43.592782 | 506384559 | 506404567 | NULL
| f | t
40 | 506427897 | 2013-05-03 15:07:44.306657 | 506404567 | 506424564 | NULL
| f | t
41 | 506434174 | 2013-05-03 15:07:45.330145 | 506424564 | 506433989 | {506433901,506433908,506433928,506433934,506433937,50643393
8,506433940,506433941,506433946,506433947,506433954,506433955,506433956,506433957,506433958,506433960,506433962,506433964,506433965,
506433968,506433971,506433974,506433976,506433980,506433985} | f | f
42 | 506442222 | 2013-05-03 15:07:46.573332 | 506433989 | 506442067 | {506441993,506442005,506442008,506442012,506442015,50644201
6,506442024,506442032,506442033,506442034,506442037,506442039,506442040,506442041,506442042,506442043,506442045,506442047,506442048,
506442051,506442052,506442053,506442055,506442059,506442062,506442064,506442065} | f | f
43 | 506467572 | 2013-05-03 15:07:50.910016 | 506442067 | 506462070 | NULL
| f | t
44 | 506473021 | 2013-05-03 15:07:51.998294 | 506462070 | 506472808 | {506472727,506472747,506472751,506472763,506472764,50647276
7,506472773,506472774,506472775,506472777,506472778,506472779,506472780,506472781,506472782,506472794,506472796,506472799,506472800,
506472801,506472802,506472803,506472804} | f | f
(19 rows)
pgbench过程中同时执行get_log
digoal=# select count(*) from unnest(get_log(2,0));
count
-------
10000
(1 row)
digoal=# select count(*) from unnest(get_log(2,1));
count
-------
10000
(1 row)
...
直到所有线程都报
digoal=# select count(*) from unnest(get_log(2,0));
NOTICE: no data in log_ticket with i_mod=2 and i_mod_rem=0.
count
-------
0
(1 row)
digoal=# select count(*) from unnest(get_log(2,1));
NOTICE: no data in log_ticket with i_mod=2 and i_mod_rem=1.
count
-------
0
(1 row)
略
把所有的get_log结果相加看看是否等于count(*) from log;
验证结果正确.
此处略.
其他
1. 如果觉得取数据的函数不太通用, 可以改成返回log_ticket的游标. 程序尽管自己去取.
为方便大家查询, 汇总PostgreSQL实时和非实时数据统计的案例分析文章系列 - 如下 :
1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/
2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/
3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/
4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/
5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/
6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/
7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/
8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/