PostgreSQL Streaming Data Processing (Aggregation, Filtering, Transformation, ...) Series - 8

6 minute read

Background

This is a solution I put together for a friend back in 2013. I wrote a series of articles to address the asynchronous, streaming data processing requirements of a big-data BI platform at the time.

The approach was refined step by step, simplifying it along the way.

At the business level, statistics, data filtering, data cleansing, and data-driven event triggering are all fairly common requirements.

COUNT is a very typical example.

Before 9.2, a full-table count could only be obtained by scanning the whole table; even with a primary key, the whole table had to be scanned.

Version 9.2 added index only scans, so count(*) can be answered by scanning just the primary key index.

But for a large table the primary key index is itself large, and scanning it is still a significant cost.

Starting with 9.6, parallel query is supported; with parallelism, counting a table of 100 million rows may take only a few hundred milliseconds, which is a qualitative leap. (Even so, parallelism is not always the best option.)
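
As a quick, illustrative sketch (the table t below is hypothetical and not part of the original tests), you can use EXPLAIN to check whether count(*) is answered by an index only scan (9.2+) or by a parallel plan (9.6+):

-- hypothetical table, for illustration only
create table t (id int primary key, info text);
insert into t select generate_series(1, 1000000), 'test';
vacuum analyze t;   -- keep the visibility map fresh so an index only scan is possible

explain analyze select count(*) from t;
-- on 9.2+ the plan may show "Index Only Scan using t_pkey on t"

set max_parallel_workers_per_gather = 4;   -- 9.6+
explain analyze select count(*) from t;
-- on 9.6+ the plan may show "Gather" over a "Parallel Seq Scan on t"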

The community has also released a streaming database, PipelineDB, but its community edition limits a database to 1024 continuous views, because a single byte is used to encode the CV identifier.

Back to PostgreSQL itself: is there a way to optimize counting a whole table? If your workload really does need frequent full-table counts, you can try the approach below.

Main content

Seven articles on near-real-time and real-time count(*) have already been written in this series, some based on triggers, some based on fetching data in a single thread ordered by xid.

The optimization in this article is similar to londiste's tick mechanism: slicing the data.

The log table is split into multiple segments, so that if a single analysis thread cannot keep up, the segments can be processed in parallel. This effectively solves the performance problem of single-threaded, xid-based data fetching.

The concrete scenario is not repeated here; reading the previous seven articles in order will make it clear.

1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/

5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/

6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/

7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/

Only the slicing function and the data-fetching function are presented here.

Test table:

create table log   
(  
  id serial primary key,   
  xid int8 default txid_current() not null,   -- id of the inserting transaction; this is the slicing key  
  c1 int not null,   
  c2 int not null,   
  c3 int not null,   
  c4 text not null,   
  crt_time timestamp default now()  
);  
create index idx_log_1 on log(xid);  
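
The xid column defaults to txid_current(), so every row written by one transaction carries that transaction's id; this is what makes slicing by xid ranges possible. A minimal illustration (the values are made up):

begin;
insert into log (c1,c2,c3,c4) values (1,1,2,'a');
insert into log (c1,c2,c3,c4) values (2,1,2,'b');
commit;

-- both rows share the same xid value
select xid, count(*) from log group by xid order by xid desc limit 1;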

Slice table

create table log_ticket(   
  id serial8 primary key,   
  split_xid int8 not null default txid_current(), -- xid at slicing time; the transaction that fetches the data must have an xid greater than this  
  split_time timestamp not null default now(),    -- time of the slice  
  log_xid_le int8 not null,                       -- slice lower bound: log.xid >= log_xid_le  
  log_xid_st int8 not null,                       -- slice upper bound: log.xid < log_xid_st  
  log_xip int8[],                                 -- xids still in progress at slicing time; their rows are fetched separately later  
  xid_readed boolean not null default false,      -- whether the [le, st) range has been read  
  xip_readed boolean not null default false       -- whether the log_xip part has been read  
);  
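
Each log_ticket row therefore describes one slice of the log table: the rows whose xid falls in [log_xid_le, log_xid_st), minus the transactions listed in log_xip, which were still in progress at slicing time and are picked up later. A sketch of the two parts of a slice (slice id 38 is just an example):

-- part 1: rows readable immediately (the xid_readed pass)
select count(*)
  from log l, log_ticket t
 where t.id = 38
   and l.xid >= t.log_xid_le and l.xid < t.log_xid_st
   and (t.log_xip is null or not (l.xid = any(t.log_xip)));

-- part 2: rows of transactions that were in flight at slicing time (the xip_readed pass)
select count(*)
  from log l, log_ticket t
 where t.id = 38
   and l.xid = any(t.log_xip);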

Slicing function

create or replace function log_spilt(  
  i_timeout_sec int, -- create a new slice if more than this many seconds have passed since the last slice  
  i_limit int  -- create a new slice if more than this many rows have been inserted since the last slice  
) returns void as $$  
declare  
  v_advisory_xact_lock int8 := null;  -- advisory lock used to serialize this function  
  
  v_xid_snap txid_snapshot := null;  -- current transaction snapshot  
  v_xmin int8 := null;  -- smallest in-progress xid in the snapshot  
  v_xmax int8 := null;  -- smallest not-yet-assigned xid in the snapshot  
  v_xip int8[] := null;  -- array of in-progress xids in the snapshot  
  
  v_log_xid_le int8 := null;  -- slice lower bound (xid >= this value)  
  v_log_xid_st int8 := null;  -- slice upper bound (xid < this value)  
  v_split_time timestamp := null;  -- time of the previous slice  
  v_cnt int := null;  -- number of log rows inserted since the previous slice  
  v_log_xip int8[] := null;  -- in-progress xids that fall inside this slice's range  
  
begin  
  -- validate i_timeout_sec and i_limit  
  if ( i_timeout_sec<=0 or i_limit <=0 ) then  
    raise notice 'please ensure i_timeout_sec > 0 and i_limit > 0 .';  
    return;  
  end if;  
  
  -- serialize: exit immediately if the lock cannot be acquired. v_advisory_xact_lock must be globally unique.  
  v_advisory_xact_lock := 1;  
  if not pg_try_advisory_xact_lock(v_advisory_xact_lock) then  
    raise notice 'Another function is calling, this call will exit.';  
    return;  
  end if;  
  
  -- take the xid snapshot.  
  v_xid_snap := txid_current_snapshot();  
  v_xmin := txid_snapshot_xmin(v_xid_snap);  
  v_xmax := txid_snapshot_xmax(v_xid_snap);  
  select array_agg(t) into v_xip from txid_snapshot_xip(v_xid_snap) g(t);  
  
  perform 1 from log_ticket limit 1;  
  if found then  
    -- slices already exist: continue from the previous slice's upper bound  
    select log_xid_st,split_time into v_log_xid_le,v_split_time from log_ticket order by split_xid desc limit 1;  
    select count(*),(max(xid)+1) into v_cnt,v_log_xid_st from (select xid from log where xid >= v_log_xid_le and xid < v_xmax order by xid limit i_limit) t;  
    if ( ((now()-v_split_time) >= i_timeout_sec::text::interval or v_cnt>=i_limit) and v_cnt>=1 ) then  
      select array_agg(i) into v_log_xip from unnest(v_xip) i where i>=v_log_xid_le and i<v_log_xid_st;  
      insert into log_ticket(log_xid_le,log_xid_st,log_xip,xip_readed)   
        values (v_log_xid_le,v_log_xid_st,v_log_xip,case when v_log_xip is null then true else false end);  
    else  
      raise notice 'split condition not met';  
    end if;  
  else  
    -- first slice: start from the smallest xid currently in log  
    select count(*),min(xid),(max(xid)+1) into v_cnt,v_log_xid_le,v_log_xid_st from (select xid from log where xid < v_xmax order by xid limit i_limit) t;  
    if ( v_cnt>=i_limit ) then  
      select array_agg(i) into v_log_xip from unnest(v_xip) i where i>=v_log_xid_le and i<v_log_xid_st;  
      insert into log_ticket(log_xid_le,log_xid_st,log_xip,xip_readed)   
        values (v_log_xid_le,v_log_xid_st,v_log_xip,case when v_log_xip is null then true else false end);  
    else  
      raise notice 'split condition not met';  
    end if;  
  end if;  
return;  
end;  
$$ language plpgsql strict;  
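
The slice boundaries all come from PostgreSQL's txid snapshot functions; looking at what they return makes the le/st/xip fields easier to interpret (the snapshot value below is only an example). log_spilt itself is meant to be driven by a single scheduler, for example once per second:

select txid_current_snapshot();
-- e.g. 505354161:506284999:506284464,506284481   (xmin:xmax:list of in-progress xids)

select txid_snapshot_xmin(txid_current_snapshot());   -- earliest xid still in progress
select txid_snapshot_xmax(txid_current_snapshot());   -- first not-yet-assigned xid
select array_agg(t) from txid_snapshot_xip(txid_current_snapshot()) g(t);   -- in-progress xids

-- scheduler call: slice when 1 second has passed or 10000 rows have accumulated
select log_spilt(1, 10000);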

Data-fetching function

create or replace function get_log(i_mod int, i_mod_rem int) returns log[] as $$  
declare  
  
  v_xid_now int8 := null;  
  v_log_ticket_pk int8 := null;  
  v_log_xid_le int8 := null;  
  v_log_xid_st int8 := null;  
  v_log_xip int8[] := null;  
  v_xid_readed boolean := null;  
  v_xip_readed boolean := null;  
  
  v_xid_snap txid_snapshot := null;  -- current transaction snapshot  
  v_xip int8[] := null;  -- array of in-progress xids in the snapshot  
  
  v_log1 log[] := null;  
  v_log2 log[] := null;  
  v_result log[] := null;  -- result  
  
begin  
  -- validate the mod arguments  
  if (not i_mod>0) or (not (i_mod_rem>=0 and i_mod_rem<i_mod)) then  
    raise notice 'please ensure i_mod>0 and 0<=i_mod_rem<i_mod .';  
    return null;  
  end if;  
    
  v_xid_now := txid_current();  
  -- take the xid snapshot.  
  v_xid_snap := txid_current_snapshot();  
  select array_agg(t) into v_xip from txid_snapshot_xip(v_xid_snap) g(t);  
  
  -- lock one unfinished slice assigned to this worker (partitioned by mod(id, i_mod))  
  select id,log_xid_le,log_xid_st,log_xip,xid_readed,xip_readed  
    into v_log_ticket_pk,v_log_xid_le,v_log_xid_st,v_log_xip,v_xid_readed,v_xip_readed   
    from log_ticket   
    where split_xid<v_xid_now   
      and (not xid_readed or not xip_readed)   
      and mod(id,i_mod)=i_mod_rem   
    limit 1   
    for update;  
  if found then  
    -- first pass: rows in [log_xid_le, log_xid_st), excluding xids that were in progress at slicing time  
    if not v_xid_readed then  
      select array_agg(log) into v_log1 from log where xid>=v_log_xid_le and xid<v_log_xid_st and (not (xid = any(v_log_xip)) or v_log_xip is null);  
      v_xid_readed := true;  
      v_result := v_log1;  
    end if;  
    -- second pass: the xids recorded in log_xip, readable only once none of them is still running  
    if (not v_xip_readed) and v_log_xip is not null then  
      perform 1 from unnest(v_xip) i where i = any(v_log_xip);  
      if (not found) or v_xip is null then  
        select array_agg(log) into v_log2 from log where xid = any(v_log_xip);  
        v_xip_readed := true;  
        v_result := array_cat(v_result,v_log2);  
      end if;  
    end if;  
  else  
    raise notice 'no data in log_ticket with i_mod=% and i_mod_rem=%.', i_mod, i_mod_rem;  
    return null;  
  end if;  
  update log_ticket set xid_readed=v_xid_readed,xip_readed=v_xip_readed where id=v_log_ticket_pk;  
  return v_result;  
end;  
$$ language plpgsql strict;  
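
Because get_log returns the slice as a log[] array, an aggregation worker simply unnests it and computes whatever statistics it needs; a minimal sketch of a pair of counting workers (using the same 2/0 and 2/1 partitioning as in the tests below):

-- worker 0 of 2: fetch one slice and aggregate it
select c1, count(*) as cnt
  from unnest(get_log(2, 0)) l
 group by c1;

-- worker 1 of 2 runs the same query with get_log(2, 1).
-- each call also flags the parts of the slice it managed to read
-- (xid_readed / xip_readed), so a slice is consumed exactly once.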

Test

pg92@digoal-PowerEdge-R610-> cat ins.sql  
begin;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
end;  
begin;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
rollback;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  

pgbench

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -f ./ins.sql -r -n -h $PGDATA -U postgres -T 60 -c 32 -j 2  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 32  
number of threads: 2  
duration: 60 s  
number of transactions actually processed: 84781  
tps = 1411.655384 (including connections establishing)  
tps = 1412.958688 (excluding connections establishing)  

While the pgbench test is running, execute select log_spilt(1,100000);

digoal=# select log_spilt(1,10000);  
 log_spilt   
-----------  
   
(1 row)  
  
digoal=# select log_spilt(1,10000);  
 log_spilt   
-----------  
   
(1 row)  
digoal=# select log_spilt(1,1000000000);  
 log_spilt   
-----------  
   
(1 row)  
  
digoal=# select log_spilt(1,100000);  
 log_spilt   
-----------  
   
(1 row)  
... (omitted)  
digoal=# select log_spilt(1,100000000);  
 log_spilt   
-----------  
   
(1 row)  

Make sure all the data has been covered by log_ticket.

digoal=# select min(log_xid_le),max(log_xid_st) from log_ticket;  
    min    |    max      
-----------+-----------  
 505354161 | 506624142  
(1 row)  
digoal=# select min(xid),max(xid) from log;  
    min    |    max      
-----------+-----------  
 505354161 | 506624141  
(1 row)  

Query log_ticket

digoal=# select * from log_ticket;  
 id | split_xid |         split_time         | log_xid_le | log_xid_st | log_xip | xid_readed | xip_readed   
----+-----------+----------------------------+------------+------------+---------+------------+------------  
 26 | 506284999 | 2013-05-03 15:05:53.963654 |  505354161 |  505356093 | NULL | f | t  
 27 | 506285000 | 2013-05-03 15:05:54.887422 |  505356093 |  505358092 | NULL | f | t  
 28 | 506285001 | 2013-05-03 15:05:55.369707 |  505358092 |  505360095 | NULL | f | t  
 29 | 506285002 | 2013-05-03 15:05:55.757843 |  505360095 |  505362101 | NULL | f | t  
 30 | 506285003 | 2013-05-03 15:05:56.132186 |  505362101 |  505364103 | NULL | f | t  
 31 | 506285004 | 2013-05-03 15:05:56.487359 |  505364103 |  505366103 | NULL | f | t  
 32 | 506285005 | 2013-05-03 15:05:56.921683 |  505366103 |  505368098 | NULL | f | t  
 33 | 506285006 | 2013-05-03 15:05:57.220986 |  505368098 |  505370109 | NULL | f | t  
 34 | 506285007 | 2013-05-03 15:05:57.624854 |  505370109 |  505372109 | NULL | f | t  
 35 | 506285008 | 2013-05-03 15:05:58.055403 |  505372109 |  505374113 | NULL | f | t  
 36 | 506315377 | 2013-05-03 15:07:22.414773 |  505374113 |  505376115 | NULL | f | t  
 37 | 506320066 | 2013-05-03 15:07:23.147482 |  505376115 |  505378109 | NULL | f | t  
 38 | 506409173 | 2013-05-03 15:07:36.081153 |  505378109 |  506384559 | {506384464,506384481,506384492,506384503,506384507,506384509,506384517,506384521,506384522,506384523,506384525,506384529,506384534,506384535,506384537,506384538,506384539,506384541,506384542,506384543,506384546,506384548,506384552,506384553,506384554,506384556,506384557} | f | f  
 39 | 506423332 | 2013-05-03 15:07:43.592782 |  506384559 |  506404567 | NULL | f | t  
 40 | 506427897 | 2013-05-03 15:07:44.306657 |  506404567 |  506424564 | NULL | f | t  
 41 | 506434174 | 2013-05-03 15:07:45.330145 |  506424564 |  506433989 | {506433901,506433908,506433928,506433934,506433937,506433938,506433940,506433941,506433946,506433947,506433954,506433955,506433956,506433957,506433958,506433960,506433962,506433964,506433965,506433968,506433971,506433974,506433976,506433980,506433985} | f | f  
 42 | 506442222 | 2013-05-03 15:07:46.573332 |  506433989 |  506442067 | {506441993,506442005,506442008,506442012,506442015,506442016,506442024,506442032,506442033,506442034,506442037,506442039,506442040,506442041,506442042,506442043,506442045,506442047,506442048,506442051,506442052,506442053,506442055,506442059,506442062,506442064,506442065} | f | f  
 43 | 506467572 | 2013-05-03 15:07:50.910016 |  506442067 |  506462070 | NULL | f | t  
 44 | 506473021 | 2013-05-03 15:07:51.998294 |  506462070 |  506472808 | {506472727,506472747,506472751,506472763,506472764,506472767,506472773,506472774,506472775,506472777,506472778,506472779,506472780,506472781,506472782,506472794,506472796,506472799,506472800,506472801,506472802,506472803,506472804} | f | f  
(19 rows)  

While pgbench is running, call get_log at the same time:

digoal=# select count(*) from unnest(get_log(2,0));  
 count   
-------  
 10000  
(1 row)  
digoal=# select count(*) from unnest(get_log(2,1));  
 count   
-------  
 10000  
(1 row)  
...  

Until all the worker threads report:

digoal=# select count(*) from unnest(get_log(2,0));  
NOTICE:  no data in log_ticket with i_mod=2 and i_mod_rem=0.  
 count   
-------  
     0  
(1 row)  
digoal=# select count(*) from unnest(get_log(2,1));  
NOTICE:  no data in log_ticket with i_mod=2 and i_mod_rem=1.  
 count   
-------  
     0  
(1 row)  
(omitted)  

Add up the row counts returned by all the get_log calls and check that the total equals count(*) from log.

The verification passed.

Details are omitted here.
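
One way to do that verification (a sketch, assuming the pgbench load has stopped, log_spilt has been run until every row is covered by a slice, and the slices have not already been consumed by an earlier get_log run) is to drain all slices into a scratch table and compare counts:

create temp table fetched (like log);

-- repeat for each worker until get_log reports no more data
insert into fetched select * from unnest(get_log(2, 0));
insert into fetched select * from unnest(get_log(2, 1));
-- ...

select count(*) from fetched;   -- should match
select count(*) from log;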

Other notes

1. If the data-fetching function is not generic enough, it can be changed to return a cursor over the slice described by log_ticket, and the application can then fetch the rows itself (see the sketch below).
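
A rough sketch of that cursor-based variant (get_log_cursor is a hypothetical name, not part of the original article): lock one unread slice, open a cursor over its xid range, and let the application FETCH at its own pace within the same transaction.

-- hypothetical sketch: hand the caller a cursor instead of a log[] array
create or replace function get_log_cursor(i_mod int, i_mod_rem int, i_cur refcursor)
returns refcursor as $$
declare
  v_le  int8;
  v_st  int8;
  v_xip int8[];
begin
  -- pick one slice assigned to this worker, same partitioning rule as get_log
  select log_xid_le, log_xid_st, log_xip
    into v_le, v_st, v_xip
    from log_ticket
   where split_xid < txid_current()
     and not xid_readed
     and mod(id, i_mod) = i_mod_rem
   limit 1
   for update;
  if not found then
    return null;
  end if;
  -- the caller fetches the rows itself and marks the slice as read when done
  open i_cur for
    select * from log
     where xid >= v_le and xid < v_st
       and (v_xip is null or not (xid = any(v_xip)));
  return i_cur;
end;
$$ language plpgsql;

-- usage: stay inside one transaction so the cursor and the row lock remain valid
-- begin;
-- select get_log_cursor(2, 0, 'cur0');
-- fetch 1000 from cur0;
-- ... update log_ticket set xid_readed = true where ...; commit;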

For easy reference, here is the full series of case studies on real-time and near-real-time data statistics in PostgreSQL:

1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/

5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/

6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/

7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/

8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/


digoal's entry point to a large collection of PostgreSQL articles