块级(ctid)扫描在IoT(物联网)极限写和消费读并存场景的应用

3 minute read

背景

在物联网有一个非常普遍的数据需求,就是数据的写入,另一个普遍的需求则是数据的消费(按时序读取),以及流式计算。

关于流式计算,请参考

《(流式、lambda、触发器)实时处理大比拼 - 物联网(IoT)\金融,时序处理最佳实践》

《流计算风云再起 - PostgreSQL携PipelineDB力挺IoT》

《”物联网”流式处理应用 - 用PostgreSQL实时处理(万亿每天)》

接下来我们谈一谈极限写入和消费。

写入

从数据存储结构来看,PostgreSQL的HEAP存储是非常适合高速写入的,追加式写入。以下文章中已得到高速写入的验证。

《PostgreSQL 如何潇洒的处理每天上百TB的数据增量》

块(时序列)索引

BRIN索引,也被称为块索引,是针对数据块元数据建立的索引(例如某个自增长字段,物理存储和字段的值存在很好的线性相关性,那么每个块的数据区间就具有非常强的独立性),BRIN索引非常小,对写入性能的影响可以忽略。

BRIN适合物理存储和字段的值存在很好的线性相关性的字段,例如时序字段。

或者使用cluster或order 重排后,适合对应字段。

消费

消费是指异步的读取数据,处理数据的过程,例如IoT场景,数据的写入延迟要求非常低,所以要求写入吞吐特别大。

而处理方面,则通过消费机制,进行处理。

那么如何消费呢?

通常可以根据索引进行消费,比如前面提到的BRIN索引,对写入吞吐的影响小,同时支持=,以及范围的检索。如果有时序字段的话,BRIN是非常好的选择。

然而并非所有的数据写入场景都有时序字段(当然用户可以添加一个时间字段来解决这个问题)。当没有时序字段时,如何消费效率最高呢?

块扫描

块扫描是很好的选择,前面提到了数据存储是HEAP,追加形式。

PostgreSQL提供了一种tid scan的扫描方法,告诉数据库你要搜索哪个数据块的哪条记录。

select * from tbl where ctid='(100,99)';  

这条SQL指查询100号数据块的第100条记录。

这种扫描效率非常之高,可以配合HEAP存储,在消费(读取记录)时使用。

评估块记录数

PostgreSQL暂时没有提供返回整个数据块的所有记录的接口,只能返回某个数据块的某一条记录,所以如果我们需要读取某个数据块的记录,需要枚举该数据块的所有行。

如何评估一个数据块有多少条记录,或者最多有多少条记录?

PAGE layout

https://www.postgresql.org/docs/10/static/storage-page-layout.html

HeapTupleHeaderData Layout

Field Type Length Description  
t_xmin TransactionId   4 bytes insert XID stamp
t_xmax TransactionId 4 bytes delete XID stamp  
t_cid CommandId 4 bytes insert and/or delete CID stamp (overlays with t_xvac)  
t_xvac TransactionId 4 bytes XID for VACUUM operation moving a row version  
t_ctid ItemPointerData 6 bytes current TID of this or newer row version  
t_infomask2 uint16 2 bytes number of attributes, plus various flag bits  
t_infomask uint16 2 bytes various flag bits  
t_hoff uint8 1 byte offset to user data  

Overall Page Layout

Item Description
PageHeaderData 24 bytes long. Contains general information about the page, including free space pointers.
ItemIdData Array of (offset,length) pairs pointing to the actual items. 4 bytes per item.
Free space The unallocated space. New item pointers are allocated from the start of this area, new items from the end.
Items The actual items themselves.
Special space Index access method specific data. Different methods store different data. Empty in ordinary tables.

单页最大记录数估算

最大记录数=block_size/(ctid+tuple head)=block_size/(4+27);

postgres=# select current_setting('block_size');  
 current_setting   
-----------------  
 32768  
(1 row)  
  
postgres=# select current_setting('block_size')::int/31;  
 ?column?   
----------  
     1057  
(1 row)  

如果需要评估更精确的行数,可以加上字段的固定长度,变长字段的头(4BYTE)。

例子

生成指定block TID的函数

create or replace function gen_tids(blkid int) returns tid[] as $$  
select array(  
  SELECT ('('||blkid||',' || s.i || ')')::tid  
    FROM generate_series(0,current_setting('block_size')::int/31) AS s(i)  
)  ;  
$$ language sql strict immutable;  

读取某个数据块的记录

postgres=# create table test(id int);  
CREATE TABLE  
postgres=# insert into test select generate_series(1,10000);  
INSERT 0 10000  
  
postgres=# explain (analyze,verbose,timing,costs,buffers) select * from test where ctid = any  
(  
  array  
  (  
    SELECT ('(0,' || s.i || ')')::tid  
      FROM generate_series(0, current_setting('block_size')::int/31) AS s(i)  
  )  
);  
                                                                QUERY PLAN                                                                  
------------------------------------------------------------------------------------------------------------------------------------------  
 Tid Scan on postgres.test  (cost=25.03..40.12 rows=10 width=4) (actual time=0.592..0.795 rows=909 loops=1)  
   Output: test.id  
   TID Cond: (test.ctid = ANY ($0))  
   Buffers: shared hit=1057  
   InitPlan 1 (returns $0)  
     ->  Function Scan on pg_catalog.generate_series s  (cost=0.01..25.01 rows=1000 width=6) (actual time=0.087..0.429 rows=1058 loops=1)  
           Output: ((('(0,'::text || (s.i)::text) || ')'::text))::tid  
           Function Call: generate_series(0, ((current_setting('block_size'::text))::integer / 31))  
 Planning time: 0.106 ms  
 Execution time: 0.881 ms  
(10 rows)  
postgres=# explain (analyze,verbose,timing,costs,buffers) select * from test where ctid = any(gen_tids(1));  
  
 Tid Scan on postgres.test  (cost=1.32..1598.90 rows=1058 width=4) (actual time=0.026..0.235 rows=909 loops=1)  
   Output: id  
   TID Cond: (test.ctid = ANY ('{"(1,0)","(1,1)","(1,2)","(1,3)","(1,4)","(1,5)","(1,6)","(1,7)","(1,8)","(1,9)","(1,10)","(1,11)","(1,12)","(1,13)","(1,14)","(1,15)","(1,16)","(1,17)","(1,18)","(1,19)","(1,20)","(1,21)","(1,22)","(1,23)  
","(1,24)","(1,25)"  
....  
   Buffers: shared hit=1057  
 Planning time: 1.084 ms  
 Execution time: 0.294 ms  
(6 rows)  
postgres=# select ctid,* from test where ctid = any(gen_tids(11));
  ctid  |  id   
--------+-------
 (11,1) | 10000
(1 row)

postgres=# select ctid,* from test where ctid = any(gen_tids(9));
  ctid   |  id  
---------+------
 (9,1)   | 8182
 (9,2)   | 8183
 (9,3)   | 8184
 (9,4)   | 8185
 (9,5)   | 8186
 (9,6)   | 8187
 ...
 (9,904) | 9085
 (9,905) | 9086
 (9,906) | 9087
 (9,907) | 9088
 (9,908) | 9089
 (9,909) | 9090
(909 rows)

扩展场景

如果数据没有更新,删除;那么CTID还可以作为索引来使用,例如全文检索(ES),可以在建立索引时使用ctid来指向数据库中的记录,而不需要另外再建一个PK,也能大幅度提升写入性能。

并行场景

创建一个函数,根据规则生成一个ctid数组。

create or replace function gen_tids(
  blk1 int,  -- 此表总共多少block
  m int,     -- 并行度,即模数
  n int,     -- 并行号,即当前余数
  rx int     -- 平均行长度
) returns tid[] as $$  
with blks as (select id from generate_series(0,blk1) t(id) where mod(id,m)=n)
select array(  
  SELECT ('('||blks.id||',' || s.i || ')')::tid  
    FROM generate_series(0, (current_setting('block_size')::int/rx)+10 ) AS s(i) , blks   -- 加10条余量item,尽量减少漏行的风险
);  
$$ language sql strict immutable;  

创建测试表,每一行很小,插入10亿行

postgres=# \d tbl
            Unlogged table "public.tbl"
 Column |  Type   | Collation | Nullable | Default 
--------+---------+-----------+----------+---------
 id     | integer |           |          | 

postgres=# select count(*) from tbl;
   count    
------------
 1000000000
(1 row)

postgres=# \dt+ tbl
                   List of relations
 Schema | Name | Type  |  Owner   | Size  | Description 
--------+------+-------+----------+-------+-------------
 public | tbl  | table | postgres | 34 GB | 
(1 row)

查询每一行的平均长度,以及总共占用多少数据块

postgres=# select floor(current_setting('block_size')::int8*relpages/reltuples), relpages from pg_class where relname='tbl';
 floor | relpages 
-------+----------
    36 |  1100111
(1 row)

hash vs ctid 并行扫描

单行较小的大表

1、使用ctid扫描

postgres=# select count(*) from tbl where ctid = any(gen_tids(1100111, 20, 0, 36)); 
  count   
----------
 50000454
(1 row)

Time: 234985.944 ms (03:54.986)

2、使用hash扫描

postgres=# select count(*) from tbl where mod(id,20)=0;
  count   
----------
 50000000
(1 row)

Time: 79916.058 ms (01:19.916)

3、使用分区扫描

postgres=# create table tbl2 as  select * from tbl where mod(id,20)=0;
SELECT 50000000

postgres=# \dt+ tbl2
                    List of relations
 Schema | Name | Type  |  Owner   |  Size   | Description 
--------+------+-------+----------+---------+-------------
 public | tbl2 | table | postgres | 1719 MB | 
(1 row)

postgres=# \timing
Timing is on.

postgres=# select count(*) from tbl2;
  count   
----------
 50000000
(1 row)

Time: 593.304 ms

很显然,直接扫描分区的效果是最好的。

1、9.6以上的版本,就不需要纠结这么多了,因为已经内置了并行扫描的功能。

2、如果前端能做到写分区,建议还是前端分区,以达到最好的写入性能(即直接写入分区表)。

3、分区表的性能损耗。如果有海量数据写入,前端分区与数据库分区的性能损耗是不一样的。对于基于rule, trigger, 以及PG 10内置分区的分区方法,性能损耗较大。

pg_pathman是目前性能损耗最小的分区方法。推荐使用。

《PostgreSQL 9.5+ 高效分区表实现 - pg_pathman》

将来PostgreSQL 内置分区肯定是会优化的,长远来看,还是建议使用内置的分区功能。

《PostgreSQL 10.0 preview 功能增强 - 内置分区表》

4、风险

gen_tids函数里面用的是平均行长度,计算得到的ctid,所以当某些块中的记录小于平均长度时,也许能存下更多的行,而实际上产生的ctids也许没有包含那么多行,所以在查询时,可能会导致漏记录。

5、ctid 并行,适合行很大的场景。

小结

如果内核内置更优雅的写法,ctid扫描性能肯定更好。例如基于BLOCK这个级别的扫描,返回所有属于这些BLOCK的数据。

select * from tbl where blkid = any (array[blocks]);

参考

https://www.citusdata.com/blog/2016/03/30/five-ways-to-paginate/

https://www.postgresql.org/message-id/flat/be64327d326568a3be7fde1891ed34ff.squirrel%40sq.gransy.com#be64327d326568a3be7fde1891ed34ff.squirrel@sq.gransy.com

Flag Counter

digoal’s 大量PostgreSQL文章入口