PostgreSQL 流式数据处理(聚合、过滤、转换…)系列 - 2
背景
2013年帮朋友做的方案。写了一些列文档来解决当时某个大数据BI平台的异步流式数据处理的功能。
逐步优化,化繁为简。
在业务层面,统计,数据的过滤,数据的清洗,数据的事件触发等。是比较常见的需求。
比如以COUNT就是一个很典型的例子。
在9.2以前全表的count只能通过扫描全表来得到, 即使有pk也必须扫描全表.
9.2版本增加了index only scan的功能, count(*)可以通过仅仅扫描pk就可以得到.
但是如果是一个比较大的表, pk也是很大的, 扫描pk也是个不小的开销.
到了9.6,开始支持并行查询,通过并行,一张1亿的表,COUNT可能只需要几百毫秒。这是一个质的飞跃。(但是还有很多时候用并行并不是最好的)
另外社区也除了一个流式处理的数据库,pipelineDB,但是它的社区版本限制了一个DATABASE只能使用1024个流视图,在编码的地方使用了1BYTE存储CV。
那么回到postgresql数据库本身,有没有办法来优化count全表的操作呢, 如果你的场景真的有必要频繁的count全表, 那么可以尝试一下使用以下方法来优化你的场景.
正文
接上一篇blog
http://blog.163.com/digoal@126/blog/static/163877040201331252945440/
上一篇blog的后面一部分讲解了如何利用advisory lock来规避死锁的问题.
但是在事务中如果a的dml sql只有1条, 即不可能发生死锁的情况下, 这种方法性能远不如不做死锁规避的函数.
因此还有其他更优秀的方法来规避死锁吗?
当然有的, 这里就要介绍另一种方法, 结合Linux process id来使用.
当一个客户端连接PostgreSQL的时候, Postgres进行会fork一个进程出来, 称为PostgreSQL backend process. 这个进程负责与客户端进行交互.
接下来就要用PostgreSQL backend process的process id来标记cnt_a中的id字段.
因为同一时间点每个backend process 的pid不一样, 完全规避了死锁的问题.
具体方法如下 :
postgres=# create table a(id serial4 primary key, info text, crt_time timestamp(0) default now());
NOTICE: CREATE TABLE will create implicit sequence "a_id_seq" for serial column "a.id"
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "a_pkey" for table "a"
CREATE TABLE
postgres=# create table cnt_a(id int primary key, cnt int);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "cnt_a_pkey" for table "cnt_a"
CREATE TABLE
cnt_a不需要初始值了
1. 创建插入触发器函数
CREATE OR REPLACE FUNCTION public.tg_insert_a()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
declare
begin
update cnt_a set cnt=cnt+1 where id=pg_backend_pid();
if not found then
insert into cnt_a(id, cnt) values (pg_backend_pid(), 1);
end if;
return null;
end;
$function$;
2. 创建删除触发器函数
CREATE OR REPLACE FUNCTION public.tg_delete_a()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
declare
begin
update cnt_a set cnt=cnt-1 where id=pg_backend_pid();
if not found then
insert into cnt_a(id, cnt) values (pg_backend_pid(), -1);
end if;
return null;
end;
$function$;
3. 创建truncate触发器函数
CREATE OR REPLACE FUNCTION public.tg_truncate_a()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
declare
begin
update cnt_a set cnt=0 where not cnt=0;
return null;
end;
$function$;
4. 创建触发器
create trigger tg1 after insert on a for each row execute procedure tg_insert_a();
create trigger tg2 after delete on a for each row execute procedure tg_delete_a();
create trigger tg3 after truncate on a for each statement execute procedure tg_truncate_a();
5. 测试单dmlsql场景pgbench, 有显著提升.
pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./insert.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 16
number of threads: 4
duration: 60 s
number of transactions actually processed: 2137104
tps = 35606.972536 (including connections establishing)
tps = 35615.523750 (excluding connections establishing)
statement latencies in milliseconds:
0.447084 insert into a (info) values ('test');
验证
postgres=# select count(*) from a;
count
---------
2137104
(1 row)
postgres=# select sum(cnt) from cnt_a ;
sum
---------
2137104
(1 row)
6. 测试多dmlsql场景pgbench, 有显著提升.
pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 16
number of threads: 4
duration: 60 s
number of transactions actually processed: 275704
tps = 4593.421227 (including connections establishing)
tps = 4594.495475 (excluding connections establishing)
statement latencies in milliseconds:
0.003119 \setrandom id 1 20000000
0.179727 begin;
0.215766 delete from a where id=:id;
0.215785 delete from a where id=:id;
0.335462 insert into a (info) values ('test');
0.222234 delete from a where id=:id;
0.328336 insert into a (info) values ('test');
0.222417 delete from a where id=:id;
0.325938 insert into a (info) values ('test');
0.221347 delete from a where id=:id;
0.325768 insert into a (info) values ('test');
0.329100 insert into a (info) values ('test');
0.328841 insert into a (info) values ('test');
0.212162 end;
验证
postgres=# select count(*) from a;
count
---------
3791328
(1 row)
postgres=# select sum(cnt) from cnt_a ;
sum
---------
3791328
(1 row)
小结
1.
需要注意, 因为linux进程id范围可能会比较大, 最后cnt_a表也许会变得比较大.
如果有方法能够限定启动postgresql数据库用户的pid分配范围, 那么这将不会成为一个问题.
当然你也可以对cnt_a表进行整理, 合并掉一些记录. 给cnt_a表瘦身.
如下 :
postgres=# begin;
BEGIN
postgres=# select * from cnt_a limit 1;
id | cnt
-------+--------
11305 | 122315
(1 row)
postgres=# select sum(cnt) from cnt_a ;
sum
---------
3791328
(1 row)
postgres=# update cnt_a set cnt=3791328 where id=11305;
UPDATE 1
postgres=# delete from cnt_a where id<>11305;
DELETE 31
postgres=# end;
COMMIT
2.
应用本例的触发器函数, 最终的dml qps如下 :
单事务单dml sql场景 : 35615 qps
单事务多dml sql场景 : 50534 qps
为方便大家查询, 汇总PostgreSQL实时和非实时数据统计的案例分析文章系列 - 如下 :
1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/
2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/
3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/
4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/
5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/
6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/
7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/
8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/