PostgreSQL 流式数据处理(聚合、过滤、转换…)系列 - 4

14 minute read

背景

2013年帮朋友做的方案。写了一些列文档来解决当时某个大数据BI平台的异步流式数据处理的功能。

逐步优化,化繁为简。

在业务层面,统计,数据的过滤,数据的清洗,数据的事件触发等。是比较常见的需求。

比如以COUNT就是一个很典型的例子。

在9.2以前全表的count只能通过扫描全表来得到, 即使有pk也必须扫描全表.

9.2版本增加了index only scan的功能, count(*)可以通过仅仅扫描pk就可以得到.

但是如果是一个比较大的表, pk也是很大的, 扫描pk也是个不小的开销.

到了9.6,开始支持并行查询,通过并行,一张1亿的表,COUNT可能只需要几百毫秒。这是一个质的飞跃。(但是还有很多时候用并行并不是最好的)

另外社区也除了一个流式处理的数据库,pipelineDB,但是它的社区版本限制了一个DATABASE只能使用1024个流视图,在编码的地方使用了1BYTE存储CV。

那么回到postgresql数据库本身,有没有办法来优化count全表的操作呢, 如果你的场景真的有必要频繁的count全表, 那么可以尝试一下使用以下方法来优化你的场景.

正文

前面三篇blog针对PostgreSQL的coung(*)如何优化做了比较详细的分析和测试, 如下 :

http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

但是都是实时的方式进行的优化, 特别是第三篇, 对于按列进行统计分析的场景, 因为统计维度较多, 实时的更新count(*)会带来较大的插入性能瓶颈. 例如有8个维度表的时候, 单步插入的性能会下降到insert = 6500 qps左右.

这个时候, 你可能需要非实时批量统计. 也就是异步的更新维度表的统计数据.

例如每100条数据更新一次, 或者每1秒更新一次. 来减少维度表的更新频率. 降低数据库, 但是统计数据是非实时的, 这点必须清楚.

如果需要查询实时的count还需要查询未计入统计表的明细表数据.

异步处理会遇到以下问题

1. 气泡问题, (有解决方法,不要使用界限,使用order by limit,或者直接使用limit).

例如这样,是不是很帅呢(本文未涉及)? (with tmp as (delete from tbl limit 10000 returning *) insert into xxx select x from tmp group by x; )

自增长字段作为分段统计的分隔字段安全吗?

如果明细表的插入是当线程的, 回答是安全的. 但是如果明细表是并行插入的, 那么就不安全了.

举个例子(这里以时间为分隔字段) :

t1:2013-03-01 11:01:00 SESSION A :   
begin;  
insert into log(id,info,crt_time) values (1,'test1',clock_timestamp());  -- 假设clock_timestamp()='2013-03-01 11:01:00'  
  
t2:2013-03-01 11:01:02 SESSION B :   
begin;  
insert into log(id,info,crt_time) values (2,'test2',clock_timestamp());  -- 假设clock_timestamp()='2013-03-01 11:01:02'  
commit;  
  
t3:2013-03-01 11:01:03 统计操作1 : (由于此时a未提交,所以取不到a的数据)  
select * from log where crt_time >='2013-03-01 11:01:00' and crt_time<'2013-03-01 11:01:02';  
然后对这批数据进行统计分析, 合并统计数据.  
  
t4:2013-03-01 11:01:04 SESSION A :   
commit;  
  
t5:2013-03-01 11:01:05 统计操作2 :  
select * from log where crt_time >='2013-03-01 11:01:02' and crt_time<'?';  
  
此时进行统计显然会漏掉SESSION A插入明细表的数据.  
使用序列同样会有这种问题.  

要避免以上问题, 统计数据分段截止改一个条件即可, 以上是截至到明细数据的最大值. 改为截至小于正在操作明细表的所有未提交的事务.

需要结合事务ID来操作, 因为事务id是自增长分配的同时又具备了mvcc的含义. 用来做分隔字段是可以的.

方法一

分析小于txid_snapshot_xmin的记录, 因为小于xmin的事务都已经提交或者回滚了. 可以规避气泡问题.

使用这种方法要注意长事务的问题, 长事务可能会带来巨大的分析延迟. (因为xmin可能远小于明细表当前最大的已提交事务号.)

使用这种方法以xmin为过滤条件时, 还要注意分辨锁级别, 只有insert, delete, alter, truncate, drop的锁需要过滤, 其他的不应该过滤. 以便减少延迟.

方法二

由于方法1的缺陷, 当数据库中存在较长事务时, 这种分析可能存在大的延迟.

所以采用另一种方法, 分析小于txid_snapshot_xmax的明细记录, 同时记录下txid_snapshot_xip() 或者txid_current_snapshot()的xip值. 这里的xip包含了未提交或未回滚的prepare transaction xid. 所以就不需要再去查找pg_prepared_xacts了.

为了得到一致的xmin,xmax,xip值, 最好都从txid_current_snapshot()取.

具体的流程如下 :

begin;  
select agg(txid_current_snapshot)  into v_xmin, v_xmax, v_xip;  
-- 处理 select * from log where txid < v_xmax and txid>=log_read_last_txid;  
-- 处理 select * from log_del where txid < v_xmax and txid>=log_read_last_txid;  
-- 处理 select * from log where txid in log_xip and not in v_xip;  
-- 处理 select * from log_del where txid in log_xip and not in v_xip;  
-- 记录处理截止点log_read_last_txid;  
-- 更新log_xip  
end;  

本例将采用方法二的解决办法.

2. 并行处理的问题, 如何保证并行安全, 高效.

本例不涉及并行处理, 所以等下一篇BLOG再来解决这个问题. 感兴趣的朋友可以关注一下.

3. 如何减少取数次数(扫描次数).

多个统计维度使用同一份数据. 这个本例已经解决.

4. 明细表delete带来的问题. 可能造成被删除的数据无法被统计.

异步处理的解决办法

1. 加一个字段用来标识(记录是否删除). 应用程序查询数据是需要过滤删除数据.

2. 增加del明细表.

本例已经解决.

详细的实施过程

测试表 :

create table log   
(  
  id serial primary key,   
  xid int8 default txid_current() not null,   
  isdel boolean default false not null,   
  c1 int not null,   
  c2 int not null,   
  c3 int not null,   
  c4 text not null,   
  crt_time timestamp default now()  
);  
create index idx_log_1 on log(xid);  

存放count()的表, 假设经常需要按log.c1以及log.crt_time分天, 周, 月, 年进行count()

create table log_c1_cnt_day (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));  
create table log_c1_cnt_week (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));  
create table log_c1_cnt_month (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));  
create table log_c1_cnt_year (c1 int, cnt int8, stat_time text, primary key(c1,stat_time));  

存放count()的表, 假设经常需要按log.c2, log.c3以及log.crt_time分天, 周, 月, 年进行count()

create table log_c2_c3_cnt_day (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));  
create table log_c2_c3_cnt_week (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));  
create table log_c2_c3_cnt_month (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));  
create table log_c2_c3_cnt_year (c2 int, c3 int, cnt int8, stat_time text, primary key(c2,c3,stat_time));  

存放删除记录的表,

create table log_del (xid int8 default txid_current() not null, log_rec log not null);  
create index idx_log_del_1 on log_del(xid);  

创建删除触发器函数, 更新log.isdel标记, 不删除log表. 同时插入log_del.

create or replace function tg_log_del() returns trigger as $$  
declare  
begin  
  -- 避免重复删除  
  if not OLD.isdel then  
    update log set isdel=true where id=OLD.id;  
    insert into log_del(log_rec) values (OLD);  
    return null;  
  else  
    -- 如果已经删除, 则直接返回空, 不处理.  
    return null;  
  end if;  
end;  
$$ language plpgsql;  

在log上创建before删除触发器, 注意是before. 必须的.

create trigger tg1 before delete on log for each row execute procedure tg_log_del();  

插入测试数据

insert into log (c1,c2,c3,c4) values (1,1,1,1);  
insert into log (c1,c2,c3,c4) values (2,2,2,2);  

验证

digoal=# select * from log;  
 id |    xid    | isdel | c1 | c2 | c3 | c4 |          crt_time            
----+-----------+-------+----+----+----+----+----------------------------  
  2 | 444403112 | f     |  2 |  2 |  2 | 2  | 2013-04-19 11:47:17.140624  
  1 | 444403111 | f     |  1 |  1 |  1 | 1  | 2013-04-19 11:47:16.778672  
(2 rows)  

删除验证

digoal=# delete from log where id=1;  
DELETE 0  
digoal=# select * from log_del;  
    xid    |                       log_rec                          
-----------+------------------------------------------------------  
 444403113 | (1,444403111,f,1,1,1,1,"2013-04-19 11:47:16.778672")  
(1 row)  
digoal=# select * from log;  
 id |    xid    | isdel | c1 | c2 | c3 | c4 |          crt_time            
----+-----------+-------+----+----+----+----+----------------------------  
  2 | 444403112 | f     |  2 |  2 |  2 | 2  | 2013-04-19 11:47:17.140624  
  1 | 444403111 | t     |  1 |  1 |  1 | 1  | 2013-04-19 11:47:16.778672  
(2 rows)  
digoal=# select xid,(log_rec::log).* from log_del ;  
    xid    | id |    xid    | isdel | c1 | c2 | c3 | c4 |          crt_time            
-----------+----+-----------+-------+----+----+----+----+----------------------------  
 444403113 |  1 | 444403111 | f     |  1 |  1 |  1 | 1  | 2013-04-19 11:47:16.778672  
(1 row)  

创建分析维度注册表, 记录每个明细表每次分析的截止xid, xip. (未来可以精细化, 每个统计维度一条记录. 增加dime 字段. tablename+dime组合pk)

xid 记录统计到哪个xid了, xip记录当前活动事务, 不计入当前统计范畴. 避免气泡问题.

create table log_read   
(  
tablename name not null,   
xid int8 not null,   
xip int8[],   
xip_res int8[],  -- 用于与xid比对的数据. 必须保留所有>=xid的xip信息.  
mod_time timestamp,   
primary key (tablename)  
);  
  
insert into log_read values ('log', 0, null, null, now());  
insert into log_read values ('log_del', 0, null, null, now());  

创建串行批量数据分析函数

注意xid边界的选取. 如果单事务插入的语句过多, 可能造成内存溢出.

生产环境中也尽量避免单事务过大, 控制在10万条以内一个事务比较好.

create or replace function analyze_log(v_limit int) returns void as $$  
declare  
  v_advisory_xact_lock int8 := null;  -- 串行处理锁.  
    
  v_xid_snap txid_snapshot := null;  -- 当前事务状态快照  
  v_xmin int8 := null;  -- 当前事务状态快照中未完成的最小事务  
  v_xmax int8 := null;  -- 当前事务状态快照中未分配的最小事务  
  v_xip int8[] := null;  -- 当前事务状态快照中未完成的事务数组  
  
  v_log_read_log_xid int8 := null;  -- 上次log的xid分析截止位  
  v_log_read_log_del_xid int8 := null;  -- 上次log_del的xid分析截止位  
  v_log_read_log_xid_update int8 := null;  -- 更新值, 不能为空  
  v_log_read_log_del_xid_update int8 := null;  -- 更新值, 不能为空  
  
  v_log_read_log_xip int8[] := null;  -- 上次log_read.xip(tablename=log)  
  v_log_read_log_xip_do int8[] := null;  -- 解析本次log_read.xip(tablename=log) where (xip !@ txid_snapshot)  
  v_log_read_log_xip_update int8[] := null;  -- xip更新值  
  v_log_read_log_xip_res int8[] := null;  -- xip保留值  
  v_log_read_log_xip_res_update int8[] := null;  -- xip保留更新值, 所有大于v_log_read_log_xid_update的元素必须保留.  
  
  v_log_read_log_del_xip int8[] := null;  -- 上次log_read.xip(tablename=log_del)  
  v_log_read_log_del_xip_do int8[] := null;  -- 解析本次log_read.xip(tablename=log_del) where (xip !@ txid_snapshot)  
  v_log_read_log_del_xip_update int8[] := null;  -- xip更新值  
  v_log_read_log_del_xip_res int8[] := null;  -- xip保留值  
  v_log_read_log_del_xip_res_update int8[] := null;  -- xip保留更新值, 所有大于v_log_read_log_del_xid_update的元素必须保留.  
  
  v_log log[] := null;  -- 聚合本次log的分析数组, [末尾调用,false]  
  v_log_doxip log[] := null;  -- 聚合本次分析log数组:   
                          -- where log.xid (@ log_read.xip(tablename=log) and !@ txid_snapshot) , [末尾调用,false]  
  
  v_log_del_log_rec log[] := null;  -- 聚合本次分析log_del.log_rec数组:   
                            -- where log_del.xid ( > log_read.xid(tablename=log_del) ) order by log_del.xid ), [末尾调用,true]  
  v_log_del_log_rec_doxip log[] := null;  -- 聚合本次分析log_del.log_rec数组:   
                                  -- where log_del.xid (@ log_read.xip(tablename=log_del) and !@ txid_snapshot) , [末尾调用,true]  
begin  
  -- 判断limit  
  if v_limit <=0 then  
    raise notice 'please ensure v_limit > 0 .';  
    return;  
  end if;  
  
  -- 串行处理, 如果不能获得锁则直接退出. 确保v_advisory_xact_lock全局唯一.  
  v_advisory_xact_lock := 1;  
  if not pg_try_advisory_xact_lock(v_advisory_xact_lock) then  
    raise notice 'Another function is calling, this call will exit.';  
    return;  
  end if;  
  
  -- 生成 xid snapshot 数据.  
  v_xid_snap := txid_current_snapshot();  
  v_xmin := txid_snapshot_xmin(v_xid_snap);  
  v_xmax := txid_snapshot_xmax(v_xid_snap);  
  select array_agg(t) into v_xip from txid_snapshot_xip(v_xid_snap) g(t);  
  
  -- 取v_log_read_log_xid截止值, v_log_read_log_xip数组.  
  select xid,xip,xip_res into v_log_read_log_xid,v_log_read_log_xip,v_log_read_log_xip_res from log_read where tablename='log';  
  if not found then  
    raise notice 'log_read no log entry. please add it in log_read table first.';  
    return;  
  end if;  
  
  -- 取v_log_read_log_del_xid截止值, v_log_read_log_del_xip数组.  
  select xid,xip,xip_res into v_log_read_log_del_xid,v_log_read_log_del_xip,v_log_read_log_del_xip_res from log_read where tablename='log_del';  
  if not found then  
    raise notice 'log_read no log_del entry. please add it in log_read table first.';  
    return;  
  end if;  
  
  -- 取log1(取非xip中的数据, 隔离log2操作)  
  -- 取xid临界点  
  select max(xid) into v_log_read_log_xid_update from (select xid from log where xid > v_log_read_log_xid and xid < v_xmax and xid not in (select * from unnest(v_xip) union all select * from unnest(v_log_read_log_xip_res)) order by xid limit v_limit) t;  
  if v_log_read_log_xid_update is not null then  
    raise notice '取log1';  
    -- 根据临界点,取log数据  
    select array_agg(log) into v_log from (select log from log where xid > v_log_read_log_xid and xid<=v_log_read_log_xid_update and xid not in (select * from unnest(v_xip) union all select * from unnest(v_log_read_log_xip_res)) order by xid) t;  
  else   
    -- 如果没有数据, 更新值不变  
    v_log_read_log_xid_update := v_log_read_log_xid;  
  end if;  
  
  -- 取log_del1(取非xip中的数据, 隔离log_del2操作)  
  -- 取xid临界点  
  select max(xid) into v_log_read_log_del_xid_update from (select xid from log_del where xid > v_log_read_log_del_xid and xid < v_xmax and xid not in (select * from unnest(v_xip) union all select * from unnest(v_log_read_log_del_xip_res)) order by xid limit v_limit) t;  
  if v_log_read_log_del_xid_update is not null then  
    raise notice '取log_del1';  
    -- 根据临界点,取log_del.log_rec数据  
    select array_agg(log_rec) into v_log_del_log_rec from (select (log_del).log_rec as log_rec from log_del where xid > v_log_read_log_del_xid and xid<=v_log_read_log_del_xid_update and xid not in (select * from unnest(v_xip) union all select * from unnest(v_log_read_log_del_xip_res)) order by xid) t;  
  else   
    -- 如果没有数据, 更新值不变  
    v_log_read_log_del_xid_update := v_log_read_log_del_xid;  
  end if;  
  
  -- 取log2 (log_xip - v_xip) (取xip中的数据, 隔离log1操作)  
  -- 生成log_read.xip(tablename=log) do数组(已经完成的事务)  
  select array_agg(i) into v_log_read_log_xip_do from (select * from unnest(v_log_read_log_xip) i except select * from unnest(v_xip))t where i is not null;  
  -- 生成log_read.xip(tablename=log) update数组(未完成的事务)  
  select array_agg(i) into v_log_read_log_xip_update from   
  (  select i from (select * from unnest(v_log_read_log_xip) i union all select * from unnest(v_xip)  
     except select * from unnest(v_log_read_log_xip_do)) t where i is not null group by i ) t;  
  -- 生成xip_res更新值  
  select array_agg(i) into v_log_read_log_xip_res_update from (select * from unnest(v_log_read_log_xip_res) i union select * from unnest(v_log_read_log_xip) union select * from unnest(v_xip))t where i>v_log_read_log_xid_update;  
  -- 生成log do数组  
  select array_agg(log) into v_log_doxip from log where xid in (select * from unnest(v_log_read_log_xip_do));  
  
  -- 取log_del2 (log_xip - v_xip) (取xip中的数据, 隔离log_del1操作)  
  -- 生成log_read.xip(tablename=log_del) do数组(已经完成的事务)  
  select array_agg(i) into v_log_read_log_del_xip_do from (select * from unnest(v_log_read_log_del_xip) i except select * from unnest(v_xip))t where i is not null;  
  -- 生成log_read.xip(tablename=log_del) update数组(未完成的事务)  
  select array_agg(i) into v_log_read_log_del_xip_update from   
  (  select i from (select * from unnest(v_log_read_log_del_xip) i union all select * from unnest(v_xip)  
     except select * from unnest(v_log_read_log_del_xip_do)) t where i is not null group by i ) t;  
  -- 生成xip_res更新值  
  select array_agg(i) into v_log_read_log_del_xip_res_update from (select * from unnest(v_log_read_log_del_xip_res) i union select * from unnest(v_log_read_log_del_xip) union select * from unnest(v_xip)) t where i>v_log_read_log_del_xid_update;  
  -- 生成log_del.log_rec do数组  
  select array_agg(log_rec) into v_log_del_log_rec_doxip from log_del where xid in (select * from unnest(v_log_read_log_del_xip_do));  
  
  -- 更新log_read(tablename=log)  
  update log_read set   
    xip=v_log_read_log_xip_update,   
    xid=v_log_read_log_xid_update,   
    xip_res=v_log_read_log_xip_res_update,  
    mod_time=now()   
  where tablename='log';  
  -- DEBUG  
  -- raise notice 'log_read.oldxip(log): %.', v_log_read_log_xip;  
  -- raise notice 'log_read.newxip(log): %.', v_log_read_log_xip_update;  
  -- raise notice 'log_read.newxipres(log): %.', v_log_read_log_xip_res_update;  
  
  -- 更新log_read(tablename=log_del)  
  update log_read set   
    xip=v_log_read_log_del_xip_update,   
    xid=v_log_read_log_del_xid_update,   
    xip_res=v_log_read_log_del_xip_res_update,  
    mod_time=now()  
  where tablename='log_del';  
  -- DEBUG  
  -- raise notice 'log_read.oldxip(log_del): %.', v_log_read_log_del_xip;  
  -- raise notice 'log_read.newxip(log_del): %.', v_log_read_log_del_xip_update;  
  -- raise notice 'log_read.newxipres(log_del): %.', v_log_read_log_del_xip_res_update;  
  
  -- 分析函数可以另外写, 在此调用.  
  perform stat_log_c1(v_log, false);  
  perform stat_log_c1(v_log_doxip, false);  
  perform stat_log_c1(v_log_del_log_rec, true);  
  perform stat_log_c1(v_log_del_log_rec_doxip, true);  
  
return;  
end;  
$$ language plpgsql;  

统计函数stat_log_c1

CREATE OR REPLACE FUNCTION public.stat_log_c1(v_log log[], isdel boolean DEFAULT false)  
 RETURNS void  
 LANGUAGE plpgsql  
AS $function$  
declare  
  v_stat_time text;  
  v_c1 int;  
  v_cnt int8;  
begin  
  -- 统计log_c1_cnt_day  
  for v_stat_time, v_c1, v_cnt in select to_char(crt_time, 'yyyymmdd'), c1 , count(*) from (select ((unnest(v_log)::log)).*) t group by to_char(crt_time, 'yyyymmdd'), c1 loop  
    perform 1 from log_c1_cnt_day where c1=v_c1 and stat_time=v_stat_time;  
    if not found then  
      if isdel then  
        insert into log_c1_cnt_day(c1, cnt, stat_time) values (v_c1, -v_cnt, v_stat_time);  
      else  
        insert into log_c1_cnt_day(c1, cnt, stat_time) values (v_c1, v_cnt, v_stat_time);  
      end if;  
    else  
      if isdel then  
        update log_c1_cnt_day set cnt=cnt-v_cnt where c1=v_c1 and stat_time=v_stat_time;  
      else  
        update log_c1_cnt_day set cnt=cnt+v_cnt where c1=v_c1 and stat_time=v_stat_time;  
      end if;  
    end if;  
  end loop;  
  -- 统计log_c1_cnt_week , .... 略  
end;  
$function$;  

测试, 清理原始数据

truncate log;  
truncate log_del;  
truncate log_c1_cnt_day;  
update log_read set xid=0, xip=null, xip_res=null;  

pgbench脚本, 测试插入场景

cat ins.sql   
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  

pgbench

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -f ./ins.sql -r -n -h $PGDATA -U postgres -T 60 -c 8 -j 2  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 8  
number of threads: 2  
duration: 60 s  
number of transactions actually processed: 2679540  
tps = 44658.871978 (including connections establishing)  
tps = 44668.857300 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.177730        insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  

压力测试的同时执行analyze_log. 确保pgbench同时执行analyze_log.

pg92@digoal-PowerEdge-R610-> cat analyze.sh   
#!/bin/bash  
for ((i=0;i<100;i++))  
do  
psql -c "select * from analyze_log(1);"  
psql -c "select * from analyze_log(1000000);"  
done  

验证数据是否准确

digoal=# select c1,count(*),to_char(crt_time,'yyyymmdd') from log where not isdel group by c1,to_char(crt_time,'yyyymmdd') order by c1;  
 c1 |  count  | to_char    
----+---------+----------  
  0 |  643764 | 20130426  
  1 | 1291330 | 20130426  
  2 | 1289282 | 20130426  
  3 | 1289503 | 20130426  
  4 | 1290164 | 20130426  
  5 | 1290816 | 20130426  
  6 | 1290268 | 20130426  
  7 | 1288713 | 20130426  
  8 | 1289126 | 20130426  
  9 | 1288201 | 20130426  
 10 |  645811 | 20130426  
(11 rows)  
  
digoal=# select * from log_c1_cnt_day where cnt<>0 order by c1;  
 c1 |   cnt   | stat_time   
----+---------+-----------  
  0 |  643764 | 20130426  
  1 | 1291330 | 20130426  
  2 | 1289282 | 20130426  
  3 | 1289503 | 20130426  
  4 | 1290164 | 20130426  
  5 | 1290816 | 20130426  
  6 | 1290268 | 20130426  
  7 | 1288713 | 20130426  
  8 | 1289126 | 20130426  
  9 | 1288201 | 20130426  
 10 |  645811 | 20130426  
(11 rows)  

测试insert, delete混合场景

postgres=# select min(id),max(id) from log;  
    min    |    max      
-----------+-----------  
 109027255 | 121036868  
(1 row)  

pgbench脚本, 测试包含delete, rollback的场景, 同时测试单事务包含多条SQL的场景.

pg92@digoal-PowerEdge-R610-> cat ins.sql   
\setrandom id 109027255 131036868  
begin;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
end;  
begin;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
rollback;  
begin;  
delete from log where id=:id;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
delete from log where id=:id;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
delete from log where id=:id;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
delete from log where id=:id;  
end;  
begin;  
delete from log where id=:id;  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
insert into log (c1,c2,c3,c4) values(round(random()*10),1,2,3);  
delete from log where id=:id;  
delete from log where id=:id;  
delete from log where id=:id;  
delete from log where id=:id;  
delete from log where id=:id;  
delete from log where id=:id;  
rollback;  

pgbench

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -f ./ins.sql -r -n -h $PGDATA -U postgres -T 60 -c 32 -j 2  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 32  
number of threads: 2  
duration: 60 s  
number of transactions actually processed: 22635  
tps = 376.649815 (including connections establishing)  
tps = 376.980475 (excluding connections establishing)  
.....语句略  

pgbench过程中同时调用analyze_log.

pg92@digoal-PowerEdge-R610-> cat analyze.sh   
#!/bin/bash  
for ((i=0;i<100;i++))  
do  
psql -c "select * from analyze_log(1);"  
psql -c "select * from analyze_log(1000000);"  
done  
  
  
# 多次调用, 直到取完所有数据.  
  
pg92@digoal-PowerEdge-R610-> ./analyze.sh  

验证数据是否准确

digoal=# select c1,count(*),to_char(crt_time,'yyyymmdd') from log where not isdel group by c1,to_char(crt_time,'yyyymmdd') order by c1;  
 c1 |  count  | to_char    
----+---------+----------  
  0 |  960617 | 20130426  
  1 | 1926373 | 20130426  
  2 | 1924853 | 20130426  
  3 | 1924954 | 20130426  
  4 | 1923913 | 20130426  
  5 | 1924408 | 20130426  
  6 | 1924650 | 20130426  
  7 | 1924305 | 20130426  
  8 | 1923113 | 20130426  
  9 | 1924381 | 20130426  
 10 |  962460 | 20130426  
(11 rows)  
  
digoal=# select * from log_c1_cnt_day where cnt<>0 order by c1;  
 c1 |   cnt   | stat_time   
----+---------+-----------  
  0 |  960617 | 20130426  
  1 | 1926373 | 20130426  
  2 | 1924853 | 20130426  
  3 | 1924954 | 20130426  
  4 | 1923913 | 20130426  
  5 | 1924408 | 20130426  
  6 | 1924650 | 20130426  
  7 | 1924305 | 20130426  
  8 | 1923113 | 20130426  
  9 | 1924381 | 20130426  
 10 |  962460 | 20130426  
(11 rows)  
  
digoal=# select count(*) from log_del ;  
 count   
-------  
 46085  
(1 row)  

insert, delete的问题.

1. 可能出现insert未被统计到, 但是delete被统计到的情况.

解决办法 :

log加个isdel字段, del时不真实的删除记录. 这样就可以避免insert未被统计到, 但是delete被统计到的情况.

应用程序在对log表查询时加上isdel is false条件.

2. log_del表的清理, 以及log.isdel=true清理. 使用如下在线过程.

   do language plpgsql $$  
   declare  
     v_xid int8;  
     v_xip int8[];  
   begin  
     select xid,xip into v_xid,v_xip from log_read where tablename='log';  
     if found then  
       delete from log where isdel and xid<=v_xid and xid not in (select unnest(v_xip));  
     end if;  
     select xid,xip into v_xid,v_xip from log_read where tablename='log_del';  
     if found then  
       delete from log_del where xid<=v_xid and xid not in (select unnest(v_xip));  
     end if;  
   end;  
   $$;  

特别注意

由于本例采用了PostgreSQL系统xid来解决气泡问题, 所以特别需要注意以下问题 :

xid的问题, 当使用pg_resetxlog修改xid时(如果xid改小)将打破使用该方法的统计. 所以安全的做法是xid改大可以, 改小不行.

当使用pg_dump导出明细数据到另一个库后, 记得先使用pg_resetxlog将新集群的xid调整到大于明细表的max(xid)

为方便大家查询, 汇总PostgreSQL实时和非实时数据统计的案例分析文章系列 - 如下 :

1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/

5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/

6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/

7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/

8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/

Flag Counter

digoal’s 大量PostgreSQL文章入口