多流实时聚合 - 记录级实时快照 - JSON聚合与json全文检索的功能应用

4 minute read

背景

这个需求是这样的,数据在写入时,以上一条记录作为基础,将当前写入的记录与上一条记录合并,然后作为新的记录写进去。

从而每一条记录都携带了之前所有记录的内容。

当然这里指的是每个维度各自的快照,并不是一张表所有记录的快照。

例如,一笔电商订单,可能经过若干个系统(每个系统产生的属性可能都不一样,多个系统合起来就是个大宽表,应用为了设计简单,往往可能选择JSON存储,而不是大宽表),产生若干笔记录,每次写入时期望将之前与之相关的记录内容都合并起来,产生新的值写入。

但是不要忘记,同一笔订单的数据,可能存在并行写入(除非业务上能将订单编号按哈希让一个线程来处理它,而且不能多机)。当存在并发写同一笔订单时,写时合并就违反自然规律。

例子:

tbl已有记录 (0, 1, 'test0', now())  
  
session A:  
  
insert into tbl (pk, caseid, info, crt_time) values (1, 1, 'test1', now());  
  
session B:  
  
insert into tbl (pk, caseid, info, crt_time) values (2, 1, 'test2', now());  
  
如果SESSION A,B同时发起,那么写入的记录可能变成:  
  
(1, 1, 'test0_test1', now());  
  
(2, 1, 'test0_test2', now());  
  
然而实际上要的可能是这两条  
  
(1, 1, 'test0_test1', now());  
  
(2, 1, 'test0_test1_test2', now());  

类似区块链。

所以,我们使用另一种方法来获取快照,写入时,不改变原始的写入方法,即各个业务线产生的订单记录,分别写入到一个单表,使用JSON来表示各个业务线对这个订单的描述。

JSON写入性能

create table tbl_ord (  
  ordid int8,   -- 订单号  
  appid  int,   -- 应用ID  
  info jsonb,   -- 内容  
  crt_time timestamp  -- 写入时间  
);  
  
create index idx_tbl_ord on tbl_ord(ordid, crt_time);  

单条写入压测

vi test.sql  
  
\set ordid random(1,10000000)  
\set appid random(1,10)  
insert into tbl_ord (ordid,appid,info,crt_time) values (:ordid,:appid,jsonb '{"a" : 1, "b" : 2}',now());  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 40 -j 40 -t 2500000  

单条写入压测,23.4万行/s。

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 40  
number of threads: 40  
number of transactions per client: 2500000  
number of transactions actually processed: 100000000/100000000  
latency average = 0.170 ms  
latency stddev = 0.498 ms  
tps = 234047.009786 (including connections establishing)  
tps = 234060.902533 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set ordid random(1,10000000)  
         0.001  \set appid random(1,10)  
         0.168  insert into tbl_ord (ordid,appid,info,crt_time) values (:ordid,:appid,jsonb '{"a" : 1, "b" : 2}',now());  

如果使用批量写入,可以达到100万+行/s。

JSON全字段索引

PostgreSQL 支持JSON类型的全字段索引,支持两种operator class,支持的检索如下。

GIN indexes can be used to efficiently search for keys or key/value pairs occurring within   
a large number of jsonb documents (datums).   
  
Two GIN “operator classes” are provided, offering different performance and flexibility trade-offs.  
  
The default GIN operator class for jsonb supports queries with top-level key-exists operators   
?, ?& and ?| operators and path/value-exists operator @>.   
  
(For details of the semantics that these operators implement,   
  
see Table 9.44.) An example of creating an index with this operator class is:  
  
CREATE INDEX idxgin ON api USING GIN (jdoc);  
  
The non-default GIN operator class jsonb_path_ops supports indexing the @> operator only.   
  
An example of creating an index with this operator class is:  
  
CREATE INDEX idxginp ON api USING GIN (jdoc jsonb_path_ops);  
create index idx_tbl_ord_2 on tbl_ord using gin (info);  

使用举例

-- Find documents in which the key "company" has value "Magnafone"  
SELECT jdoc->'guid', jdoc->'name' FROM api WHERE jdoc @> '{"company": "Magnafone"}';  
  
-- Find documents in which the key "tags" contains key or array element "qui"  
SELECT jdoc->'guid', jdoc->'name' FROM api WHERE jdoc -> 'tags' ? 'qui';  
  
-- Find documents in which the key "tags" contains array element "qui"  
SELECT jdoc->'guid', jdoc->'name' FROM api WHERE jdoc @> '{"tags": ["qui"]}';  

点查,json聚合,得任意时间快照

取某个时间点,某个caseid的快照,使用JSONB聚合,性能贼好。

将所有记录聚合成一条

select caseid, jsonb_agg((pk,info,crt_time) order by crt_time) from tbl where caseid=? and crt_time<=? group by caseid;  

jsonb_agg用法举例

postgres=# create type typ1 as (c1 int, c2 int);  
CREATE TYPE  
  
postgres=# select jsonb_agg((c1,c2)::typ1 order by c1 desc) from (values (1,2),(2,3)) t(c1,c2);  
                jsonb_agg                   
------------------------------------------  
 [{"c1": 2, "c2": 3}, {"c1": 1, "c2": 2}]  
(1 row)  

按订单的聚合查询性能:

0.7毫秒

create type typ2 as (appid int, info jsonb, crt_time timestamp);  
  
postgres=# select ordid, jsonb_agg((appid,info,crt_time)::typ2 order by crt_time) from tbl_ord where ordid=1 and crt_time<=now() group by ordid;  
  
-[ RECORD 1 ]----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
ordid     | 1  
jsonb_agg | [{"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:24:56.659672"}, {"info": {"a": 1, "b": 2}, "appid": 5, "crt_time": "2017-12-09T23:25:13.073163"}, {"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:25:49.94649"}, {"info": {"a": 1, "b": 2}, "appid": 10, "crt_time": "2017-12-09T23:26:23.523946"}, {"info": {"a": 1, "b": 2}, "appid": 2, "crt_time": "2017-12-09T23:26:49.900199"}, {"info": {"a": 1, "b": 2}, "appid": 7, "crt_time": "2017-12-09T23:27:10.643058"}, {"info": {"a": 1, "b": 2}, "appid": 8, "crt_time": "2017-12-09T23:27:20.937021"}, {"info": {"a": 1, "b": 2}, "appid": 8, "crt_time": "2017-12-09T23:27:21.446752"}, {"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:29:19.10536"}, {"info": {"a": 1, "b": 2}, "appid": 7, "crt_time": "2017-12-09T23:29:56.192353"}, {"info": {"a": 1, "b": 2}, "appid": 1, "crt_time": "2017-12-09T23:30:07.879201"}, {"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:30:31.487457"}]  
  
Time: 0.696 ms  

压测

vi test.sql  
  
\set ordid random(1,10000000)  
select ordid, jsonb_agg((appid,info,crt_time)::typ2 order by crt_time) from tbl_ord where ordid=:ordid and crt_time<=now() group by ordid;  

结果

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 28 -j 28 -T 120  
  
transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 28  
number of threads: 28  
duration: 120 s  
number of transactions actually processed: 4677282  
latency average = 0.718 ms  
latency stddev = 0.463 ms  
tps = 38977.016281 (including connections establishing)  
tps = 38982.209839 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set ordid random(1,10000000)  
         0.717  select ordid, jsonb_agg((appid,info,crt_time)::typ2 order by crt_time) from tbl_ord where ordid=:ordid and crt_time<=now() group by ordid;  

1亿记录,任意订单任意时间点快照聚合,性能:

TPS:38982

平均响应时间:0.718 毫秒

批量获取,使用JSON索引 + PostgreSQL 10的并行计算,飞快

1亿记录,全表扫一遍只需要1秒。

postgres=# select count(*) from tbl_ord;  
   count     
-----------  
 100000000  
(1 row)  
  
Time: 1014.201 ms (00:01.014)  

根据索引搜索,极快(毫秒级),支持并行扫描,也支持单进程扫描:

postgres=# explain (analyze,verbose,timing,costs,buffers) SELECT * from tbl_ord WHERE info @> '{"a": 5}';  
                                                             QUERY PLAN                                                                
-------------------------------------------------------------------------------------------------------------------------------------  
 Gather  (cost=32241.40..142872.70 rows=100000 width=61) (actual time=3.878..3.878 rows=0 loops=1)  
   Output: ordid, appid, info, crt_time  
   Workers Planned: 1  
   Workers Launched: 1  
   Single Copy: true  
   ->  Bitmap Heap Scan on public.tbl_ord  (cost=32241.40..142872.70 rows=100000 width=61) (actual time=0.158..0.158 rows=0 loops=1)  
         Output: ordid, appid, info, crt_time  
         Recheck Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)  
         Buffers: shared hit=6  
         Worker 0: actual time=0.158..0.158 rows=0 loops=1  
           Buffers: shared hit=6  
         ->  Bitmap Index Scan on idx_tbl_ord_2  (cost=0.00..32216.40 rows=100000 width=0) (actual time=0.153..0.153 rows=0 loops=1)  
               Index Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)  
               Buffers: shared hit=6  
               Worker 0: actual time=0.153..0.153 rows=0 loops=1  
                 Buffers: shared hit=6  
 Planning time: 0.092 ms  
 Execution time: 4.836 ms  
(18 rows)  
  
Time: 5.416 ms  
postgres=# set max_parallel_workers_per_gather =0;  
SET  
Time: 0.202 ms  
postgres=# explain (analyze,verbose,timing,costs,buffers) SELECT * from tbl_ord WHERE info @> '{"a": 5}';  
                                                          QUERY PLAN                                                             
-------------------------------------------------------------------------------------------------------------------------------  
 Bitmap Heap Scan on public.tbl_ord  (cost=32241.40..142872.70 rows=100000 width=61) (actual time=0.062..0.062 rows=0 loops=1)  
   Output: ordid, appid, info, crt_time  
   Recheck Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)  
   Buffers: shared hit=6  
   ->  Bitmap Index Scan on idx_tbl_ord_2  (cost=0.00..32216.40 rows=100000 width=0) (actual time=0.060..0.060 rows=0 loops=1)  
         Index Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)  
         Buffers: shared hit=6  
 Planning time: 0.091 ms  
 Execution time: 0.098 ms  
(9 rows)  
  
Time: 0.539 ms  

批量数据结果通过写OSS,与其他业务平台打通

RDS PG OSS 外部表文档:

https://help.aliyun.com/knowledge_detail/43352.html

HDB PG OSS 外部表文档:

https://help.aliyun.com/document_detail/35457.html

小结

1、一笔电商订单,可能经过若干个系统(每个系统产生的属性可能都不一样,多个系统合起来就是个大宽表,应用为了设计简单,往往可能选择JSON存储,而不是大宽表),业务上需要任意时间点,一笔订单的所有也许数据合并。

2、使用JSON聚合,可以很好的解决订单按时间顺序,合并所有业务线数据的需求。

3、PostgreSQL JSON类型,支持GIN索引,可以实现高效率的JSON内容检索。

4、通过RDS PG对接OSS对象存储,用户在对订单数据进行筛选后,如果要输送给其他平台,通过OSS外部表,可以轻松的对接其他业务。

5、PostgreSQL 支持全表、索引、排序、聚合等操作的并行计算,使得亿级的表,查询飞快。

参考

https://www.postgresql.org/docs/10/static/functions-json.html

https://www.postgresql.org/docs/10/static/datatype-json.html

Flag Counter

digoal’s 大量PostgreSQL文章入口