PostgreSQL 流式处理应用实践 - 二手商品实时归类(异步消息notify/listen、阅后即焚)

4 minute read

背景

因为二手商品没有太多的活动、硬性分类,广告等活动,所以购买或者销售速度没有新商品那么快。为了提高二手商品的销售效率,需要提供一套归类策略。

当商品新增或商品内容发生变化时,需要根据商品属性,以及定义的规则,实时进行商品归类(鱼塘,圈子等)方便用户查询。

结构设计

1、商品ID,属性

create table a (      
  id int8 primary key,   -- 商品ID      
  att jsonb   -- 商品属性      
);      

属性设计为JSON,JSON里面是K-V的属性对,V里面是数组,包含K的值以及这对属性的最后更新时间。

更新时间用于merge insert,当属性发生变化时才更新,没有发生变化时,不更新。

所以json需要遍历,并做合并处理。

合并JSON属性的UDF

create or replace function merge_json(jsonb, jsonb) returns jsonb as $$    
  select jsonb_object_agg(key,value) from (    
  select     
    coalesce(a.key, b.key) as key,     
    case     
    when     
    coalesce(jsonb_array_element(a.value,1)::text::timestamp, '1970-01-01'::timestamp)     
    >     
    coalesce(jsonb_array_element(b.value,1)::text::timestamp, '1970-01-01'::timestamp)     
    then a.value    
    else b.value    
    end    
  from jsonb_each($1) a full outer join jsonb_each($2) b using (key)    
  ) t;      
$$ language sql strict ;    
    
    
postgres=# select merge_json('{"price":[10000, "2018-01-01 10:10:11"], "newatt":[120, "2017-01-01 12:22:00"]}',  '{"price":[8880, "2018-01-04 10:10:12"], "count":[100, "2017-01-01 10:10:00"]}');    
                                                       merge_json                                                            
-------------------------------------------------------------------------------------------------------------------------    
 {"count": [100, "2017-01-01 10:10:00"], "price": [8880, "2018-01-04 10:10:12"], "newatt": [120, "2017-01-01 12:22:00"]}    
(1 row)    

触发器设计

触发器里面定义分类规则,例如这里对价格大于100的商品,吐出消息.

CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then   -- 规则1, 价格大于100,推送异步消息      
     perform pg_notify(      
       'a',    -- 异步消息通道名字      
       format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att)   -- 消息内容      
     );      
  -- elsif ... then  其他规则      
  -- else  其他规则      
  end if;      
return null;      
end;      
$function$ language plpgsql strict;      

创建after insert or update触发器

create trigger tg1 after insert or update on a for each row execute procedure notify1();      

其他触发器(规则设计方法)

本文未使用

CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att)  -- 解析一次JSONB    
  loop    
    -- 规则处理    
    -- if key='price' then ...; end if;    
    -- if key='count' then ...; end if;    
  end loop;    
return null;    
end;    
$function$ language plpgsql strict;      
-- 动态规则表    
    
create table tbl_rule (    
  key text,  -- key值    
  exp text,  -- value 代入的表达式    
  class text,  -- 满足exp时,指向这个归类    
)    
    
CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att)  -- 解析一次JSONB    
  loop    
    -- 使用tbl_rule生成规则处理逻辑,动态    
  end loop;    
return null;    
end;    
$function$ language plpgsql strict;      

规则描述

json属性对中,value的类型可能很多,对应不同的规则语义。

1、文本 LIKE

2、数组 IN

3、等值

4、数值范围

5、时间范围

等等,在trigger的UDF中写规则即可。

数据合并写入测试

insert into a values       
  (1, '{"price":[10000, "2018-01-01 10:10:11"]}')       
  on conflict (id)       
  do update set       
  att = merge_json(a.att, excluded.att)  -- 合并新属性,保留老属性,需要使用一个UDF来合并      
  where       
  a.att <> merge_json(a.att, excluded.att);  -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销   
    
    
postgres=# insert into a values    
  (1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')    
  on conflict (id)    
  do update set    
  att = merge_json(a.att, excluded.att)  -- 合并新属性,保留老属性,需要使用一个UDF来合并    
  where    
  a.att <> merge_json(a.att, excluded.att);   -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销  
INSERT 0 1    
    
    
postgres=# select * from a;    
 id |                                     att                                         
----+-----------------------------------------------------------------------------    
  1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(1 row)    

监听消息

postgres=# listen a;      
LISTEN      
Asynchronous notification "a" with payload "ID:1, ATT:{"price": [10000, "2018-01-01 10:10:19"]}" received from server process with PID 51380.      

https://jdbc.postgresql.org/documentation/head/listennotify.html

其他

删除商品,可以使用DELETE触发器,告诉下游,比如商品已成交,删除。

CREATE OR REPLACE FUNCTION notify2() returns trigger      
AS $function$      
declare      
begin      
     perform pg_notify(      
       'a',                                                     -- 异步消息通道名字      
       format('CLASS:delete, ID:%s, ATT:%s', OLD.id, OLD.att)   -- 消息内容      
     );      
return null;      
end;      
$function$ language plpgsql strict;      
    
create trigger tg2 after delete on a for each row execute procedure notify2();      

方案二 - 流式批量消费

使用异步消息的方式,当连接中断时,重新连接后需要重新监听,并且在中断连接期间的消息会被丢弃掉。所以可靠性不佳。

另外,异步消息无法控制一次消费多少条,也不是特别友好。

所以我们实际上还有其他方法,持久化表,并且使用异步批量消费的方式进行消费。

性能指标:

CASE 数据量 并发 TPS 平均响应时间
流式处理 - 阅后即焚 - 消费 10亿,消费 395.2 万行/s 56 3952 14毫秒

结构沿用前面的例子,

1、新增一张结果表(也可以新增多张表,看业务量,通常一张够用了),

2、同时修改一下触发器内容,把notify改成写表,

3、修改客户端把监听通道改成异步消费SQL

DEMO

1、新增结果表

create table t_result(id serial8 primary key, class text, content text);    

2、触发器里面定义分类规则,例如这里对价格大于100的商品,吐出信息到结果表.

CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then   -- 规则1, 价格大于100,写入结果表      
     insert into t_result(class,content) values (    
       'a',    -- 归类    
       format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att)   -- 消息内容      
     );    
  -- elsif ... then  其他规则      
  -- else  其他规则      
  end if;      
return null;      
end;      
$function$ language plpgsql strict;      

3、创建after insert or update触发器

create trigger tg1 after insert or update on a for each row execute procedure notify1();      

4、数据合并写入测试

insert into a values       
  (1, '{"price":[10000, "2018-01-01 10:10:11"]}')       
  on conflict (id)       
  do update set       
  att = merge_json(a.att, excluded.att)  -- 合并新属性,保留老属性,需要使用一个UDF来合并      
  where       
  a.att <> merge_json(a.att, excluded.att);   -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销  
    
    
postgres=# insert into a values    
  (1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')    
  on conflict (id)    
  do update set    
  att = merge_json(a.att, excluded.att)  -- 合并新属性,保留老属性,需要使用一个UDF来合并    
  where    
  a.att <> merge_json(a.att, excluded.att); -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销   
    
INSERT 0 1    
    
postgres=# select * from a;    
 id |                                     att                                         
----+-----------------------------------------------------------------------------    
  1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(1 row)    

5、异步批量消费结果表的内容(阅后即焚)

with a as (delete from t_result where ctid= any(array(     
  select ctid from t_result order by id limit 10 for update skip locked  -- 可以并发消费,不会相互堵塞,消费顺与写入顺序一致    
)) returning *)    
select * from a;    
 id | class |                                                 content                                                     
----+-------+---------------------------------------------------------------------------------------------------------    
  1 | a     | CLASS:high price, ID:1, ATT:{"price": [10000, "2018-01-01 10:10:11"]}    
  2 | a     | CLASS:high price, ID:1, ATT:{"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(2 rows)    
    
    
原子操作,阅后即焚,再次查询已消费完毕    
    
postgres=# select * from t_result;    
 id | class | content     
----+-------+---------    
(0 rows)    

方案二续 - 使用statement级触发器代替row级触发器

为什么建议使用statement级触发器代替row级触发器,参考:

《PostgreSQL 批量、单步 写入 - row, statement 触发器(中间表)、CTE 几种用法性能对比》

触发器函数修改如下

CREATE OR REPLACE FUNCTION notify1() returns trigger        
AS $function$        
declare        
begin        
  -- 规则1  
  insert into t_result(class,content) select   
    'a',    -- 归类    
    format('CLASS:high price, ID:%s, ATT:%s', id, att)   -- 消息内容     
  from new_table   
  where jsonb_array_element(att->'price', 0)::text::float8 > 100;    -- 规则1, 价格大于100,写入结果表    
    
  -- 其他规则  
  -- insert into t_result(class,content) select   
  -- ......  
  --   from new_table   
  -- where ...  -- 规则n  
    
  return null;        
end;        
$function$ language plpgsql strict;      

触发器修改如下

create trigger tg1 after insert on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();      
create trigger tg2 after update on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();      
postgres=# \d a  
                 Table "public.a"  
 Column |  Type   | Collation | Nullable | Default   
--------+---------+-----------+----------+---------  
 id     | integer |           | not null |   
 att    | jsonb   |           |          |   
Indexes:  
    "pk" PRIMARY KEY, btree (id)  
Triggers:  
    tg1 AFTER INSERT ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()  
    tg2 AFTER UPDATE ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()  

小结

使用异步消息,UDF,规则或触发器,非常轻量化的解决了实时计算的问题。

但是,异步消息是可能丢消息的,例如监听连接中断后,重连时,需要重新发起监听,并且中断连接时未消费的消息,不会再被消费,所以相当于丢消息了。

改进方法:

1、如果要保证不丢消息,可以将notify改成INSERT,把结果写入预先定义好的某个结果表,使用逻辑DECODE的方式,解析这个结果表相关的logical decode信息,从而获取变化量,参考如下。

《PostgreSQL pg_recvlogical 与 test_decoding 自定义,支持source table filter, 对接kafka,es等》

2、使用阅后即焚的方法,类似本方案2.

《阿里云RDS PostgreSQL varbitx实践 - 流式标签 (阅后即焚流式批量计算) - 万亿级,任意标签圈人,毫秒响应》

《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》

《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

[《在PostgreSQL中实现update delete limit - CTID扫描实践 (高效阅后即焚)》](../201608/20160827_01.md)

参考

https://www.postgresql.org/docs/11/static/functions-json.html

https://www.postgresql.org/docs/11/static/datatype-json.html

https://jdbc.postgresql.org/documentation/head/listennotify.html

https://www.postgresql.org/docs/11/static/sql-notify.html

https://www.postgresql.org/docs/11/static/sql-listen.html

https://www.postgresql.org/docs/11/static/sql-unlisten.html

https://www.postgresql.org/docs/11/static/libpq-notify.html

https://www.postgresql.org/docs/11/static/sql-notify.html#id-1.9.3.157.7.5

https://www.postgresql.org/docs/11/static/functions-info.html

https://www.postgresql.org/docs/11/static/plpgsql-trigger.html

https://github.com/impossibl/pgjdbc-ng

https://www.openmakesoftware.com/postgresql-listen-notify-events-example/

Flag Counter

digoal’s 大量PostgreSQL文章入口