PostgreSQL 流式数据处理(聚合、过滤、转换…)系列 - 9
背景
2013年帮朋友做的方案。写了一些列文档来解决当时某个大数据BI平台的异步流式数据处理的功能。
逐步优化,化繁为简。
在业务层面,统计,数据的过滤,数据的清洗,数据的事件触发等。是比较常见的需求。
比如以COUNT就是一个很典型的例子。
在9.2以前全表的count只能通过扫描全表来得到, 即使有pk也必须扫描全表.
9.2版本增加了index only scan的功能, count(*)可以通过仅仅扫描pk就可以得到.
但是如果是一个比较大的表, pk也是很大的, 扫描pk也是个不小的开销.
到了9.6,开始支持并行查询,通过并行,一张1亿的表,COUNT可能只需要几百毫秒。这是一个质的飞跃。(但是还有很多时候用并行并不是最好的)
另外社区也除了一个流式处理的数据库,pipelineDB,但是它的社区版本限制了一个DATABASE只能使用1024个流视图,在编码的地方使用了1BYTE存储CV。
那么回到postgresql数据库本身,有没有办法来优化count全表的操作呢, 如果你的场景真的有必要频繁的count全表, 那么可以尝试一下使用以下方法来优化你的场景.
正文
对于insert-only的大数据实时COUNT统计, 我以前写过8篇相关的文章来实现并行插入的环境下, 如何实现实时的COUNT统计.
按照XID做切片, 每次统计一些XID的数据, 一个事务中插入的数据必须一次处理完.
但是当插入的数据是批量插入时, 例如一个事务中插入了几百万记录, 那么使用原来的方法, 对于一个事务的取数是在一次性完成的,
这种情况, 我们更希望一个事务包含的几百万记录被拆分成多个来统计.
思路是增加一个序列字段, 这样操作的话对于一个事务插入的一批数据: XID一致, 但是序列字段的值不一致.
这样可以用来对单个事务包含的几百万记录拆分成多个数据分片来统计.
当然取数据的逻辑也会变得更加复杂, 个人不建议这么来做.
还是推荐使用如下方法
为什么要用XID做切片呢, 原因很简单, 因为如果只用序列或者时间类型的自增字段来对数据切片的话, 对于并发插入的场景, 可能导致某些数据取不到.
例如 :
digoal=# create table log(id serial8 primary key, info text);
CREATE TABLE
SESSION A :
digoal=# begin;
BEGIN
digoal=# insert into log (info) values ('test1');
INSERT 0 1
SESSION B :
digoal=# begin;
BEGIN
digoal=# insert into log (info) values ('test2');
INSERT 0 1
digoal=# end;
COMMIT
SESSION C :
按ID增长取数据,
digoal=# select * from log where id<= (select max(id) from log);
id | info
----+-------
2 | test2
(1 row)
SESSION A :
digoal=# end;
COMMIT
SESSION C :
下次取数据, ID=1的数据就取不到了.
digoal=# select * from log where id>2 and id<=(select max(id) from log);
id | info
----+------
(0 rows)
对于使用时序也一样存在这个问题 :
digoal=# create table log(id serial8 primary key, info text, crt_time timestamp default clock_timestamp());
CREATE TABLE
SESSION A :
digoal=# begin;
BEGIN
digoal=# insert into log (info) values ('test1');
INSERT 0 1
SESSION B :
digoal=# begin;
BEGIN
digoal=# insert into log (info) values ('test2');
INSERT 0 1
digoal=# end;
COMMIT
SESSION C :
digoal=# select * from log where crt_time<=(select max(crt_time) from log);
id | info | crt_time
----+-------+---------------------------
2 | test2 | 2013-12-30 17:14:03.37241
(1 row)
SESSION A :
digoal=# end;
COMMIT
SESSION C :
digoal=# select * from log where crt_time>'2013-12-30 17:14:03.37241' and crt_time<=(select max(crt_time) from log);
id | info | crt_time
----+------+----------
(0 rows)
digoal=# select * from log;
id | info | crt_time
----+-------+----------------------------
1 | test1 | 2013-12-30 17:13:56.258402
2 | test2 | 2013-12-30 17:14:03.37241
(2 rows)
我的前9篇文章都是通过对表增加事务号xid字段来规避以上并行插入带来的第三方进程实时取数遗失问题.
其实我们反过来想, 如果单个表的插入, 同一时刻没有并行的插入, 或者说只有串行插入的话, 用以上例子中的方法就可以了, 而不需要XID这种繁杂的取数据的逻辑. (具体有多繁杂大家可以看看我前面写的9篇BLOG, 在本文参考部分).
由于单个表单线程插入的话, 性能可能被质疑, 我们先看看单个表, 单个线程插入的性能到底能达到多少?
分单步和批量两种方式测试
单步提交, 每秒约插入1.08万条记录每秒.
digoal=# create table log
(id serial8 primary key,
c1 int default 1,
c2 int default 2,
c3 int default 3,
c4 int default 4,
c5 text default '75230039beb12ce952f24927f2bfa2f2',
c6 text default 'ff8c5e1e275dafc3aa14b8977b2c4388',
c7 text default '835ff6bcc56013f69a0c22b53434eac3',
c8 text default 'c9408302cc70937dea98796f936c42f0',
c9 text default '45ecadc25510c732bec90bc8c3a986f1',
c10 text default '86d3639386427f45c2bf4ba40df4fa8e',
c11 timestamp default clock_timestamp()
);
CREATE TABLE
pg94@db-172-16-3-150-> vi test.sql
insert into log(c1) values(1);
pg94@db-172-16-3-150-> pgbench -M prepared -n -r -f ./test.sql -c 1 -j 1 -T 30
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 1
number of threads: 1
duration: 30 s
number of transactions actually processed: 323891
latency average: 0.093 ms
tps = 10796.333558 (including connections establishing)
tps = 10797.279037 (excluding connections establishing)
statement latencies in milliseconds:
0.091609 insert into log(c1) values(1);
批量提交, 一次600条. 换算后最终结果是插入76200条记录每秒.
pg94@db-172-16-3-150-> pgbench -M prepared -n -r -f ./test.sql -c 1 -j 1 -T 30
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 1
number of threads: 1
duration: 30 s
number of transactions actually processed: 3816
latency average: 7.862 ms
tps = 127.175476 (including connections establishing)
tps = 127.186531 (excluding connections establishing)
statement latencies in milliseconds:
7.859686 insert into log(c1) values(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1);
把序列的缓存调到1000后, 单步插入性能没有变化, 批量插入性能提升到88613万条每秒.
digoal=# alter sequence log_id_seq cache 1000;
ALTER SEQUENCE
如果不能满足以上性能的话, 也不必着急, 有一种方法可以规避, 创建多个表, 每个表一个线程插入这样总可以吧.
如果觉得多个表一对一的插入这样程序设计起来比较麻烦, 那么用分区表来解决也是可以的, 程序还是插1个主表, 不过由数据库的触发器函数来决定每个程序插入1个表, 这样做的话数据库需要承担一定的逻辑判断, 表越多, 对性能越不利.
不过我个人还是建议使用程序来区分每个线程插入不同的表, 这样能够达到最高的性能.
最后, 如果使用锁的方法来实现多线程并行插入模拟成串行插入的话, 还需要注意序列的缓存, 因为会话不断开的话, 取到的序列的缓存是一直存在的, PostgreSQL 9.4开始支持DISCARD SEQUENCE了, 这样可以清楚缓存中的序列, 那么使用加锁的方式来实现并行模拟串行的插入就可以了, 获得锁后, 立刻执行DISCARD SEQUENCE然后再插入数据.
例如 :
alter sequence log_id_seq cache 1000;
pg94@db-172-16-3-150-> vi test.sql
select pg_advisory_lock(1);
DISCARD SEQUENCES;
insert into log(c1) values(1);
select pg_advisory_unlock_all();
性能比单线程插入更差, 因为虽然是并行的, 但是最终要更多的锁开销来模拟成串行.
pg94@db-172-16-3-150-> pgbench -M prepared -n -r -f ./test.sql -c 16 -j 4 -T 10
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 16
number of threads: 4
duration: 10 s
number of transactions actually processed: 38647
latency average: 4.140 ms
tps = 3862.764755 (including connections establishing)
tps = 3867.133490 (excluding connections establishing)
statement latencies in milliseconds:
3.876671 select pg_advisory_lock(1);
0.046670 DISCARD SEQUENCES;
0.116832 insert into log(c1) values(1);
0.090477 select pg_advisory_unlock_all();
批量插入的结果则更能接受, 因为锁的开销和并行插入的时间相比不算大 :
pg94@db-172-16-3-150-> pgbench -M prepared -n -r -f ./test.sql -c 16 -j 4 -T 10
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 16
number of threads: 4
duration: 10 s
number of transactions actually processed: 1346
latency average: 118.871 ms
tps = 133.019620 (including connections establishing)
tps = 133.165837 (excluding connections establishing)
statement latencies in milliseconds:
112.006556 select pg_advisory_lock(1);
0.115342 DISCARD SEQUENCES;
7.137090 insert into log(c1) values(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1);
0.168648 select pg_advisory_unlock_all();
检验使用锁的方法模拟的串行插入事务ID和序列是否一致.
digoal=# select * from
(select row_number() over (order by id) as rn, id, xmin::text::int8 from log) t1,
(select row_number() over (order by (xmin::text::int8),id) as rn, id, xmin::text::int8 from log) t2
where t1.rn=t2.rn
and (t1.id<>t2.id or t1.xmin<>t2.xmin);
rn | id | xmin | rn | id | xmin
----+----+------+----+----+------
(0 rows)
结果一致.
为方便大家查询, 汇总PostgreSQL实时和非实时数据统计的案例分析文章系列 - 如下 :
1. http://blog.163.com/digoal@126/blog/static/163877040201331252945440/
2. http://blog.163.com/digoal@126/blog/static/16387704020133151402415/
3. http://blog.163.com/digoal@126/blog/static/16387704020133155179877/
4. http://blog.163.com/digoal@126/blog/static/16387704020133156636579/
5. http://blog.163.com/digoal@126/blog/static/16387704020133218305242/
6. http://blog.163.com/digoal@126/blog/static/16387704020133224161563/
7. http://blog.163.com/digoal@126/blog/static/16387704020133271134563/
8. http://blog.163.com/digoal@126/blog/static/16387704020134311144755/