PostgreSQL 流式数据处理(聚合、过滤、转换…)系列 - 3

8 minute read

背景

2013年帮朋友做的方案。写了一系列文档来解决当时某个大数据BI平台的异步流式数据处理的功能。

逐步优化,化繁为简。

在业务层面,统计,数据的过滤,数据的清洗,数据的事件触发等。是比较常见的需求。

比如COUNT就是一个很典型的例子。

在9.2以前全表的count只能通过扫描全表来得到, 即使有pk也必须扫描全表.

9.2版本增加了index only scan的功能, count(*)可以通过仅仅扫描pk就可以得到.

但是如果是一个比较大的表, pk也是很大的, 扫描pk也是个不小的开销.

到了9.6,开始支持并行查询,通过并行,一张1亿的表,COUNT可能只需要几百毫秒。这是一个质的飞跃。(但是还有很多时候用并行并不是最好的)

另外社区也出了一个流式处理的数据库,pipelineDB,但是它的社区版本限制了一个DATABASE只能使用1024个流视图,在编码的地方使用了1BYTE存储CV。

那么回到postgresql数据库本身,有没有办法来优化count全表的操作呢, 如果你的场景真的有必要频繁的count全表, 那么可以尝试一下使用以下方法来优化你的场景.

正文

前两篇blog介绍了表的count(*)的优化, 但是未涉及group by column的情况, 有兴趣的朋友可以阅读 :

http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

本例则是这个话题的延展. 主要讲述针对列group by的count(*)的优化.

测试表 :

postgres=# create table log (id serial primary key, c1 int not null, c2 int not null, c3 int not null, c4 text not null, crt_time timestamp);  
CREATE TABLE  

存放count()的表, 假设经常需要按log.c1以及log.crt_time分天, 周, 月, 年进行count()

-- Counter tables for log rows grouped by c1 and a time bucket.
-- One row per (c1, pid, stat_time); pid holds the backend process id that the
-- trigger functions obtain from pg_backend_pid(), so the total for a bucket
-- is sum(cnt) across all pid values.

-- Daily bucket: the triggers write stat_time as to_char(crt_time, 'yyyymmdd').
CREATE TABLE log_c1_cnt_day (
    c1        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c1, pid, stat_time)
);

-- Weekly bucket: stat_time is the 'yyyymmdd' of the ISO-week Monday
-- (date - ISODOW + 1) computed by the triggers.
CREATE TABLE log_c1_cnt_week (
    c1        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c1, pid, stat_time)
);

-- Monthly bucket: stat_time is 'yyyymm'.
CREATE TABLE log_c1_cnt_month (
    c1        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c1, pid, stat_time)
);

-- Yearly bucket: stat_time is 'yyyy'.
CREATE TABLE log_c1_cnt_year (
    c1        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c1, pid, stat_time)
);

存放count()的表, 假设经常需要按log.c2, log.c3以及log.crt_time分天, 周, 月, 年进行count()

-- Counter tables for log rows grouped by (c2, c3) and a time bucket.
-- One row per (c2, c3, pid, stat_time); pid holds the backend process id that
-- the trigger functions obtain from pg_backend_pid(), so the total for a
-- bucket is sum(cnt) across all pid values.

-- Daily bucket: the triggers write stat_time as to_char(crt_time, 'yyyymmdd').
CREATE TABLE log_c2_c3_cnt_day (
    c2        int,
    c3        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c2, c3, pid, stat_time)
);

-- Weekly bucket: stat_time is the 'yyyymmdd' of the ISO-week Monday
-- (date - ISODOW + 1) computed by the triggers.
CREATE TABLE log_c2_c3_cnt_week (
    c2        int,
    c3        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c2, c3, pid, stat_time)
);

-- Monthly bucket: stat_time is 'yyyymm'.
CREATE TABLE log_c2_c3_cnt_month (
    c2        int,
    c3        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c2, c3, pid, stat_time)
);

-- Yearly bucket: stat_time is 'yyyy'.
CREATE TABLE log_c2_c3_cnt_year (
    c2        int,
    c3        int,
    pid       int,
    cnt       int8,
    stat_time text,
    PRIMARY KEY (c2, c3, pid, stat_time)
);

创建插入触发器函数, 每条插入的记录都会触发一次, 对相应的统计表做计数加1.

注意触发器内执行是串行的,后面会讲优化方法

-- Row-level insert trigger function for table log.
--
-- For the row in NEW it adds 1 to the counter row owned by the current backend
-- (pid = pg_backend_pid()) in each of the eight counter tables:
--   log_c1_cnt_{day,week,month,year}      keyed by (c1, pid, stat_time)
--   log_c2_c3_cnt_{day,week,month,year}   keyed by (c2, c3, pid, stat_time)
-- When the UPDATE matches no row, a new counter row with cnt = 1 is inserted.
-- Returns NULL.
CREATE OR REPLACE FUNCTION public.tg_insert_log()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
declare
  v_backend int;    -- process id of the session running the trigger
  v_day     text;   -- 'yyyymmdd' bucket of NEW.crt_time
  v_week    text;   -- 'yyyymmdd' of the Monday of NEW.crt_time's ISO week
  v_month   text;   -- 'yyyymm' bucket
  v_year    text;   -- 'yyyy' bucket
  v_hit     int8;   -- cnt returned by the last UPDATE; NULL when no row matched
begin
  v_backend := pg_backend_pid();

  -- Compute every bucket key once; the same keys are used by the UPDATE and
  -- by the fallback INSERT of each counter table.
  v_day   := to_char(NEW.crt_time,'yyyymmdd');
  v_week  := to_char(date(NEW.crt_time)-(EXTRACT(ISODOW FROM date(NEW.crt_time)))::int+1,'yyyymmdd');
  v_month := to_char(NEW.crt_time,'yyyymm');
  v_year  := to_char(NEW.crt_time,'yyyy');

  -- c1 counters ------------------------------------------------------------
  -- RETURNING ... INTO assigns NULL when the UPDATE touches no row, which is
  -- the signal to create the counter row instead.
  update log_c1_cnt_day set cnt=cnt+1
   where c1=NEW.c1 and pid=v_backend and stat_time=v_day
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c1_cnt_day(c1, pid, cnt, stat_time) values (NEW.c1, v_backend, 1, v_day);
  end if;

  update log_c1_cnt_week set cnt=cnt+1
   where c1=NEW.c1 and pid=v_backend and stat_time=v_week
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c1_cnt_week(c1, pid, cnt, stat_time) values (NEW.c1, v_backend, 1, v_week);
  end if;

  update log_c1_cnt_month set cnt=cnt+1
   where c1=NEW.c1 and pid=v_backend and stat_time=v_month
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c1_cnt_month(c1, pid, cnt, stat_time) values (NEW.c1, v_backend, 1, v_month);
  end if;

  update log_c1_cnt_year set cnt=cnt+1
   where c1=NEW.c1 and pid=v_backend and stat_time=v_year
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c1_cnt_year(c1, pid, cnt, stat_time) values (NEW.c1, v_backend, 1, v_year);
  end if;

  -- (c2, c3) counters ------------------------------------------------------
  update log_c2_c3_cnt_day set cnt=cnt+1
   where c2=NEW.c2 and c3=NEW.c3 and pid=v_backend and stat_time=v_day
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c2_c3_cnt_day(c2, c3, pid, cnt, stat_time) values (NEW.c2, NEW.c3, v_backend, 1, v_day);
  end if;

  update log_c2_c3_cnt_week set cnt=cnt+1
   where c2=NEW.c2 and c3=NEW.c3 and pid=v_backend and stat_time=v_week
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c2_c3_cnt_week(c2, c3, pid, cnt, stat_time) values (NEW.c2, NEW.c3, v_backend, 1, v_week);
  end if;

  update log_c2_c3_cnt_month set cnt=cnt+1
   where c2=NEW.c2 and c3=NEW.c3 and pid=v_backend and stat_time=v_month
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c2_c3_cnt_month(c2, c3, pid, cnt, stat_time) values (NEW.c2, NEW.c3, v_backend, 1, v_month);
  end if;

  update log_c2_c3_cnt_year set cnt=cnt+1
   where c2=NEW.c2 and c3=NEW.c3 and pid=v_backend and stat_time=v_year
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c2_c3_cnt_year(c2, c3, pid, cnt, stat_time) values (NEW.c2, NEW.c3, v_backend, 1, v_year);
  end if;

  -- Counters for further column groups would follow the same pattern.
  return null;
end;
$function$;

创建删除触发器函数

-- Row-level delete trigger function for table log.
--
-- For the row in OLD it subtracts 1 from the counter row owned by the current
-- backend (pid = pg_backend_pid()) in each of the eight counter tables:
--   log_c1_cnt_{day,week,month,year}      keyed by (c1, pid, stat_time)
--   log_c2_c3_cnt_{day,week,month,year}   keyed by (c2, c3, pid, stat_time)
-- When the UPDATE matches no row, a counter row with cnt = -1 is inserted;
-- readers are expected to sum cnt over all pid values for a bucket.
-- Returns NULL.
CREATE OR REPLACE FUNCTION public.tg_delete_log()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
declare
  v_backend int;    -- process id of the session running the trigger
  v_day     text;   -- 'yyyymmdd' bucket of OLD.crt_time
  v_week    text;   -- 'yyyymmdd' of the Monday of OLD.crt_time's ISO week
  v_month   text;   -- 'yyyymm' bucket
  v_year    text;   -- 'yyyy' bucket
  v_hit     int8;   -- cnt returned by the last UPDATE; NULL when no row matched
begin
  v_backend := pg_backend_pid();

  -- Compute every bucket key once; the same keys are used by the UPDATE and
  -- by the fallback INSERT of each counter table.
  v_day   := to_char(OLD.crt_time,'yyyymmdd');
  v_week  := to_char(date(OLD.crt_time)-(EXTRACT(ISODOW FROM date(OLD.crt_time)))::int+1,'yyyymmdd');
  v_month := to_char(OLD.crt_time,'yyyymm');
  v_year  := to_char(OLD.crt_time,'yyyy');

  -- c1 counters ------------------------------------------------------------
  -- RETURNING ... INTO assigns NULL when the UPDATE touches no row, which is
  -- the signal to create the counter row instead.
  update log_c1_cnt_day set cnt=cnt-1
   where c1=OLD.c1 and pid=v_backend and stat_time=v_day
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c1_cnt_day(c1, pid, cnt, stat_time) values (OLD.c1, v_backend, -1, v_day);
  end if;

  update log_c1_cnt_week set cnt=cnt-1
   where c1=OLD.c1 and pid=v_backend and stat_time=v_week
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c1_cnt_week(c1, pid, cnt, stat_time) values (OLD.c1, v_backend, -1, v_week);
  end if;

  update log_c1_cnt_month set cnt=cnt-1
   where c1=OLD.c1 and pid=v_backend and stat_time=v_month
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c1_cnt_month(c1, pid, cnt, stat_time) values (OLD.c1, v_backend, -1, v_month);
  end if;

  update log_c1_cnt_year set cnt=cnt-1
   where c1=OLD.c1 and pid=v_backend and stat_time=v_year
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c1_cnt_year(c1, pid, cnt, stat_time) values (OLD.c1, v_backend, -1, v_year);
  end if;

  -- (c2, c3) counters ------------------------------------------------------
  update log_c2_c3_cnt_day set cnt=cnt-1
   where c2=OLD.c2 and c3=OLD.c3 and pid=v_backend and stat_time=v_day
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c2_c3_cnt_day(c2, c3, pid, cnt, stat_time) values (OLD.c2, OLD.c3, v_backend, -1, v_day);
  end if;

  update log_c2_c3_cnt_week set cnt=cnt-1
   where c2=OLD.c2 and c3=OLD.c3 and pid=v_backend and stat_time=v_week
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c2_c3_cnt_week(c2, c3, pid, cnt, stat_time) values (OLD.c2, OLD.c3, v_backend, -1, v_week);
  end if;

  update log_c2_c3_cnt_month set cnt=cnt-1
   where c2=OLD.c2 and c3=OLD.c3 and pid=v_backend and stat_time=v_month
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c2_c3_cnt_month(c2, c3, pid, cnt, stat_time) values (OLD.c2, OLD.c3, v_backend, -1, v_month);
  end if;

  update log_c2_c3_cnt_year set cnt=cnt-1
   where c2=OLD.c2 and c3=OLD.c3 and pid=v_backend and stat_time=v_year
   returning cnt into v_hit;
  if v_hit is null then
    insert into log_c2_c3_cnt_year(c2, c3, pid, cnt, stat_time) values (OLD.c2, OLD.c3, v_backend, -1, v_year);
  end if;

  -- Counters for further column groups would follow the same pattern.
  return null;
end;
$function$;

创建truncate触发器函数

-- Statement-level truncate trigger function for table log.
--
-- Empties all eight counter tables so they stay consistent with the
-- now-empty detail table. Returns NULL.
CREATE OR REPLACE FUNCTION public.tg_truncate_log()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
begin
  -- A single TRUNCATE accepts a list of tables; c1 counters first, then the
  -- (c2, c3) counters.
  truncate table
    log_c1_cnt_day,
    log_c1_cnt_week,
    log_c1_cnt_month,
    log_c1_cnt_year,
    log_c2_c3_cnt_day,
    log_c2_c3_cnt_week,
    log_c2_c3_cnt_month,
    log_c2_c3_cnt_year;
  return null;
end;
$function$;

创建触发器

-- Attach the counter-maintenance functions to table log.
-- NOTE(review): only INSERT, DELETE and TRUNCATE are hooked here; no UPDATE
-- trigger is created by these statements, so an UPDATE that changes c1, c2,
-- c3 or crt_time would not be reflected in the counter tables.

-- Per-row: increment counters after each inserted row.
CREATE TRIGGER tg1
  AFTER INSERT ON log
  FOR EACH ROW
  EXECUTE PROCEDURE tg_insert_log();

-- Per-row: decrement counters after each deleted row.
CREATE TRIGGER tg2
  AFTER DELETE ON log
  FOR EACH ROW
  EXECUTE PROCEDURE tg_delete_log();

-- Per-statement: reset all counters after log is truncated.
CREATE TRIGGER tg3
  AFTER TRUNCATE ON log
  FOR EACH STATEMENT
  EXECUTE PROCEDURE tg_truncate_log();

INSERT pgbench

pg92@digoal-PowerEdge-R610-> cat insert.sql   
\setrandom c1 1 10  
\setrandom c2 1 5  
\setrandom c3 1 2  
\setrandom c4 1 100  
\setrandom day 1 300  
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c4, current_date+:day::int);  

测试结果

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./insert.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 392105  
tps = 6533.401527 (including connections establishing)  
tps = 6534.950565 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.003611        \setrandom c1 1 10  
        0.000821        \setrandom c2 1 5  
        0.000762        \setrandom c3 1 2  
        0.000740        \setrandom c4 1 100  
        0.000873        \setrandom day 1 300  
        2.438667        insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c4, current_date+:day::int);  

INSERT + DELETE 混合 pgbench

pg92@digoal-PowerEdge-R610-> cat id.sql   
\setrandom c1 1 10  
\setrandom c2 1 5  
\setrandom c3 1 2  
\setrandom c4 1 100  
\setrandom day 1 300  
\setrandom id 1 5000000  
begin;  
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c4, current_date+:day::int);  
insert into log (c1,c2,c3,c4,crt_time) values (:c2, :c1, :c3, :c4, current_date+:day::int);  
delete from log where id=:id;  
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c4, :c3, current_date+:day::int);  
delete from log where id=:id;  
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c1, :c4, current_date+:day::int);  
delete from log where id=:id;  
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c1, :c3, :c4, current_date+:day::int);  
delete from log where id=:id;  
delete from log where id=:id;  
insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c1, current_date+:day::int);  
end;  

测试结果

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 68535  
tps = 1141.824233 (including connections establishing)  
tps = 1142.090645 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.004085        \setrandom c1 1 10  
        0.000841        \setrandom c2 1 5  
        0.000848        \setrandom c3 1 2  
        0.000800        \setrandom c4 1 100  
        0.000899        \setrandom day 1 300  
        0.000858        \setrandom id 1 5000000  
        0.124333        begin;  
        2.105232        insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c4, current_date+:day::int);  
        2.099313        insert into log (c1,c2,c3,c4,crt_time) values (:c2, :c1, :c3, :c4, current_date+:day::int);  
        0.553053        delete from log where id=:id;  
        1.951419        insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c4, :c3, current_date+:day::int);  
        0.484705        delete from log where id=:id;  
        1.856407        insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c1, :c4, current_date+:day::int);  
        0.479868        delete from log where id=:id;  
        1.661506        insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c1, :c3, :c4, current_date+:day::int);  
        0.469769        delete from log where id=:id;  
        0.267489        delete from log where id=:id;  
        1.623206        insert into log (c1,c2,c3,c4,crt_time) values (:c1, :c2, :c3, :c1, current_date+:day::int);  
        0.301404        end;  

验证数据准确性的SQL如下 :

-- Consistency checks between the counter tables and the detail table log.
-- The queries come in pairs: the first of each pair aggregates a counter table
-- (summing cnt over all pid values per bucket), the second recomputes the same
-- buckets directly from log with count(*). Each result row is rendered as text,
-- hashed with hashtext() and the hashes are summed; the two sums of a pair are
-- expected to be equal. Buckets whose total is 0 are excluded on the counter
-- side because such buckets have no rows in log.

-- c1 by year
select sum(hashtext(t.*::text)) from (select c1,stat_time,sum(cnt) from log_c1_cnt_year group by 1,2 having sum(cnt)<>0 order by 1,2) t;  
select sum(hashtext(t.*::text)) from (select c1,to_char(crt_time,'yyyy'),count(*) from log group by 1,2 having count(*)<>0 order by 1,2) t;  
  
-- c1 by month
select sum(hashtext(t.*::text)) from (select c1,stat_time,sum(cnt) from log_c1_cnt_month group by 1,2 having sum(cnt)<>0 order by 1,2) t;  
select sum(hashtext(t.*::text)) from (select c1,to_char(crt_time,'yyyymm'),count(*) from log group by 1,2 having count(*)<>0 order by 1,2) t;  
  
-- c1 by week (bucket key is the Monday of the ISO week, same expression as in the triggers)
select sum(hashtext(t.*::text)) from (select c1,stat_time,sum(cnt) from log_c1_cnt_week group by 1,2 having sum(cnt)<>0 order by 1,2) t;  
select sum(hashtext(t.*::text)) from (select c1,to_char(date(crt_time)-(EXTRACT(ISODOW FROM date(crt_time)))::int+1,'yyyymmdd'),count(*) from log group by 1,2 having count(*)<>0 order by 1,2) t;  
  
-- c1 by day
select sum(hashtext(t.*::text)) from (select c1,stat_time,sum(cnt) from log_c1_cnt_day group by 1,2 having sum(cnt)<>0 order by 1,2) t;  
select sum(hashtext(t.*::text)) from (select c1,to_char(crt_time,'yyyymmdd'),count(*) from log group by 1,2 having count(*)<>0 order by 1,2) t;  
  
  
-- (c2, c3) by year
select sum(hashtext(t.*::text)) from (select c2,c3,stat_time,sum(cnt) from log_c2_c3_cnt_year group by 1,2,3 having sum(cnt)<>0 order by 1,2,3) t;  
select sum(hashtext(t.*::text)) from (select c2,c3,to_char(crt_time,'yyyy'),count(*) from log group by 1,2,3 having count(*)<>0 order by 1,2,3) t;  
  
-- (c2, c3) by month
select sum(hashtext(t.*::text)) from (select c2,c3,stat_time,sum(cnt) from log_c2_c3_cnt_month group by 1,2,3 having sum(cnt)<>0 order by 1,2,3) t;  
select sum(hashtext(t.*::text)) from (select c2,c3,to_char(crt_time,'yyyymm'),count(*) from log group by 1,2,3 having count(*)<>0 order by 1,2,3) t;  
  
-- (c2, c3) by week (bucket key is the Monday of the ISO week)
select sum(hashtext(t.*::text)) from (select c2,c3,stat_time,sum(cnt) from log_c2_c3_cnt_week group by 1,2,3 having sum(cnt)<>0 order by 1,2,3) t;  
select sum(hashtext(t.*::text)) from (select c2,c3,to_char(date(crt_time)-(EXTRACT(ISODOW FROM date(crt_time)))::int+1,'yyyymmdd'),count(*) from log group by 1,2,3 having count(*)<>0 order by 1,2,3) t;  
  
-- (c2, c3) by day
select sum(hashtext(t.*::text)) from (select c2,c3,stat_time,sum(cnt) from log_c2_c3_cnt_day group by 1,2,3 having sum(cnt)<>0 order by 1,2,3) t;  
select sum(hashtext(t.*::text)) from (select c2,c3,to_char(crt_time,'yyyymmdd'),count(*) from log group by 1,2,3 having count(*)<>0 order by 1,2,3) t;  

经过验证数据准确.

小结

1.

实时性的统计带来的开销与维度有关, 维度越多, 开销越大, 这样带来的后果是入口表(明细表)的dml吞吐量会急剧下降.

下一篇将介绍非实时的统计. 结合非实时的统计以及明细数据也能达到实时count的效果.

为方便大家查询, 汇总PostgreSQL实时和非实时数据统计的案例分析文章系列 - 如下 :

1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/

5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/

6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/

7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/

8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/

Flag Counter

digoal’s 大量PostgreSQL文章入口