PostgreSQL 无会话、有会话模式 - 客服平均响应速度(RT)实时计算实践(窗口查询流计算)
背景
通常客服系统可能存在一对多,多对多的情况。
例如,
我们在使用淘宝时,与店家交流时,你根本不知道后面的小二是一个人还是多个人共用一个账号,还有可能是多个人使用了多个账号但是对消费者只看到一个。
例如:
小二(n)账号 -> 统一对外账号 -> 消费者
还有的情况是一个小二为多个消费者服务:
小二账号 -> 统一对外账号 -> 消费者(n)
小二重要的KPI之一是响应速度,因为这直接反应到消费者的感受上。如果消费者一个问题,很久没人回复,可能就直接关闭页面,更换其他商家了。
那么如何统计响应速度呢?
通常来说,需要从消费者维度看待响应速度,因为一个问题可能被多个小二回复,也可能被1个小二回复,这种情况下,应该统计第一反馈时间作为响应时间。
另一方面,如果系统没有会话机制的话,统计起来会比较麻烦。(并且,一个真实的会话里面的若干次交互,可能统计时会被抽象成若干的“虚拟会话”)
我们来看个例子。
1 无会话模式的响应速度统计
假设数据以TS字段顺序到达为前提(通常这种场景,按TS到达的可能性较大,或者你可以使用clock_timestamp()来作为这个时间,可能性就更大了。),后面会讲如果不这样有什么问题,以及解决方案。
无会话模式,适合于客户发起消息后,后台任意分配一个客服给他(或者分配一个客服池子给他),第一时间响应他的可以是任意客服。
1、客服、客户交谈表(只展示重要字段)
create table tbl (
a int not null, -- 客服ID
b int not null, -- 客户ID
ts timestamp not null, -- 消息时间
direct boolean not null -- 消息方向 true: a->b, false: b->a
);
2、客服的平均响应时间
一个客户的最早发言时间,下一时刻任意客服最早回复这位客户的回复时间。(中间部分略过)
例如
1, 2, 0001, false -- 客户2给客服1发信息时间,作为一次虚拟会话的开始时间
100, 2, 0003, false -- 客户2给客服100发信息时间,如果比下一条先到达,这次虚拟会话 ,按这种方法将计算不到。
22, 2, 0002, true -- 客服22给客户2发信息时间,作为一次虚拟会话的最早响应时间
1, 2, 0005, true -- 客服1给客户2发信息时间
3、实时计算解决这个问题
结果表结构
create table tbl_result (
b int not null, -- 客户ID
b_ts timestamp, -- 客户发起一次虚拟会话的最早时间
a int default -1, -- 最先响应这次虚拟会话的客服ID, -1表示还没人响应
a_ts timestamp -- 最先响应这次虚拟会话的时间
);
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。
-- 保证同一时刻,同一客户,只有一个未完结的虚拟会话。
alter table tbl_result add constraint uk exclude (b with =) where (a=-1);
4、实时处理逻辑
when insert into tbl
if
b -> a 逻辑(客户发给客服)
select 1 from tbl_result where b=? and a = -1;
if not found then
insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;
-- update set b_ts=excluded.b_ts
-- where tbl_result.b_ts > excluded.b_ts; -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。
-- else
-- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录
end if;
if
a -> b 逻辑(客服发给客户)
select 1 from tbl_result where b=? and a = -1;
if found then
update tbl_result set a=? , a_ts=? where b=? and a = -1 and NEW.ts >= b_ts;
-- else
-- 说明已有人回复,不需要更新
end if;
5、tbl的insert trigger函数
create or replace function tb() returns trigger as $$
declare
begin
if not NEW.direct then -- b -> a 逻辑(客户发给客服)
perform 1 from tbl_result where b=NEW.b and a = -1;
if not found then
insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;
-- update set b_ts=excluded.b_ts
-- where tbl_result.b_ts > excluded.b_ts; -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。
-- else
-- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录
end if;
else -- a -> b 逻辑(客服发给客户)
perform 1 from tbl_result where b=NEW.b and a = -1;
if found then
update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts;
-- else
-- 说明已有人回复,不需要更新
end if;
end if;
return NULL;
end;
$$ language plpgsql strict;
创建触发器
create trigger tg0 after insert on tbl for each row execute procedure tb();
6、写入压测
假设有100个客服
100万个客户
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。
vi test.sql
\set a random(1,100)
\set b random(1,1000000)
\set bo random(0,1)
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
postgres=# select count(*) from tbl;
count
----------
19805266
(1 row)
postgres=# select count(*) from tbl_result;
count
---------
5202622
(1 row)
7、算法校验,正确
postgres=# select * from tbl where b=1 order by ts limit 10;
a | b | ts | direct
----+---+----------------------------+--------
25 | 1 | 2018-08-15 09:43:22.862526 | f
17 | 1 | 2018-08-15 09:43:25.180255 | f
63 | 1 | 2018-08-15 09:43:29.901536 | t
3 | 1 | 2018-08-15 09:43:31.906753 | t
38 | 1 | 2018-08-15 09:43:52.035444 | f
24 | 1 | 2018-08-15 09:43:52.679127 | f
69 | 1 | 2018-08-15 09:43:54.855426 | t
44 | 1 | 2018-08-15 09:44:05.735922 | t
75 | 1 | 2018-08-15 09:44:10.555001 | t
17 | 1 | 2018-08-15 09:44:10.565798 | f
(10 rows)
postgres=# select * from tbl_result where b=1 order by b_ts limit 10;
b | b_ts | a | a_ts
---+----------------------------+----+----------------------------
1 | 2018-08-15 09:43:22.862526 | 63 | 2018-08-15 09:43:29.901536
1 | 2018-08-15 09:43:52.035444 | 69 | 2018-08-15 09:43:54.855426
1 | 2018-08-15 09:44:10.565798 | 86 | 2018-08-15 09:44:33.090099
1 | 2018-08-15 09:44:33.815634 | 63 | 2018-08-15 09:44:45.737907
1 | 2018-08-15 09:44:52.277396 | 45 | 2018-08-15 09:44:59.006899
1 | 2018-08-15 09:45:19.288931 | -1 |
(6 rows)
性能,写入吞吐达到16.5万行/s。
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 32
number of threads: 32
duration: 120 s
number of transactions actually processed: 19805266
latency average = 0.194 ms
latency stddev = 0.221 ms
tps = 165043.068862 (including connections establishing)
tps = 165056.827167 (excluding connections establishing)
statement latencies in milliseconds:
0.001 \set a random(1,100)
0.000 \set b random(1,1000000)
0.000 \set bo random(0,1)
0.191 insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);
2 有会话模式的响应速度统计
假设数据以TS字段顺序到达为前提(通常这种场景,按TS到达的可能性较大,或者你可以使用clock_timestamp()来作为这个时间,可能性就更大了。),后面会讲如果不这样有什么问题,以及解决方案。
相比前面的不同之处,a,b一一对应,即有会话模式。
客户1发给客服2
那么就只看客服2第一次响应客户1的时间。
有会话模式,适合于客户发起消息后,后台分配一个客服给他,第一时间响应他的必须是这个分配的客服。
稍微修改前面的代码即可。
1、客服、客户交谈表(只展示重要字段)
create table tbl (
a int not null, -- 客服ID
b int not null, -- 客户ID
ts timestamp not null, -- 消息时间
direct boolean not null -- 消息方向 true: a->b, false: b->a
);
2、客服的平均响应时间
一个客户的最早发言时间,下一时刻对应客服最早回复这位客户的回复时间。(中间部分略过)
例如
1, 2, 0001, false -- 客户2给客服1发信息时间,作为一次虚拟会话的开始时间
1, 2, 0003, false -- 客户2给客服1发信息时间。
1, 2, 0002, true -- 客服1给客户2发信息时间,作为一次虚拟会话的最早响应时间
1, 2, 0005, true -- 客服1给客户2发信息时间
3、实时计算解决这个问题
结果表结构
create table tbl_result (
b int not null, -- 客户ID
b_ts timestamp, -- 客户发起一次虚拟会话的最早时间
a int, -- 客户给谁发起了这次会话
rsp_a int default -1, -- 响应这次虚拟会话的客服ID, -1表示没人响应
a_ts timestamp -- 最先响应这次虚拟会话的时间
);
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。
-- 保证同一时刻,同一客户,与同一客服,只有一个未完结的虚拟会话。
alter table tbl_result add constraint uk exclude (b with =, a with =) where (rsp_a=-1);
4、实时处理逻辑
when insert into tbl
if
b -> a 逻辑(客户发给客服)
select 1 from tbl_result where b=? and a=? and rsp_a = -1;
if not found then
insert into tbl_result (b,b_ts,a) values (NEW.b,NEW.ts,NEW.a) on conflict ON CONSTRAINT uk do nothing;
-- update set b_ts=excluded.b_ts
-- where tbl_result.b_ts > excluded.b_ts; -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。
-- else
-- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录
end if;
if
a -> b 逻辑(客服发给客户)
select 1 from tbl_result where b=? and a=? and rsp_a = -1;
if found then
update tbl_result set rsp_a=? , a_ts=? where b=? and a=? and rsp_a = -1 and NEW.ts >= b_ts;
-- else
-- 说明已有人回复,不需要更新
end if;
5、tbl的insert trigger函数
create or replace function tb() returns trigger as $$
declare
begin
if not NEW.direct then -- b -> a 逻辑(客户发给客服)
perform 1 from tbl_result where b=NEW.b and a=NEW.a and rsp_a = -1;
if not found then
insert into tbl_result (b,b_ts,a) values (NEW.b,NEW.ts,NEW.a) on conflict ON CONSTRAINT uk do nothing;
-- update set b_ts=excluded.b_ts
-- where tbl_result.b_ts > excluded.b_ts; -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。
-- else
-- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录
end if;
else -- a -> b 逻辑(客服发给客户)
perform 1 from tbl_result where b=NEW.b and a=NEW.a and rsp_a = -1;
if found then
update tbl_result set rsp_a=NEW.a , a_ts=NEW.ts where b=NEW.b and a=NEW.a and rsp_a = -1 and NEW.ts >= b_ts;
-- else
-- 说明已有人回复,不需要更新
end if;
end if;
return NULL;
end;
$$ language plpgsql strict;
创建触发器
create trigger tg0 after insert on tbl for each row execute procedure tb();
6、写入压测
假设有10个客服
1万个客户
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。
vi test.sql
\set a random(1,10)
\set b random(1,10000)
\set bo random(0,1)
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
postgres=# select count(*) from tbl;
count
----------
19771381
(1 row)
postgres=# select count(*) from tbl_result;
count
---------
4967253
(1 row)
7、算法校验,正确
postgres=# select * from tbl where b=1 and a=9 order by ts limit 30;
a | b | ts | direct
---+---+----------------------------+--------
9 | 1 | 2018-08-15 10:08:20.82439 | f
9 | 1 | 2018-08-15 10:08:21.341471 | f
9 | 1 | 2018-08-15 10:08:23.084166 | f
9 | 1 | 2018-08-15 10:08:23.160162 | f
9 | 1 | 2018-08-15 10:08:23.596106 | f
9 | 1 | 2018-08-15 10:08:23.735911 | f
9 | 1 | 2018-08-15 10:08:23.869232 | f
9 | 1 | 2018-08-15 10:08:25.379688 | t
9 | 1 | 2018-08-15 10:08:26.471402 | t
9 | 1 | 2018-08-15 10:08:26.622047 | t
9 | 1 | 2018-08-15 10:08:26.640313 | t
9 | 1 | 2018-08-15 10:08:27.28104 | f
9 | 1 | 2018-08-15 10:08:27.285187 | f
9 | 1 | 2018-08-15 10:08:27.992076 | t
9 | 1 | 2018-08-15 10:08:28.233072 | t
9 | 1 | 2018-08-15 10:08:28.590125 | t
9 | 1 | 2018-08-15 10:08:29.6004 | t
9 | 1 | 2018-08-15 10:08:30.058747 | f
9 | 1 | 2018-08-15 10:08:30.114936 | t
9 | 1 | 2018-08-15 10:08:30.237846 | f
9 | 1 | 2018-08-15 10:08:30.468956 | t
9 | 1 | 2018-08-15 10:08:31.904644 | t
9 | 1 | 2018-08-15 10:08:32.092077 | t
9 | 1 | 2018-08-15 10:08:32.407465 | t
9 | 1 | 2018-08-15 10:08:32.530952 | f
9 | 1 | 2018-08-15 10:08:32.991299 | f
9 | 1 | 2018-08-15 10:08:33.567598 | f
9 | 1 | 2018-08-15 10:08:33.726376 | f
9 | 1 | 2018-08-15 10:08:33.734359 | f
9 | 1 | 2018-08-15 10:08:34.288767 | f
(30 rows)
postgres=# select * from tbl_result where b=1 and a=9 order by b_ts limit 10;
b | b_ts | a | rsp_a | a_ts
---+----------------------------+---+-------+----------------------------
1 | 2018-08-15 10:08:20.82439 | 9 | 9 | 2018-08-15 10:08:25.379688
1 | 2018-08-15 10:08:27.28104 | 9 | 9 | 2018-08-15 10:08:27.992076
1 | 2018-08-15 10:08:30.058747 | 9 | 9 | 2018-08-15 10:08:30.114936
1 | 2018-08-15 10:08:30.237846 | 9 | 9 | 2018-08-15 10:08:30.468956
1 | 2018-08-15 10:08:32.530952 | 9 | 9 | 2018-08-15 10:08:34.749098
1 | 2018-08-15 10:08:35.615081 | 9 | 9 | 2018-08-15 10:08:35.681585
1 | 2018-08-15 10:08:35.689469 | 9 | 9 | 2018-08-15 10:08:37.099554
1 | 2018-08-15 10:08:40.70679 | 9 | 9 | 2018-08-15 10:08:40.80081
1 | 2018-08-15 10:08:40.892459 | 9 | 9 | 2018-08-15 10:08:44.732971
1 | 2018-08-15 10:08:45.685787 | 9 | 9 | 2018-08-15 10:08:46.301875
(10 rows)
性能,写入吞吐达到16.5万行/s。
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 32
number of threads: 32
duration: 120 s
number of transactions actually processed: 19771381
latency average = 0.194 ms
latency stddev = 0.222 ms
tps = 164760.717898 (including connections establishing)
tps = 164774.989399 (excluding connections establishing)
statement latencies in milliseconds:
0.001 \set a random(1,10)
0.000 \set b random(1,10000)
0.000 \set bo random(0,1)
0.192 insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);
看似问题解决了吗?
3 统计算法问题与解决办法
前面都是假设数据按TS到达的情况(使用clock_timestamp生成ts还是比较靠谱的),如果数据完全不按TS到达,会出现什么问题么?
1、如果不按顺序到达,会话的发起时间、第一响应时间可能无法得到正确结果
因为一旦触发生成tbl_result后,后面进来的数据无法修正前面的错误。
2、允许一定时间的延迟,同时容忍一定的错误率的情况下。比如每小时消费前一小时的数据,中间预留1小时的缓冲时间,降低错误率:
2.1、按时间区间,延迟消费适当解决以上问题。
单线程消费,统计。
with tmp as (
delete from tbl where ctid = any(array(
select ctid from tbl where
ts < now()-interval '1 hour'
order by ts limit 10000
))
returning *
) select * from tmp
order by ts;
然后,按顺序消费。
2.2、按时间区间,延迟并行消费,解决大数据量的问题。例如按客户ID,HASH,并行消费。
多线程(每个HASH一个线程),消费,统计。
create index idx_tbl_mod_32 on tbl (abs(mod(hashint4(b), 32)), ts);
with tmp as (
delete from tbl where ctid = any(array(
select ctid from tbl where
ts < now()-interval '1 hour'
and
abs(mod(hashint4(b), 32))=0 -- hash 并行
order by ts limit 10000
))
returning *
) select * from tmp
order by ts;
然后,按顺序消费。
例子1
以第一种场景(无会话状态)为例。延迟批量消费的方法生成最终数据。
1、会话表
create table tbl (
a int not null, -- 客服ID
b int not null, -- 客户ID
ts timestamp not null, -- 消息时间
direct boolean not null -- 消息方向 true: a->b, false: b->a
);
create index idx_tbl_ts on tbl(ts);
2、统计结果表
create table tbl_result (
b int not null, -- 客户ID
b_ts timestamp, -- 客户发起一次虚拟会话的最早时间
a int default -1, -- 最先响应这次虚拟会话的客服ID, -1表示还没人响应
a_ts timestamp -- 最先响应这次虚拟会话的时间
);
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。
-- 保证同一时刻,同一客户,只有一个未完结的虚拟会话。
alter table tbl_result add constraint uk exclude (b with =) where (a=-1);
3、中间会话表(可以不落地,只顺序计算)。
create table tbl_mid (
a int not null, -- 客服ID
b int not null, -- 客户ID
ts timestamp not null, -- 消息时间
direct boolean not null -- 消息方向 true: a->b, false: b->a
);
4、中间会话表触发器
(before 触发器 return null(不落地,只顺序计算))
(after 触发器 return null(落地))
create or replace function tb() returns trigger as $$
declare
begin
if not NEW.direct then -- b -> a 逻辑(客户发给客服)
perform 1 from tbl_result where b=NEW.b and a = -1;
if not found then
insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;
-- update set b_ts=excluded.b_ts
-- where tbl_result.b_ts > excluded.b_ts; -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。
-- else
-- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录
end if;
else -- a -> b 逻辑(客服发给客户)
perform 1 from tbl_result where b=NEW.b and a = -1;
if found then
update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts;
-- else
-- 说明已有人回复,不需要更新
end if;
end if;
return NULL;
end;
$$ language plpgsql strict;
create trigger tg0 after insert on tbl_mid for each row execute procedure tb();
5、写入大批量数据,由于触发器转移到了中间表,所以写入吞吐达到了接近29万行/s。
假设有100个客服
100万个客户
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。
vi test.sql
\set a random(1,100)
\set b random(1,1000000)
\set bo random(0,1)
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 32
number of threads: 32
duration: 120 s
number of transactions actually processed: 34403943
latency average = 0.112 ms
latency stddev = 0.229 ms
tps = 286698.048259 (including connections establishing)
tps = 286718.916176 (excluding connections establishing)
statement latencies in milliseconds:
0.001 \set a random(1,100)
0.000 \set b random(1,1000000)
0.000 \set bo random(0,1)
0.109 insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);
postgres=# select count(*) from tbl;
count
----------
19805266
(1 row)
postgres=# select count(*) from tbl_result;
count
---------
5202622
(1 row)
6、单线程消费,一次消费100万行,速度约每秒6万。
with tmp as (
delete from tbl where ctid = any(array(
select ctid from tbl where
ts < now()-interval '1 min' -- 测试时改成了消费1分钟前的数据
order by ts limit 1000000
))
returning *
)
insert into tbl_mid
select * from tmp
order by ts;
Time: 16532.939 ms (00:16.533)
7、算法校验,正确
postgres=# select * from tbl_mid where b=2 order by ts limit 10;
a | b | ts | direct
----+---+----------------------------+--------
10 | 2 | 2018-08-15 10:24:58.538558 | t
25 | 2 | 2018-08-15 10:25:00.585426 | f
62 | 2 | 2018-08-15 10:25:04.2633 | f
45 | 2 | 2018-08-15 10:25:04.406764 | t
(4 rows)
postgres=# select * from tbl_result where b=2 order by b_ts limit 10;
b | b_ts | a | a_ts
---+----------------------------+----+----------------------------
2 | 2018-08-15 10:25:00.585426 | 45 | 2018-08-15 10:25:04.406764
(1 row)
消费性能,单线程吞吐达到6万行/s。
with tmp as (
delete from tbl where ctid = any(array(
select ctid from tbl where
ts < now()-interval '1 min' -- 测试时改成了消费1分钟前的数据
order by ts limit 1000000
))
returning *
)
insert into tbl_mid
select * from tmp
order by ts;
Time: 16532.939 ms (00:16.533)
消费节奏:
1、消费
2、VACUUM tbl;
3、消费
loop;
例子2
以第一种场景(无会话状态)为例。延迟批量统计的方法生成最终数据。(不消费(delete)已有数据)
1、会话表
create table tbl (
a int not null, -- 客服ID
b int not null, -- 客户ID
ts timestamp not null, -- 消息时间
direct boolean not null -- 消息方向 true: a->b, false: b->a
);
create index idx_tbl_ts on tbl(ts);
-- 也可以使用brin索引
-- create index idx_tbl_ts on tbl using brin(ts);
2、统计结果表
create table tbl_result (
b int not null, -- 客户ID
b_ts timestamp, -- 客户发起一次虚拟会话的最早时间
a int default -1, -- 最先响应这次虚拟会话的客服ID, -1表示还没人响应
a_ts timestamp -- 最先响应这次虚拟会话的时间
);
-- 添加约束,当客户的虚拟会话没有完结时,不计新虚拟会话。
-- 保证同一时刻,同一客户,只有一个未完结的虚拟会话。
alter table tbl_result add constraint uk exclude (b with =) where (a=-1);
3、中间会话表(可以不落地,只顺序计算)。
create table tbl_mid (
a int not null, -- 客服ID
b int not null, -- 客户ID
ts timestamp not null, -- 消息时间
direct boolean not null -- 消息方向 true: a->b, false: b->a
);
4、中间会话表触发器
(before 触发器 return null(不落地,只顺序计算))
create or replace function tb() returns trigger as $$
declare
begin
if not NEW.direct then -- b -> a 逻辑(客户发给客服)
perform 1 from tbl_result where b=NEW.b and a = -1;
if not found then
insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing;
-- update set b_ts=excluded.b_ts
-- where tbl_result.b_ts > excluded.b_ts; -- 仅当新写入时间小于原记录时更新, 也可以不做,假设TS是顺序的。
-- else
-- 说明还没有人回复它,跳过,等第一次客服响应来更新这条记录
end if;
else -- a -> b 逻辑(客服发给客户)
perform 1 from tbl_result where b=NEW.b and a = -1;
if found then
update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts;
-- else
-- 说明已有人回复,不需要更新
end if;
end if;
return NULL;
end;
$$ language plpgsql strict;
create trigger tg0 before insert on tbl_mid for each row execute procedure tb();
5、写入大批量数据,由于触发器转移到了中间表,所以写入吞吐达到了接近29万行/s。
假设有100个客服
100万个客户
使用clock_timestamp生成TS,确保数据按一定时序顺序写入。
vi test.sql
\set a random(1,100)
\set b random(1,1000000)
\set bo random(0,1)
insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 32
number of threads: 32
duration: 120 s
number of transactions actually processed: 34403943
latency average = 0.112 ms
latency stddev = 0.229 ms
tps = 286698.048259 (including connections establishing)
tps = 286718.916176 (excluding connections establishing)
statement latencies in milliseconds:
0.001 \set a random(1,100)
0.000 \set b random(1,1000000)
0.000 \set bo random(0,1)
0.109 insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);
postgres=# select count(*) from tbl;
count
----------
19805266
(1 row)
postgres=# select count(*) from tbl_result;
count
---------
5202622
(1 row)
6、单线程读取,统计,例如每次读取一个小时的数据(定义清楚边界,连续消费,同时避免并发、或重复消费,或者在写统计结果时做到幂等,不用担心重复消费)。
创建一张消费记录表,统计已消费的时间间隔。
create table tbl_record (ts1 timestamp, ts2 timestamp);
下次消费时,参考上次已消费的时间。
with tmp as (
insert into tbl_record (ts1, ts2) values ('2018-01-01 12:00:00', '2018-01-01 13:00:00') -- 记录当前消费窗口
)
insert into tbl_mid
select * from tbl
where ts >= '2018-01-01 12:00:00' and ts < '2018-01-01 13:00:00' -- 上一个小时为窗口 (当前时间 大于等于 '2018-01-01 14:00:00')
order by ts; -- 无会话模式
Time: 16532.939 ms (00:16.533)
7、算法校验,正确
postgres=# select * from tbl_mid where b=2 order by ts limit 10;
a | b | ts | direct
----+---+----------------------------+--------
10 | 2 | 2018-08-15 10:24:58.538558 | t
25 | 2 | 2018-08-15 10:25:00.585426 | f
62 | 2 | 2018-08-15 10:25:04.2633 | f
45 | 2 | 2018-08-15 10:25:04.406764 | t
(4 rows)
postgres=# select * from tbl_result where b=2 order by b_ts limit 10;
b | b_ts | a | a_ts
---+----------------------------+----+----------------------------
2 | 2018-08-15 10:25:00.585426 | 45 | 2018-08-15 10:25:04.406764
(1 row)
消费性能,单线程吞吐达到6万行/s。
with tmp as (
delete from tbl where ctid = any(array(
select ctid from tbl where
ts < now()-interval '1 min' -- 测试时改成了消费1分钟前的数据
order by ts limit 1000000
))
returning *
)
insert into tbl_mid
select * from tmp
order by b, ts; -- 无会话模式
Time: 16532.939 ms (00:16.533)
消费节奏:
1、消费
2、VACUUM tbl;
3、消费
loop;
例子3,使用窗口查询解决同一问题
1、新增索引,用于窗口查询加速
create index idx_tbl_1 on tbl (b,ts);
2、无会话模式,使用窗口查询,得到每个虚拟会话的开始时间、第一响应时间
select
a, -- 虚拟会话的第一条消息,客户发给了哪位客服ID
b, -- 客户ID
ts, -- 虚拟会话开始时间
lead_a, -- 最先响应的是谁(哪位客服)
lead_session_end_ts, -- 虚拟会话第一次响应时间
lead_session_end_ts - ts as dur, -- 响应间隔
direct,lag_direct,lag_ts
from
(
select *,
lead(session_end_ts) over w2 as lead_session_end_ts, -- 当前窗口,当前行的下一条ts值 , 即会话第一次响应时间
lead(a) over w2 as lead_a -- 当前窗口,当前行的下一条的b(客服ID) , 即响应的是哪位客服
from
(
select * from
(
select a,b,ts,direct,lag_direct,lag_ts,
case when ((direct = false and lag_direct is null) -- 判断虚拟会话开始时间的逻辑
or
(direct = false and lag_direct = true))
then ts
end as session_begin_ts, -- 虚拟会话开始时间
case when (direct = true and lag_direct = false) -- 判断虚拟会话第一次响应时间的逻辑
then ts
end as session_end_ts -- 虚拟会话第一次响应时间
from
(
select
a, -- 客服ID
b, -- 客户ID
ts, -- 消息时间
direct, -- 消息方向 true: a->b, false: b->a
lag(direct) over w1 as lag_direct, -- 当前窗口,当前行的上一条direct值
lag(ts) over w1 as lag_ts -- 当前窗口,当前行的上一条ts值
from tbl
window w1 as (partition by b order by ts)
-- where ts between xx and xx , 一次只查部分数据时可用
) t
) t
where session_begin_ts is not null -- 虚拟会话开始时间字段不为空,表示这条记录是会话开始的记录
or
session_end_ts is not null -- 虚拟会话结束时间字段不为空,表示这条记录是会话第一次响应的记录
) t
window w2 as (partition by b order by ts)
) t
where
direct = false -- 客户在虚拟会话中发起第一条消息的记录
and
lead_session_end_ts - ts is not null
limit 100;
3、结果、算法正确性验证
a | b | ts | lead_a | lead_session_end_ts | dur | direct | lag_direct | lag_ts
-----+----+----------------------------+--------+----------------------------+-----------------+--------+------------+----------------------------
26 | 1 | 2018-08-15 10:25:13.056316 | 75 | 2018-08-15 10:25:16.546126 | 00:00:03.48981 | f | |
43 | 1 | 2018-08-15 10:25:21.483542 | 99 | 2018-08-15 10:25:25.552488 | 00:00:04.068946 | f | t | 2018-08-15 10:25:16.546126
28 | 1 | 2018-08-15 10:25:28.287823 | 70 | 2018-08-15 10:25:37.375585 | 00:00:09.087762 | f | t | 2018-08-15 10:25:26.518359
12 | 1 | 2018-08-15 10:25:47.203597 | 20 | 2018-08-15 10:26:03.423969 | 00:00:16.220372 | f | t | 2018-08-15 10:25:47.036459
91 | 1 | 2018-08-15 10:26:05.332921 | 57 | 2018-08-15 10:26:08.070122 | 00:00:02.737201 | f | t | 2018-08-15 10:26:03.423969
24 | 1 | 2018-08-15 10:26:16.798485 | 85 | 2018-08-15 10:26:22.222025 | 00:00:05.42354 | f | t | 2018-08-15 10:26:15.319287
90 | 1 | 2018-08-15 10:26:22.58553 | 28 | 2018-08-15 10:26:25.987987 | 00:00:03.402457 | f | t | 2018-08-15 10:26:22.222025
30 | 1 | 2018-08-15 10:26:31.458875 | 42 | 2018-08-15 10:26:36.259917 | 00:00:04.801042 | f | t | 2018-08-15 10:26:25.987987
11 | 1 | 2018-08-15 10:26:37.828413 | 70 | 2018-08-15 10:26:49.212275 | 00:00:11.383862 | f | t | 2018-08-15 10:26:36.259917
21 | 2 | 2018-08-15 10:25:15.532378 | 66 | 2018-08-15 10:25:19.742437 | 00:00:04.210059 | f | |
50 | 2 | 2018-08-15 10:25:30.988507 | 20 | 2018-08-15 10:25:36.645969 | 00:00:05.657462 | f | t | 2018-08-15 10:25:30.750224
98 | 2 | 2018-08-15 10:25:47.075616 | 72 | 2018-08-15 10:25:52.34913 | 00:00:05.273514 | f | t | 2018-08-15 10:25:40.858465
72 | 2 | 2018-08-15 10:25:56.595608 | 99 | 2018-08-15 10:26:11.46232 | 00:00:14.866712 | f | t | 2018-08-15 10:25:55.324131
98 | 2 | 2018-08-15 10:26:12.303834 | 97 | 2018-08-15 10:26:15.341379 | 00:00:03.037545 | f | t | 2018-08-15 10:26:11.46232
63 | 2 | 2018-08-15 10:26:19.116171 | 22 | 2018-08-15 10:26:23.743978 | 00:00:04.627807 | f | t | 2018-08-15 10:26:15.341379
66 | 2 | 2018-08-15 10:26:30.024534 | 49 | 2018-08-15 10:26:41.196351 | 00:00:11.171817 | f | t | 2018-08-15 10:26:23.743978
83 | 2 | 2018-08-15 10:26:41.962942 | 51 | 2018-08-15 10:26:43.172856 | 00:00:01.209914 | f | t | 2018-08-15 10:26:41.196351
64 | 2 | 2018-08-15 10:26:43.575144 | 88 | 2018-08-15 10:26:44.17728 | 00:00:00.602136 | f | t | 2018-08-15 10:26:43.172856
4、对比使用中间表得到的结果
insert into tbl_mid select * from tbl order by ts ;
select * from tbl_result where b=1 or b=2 order by b_ts;
b | b_ts | a | a_ts
---+----------------------------+----+----------------------------
1 | 2018-08-15 10:25:13.056316 | 75 | 2018-08-15 10:25:16.546126
1 | 2018-08-15 10:25:21.483542 | 99 | 2018-08-15 10:25:25.552488
1 | 2018-08-15 10:25:28.287823 | 70 | 2018-08-15 10:25:37.375585
1 | 2018-08-15 10:25:47.203597 | 20 | 2018-08-15 10:26:03.423969
1 | 2018-08-15 10:26:05.332921 | 57 | 2018-08-15 10:26:08.070122
1 | 2018-08-15 10:26:16.798485 | 85 | 2018-08-15 10:26:22.222025
1 | 2018-08-15 10:26:22.58553 | 28 | 2018-08-15 10:26:25.987987
1 | 2018-08-15 10:26:31.458875 | 42 | 2018-08-15 10:26:36.259917
1 | 2018-08-15 10:26:37.828413 | 70 | 2018-08-15 10:26:49.212275
1 | 2018-08-15 10:26:50.622352 | -1 |
2 | 2018-08-15 10:25:15.532378 | 66 | 2018-08-15 10:25:19.742437
2 | 2018-08-15 10:25:30.988507 | 20 | 2018-08-15 10:25:36.645969
2 | 2018-08-15 10:25:47.075616 | 72 | 2018-08-15 10:25:52.34913
2 | 2018-08-15 10:25:56.595608 | 99 | 2018-08-15 10:26:11.46232
2 | 2018-08-15 10:26:12.303834 | 97 | 2018-08-15 10:26:15.341379
2 | 2018-08-15 10:26:19.116171 | 22 | 2018-08-15 10:26:23.743978
2 | 2018-08-15 10:26:30.024534 | 49 | 2018-08-15 10:26:41.196351
2 | 2018-08-15 10:26:41.962942 | 51 | 2018-08-15 10:26:43.172856
2 | 2018-08-15 10:26:43.575144 | 88 | 2018-08-15 10:26:44.17728
2 | 2018-08-15 10:26:45.595639 | -1 |
(20 rows)
5、会话模式,SQL改动两处即可。
create index idx_tbl_2 on tbl (b,a,ts); -- 窗口加速
select
a, -- 虚拟会话的第一条消息,客户发给了哪位客服ID
b, -- 客户ID
ts, -- 虚拟会话开始时间
lead_a, -- 最先响应的是谁(哪位客服)
lead_session_end_ts, -- 虚拟会话第一次响应时间
lead_session_end_ts - ts as dur, -- 响应间隔
direct,lag_direct,lag_ts
from
(
select *,
lead(session_end_ts) over w2 as lead_session_end_ts, -- 当前窗口,当前行的下一条ts值 , 即会话第一次响应时间
lead(a) over w2 as lead_a -- 当前窗口,当前行的下一条的b(客服ID) , 即响应的是哪位客服
from
(
select * from
(
select a,b,ts,direct,lag_direct,lag_ts,
case when ((direct = false and lag_direct is null) -- 判断虚拟会话开始时间的逻辑
or
(direct = false and lag_direct = true))
then ts
end as session_begin_ts, -- 虚拟会话开始时间
case when (direct = true and lag_direct = false) -- 判断虚拟会话第一次响应时间的逻辑
then ts
end as session_end_ts -- 虚拟会话第一次响应时间
from
(
select
a, -- 客服ID
b, -- 客户ID
ts, -- 消息时间
direct, -- 消息方向 true: a->b, false: b->a
lag(direct) over w1 as lag_direct, -- 当前窗口,当前行的上一条direct值
lag(ts) over w1 as lag_ts -- 当前窗口,当前行的上一条ts值
from tbl
window w1 as (partition by b,a order by ts) -- 有会话模式,改这个partition
-- where ts between xx and xx , 一次只查部分数据时可用
) t
) t
where session_begin_ts is not null -- 虚拟会话开始时间字段不为空,表示这条记录是会话开始的记录
or
session_end_ts is not null -- 虚拟会话结束时间字段不为空,表示这条记录是会话第一次响应的记录
) t
window w2 as (partition by b,a order by ts) -- 有会话模式,改这个partition
) t
where
direct = false -- 客户在虚拟会话中发起第一条消息的记录
and
lead_session_end_ts - ts is not null
limit 100;
性能,3000万记录,1毫秒响应。
小结
本文涉及的场景为无会话、或者会话无明显标识的情况下,使用PostgreSQL高效率的统计客服的响应速度的问题。
使用到的方法与性能指标
1、实时计算,触发器(当到达时间有序, 或者说大部分有序时。使用clock_timestamp可以让数据基本有序)
写入吞吐16.5万行每秒。
2、阅后即焚(延迟消费,解决数据写入无需的问题)。
写入吞吐29万行每秒。
单线程消费6万行每秒。
3、阅后即焚,使用HASH,并行消费,提升消费吞吐。
4、使用窗口查询,同样能够很好的解决此场景的需求,而且性能杠杠的。
参考
《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》