PostgreSQL 无会话、有会话模式 - 客服平均响应速度(RT)实时计算实践(窗口查询流计算)

19 minute read

背景

通常客服系统可能存在一对多,多对多的情况。

例如,

我们在使用淘宝时,与店家交流时,你根本不知道后面的小二是一个人还是多个人共用一个账号,还有可能是多个人使用了多个账号但是对消费者只看到一个。

例如:

小二(n)账号 -> 统一对外账号 -> 消费者

还有的情况是一个小二为多个消费者服务:

小二账号 -> 统一对外账号 -> 消费者(n)

小二重要的KPI之一是响应速度,因为这直接反应到消费者的感受上。如果消费者一个问题,很久没人回复,可能就直接关闭页面,更换其他商家了。

那么如何统计响应速度呢?

通常来说,需要从消费者维度看待响应速度,因为一个问题可能被多个小二回复,也可能被1个小二回复,这种情况下,应该统计第一反馈时间作为响应时间。

另一方面,如果系统没有会话机制的话,统计起来会比较麻烦。(并且,一个真实的会话里面的若干次交互,可能统计时会被抽象成若干的“虚拟会话”)

我们来看个例子。

1 无会话模式的响应速度统计

假设数据以TS字段顺序到达为前提(通常这种场景,按TS到达的可能性较大,或者你可以使用clock_timestamp()来作为这个时间,可能性就更大了。),后面会讲如果不这样有什么问题,以及解决方案。

无会话模式,适合于客户发起消息后,后台任意分配一个客服给他(或者分配一个客服池子给他),第一时间响应他的可以是任意客服。

1、客服、客户交谈表(只展示重要字段)

create table tbl (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    

2、客服的平均响应时间

一个客户的最早发言时间,下一时刻任意客服最早回复这位客户的回复时间。(中间部分略过)

例如

1, 2, 0001, false   -- 客户2给客服1发信息时间,作为一次虚拟会话的开始时间    
100, 2, 0003, false   -- 客户2给客服100发信息时间,如果比下一条先到达,这次虚拟会话 ,按这种方法将计算不到。    
22, 2, 0002, true   -- 客服22给客户2发信息时间,作为一次虚拟会话的最早响应时间    
1, 2, 0005, true   -- 客服1给客户2发信息时间    

3、实时计算解决这个问题

结果表结构

create table tbl_result (    
  b int not null,  -- 客户ID    
  b_ts timestamp,  -- 客户发起一次虚拟会话的最早时间    
  a int default -1,  -- 最先响应这次虚拟会话的客服ID, -1表示还没人响应    
  a_ts timestamp  -- 最先响应这次虚拟会话的时间    
);    
    
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。      
-- 保证同一时刻,同一客户,只有一个未完结的虚拟会话。    
alter table tbl_result add constraint uk exclude (b with =) where (a=-1);    

4、实时处理逻辑

when insert into tbl

if    
b -> a 逻辑(客户发给客服)    
    
select 1 from tbl_result where b=? and a = -1;    
if not found then     
    insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;    
    -- update set b_ts=excluded.b_ts     
    -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
-- else    
  -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
end if;    
    
if    
a -> b 逻辑(客服发给客户)    
    
select 1 from tbl_result where b=? and a = -1;    
if found then    
update tbl_result set a=? , a_ts=? where b=? and a = -1 and NEW.ts >= b_ts;    
-- else    
  -- 说明已有人回复,不需要更新    
end if;    

5、tbl的insert trigger函数

create or replace function tb() returns trigger as $$    
declare    
begin    
  if not NEW.direct then  -- b -> a 逻辑(客户发给客服)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if not found then     
      insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;    
      -- update set b_ts=excluded.b_ts     
      -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
    -- else    
      -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
    end if;    
  else  -- a -> b 逻辑(客服发给客户)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if found then    
      update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts;    
    -- else    
      -- 说明已有人回复,不需要更新    
    end if;    
  end if;    
  return NULL;    
end;    
$$ language plpgsql strict;    

创建触发器

create trigger tg0 after insert on tbl for each row execute procedure tb();    

6、写入压测

假设有100个客服    
100万个客户    
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。    
    
vi test.sql    
    
\set a random(1,100)    
\set b random(1,1000000)    
\set bo random(0,1)    
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    
postgres=# select count(*) from tbl;    
  count       
----------    
 19805266    
(1 row)    
    
postgres=# select count(*) from tbl_result;    
  count      
---------    
 5202622    
(1 row)    

7、算法校验,正确

postgres=# select * from tbl where b=1 order by ts limit 10;    
 a  | b |             ts             | direct     
----+---+----------------------------+--------    
 25 | 1 | 2018-08-15 09:43:22.862526 | f    
 17 | 1 | 2018-08-15 09:43:25.180255 | f    
 63 | 1 | 2018-08-15 09:43:29.901536 | t    
  3 | 1 | 2018-08-15 09:43:31.906753 | t    
 38 | 1 | 2018-08-15 09:43:52.035444 | f    
 24 | 1 | 2018-08-15 09:43:52.679127 | f    
 69 | 1 | 2018-08-15 09:43:54.855426 | t    
 44 | 1 | 2018-08-15 09:44:05.735922 | t    
 75 | 1 | 2018-08-15 09:44:10.555001 | t    
 17 | 1 | 2018-08-15 09:44:10.565798 | f    
(10 rows)    
    
postgres=# select * from tbl_result where b=1 order by b_ts limit 10;    
 b |            b_ts            | a  |            a_ts                
---+----------------------------+----+----------------------------    
 1 | 2018-08-15 09:43:22.862526 | 63 | 2018-08-15 09:43:29.901536    
 1 | 2018-08-15 09:43:52.035444 | 69 | 2018-08-15 09:43:54.855426    
 1 | 2018-08-15 09:44:10.565798 | 86 | 2018-08-15 09:44:33.090099    
 1 | 2018-08-15 09:44:33.815634 | 63 | 2018-08-15 09:44:45.737907    
 1 | 2018-08-15 09:44:52.277396 | 45 | 2018-08-15 09:44:59.006899    
 1 | 2018-08-15 09:45:19.288931 | -1 |     
(6 rows)    

性能,写入吞吐达到16.5万行/s。

transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 32    
number of threads: 32    
duration: 120 s    
number of transactions actually processed: 19805266    
latency average = 0.194 ms    
latency stddev = 0.221 ms    
tps = 165043.068862 (including connections establishing)    
tps = 165056.827167 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set a random(1,100)    
         0.000  \set b random(1,1000000)    
         0.000  \set bo random(0,1)    
         0.191  insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    

2 有会话模式的响应速度统计

假设数据以TS字段顺序到达为前提(通常这种场景,按TS到达的可能性较大,或者你可以使用clock_timestamp()来作为这个时间,可能性就更大了。),后面会讲如果不这样有什么问题,以及解决方案。

相比前面的不同之处,a,b一一对应,即有会话模式。

客户1发给客服2    
    
那么就只看客服2第一次响应客户1的时间。    

有会话模式,适合于客户发起消息后,后台分配一个客服给他,第一时间响应他的必须是这个分配的客服。

稍微修改前面的代码即可。

1、客服、客户交谈表(只展示重要字段)

create table tbl (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    

2、客服的平均响应时间

一个客户的最早发言时间,下一时刻对应客服最早回复这位客户的回复时间。(中间部分略过)

例如

1, 2, 0001, false   -- 客户2给客服1发信息时间,作为一次虚拟会话的开始时间    
1, 2, 0003, false   -- 客户2给客服1发信息时间。    
1, 2, 0002, true   -- 客服1给客户2发信息时间,作为一次虚拟会话的最早响应时间    
1, 2, 0005, true   -- 客服1给客户2发信息时间    

3、实时计算解决这个问题

结果表结构

create table tbl_result (    
  b int not null,  -- 客户ID    
  b_ts timestamp,  -- 客户发起一次虚拟会话的最早时间    
  a int,           -- 客户给谁发起了这次会话    
  rsp_a int default -1,  -- 响应这次虚拟会话的客服ID, -1表示没人响应    
  a_ts timestamp  -- 最先响应这次虚拟会话的时间    
);    
    
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。      
-- 保证同一时刻,同一客户,与同一客服,只有一个未完结的虚拟会话。    
alter table tbl_result add constraint uk exclude (b with =, a with =) where (rsp_a=-1);    

4、实时处理逻辑

when insert into tbl

if    
b -> a 逻辑(客户发给客服)    
    
select 1 from tbl_result where b=? and a=? and rsp_a = -1;    
if not found then     
    insert into tbl_result (b,b_ts,a) values (NEW.b,NEW.ts,NEW.a) on conflict ON CONSTRAINT uk do nothing;    
    -- update set b_ts=excluded.b_ts     
    -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
-- else    
  -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
end if;    
    
if    
a -> b 逻辑(客服发给客户)    
    
select 1 from tbl_result where b=? and a=? and rsp_a = -1;    
if found then    
update tbl_result set rsp_a=? , a_ts=? where b=? and a=? and rsp_a = -1 and NEW.ts >= b_ts;    
-- else    
  -- 说明已有人回复,不需要更新    
end if;    

5、tbl的insert trigger函数

create or replace function tb() returns trigger as $$    
declare    
begin    
  if not NEW.direct then  -- b -> a 逻辑(客户发给客服)    
    perform 1 from tbl_result where b=NEW.b and a=NEW.a and rsp_a = -1;    
    if not found then     
      insert into tbl_result (b,b_ts,a) values (NEW.b,NEW.ts,NEW.a) on conflict ON CONSTRAINT uk do nothing;    
      -- update set b_ts=excluded.b_ts     
      -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
    -- else    
      -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
    end if;    
  else  -- a -> b 逻辑(客服发给客户)    
    perform 1 from tbl_result where b=NEW.b and a=NEW.a and rsp_a = -1;    
    if found then    
      update tbl_result set rsp_a=NEW.a , a_ts=NEW.ts where b=NEW.b and a=NEW.a and rsp_a = -1 and NEW.ts >= b_ts;    
    -- else    
      -- 说明已有人回复,不需要更新    
    end if;    
  end if;    
  return NULL;    
end;    
$$ language plpgsql strict;    

创建触发器

create trigger tg0 after insert on tbl for each row execute procedure tb();    

6、写入压测

假设有10个客服    
1万个客户    
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。    
    
vi test.sql    
    
\set a random(1,10)    
\set b random(1,10000)    
\set bo random(0,1)    
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    
postgres=# select count(*) from tbl;    
  count       
----------    
 19771381    
(1 row)    
    
postgres=# select count(*) from tbl_result;    
  count      
---------    
 4967253    
(1 row)    

7、算法校验,正确

postgres=# select * from tbl where b=1 and a=9 order by ts limit 30;    
 a | b |             ts             | direct     
---+---+----------------------------+--------    
 9 | 1 | 2018-08-15 10:08:20.82439  | f    
 9 | 1 | 2018-08-15 10:08:21.341471 | f    
 9 | 1 | 2018-08-15 10:08:23.084166 | f    
 9 | 1 | 2018-08-15 10:08:23.160162 | f    
 9 | 1 | 2018-08-15 10:08:23.596106 | f    
 9 | 1 | 2018-08-15 10:08:23.735911 | f    
 9 | 1 | 2018-08-15 10:08:23.869232 | f    
 9 | 1 | 2018-08-15 10:08:25.379688 | t    
 9 | 1 | 2018-08-15 10:08:26.471402 | t    
 9 | 1 | 2018-08-15 10:08:26.622047 | t    
 9 | 1 | 2018-08-15 10:08:26.640313 | t    
 9 | 1 | 2018-08-15 10:08:27.28104  | f    
 9 | 1 | 2018-08-15 10:08:27.285187 | f    
 9 | 1 | 2018-08-15 10:08:27.992076 | t    
 9 | 1 | 2018-08-15 10:08:28.233072 | t    
 9 | 1 | 2018-08-15 10:08:28.590125 | t    
 9 | 1 | 2018-08-15 10:08:29.6004   | t    
 9 | 1 | 2018-08-15 10:08:30.058747 | f    
 9 | 1 | 2018-08-15 10:08:30.114936 | t    
 9 | 1 | 2018-08-15 10:08:30.237846 | f    
 9 | 1 | 2018-08-15 10:08:30.468956 | t    
 9 | 1 | 2018-08-15 10:08:31.904644 | t    
 9 | 1 | 2018-08-15 10:08:32.092077 | t    
 9 | 1 | 2018-08-15 10:08:32.407465 | t    
 9 | 1 | 2018-08-15 10:08:32.530952 | f    
 9 | 1 | 2018-08-15 10:08:32.991299 | f    
 9 | 1 | 2018-08-15 10:08:33.567598 | f    
 9 | 1 | 2018-08-15 10:08:33.726376 | f    
 9 | 1 | 2018-08-15 10:08:33.734359 | f    
 9 | 1 | 2018-08-15 10:08:34.288767 | f    
(30 rows)    
    
postgres=# select * from tbl_result where b=1 and a=9 order by b_ts limit 10;    
 b |            b_ts            | a | rsp_a |            a_ts                
---+----------------------------+---+-------+----------------------------    
 1 | 2018-08-15 10:08:20.82439  | 9 |     9 | 2018-08-15 10:08:25.379688    
 1 | 2018-08-15 10:08:27.28104  | 9 |     9 | 2018-08-15 10:08:27.992076    
 1 | 2018-08-15 10:08:30.058747 | 9 |     9 | 2018-08-15 10:08:30.114936    
 1 | 2018-08-15 10:08:30.237846 | 9 |     9 | 2018-08-15 10:08:30.468956    
 1 | 2018-08-15 10:08:32.530952 | 9 |     9 | 2018-08-15 10:08:34.749098    
 1 | 2018-08-15 10:08:35.615081 | 9 |     9 | 2018-08-15 10:08:35.681585    
 1 | 2018-08-15 10:08:35.689469 | 9 |     9 | 2018-08-15 10:08:37.099554    
 1 | 2018-08-15 10:08:40.70679  | 9 |     9 | 2018-08-15 10:08:40.80081    
 1 | 2018-08-15 10:08:40.892459 | 9 |     9 | 2018-08-15 10:08:44.732971    
 1 | 2018-08-15 10:08:45.685787 | 9 |     9 | 2018-08-15 10:08:46.301875    
(10 rows)    

性能,写入吞吐达到16.5万行/s。

transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 32    
number of threads: 32    
duration: 120 s    
number of transactions actually processed: 19771381    
latency average = 0.194 ms    
latency stddev = 0.222 ms    
tps = 164760.717898 (including connections establishing)    
tps = 164774.989399 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set a random(1,10)    
         0.000  \set b random(1,10000)    
         0.000  \set bo random(0,1)    
         0.192  insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    

看似问题解决了吗?

3 统计算法问题与解决办法

前面都是假设数据按TS到达的情况(使用clock_timestamp生成ts还是比较靠谱的),如果数据完全不按TS到达,会出现什么问题么?

1、如果不按顺序到达,会话的发起时间、第一响应时间可能无法得到正确结果

因为一旦触发生成tbl_result后,后面进来的数据无法修正前面的错误。

2、允许一定时间的延迟,同时容忍一定的错误率的情况下。比如每小时消费前一小时的数据,中间预留1小时的缓冲时间,降低错误率:

2.1、按时间区间,延迟消费适当解决以上问题。

单线程消费,统计。

with tmp as (    
delete from tbl where ctid = any(array(    
select ctid from tbl where     
  ts < now()-interval '1 hour'     
  order by ts limit 10000    
))    
returning *    
) select * from tmp     
order by ts;   

然后,按顺序消费。

2.2、按时间区间,延迟并行消费,解决大数据量的问题。例如按客户ID,HASH,并行消费。

多线程(每个HASH一个线程),消费,统计。

create index idx_tbl_mod_32 on tbl (abs(mod(hashint4(b), 32)), ts);    
    
with tmp as (    
delete from tbl where ctid = any(array(    
select ctid from tbl where     
  ts < now()-interval '1 hour'     
  and    
  abs(mod(hashint4(b), 32))=0  -- hash 并行    
  order by ts limit 10000    
))    
returning *    
) select * from tmp     
order by ts;    

然后,按顺序消费。

例子1

以第一种场景(无会话状态)为例。延迟批量消费的方法生成最终数据。

1、会话表

create table tbl (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    
    
create index idx_tbl_ts on tbl(ts);    

2、统计结果表

create table tbl_result (    
  b int not null,  -- 客户ID    
  b_ts timestamp,  -- 客户发起一次虚拟会话的最早时间    
  a int default -1,  -- 最先响应这次虚拟会话的客服ID, -1表示还没人响应    
  a_ts timestamp  -- 最先响应这次虚拟会话的时间    
);    
    
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。      
-- 保证同一时刻,同一客户,只有一个未完结的虚拟会话。    
alter table tbl_result add constraint uk exclude (b with =) where (a=-1);    

3、中间会话表(可以不落地,只顺序计算)。

create table tbl_mid (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    

4、中间会话表触发器

(before 触发器 return null(不落地,只顺序计算))

(after 触发器 return null(落地))

create or replace function tb() returns trigger as $$    
declare    
begin    
  if not NEW.direct then  -- b -> a 逻辑(客户发给客服)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if not found then     
      insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;    
      -- update set b_ts=excluded.b_ts     
      -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
    -- else    
      -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
    end if;    
  else  -- a -> b 逻辑(客服发给客户)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if found then    
      update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts;    
    -- else    
      -- 说明已有人回复,不需要更新    
    end if;    
  end if;    
  return NULL;    
end;    
$$ language plpgsql strict;    
create trigger tg0 after insert on tbl_mid for each row execute procedure tb();    

5、写入大批量数据,由于触发器转移到了中间表,所以写入吞吐达到了接近29万行/s。

假设有100个客服    
100万个客户    
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。    
    
vi test.sql    
    
\set a random(1,100)    
\set b random(1,1000000)    
\set bo random(0,1)    
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    
    
transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 32    
number of threads: 32    
duration: 120 s    
number of transactions actually processed: 34403943    
latency average = 0.112 ms    
latency stddev = 0.229 ms    
tps = 286698.048259 (including connections establishing)    
tps = 286718.916176 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set a random(1,100)    
         0.000  \set b random(1,1000000)    
         0.000  \set bo random(0,1)    
         0.109  insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
postgres=# select count(*) from tbl;    
  count       
----------    
 19805266    
(1 row)    
    
postgres=# select count(*) from tbl_result;    
  count      
---------    
 5202622    
(1 row)    

6、单线程消费,一次消费100万行,速度约每秒6万。

with tmp as (    
delete from tbl where ctid = any(array(    
select ctid from tbl where     
  ts < now()-interval '1 min'  -- 测试时改成了消费1分钟前的数据    
  order by ts limit 1000000    
))    
returning *    
)     
insert into tbl_mid     
select * from tmp     
order by ts;    
    
Time: 16532.939 ms (00:16.533)    

7、算法校验,正确

postgres=# select * from tbl_mid where b=2 order by ts limit 10;    
 a  | b |             ts             | direct     
----+---+----------------------------+--------    
 10 | 2 | 2018-08-15 10:24:58.538558 | t    
 25 | 2 | 2018-08-15 10:25:00.585426 | f    
 62 | 2 | 2018-08-15 10:25:04.2633   | f    
 45 | 2 | 2018-08-15 10:25:04.406764 | t    
(4 rows)    
    
postgres=# select * from tbl_result where b=2 order by b_ts limit 10;    
 b |            b_ts            | a  |            a_ts                
---+----------------------------+----+----------------------------    
 2 | 2018-08-15 10:25:00.585426 | 45 | 2018-08-15 10:25:04.406764    
(1 row)    

消费性能,单线程吞吐达到6万行/s。

with tmp as (    
delete from tbl where ctid = any(array(    
select ctid from tbl where     
  ts < now()-interval '1 min'  -- 测试时改成了消费1分钟前的数据    
  order by ts limit 1000000    
))    
returning *    
)     
insert into tbl_mid     
select * from tmp     
order by ts;    
    
Time: 16532.939 ms (00:16.533)    

消费节奏:

1、消费    
2、VACUUM tbl;    
3、消费    
loop;    

例子2

以第一种场景(无会话状态)为例。延迟批量统计的方法生成最终数据。(不消费(delete)已有数据)

1、会话表

create table tbl (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    
    
create index idx_tbl_ts on tbl(ts);    
-- 也可以使用brin索引  
-- create index idx_tbl_ts on tbl using brin(ts);    

2、统计结果表

create table tbl_result (    
  b int not null,  -- 客户ID    
  b_ts timestamp,  -- 客户发起一次虚拟会话的最早时间    
  a int default -1,  -- 最先响应这次虚拟会话的客服ID, -1表示还没人响应    
  a_ts timestamp  -- 最先响应这次虚拟会话的时间    
);    
    
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。      
-- 保证同一时刻,同一客户,只有一个未完结的虚拟会话。    
alter table tbl_result add constraint uk exclude (b with =) where (a=-1);    

3、中间会话表(可以不落地,只顺序计算)。

create table tbl_mid (    
  a int not null,   -- 客服ID    
  b int not null,   -- 客户ID    
  ts timestamp not null,   -- 消息时间    
  direct boolean not null  -- 消息方向 true: a->b, false: b->a    
);    

4、中间会话表触发器

(before 触发器 return null(不落地,只顺序计算))

create or replace function tb() returns trigger as $$    
declare    
begin    
  if not NEW.direct then  -- b -> a 逻辑(客户发给客服)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if not found then     
      insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;    
      -- update set b_ts=excluded.b_ts     
      -- where tbl_result.b_ts > excluded.b_ts;  -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。    
    -- else    
      -- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录    
    end if;    
  else  -- a -> b 逻辑(客服发给客户)    
    perform 1 from tbl_result where b=NEW.b and a = -1;    
    if found then    
      update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts;    
    -- else    
      -- 说明已有人回复,不需要更新    
    end if;    
  end if;    
  return NULL;    
end;    
$$ language plpgsql strict;    
create trigger tg0 before insert on tbl_mid for each row execute procedure tb();    

5、写入大批量数据,由于触发器转移到了中间表,所以写入吞吐达到了接近29万行/s。

假设有100个客服    
100万个客户    
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。    
    
vi test.sql    
    
\set a random(1,100)    
\set b random(1,1000000)    
\set bo random(0,1)    
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    
    
transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 32    
number of threads: 32    
duration: 120 s    
number of transactions actually processed: 34403943    
latency average = 0.112 ms    
latency stddev = 0.229 ms    
tps = 286698.048259 (including connections establishing)    
tps = 286718.916176 (excluding connections establishing)    
statement latencies in milliseconds:    
         0.001  \set a random(1,100)    
         0.000  \set b random(1,1000000)    
         0.000  \set bo random(0,1)    
         0.109  insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);    
postgres=# select count(*) from tbl;    
  count       
----------    
 19805266    
(1 row)    
    
postgres=# select count(*) from tbl_result;    
  count      
---------    
 5202622    
(1 row)    

6、单线程读取,统计,例如每次读取一个小时的数据(定义清楚边界,连续消费,同时避免并发、或重复消费,或者在写统计结果时做到幂等,不用担心重复消费)。

创建一张消费记录表,统计已消费的时间间隔。

create table tbl_record (ts1 timestamp, ts2 timestamp);  

下次消费时,参考上次已消费的时间。

with tmp as (  
  insert into tbl_record (ts1, ts2) values ('2018-01-01 12:00:00', '2018-01-01 13:00:00')  -- 记录当前消费窗口  
)  
insert into tbl_mid     
select * from tbl  
where ts >= '2018-01-01 12:00:00' and ts < '2018-01-01 13:00:00'  -- 上一个小时为窗口 (当前时间 大于等于 '2018-01-01 14:00:00')   
order by ts;  -- 无会话模式    
    
Time: 16532.939 ms (00:16.533)    

7、算法校验,正确

postgres=# select * from tbl_mid where b=2 order by ts limit 10;    
 a  | b |             ts             | direct     
----+---+----------------------------+--------    
 10 | 2 | 2018-08-15 10:24:58.538558 | t    
 25 | 2 | 2018-08-15 10:25:00.585426 | f    
 62 | 2 | 2018-08-15 10:25:04.2633   | f    
 45 | 2 | 2018-08-15 10:25:04.406764 | t    
(4 rows)    
    
postgres=# select * from tbl_result where b=2 order by b_ts limit 10;    
 b |            b_ts            | a  |            a_ts                
---+----------------------------+----+----------------------------    
 2 | 2018-08-15 10:25:00.585426 | 45 | 2018-08-15 10:25:04.406764    
(1 row)    

消费性能,单线程吞吐达到6万行/s。

with tmp as (    
delete from tbl where ctid = any(array(    
select ctid from tbl where     
  ts < now()-interval '1 min'  -- 测试时改成了消费1分钟前的数据    
  order by ts limit 1000000    
))    
returning *    
)     
insert into tbl_mid     
select * from tmp     
order by b, ts;  -- 无会话模式    
    
Time: 16532.939 ms (00:16.533)    

消费节奏:

1、消费    
2、VACUUM tbl;    
3、消费    
loop;    

例子3,使用窗口查询解决同一问题

1、新增索引,用于窗口查询加速

create index idx_tbl_1 on tbl (b,ts);  

2、无会话模式,使用窗口查询,得到每个虚拟会话的开始时间、第一响应时间

select  
  a,  -- 虚拟会话的第一条消息,客户发给了哪位客服ID
  b,  -- 客户ID
  ts,  -- 虚拟会话开始时间  
  lead_a,  -- 最先响应的是谁(哪位客服)
  lead_session_end_ts,  -- 虚拟会话第一次响应时间  
  lead_session_end_ts - ts as dur,  -- 响应间隔  
  direct,lag_direct,lag_ts  
from  
(  
select *,   
  lead(session_end_ts) over w2 as lead_session_end_ts,  -- 当前窗口,当前行的下一条ts值 , 即会话第一次响应时间   
  lead(a) over w2 as lead_a -- 当前窗口,当前行的下一条的b(客服ID) , 即响应的是哪位客服
from  
(  
select * from   
(  
select a,b,ts,direct,lag_direct,lag_ts,  
case when ((direct = false and lag_direct is null)  -- 判断虚拟会话开始时间的逻辑  
or  
(direct = false and lag_direct = true))  
then ts  
end as session_begin_ts,  -- 虚拟会话开始时间  
case when (direct = true and lag_direct = false)  -- 判断虚拟会话第一次响应时间的逻辑  
then ts  
end as session_end_ts  -- 虚拟会话第一次响应时间  
from  
(  
select   
  a,  -- 客服ID  
  b,  -- 客户ID  
  ts, -- 消息时间  
  direct,  -- 消息方向 true: a->b, false: b->a   
  lag(direct) over w1 as lag_direct,  -- 当前窗口,当前行的上一条direct值  
  lag(ts) over w1 as lag_ts           -- 当前窗口,当前行的上一条ts值  
from tbl   
  window w1 as (partition by b order by ts)   
  -- where ts between xx and xx  , 一次只查部分数据时可用  
) t  
) t  
where session_begin_ts is not null  -- 虚拟会话开始时间字段不为空,表示这条记录是会话开始的记录  
or  
session_end_ts is not null   -- 虚拟会话结束时间字段不为空,表示这条记录是会话第一次响应的记录  
) t   
window w2 as (partition by b order by ts)    
) t  
where   
direct = false -- 客户在虚拟会话中发起第一条消息的记录   
and  
lead_session_end_ts - ts is not null  
limit 100;  

3、结果、算法正确性验证

  a  | b  |             ts             | lead_a |    lead_session_end_ts     |       dur       | direct | lag_direct |           lag_ts           
-----+----+----------------------------+--------+----------------------------+-----------------+--------+------------+----------------------------
  26 |  1 | 2018-08-15 10:25:13.056316 |     75 | 2018-08-15 10:25:16.546126 | 00:00:03.48981  | f      |            | 
  43 |  1 | 2018-08-15 10:25:21.483542 |     99 | 2018-08-15 10:25:25.552488 | 00:00:04.068946 | f      | t          | 2018-08-15 10:25:16.546126
  28 |  1 | 2018-08-15 10:25:28.287823 |     70 | 2018-08-15 10:25:37.375585 | 00:00:09.087762 | f      | t          | 2018-08-15 10:25:26.518359
  12 |  1 | 2018-08-15 10:25:47.203597 |     20 | 2018-08-15 10:26:03.423969 | 00:00:16.220372 | f      | t          | 2018-08-15 10:25:47.036459
  91 |  1 | 2018-08-15 10:26:05.332921 |     57 | 2018-08-15 10:26:08.070122 | 00:00:02.737201 | f      | t          | 2018-08-15 10:26:03.423969
  24 |  1 | 2018-08-15 10:26:16.798485 |     85 | 2018-08-15 10:26:22.222025 | 00:00:05.42354  | f      | t          | 2018-08-15 10:26:15.319287
  90 |  1 | 2018-08-15 10:26:22.58553  |     28 | 2018-08-15 10:26:25.987987 | 00:00:03.402457 | f      | t          | 2018-08-15 10:26:22.222025
  30 |  1 | 2018-08-15 10:26:31.458875 |     42 | 2018-08-15 10:26:36.259917 | 00:00:04.801042 | f      | t          | 2018-08-15 10:26:25.987987
  11 |  1 | 2018-08-15 10:26:37.828413 |     70 | 2018-08-15 10:26:49.212275 | 00:00:11.383862 | f      | t          | 2018-08-15 10:26:36.259917
  21 |  2 | 2018-08-15 10:25:15.532378 |     66 | 2018-08-15 10:25:19.742437 | 00:00:04.210059 | f      |            | 
  50 |  2 | 2018-08-15 10:25:30.988507 |     20 | 2018-08-15 10:25:36.645969 | 00:00:05.657462 | f      | t          | 2018-08-15 10:25:30.750224
  98 |  2 | 2018-08-15 10:25:47.075616 |     72 | 2018-08-15 10:25:52.34913  | 00:00:05.273514 | f      | t          | 2018-08-15 10:25:40.858465
  72 |  2 | 2018-08-15 10:25:56.595608 |     99 | 2018-08-15 10:26:11.46232  | 00:00:14.866712 | f      | t          | 2018-08-15 10:25:55.324131
  98 |  2 | 2018-08-15 10:26:12.303834 |     97 | 2018-08-15 10:26:15.341379 | 00:00:03.037545 | f      | t          | 2018-08-15 10:26:11.46232
  63 |  2 | 2018-08-15 10:26:19.116171 |     22 | 2018-08-15 10:26:23.743978 | 00:00:04.627807 | f      | t          | 2018-08-15 10:26:15.341379
  66 |  2 | 2018-08-15 10:26:30.024534 |     49 | 2018-08-15 10:26:41.196351 | 00:00:11.171817 | f      | t          | 2018-08-15 10:26:23.743978
  83 |  2 | 2018-08-15 10:26:41.962942 |     51 | 2018-08-15 10:26:43.172856 | 00:00:01.209914 | f      | t          | 2018-08-15 10:26:41.196351
  64 |  2 | 2018-08-15 10:26:43.575144 |     88 | 2018-08-15 10:26:44.17728  | 00:00:00.602136 | f      | t          | 2018-08-15 10:26:43.172856

4、对比使用中间表得到的结果

insert into tbl_mid select * from tbl order by ts ;  
select * from tbl_result where b=1 or b=2 order by b_ts;  
  

 b |            b_ts            | a  |            a_ts            
---+----------------------------+----+----------------------------
 1 | 2018-08-15 10:25:13.056316 | 75 | 2018-08-15 10:25:16.546126
 1 | 2018-08-15 10:25:21.483542 | 99 | 2018-08-15 10:25:25.552488
 1 | 2018-08-15 10:25:28.287823 | 70 | 2018-08-15 10:25:37.375585
 1 | 2018-08-15 10:25:47.203597 | 20 | 2018-08-15 10:26:03.423969
 1 | 2018-08-15 10:26:05.332921 | 57 | 2018-08-15 10:26:08.070122
 1 | 2018-08-15 10:26:16.798485 | 85 | 2018-08-15 10:26:22.222025
 1 | 2018-08-15 10:26:22.58553  | 28 | 2018-08-15 10:26:25.987987
 1 | 2018-08-15 10:26:31.458875 | 42 | 2018-08-15 10:26:36.259917
 1 | 2018-08-15 10:26:37.828413 | 70 | 2018-08-15 10:26:49.212275
 1 | 2018-08-15 10:26:50.622352 | -1 | 
 2 | 2018-08-15 10:25:15.532378 | 66 | 2018-08-15 10:25:19.742437
 2 | 2018-08-15 10:25:30.988507 | 20 | 2018-08-15 10:25:36.645969
 2 | 2018-08-15 10:25:47.075616 | 72 | 2018-08-15 10:25:52.34913
 2 | 2018-08-15 10:25:56.595608 | 99 | 2018-08-15 10:26:11.46232
 2 | 2018-08-15 10:26:12.303834 | 97 | 2018-08-15 10:26:15.341379
 2 | 2018-08-15 10:26:19.116171 | 22 | 2018-08-15 10:26:23.743978
 2 | 2018-08-15 10:26:30.024534 | 49 | 2018-08-15 10:26:41.196351
 2 | 2018-08-15 10:26:41.962942 | 51 | 2018-08-15 10:26:43.172856
 2 | 2018-08-15 10:26:43.575144 | 88 | 2018-08-15 10:26:44.17728
 2 | 2018-08-15 10:26:45.595639 | -1 | 
(20 rows)

5、会话模式,SQL改动两处即可。

create index idx_tbl_2 on tbl (b,a,ts);  -- 窗口加速
select  
  a,  -- 虚拟会话的第一条消息,客户发给了哪位客服ID
  b,  -- 客户ID
  ts,  -- 虚拟会话开始时间  
  lead_a,  -- 最先响应的是谁(哪位客服)
  lead_session_end_ts,  -- 虚拟会话第一次响应时间  
  lead_session_end_ts - ts as dur,  -- 响应间隔  
  direct,lag_direct,lag_ts  
from  
(  
select *,   
  lead(session_end_ts) over w2 as lead_session_end_ts,  -- 当前窗口,当前行的下一条ts值 , 即会话第一次响应时间   
  lead(a) over w2 as lead_a -- 当前窗口,当前行的下一条的b(客服ID) , 即响应的是哪位客服
from  
(  
select * from   
(  
select a,b,ts,direct,lag_direct,lag_ts,  
case when ((direct = false and lag_direct is null)  -- 判断虚拟会话开始时间的逻辑  
or  
(direct = false and lag_direct = true))  
then ts  
end as session_begin_ts,  -- 虚拟会话开始时间  
case when (direct = true and lag_direct = false)  -- 判断虚拟会话第一次响应时间的逻辑  
then ts  
end as session_end_ts  -- 虚拟会话第一次响应时间  
from  
(  
select   
  a,  -- 客服ID  
  b,  -- 客户ID  
  ts, -- 消息时间  
  direct,  -- 消息方向 true: a->b, false: b->a   
  lag(direct) over w1 as lag_direct,  -- 当前窗口,当前行的上一条direct值  
  lag(ts) over w1 as lag_ts           -- 当前窗口,当前行的上一条ts值  
from tbl   
  window w1 as (partition by b,a order by ts)   -- 有会话模式,改这个partition
  -- where ts between xx and xx  , 一次只查部分数据时可用  
) t  
) t  
where session_begin_ts is not null  -- 虚拟会话开始时间字段不为空,表示这条记录是会话开始的记录  
or  
session_end_ts is not null   -- 虚拟会话结束时间字段不为空,表示这条记录是会话第一次响应的记录  
) t   
window w2 as (partition by b,a order by ts)    -- 有会话模式,改这个partition
) t  
where   
direct = false -- 客户在虚拟会话中发起第一条消息的记录   
and  
lead_session_end_ts - ts is not null  
limit 100;  

性能,3000万记录,1毫秒响应。

小结

本文涉及的场景为无会话、或者会话无明显标识的情况下,使用PostgreSQL高效率的统计客服的响应速度的问题。

使用到的方法与性能指标

1、实时计算,触发器(当到达时间有序, 或者说大部分有序时。使用clock_timestamp可以让数据基本有序)

写入吞吐16.5万行每秒。

2、阅后即焚(延迟消费,解决数据写入无需的问题)。

写入吞吐29万行每秒。

单线程消费6万行每秒。

3、阅后即焚,使用HASH,并行消费,提升消费吞吐。

4、使用窗口查询,同样能够很好的解决此场景的需求,而且性能杠杠的。

参考

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

Flag Counter

digoal’s 大量PostgreSQL文章入口