PostgreSQL 流式数据处理(聚合、过滤、转换…)系列 - 3
背景
2013年帮朋友做的方案。写了一系列文档来解决当时某个大数据BI平台的异步流式数据处理的功能。
逐步优化,化繁为简。
在业务层面,统计,数据的过滤,数据的清洗,数据的事件触发等。是比较常见的需求。
比如COUNT就是一个很典型的例子。
在9.2以前全表的count只能通过扫描全表来得到, 即使有pk也必须扫描全表.
9.2版本增加了index only scan的功能, count(*)可以通过仅仅扫描pk就可以得到.
但是如果是一个比较大的表, pk也是很大的, 扫描pk也是个不小的开销.
到了9.6,开始支持并行查询,通过并行,一张1亿的表,COUNT可能只需要几百毫秒。这是一个质的飞跃。(但是还有很多时候用并行并不是最好的)
另外社区也出了一个流式处理的数据库,pipelineDB,但是它的社区版本限制了一个DATABASE只能使用1024个流视图,在编码的地方使用了1BYTE存储CV。
那么回到postgresql数据库本身,有没有办法来优化count全表的操作呢, 如果你的场景真的有必要频繁的count全表, 那么可以尝试一下使用以下方法来优化你的场景.
正文
前两篇blog介绍了表的count(*)的优化, 但是未涉及group by column的情况, 有兴趣的朋友可以阅读 :
http://blog.163.com/digoal@126/blog/static/163877040201331252945440/
http://blog.163.com/digoal@126/blog/static/16387704020133151402415/
本例则是这个话题的延展. 主要讲述针对列group by的count(*)的优化.
测试表 :
postgres=# create table log (id serial primary key, c1 int not null, c2 int not null, c3 int not null, c4 text not null, crt_time timestamp);
CREATE TABLE
存放count()的表, 假设经常需要按log.c1以及log.crt_time分天, 周, 月, 年进行count()
-- Counter tables for log rows grouped by c1, one table per time bucket
-- (day, week, month, year). A row is identified by (c1, pid, stat_time):
--   pid       : set by the trigger functions to pg_backend_pid(), so every
--               backend maintains its own counter row
--   cnt       : running counter for that key
--   stat_time : bucket label stored as text (e.g. 'yyyymmdd', 'yyyymm', 'yyyy')
CREATE TABLE log_c1_cnt_day (
    c1        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c1, pid, stat_time)
);

CREATE TABLE log_c1_cnt_week (
    c1        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c1, pid, stat_time)
);

CREATE TABLE log_c1_cnt_month (
    c1        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c1, pid, stat_time)
);

CREATE TABLE log_c1_cnt_year (
    c1        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c1, pid, stat_time)
);
存放count()的表, 假设经常需要按log.c2, log.c3以及log.crt_time分天, 周, 月, 年进行count()
-- Counter tables for log rows grouped by (c2, c3), one table per time bucket
-- (day, week, month, year). A row is identified by (c2, c3, pid, stat_time):
--   pid       : set by the trigger functions to pg_backend_pid(), so every
--               backend maintains its own counter row
--   cnt       : running counter for that key
--   stat_time : bucket label stored as text (e.g. 'yyyymmdd', 'yyyymm', 'yyyy')
CREATE TABLE log_c2_c3_cnt_day (
    c2        int,
    c3        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c2, c3, pid, stat_time)
);

CREATE TABLE log_c2_c3_cnt_week (
    c2        int,
    c3        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c2, c3, pid, stat_time)
);

CREATE TABLE log_c2_c3_cnt_month (
    c2        int,
    c3        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c2, c3, pid, stat_time)
);

CREATE TABLE log_c2_c3_cnt_year (
    c2        int,
    c3        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c2, c3, pid, stat_time)
);
创建插入触发器函数, 每条插入的记录都会触发一次各统计表的更新(对应计数加1)
注意触发器内执行是串行的,后面会讲优化方法
-- Row-level trigger function for inserts: adds 1 to the counter row
-- belonging to the current backend in every statistics table
-- (c1 and (c2, c3) dimensions; day, week, month and year buckets).
-- For each table the counter is first updated; when no row exists yet for
-- this (dimension, pid, bucket) key, a new row with cnt = 1 is inserted.
-- Returns NULL.
CREATE OR REPLACE FUNCTION public.tg_insert_log()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
declare
  my_pid  int;    -- pid of the backend running the trigger
  k_day   text;   -- bucket label 'yyyymmdd'
  k_week  text;   -- bucket label: crt_time's date minus ISODOW plus 1 day, as 'yyyymmdd'
  k_month text;   -- bucket label 'yyyymm'
  k_year  text;   -- bucket label 'yyyy'
  hit     int8;   -- counter returned by UPDATE; NULL when no row was updated
begin
  my_pid  := pg_backend_pid();
  k_day   := to_char(NEW.crt_time, 'yyyymmdd');
  k_week  := to_char(date(NEW.crt_time) - (EXTRACT(ISODOW FROM date(NEW.crt_time)))::int + 1, 'yyyymmdd');
  k_month := to_char(NEW.crt_time, 'yyyymm');
  k_year  := to_char(NEW.crt_time, 'yyyy');

  -- c1 statistics: day
  update log_c1_cnt_day
     set cnt = cnt + 1
   where c1 = NEW.c1 and pid = my_pid and stat_time = k_day
  returning cnt into hit;
  if hit is null then
    insert into log_c1_cnt_day (c1, pid, cnt, stat_time)
    values (NEW.c1, my_pid, 1, k_day);
  end if;

  -- c1 statistics: week
  update log_c1_cnt_week
     set cnt = cnt + 1
   where c1 = NEW.c1 and pid = my_pid and stat_time = k_week
  returning cnt into hit;
  if hit is null then
    insert into log_c1_cnt_week (c1, pid, cnt, stat_time)
    values (NEW.c1, my_pid, 1, k_week);
  end if;

  -- c1 statistics: month
  update log_c1_cnt_month
     set cnt = cnt + 1
   where c1 = NEW.c1 and pid = my_pid and stat_time = k_month
  returning cnt into hit;
  if hit is null then
    insert into log_c1_cnt_month (c1, pid, cnt, stat_time)
    values (NEW.c1, my_pid, 1, k_month);
  end if;

  -- c1 statistics: year
  update log_c1_cnt_year
     set cnt = cnt + 1
   where c1 = NEW.c1 and pid = my_pid and stat_time = k_year
  returning cnt into hit;
  if hit is null then
    insert into log_c1_cnt_year (c1, pid, cnt, stat_time)
    values (NEW.c1, my_pid, 1, k_year);
  end if;

  -- (c2, c3) statistics: day
  update log_c2_c3_cnt_day
     set cnt = cnt + 1
   where c2 = NEW.c2 and c3 = NEW.c3 and pid = my_pid and stat_time = k_day
  returning cnt into hit;
  if hit is null then
    insert into log_c2_c3_cnt_day (c2, c3, pid, cnt, stat_time)
    values (NEW.c2, NEW.c3, my_pid, 1, k_day);
  end if;

  -- (c2, c3) statistics: week
  update log_c2_c3_cnt_week
     set cnt = cnt + 1
   where c2 = NEW.c2 and c3 = NEW.c3 and pid = my_pid and stat_time = k_week
  returning cnt into hit;
  if hit is null then
    insert into log_c2_c3_cnt_week (c2, c3, pid, cnt, stat_time)
    values (NEW.c2, NEW.c3, my_pid, 1, k_week);
  end if;

  -- (c2, c3) statistics: month
  update log_c2_c3_cnt_month
     set cnt = cnt + 1
   where c2 = NEW.c2 and c3 = NEW.c3 and pid = my_pid and stat_time = k_month
  returning cnt into hit;
  if hit is null then
    insert into log_c2_c3_cnt_month (c2, c3, pid, cnt, stat_time)
    values (NEW.c2, NEW.c3, my_pid, 1, k_month);
  end if;

  -- (c2, c3) statistics: year
  update log_c2_c3_cnt_year
     set cnt = cnt + 1
   where c2 = NEW.c2 and c3 = NEW.c3 and pid = my_pid and stat_time = k_year
  returning cnt into hit;
  if hit is null then
    insert into log_c2_c3_cnt_year (c2, c3, pid, cnt, stat_time)
    values (NEW.c2, NEW.c3, my_pid, 1, k_year);
  end if;

  -- statistics on other columns follow the same pattern
  return null;
end;
$function$;
创建删除触发器函数
-- Row-level trigger function for deletes: subtracts 1 from the counter row
-- belonging to the current backend in every statistics table
-- (c1 and (c2, c3) dimensions; day, week, month and year buckets).
-- For each table the counter is first updated; when no row exists yet for
-- this (dimension, pid, bucket) key, a new row with cnt = -1 is inserted.
-- Returns NULL.
CREATE OR REPLACE FUNCTION public.tg_delete_log()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
declare
  my_pid  int;    -- pid of the backend running the trigger
  k_day   text;   -- bucket label 'yyyymmdd'
  k_week  text;   -- bucket label: crt_time's date minus ISODOW plus 1 day, as 'yyyymmdd'
  k_month text;   -- bucket label 'yyyymm'
  k_year  text;   -- bucket label 'yyyy'
  hit     int8;   -- counter returned by UPDATE; NULL when no row was updated
begin
  my_pid  := pg_backend_pid();
  k_day   := to_char(OLD.crt_time, 'yyyymmdd');
  k_week  := to_char(date(OLD.crt_time) - (EXTRACT(ISODOW FROM date(OLD.crt_time)))::int + 1, 'yyyymmdd');
  k_month := to_char(OLD.crt_time, 'yyyymm');
  k_year  := to_char(OLD.crt_time, 'yyyy');

  -- c1 statistics: day
  update log_c1_cnt_day
     set cnt = cnt - 1
   where c1 = OLD.c1 and pid = my_pid and stat_time = k_day
  returning cnt into hit;
  if hit is null then
    insert into log_c1_cnt_day (c1, pid, cnt, stat_time)
    values (OLD.c1, my_pid, -1, k_day);
  end if;

  -- c1 statistics: week
  update log_c1_cnt_week
     set cnt = cnt - 1
   where c1 = OLD.c1 and pid = my_pid and stat_time = k_week
  returning cnt into hit;
  if hit is null then
    insert into log_c1_cnt_week (c1, pid, cnt, stat_time)
    values (OLD.c1, my_pid, -1, k_week);
  end if;

  -- c1 statistics: month
  update log_c1_cnt_month
     set cnt = cnt - 1
   where c1 = OLD.c1 and pid = my_pid and stat_time = k_month
  returning cnt into hit;
  if hit is null then
    insert into log_c1_cnt_month (c1, pid, cnt, stat_time)
    values (OLD.c1, my_pid, -1, k_month);
  end if;

  -- c1 statistics: year
  update log_c1_cnt_year
     set cnt = cnt - 1
   where c1 = OLD.c1 and pid = my_pid and stat_time = k_year
  returning cnt into hit;
  if hit is null then
    insert into log_c1_cnt_year (c1, pid, cnt, stat_time)
    values (OLD.c1, my_pid, -1, k_year);
  end if;

  -- (c2, c3) statistics: day
  update log_c2_c3_cnt_day
     set cnt = cnt - 1
   where c2 = OLD.c2 and c3 = OLD.c3 and pid = my_pid and stat_time = k_day
  returning cnt into hit;
  if hit is null then
    insert into log_c2_c3_cnt_day (c2, c3, pid, cnt, stat_time)
    values (OLD.c2, OLD.c3, my_pid, -1, k_day);
  end if;

  -- (c2, c3) statistics: week
  update log_c2_c3_cnt_week
     set cnt = cnt - 1
   where c2 = OLD.c2 and c3 = OLD.c3 and pid = my_pid and stat_time = k_week
  returning cnt into hit;
  if hit is null then
    insert into log_c2_c3_cnt_week (c2, c3, pid, cnt, stat_time)
    values (OLD.c2, OLD.c3, my_pid, -1, k_week);
  end if;

  -- (c2, c3) statistics: month
  update log_c2_c3_cnt_month
     set cnt = cnt - 1
   where c2 = OLD.c2 and c3 = OLD.c3 and pid = my_pid and stat_time = k_month
  returning cnt into hit;
  if hit is null then
    insert into log_c2_c3_cnt_month (c2, c3, pid, cnt, stat_time)
    values (OLD.c2, OLD.c3, my_pid, -1, k_month);
  end if;

  -- (c2, c3) statistics: year
  update log_c2_c3_cnt_year
     set cnt = cnt - 1
   where c2 = OLD.c2 and c3 = OLD.c3 and pid = my_pid and stat_time = k_year
  returning cnt into hit;
  if hit is null then
    insert into log_c2_c3_cnt_year (c2, c3, pid, cnt, stat_time)
    values (OLD.c2, OLD.c3, my_pid, -1, k_year);
  end if;

  -- statistics on other columns follow the same pattern
  return null;
end;
$function$;
创建truncate触发器函数
-- Trigger function for TRUNCATE: empties every statistics table
-- (c1 and (c2, c3) dimensions; day, week, month and year buckets)
-- in a single multi-table TRUNCATE. Returns NULL.
CREATE OR REPLACE FUNCTION public.tg_truncate_log()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
begin
  truncate
    -- c1 statistics
    log_c1_cnt_day,
    log_c1_cnt_week,
    log_c1_cnt_month,
    log_c1_cnt_year,
    -- (c2, c3) statistics
    log_c2_c3_cnt_day,
    log_c2_c3_cnt_week,
    log_c2_c3_cnt_month,
    log_c2_c3_cnt_year;
  return null;
end;
$function$;
创建触发器
-- Attach the statistics maintenance functions to the log table.
-- NOTE(review): only INSERT, DELETE and TRUNCATE are covered here; no trigger
-- is defined for UPDATE on log — confirm rows are never updated in place.

-- tg1: after every inserted row, increment the counters.
CREATE TRIGGER tg1
    AFTER INSERT ON log
    FOR EACH ROW
    EXECUTE PROCEDURE tg_insert_log();

-- tg2: after every deleted row, decrement the counters.
CREATE TRIGGER tg2
    AFTER DELETE ON log
    FOR EACH ROW
    EXECUTE PROCEDURE tg_delete_log();

-- tg3: after a TRUNCATE of log, empty the statistics tables.
CREATE TRIGGER tg3
    AFTER TRUNCATE ON log
    FOR EACH STATEMENT
    EXECUTE PROCEDURE tg_truncate_log();
INSERT pgbench
pg92@digoal-PowerEdge-R610-> cat insert.sql
\setrandom c1 1 10
\setrandom c2 1 5
\setrandom c3 1 2
\setrandom c4 1 100
\setrandom day 1 300
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c4, current_date+:day::int);
测试结果
pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./insert.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 16
number of threads: 4
duration: 60 s
number of transactions actually processed: 392105
tps = 6533.401527 (including connections establishing)
tps = 6534.950565 (excluding connections establishing)
statement latencies in milliseconds:
0.003611 \setrandom c1 1 10
0.000821 \setrandom c2 1 5
0.000762 \setrandom c3 1 2
0.000740 \setrandom c4 1 100
0.000873 \setrandom day 1 300
2.438667 insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c4, current_date+:day::int);
UPDATE pgbench
pg92@digoal-PowerEdge-R610-> cat id.sql
\setrandom c1 1 10
\setrandom c2 1 5
\setrandom c3 1 2
\setrandom c4 1 100
\setrandom day 1 300
\setrandom id 1 5000000
begin;
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c4, current_date+:day::int);
insert into log (c1,c2,c3,c4,crt_time) values (:c2, :c1, :c3, :c4, current_date+:day::int);
delete from log where id=:id;
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c4, :c3, current_date+:day::int);
delete from log where id=:id;
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c1, :c4, current_date+:day::int);
delete from log where id=:id;
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c1, :c3, :c4, current_date+:day::int);
delete from log where id=:id;
delete from log where id=:id;
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c1, current_date+:day::int);
end;
测试结果
pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 16
number of threads: 4
duration: 60 s
number of transactions actually processed: 68535
tps = 1141.824233 (including connections establishing)
tps = 1142.090645 (excluding connections establishing)
statement latencies in milliseconds:
0.004085 \setrandom c1 1 10
0.000841 \setrandom c2 1 5
0.000848 \setrandom c3 1 2
0.000800 \setrandom c4 1 100
0.000899 \setrandom day 1 300
0.000858 \setrandom id 1 5000000
0.124333 begin;
2.105232 insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c4, current_date+:day::int);
2.099313 insert into log (c1,c2,c3,c4,crt_time) values (:c2, :c1, :c3, :c4, current_date+:day::int);
0.553053 delete from log where id=:id;
1.951419 insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c4, :c3, current_date+:day::int);
0.484705 delete from log where id=:id;
1.856407 insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c1, :c4, current_date+:day::int);
0.479868 delete from log where id=:id;
1.661506 insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c1, :c3, :c4, current_date+:day::int);
0.469769 delete from log where id=:id;
0.267489 delete from log where id=:id;
1.623206 insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c1, current_date+:day::int);
0.301404 end;
验证数据准确性的SQL如下 :
-- Consistency checks. The queries come in pairs: the first query of a pair
-- sums the per-backend counters of one statistics table, the second one
-- recomputes the same grouping directly from log. Groups whose total is 0
-- are excluded. The two checksums of a pair are expected to be equal.

-- c1 / year: statistics table
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c1, stat_time, sum(cnt)
    FROM log_c1_cnt_year
    GROUP BY c1, stat_time
    HAVING sum(cnt) <> 0
    ORDER BY c1, stat_time
) AS t;
-- c1 / year: recomputed from log
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c1, to_char(crt_time, 'yyyy') AS stat_time, count(*)
    FROM log
    GROUP BY c1, stat_time
    HAVING count(*) <> 0
    ORDER BY c1, stat_time
) AS t;

-- c1 / month: statistics table
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c1, stat_time, sum(cnt)
    FROM log_c1_cnt_month
    GROUP BY c1, stat_time
    HAVING sum(cnt) <> 0
    ORDER BY c1, stat_time
) AS t;
-- c1 / month: recomputed from log
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c1, to_char(crt_time, 'yyyymm') AS stat_time, count(*)
    FROM log
    GROUP BY c1, stat_time
    HAVING count(*) <> 0
    ORDER BY c1, stat_time
) AS t;

-- c1 / week: statistics table
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c1, stat_time, sum(cnt)
    FROM log_c1_cnt_week
    GROUP BY c1, stat_time
    HAVING sum(cnt) <> 0
    ORDER BY c1, stat_time
) AS t;
-- c1 / week: recomputed from log
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT
        c1,
        to_char(date(crt_time) - (EXTRACT(ISODOW FROM date(crt_time)))::int + 1, 'yyyymmdd') AS stat_time,
        count(*)
    FROM log
    GROUP BY c1, stat_time
    HAVING count(*) <> 0
    ORDER BY c1, stat_time
) AS t;

-- c1 / day: statistics table
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c1, stat_time, sum(cnt)
    FROM log_c1_cnt_day
    GROUP BY c1, stat_time
    HAVING sum(cnt) <> 0
    ORDER BY c1, stat_time
) AS t;
-- c1 / day: recomputed from log
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c1, to_char(crt_time, 'yyyymmdd') AS stat_time, count(*)
    FROM log
    GROUP BY c1, stat_time
    HAVING count(*) <> 0
    ORDER BY c1, stat_time
) AS t;

-- (c2, c3) / year: statistics table
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c2, c3, stat_time, sum(cnt)
    FROM log_c2_c3_cnt_year
    GROUP BY c2, c3, stat_time
    HAVING sum(cnt) <> 0
    ORDER BY c2, c3, stat_time
) AS t;
-- (c2, c3) / year: recomputed from log
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c2, c3, to_char(crt_time, 'yyyy') AS stat_time, count(*)
    FROM log
    GROUP BY c2, c3, stat_time
    HAVING count(*) <> 0
    ORDER BY c2, c3, stat_time
) AS t;

-- (c2, c3) / month: statistics table
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c2, c3, stat_time, sum(cnt)
    FROM log_c2_c3_cnt_month
    GROUP BY c2, c3, stat_time
    HAVING sum(cnt) <> 0
    ORDER BY c2, c3, stat_time
) AS t;
-- (c2, c3) / month: recomputed from log
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c2, c3, to_char(crt_time, 'yyyymm') AS stat_time, count(*)
    FROM log
    GROUP BY c2, c3, stat_time
    HAVING count(*) <> 0
    ORDER BY c2, c3, stat_time
) AS t;

-- (c2, c3) / week: statistics table
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c2, c3, stat_time, sum(cnt)
    FROM log_c2_c3_cnt_week
    GROUP BY c2, c3, stat_time
    HAVING sum(cnt) <> 0
    ORDER BY c2, c3, stat_time
) AS t;
-- (c2, c3) / week: recomputed from log
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT
        c2,
        c3,
        to_char(date(crt_time) - (EXTRACT(ISODOW FROM date(crt_time)))::int + 1, 'yyyymmdd') AS stat_time,
        count(*)
    FROM log
    GROUP BY c2, c3, stat_time
    HAVING count(*) <> 0
    ORDER BY c2, c3, stat_time
) AS t;

-- (c2, c3) / day: statistics table
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c2, c3, stat_time, sum(cnt)
    FROM log_c2_c3_cnt_day
    GROUP BY c2, c3, stat_time
    HAVING sum(cnt) <> 0
    ORDER BY c2, c3, stat_time
) AS t;
-- (c2, c3) / day: recomputed from log
SELECT sum(hashtext(t.*::text))
FROM (
    SELECT c2, c3, to_char(crt_time, 'yyyymmdd') AS stat_time, count(*)
    FROM log
    GROUP BY c2, c3, stat_time
    HAVING count(*) <> 0
    ORDER BY c2, c3, stat_time
) AS t;
经过验证数据准确.
小结
1.
实时性的统计带来的开销与维度有关, 维度越多, 开销越大, 这样带来的后果是入口表(明细表)的dml吞吐量会急剧下降.
下一篇将介绍非实时的统计. 结合非实时的统计以及明细数据也能达到实时count的效果.
为方便大家查询, 汇总PostgreSQL实时和非实时数据统计的案例分析文章系列 - 如下 :
1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/
2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/
3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/
4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/
5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/
6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/
7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/
8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/