块级(ctid)扫描在IoT(物联网)极限写和消费读并存场景的应用
背景
在物联网有一个非常普遍的数据需求,就是数据的写入,另一个普遍的需求则是数据的消费(按时序读取),以及流式计算。
关于流式计算,请参考
《(流式、lambda、触发器)实时处理大比拼 - 物联网(IoT)\金融,时序处理最佳实践》
《流计算风云再起 - PostgreSQL携PipelineDB力挺IoT》
《”物联网”流式处理应用 - 用PostgreSQL实时处理(万亿每天)》
接下来我们谈一谈极限写入和消费。
写入
从数据存储结构来看,PostgreSQL的HEAP存储是非常适合高速写入的,追加式写入。以下文章中已得到高速写入的验证。
《PostgreSQL 如何潇洒的处理每天上百TB的数据增量》
块(时序列)索引
BRIN索引,也被称为块索引,是针对数据块元数据建立的索引(例如某个自增长字段,物理存储和字段的值存在很好的线性相关性,那么每个块的数据区间就具有非常强的独立性),BRIN索引非常小,对写入性能的影响可以忽略。
BRIN适合物理存储和字段的值存在很好的线性相关性的字段,例如时序字段。
或者使用cluster或order 重排后,适合对应字段。
消费
消费是指异步的读取数据,处理数据的过程,例如IoT场景,数据的写入延迟要求非常低,所以要求写入吞吐特别大。
而处理方面,则通过消费机制,进行处理。
那么如何消费呢?
通常可以根据索引进行消费,比如前面提到的BRIN索引,对写入吞吐的影响小,同时支持=,以及范围的检索。如果有时序字段的话,BRIN是非常好的选择。
然而并非所有的数据写入场景都有时序字段(当然用户可以添加一个时间字段来解决这个问题)。当没有时序字段时,如何消费效率最高呢?
块扫描
块扫描是很好的选择,前面提到了数据存储是HEAP,追加形式。
PostgreSQL提供了一种tid scan的扫描方法,告诉数据库你要搜索哪个数据块的哪条记录。
select * from tbl where ctid='(100,99)';
这条SQL指查询100号数据块的第100条记录。
这种扫描效率非常之高,可以配合HEAP存储,在消费(读取记录)时使用。
评估块记录数
PostgreSQL暂时没有提供返回整个数据块的所有记录的接口,只能返回某个数据块的某一条记录,所以如果我们需要读取某个数据块的记录,需要枚举该数据块的所有行。
如何评估一个数据块有多少条记录,或者最多有多少条记录?
PAGE layout
https://www.postgresql.org/docs/10/static/storage-page-layout.html
HeapTupleHeaderData Layout
Field | Type | Length | Description | |
---|---|---|---|---|
t_xmin | TransactionId | 4 bytes | insert XID stamp | |
t_xmax | TransactionId | 4 bytes | delete XID stamp | |
t_cid | CommandId | 4 bytes | insert and/or delete CID stamp (overlays with t_xvac) | |
t_xvac | TransactionId | 4 bytes | XID for VACUUM operation moving a row version | |
t_ctid | ItemPointerData | 6 bytes | current TID of this or newer row version | |
t_infomask2 | uint16 | 2 bytes | number of attributes, plus various flag bits | |
t_infomask | uint16 | 2 bytes | various flag bits | |
t_hoff | uint8 | 1 byte | offset to user data |
Overall Page Layout
Item | Description |
---|---|
PageHeaderData | 24 bytes long. Contains general information about the page, including free space pointers. |
ItemIdData | Array of (offset,length) pairs pointing to the actual items. 4 bytes per item. |
Free space | The unallocated space. New item pointers are allocated from the start of this area, new items from the end. |
Items | The actual items themselves. |
Special space | Index access method specific data. Different methods store different data. Empty in ordinary tables. |
单页最大记录数估算
最大记录数=block_size/(ctid+tuple head)=block_size/(4+27);
postgres=# select current_setting('block_size');
current_setting
-----------------
32768
(1 row)
postgres=# select current_setting('block_size')::int/31;
?column?
----------
1057
(1 row)
如果需要评估更精确的行数,可以加上字段的固定长度,变长字段的头(4BYTE)。
例子
生成指定block TID的函数
create or replace function gen_tids(blkid int) returns tid[] as $$
select array(
SELECT ('('||blkid||',' || s.i || ')')::tid
FROM generate_series(0,current_setting('block_size')::int/31) AS s(i)
) ;
$$ language sql strict immutable;
读取某个数据块的记录
postgres=# create table test(id int);
CREATE TABLE
postgres=# insert into test select generate_series(1,10000);
INSERT 0 10000
postgres=# explain (analyze,verbose,timing,costs,buffers) select * from test where ctid = any
(
array
(
SELECT ('(0,' || s.i || ')')::tid
FROM generate_series(0, current_setting('block_size')::int/31) AS s(i)
)
);
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------
Tid Scan on postgres.test (cost=25.03..40.12 rows=10 width=4) (actual time=0.592..0.795 rows=909 loops=1)
Output: test.id
TID Cond: (test.ctid = ANY ($0))
Buffers: shared hit=1057
InitPlan 1 (returns $0)
-> Function Scan on pg_catalog.generate_series s (cost=0.01..25.01 rows=1000 width=6) (actual time=0.087..0.429 rows=1058 loops=1)
Output: ((('(0,'::text || (s.i)::text) || ')'::text))::tid
Function Call: generate_series(0, ((current_setting('block_size'::text))::integer / 31))
Planning time: 0.106 ms
Execution time: 0.881 ms
(10 rows)
postgres=# explain (analyze,verbose,timing,costs,buffers) select * from test where ctid = any(gen_tids(1));
Tid Scan on postgres.test (cost=1.32..1598.90 rows=1058 width=4) (actual time=0.026..0.235 rows=909 loops=1)
Output: id
TID Cond: (test.ctid = ANY ('{"(1,0)","(1,1)","(1,2)","(1,3)","(1,4)","(1,5)","(1,6)","(1,7)","(1,8)","(1,9)","(1,10)","(1,11)","(1,12)","(1,13)","(1,14)","(1,15)","(1,16)","(1,17)","(1,18)","(1,19)","(1,20)","(1,21)","(1,22)","(1,23)
","(1,24)","(1,25)"
....
Buffers: shared hit=1057
Planning time: 1.084 ms
Execution time: 0.294 ms
(6 rows)
postgres=# select ctid,* from test where ctid = any(gen_tids(11));
ctid | id
--------+-------
(11,1) | 10000
(1 row)
postgres=# select ctid,* from test where ctid = any(gen_tids(9));
ctid | id
---------+------
(9,1) | 8182
(9,2) | 8183
(9,3) | 8184
(9,4) | 8185
(9,5) | 8186
(9,6) | 8187
...
(9,904) | 9085
(9,905) | 9086
(9,906) | 9087
(9,907) | 9088
(9,908) | 9089
(9,909) | 9090
(909 rows)
扩展场景
如果数据没有更新,删除;那么CTID还可以作为索引来使用,例如全文检索(ES),可以在建立索引时使用ctid来指向数据库中的记录,而不需要另外再建一个PK,也能大幅度提升写入性能。
并行场景
创建一个函数,根据规则生成一个ctid数组。
create or replace function gen_tids(
blk1 int, -- 此表总共多少block
m int, -- 并行度,即模数
n int, -- 并行号,即当前余数
rx int -- 平均行长度
) returns tid[] as $$
with blks as (select id from generate_series(0,blk1) t(id) where mod(id,m)=n)
select array(
SELECT ('('||blks.id||',' || s.i || ')')::tid
FROM generate_series(0, (current_setting('block_size')::int/rx)+10 ) AS s(i) , blks -- 加10条余量item,尽量减少漏行的风险
);
$$ language sql strict immutable;
创建测试表,每一行很小,插入10亿行
postgres=# \d tbl
Unlogged table "public.tbl"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
id | integer | | |
postgres=# select count(*) from tbl;
count
------------
1000000000
(1 row)
postgres=# \dt+ tbl
List of relations
Schema | Name | Type | Owner | Size | Description
--------+------+-------+----------+-------+-------------
public | tbl | table | postgres | 34 GB |
(1 row)
查询每一行的平均长度,以及总共占用多少数据块
postgres=# select floor(current_setting('block_size')::int8*relpages/reltuples), relpages from pg_class where relname='tbl';
floor | relpages
-------+----------
36 | 1100111
(1 row)
hash vs ctid 并行扫描
单行较小的大表
1、使用ctid扫描
postgres=# select count(*) from tbl where ctid = any(gen_tids(1100111, 20, 0, 36));
count
----------
50000454
(1 row)
Time: 234985.944 ms (03:54.986)
2、使用hash扫描
postgres=# select count(*) from tbl where mod(id,20)=0;
count
----------
50000000
(1 row)
Time: 79916.058 ms (01:19.916)
3、使用分区扫描
postgres=# create table tbl2 as select * from tbl where mod(id,20)=0;
SELECT 50000000
postgres=# \dt+ tbl2
List of relations
Schema | Name | Type | Owner | Size | Description
--------+------+-------+----------+---------+-------------
public | tbl2 | table | postgres | 1719 MB |
(1 row)
postgres=# \timing
Timing is on.
postgres=# select count(*) from tbl2;
count
----------
50000000
(1 row)
Time: 593.304 ms
很显然,直接扫描分区的效果是最好的。
1、9.6以上的版本,就不需要纠结这么多了,因为已经内置了并行扫描的功能。
2、如果前端能做到写分区,建议还是前端分区,以达到最好的写入性能(即直接写入分区表)。
3、分区表的性能损耗。如果有海量数据写入,前端分区与数据库分区的性能损耗是不一样的。对于基于rule, trigger, 以及PG 10内置分区的分区方法,性能损耗较大。
pg_pathman是目前性能损耗最小的分区方法。推荐使用。
《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》
将来PostgreSQL 内置分区肯定是会优化的,长远来看,还是建议使用内置的分区功能。
《PostgreSQL 10.0 preview 功能增强 - 内置分区表》
4、风险
gen_tids函数里面用的是平均行长度,计算得到的ctid,所以当某些块中的记录小于平均长度时,也许能存下更多的行,而实际上产生的ctids也许没有包含那么多行,所以在查询时,可能会导致漏记录。
5、ctid 并行,适合行很大的场景。
小结
如果内核内置更优雅的写法,ctid扫描性能肯定更好。例如基于BLOCK这个级别的扫描,返回所有属于这些BLOCK的数据。
select * from tbl where blkid = any (array[blocks]);
参考
https://www.citusdata.com/blog/2016/03/30/five-ways-to-paginate/
https://www.postgresql.org/message-id/flat/be64327d326568a3be7fde1891ed34ff.squirrel%40sq.gransy.com#be64327d326568a3be7fde1891ed34ff.squirrel@sq.gransy.com