PostgreSQL 流式处理应用实践 - 二手商品实时归类(异步消息notify/listen、阅后即焚)
背景
因为二手商品没有太多的活动、硬性分类,广告等活动,所以购买或者销售速度没有新商品那么快。为了提高二手商品的销售效率,需要提供一套归类策略。
当商品新增或商品内容发生变化时,需要根据商品属性,以及定义的规则,实时进行商品归类(鱼塘,圈子等)方便用户查询。
结构设计
1、商品ID,属性
create table a (
id int8 primary key, -- 商品ID
att jsonb -- 商品属性
);
属性设计为JSON,JSON里面是K-V的属性对,V里面是数组,包含K的值以及这对属性的最后更新时间。
更新时间用于merge insert,当属性发生变化时才更新,没有发生变化时,不更新。
所以json需要遍历,并做合并处理。
合并JSON属性的UDF
create or replace function merge_json(jsonb, jsonb) returns jsonb as $$
select jsonb_object_agg(key,value) from (
select
coalesce(a.key, b.key) as key,
case
when
coalesce(jsonb_array_element(a.value,1)::text::timestamp, '1970-01-01'::timestamp)
>
coalesce(jsonb_array_element(b.value,1)::text::timestamp, '1970-01-01'::timestamp)
then a.value
else b.value
end
from jsonb_each($1) a full outer join jsonb_each($2) b using (key)
) t;
$$ language sql strict ;
postgres=# select merge_json('{"price":[10000, "2018-01-01 10:10:11"], "newatt":[120, "2017-01-01 12:22:00"]}', '{"price":[8880, "2018-01-04 10:10:12"], "count":[100, "2017-01-01 10:10:00"]}');
merge_json
-------------------------------------------------------------------------------------------------------------------------
{"count": [100, "2017-01-01 10:10:00"], "price": [8880, "2018-01-04 10:10:12"], "newatt": [120, "2017-01-01 12:22:00"]}
(1 row)
触发器设计
触发器里面定义分类规则,例如这里对价格大于100的商品,吐出消息.
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then -- 规则1, 价格大于100,推送异步消息
perform pg_notify(
'a', -- 异步消息通道名字
format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att) -- 消息内容
);
-- elsif ... then 其他规则
-- else 其他规则
end if;
return null;
end;
$function$ language plpgsql strict;
创建after insert or update触发器
create trigger tg1 after insert or update on a for each row execute procedure notify1();
其他触发器(规则设计方法)
本文未使用
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att) -- 解析一次JSONB
loop
-- 规则处理
-- if key='price' then ...; end if;
-- if key='count' then ...; end if;
end loop;
return null;
end;
$function$ language plpgsql strict;
-- 动态规则表
create table tbl_rule (
key text, -- key值
exp text, -- value 代入的表达式
class text, -- 满足exp时,指向这个归类
)
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att) -- 解析一次JSONB
loop
-- 使用tbl_rule生成规则处理逻辑,动态
end loop;
return null;
end;
$function$ language plpgsql strict;
规则描述
json属性对中,value的类型可能很多,对应不同的规则语义。
1、文本 LIKE
2、数组 IN
3、等值
4、数值范围
5、时间范围
等等,在trigger的UDF中写规则即可。
数据合并写入测试
insert into a values
(1, '{"price":[10000, "2018-01-01 10:10:11"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- 合并新属性,保留老属性,需要使用一个UDF来合并
where
a.att <> merge_json(a.att, excluded.att); -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销
postgres=# insert into a values
(1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- 合并新属性,保留老属性,需要使用一个UDF来合并
where
a.att <> merge_json(a.att, excluded.att); -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销
INSERT 0 1
postgres=# select * from a;
id | att
----+-----------------------------------------------------------------------------
1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}
(1 row)
监听消息
postgres=# listen a;
LISTEN
Asynchronous notification "a" with payload "ID:1, ATT:{"price": [10000, "2018-01-01 10:10:19"]}" received from server process with PID 51380.
https://jdbc.postgresql.org/documentation/head/listennotify.html
其他
删除商品,可以使用DELETE触发器,告诉下游,比如商品已成交,删除。
CREATE OR REPLACE FUNCTION notify2() returns trigger
AS $function$
declare
begin
perform pg_notify(
'a', -- 异步消息通道名字
format('CLASS:delete, ID:%s, ATT:%s', OLD.id, OLD.att) -- 消息内容
);
return null;
end;
$function$ language plpgsql strict;
create trigger tg2 after delete on a for each row execute procedure notify2();
方案二 - 流式批量消费
使用异步消息的方式,当连接中断时,重新连接后需要重新监听,并且在中断连接期间的消息会被丢弃掉。所以可靠性不佳。
另外,异步消息无法控制一次消费多少条,也不是特别友好。
所以我们实际上还有其他方法,持久化表,并且使用异步批量消费的方式进行消费。
性能指标:
CASE | 数据量 | 并发 | TPS | 平均响应时间 |
---|---|---|---|---|
流式处理 - 阅后即焚 - 消费 | 10亿,消费 395.2 万行/s | 56 | 3952 | 14毫秒 |
结构沿用前面的例子,
1、新增一张结果表(也可以新增多张表,看业务量,通常一张够用了),
2、同时修改一下触发器内容,把notify改成写表,
3、修改客户端把监听通道改成异步消费SQL
DEMO
1、新增结果表
create table t_result(id serial8 primary key, class text, content text);
2、触发器里面定义分类规则,例如这里对价格大于100的商品,吐出信息到结果表.
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then -- 规则1, 价格大于100,写入结果表
insert into t_result(class,content) values (
'a', -- 归类
format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att) -- 消息内容
);
-- elsif ... then 其他规则
-- else 其他规则
end if;
return null;
end;
$function$ language plpgsql strict;
3、创建after insert or update触发器
create trigger tg1 after insert or update on a for each row execute procedure notify1();
4、数据合并写入测试
insert into a values
(1, '{"price":[10000, "2018-01-01 10:10:11"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- 合并新属性,保留老属性,需要使用一个UDF来合并
where
a.att <> merge_json(a.att, excluded.att); -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销
postgres=# insert into a values
(1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- 合并新属性,保留老属性,需要使用一个UDF来合并
where
a.att <> merge_json(a.att, excluded.att); -- 如果相等的概率很低,则可以去掉这个判断, 降低CPU开销
INSERT 0 1
postgres=# select * from a;
id | att
----+-----------------------------------------------------------------------------
1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}
(1 row)
5、异步批量消费结果表的内容(阅后即焚)
with a as (delete from t_result where ctid= any(array(
select ctid from t_result order by id limit 10 for update skip locked -- 可以并发消费,不会相互堵塞,消费顺与写入顺序一致
)) returning *)
select * from a;
id | class | content
----+-------+---------------------------------------------------------------------------------------------------------
1 | a | CLASS:high price, ID:1, ATT:{"price": [10000, "2018-01-01 10:10:11"]}
2 | a | CLASS:high price, ID:1, ATT:{"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}
(2 rows)
原子操作,阅后即焚,再次查询已消费完毕
postgres=# select * from t_result;
id | class | content
----+-------+---------
(0 rows)
方案二续 - 使用statement级触发器代替row级触发器
为什么建议使用statement级触发器代替row级触发器,参考:
《PostgreSQL 批量、单步 写入 - row, statement 触发器(中间表)、CTE 几种用法性能对比》
触发器函数修改如下
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
-- 规则1
insert into t_result(class,content) select
'a', -- 归类
format('CLASS:high price, ID:%s, ATT:%s', id, att) -- 消息内容
from new_table
where jsonb_array_element(att->'price', 0)::text::float8 > 100; -- 规则1, 价格大于100,写入结果表
-- 其他规则
-- insert into t_result(class,content) select
-- ......
-- from new_table
-- where ... -- 规则n
return null;
end;
$function$ language plpgsql strict;
触发器修改如下
create trigger tg1 after insert on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();
create trigger tg2 after update on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();
postgres=# \d a
Table "public.a"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
id | integer | | not null |
att | jsonb | | |
Indexes:
"pk" PRIMARY KEY, btree (id)
Triggers:
tg1 AFTER INSERT ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()
tg2 AFTER UPDATE ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()
小结
使用异步消息,UDF,规则或触发器,非常轻量化的解决了实时计算的问题。
但是,异步消息是可能丢消息的,例如监听连接中断后,重连时,需要重新发起监听,并且中断连接时未消费的消息,不会再被消费,所以相当于丢消息了。
改进方法:
1、如果要保证不丢消息,可以将notify改成INSERT,把结果写入预先定义好的某个结果表,使用逻辑DECODE的方式,解析这个结果表相关的logical decode信息,从而获取变化量,参考如下。
《PostgreSQL pg_recvlogical 与 test_decoding 自定义,支持source table filter, 对接kafka,es等》
2、使用阅后即焚的方法,类似本方案2.
《阿里云RDS PostgreSQL varbitx实践 - 流式标签 (阅后即焚流式批量计算) - 万亿级,任意标签圈人,毫秒响应》
《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》
《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》
《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》
[《在PostgreSQL中实现update | delete limit - CTID扫描实践 (高效阅后即焚)》](../201608/20160827_01.md) |
参考
https://www.postgresql.org/docs/11/static/functions-json.html
https://www.postgresql.org/docs/11/static/datatype-json.html
https://jdbc.postgresql.org/documentation/head/listennotify.html
https://www.postgresql.org/docs/11/static/sql-notify.html
https://www.postgresql.org/docs/11/static/sql-listen.html
https://www.postgresql.org/docs/11/static/sql-unlisten.html
https://www.postgresql.org/docs/11/static/libpq-notify.html
https://www.postgresql.org/docs/11/static/sql-notify.html#id-1.9.3.157.7.5
https://www.postgresql.org/docs/11/static/functions-info.html
https://www.postgresql.org/docs/11/static/plpgsql-trigger.html
https://github.com/impossibl/pgjdbc-ng
https://www.openmakesoftware.com/postgresql-listen-notify-events-example/