PostgreSQL 流式数据处理(聚合、过滤、转换…)系列 - 6

11 minute read

背景

2013年帮朋友做的方案。写了一些列文档来解决当时某个大数据BI平台的异步流式数据处理的功能。

逐步优化,化繁为简。

在业务层面,统计,数据的过滤,数据的清洗,数据的事件触发等。是比较常见的需求。

比如以COUNT就是一个很典型的例子。

在9.2以前全表的count只能通过扫描全表来得到, 即使有pk也必须扫描全表.

9.2版本增加了index only scan的功能, count(*)可以通过仅仅扫描pk就可以得到.

但是如果是一个比较大的表, pk也是很大的, 扫描pk也是个不小的开销.

到了9.6,开始支持并行查询,通过并行,一张1亿的表,COUNT可能只需要几百毫秒。这是一个质的飞跃。(但是还有很多时候用并行并不是最好的)

另外社区也除了一个流式处理的数据库,pipelineDB,但是它的社区版本限制了一个DATABASE只能使用1024个流视图,在编码的地方使用了1BYTE存储CV。

那么回到postgresql数据库本身,有没有办法来优化count全表的操作呢, 如果你的场景真的有必要频繁的count全表, 那么可以尝试一下使用以下方法来优化你的场景.

正文

前五篇关于PostgreSQL实时和非实时数据统计的案例如下 :

http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

http://blog.163.com/digoal@126/blog/static/16387704020133156636579/

http://blog.163.com/digoal@126/blog/static/16387704020133218305242/

本文主要添加一个动态的新增统计维度的功能.

详细的实施过程

测试表 :

create table log   
(  
  id serial primary key,   
  xid int8 default txid_current() not null,   
  c1 int not null,   
  c2 int not null,   
  c3 int not null,   
  c4 text not null,   
  crt_time timestamp default now()  
);  
create index idx_log_1 on log(xid);  

存放count()的表, 假设经常需要按log.c1以及log.crt_time分天, 周, 月, 年进行count()

create table log_c1_cnt_day (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));  
create table log_c1_cnt_week (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));  
create table log_c1_cnt_month (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));  
create table log_c1_cnt_year (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));  

存放count()的表, 假设经常需要按log.c2, log.c3以及log.crt_time分天, 周, 月, 年进行count()

create table log_c2_c3_cnt_day (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));  
create table log_c2_c3_cnt_week (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));  
create table log_c2_c3_cnt_month (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));  
create table log_c2_c3_cnt_year (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));  

插入测试数据

insert into log (c1,c2,c3,c4) values (1,1,1,1);  
insert into log (c1,c2,c3,c4) values (2,2,2,2);  

验证

digoal=# select * from log;  
 id |    xid    | c1 | c2 | c3 | c4 |          crt_time            
----+-----------+----+----+----+----+----------------------------  
  1 | 480125659 |  1 |  1 |  1 | 1  | 2013-04-21 20:55:45.907713  
  2 | 480125660 |  2 |  2 |  2 | 2  | 2013-04-21 20:55:46.286933  
(2 rows)  

创建分析注册表, 记录每个明细表每次分析的截止xid, xip.

xid 记录统计到哪个xid了, xip记录当前活动事务, 不计入当前统计范畴. 避免气泡问题.

create table log_read   
(  
tablename name not null,   
xid int8 not null,   
xip int8[],   
xip_res int8[],  -- 用于与xid比对的数据. 必须保留所有>=xid的xip信息.  
mod_time timestamp,   
primary key (tablename)  
);  

插入初始记录, 表的初始记录xid取值范围( >=0 and <=txid_snapshot_xmin-1 )

insert into log_read values ('log', 0, null, null, now());  

创建分析维度信息注册表, 记录每个维度的初始xid信息,

因为所有维度使用同一个表级别的截至值(log_read.xid), 但是维度可能是后期加入的, 所以初始xid可能不一样.

增加func 字段. tablename+func组合pk.

增加初始init_xid字段. ( 历史数据的统计交给其他进程来处理, 历史数据将统计到 <=init_xid and 不包含init_xip )

增加isinit字段, 标识是否为新加的维度.

增加func 字段, 标识该维度调用的函数.

create table log_read_func (  
  tablename name not null,   
  func text not null,  -- 统计函数名  
  init_xid int8,  -- 初始值由analyze_log计算并更新.  
  init_xip int8[],  -- 初始值由analyze_log计算并更新.  
  isinit boolean not null,  -- true标识这个维度是初始的, false标识不是初始的.  
  info text,  -- 备注  
  primary key (tablename, func)  
);  
alter table log_read_func add constraint fk_log_read_func_1 foreign key(tablename) references log_read(tablename);  

插入维度初始记录

insert into log_read_func(tablename,func,init_xid,init_xip,isinit,info)   
  values('log','stat_log_c1',null,null,true,null);  

创建log表的串行批量数据分析函数

注意v_limit用于限制多少个xid, 而不是多少条记录, 当一个XID有多条记录时, 如果取到这个XID的话, 这个XID的数据也会全部被处理.

但是如果使用v_limit来限定行数, 就会出现比较危险的情况, 因为如果一个事务包含多条记录的话, 限定行数的方法可能导致一个事务的数据只取到中间部分, 而剩余部分下次就取不到了.

create or replace function analyze_log(v_limit int) returns void as $$  
declare  
  v_advisory_xact_lock int8 := null;  -- 串行处理锁.  
    
  v_xid_snap txid_snapshot := null;  -- 当前事务状态快照  
  v_xmin int8 := null;  -- 当前事务状态快照中未完成的最小事务  
  v_xmax int8 := null;  -- 当前事务状态快照中未分配的最小事务  
  v_xip int8[] := null;  -- 当前事务状态快照中未完成的事务数组  
  
  v_func_agg text[];  -- 统计维度函数数组  
  v_func text;  -- 统计维度函数  
  
  v_log_read_log_xid int8 := null;  -- 上次log的xid分析截止位  
  v_log_read_log_xid_update int8 := null;  -- 更新值, 不能为空  
  
  v_log_read_log_xip int8[] := null;  -- 上次log_read.xip(tablename=log)  
  v_log_read_log_xip_do int8[] := null;  -- 解析本次log_read.xip(tablename=log) where (xip !@ txid_snapshot)  
  v_log_read_log_xip_update int8[] := null;  -- xip更新值  
  v_log_read_log_xip_res int8[] := null;  -- xip保留值  
  v_log_read_log_xip_res_update int8[] := null;  -- xip保留更新值, 所有大于v_log_read_log_xid_update的元素必须保留.  
  
  v_log log[] := null;  -- 聚合本次log的分析数组, [末尾调用,false]  
  v_log_doxip log[] := null;  -- 聚合本次分析log数组:   
                          -- where log.xid (@ log_read.xip(tablename=log) and !@ txid_snapshot) , [末尾调用,false]  
  
  v_log_read_func_xid int8;  -- 本次取出的v_log中的最小xid-1, 没有则取OLD.log_read.xid. [用于更新log_read_func.init_xid where isinit=true]   
  v_log_read_func_xip int8[];  -- (OLD.log_read.xip + v_xip) ; [用于更新log_read_func.init_xip where isinit=true]   
  
begin  
  -- 判断limit  
  if v_limit <=0 then  
    raise notice 'please ensure v_limit > 0 .';  
    return;  
  end if;  
  
  -- 串行处理, 如果不能获得锁则直接退出. 确保v_advisory_xact_lock全局唯一.  
  v_advisory_xact_lock := 1;  
  if not pg_try_advisory_xact_lock(v_advisory_xact_lock) then  
    raise notice 'Another function is calling, this call will exit.';  
    return;  
  end if;  
  
  -- 生成统计维度, 没有则直接退出  
  perform 1 from log_read_func where tablename='log' limit 1;  
  if not found then  
    raise notice 'No func in log_read_func with tablename:%.', 'log';  
    return;  
  else  
    select array_agg(func) into v_func_agg from log_read_func where tablename='log';  
  end if;  
  
  -- 生成 xid snapshot 数据.  
  v_xid_snap := txid_current_snapshot();  
  v_xmin := txid_snapshot_xmin(v_xid_snap);  
  v_xmax := txid_snapshot_xmax(v_xid_snap);  
  select array_agg(t) into v_xip from txid_snapshot_xip(v_xid_snap) g(t);  
  
  -- 取v_log_read_log_xid截止值, v_log_read_log_xip数组.  
  select xid,xip,xip_res into v_log_read_log_xid,v_log_read_log_xip,v_log_read_log_xip_res from log_read where tablename='log';  
  if not found then  
    raise notice 'log_read no log entry. please add it in log_read table first.';  
    return;  
  end if;  
  
  -- 生成v_log_read_func_xip.  
  -- 必须放在更新log_read之前, 否则取到的就是更新后的数据了.  
  v_log_read_func_xip := array_cat(v_log_read_log_xip, v_xip);  
  
  -- 取log1(取非xip中的数据, 隔离log2操作)  
  -- 取xid临界点  
  select max(xid) into v_log_read_log_xid_update from (select xid from log where xid > v_log_read_log_xid and xid < v_xmax and xid not in (select * from unnest(v_xip) union all select * from unnest(v_log_read_log_xip_res)) order by xid limit v_limit) t;  
  if v_log_read_log_xid_update is not null then  
    raise notice '取log1';  
    -- 根据临界点, 取log数据  
    select array_agg(log) into v_log from (select log from log where xid > v_log_read_log_xid and xid<=v_log_read_log_xid_update and xid not in (select * from unnest(v_xip) union all select * from unnest(v_log_read_log_xip_res)) order by xid) t;  
    -- 如果有数据, v_log_read_func_xid取v_log.min(xid) - 1  
    select min(i.xid) - 1 into v_log_read_func_xid from unnest(v_log) i;  
  else   
    -- 如果没有数据, 更新值不变  
    v_log_read_log_xid_update := v_log_read_log_xid;  
    -- 如果没有数据, v_log_read_func_xid取OLD.log_read.xid  
    v_log_read_func_xid := v_log_read_log_xid;  
  end if;  
  
  -- 取log2 (log_xip - v_xip) (取xip中的数据, 隔离log1操作)  
  -- 生成log_read.xip(tablename=log) do数组(已经完成的事务)  
  select array_agg(i) into v_log_read_log_xip_do from (select * from unnest(v_log_read_log_xip) i except select * from unnest(v_xip))t where i is not null;  
  -- 生成log_read.xip(tablename=log) update数组(未完成的事务)  
  select array_agg(i) into v_log_read_log_xip_update from   
  (  select i from (select * from unnest(v_log_read_log_xip) i union all select * from unnest(v_xip)  
     except select * from unnest(v_log_read_log_xip_do)) t where i is not null group by i ) t;  
  -- 生成xip_res更新值  
  select array_agg(i) into v_log_read_log_xip_res_update from (select * from unnest(v_log_read_log_xip_res) i union select * from unnest(v_log_read_log_xip) union select * from unnest(v_xip))t where i>v_log_read_log_xid_update;  
  -- 生成log do数组  
  select array_agg(log) into v_log_doxip from log where xid in (select * from unnest(v_log_read_log_xip_do));  
  
  -- 更新log_read(tablename=log)  
  update log_read set   
    xip=v_log_read_log_xip_update,   
    xid=v_log_read_log_xid_update,   
    xip_res=v_log_read_log_xip_res_update,  
    mod_time=now()   
  where tablename='log';  
  -- raise notice 'log_read.oldxip(log): %.', v_log_read_log_xip;  
  -- raise notice 'log_read.newxip(log): %.', v_log_read_log_xip_update;  
  -- raise notice 'log_read.newxipres(log): %.', v_log_read_log_xip_res_update;  
  
  -- 分析函数可以另外写, 在此调用.  
  foreach v_func in array v_func_agg loop  
    -- 更新log_read_func where isinit=true  
    update log_read_func set   
      init_xid=v_log_read_func_xid,  
      init_xip=v_log_read_func_xip,  
      isinit=false   
      where tablename='log'   
      and func=v_func   
      and isinit;  
    -- 执行统计函数  
    execute 'select '||v_func||'($1)' using v_log;  
    execute 'select '||v_func||'($1)' using v_log_doxip;  
  end loop;  
  
return;  
end;  
$$ language plpgsql;  

统计函数stat_log_c1

CREATE OR REPLACE FUNCTION public.stat_log_c1(v_log log[])  
 RETURNS void  
 LANGUAGE plpgsql  
AS $function$  
declare  
  v_stat_time text;  
  v_c1 int;  
  v_cnt int8;  
begin  
  -- 统计log_c1_cnt_day  
  for v_stat_time, v_c1, v_cnt in select to_char(crt_time, 'yyyymmdd'), c1 , count(*) from (select ((unnest(v_log)::log)).*) t group by to_char(crt_time, 'yyyymmdd'), c1 loop  
    perform 1 from log_c1_cnt_day where c1=v_c1 and stat_time=v_stat_time;  
    if not found then  
      insert into log_c1_cnt_day(c1, cnt, stat_time) values (v_c1, v_cnt, v_stat_time);  
    else  
      update log_c1_cnt_day set cnt=cnt+v_cnt where c1=v_c1 and stat_time=v_stat_time;  
    end if;  
  end loop;  
end;  
$function$;  

测试, 清理原始数据

truncate log;  
truncate log_c1_cnt_day;  
truncate log_c1_cnt_week;  
update log_read set xid=0, xip=null, xip_res=null;  
update log_read_func set isinit=true;  

pgbench脚本, 测试插入场景

cat ins.sql   
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  

pgbench

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -f ./ins.sql -r -n -h $PGDATA -U postgres -T 60 -c 8 -j 2  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 8  
number of threads: 2  
duration: 60 s  
number of transactions actually processed: 2940924  
tps = 49015.188418 (including connections establishing)  
tps = 49026.219678 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.161868        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  

压力测试的同时执行analyze_log. 确保pgbench同时执行analyze_log.

pg92@digoal-PowerEdge-R610-> cat analyze.sh   
#!/bin/bash  
for ((i=0;i<100;i++))  
do  
psql -c "select * from analyze_log(1);"  
psql -c "select * from analyze_log(1000000);"  
done  

调用analyze.sh

验证数据是否准确

digoal=# select c1,count(*),to_char(crt_time,'yyyymmdd') from log group by c1,to_char(crt_time,'yyyymmdd') order by c1;  
 c1 | count  | to_char    
----+--------+----------  
  0 | 147225 | 20130422  
  1 | 294651 | 20130422  
  2 | 294146 | 20130422  
  3 | 293377 | 20130422  
  4 | 295507 | 20130422  
  5 | 294433 | 20130422  
  6 | 293960 | 20130422  
  7 | 292733 | 20130422  
  8 | 294489 | 20130422  
  9 | 293249 | 20130422  
 10 | 147154 | 20130422  
(11 rows)  
Time: 6453.502 ms  
digoal=# select * from log_c1_cnt_day where cnt<>0 order by c1;  
 c1 |  cnt   | stat_time   
----+--------+-----------  
  0 | 147225 | 20130422  
  1 | 294651 | 20130422  
  2 | 294146 | 20130422  
  3 | 293377 | 20130422  
  4 | 295507 | 20130422  
  5 | 294433 | 20130422  
  6 | 293960 | 20130422  
  7 | 292733 | 20130422  
  8 | 294489 | 20130422  
  9 | 293249 | 20130422  
 10 | 147154 | 20130422  
(11 rows)  
Time: 0.660 ms  

测试多SQL, 带回滚场景.

pgbench脚本

cat ins.sql  
begin;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
end;  
begin;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
rollback;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  

pgbench

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -f ./ins.sql -r -n -h $PGDATA -U postgres -T 60 -c 8 -j 2  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 8  
number of threads: 2  
duration: 60 s  
number of transactions actually processed: 394520  
tps = 6573.363077 (including connections establishing)  
tps = 6574.835673 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.057675        begin;  
        0.152981        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
        0.148598        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
        0.076637        end;  
        0.054797        begin;  
        0.153712        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
        0.148711        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
        0.077051        rollback;  
        0.168767        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
        0.167405        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  

压力测试的同时执行analyze_log. 确保pgbench同时执行analyze_log.

pg92@digoal-PowerEdge-R610-> cat analyze.sh   
#!/bin/bash  
for ((i=0;i<100;i++))  
do  
psql -c "select * from analyze_log(1);"  
psql -c "select * from analyze_log(1000000);"  
done  

调用analyze.sh

验证数据是否准确

digoal=# select c1,count(*),to_char(crt_time,'yyyymmdd') from log group by c1,to_char(crt_time,'yyyymmdd') order by c1;  
 c1 | count  | to_char    
----+--------+----------  
  0 | 206061 | 20130422  
  1 | 412799 | 20130422  
  2 | 411847 | 20130422  
  3 | 409796 | 20130422  
  4 | 411644 | 20130422  
  5 | 412170 | 20130422  
  6 | 410857 | 20130422  
  7 | 411128 | 20130422  
  8 | 411509 | 20130422  
  9 | 411140 | 20130422  
 10 | 205053 | 20130422  
(11 rows)  
Time: 8760.367 ms  
digoal=# select * from log_c1_cnt_day where cnt<>0 order by c1;  
 c1 |  cnt   | stat_time   
----+--------+-----------  
  0 | 206061 | 20130422  
  1 | 412799 | 20130422  
  2 | 411847 | 20130422  
  3 | 409796 | 20130422  
  4 | 411644 | 20130422  
  5 | 412170 | 20130422  
  6 | 410857 | 20130422  
  7 | 411128 | 20130422  
  8 | 411509 | 20130422  
  9 | 411140 | 20130422  
 10 | 205053 | 20130422  
(11 rows)  
Time: 0.716 ms  

新增测试项目, 增加统计维度.

统计函数stat_log_c1_week

CREATE OR REPLACE FUNCTION public.stat_log_c1_week(v_log log[])  
 RETURNS void  
 LANGUAGE plpgsql  
AS $function$  
declare  
  v_stat_time text;  
  v_c1 int;  
  v_cnt int8;  
begin  
  -- 统计log_c1_cnt_week  
  for v_stat_time, v_c1, v_cnt in select to_char(date(crt_time)-(EXTRACT(ISODOW FROM date(crt_time)))::int+1,'yyyymmdd'), c1 , count(*) from (select ((unnest(v_log)::log)).*) t group by to_char(date(crt_time)-(EXTRACT(ISODOW FROM date(crt_time)))::int+1,'yyyymmdd'), c1 loop  
    perform 1 from log_c1_cnt_week where c1=v_c1 and stat_time=v_stat_time;  
    if not found then  
      insert into log_c1_cnt_week(c1, cnt, stat_time) values (v_c1, v_cnt, v_stat_time);  
    else  
      update log_c1_cnt_week set cnt=cnt+v_cnt where c1=v_c1 and stat_time=v_stat_time;  
    end if;  
  end loop;  
end;  
$function$;  

注册统计维度函数, 注册时isinit=true. 第一次调用analyze_log后更新为false, 同时更新init_xid, init_xip等.

insert into log_read_func(tablename,func,init_xid,init_xip,isinit,info) values('log','stat_log_c1_week',null,null,true,null);  

pgbench

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -f ./ins.sql -r -n -h $PGDATA -U postgres -T 60 -c 8 -j 2  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 8  
number of threads: 2  
duration: 60 s  
number of transactions actually processed: 365012  
tps = 6083.034829 (including connections establishing)  
tps = 6084.366283 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.066413        begin;  
        0.163332        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
        0.159310        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
        0.086508        end;  
        0.063532        begin;  
        0.163740        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
        0.159271        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
        0.086722        rollback;  
        0.177647        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
        0.177949        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  

压力测试的同时执行analyze_log. 确保pgbench同时执行analyze_log.

pg92@digoal-PowerEdge-R610-> cat analyze.sh   
#!/bin/bash  
for ((i=0;i<100;i++))  
do  
psql -c "select * from analyze_log(1);"  
psql -c "select * from analyze_log(1000000);"  
done  

调用analyze.sh

数据全部取完后

digoal=# select analyze_log(1000000);  
NOTICE:  v_log_read_log_xip no element.  
NOTICE:  no data in log table which xid>493263112 and xid<493263115 and xid not in (select unnest(<NULL>) union all select unnest(<NULL>)).  
 analyze_log   
-------------  
   
(1 row)  
Time: 1.169 ms  
digoal=# select * from log_read;  
 tablename |    xid    | xip |          mod_time            
-----------+-----------+-----+----------------------------  
 log       | 493263112 |     | 2013-04-22 15:51:00.450235  
(1 row)  
Time: 0.566 ms  
digoal=# select * from log_read_func;  
 tablename |       func       | init_xid  |       init_xip        | isinit | info   
-----------+------------------+-----------+-----------------------+--------+------  
 log       | stat_log_c1      |         0 |                       | f      |   
 log       | stat_log_c1_week | 491803059 | {492492724,492492725} | f      |   
(2 rows)  
Time: 0.349 ms  

数据校验

log_c1_cnt_day (取>0 or 包含init_xip)

digoal=# select c1,count(*),to_char(crt_time,'yyyymmdd') from log where xid>0 group by c1,to_char(crt_time,'yyyymmdd') order by c1;  
 c1 | count  | to_char    
----+--------+----------  
  0 | 278990 | 20130422  
  1 | 559150 | 20130422  
  2 | 557552 | 20130422  
  3 | 555797 | 20130422  
  4 | 557475 | 20130422  
  5 | 558780 | 20130422  
  6 | 556803 | 20130422  
  7 | 557337 | 20130422  
  8 | 557065 | 20130422  
  9 | 557328 | 20130422  
 10 | 277775 | 20130422  
(11 rows)  
Time: 15838.259 ms  
digoal=# select * from log_c1_cnt_day where cnt<>0 order by c1;  
 c1 |  cnt   | stat_time   
----+--------+-----------  
  0 | 278990 | 20130422  
  1 | 559150 | 20130422  
  2 | 557552 | 20130422  
  3 | 555797 | 20130422  
  4 | 557475 | 20130422  
  5 | 558780 | 20130422  
  6 | 556803 | 20130422  
  7 | 557337 | 20130422  
  8 | 557065 | 20130422  
  9 | 557328 | 20130422  
 10 | 277775 | 20130422  
(11 rows)  
Time: 0.668 ms  

log_c1_cnt_week (取>491803059 or 包含init_xip)

digoal=# select c1,count(*),to_char(date(crt_time)-(EXTRACT(ISODOW FROM date(crt_time)))::int+1,'yyyymmdd') from log where xid>491803059 or xid in (492492724,492492725) group by c1,to_char(date(crt_time)-(EXTRACT(ISODOW FROM date(crt_time)))::int+1,'yyyymmdd') order by c1;  
 c1 | count  | to_char    
----+--------+----------  
  0 |  72929 | 20130422  
  1 | 146351 | 20130422  
  2 | 145705 | 20130422  
  3 | 146001 | 20130422  
  4 | 145831 | 20130422  
  5 | 146610 | 20130422  
  6 | 145946 | 20130422  
  7 | 146209 | 20130422  
  8 | 145556 | 20130422  
  9 | 146188 | 20130422  
 10 |  72722 | 20130422  
(11 rows)  
Time: 5415.514 ms  
digoal=# select * from log_c1_cnt_week where cnt<>0 order by c1;  
 c1 |  cnt   | stat_time   
----+--------+-----------  
  0 |  72929 | 20130422  
  1 | 146351 | 20130422  
  2 | 145705 | 20130422  
  3 | 146001 | 20130422  
  4 | 145831 | 20130422  
  5 | 146610 | 20130422  
  6 | 145946 | 20130422  
  7 | 146209 | 20130422  
  8 | 145556 | 20130422  
  9 | 146188 | 20130422  
 10 |  72722 | 20130422  
(11 rows)  
Time: 0.658 ms  

历史数据分析

digoal=# select * from log_read_func;  
 tablename |       func       | init_xid  |       init_xip        | isinit | info   
-----------+------------------+-----------+-----------------------+--------+------  
 log       | stat_log_c1      |         0 |                       | f      |   
 log       | stat_log_c1_week | 491803059 | {492492724,492492725} | f      |   
(2 rows)  
Time: 0.349 ms  

stat_log_c1 历史数据分析条件(xid<=0)

stat_log_c1_week 历史数据分析条件(xid<=491803059 and xid not in (492492724,492492725))

特别注意

由于本例采用了PostgreSQL系统xid来解决气泡问题, 所以特别需要注意以下问题 :

xid的问题, 当使用pg_resetxlog修改xid时(如果xid改小)将打破使用该方法的统计. 所以安全的做法是xid改大可以, 改小不行.

当使用pg_dump导出明细数据到另一个库后, 记得先使用pg_resetxlog将新集群的xid调整到大于明细表的max(xid)

为方便大家查询, 汇总PostgreSQL实时和非实时数据统计的案例分析文章系列 - 如下 :

1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/

5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/

6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/

7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/

8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/

Flag Counter

digoal’s 大量PostgreSQL文章入口