PostgreSQL 实践 - 实时广告位推荐 1 (任意字段组合、任意维度组合搜索、输出TOP-K)

14 minute read

背景

店铺,广告推荐位,自动计算,高效检索,高效更新。

根据:本店、全网用户行为,库存等进行运算,得到每个商品的分值,推荐排行靠前的商品。

维度可能很多,例如:北京的男性用户在秋天买袜子的可能性是0.431,这里面就是4个维度。实际场景维度可能有几十个,几百个,甚至几千个。

需要支持任意维度,排序,求TOP 100,要求毫秒级延迟,100万QPS。

设计1

1、定义维度

create table tbl_weidu (  
  wid int primary key,  
  info json  
);   

2、定义推荐表,只存储排在前100的商品和分值

create table tbl_score (  
  wid int not null,   -- 维度ID  
  uid int8 not null,  -- ToB 店铺ID  
  top10 text[] not null, -- top 10的 item_score  
  primary key(wid,uid)  
);   

3、定义一个函数,用于合并两个text数组,在有新的商品分值输入时,合并为一个新值(当商品重复时,新值覆盖旧值,最后排序,保留输出TOP N)

create or replace function merge_top10(  
text[],   -- old value  
text[],   -- new value  
ln int    -- 按score排序,保留 top N  
) returns text[] as $$  
  select array_agg(v2||'_'||v3 order by v3 desc) from   
  (  
    select v2,v3 from   
    (  
      select v2,v3,row_number() over(partition by v2 order by v1 desc) as rn from   -- 同一个商品, 使用new values  
      (  
        select 1 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($1) t(info) -- old values  
        union all  
        select 2 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($2) t(info) -- new values  
      ) t  
    ) t where rn=1 order by v3 desc limit ln   -- 同一个商品, 使用new values  
  ) t;  
$$ language sql strict immutable;  

4、定义日志表,用于记录商品在某个维度上的值的变更,后面消费这个LOG表,合并更新最后的tbl_score表

create unlogged table tbl_score_log (  
  wid int not null,   -- 维度ID  
  uid int8 not null,  -- ToB 店铺ID  
  item int8 not null, -- 商品ID  
  score float4 not null,  -- 打分  
  crt_time timestamp not null   
);   
  
create index idx_tbl_score_log_1 on tbl_score_log (wid,uid,crt_time);  

5、定义消费函数

create or replace function consume_log(  
  i_loop int,    -- 循环处理多少次,(多少组wid,uid)  
  i_limit int,   -- 对于同一组wid,uid,单次处理多少行  
  i_topn int     -- 每个wid,uid 维度,保留TOP N个item (score高的前N个)  
) returns void as $$  
declare  
  v_wid int;  
  v_uid int8;  
  v_top1 text[];  
  i int := 0;  
begin  
  LOOP  
  exit when i >= i_loop;   --  loops  
  
  select wid,uid into v_wid,v_uid from tbl_score_log for update skip locked limit 1;  
  
  with  
  a as (  
    delete from tbl_score_log where ctid= any (array(  
      select ctid from tbl_score_log where wid=v_wid and uid=v_uid order by crt_time limit i_limit  -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc) into v_top1   
    from  
    (select item,score from a order by score desc limit i_topn) t;        -- limit topn  
  
  insert into tbl_score   
  values (v_wid, v_uid, v_top1)  
  on conflict (wid,uid)  
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where  
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn);  
  
  i := i+1;  
  END LOOP;  
end;  
$$ language plpgsql strict;  

6、压测1,生成分值变更日志

(1000个维度,1万家店,1亿个商品)

vi test.sql  
\set wid random(1,1000)  
\set uid random(1,10000)  
\set item random(1,100000000)  
insert into tbl_score_log values (:wid,:uid,:item,random()*100,now());  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  
  
tps = 257737.493753 (including connections establishing)  
tps = 257752.428348 (excluding connections establishing)  

写入超过25万行/s.

8、消费LOG表,合并结果到分值表

postgres=# select consume_log(10, 10000, 100);  
 consume_log   
-------------  
   
(1 row)  
  
postgres=# \timing  
Timing is on.  
  
postgres=# select * from tbl_score limit 10;  
 wid | uid  |                top10                  
-----+------+-------------------------------------  
 115 |   69 | {989915_22.2217}  
 441 | 3914 | {7521898_39.2669}  
 423 | 7048 | {75494665_92.5439}  
 789 | 1335 | {57756208_23.4602}  
 776 | 8065 | {41134454_46.8727}  
 785 | 6248 | {76364646_93.4671,94065193_69.2552}  
 567 | 7539 | {97116865_6.93694}  
 207 | 6926 | {45163995_14.1626}  
 788 | 9025 | {73053901_80.3204}  
 334 | 2805 | {80532634_78.1224}  
(10 rows)  
  
Time: 0.300 ms  

9、跟踪每一次消费消耗的资源

load 'auto_explain';  
set auto_explain.log_analyze =on;  
set auto_explain.log_buffers =on;  
set auto_explain.log_min_duration =0;  
set auto_explain.log_nested_statements =on;  
set auto_explain.log_time=on;  
set auto_explain.log_verbose =on;  
set client_min_messages ='log';  
postgres=# select consume_log(1, 10000, 100);  
LOG:  duration: 0.819 ms  plan:  
Query Text: select wid,uid                  from tbl_score_log for update skip locked limit 1  
Limit  (cost=10000000000.00..10000000000.03 rows=1 width=18) (actual time=0.816..0.816 rows=1 loops=1)  
  Output: wid, uid, ctid  
  Buffers: shared hit=177  
  ->  LockRows  (cost=10000000000.00..10000876856.44 rows=30947272 width=18) (actual time=0.815..0.815 rows=1 loops=1)  
        Output: wid, uid, ctid  
        Buffers: shared hit=177  
        ->  Seq Scan on public.tbl_score_log  (cost=10000000000.00..10000567383.72 rows=30947272 width=18) (actual time=0.808..0.808 rows=1 loops=1)  
              Output: wid, uid, ctid  
              Buffers: shared hit=176  
LOG:  duration: 0.104 ms  plan:  
Query Text: with  
  a as (  
    delete from tbl_score_log where ctid= any (array(  
      select ctid from tbl_score_log where wid=v_wid and uid=v_uid order by crt_time limit i_limit  -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc)                  from  
    (select item,score from a order by score desc limit i_topn) t  
Aggregate  (cost=13.56..13.57 rows=1 width=32) (actual time=0.100..0.100 rows=1 loops=1)  
  Output: array_agg((((a.item)::text || '_'::text) || (a.score)::text) ORDER BY a.score DESC)  
  Buffers: shared hit=20  
  CTE a  
    ->  Delete on public.tbl_score_log tbl_score_log_1  (cost=2.06..13.16 rows=10 width=6) (actual time=0.059..0.063 rows=4 loops=1)  
          Output: tbl_score_log_1.item, tbl_score_log_1.score  
          Buffers: shared hit=20  
          InitPlan 1 (returns $0)  
            ->  Limit  (cost=0.56..2.05 rows=1 width=14) (actual time=0.017..0.043 rows=4 loops=1)  
                  Output: tbl_score_log.ctid, tbl_score_log.crt_time  
                  Buffers: shared hit=8  
                  ->  Index Scan using idx_tbl_score_log_1 on public.tbl_score_log  (cost=0.56..5.02 rows=3 width=14) (actual time=0.017..0.041 rows=4 loops=1)  
                        Output: tbl_score_log.ctid, tbl_score_log.crt_time  
                        Index Cond: ((tbl_score_log.wid = $5) AND (tbl_score_log.uid = $6))  
                        Buffers: shared hit=8  
          ->  Tid Scan on public.tbl_score_log tbl_score_log_1  (cost=0.01..11.11 rows=10 width=6) (actual time=0.053..0.055 rows=4 loops=1)  
                Output: tbl_score_log_1.ctid  
                TID Cond: (tbl_score_log_1.ctid = ANY ($0))  
                Buffers: shared hit=12  
  ->  Limit  (cost=0.37..0.37 rows=1 width=12) (actual time=0.077..0.079 rows=4 loops=1)  
        Output: a.item, a.score  
        Buffers: shared hit=20  
        ->  Sort  (cost=0.37..0.39 rows=10 width=12) (actual time=0.076..0.077 rows=4 loops=1)  
              Output: a.item, a.score  
              Sort Key: a.score DESC  
              Sort Method: quicksort  Memory: 25kB  
              Buffers: shared hit=20  
              ->  CTE Scan on a  (cost=0.00..0.20 rows=10 width=12) (actual time=0.060..0.066 rows=4 loops=1)  
                    Output: a.item, a.score  
                    Buffers: shared hit=20  
LOG:  duration: 0.046 ms  plan:  
Query Text: insert into tbl_score   
  values (v_wid, v_uid, v_top1)  
  on conflict (wid,uid)  
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where  
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)  
Insert on public.tbl_score  (cost=0.00..0.01 rows=1 width=44) (actual time=0.045..0.045 rows=0 loops=1)  
  Conflict Resolution: UPDATE  
  Conflict Arbiter Indexes: tbl_score_pkey  
  Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, $3))  
  Tuples Inserted: 1  
  Conflicting Tuples: 0  
  Buffers: shared hit=7  
  ->  Result  (cost=0.00..0.01 rows=1 width=44) (actual time=0.000..0.001 rows=1 loops=1)  
        Output: $5, $6, $7  
LOG:  duration: 1.951 ms  plan:  
Query Text: select consume_log(1, 10000, 100);  
Result  (cost=0.00..0.26 rows=1 width=4) (actual time=1.944..1.944 rows=1 loops=1)  
  Output: consume_log(1, 10000, 100)  
  Buffers: shared hit=212  
 consume_log   
-------------  
   
(1 row)  
  
Time: 2.390 ms  

消耗1万个指标,约1.5秒。

10、压测2,查询某个维度,某个店铺的广告位推荐

vi test1.sql  
  
\set wid random(1,1000)  
\set uid random(1,10000)  
select * from tbl_score where wid=:wid and uid=:uid;  
  
pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 32 -j 32 -T 120  
  
tps = 470514.018510 (including connections establishing)  
tps = 470542.672975 (excluding connections establishing)  

查询速度可以达到 45万 qps.

设计2

设计1的一个可以优化的点,在写入tbl_score_log时,如果不同维度的数据夹杂在一起输入,在消费时会引入IO放大的问题:

《PostgreSQL 时序最佳实践 - 证券交易系统数据库设计 - 阿里云RDS PostgreSQL最佳实践》

我们可以使用以上同样的方法来对维度数据分区存放,消费时也按分区消费。

1、创建维度描述表

create table tbl_weidu (  
  wid int primary key,  
  info json   
);   

2、创建TOP-K分值表

create table tbl_score (  
  wid int not null,   -- 维度ID  
  uid int8 not null,  -- ToB 店铺ID  
  top10 text[] not null, -- top 10的item_score  
  primary key(wid,uid)  
);   

3、创建任务表,记录每次消耗LOG时的计数,每个维度一个计数器

create table tbl_score_task (  
  wid int not null,   -- 维度ID  
  uid int8 not null,  -- ToB 店铺ID  
  cnt int8 default 0, -- 被计算次数  
  primary key(wid,uid)  
);   
  
create index idx_tbl_score_task_cnt on tbl_score_task (cnt);  

4、合并两个TEXT数组的函数

create or replace function merge_top10(  
text[],   -- old value  
text[],   -- new value  
ln int    -- 按score排序,保留 top N  
) returns text[] as $$  
  select array_agg(v2||'_'||v3 order by v3 desc) from   
  (  
    select v2,v3 from   
    (  
      select v2,v3,row_number() over(partition by v2 order by v1 desc) as rn from   
      (  
        select 1 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($1) t(info)   
        union all  
        select 2 as v1,split_part(info,'_',1)::text as v2,split_part(info,'_',2)::float4 as v3 from unnest($2) t(info)   
      ) t  
    ) t where rn=1 order by v3 desc limit ln   
  ) t;  
$$ language sql strict immutable;  

5、日志表

create unlogged table tbl_score_log (  -- 流水数据,不计日志,数据库崩溃会丢失所有记录  
  item int8 not null,     -- 商品ID  
  score float4 not null,  -- 打分  
  crt_time timestamp not null   
);   
  
create index idx_tbl_score_log_1 on tbl_score_log (crt_time);   

6、创建写入LOG的函数,解决<设计1>的IO放大问题,

create or replace function ins_score_log(  
  i_wid int,   
  i_uid int8,   
  i_item int8,   
  i_score float4   
) returns void as $$  
declare  
begin  
  execute format('insert into tbl_score_log_%s_%s values (%s,%s,now())', i_wid, i_uid, i_item, i_score);  
  insert into tbl_score_task (wid, uid) values (i_wid, i_uid) on conflict (wid,uid) do nothing;  
  exception when others then  
    execute format('create unlogged table tbl_score_log_%s_%s (like tbl_score_log including all) inherits (tbl_score_log)', i_wid, i_uid, i_item, i_score);  
    execute format('insert into tbl_score_log_%s_%s values (%s,%s,now())', i_wid, i_uid, i_item, i_score);  
    insert into tbl_score_task (wid, uid) values (i_wid, i_uid) on conflict (wid,uid) do nothing;  
end;  
$$ language plpgsql strict;   

但是请注意

《PostgreSQL 单库对象过多,触发Linux系统限制 (ext4_dx_add_entry: Directory index full!) (could not create file “xx/xx/xxxxxx”: No space left on device)》

如果有以上问题,那么建议按UID或WID切库,将数据切到不同的库里面,避免单个目录文件过多。

7、消费LOG

create or replace function consume_log(  
  i_loop int,    -- 循环处理多少次,(多少组wid,uid)  
  i_limit int,   -- 对于同一组wid,uid,单次处理多少行  
  i_topn int     -- 每个wid,uid 维度,保留TOP N个item (score高的前N个)  
) returns void as $$  
declare  
  v_wid int;  
  v_uid int8;  
  v_top1 text[];  
  i int := 0;  
begin  
  LOOP  
  exit when i >= i_loop;   --  loops  
  
  with a as   
  (select wid,uid from tbl_score_task order by cnt for update skip locked limit 1)   
  update tbl_score_task t set cnt=cnt+1 from a where t.wid = a.wid and t.uid = a.uid returning t.wid,t.uid into v_wid, v_uid;  
  
  execute format ($_$  
  with  
  a as (  
    delete from tbl_score_log_%s_%s where ctid= any (array(  
      select ctid from tbl_score_log_%s_%s order by crt_time limit %s      -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc)   
    from  
    (select item,score from a order by score desc limit %s) t    -- limit topn  
  $_$, v_wid, v_uid, v_wid, v_uid, i_limit, i_topn   
  ) into v_top1;    
  
  -- raise notice '%', v_top1;  
    
  if v_top1 is null then  
    continue;  
  end if;  
  
  insert into tbl_score   
  values (v_wid, v_uid, v_top1)   
  on conflict (wid,uid)   
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where   
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn);   
   
  i := i+1;  
  END LOOP;  
end;  
$$ language plpgsql strict;   

8、写入压测

vi test.sql  
\set wid random(1,1000)  
\set uid random(1,10000)  
\set item random(1,100000000)  
select ins_score_log (:wid,:uid::int8,:item::int8,(random()*100)::float4);  
  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  
  
tps = 146606.220095 (including connections establishing)  
tps = 146614.705007 (excluding connections establishing)  

所有分区都建好之后,由于使用了动态SQL,写入只有15万行/s左右。

9、消耗LOG,合并到SCORE表

postgres=# select consume_log(10, 10000, 100);  
 consume_log   
-------------  
   
(1 row)  
  
postgres=# \timing  
Timing is on.  
postgres=# select * from tbl_score limit 10;  
 wid | uid  |                top10                  
-----+------+-------------------------------------  
 115 |   69 | {989915_22.2217}  
 441 | 3914 | {7521898_39.2669}  
 423 | 7048 | {75494665_92.5439}  
 789 | 1335 | {57756208_23.4602}  
 776 | 8065 | {41134454_46.8727}  
 785 | 6248 | {76364646_93.4671,94065193_69.2552}  
 567 | 7539 | {97116865_6.93694}  
 207 | 6926 | {45163995_14.1626}  
 788 | 9025 | {73053901_80.3204}  
 334 | 2805 | {80532634_78.1224}  
(10 rows)  
  
Time: 0.300 ms  
postgres=# select consume_log(10, 10000, 100);  
 consume_log   
-------------  
   
(1 row)  
  
Time: 3677.130 ms (00:03.677)  
postgres=# select consume_log(1, 10000, 100);  
LOG:  duration: 0.105 ms  plan:  
Query Text: with a as   
  (select wid,uid from tbl_score_task order by cnt for update skip locked limit 1)   
  update tbl_score_task t set cnt=cnt+1 from a where t.wid = a.wid and t.uid = a.uid returning t.wid,t.uid  
Update on public.tbl_score_task t  (cost=0.60..2.85 rows=1 width=62) (actual time=0.099..0.100 rows=1 loops=1)  
  Output: t.wid, t.uid  
  Buffers: shared hit=13  
  CTE a  
    ->  Limit  (cost=0.28..0.32 rows=1 width=26) (actual time=0.036..0.036 rows=1 loops=1)  
          Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid  
          Buffers: shared hit=4  
          ->  LockRows  (cost=0.28..271.41 rows=7057 width=26) (actual time=0.035..0.035 rows=1 loops=1)  
                Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid  
                Buffers: shared hit=4  
                ->  Index Scan using idx_tbl_score_task_cnt on public.tbl_score_task  (cost=0.28..200.84 rows=7057 width=26) (actual time=0.018..0.018 rows=1 loops=1)  
                      Output: tbl_score_task.wid, tbl_score_task.uid, tbl_score_task.cnt, tbl_score_task.ctid  
                      Buffers: shared hit=3  
  ->  Nested Loop  (cost=0.28..2.53 rows=1 width=62) (actual time=0.059..0.060 rows=1 loops=1)  
        Output: t.wid, t.uid, (t.cnt + 1), t.ctid, a.*  
        Inner Unique: true  
        Buffers: shared hit=7  
        ->  CTE Scan on a  (cost=0.00..0.02 rows=1 width=48) (actual time=0.046..0.047 rows=1 loops=1)  
              Output: a.*, a.wid, a.uid  
              Buffers: shared hit=4  
        ->  Index Scan using tbl_score_task_pkey on public.tbl_score_task t  (cost=0.28..2.50 rows=1 width=26) (actual time=0.009..0.009 rows=1 loops=1)  
              Output: t.wid, t.uid, t.cnt, t.ctid  
              Index Cond: ((t.wid = a.wid) AND (t.uid = a.uid))  
              Buffers: shared hit=3  
LOG:  duration: 24.624 ms  plan:  
Query Text:   
  with  
  a as (  
    delete from tbl_score_log_3_5 where ctid= any (array(  
      select ctid from tbl_score_log_3_5 order by crt_time limit 10000      -- limit batch  
    )) returning item,score  
  )  
  select   
    array_agg((item||'_'||score)::text order by score desc)   
    from  
    (select item,score from a order by score desc limit 100) t    -- limit topn  
    
Aggregate  (cost=279.53..279.54 rows=1 width=32) (actual time=24.619..24.619 rows=1 loops=1)  
  Output: array_agg((((a.item)::text || '_'::text) || (a.score)::text) ORDER BY a.score DESC)  
  Buffers: shared hit=39297  
  CTE a  
    ->  Delete on public.tbl_score_log_3_5 tbl_score_log_3_5_1  (cost=267.76..278.86 rows=10 width=6) (actual time=10.193..19.993 rows=10000 loops=1)  
          Output: tbl_score_log_3_5_1.item, tbl_score_log_3_5_1.score  
          Buffers: shared hit=39297  
          InitPlan 1 (returns $0)  
            ->  Limit  (cost=0.42..267.75 rows=10000 width=14) (actual time=0.017..7.185 rows=10000 loops=1)  
                  Output: tbl_score_log_3_5.ctid, tbl_score_log_3_5.crt_time  
                  Buffers: shared hit=9297  
                  ->  Index Scan using tbl_score_log_3_5_crt_time_idx on public.tbl_score_log_3_5  (cost=0.42..3907.05 rows=146135 width=14) (actual time=0.016..5.319 rows=10000 loops=1)  
                        Output: tbl_score_log_3_5.ctid, tbl_score_log_3_5.crt_time  
                        Buffers: shared hit=9297  
          ->  Tid Scan on public.tbl_score_log_3_5 tbl_score_log_3_5_1  (cost=0.01..11.11 rows=10 width=6) (actual time=10.188..13.238 rows=10000 loops=1)  
                Output: tbl_score_log_3_5_1.ctid  
                TID Cond: (tbl_score_log_3_5_1.ctid = ANY ($0))  
                Buffers: shared hit=19297  
  ->  Limit  (cost=0.37..0.39 rows=10 width=12) (actual time=24.433..24.461 rows=100 loops=1)  
        Output: a.item, a.score  
        Buffers: shared hit=39297  
        ->  Sort  (cost=0.37..0.39 rows=10 width=12) (actual time=24.432..24.443 rows=100 loops=1)  
              Output: a.item, a.score  
              Sort Key: a.score DESC  
              Sort Method: top-N heapsort  Memory: 32kB  
              Buffers: shared hit=39297  
              ->  CTE Scan on a  (cost=0.00..0.20 rows=10 width=12) (actual time=10.195..22.790 rows=10000 loops=1)  
                    Output: a.item, a.score  
                    Buffers: shared hit=39297  
LOG:  duration: 0.084 ms  plan:  
Query Text: insert into tbl_score   
  values (v_wid, v_uid, v_top1)   
  on conflict (wid,uid)   
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)  
  where   
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)  
Insert on public.tbl_score  (cost=0.00..0.01 rows=1 width=44) (actual time=0.083..0.083 rows=0 loops=1)  
  Conflict Resolution: UPDATE  
  Conflict Arbiter Indexes: tbl_score_pkey  
  Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, $3))  
  Tuples Inserted: 1  
  Conflicting Tuples: 0  
  Buffers: shared hit=4  
  ->  Result  (cost=0.00..0.01 rows=1 width=44) (actual time=0.001..0.001 rows=1 loops=1)  
        Output: $5, $6, $7  
LOG:  duration: 26.335 ms  plan:  
Query Text: select consume_log(1, 10000, 100);  
Result  (cost=0.00..0.26 rows=1 width=4) (actual time=26.329..26.329 rows=1 loops=1)  
  Output: consume_log(1, 10000, 100)  
  Buffers: shared hit=39388  
 consume_log   
-------------  
   
(1 row)  
  
Time: 26.937 ms  

设计3

与设计1类似,只是在前面再加一个离散写入表,定期对离散表排序后写入tbl_score_log表,再从tbl_score_log消费(与设计1保持一致),解决IO放大问题。

使用AB表切换:

create unlogged table tbl_score_log_a (  
  wid int not null,   -- 维度ID  
  uid int8 not null,  -- ToB 店铺ID  
  item int8 not null, -- 商品ID  
  score float4 not null,  -- 打分  
  crt_time timestamp not null   
);   
  
create unlogged table tbl_score_log_b (  
  wid int not null,   -- 维度ID  
  uid int8 not null,  -- ToB 店铺ID  
  item int8 not null, -- 商品ID  
  score float4 not null,  -- 打分  
  crt_time timestamp not null   
);   

例如堆积了2000万记录后,排序写入tbl_score_log

begin;  
lock table tbl_score_log_a in ACCESS EXCLUSIVE mode;   
insert into tbl_score_log select * from tbl_score_log_a order by wid,uid,crt_time;  
truncate tbl_score_log_a;  
end;  

设计4

与设计1类似,只是每次计算的是多个维度而不是一个维度。

单次计算多个维度的TOP-K,参考这种方法:

《PostgreSQL 递归妙用案例 - 分组数据去重与打散》

设计1采用每个维度计算一次的方法,如果使用设计1,那么会导致IO放大,而如果使用单次计算多个维度的方法,IO放大的问题就没了。(但是建议这种方法单次计算更大量的数据(比如一次计算1000万条),否则可能造成tbl_score更新频次过多的问题(单个维度多次消耗,多次更新))

与设计1不同的设计之处如下:

create unlogged table tbl_score_log (    
  wid int not null,   -- 维度ID    
  uid int8 not null,  -- ToB 店铺ID    
  item int8 not null, -- 商品ID    
  score float4 not null,  -- 打分    
  crt_time timestamp not null     
);     
    
create index idx_tbl_score_log_1 on tbl_score_log (crt_time);    
create or replace function consume_log(    
  i_limit int,   -- 单次处理多少行    
  i_topn int     -- 每个wid,uid 维度,保留TOP N个item (score高的前N个)    
) returns void as $$    
declare    
begin    
      
  with    
  a as (    
    delete from tbl_score_log where ctid= any (array(    
      select ctid from tbl_score_log order by crt_time limit i_limit     -- limit batch    
    )) returning wid,uid,item,score    
  )    
  insert into tbl_score     
  select wid,uid,topn  
  from  
  (  
  select     
    wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn     
    from    
    (  
    select wid,uid,item,score from  
      (select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from a) t   
      where rn <= i_topn  -- limit topn    
    ) t  
    group by wid,uid   
  ) t  
  on conflict (wid,uid)    
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, i_topn)     -- limit topn 
  where    
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, i_topn)  -- limit topn 
  ;  
end;    
$$ language plpgsql strict;    
select consume_log(10000000,100);  

或者可以直接使用如下SQL来进行消费例如

  with    
  a as (    
    delete from tbl_score_log where ctid= any (array(    
      select ctid from tbl_score_log order by crt_time limit 10000000     -- limit batch    
    )) returning wid,uid,item,score    
  )    
  insert into tbl_score     
  select wid,uid,topn  
  from  
  (  
  select     
    wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn     
    from    
    (  
    select wid,uid,item,score from  
      (select wid,uid,item,score,row_number() over (partition by wid,uid order by score desc) as rn from a) t   
      where rn <= 100  -- limit topn    
    ) t  
    group by wid,uid   
  ) t  
  on conflict (wid,uid)    
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, 100)     -- limit topn 
  where    
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, 100)  -- limit topn 
  ;  
                                                                                     QUERY PLAN                                                                                      
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Insert on public.tbl_score  (cost=36744.69..36745.17 rows=3 width=44) (actual time=69966.565..69966.565 rows=0 loops=1)
   Conflict Resolution: UPDATE
   Conflict Arbiter Indexes: tbl_score_pkey
   Conflict Filter: (tbl_score.top10 IS DISTINCT FROM merge_top10(tbl_score.top10, excluded.top10, 100))
   Tuples Inserted: 317084
   Conflicting Tuples: 634683
   Buffers: shared hit=13811948 read=7001 dirtied=7001
   CTE a
     ->  Delete on public.tbl_score_log tbl_score_log_1  (cost=36733.22..36744.32 rows=10 width=6) (actual time=968.724..1891.686 rows=1000000 loops=1)
           Output: tbl_score_log_1.wid, tbl_score_log_1.uid, tbl_score_log_1.item, tbl_score_log_1.score
           Buffers: shared hit=4007463
           InitPlan 1 (returns $0)
             ->  Limit  (cost=0.43..36733.21 rows=1000000 width=14) (actual time=0.011..660.528 rows=1000000 loops=1)
                   Output: tbl_score_log.ctid, tbl_score_log.crt_time
                   Buffers: shared hit=999099
                   ->  Index Scan using idx_tbl_score_log_1 on public.tbl_score_log  (cost=0.43..427926.61 rows=11649711 width=14) (actual time=0.010..494.951 rows=1000000 loops=1)
                         Output: tbl_score_log.ctid, tbl_score_log.crt_time
                         Buffers: shared hit=999099
           ->  Tid Scan on public.tbl_score_log tbl_score_log_1  (cost=0.01..11.11 rows=10 width=6) (actual time=968.673..1265.722 rows=1000000 loops=1)
                 Output: tbl_score_log_1.ctid
                 TID Cond: (tbl_score_log_1.ctid = ANY ($0))
                 Buffers: shared hit=1999099
   ->  GroupAggregate  (cost=0.37..0.82 rows=3 width=44) (actual time=2907.640..8707.867 rows=951767 loops=1)
         Output: t.wid, t.uid, array_agg((((t.item)::text || '_'::text) || (t.score)::text) ORDER BY t.score DESC)
         Group Key: t.wid, t.uid
         Buffers: shared hit=4007463
         ->  Subquery Scan on t  (cost=0.37..0.72 rows=3 width=24) (actual time=2907.590..4711.497 rows=1000000 loops=1)
               Output: t.wid, t.uid, t.item, t.score, t.rn
               Filter: (t.rn <= 100)
               Buffers: shared hit=4007463
               ->  WindowAgg  (cost=0.37..0.59 rows=10 width=32) (actual time=2907.588..4395.127 rows=1000000 loops=1)
                     Output: a.wid, a.uid, a.item, a.score, row_number() OVER (?)
                     Buffers: shared hit=4007463
                     ->  Sort  (cost=0.37..0.39 rows=10 width=24) (actual time=2907.575..3283.649 rows=1000000 loops=1)
                           Output: a.wid, a.uid, a.score, a.item
                           Sort Key: a.wid, a.uid, a.score DESC
                           Sort Method: quicksort  Memory: 102702kB
                           Buffers: shared hit=4007463
                           ->  CTE Scan on a  (cost=0.00..0.20 rows=10 width=24) (actual time=968.728..2201.439 rows=1000000 loops=1)
                                 Output: a.wid, a.uid, a.score, a.item
                                 Buffers: shared hit=4007463
 Planning time: 0.623 ms
 Execution time: 69990.738 ms
(43 rows)

设计5

与设计4类似,只是我们不使用delete tbl_score_log的方式来消耗,而是将tbl_score_log使用分区表或类似AB表的方式,一次消耗一整张表。那么就不需要delete了,而是算完直接truncate.

begin;
  insert into tbl_score     
  select wid,uid,topn  
  from  
  (  
  select     
    wid,uid,array_agg((item||'_'||score)::text order by score desc) as topn     
    from    
    (  
    select wid,uid,item,score from  
      (select wid,uid,item,score,row_number() over (partition by wid,uid order by wid,uid,score desc) as rn from tbl_score_log_a) t   -- AB表切换的方式
      where rn <= 100                      -- limit topn    
    ) t  
    group by wid,uid   
  ) t  
  on conflict (wid,uid)    
  do update set top10 = merge_top10(tbl_score.top10, excluded.top10, 100)       -- limit topn 
  where    
  tbl_score.top10 is distinct from merge_top10(tbl_score.top10, excluded.top10, 100)     -- limit topn 
  ;  

  truncate tbl_score_log_a;
end;  

小结

1、使用预排的方法,使得查询响应得到保障,单个RDS PG实例可以做到45万的tps。

2、初始数据生成,可以从OSS导入(在HDB PG或ODPS中计算好,生成初始数据,写入OSS)。使用并行导入,可以加快导入速度,参考如下:

《阿里云RDS PostgreSQL OSS 外部表 - (dblink异步调用封装)并行写提速案例》

3、增量数据,通过记日志的形式写入RDS PG,在RDS PG中调度消费日志,合并到最终的tbl_score表。

增量(新增、删除、更新):

删除,设置SCORE=0

更新,UDF已包含(覆盖)。

其他思考

1、考虑引入概率计算?

《PostgreSQL count-min sketch top-n 概率计算插件 cms_topn (结合窗口实现同比、环比、滑窗分析等) - 流计算核心功能之一》

2、单次计算多个维度的TOP-K,参考这种方法:

《PostgreSQL 递归妙用案例 - 分组数据去重与打散》

目前采用每个维度计算一次的方法,如果使用设计1,那么会导致IO放大,而如果使用单次计算多个维度的方法,IO放大的问题就没了。(但是建议这种方法单次计算更大量的数据(比如一次计算1000万条),否则可能造成tbl_score更新频次过多的问题(单个维度多次消耗,多次更新))

参考

《PostgreSQL 单库对象过多,触发Linux系统限制 (ext4_dx_add_entry: Directory index full!) (could not create file “xx/xx/xxxxxx”: No space left on device)》

《PostgreSQL 时序最佳实践 - 证券交易系统数据库设计 - 阿里云RDS PostgreSQL最佳实践》

《阿里云RDS PostgreSQL OSS 外部表 - (dblink异步调用封装)并行写提速案例》

《PostgreSQL count-min sketch top-n 概率计算插件 cms_topn (结合窗口实现同比、环比、滑窗分析等) - 流计算核心功能之一》

Flag Counter

digoal’s 大量PostgreSQL文章入口