PostgreSQL Streaming Data Processing (Aggregation, Filtering, Transformation, ...) Series - 1


Background

This is a solution I put together for a friend back in 2013. I wrote a series of documents to implement asynchronous, streaming-style data processing for a big-data BI platform of that time.

The approach was optimized step by step, turning something complex into something simple.

At the business level, statistics, data filtering, data cleansing, event triggering on data, and so on are fairly common requirements.

COUNT is a very typical example.

Before 9.2, a full-table count could only be obtained by scanning the entire table; even with a primary key, a full scan was unavoidable.

Version 9.2 added index-only scans, so count(*) can be answered by scanning just the primary key index.

But on a fairly large table the primary key index is also large, and scanning it is still a substantial cost.

With 9.6 came parallel query: using parallelism, counting a table of 100 million rows may take only a few hundred milliseconds, which is a qualitative leap. (Even so, parallelism is often not the best answer.)
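As a point of reference only (the tests below were run on 9.2, so none of this applies to them), a parallel count on 9.6 or later might look roughly like the following; big_table is a placeholder name and the parameter value is purely illustrative.

set max_parallel_workers_per_gather = 8;   -- 9.6+ setting; value chosen only for illustration
explain analyze select count(*) from big_table;
-- the plan should contain Finalize Aggregate -> Gather -> Partial Aggregate,
-- with several workers each counting part of the table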

The community also produced a streaming database, PipelineDB, but its community edition limits each database to 1024 continuous views (stream views), because the encoding uses a single byte to store the CV identifier.

Coming back to PostgreSQL itself: is there a way to optimize counting an entire table? If your workload really does need to count the whole table frequently, you can try the following approach.

Main Content

Method 1: create several triggers on the table, fired on every insert, delete, and truncate, that maintain the table's row count in a separate counter table.

This has plenty of problems, though. Triggers run serially, so if there are many statistics dimensions, the RT of the data operations goes up. Moreover, under concurrent inserts and deletes, if only a single row is used to store the table's count(*) value, there will be severe row-lock contention.

For example, if two sessions each insert a record at the same time, both triggers must update the same row of the counter table, so one session ends up waiting on the row lock.
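For contrast, a minimal sketch of this naive single-row variant (not used anywhere below) makes the contention obvious:

-- one global counter row: every writer fights over this single tuple's row lock
create table cnt_single(cnt int);
insert into cnt_single values (0);
-- an insert trigger on a would then simply run:
--   update cnt_single set cnt = cnt + 1;
-- so two concurrent inserts into a serialize on the same row lock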

Therefore, multiple rows can be used to spread out the row-lock contention, as follows:

I. Insert test

1. Create the test table a, assuming count(*) from a is run frequently.

pg92@digoal-PowerEdge-R610-> psql  
psql (9.2.4)  
Type "help" for help.  
postgres=# drop table a;  
DROP TABLE  
postgres=# create table a(id serial4 primary key, info text, crt_time timestamp(0) default now());  
NOTICE:  CREATE TABLE will create implicit sequence "a_id_seq" for serial column "a.id"  
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "a_pkey" for table "a"  
CREATE TABLE  

2. Create the table that records the row count of table a

postgres=# create table cnt_a(id int primary key, cnt int);  
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "cnt_a_pkey" for table "cnt_a"  
CREATE TABLE  

To ease row-lock contention, 1001 rows are used here to store the value of count(*) from a.

To compute count(*) from a, simply run sum(cnt) from cnt_a, which only has to scan 1001 rows.

As we will see later, the more rows table a holds, the more pronounced the performance gain.
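If convenient, the sum can be wrapped in a view so that applications keep issuing a single simple query (purely illustrative; the view name a_count is not part of the original setup):

create view a_count as select sum(cnt) as cnt from cnt_a;
-- select cnt from a_count;   -- same result as select count(*) from a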

postgres=# insert into cnt_a select generate_series(0,1000),0;  
INSERT 0 1001  

3. Create the insert trigger function

CREATE OR REPLACE FUNCTION public.tg_insert_a()  
 RETURNS trigger  
 LANGUAGE plpgsql  
AS $function$  
declare  
  m_id int;  
  rm numeric;  
begin  
  select max(id),random() into m_id,rm from cnt_a;  
  update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int;  
  return null;  
end;  
$function$;  

4. Create the delete trigger function

CREATE OR REPLACE FUNCTION public.tg_delete_a()  
 RETURNS trigger  
 LANGUAGE plpgsql  
AS $function$  
declare  
  m_id int;  
  rm numeric;  
begin  
  select max(id),random() into m_id,rm from cnt_a;  
  update cnt_a set cnt=cnt-1 where id=(rm*m_id)::int;  
  return null;  
end;  
$function$;  

5. Create the truncate trigger function

CREATE OR REPLACE FUNCTION public.tg_truncate_a()  
 RETURNS trigger  
 LANGUAGE plpgsql  
AS $function$  
declare  
begin  
  update cnt_a set cnt=0 where not cnt=0;  
  return null;  
end;  
$function$;  

6. Create the triggers

create trigger tg1 after insert on a for each row execute procedure tg_insert_a();  
create trigger tg2 after delete on a for each row execute procedure tg_delete_a();  
create trigger tg3 after truncate on a for each statement execute procedure tg_truncate_a();  
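A quick sanity check that the triggers are wired up correctly might look like this (illustrative; the rollback undoes both the test row and its counter update, since the trigger runs in the same transaction):

begin;
insert into a (info) values ('hello');
select sum(cnt) from cnt_a;   -- should be 1 higher than before the insert
rollback;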

7. Create the insert script used by pgbench

pg92@digoal-PowerEdge-R610-> cat insert.sql   
insert into a (info) values ('test');  

8. Run the insert test with pgbench

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./insert.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 1831418  
tps = 30514.831839 (including connections establishing)  
tps = 30522.057886 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.522411        insert into a (info) values ('test');  

9. After the test, compare count(*) with sum(cnt) to check that they match

postgres=# select count(*) from a;  
  count    
---------  
 1755964  
(1 row)  
Time: 285.491 ms  
postgres=# select sum(cnt) from cnt_a ;  
   sum     
---------  
 1755964  
(1 row)  
Time: 0.689 ms  

The performance improvement is striking.

II. Delete test

1. Create the pgbench script that deletes rows from table a

vi delete.sql  
\setrandom id 1 1000000  
delete from a where id=:id;  
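Note: \setrandom is the pgbench syntax of that era; it has since been removed from pgbench, and on a current release the script would be written roughly as:

\set id random(1, 1000000)
delete from a where id=:id;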

2. Run the test

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./delete.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 3353233  
tps = 55865.635772 (including connections establishing)  
tps = 55878.855793 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.002594        \setrandom id 1 1000000  
        0.282123        delete from a where id=:id;  

3. After the delete test, compare count(*) and sum(cnt) again

postgres=# select count(*) from a;  
  count    
---------  
 9687739  
(1 row)  
Time: 1550.239 ms  
postgres=# select sum(cnt) from cnt_a ;  
   sum     
---------  
 9687739  
(1 row)  
Time: 0.817 ms  

Once the row count reaches the tens of millions, the gain is already several thousand fold.

III. Create a test script that performs deletes and inserts together

vi id.sql  
\setrandom id 1 20000000  
delete from a where id=:id;  
insert into a (info) values ('test');  

Run the test

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 1061090  
tps = 17680.045577 (including connections establishing)  
tps = 17684.251890 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.003181        \setrandom id 1 20000000  
        0.381986        delete from a where id=:id;  
        0.516256        insert into a (info) values ('test');  

After the test, compare count(*) and sum(cnt) once more

postgres=# select count(*) from a;  
  count     
----------  
 10219555  
(1 row)  
Time: 1648.371 ms  
postgres=# select sum(cnt) from cnt_a ;  
   sum      
----------  
 10219555  
(1 row)  
Time: 1.339 ms  

IV. Finally, test truncating the table.

postgres=# truncate a;  
TRUNCATE TABLE  
Time: 434.581 ms  
postgres=# select count(*) from a;  
 count   
-------  
     0  
(1 row)  
Time: 0.831 ms  
postgres=# select sum(cnt) from cnt_a ;  
 sum   
-----  
   0  
(1 row)  
Time: 1.354 ms  

V. Optimization

When concurrency exceeds 1001, or when row-lock contention becomes clearly noticeable, more rows can be added to cnt_a on the fly to relieve the contention.

This needs no interruption of the business workload, but cnt_a.id must stay contiguous, with no gaps, and cnt must be initialized to 0; otherwise the trigger functions above will produce inaccurate counts.

For example:

pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  

Add counter rows while the test is running:

postgres=# insert into cnt_a (id,cnt) select generate_series(1001,2000),0;  
INSERT 0 1000  

After the test, check that the totals are accurate and that the newly added cnt_a.id rows have accumulated counts.

postgres=# select count(*) from a;  
  count    
---------  
 1283144  
(1 row)  
postgres=# select sum(cnt) from cnt_a ;  
   sum     
---------  
 1283144  
(1 row)  
postgres=# select sum(cnt) from cnt_a where id>1000;  
  sum     
--------  
 623957  
(1 row)  

To guard against inaccuracy, in addition to keeping cnt_a.id contiguous, an exception check can be added to the trigger functions: if the update matches no row or cnt is null, raise an exception, so that the whole transaction (including the DML on a) rolls back instead of silently losing a count.

CREATE OR REPLACE FUNCTION public.tg_insert_a()  
 RETURNS trigger  
 LANGUAGE plpgsql  
AS $function$  
declare  
  m_id int;  
  rm numeric;  
  new_cnt int;  
begin  
  select max(id),random() into m_id,rm from cnt_a;  
  update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt into new_cnt;  
  if not found or new_cnt is null then   
    raise exception '';  
  end if;  
  return null;  
end;  
$function$;  
  
CREATE OR REPLACE FUNCTION public.tg_delete_a()  
 RETURNS trigger  
 LANGUAGE plpgsql  
AS $function$  
declare  
  m_id int;  
  rm numeric;  
  new_cnt int;  
begin  
  select max(id),random() into m_id,rm from cnt_a;  
  update cnt_a set cnt=cnt-1 where id=(rm*m_id)::int returning cnt into new_cnt;  
  if not found or new_cnt is null then   
    raise exception '';  
  end if;  
  return null;  
end;  
$function$;  

Test:

Insert an invalid row with cnt=null and see whether the exception fires and whether the totals remain correct.

postgres=# insert into cnt_a (id,cnt) select 2001,null;  
INSERT 0 1  

Run pgbench

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
Client 13 aborted in state 2: ERROR:    
Client 6 aborted in state 2: ERROR:    
Client 8 aborted in state 2: ERROR:    
Client 1 aborted in state 2: ERROR:    
Client 0 aborted in state 2: ERROR:    
Client 2 aborted in state 2: ERROR:    
Client 7 aborted in state 2: ERROR:    
Client 11 aborted in state 2: ERROR:    
Client 4 aborted in state 2: ERROR:    
Client 3 aborted in state 2: ERROR:    
Client 9 aborted in state 2: ERROR:    
Client 12 aborted in state 2: ERROR:    
Client 10 aborted in state 2: ERROR:    
Client 14 aborted in state 2: ERROR:    
Client 15 aborted in state 2: ERROR:    
Client 5 aborted in state 2: ERROR:    
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 54704  
tps = 7617.195278 (including connections establishing)  
tps = 7632.604983 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.003084        \setrandom id 1 20000000  
        0.184270        delete from a where id=:id;  
        0.366083        insert into a (info) values ('test');  

Verify the result: with the exception check in place, the totals are correct.

postgres=# select sum(cnt) from cnt_a;  
   sum     
---------  
 1334221  
(1 row)  
postgres=# select count(*) from a;  
  count    
---------  
 1334221  
(1 row)  

Insert a non-contiguous id, see whether the exception fires, and check whether the totals remain accurate.

Skipping 1000 ids makes the sequence non-contiguous, so random()*max_id can land on an id that has no row. The update then matches nothing (not found) and the exception fires.

postgres=# insert into cnt_a (id,cnt) select 3001,null;  
INSERT 0 1  

In the run below, pgbench only got through 28 transactions in total before all the clients aborted; check whether the totals are still correct.

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
Client 0 aborted in state 1: ERROR:    
Client 3 aborted in state 2: ERROR:    
Client 13 aborted in state 2: ERROR:    
Client 14 aborted in state 2: ERROR:    
Client 7 aborted in state 2: ERROR:    
Client 2 aborted in state 2: ERROR:    
Client 8 aborted in state 2: ERROR:    
Client 4 aborted in state 2: ERROR:    
Client 5 aborted in state 2: ERROR:    
Client 10 aborted in state 2: ERROR:    
Client 6 aborted in state 1: ERROR:    
Client 1 aborted in state 1: ERROR:    
Client 9 aborted in state 2: ERROR:    
Client 11 aborted in state 2: ERROR:    
Client 15 aborted in state 2: ERROR:    
Client 12 aborted in state 2: ERROR:    
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 28  
tps = 801.167415 (including connections establishing)  
tps = 1372.515380 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.004773        \setrandom id 1 20000000  
        1.731136        delete from a where id=:id;  
        2.530098        insert into a (info) values ('test');  

The totals are correct.

postgres=# select sum(cnt) from cnt_a;  
   sum     
---------  
 1334246  
(1 row)  
postgres=# select count(*) from a;  
  count    
---------  
 1334246  
(1 row)  

Optimization, stage 1

1. Using this method to optimize count(*) is worth recommending, provided inserts and deletes are not already the system bottleneck.

2. random() is a volatile function, so when it is called several times within one transaction it is evaluated each time, and rm*max_id will inevitably produce different ids.

postgres=# select provolatile from pg_proc where proname='random';  
 provolatile   
-------------  
 v  
(1 row)  

Two consequences follow:

2.1. Evaluating random() many times costs more than evaluating it once.

2.2. Because each call yields a different id, a batch insert will lock multiple cnt_a rows within one transaction, a pattern that easily leads to deadlocks.
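The volatility is easy to see directly (illustrative):

-- each reference to random() is evaluated separately, even inside one statement
select random() as r1, random() as r2;   -- r1 and r2 almost always differ
-- likewise, two trigger firings in one transaction will usually pick different cnt_a ids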

To address this, we want the behavior of a stable or immutable random function: every call within a transaction returns the same value, which reduces computation and avoids the deadlock pattern above. The implementation here uses advisory locks, as follows.

Add pid and lock_time columns to record the session pid and the transaction start time.

postgres=# alter table cnt_a add column pid int;  
ALTER TABLE  
Time: 18.649 ms  
postgres=# alter table cnt_a add column lock_time timestamp;  
ALTER TABLE  
Time: 1.018 ms  
postgres=# \d cnt_a  
                  Table "public.cnt_a"  
  Column   |            Type             |   Modifiers     
-----------+-----------------------------+---------------  
 id        | integer                     | not null  
 cnt       | integer                     |   
 pid       | integer                     |   
 lock_time | timestamp without time zone |   
Indexes:  
    "cnt_a_pkey" PRIMARY KEY, btree (id)  

Create the insert trigger function

CREATE OR REPLACE FUNCTION public.tg_insert_a()  
 RETURNS trigger  
 LANGUAGE plpgsql  
AS $function$  
declare  
  m_id int;  
  a_lock boolean;  
  rm numeric;  
  max_id int;  
  new_cnt int;  
begin  
  -- now() is stable: it returns the same value throughout the transaction.
  select id into m_id from cnt_a where pid=pg_backend_pid() and lock_time=now() limit 1;  
  if found then   
    update cnt_a set cnt=cnt+1 where id=m_id returning cnt into new_cnt;  
    if new_cnt is null then   
      raise exception 'cnt_a.cnt is null, please init with zero.';  
    end if;  
    return null;  
  else  
    -- 1 Under read committed, concurrent sessions may all race to lock the same row, causing unnecessary waiting.
    -- 1 select id into m_id from cnt_a where locked=false limit 1 for update;
    -- 2 This spreads the locking pressure across rows, but adds query overhead.
    -- 2 select id into m_id from cnt_a where locked=false order by random() limit 1 for update;
    -- 3 Catch the 55P03 error instead; under heavy concurrency there will be many of these exceptions.
    -- 3 select id into m_id from cnt_a where locked=false limit 1 for update nowait;
    -- 4 With the loop below, watch how many rows must be visited before a lock is acquired under high concurrency; there is still room for optimization (consider MVCC and scan behavior).
    for a_lock,m_id in select pg_try_advisory_xact_lock(id),id from cnt_a loop   
      if a_lock then  
        -- advisory lock acquired
	update cnt_a set cnt=cnt+1,pid=pg_backend_pid(),lock_time=now() where id=m_id returning cnt into new_cnt;  
        if new_cnt is null then   
          raise exception 'cnt_a.cnt is null, please init with zero.';  
        end if;  
	return null;  
      end if;  
    end loop;  
    -- Reaching here means no cnt_a row could be advisory-locked: they are all held by other sessions.
    -- So just pick one at random, update it, and wait on its row lock.
    select max(id),random() into max_id,rm from cnt_a;  
    update cnt_a set cnt=cnt+1,pid=pg_backend_pid(),lock_time=now() where id=(rm*max_id)::int returning cnt into new_cnt;
    if not found or new_cnt is null then
      raise exception 'cnt_a.id:%, cnt_a.cnt:%.', (rm*max_id)::int, new_cnt;
    end if;  
    return null;  
  end if;  
return null;  
end;  
$function$;  

Create the delete trigger function

CREATE OR REPLACE FUNCTION public.tg_delete_a()  
 RETURNS trigger  
 LANGUAGE plpgsql  
AS $function$  
declare  
  m_id int;  
  a_lock boolean;  
  rm numeric;  
  max_id int;  
  new_cnt int;  
begin  
  -- now() is stable: it returns the same value throughout the transaction.
  select id into m_id from cnt_a where pid=pg_backend_pid() and lock_time=now() limit 1;  
  if found then   
    update cnt_a set cnt=cnt-1 where id=m_id returning cnt into new_cnt;  
    if new_cnt is null then   
      raise exception 'cnt_a.cnt is null, please init with zero.';  
    end if;  
    return null;  
  else  
    -- 1 Under read committed, concurrent sessions may all race to lock the same row, causing unnecessary waiting.
    -- 1 select id into m_id from cnt_a where locked=false limit 1 for update;
    -- 2 This spreads the locking pressure across rows, but adds query overhead.
    -- 2 select id into m_id from cnt_a where locked=false order by random() limit 1 for update;
    -- 3 Catch the 55P03 error instead; under heavy concurrency there will be many of these exceptions.
    -- 3 select id into m_id from cnt_a where locked=false limit 1 for update nowait;
    -- 4 With the loop below, watch how many rows must be visited before a lock is acquired under high concurrency; there is still room for optimization (consider MVCC and scan behavior).
    for a_lock,m_id in select pg_try_advisory_xact_lock(id),id from cnt_a loop   
      if a_lock then  
        -- advisory lock acquired
	update cnt_a set cnt=cnt-1,pid=pg_backend_pid(),lock_time=now() where id=m_id returning cnt into new_cnt;  
        if new_cnt is null then   
          raise exception 'cnt_a.cnt is null, please init with zero.';  
        end if;  
	return null;  
      end if;  
    end loop;  
    -- Reaching here means no cnt_a row could be advisory-locked: they are all held by other sessions.
    -- So just pick one at random, update it, and wait on its row lock.
    select max(id),random() into max_id,rm from cnt_a;  
    update cnt_a set cnt=cnt-1,pid=pg_backend_pid(),lock_time=now() where id=(rm*max_id)::int returning cnt into new_cnt;
    if not found or new_cnt is null then
      raise exception 'cnt_a.id:%, cnt_a.cnt:%.', (rm*max_id)::int, new_cnt;
    end if;  
    return null;  
  end if;  
return null;  
end;  
$function$;  

The following run tests the original functions with 16 concurrent clients, 16 cnt_a rows, and multiple SQL statements per transaction; the expected deadlocks show up.

postgres=# truncate a;  
TRUNCATE TABLE  
postgres=# delete from cnt_a ;  
DELETE 2002  
postgres=# insert into cnt_a(id,cnt) select generate_series(0,15),0;  
INSERT 0 16  
pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
Client 2 aborted in state 8: ERROR:  deadlock detected  
DETAIL:  Process 10738 waits for ShareLock on transaction 433211275; blocked by process 10737.  
Process 10737 waits for ShareLock on transaction 433211280; blocked by process 10738.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 3 aborted in state 8: ERROR:  deadlock detected  
DETAIL:  Process 10742 waits for ShareLock on transaction 433211275; blocked by process 10737.  
Process 10737 waits for ExclusiveLock on tuple (0,11) of relation 25592 of database 12044; blocked by process 10740.  
Process 10740 waits for ShareLock on transaction 433211281; blocked by process 10742.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 12 aborted in state 4: ERROR:  deadlock detected  
DETAIL:  Process 10732 waits for ShareLock on transaction 433211286; blocked by process 10740.  
Process 10740 waits for ShareLock on transaction 433211276; blocked by process 10736.  
Process 10736 waits for ExclusiveLock on tuple (0,12) of relation 25592 of database 12044; blocked by process 10734.  
Process 10734 waits for ShareLock on transaction 433211275; blocked by process 10737.  
Process 10737 waits for ExclusiveLock on tuple (0,11) of relation 25592 of database 12044; blocked by process 10732.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 13 aborted in state 11: ERROR:  deadlock detected  
DETAIL:  Process 10736 waits for ExclusiveLock on tuple (0,12) of relation 25592 of database 12044; blocked by process 10734.  
Process 10734 waits for ShareLock on transaction 433211275; blocked by process 10737.  
Process 10737 waits for ShareLock on transaction 433211286; blocked by process 10740.  
Process 10740 waits for ShareLock on transaction 433211276; blocked by process 10736.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 5 aborted in state 12: ERROR:  deadlock detected  
DETAIL:  Process 10737 waits for ShareLock on transaction 433211286; blocked by process 10740.  
Process 10740 waits for ShareLock on transaction 433211272; blocked by process 10731.  
Process 10731 waits for ShareLock on transaction 433211279; blocked by process 10734.  
Process 10734 waits for ShareLock on transaction 433211275; blocked by process 10737.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 1 aborted in state 10: ERROR:  deadlock detected  
DETAIL:  Process 10734 waits for ShareLock on transaction 433211287; blocked by process 10730.  
Process 10730 waits for ShareLock on transaction 433211286; blocked by process 10740.  
Process 10740 waits for ShareLock on transaction 433211272; blocked by process 10731.  
Process 10731 waits for ShareLock on transaction 433211279; blocked by process 10734.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 7 aborted in state 8: ERROR:  deadlock detected  
DETAIL:  Process 10743 waits for ShareLock on transaction 433211288; blocked by process 10744.  
Process 10744 waits for ShareLock on transaction 433211282; blocked by process 10733.  
Process 10733 waits for ExclusiveLock on tuple (0,22) of relation 25592 of database 12044; blocked by process 10730.  
Process 10730 waits for ShareLock on transaction 433211286; blocked by process 10740.  
Process 10740 waits for ShareLock on transaction 433211284; blocked by process 10743.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 14 aborted in state 12: ERROR:  deadlock detected  
DETAIL:  Process 10740 waits for ExclusiveLock on tuple (0,16) of relation 25592 of database 12044; blocked by process 10735.  
Process 10735 waits for ShareLock on transaction 433211274; blocked by process 10739.  
Process 10739 waits for ShareLock on transaction 433211282; blocked by process 10733.  
Process 10733 waits for ExclusiveLock on tuple (0,22) of relation 25592 of database 12044; blocked by process 10730.  
Process 10730 waits for ShareLock on transaction 433211286; blocked by process 10740.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 11 aborted in state 4: ERROR:  deadlock detected  
DETAIL:  Process 10745 waits for ExclusiveLock on tuple (0,3) of relation 25592 of database 12044; blocked by process 10741.  
Process 10741 waits for ShareLock on transaction 433211278; blocked by process 10735.  
Process 10735 waits for ShareLock on transaction 433211274; blocked by process 10739.  
Process 10739 waits for ShareLock on transaction 433211291; blocked by process 10733.  
Process 10733 waits for ShareLock on transaction 433211290; blocked by process 10730.  
Process 10730 waits for ExclusiveLock on tuple (0,3) of relation 25592 of database 12044; blocked by process 10745.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 0 aborted in state 10: ERROR:  deadlock detected  
DETAIL:  Process 10730 waits for ExclusiveLock on tuple (0,3) of relation 25592 of database 12044; blocked by process 10741.  
Process 10741 waits for ShareLock on transaction 433211278; blocked by process 10735.  
Process 10735 waits for ShareLock on transaction 433211274; blocked by process 10739.  
Process 10739 waits for ShareLock on transaction 433211291; blocked by process 10733.  
Process 10733 waits for ShareLock on transaction 433211290; blocked by process 10730.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 8 aborted in state 10: ERROR:  deadlock detected  
DETAIL:  Process 10731 waits for ShareLock on transaction 433211294; blocked by process 10733.  
Process 10733 waits for ExclusiveLock on tuple (0,76) of relation 25592 of database 12044; blocked by process 10744.  
Process 10744 waits for ShareLock on transaction 433211289; blocked by process 10731.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 4 aborted in state 8: ERROR:  deadlock detected  
DETAIL:  Process 10733 waits for ShareLock on transaction 433211293; blocked by process 10744.  
Process 10744 waits for ExclusiveLock on tuple (0,89) of relation 25592 of database 12044; blocked by process 10735.  
Process 10735 waits for ShareLock on transaction 433211294; blocked by process 10733.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 15 aborted in state 4: ERROR:  deadlock detected  
DETAIL:  Process 10744 waits for ShareLock on transaction 433211296; blocked by process 10735.  
Process 10735 waits for ShareLock on transaction 433211298; blocked by process 10739.  
Process 10739 waits for ExclusiveLock on tuple (0,90) of relation 25592 of database 12044; blocked by process 10744.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 9 aborted in state 10: ERROR:  deadlock detected  
DETAIL:  Process 10735 waits for ShareLock on transaction 433211298; blocked by process 10739.  
Process 10739 waits for ShareLock on transaction 433211296; blocked by process 10735.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
Client 6 aborted in state 11: ERROR:  deadlock detected  
DETAIL:  Process 10741 waits for ShareLock on transaction 433211317; blocked by process 10739.  
Process 10739 waits for ShareLock on transaction 433211316; blocked by process 10741.  
HINT:  See server log for query details.  
CONTEXT:  SQL statement "update cnt_a set cnt=cnt+1 where id=(rm*m_id)::int returning cnt"  
PL/pgSQL function tg_insert_a() line 8 at SQL statement  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 23826  
tps = 397.094633 (including connections establishing)  
tps = 397.187975 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.002638        \setrandom id 1 20000000  
        0.063353        begin;  
        0.098917        delete from a where id=:id;  
        0.090903        delete from a where id=:id;  
        1.541656        insert into a (info) values ('test');  
        0.096450        delete from a where id=:id;  
        1.784244        insert into a (info) values ('test');  
        0.095878        delete from a where id=:id;  
        0.899185        insert into a (info) values ('test');  
        0.096219        delete from a where id=:id;  
        0.942108        insert into a (info) values ('test');  
        0.441609        insert into a (info) values ('test');  
        0.482926        insert into a (info) values ('test');  
        0.079380        end;  

The following run tests the improved functions under the same conditions (16 concurrent clients, 16 cnt_a rows, multiple SQL statements per transaction); the deadlocks are gone and tps is higher than in the run above.

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 42402  
tps = 706.377762 (including connections establishing)  
tps = 706.544148 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.004023        \setrandom id 1 20000000  
        0.128100        begin;  
        0.376305        delete from a where id=:id;  
        0.149250        delete from a where id=:id;  
        14.473279       insert into a (info) values ('test');  
        0.206936        delete from a where id=:id;  
        1.340881        insert into a (info) values ('test');  
        0.207271        delete from a where id=:id;  
        1.301736        insert into a (info) values ('test');  
        0.209022        delete from a where id=:id;  
        1.294269        insert into a (info) values ('test');  
        1.342260        insert into a (info) values ('test');  
        1.337499        insert into a (info) values ('test');  
        0.250370        end;  
  
  
postgres=# select count(*) from a;  
 count    
--------  
 396719  
(1 row)  
  
postgres=# select sum(cnt) from cnt_a ;  
  sum     
--------  
 396719  
(1 row)  

Test the original functions with a single SQL statement per transaction, 16 concurrent clients, and 16 cnt_a rows, for comparison with the improved functions' tps:

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./insert.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 1480488  
tps = 24668.474181 (including connections establishing)  
tps = 24674.365320 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.646597        insert into a (info) values ('test');  

Test the improved functions with a single SQL statement per transaction, 16 concurrent clients, and 16 cnt_a rows:

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./insert.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 736812  
tps = 12278.457482 (including connections establishing)  
tps = 12281.288634 (excluding connections establishing)  
statement latencies in milliseconds:  
        1.300583        insert into a (info) values ('test');  

With plenty of cnt_a rows (e.g. 2000), test the original functions with a single SQL statement per transaction and 16 concurrent clients:

postgres=# insert into cnt_a(id,cnt) select generate_series(16,1999),0;  
INSERT 0 1984  
pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./insert.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 1722562  
tps = 28705.262293 (including connections establishing)  
tps = 28712.163471 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.555513        insert into a (info) values ('test');  

With plenty of cnt_a rows (e.g. 2000), test the improved functions with a single SQL statement per transaction and 16 concurrent clients:

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./insert.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 482195  
tps = 8034.913678 (including connections establishing)  
tps = 8036.928653 (excluding connections establishing)  
statement latencies in milliseconds:  
        1.988503        insert into a (info) values ('test');  

With plenty of cnt_a rows (e.g. 2000), test the original functions with multiple SQL statements per transaction and 16 concurrent clients:

As in the earlier run, a large number of deadlocks occur.

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
Client 0 aborted in state 12: ERROR:  deadlock detected  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 123264  
tps = 2054.315191 (including connections establishing)  
tps = 2054.804565 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.002890        \setrandom id 1 20000000  
        0.055029        begin;  
        0.154473        delete from a where id=:id;  
        0.092312        delete from a where id=:id;  
        0.398831        insert into a (info) values ('test');  
        0.099380        delete from a where id=:id;  
        0.374859        insert into a (info) values ('test');  
        0.099221        delete from a where id=:id;  
        0.400103        insert into a (info) values ('test');  
        0.099028        delete from a where id=:id;  
        0.397862        insert into a (info) values ('test');  
        0.444252        insert into a (info) values ('test');  
        0.460034        insert into a (info) values ('test');  
        0.082733        end;  

With plenty of cnt_a rows (e.g. 2000), test the improved functions with multiple SQL statements per transaction and 16 concurrent clients:

pg92@digoal-PowerEdge-R610-> pgbench -M prepared -r -n -f ./id.sql -h $PGDATA -p 1919 -U postgres -T 60 -c 16 -j 4 postgres  
transaction type: Custom query  
scaling factor: 1  
query mode: prepared  
number of clients: 16  
number of threads: 4  
duration: 60 s  
number of transactions actually processed: 178495  
tps = 2974.062219 (including connections establishing)  
tps = 2974.751878 (excluding connections establishing)  
statement latencies in milliseconds:  
        0.003536        \setrandom id 1 20000000  
        0.145519        begin;  
        0.432378        delete from a where id=:id;  
        0.190400        delete from a where id=:id;  
        1.394283        insert into a (info) values ('test');  
        0.250328        delete from a where id=:id;  
        0.443856        insert into a (info) values ('test');  
        0.234544        delete from a where id=:id;  
        0.420465        insert into a (info) values ('test');  
        0.225787        delete from a where id=:id;  
        0.412413        insert into a (info) values ('test');  
        0.436313        insert into a (info) values ('test');  
        0.437742        insert into a (info) values ('test');  
        0.333693        end;  

Putting these tests together: the improved functions offer no advantage when a transaction contains only a single DML statement on table a, but they win when a transaction needs to touch multiple rows of a.

For the next round of improvements to these functions, see the next post in the series:

http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

References

Some earlier articles on optimizing group by and count(distinct column), for anyone interested:

1. http://blog.163.com/digoal@126/blog/static/16387704020129851138327/

2. http://blog.163.com/digoal@126/blog/static/16387704020128142829610/

On function volatility:

1. http://blog.163.com/digoal@126/blog/static/163877040201211241434248/

2. http://blog.163.com/digoal@126/blog/static/163877040201151011105494/

Random query optimization:

1. http://blog.163.com/digoal@126/blog/static/163877040201111292628555/

Advisory locks (application-level locks):

1. http://blog.163.com/digoal@126/blog/static/163877040201172492217830/

For easy reference, here is the series of case-study articles on real-time and non-real-time data statistics in PostgreSQL:

1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/

2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/

3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/

4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/

5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/

6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/

7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/

8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/


digoal's index of PostgreSQL articles