PostgreSQL 实践 - 实时广告位推荐 1 (任意字段组合、任意维度组合搜索、输出TOP-K)
背景
店铺,广告推荐位,自动计算,高效检索,高效更新。
根据:本店、全网用户行为,库存等进行运算,得到每个商品的分值,推荐排行靠前的商品。
维度可能很多,例如:北京的男性用户在秋天买袜子的可能性是0.431,这里面就是4个维度。实际场景维度可能有几十个,几百个,甚至几千个。
需要支持任意维度,排序,求TOP 100,要求毫秒级延迟,100万QPS。
设计1
1、定义维度
create table tbl_weidu (
wid int primary key,
info json
);
2、定义推荐表,只存储排在前100的商品和分值
create table tbl_score (
wid int not null, -- 维度ID
uid int8 not null, -- ToB 店铺ID
top10 text[] not null, -- top 10的 item_score
primary key(wid,uid)
);
3、定义一个函数,用于合并两个text数组,在有新的商品分值输入时,合并为一个新值(当商品重复时,新值覆盖旧值,最后排序,保留输出TOP N)
create or replace function merge_top10(
text[], -- old value
text[], -- new value
ln int -- 按score排序,保留 top N
) returns text[] as $$
select array_agg(v2||'_'||v3 order by v3 desc) from
(
select v2,v3 from
(
select v2,v3,row_number() over(partition by v2 order by v1 desc) as rn from -- 同一个商品, 使用new values
(
select 1 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($1) t(info) -- old values
union all
select 2 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($2) t(info) -- new values
) t
) t where rn=1 order by v3 desc limit ln -- 同一个商品, 使用new values
) t;
$$ language sql strict immutable;
4、定义日志表,用于记录商品在某个维度上的值的变更,后面消费这个LOG表,合并更新最后的tbl_score表
create unlogged table tbl_score_log (
wid int not null, -- 维度ID
uid int8 not null, -- ToB 店铺ID
item int8 not null, -- 商品ID
score float4 not null, -- 打分
crt_time timestamp not null
);
create index idx_tbl_score_log_1 on tbl_score_log (wid,uid,crt_time);
5、定义消费函数
create or replace function consume_log(
i_loop int, -- 循环处理多少次,(多少组wid,uid)
i_limit int, -- 对于同一组wid,uid,单次处理多少行
i_topn int -- 每个wid,uid 维度,保留TOP N个item (score高的前N个)
) returns void as $$
declare
v_wid int;
v_uid int8;
v_top1 text[];
i int := 0;
begin
LOOP
exit when i >= i_loop; -- loops
select wid,uid into v_wid,v_uid from tbl_score_log for update skip locked limit 1;
with
a as (
delete from tbl_score_log where ctid= any (array(
select ctid from tbl_score_log where wid=v_wid and uid=v_uid order by crt_time limit i_limit -- limit batch
)) returning item,score
)
select
array_agg((item||'_'||score)::text order by score desc) into v_top1
from
(select item,score from a order by score desc limit i_topn) t; -- limit topn
insert into tbl_score
values (v_wid, v_uid, v_top1)
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn);
i := i+1;
END LOOP;
end;
$$ language plpgsql strict;
6、压测1,生成分值变更日志
(1000个维度,1万家店,1亿个商品)
vi test.sql
\set wid random(1,1000)
\set uid random(1,10000)
\set item random(1,100000000)
insert into tbl_score_log values (:wid,:uid,:item,random()*100,now());
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
tps = 257737.493753 (including connections establishing)
tps = 257752.428348 (excluding connections establishing)
写入超过25万行/s.
8、消费LOG表,合并结果到分值表
postgres=# select consume_log(10, 10000, 100);
consume_log
-------------
(1 row)
postgres=# \timing
Timing is on.
postgres=# select * from tbl_score limit 10;
wid | uid | top10
-----+------+-------------------------------------
115 | 69 | {989915_22.2217}
441 | 3914 | {7521898_39.2669}
423 | 7048 | {75494665_92.5439}
789 | 1335 | {57756208_23.4602}
776 | 8065 | {41134454_46.8727}
785 | 6248 | {76364646_93.4671,94065193_69.2552}
567 | 7539 | {97116865_6.93694}
207 | 6926 | {45163995_14.1626}
788 | 9025 | {73053901_80.3204}
334 | 2805 | {80532634_78.1224}
(10 rows)
Time: 0.300 ms
9、跟踪每一次消费消耗的资源
load 'auto_explain';
set auto_explain.log_analyze =on;
set auto_explain.log_buffers =on;
set auto_explain.log_min_duration =0;
set auto_explain.log_nested_statements =on;
set auto_explain.log_time=on;
set auto_explain.log_verbose =on;
set client_min_messages ='log';
postgres=# select consume_log(1, 10000, 100);
LOG: duration: 0.819 ms plan:
Query Text: select wid,uid from tbl_score_log for update skip locked limit 1
Limit (cost=10000000000.00..10000000000.03 rows=1 width=18) (actual time=0.816..0.816 rows=1 loops=1)
Output: wid, uid, ctid
Buffers: shared hit=177
-> LockRows (cost=10000000000.00..10000876856.44 rows=30947272 width=18) (actual time=0.815..0.815 rows=1 loops=1)
Output: wid, uid, ctid
Buffers: shared hit=177
-> Seq Scan on public.tbl_score_log (cost=10000000000.00..10000567383.72 rows=30947272 width=18) (actual time=0.808..0.808 rows=1 loops=1)
Output: wid, uid, ctid
Buffers: shared hit=176
LOG: duration: 0.104 ms plan:
Query Text: with
a as (
delete from tbl_score_log where ctid= any (array(
select ctid from tbl_score_log where wid=v_wid and uid=v_uid order by crt_time limit i_limit -- limit batch
)) returning item,score
)
select
array_agg((item||'_'||score)::text order by score desc) from
(select item,score from a order by score desc limit i_topn) t
Aggregate (cost=13.56..13.57 rows=1 width=32) (actual time=0.100..0.100 rows=1 loops=1)
Output: array_agg((((a.item)::text || '_'::text) || (a.score)::text) ORDER BY a.score DESC)
Buffers: shared hit=20
CTE a
-> Delete on public.tbl_score_log tbl_score_log_1 (cost=2.06..13.16 rows=10 width=6) (actual time=0.059..0.063 rows=4 loops=1)
Output: tbl_score_log_1.item, tbl_score_log_1.score
Buffers: shared hit=20
InitPlan 1 (returns $0)
-> Limit (cost=0.56..2.05 rows=1 width=14) (actual time=0.017..0.043 rows=4 loops=1)
Output: tbl_score_log.ctid, tbl_score_log.crt_time
Buffers: shared hit=8
-> Index Scan using idx_tbl_score_log_1 on public.tbl_score_log (cost=0.56..5.02 rows=3 width=14) (actual time=0.017..0.041 rows=4 loops=1)
Output: tbl_score_log.ctid, tbl_score_log.crt_time
Index Cond: ((tbl_score_log.wid = $5) AND (tbl_score_log.uid = $6))
Buffers: shared hit=8
-> Tid Scan on public.tbl_score_log tbl_score_log_1 (cost=0.01..11.11 rows=10 width=6) (actual time=0.053..0.055 rows=4 loops=1)
Output: tbl_score_log_1.ctid
TID Cond: (tbl_score_log_1.ctid = ANY ($0))
Buffers: shared hit=12
-> Limit (cost=0.37..0.37 rows=1 width=12) (actual time=0.077..0.079 rows=4 loops=1)
Output: a.item, a.score
Buffers: shared hit=20
-> Sort (cost=0.37..0.39 rows=10 width=12) (actual time=0.076..0.077 rows=4 loops=1)
Output: a.item, a.score
Sort Key: a.score DESC
Sort Method: quicksort Memory: 25kB
Buffers: shared hit=20
-> CTE Scan on a (cost=0.00..0.20 rows=10 width=12) (actual time=0.060..0.066 rows=4 loops=1)
Output: a.item, a.score
Buffers: shared hit=20
LOG: duration: 0.046 ms plan:
Query Text: insert into tbl_score
values (v_wid, v_uid, v_top1)
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)
Insert on public.tbl_score (cost=0.00..0.01 rows=1 width=44) (actual time=0.045..0.045 rows=0 loops=1)
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: tbl_score_pkey
Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, $3))
Tuples Inserted: 1
Conflicting Tuples: 0
Buffers: shared hit=7
-> Result (cost=0.00..0.01 rows=1 width=44) (actual time=0.000..0.001 rows=1 loops=1)
Output: $5, $6, $7
LOG: duration: 1.951 ms plan:
Query Text: select consume_log(1, 10000, 100);
Result (cost=0.00..0.26 rows=1 width=4) (actual time=1.944..1.944 rows=1 loops=1)
Output: consume_log(1, 10000, 100)
Buffers: shared hit=212
consume_log
-------------
(1 row)
Time: 2.390 ms
消耗1万个指标,约1.5秒。
10、压测2,查询某个维度,某个店铺的广告位推荐
vi test1.sql
\set wid random(1,1000)
\set uid random(1,10000)
select * from tbl_score where wid=:wid and uid=:uid;
pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 32 -j 32 -T 120
tps = 470514.018510 (including connections establishing)
tps = 470542.672975 (excluding connections establishing)
查询速度可以达到 45万 qps.
设计2
设计1的一个可以优化的点,在写入tbl_score_log时,如果不同维度的数据夹杂在一起输入,在消费时会引入IO放大的问题:
《PostgreSQL 时序最佳实践 - 证券交易系统数据库设计 - 阿里云RDS PostgreSQL最佳实践》
我们可以使用以上同样的方法来对维度数据分区存放,消费时也按分区消费。
1、创建维度描述表
create table tbl_weidu (
wid int primary key,
info json
);
2、创建TOP-K分值表
create table tbl_score (
wid int not null, -- 维度ID
uid int8 not null, -- ToB 店铺ID
top10 text[] not null, -- top 10的item_score
primary key(wid,uid)
);
3、创建任务表,记录每次消耗LOG时的计数,每个维度一个计数器
create table tbl_score_task (
wid int not null, -- 维度ID
uid int8 not null, -- ToB 店铺ID
cnt int8 default 0, -- 被计算次数
primary key(wid,uid)
);
create index idx_tbl_score_task_cnt on tbl_score_task (cnt);
4、合并两个TEXT数组的函数
create or replace function merge_top10(
text[], -- old value
text[], -- new value
ln int -- 按score排序,保留 top N
) returns text[] as $$
select array_agg(v2||'_'||v3 order by v3 desc) from
(
select v2,v3 from
(
select v2,v3,row_number() over(partition by v2 order by v1 desc) as rn from
(
select 1 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($1) t(info)
union all
select 2 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($2) t(info)
) t
) t where rn=1 order by v3 desc limit ln
) t;
$$ language sql strict immutable;
5、日志表
create unlogged table tbl_score_log ( -- 流水数据,不计日志,数据库崩溃会丢失所有记录
item int8 not null, -- 商品ID
score float4 not null, -- 打分
crt_time timestamp not null
);
create index idx_tbl_score_log_1 on tbl_score_log (crt_time);
6、创建写入LOG的函数,解决<设计1>的IO放大问题,设计1>
create or replace function ins_score_log(
i_wid int,
i_uid int8,
i_item int8,
i_score float4
) returns void as $$
declare
begin
execute format('insert into tbl_score_log_%s_%s values (%s,%s,now())', i_wid, i_uid, i_item, i_score);
insert into tbl_score_task (wid, uid) values (i_wid, i_uid) on conflict (wid,uid) do nothing;
exception when others then
execute format('create unlogged table tbl_score_log_%s_%s (like tbl_score_log including all) inherits (tbl_score_log)', i_wid, i_uid, i_item, i_score);
execute format('insert into tbl_score_log_%s_%s values (%s,%s,now())', i_wid, i_uid, i_item, i_score);
insert into tbl_score_task (wid, uid) values (i_wid, i_uid) on conflict (wid,uid) do nothing;
end;
$$ language plpgsql strict;
但是请注意
如果有以上问题,那么建议按UID或WID切库,将数据切到不同的库里面,避免单个目录文件过多。
7、消费LOG
create or replace function consume_log(
i_loop int, -- 循环处理多少次,(多少组wid,uid)
i_limit int, -- 对于同一组wid,uid,单次处理多少行
i_topn int -- 每个wid,uid 维度,保留TOP N个item (score高的前N个)
) returns void as $$
declare
v_wid int;
v_uid int8;
v_top1 text[];
i int := 0;
begin
LOOP
exit when i >= i_loop; -- loops
with a as
(select wid,uid from tbl_score_task order by cnt for update skip locked limit 1)
update tbl_score_task t set cnt=cnt+1 from a where t.wid = a.wid and t.uid = a.uid returning t.wid,t.uid into v_wid, v_uid;
execute format ($_$
with
a as (
delete from tbl_score_log_%s_%s where ctid= any (array(
select ctid from tbl_score_log_%s_%s order by crt_time limit %s -- limit batch
)) returning item,score
)
select
array_agg((item||'_'||score)::text order by score desc)
from
(select item,score from a order by score desc limit %s) t -- limit topn
$_$, v_wid, v_uid, v_wid, v_uid, i_limit, i_topn
) into v_top1;
-- raise notice '%', v_top1;
if v_top1 is null then
continue;
end if;
insert into tbl_score
values (v_wid, v_uid, v_top1)
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn);
i := i+1;
END LOOP;
end;
$$ language plpgsql strict;
8、写入压测
vi test.sql
\set wid random(1,1000)
\set uid random(1,10000)
\set item random(1,100000000)
select ins_score_log (:wid,:uid::int8,:item::int8,(random()*100)::float4);
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120
tps = 146606.220095 (including connections establishing)
tps = 146614.705007 (excluding connections establishing)
所有分区都建好之后,由于使用了动态SQL,写入只有15万行/s左右。
9、消耗LOG,合并到SCORE表
postgres=# select consume_log(10, 10000, 100);
consume_log
-------------
(1 row)
postgres=# \timing
Timing is on.
postgres=# select * from tbl_score limit 10;
wid | uid | top10
-----+------+-------------------------------------
115 | 69 | {989915_22.2217}
441 | 3914 | {7521898_39.2669}
423 | 7048 | {75494665_92.5439}
789 | 1335 | {57756208_23.4602}
776 | 8065 | {41134454_46.8727}
785 | 6248 | {76364646_93.4671,94065193_69.2552}
567 | 7539 | {97116865_6.93694}
207 | 6926 | {45163995_14.1626}
788 | 9025 | {73053901_80.3204}
334 | 2805 | {80532634_78.1224}
(10 rows)
Time: 0.300 ms
postgres=# select consume_log(10, 10000, 100);
consume_log
-------------
(1 row)
Time: 3677.130 ms (00:03.677)
postgres=# select consume_log(1, 10000, 100);
LOG: duration: 0.105 ms plan:
Query Text: with a as
(select wid,uid from tbl_score_task order by cnt for update skip locked limit 1)
update tbl_score_task t set cnt=cnt+1 from a where t.wid = a.wid and t.uid = a.uid returning t.wid,t.uid
Update on public.tbl_score_task t (cost=0.60..2.85 rows=1 width=62) (actual time=0.099..0.100 rows=1 loops=1)
Output: t.wid, t.uid
Buffers: shared hit=13
CTE a
-> Limit (cost=0.28..0.32 rows=1 width=26) (actual time=0.036..0.036 rows=1 loops=1)
Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid
Buffers: shared hit=4
-> LockRows (cost=0.28..271.41 rows=7057 width=26) (actual time=0.035..0.035 rows=1 loops=1)
Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid
Buffers: shared hit=4
-> Index Scan using idx_tbl_score_task_cnt on public.tbl_score_task (cost=0.28..200.84 rows=7057 width=26) (actual time=0.018..0.018 rows=1 loops=1)
Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid
Buffers: shared hit=3
-> Nested Loop (cost=0.28..2.53 rows=1 width=62) (actual time=0.059..0.060 rows=1 loops=1)
Output: t.wid, t.uid, (t.cnt + 1), t.ctid, a.*
Inner Unique: true
Buffers: shared hit=7
-> CTE Scan on a (cost=0.00..0.02 rows=1 width=48) (actual time=0.046..0.047 rows=1 loops=1)
Output: a.*, a.wid, a.uid
Buffers: shared hit=4
-> Index Scan using tbl_score_task_pkey on public.tbl_score_task t (cost=0.28..2.50 rows=1 width=26) (actual time=0.009..0.009 rows=1 loops=1)
Output: t.wid, t.uid, t.cnt, t.ctid
Index Cond: ((t.wid = a.wid) AND (t.uid = a.uid))
Buffers: shared hit=3
LOG: duration: 24.624 ms plan:
Query Text:
with
a as (
delete from tbl_score_log_3_5 where ctid= any (array(
select ctid from tbl_score_log_3_5 order by crt_time limit 10000 -- limit batch
)) returning item,score
)
select
array_agg((item||'_'||score)::text order by score desc)
from
(select item,score from a order by score desc limit 100) t -- limit topn
Aggregate (cost=279.53..279.54 rows=1 width=32) (actual time=24.619..24.619 rows=1 loops=1)
Output: array_agg((((a.item)::text || '_'::text) || (a.score)::text) ORDER BY a.score DESC)
Buffers: shared hit=39297
CTE a
-> Delete on public.tbl_score_log_3_5 tbl_score_log_3_5_1 (cost=267.76..278.86 rows=10 width=6) (actual time=10.193..19.993 rows=10000 loops=1)
Output: tbl_score_log_3_5_1.item, tbl_score_log_3_5_1.score
Buffers: shared hit=39297
InitPlan 1 (returns $0)
-> Limit (cost=0.42..267.75 rows=10000 width=14) (actual time=0.017..7.185 rows=10000 loops=1)
Output: tbl_score_log_3_5.ctid, tbl_score_log_3_5.crt_time
Buffers: shared hit=9297
-> Index Scan using tbl_score_log_3_5_crt_time_idx on public.tbl_score_log_3_5 (cost=0.42..3907.05 rows=146135 width=14) (actual time=0.016..5.319 rows=10000 loops=1)
Output: tbl_score_log_3_5.ctid, tbl_score_log_3_5.crt_time
Buffers: shared hit=9297
-> Tid Scan on public.tbl_score_log_3_5 tbl_score_log_3_5_1 (cost=0.01..11.11 rows=10 width=6) (actual time=10.188..13.238 rows=10000 loops=1)
Output: tbl_score_log_3_5_1.ctid
TID Cond: (tbl_score_log_3_5_1.ctid = ANY ($0))
Buffers: shared hit=19297
-> Limit (cost=0.37..0.39 rows=10 width=12) (actual time=24.433..24.461 rows=100 loops=1)
Output: a.item, a.score
Buffers: shared hit=39297
-> Sort (cost=0.37..0.39 rows=10 width=12) (actual time=24.432..24.443 rows=100 loops=1)
Output: a.item, a.score
Sort Key: a.score DESC
Sort Method: top-N heapsort Memory: 32kB
Buffers: shared hit=39297
-> CTE Scan on a (cost=0.00..0.20 rows=10 width=12) (actual time=10.195..22.790 rows=10000 loops=1)
Output: a.item, a.score
Buffers: shared hit=39297
LOG: duration: 0.084 ms plan:
Query Text: insert into tbl_score
values (v_wid, v_uid, v_top1)
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)
Insert on public.tbl_score (cost=0.00..0.01 rows=1 width=44) (actual time=0.083..0.083 rows=0 loops=1)
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: tbl_score_pkey
Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, $3))
Tuples Inserted: 1
Conflicting Tuples: 0
Buffers: shared hit=4
-> Result (cost=0.00..0.01 rows=1 width=44) (actual time=0.001..0.001 rows=1 loops=1)
Output: $5, $6, $7
LOG: duration: 26.335 ms plan:
Query Text: select consume_log(1, 10000, 100);
Result (cost=0.00..0.26 rows=1 width=4) (actual time=26.329..26.329 rows=1 loops=1)
Output: consume_log(1, 10000, 100)
Buffers: shared hit=39388
consume_log
-------------
(1 row)
Time: 26.937 ms
设计3
与设计1类似,只是在前面再加一个离散写入表,定期对离散表排序后写入tbl_score_log表,再从tbl_score_log消费(与设计1保持一致),解决IO放大问题。
使用AB表切换:
create unlogged table tbl_score_log_a (
wid int not null, -- 维度ID
uid int8 not null, -- ToB 店铺ID
item int8 not null, -- 商品ID
score float4 not null, -- 打分
crt_time timestamp not null
);
create unlogged table tbl_score_log_b (
wid int not null, -- 维度ID
uid int8 not null, -- ToB 店铺ID
item int8 not null, -- 商品ID
score float4 not null, -- 打分
crt_time timestamp not null
);
例如堆积了2000万记录后,排序写入tbl_score_log
begin;
lock table tbl_score_log_a in ACCESS EXCLUSIVE mode;
insert into tbl_score_log select * from tbl_score_log_a order by wid,uid,crt_time;
truncate tbl_score_log_a;
end;
设计4
与设计1类似,只是每次计算的是多个维度而不是一个维度。
单次计算多个维度的TOP-K,参考这种方法:
《PostgreSQL 递归妙用案例 - 分组数据去重与打散》
设计1采用每个维度计算一次的方法,如果使用设计1,那么会导致IO放大,而如果使用单次计算多个维度的方法,IO放大的问题就没了。(但是建议这种方法单次计算更大量的数据(比如一次计算1000万条),否则可能造成tbl_score更新频次过多的问题(单个维度多次消耗,多次更新))
与设计1不同的设计之处如下:
create unlogged table tbl_score_log (
wid int not null, -- 维度ID
uid int8 not null, -- ToB 店铺ID
item int8 not null, -- 商品ID
score float4 not null, -- 打分
crt_time timestamp not null
);
create index idx_tbl_score_log_1 on tbl_score_log (crt_time);
create or replace function consume_log(
i_limit int, -- 单次处理多少行
i_topn int -- 每个wid,uid 维度,保留TOP N个item (score高的前N个)
) returns void as $$
declare
begin
with
a as (
delete from tbl_score_log where ctid= any (array(
select ctid from tbl_score_log order by crt_time limit i_limit -- limit batch
)) returning wid,uid,item,score
)
insert into tbl_score
select wid,uid,topn
from
(
select
wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn
from
(
select wid,uid,item,score from
(select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from a) t
where rn <= i_topn -- limit topn
) t
group by wid,uid
) t
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn) -- limit topn
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn) -- limit topn
;
end;
$$ language plpgsql strict;
select consume_log(10000000,100);
或者可以直接使用如下SQL来进行消费例如
with
a as (
delete from tbl_score_log where ctid= any (array(
select ctid from tbl_score_log order by crt_time limit 10000000 -- limit batch
)) returning wid,uid,item,score
)
insert into tbl_score
select wid,uid,topn
from
(
select
wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn
from
(
select wid,uid,item,score from
(select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from a) t
where rn <= 100 -- limit topn
) t
group by wid,uid
) t
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, 100) -- limit topn
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, 100) -- limit topn
;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Insert on public.tbl_score (cost=36744.69..36745.17 rows=3 width=44) (actual time=69966.565..69966.565 rows=0 loops=1)
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: tbl_score_pkey
Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, 100))
Tuples Inserted: 317084
Conflicting Tuples: 634683
Buffers: shared hit=13811948 read=7001 dirtied=7001
CTE a
-> Delete on public.tbl_score_log tbl_score_log_1 (cost=36733.22..36744.32 rows=10 width=6) (actual time=968.724..1891.686 rows=1000000 loops=1)
Output: tbl_score_log_1.wid, tbl_score_log_1.uid, tbl_score_log_1.item, tbl_score_log_1.score
Buffers: shared hit=4007463
InitPlan 1 (returns $0)
-> Limit (cost=0.43..36733.21 rows=1000000 width=14) (actual time=0.011..660.528 rows=1000000 loops=1)
Output: tbl_score_log.ctid, tbl_score_log.crt_time
Buffers: shared hit=999099
-> Index Scan using idx_tbl_score_log_1 on public.tbl_score_log (cost=0.43..427926.61 rows=11649711 width=14) (actual time=0.010..494.951 rows=1000000 loops=1)
Output: tbl_score_log.ctid, tbl_score_log.crt_time
Buffers: shared hit=999099
-> Tid Scan on public.tbl_score_log tbl_score_log_1 (cost=0.01..11.11 rows=10 width=6) (actual time=968.673..1265.722 rows=1000000 loops=1)
Output: tbl_score_log_1.ctid
TID Cond: (tbl_score_log_1.ctid = ANY ($0))
Buffers: shared hit=1999099
-> GroupAggregate (cost=0.37..0.82 rows=3 width=44) (actual time=2907.640..8707.867 rows=951767 loops=1)
Output: t.wid, t.uid, array_agg((((t.item)::text || '_'::text) || (t.score)::text) ORDER BY t.score DESC)
Group Key: t.wid, t.uid
Buffers: shared hit=4007463
-> Subquery Scan on t (cost=0.37..0.72 rows=3 width=24) (actual time=2907.590..4711.497 rows=1000000 loops=1)
Output: t.wid, t.uid, t.item, t.score, t.rn
Filter: (t.rn <= 100)
Buffers: shared hit=4007463
-> WindowAgg (cost=0.37..0.59 rows=10 width=32) (actual time=2907.588..4395.127 rows=1000000 loops=1)
Output: a.wid, a.uid, a.item, a.score, row_number() OVER (?)
Buffers: shared hit=4007463
-> Sort (cost=0.37..0.39 rows=10 width=24) (actual time=2907.575..3283.649 rows=1000000 loops=1)
Output: a.wid, a.uid, a.score, a.item
Sort Key: a.wid, a.uid, a.score DESC
Sort Method: quicksort Memory: 102702kB
Buffers: shared hit=4007463
-> CTE Scan on a (cost=0.00..0.20 rows=10 width=24) (actual time=968.728..2201.439 rows=1000000 loops=1)
Output: a.wid, a.uid, a.score, a.item
Buffers: shared hit=4007463
Planning time: 0.623 ms
Execution time: 69990.738 ms
(43 rows)
设计5
与设计4类似,只是我们不使用delete tbl_score_log的方式来消耗,而是将tbl_score_log使用分区表或类似AB表的方式,一次消耗一整张表。那么就不需要delete了,而是算完直接truncate.
begin;
insert into tbl_score
select wid,uid,topn
from
(
select
wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn
from
(
select wid,uid,item,score from
(select wid,uid,item,score,row_number() over (partition by wid,uid order by wid,uid,score desc) as rn from tbl_score_log_a) t -- AB表切换的方式
where rn <= 100 -- limit topn
) t
group by wid,uid
) t
on conflict (wid,uid)
do update set top10 = merge_top10(tbl_score.top10, excluded.top10, 100) -- limit topn
where
tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, 100) -- limit topn
;
truncate tbl_score_log_a;
end;
小结
1、使用预排的方法,使得查询响应得到保障,单个RDS PG实例可以做到45万的tps。
2、初始数据生成,可以从OSS导入(在HDB PG或ODPS中计算好,生成初始数据,写入OSS)。使用并行导入,可以加快导入速度,参考如下:
《阿里云RDS PostgreSQL OSS 外部表 - (dblink异步调用封装)并行写提速案例》
3、增量数据,通过记日志的形式写入RDS PG,在RDS PG中调度消费日志,合并到最终的tbl_score表。
增量(新增、删除、更新):
删除,设置SCORE=0
更新,UDF已包含(覆盖)。
其他思考
1、考虑引入概率计算?
《PostgreSQL count-min sketch top-n 概率计算插件 cms_topn (结合窗口实现同比、环比、滑窗分析等) - 流计算核心功能之一》
2、单次计算多个维度的TOP-K,参考这种方法:
《PostgreSQL 递归妙用案例 - 分组数据去重与打散》
目前采用每个维度计算一次的方法,如果使用设计1,那么会导致IO放大,而如果使用单次计算多个维度的方法,IO放大的问题就没了。(但是建议这种方法单次计算更大量的数据(比如一次计算1000万条),否则可能造成tbl_score更新频次过多的问题(单个维度多次消耗,多次更新))
参考
《PostgreSQL 时序最佳实践 - 证券交易系统数据库设计 - 阿里云RDS PostgreSQL最佳实践》
《阿里云RDS PostgreSQL OSS 外部表 - (dblink异步调用封装)并行写提速案例》
《PostgreSQL count-min sketch top-n 概率计算插件 cms_topn (结合窗口实现同比、环比、滑窗分析等) - 流计算核心功能之一》