Multi-Stream Real-Time Aggregation - Record-Level Real-Time Snapshots - JSON Aggregation and Full-JSON Search in Practice
Background
The requirement is this: when a record is written, take the previous record as the base, merge the incoming record into it, and store the merged result as the new record.
That way every record carries the content of all records that came before it.
Note that this means a snapshot per dimension (e.g. per order), not a snapshot of all records in the table.
For example, an e-commerce order may pass through several systems, each producing different attributes. Put together they would form one wide table, but to keep the design simple applications often choose JSON storage instead of a wide table. The order thus produces several records, and on every write the goal is to merge in the content of all earlier related records and write the merged value.
But keep in mind that the same order may be written concurrently (unless the business can hash order IDs so that a single thread, on a single machine, handles each order). Once concurrent writes to the same order occur, merge-on-write breaks down.
Example:
tbl already contains the record (0, 1, 'test0', now())
session A:
insert into tbl (pk, caseid, info, crt_time) values (1, 1, 'test1', now());
session B:
insert into tbl (pk, caseid, info, crt_time) values (2, 1, 'test2', now());
If sessions A and B run concurrently, the inserted records may end up as:
(1, 1, 'test0_test1', now());
(2, 1, 'test0_test2', now());
whereas what we actually want is probably these two:
(1, 1, 'test0_test1', now());
(2, 1, 'test0_test1_test2', now());
It is similar to a blockchain: each record is chained onto the one before it.
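The race exists because merge-on-write has to read the latest record and then insert the merged result, roughly like the hypothetical statement below (same table and values as in the example above). Two concurrent sessions can both read 'test0' as the "previous" record before either commits:
-- Hypothetical merge-on-write for session A: read the order's latest record,
-- concatenate the new content, and insert the merged row. Under concurrency,
-- sessions A and B may both pick up 'test0' as the base.
insert into tbl (pk, caseid, info, crt_time)
select 1, caseid, info || '_test1', now()
from tbl
where caseid = 1
order by crt_time desc
limit 1;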
So we obtain the snapshot a different way instead: the write path stays unchanged, i.e. each business line writes its order records into a single table as before, using JSON to describe that business line's view of the order.
JSON write performance
create table tbl_ord (
ordid int8, -- order ID
appid int, -- application ID
info jsonb, -- payload
crt_time timestamp -- write time
);
create index idx_tbl_ord on tbl_ord(ordid, crt_time);
Single-row insert benchmark
vi test.sql
\set ordid random(1,10000000)
\set appid random(1,10)
insert into tbl_ord (ordid,appid,info,crt_time) values (:ordid,:appid,jsonb '{"a" : 1, "b" : 2}',now());
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 40 -j 40 -t 2500000
Single-row inserts reach about 234,000 rows/s.
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 40
number of threads: 40
number of transactions per client: 2500000
number of transactions actually processed: 100000000/100000000
latency average = 0.170 ms
latency stddev = 0.498 ms
tps = 234047.009786 (including connections establishing)
tps = 234060.902533 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set ordid random(1,10000000)
0.001 \set appid random(1,10)
0.168 insert into tbl_ord (ordid,appid,info,crt_time) values (:ordid,:appid,jsonb '{"a" : 1, "b" : 2}',now());
With batched inserts, throughput can exceed 1 million rows/s.
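A minimal sketch of a batched insert, assuming the application packs many rows into a single statement (three rows shown here for brevity; a real batch would carry hundreds, or use COPY instead):
-- Illustrative multi-row insert; the values are made up.
insert into tbl_ord (ordid, appid, info, crt_time) values
(1, 1, jsonb '{"a" : 1, "b" : 2}', now()),
(1, 3, jsonb '{"c" : 9}', now()),
(2, 7, jsonb '{"a" : 5}', now());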
Full-field JSON indexing
PostgreSQL supports GIN indexing over every field of a JSONB document, with two operator classes; the supported searches are described below (quoted from the PostgreSQL documentation).
GIN indexes can be used to efficiently search for keys or key/value pairs occurring within
a large number of jsonb documents (datums).
Two GIN “operator classes” are provided, offering different performance and flexibility trade-offs.
The default GIN operator class for jsonb supports queries with top-level key-exists operators
?, ?& and ?| operators and path/value-exists operator @>.
(For details of the semantics that these operators implement,
see Table 9.44.) An example of creating an index with this operator class is:
CREATE INDEX idxgin ON api USING GIN (jdoc);
The non-default GIN operator class jsonb_path_ops supports indexing the @> operator only.
An example of creating an index with this operator class is:
CREATE INDEX idxginp ON api USING GIN (jdoc jsonb_path_ops);
create index idx_tbl_ord_2 on tbl_ord using gin (info);
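If only @> containment searches are needed, the jsonb_path_ops operator class quoted above is a smaller and typically faster alternative (a sketch; the index name is illustrative):
create index idx_tbl_ord_3 on tbl_ord using gin (info jsonb_path_ops);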
Usage examples
-- Find documents in which the key "company" has value "Magnafone"
SELECT jdoc->'guid', jdoc->'name' FROM api WHERE jdoc @> '{"company": "Magnafone"}';
-- Find documents in which the key "tags" contains key or array element "qui"
SELECT jdoc->'guid', jdoc->'name' FROM api WHERE jdoc -> 'tags' ? 'qui';
-- Find documents in which the key "tags" contains array element "qui"
SELECT jdoc->'guid', jdoc->'name' FROM api WHERE jdoc @> '{"tags": ["qui"]}';
Point queries with JSON aggregation: a snapshot as of any point in time
To take the snapshot of a given order key as of a given point in time, aggregate its records with JSONB; performance is excellent.
Aggregate all of the key's records into one row:
select caseid, jsonb_agg((pk,info,crt_time) order by crt_time) from tbl where caseid=? and crt_time<=? group by caseid;
jsonb_agg usage example
postgres=# create type typ1 as (c1 int, c2 int);
CREATE TYPE
postgres=# select jsonb_agg((c1,c2)::typ1 order by c1 desc) from (values (1,2),(2,3)) t(c1,c2);
jsonb_agg
------------------------------------------
[{"c1": 2, "c2": 3}, {"c1": 1, "c2": 2}]
(1 row)
Per-order aggregation query performance:
about 0.7 milliseconds
create type typ2 as (appid int, info jsonb, crt_time timestamp);
postgres=# select ordid, jsonb_agg((appid,info,crt_time)::typ2 order by crt_time) from tbl_ord where ordid=1 and crt_time<=now() group by ordid;
-[ RECORD 1 ]--------------------------------------------------------------------------------------
ordid | 1
jsonb_agg | [{"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:24:56.659672"}, {"info": {"a": 1, "b": 2}, "appid": 5, "crt_time": "2017-12-09T23:25:13.073163"}, {"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:25:49.94649"}, {"info": {"a": 1, "b": 2}, "appid": 10, "crt_time": "2017-12-09T23:26:23.523946"}, {"info": {"a": 1, "b": 2}, "appid": 2, "crt_time": "2017-12-09T23:26:49.900199"}, {"info": {"a": 1, "b": 2}, "appid": 7, "crt_time": "2017-12-09T23:27:10.643058"}, {"info": {"a": 1, "b": 2}, "appid": 8, "crt_time": "2017-12-09T23:27:20.937021"}, {"info": {"a": 1, "b": 2}, "appid": 8, "crt_time": "2017-12-09T23:27:21.446752"}, {"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:29:19.10536"}, {"info": {"a": 1, "b": 2}, "appid": 7, "crt_time": "2017-12-09T23:29:56.192353"}, {"info": {"a": 1, "b": 2}, "appid": 1, "crt_time": "2017-12-09T23:30:07.879201"}, {"info": {"a": 1, "b": 2}, "appid": 6, "crt_time": "2017-12-09T23:30:31.487457"}]
Time: 0.696 ms
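jsonb_agg returns the order's history as an array of per-record fragments. If the application wants a single merged JSON object instead, i.e. the record-level snapshot described in the background, with later writes overwriting earlier keys, one possible approach is a small custom aggregate built on jsonb_concat (the function behind the || operator); the aggregate name is illustrative:
-- Illustrative helper: fold the JSON fragments together in time order.
create aggregate jsonb_merge_agg (jsonb) (
  sfunc = jsonb_concat,
  stype = jsonb,
  initcond = '{}'
);
select ordid, jsonb_merge_agg(info order by crt_time) as snapshot
from tbl_ord
where ordid = 1 and crt_time <= now()
group by ordid;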
Benchmark
vi test.sql
\set ordid random(1,10000000)
select ordid, jsonb_agg((appid,info,crt_time)::typ2 order by crt_time) from tbl_ord where ordid=:ordid and crt_time<=now() group by ordid;
Results
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 28 -j 28 -T 120
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 28
number of threads: 28
duration: 120 s
number of transactions actually processed: 4677282
latency average = 0.718 ms
latency stddev = 0.463 ms
tps = 38977.016281 (including connections establishing)
tps = 38982.209839 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set ordid random(1,10000000)
0.717 select ordid, jsonb_agg((appid,info,crt_time)::typ2 order by crt_time) from tbl_ord where ordid=:ordid and crt_time<=now() group by ordid;
With 100 million records, snapshot aggregation for any order as of any point in time:
TPS: 38,982
Average latency: 0.718 ms
Batch retrieval: the JSON index plus PostgreSQL 10 parallel query is very fast
With 100 million records, a full table scan takes only about 1 second.
postgres=# select count(*) from tbl_ord;
count
-----------
100000000
(1 row)
Time: 1014.201 ms (00:01.014)
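The one-second full scan relies on parallel sequential scan; a sketch of nudging the planner toward more workers (the value 32 is illustrative and is further capped by max_parallel_workers and the available cores):
set max_parallel_workers_per_gather = 32; -- illustrative; the PostgreSQL 10 default is 2
select count(*) from tbl_ord;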
Index-based searches are extremely fast (millisecond level); both parallel and single-process scans are supported:
postgres=# explain (analyze,verbose,timing,costs,buffers) SELECT * from tbl_ord WHERE info @> '{"a": 5}';
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=32241.40..142872.70 rows=100000 width=61) (actual time=3.878..3.878 rows=0 loops=1)
Output: ordid, appid, info, crt_time
Workers Planned: 1
Workers Launched: 1
Single Copy: true
-> Bitmap Heap Scan on public.tbl_ord (cost=32241.40..142872.70 rows=100000 width=61) (actual time=0.158..0.158 rows=0 loops=1)
Output: ordid, appid, info, crt_time
Recheck Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)
Buffers: shared hit=6
Worker 0: actual time=0.158..0.158 rows=0 loops=1
Buffers: shared hit=6
-> Bitmap Index Scan on idx_tbl_ord_2 (cost=0.00..32216.40 rows=100000 width=0) (actual time=0.153..0.153 rows=0 loops=1)
Index Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)
Buffers: shared hit=6
Worker 0: actual time=0.153..0.153 rows=0 loops=1
Buffers: shared hit=6
Planning time: 0.092 ms
Execution time: 4.836 ms
(18 rows)
Time: 5.416 ms
postgres=# set max_parallel_workers_per_gather =0;
SET
Time: 0.202 ms
postgres=# explain (analyze,verbose,timing,costs,buffers) SELECT * from tbl_ord WHERE info @> '{"a": 5}';
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on public.tbl_ord (cost=32241.40..142872.70 rows=100000 width=61) (actual time=0.062..0.062 rows=0 loops=1)
Output: ordid, appid, info, crt_time
Recheck Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)
Buffers: shared hit=6
-> Bitmap Index Scan on idx_tbl_ord_2 (cost=0.00..32216.40 rows=100000 width=0) (actual time=0.060..0.060 rows=0 loops=1)
Index Cond: (tbl_ord.info @> '{"a": 5}'::jsonb)
Buffers: shared hit=6
Planning time: 0.091 ms
Execution time: 0.098 ms
(9 rows)
Time: 0.539 ms
Batch results can be written to OSS to integrate with other business platforms
RDS PG OSS external table documentation:
https://help.aliyun.com/knowledge_detail/43352.html
HDB PG OSS external table documentation:
https://help.aliyun.com/document_detail/35457.html
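A rough sketch of what an oss_fdw external table for exporting filtered orders might look like. The server and table names are made up, the option names (host, id, key, bucket, dir, format) are recalled from memory, and write (export) support is assumed; treat the linked documentation above as authoritative:
-- Assumed oss_fdw usage; verify option names and export support against the docs above.
create extension if not exists oss_fdw;
create server oss_srv foreign data wrapper oss_fdw
options (host 'oss-cn-hangzhou-internal.aliyuncs.com', id 'access_key_id', key 'access_key_secret', bucket 'my_bucket');
create foreign table oss_ord_export (ordid int8, appid int, info text, crt_time timestamp)
server oss_srv options (dir 'ord_export/', format 'csv');
-- Push the filtered result set to OSS for downstream platforms.
insert into oss_ord_export
select ordid, appid, info::text, crt_time from tbl_ord where info @> '{"a": 5}';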
Summary
1. An e-commerce order may pass through several systems (each producing different attributes; put together they form a wide table, though for design simplicity applications often choose JSON storage rather than a wide table). The business needs to merge all of an order's business data as of any point in time.
2. JSON aggregation neatly handles the requirement of merging all business lines' data for an order in time order.
3. The PostgreSQL JSON type supports GIN indexes, enabling efficient searches over JSON content.
4. By connecting RDS PG to OSS object storage via OSS external tables, filtered order data can easily be handed off to other business platforms.
5. PostgreSQL supports parallel execution of full scans, index scans, sorts, aggregates, and more, so queries on 100-million-row tables are very fast.
References
https://www.postgresql.org/docs/10/static/functions-json.html
https://www.postgresql.org/docs/10/static/datatype-json.html