PostgreSQL for Massive Time-Series Data (Real-Time Statistics over Arbitrary Sliding Windows) - Tracking Sensors, Crowds, Objects, and Other Entities
Background
In the real world there is a frequent need for this kind of presence (aggregation) analysis.
For example:
A shopping mall wants to know, at every point in time, how many people are dwelling at each shop location. (There are technical means to sense where people dwell: when someone enters an area, a record is written marking the entry; when they leave, a leave record is written; and if they stay put for a long time, heartbeat records are written periodically.)
An online game wants to know the number of players online at every point in time. (An online record is written at login and an offline record at logout.)
A bike-sharing company wants to know, at every point in time, how many bikes are in use and how many are idle. (A record is written when a bike is rented and another when it is returned; in addition, each bike's status is polled periodically.)
An IoT company wants to know, for every one-minute interval, the minimum and maximum number of sensors online. (An online record is written when a sensor comes online, an offline record when it goes offline, and each sensor's status is polled periodically.)
This is a very typical FEED-style workload: the system must report the online count at every point in time. (If results are reported per time interval, it reports the maximum and minimum online counts within each interval, which amounts to taking the boundary values of the range.)
How can PostgreSQL support roughly 10 billion new records per day with real-time statistics over arbitrary sliding windows?
Design
Scenario:
An IoT company operates a fleet of sensors. A sensor writes an online record when it comes online and an offline record when it goes offline; in addition, each sensor's status is polled every hour, so a sensor with no record within the last hour is considered offline.
The company needs, for every one-minute interval, the minimum and maximum number of sensors online.
1. Table structure
create table sensor_stat(
  sid int,             -- sensor ID
  state boolean,       -- sensor state: true = online, false = offline
  crt_time timestamp   -- time the state was reported
);
2. Index
create index idx_sensor_stat_1 on sensor_stat(sid, crt_time desc);
Insert 110.1 million rows of test data (assume this is one hour's worth of writes, i.e. 2.6424 billion records per day) across 1001 distinct sensor IDs.
insert into sensor_stat select random()*1000, (random()*1)::int::boolean, clock_timestamp() from generate_series(1,110100000);
3. Data TTL: keep the table slim, holding only data within the heartbeat window.
Since heartbeats arrive every hour, every live sensor necessarily has a record within the last hour, and sensors with no record are not counted. It is therefore sufficient to retain only the last hour of state data.
One retention option is PipelineDB.
Another option is to use two tables and rotate between them; a rough rotation sketch follows the table definitions and reference below.
create table sensor_stat1 (
  sid int,             -- sensor ID
  state boolean,       -- sensor state: true = online, false = offline
  crt_time timestamp   -- time the state was reported
);
create table sensor_stat2 (
  sid int,             -- sensor ID
  state boolean,       -- sensor state: true = online, false = offline
  crt_time timestamp   -- time the state was reported
);
A similar approach is described in:
《PostgreSQL 数据rotate用法介绍 - 按时间覆盖历史数据》
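A rough illustration of the two-table rotation (the hourly schedule here is an assumption for illustration, not the exact procedure of the referenced article): the application alternates its write target between sensor_stat1 and sensor_stat2 every hour, truncates the table it is about to switch to, and queries covering the full heartbeat window read the union of both tables.
-- run at the top of the hour, just before the application switches writes to sensor_stat2
truncate sensor_stat2;
-- queries that need the last hour of state read both tables:
-- select * from sensor_stat1 union all select * from sensor_stat2;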
4. Use a recursive query to efficiently retrieve every sensor's latest state
with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select (t.sensor_stat).* from t where t.* is not null;
The execution plan is as follows (this variant additionally filters on state is true):
explain (analyze,verbose,timing,costs,buffers) with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat where state is true order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid and t1.state is true order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select (t.sensor_stat).* from t where t.* is not null;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CTE Scan on t (cost=70.86..72.88 rows=100 width=13) (actual time=0.037..10.975 rows=1001 loops=1)
Output: (t.sensor_stat).sid, (t.sensor_stat).state, (t.sensor_stat).crt_time
Filter: (t.* IS NOT NULL)
Rows Removed by Filter: 1
Buffers: shared hit=5926
CTE t
-> Recursive Union (cost=0.57..70.86 rows=101 width=37) (actual time=0.030..10.293 rows=1002 loops=1)
Buffers: shared hit=5926
-> Subquery Scan on "*SELECT* 1" (cost=0.57..0.63 rows=1 width=37) (actual time=0.029..0.029 rows=1 loops=1)
Output: "*SELECT* 1".sensor_stat
Buffers: shared hit=5
-> Limit (cost=0.57..0.62 rows=1 width=49) (actual time=0.028..0.028 rows=1 loops=1)
Output: sensor_stat.*, sensor_stat.sid, sensor_stat.crt_time
Buffers: shared hit=5
-> Index Scan using idx_sensor_stat_1 on public.sensor_stat (cost=0.57..3180100.70 rows=55369290 width=49) (actual time=0.027..0.027 rows=1 loops=1)
Output: sensor_stat.*, sensor_stat.sid, sensor_stat.crt_time
Filter: (sensor_stat.state IS TRUE)
Buffers: shared hit=5
-> WorkTable Scan on t t_1 (cost=0.00..6.82 rows=10 width=32) (actual time=0.010..0.010 rows=1 loops=1002)
Output: (SubPlan 1)
Filter: ((t_1.sensor_stat).sid IS NOT NULL)
Rows Removed by Filter: 0
Buffers: shared hit=5921
SubPlan 1
-> Limit (cost=0.57..0.66 rows=1 width=49) (actual time=0.009..0.009 rows=1 loops=1001)
Output: t1.*, t1.sid, t1.crt_time
Buffers: shared hit=5921
-> Index Scan using idx_sensor_stat_1 on public.sensor_stat t1 (cost=0.57..1746916.71 rows=18456430 width=49) (actual time=0.009..0.009 rows=1 loops=1001)
Output: t1.*, t1.sid, t1.crt_time
Index Cond: (t1.sid > (t_1.sensor_stat).sid)
Filter: (t1.state IS TRUE)
Rows Removed by Filter: 1
Buffers: shared hit=5921
Planning time: 0.180 ms
Execution time: 11.083 ms
(35 rows)
Sample output
sid | state | crt_time
------+-------+----------------------------
0 | t | 2017-07-05 10:29:09.470687
1 | f | 2017-07-05 10:29:09.465721
2 | t | 2017-07-05 10:29:09.474216
3 | f | 2017-07-05 10:29:09.473176
4 | t | 2017-07-05 10:29:09.473179
5 | t | 2017-07-05 10:29:09.473842
......
996 | t | 2017-07-05 10:29:09.469787
997 | f | 2017-07-05 10:29:09.470983
998 | t | 2017-07-05 10:29:09.47268
999 | t | 2017-07-05 10:29:09.469192
1000 | t | 2017-07-05 10:29:09.472195
(1001 rows)
Time: 11.067 ms
Very efficient: against 110.1 million rows, the latest state of every sensor is obtained in about 11 milliseconds.
Devices that are online are those with state = t; to count only those:
with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select count(*) from t where t.* is not null and (t.sensor_stat).state is true;
count
-------
491
(1 row)
Time: 10.182 ms
5. Count the sensors online at an arbitrary point in time. If state reports are timestamped with one-second precision (crt_time truncated to the second), then no matter how many records there are, at most 86,400 time points per day ever need to be computed.
For example, to count the sensors online as of 2017-07-05 10:29:09, just add a time predicate:
with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat where crt_time <= '2017-07-05 10:29:09' order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.crt_time <= '2017-07-05 10:29:09' and t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select count(*) from t where t.* is not null and (t.sensor_stat).state is true;
count
-------
501
(1 row)
Time: 20.743 ms
Adding this time predicate carries a performance cost; the further back in time the cutoff lies, the more rows must be filtered out and the worse the query performs.
For real-time use it is therefore better to issue the query once per second and omit the time predicate.
6. Generate the online count for every past second in a single pass.
Use the frame feature of window queries (a frame is the interval, ordered by time, that ends at the current record), as sketched below.
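Below is a minimal sketch of one way to do this (an illustration only, not the author's exact query): it collapses each sensor's reports to its last state per second, turns state flips into +1/-1 deltas, and relies on the default window frame of a running sum (all rows up to the current one when ordered by time) to produce the online count at every second, assuming every sensor starts offline.
with per_sec as (
  -- last reported state of each sensor within each second
  select distinct on (sid, date_trunc('second', crt_time))
         sid,
         date_trunc('second', crt_time) as sec,
         state
  from sensor_stat
  order by sid, date_trunc('second', crt_time), crt_time desc
),
deltas as (
  -- +1 when a sensor flips to online, -1 when it flips to offline, 0 otherwise
  select sec,
         case
           when state and not coalesce(lag(state) over w, false) then 1
           when not state and coalesce(lag(state) over w, false) then -1
           else 0
         end as delta
  from per_sec
  window w as (partition by sid order by sec)
)
select sec,
       -- running sum: the default frame covers all rows up to the current second
       sum(sum(delta)) over (order by sec) as online_cnt
from deltas
group by sec
order by sec;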
7. Compute the maximum and minimum online counts within each minute.
Query once per second and write the result into a results table.
create table result (crt_time timestamp(0) default now(), state boolean, cnt int);
create index idx_result_1 on result using brin (crt_time);
insert into result (state,cnt)
with recursive t as
(
(
select sensor_stat as sensor_stat from sensor_stat order by sid, crt_time desc limit 1
)
union all
(
select (select t1 from sensor_stat AS t1 where t1.sid>(t.sensor_stat).sid order by sid, crt_time desc limit 1) from t where (t.sensor_stat).sid is not null
)
)
select (t.sensor_stat).state, count(*) from t where t.* is not null group by 1;
INSERT 0 2
Time: 12.061 ms
postgres=# select * from result ;
crt_time | state | cnt
---------------------+-------+-----
2017-07-05 11:11:03 | f | 510
2017-07-05 11:11:03 | t | 491
(2 rows)
Time: 0.274 ms
Since each query takes only about 12 milliseconds, running it once per second is not a problem.
To compute the maximum and minimum online counts within a given minute (filtering on state = true so that only online counts are considered):
select '2017-07-05 11:11:00', min(cnt), max(cnt) from result where state is true and crt_time between '2017-07-05 11:11:00' and '2017-07-05 11:12:00';
or
select to_char(crt_time, 'yyyy-mm-dd hh24:mi:00'), min(cnt), max(cnt) from result where state is true and crt_time between ? and ? group by 1;
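For example, with concrete bounds (the values below are only illustrative), the per-minute aggregation over a 10-minute window looks like this:
select to_char(crt_time, 'yyyy-mm-dd hh24:mi:00') as minute,
       min(cnt) as min_online,
       max(cnt) as max_online
from result
where state is true
  and crt_time between '2017-07-05 11:11:00' and '2017-07-05 11:21:00'
group by 1
order by 1;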
Optimizing when there are very many sensor IDs
When the number of sensor IDs reaches the 100,000 range, the query slows to about 250 milliseconds.
With an extremely large number of sensor IDs, say over a million, it slows to about 2.5 seconds, which is no longer suitable for running once per second.
So how do we optimize when the sensor population is very large?
A good approach is to hash-distribute the data by sensor ID, for example one partition table per 10,000 sensor IDs. To compute the online count, query all partitions in parallel and sum the results, which lowers the overall response time. A partitioning sketch follows.
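As a sketch of the layout (this assumes PostgreSQL 11+ declarative hash partitioning; the same effect can be achieved with inheritance partitions or by hashing sid in the application):
-- hash-partition the state table by sensor ID so each partition holds a bounded set of sensors
create table sensor_stat_part (
  sid int,
  state boolean,
  crt_time timestamp
) partition by hash (sid);

-- with ~100,000 sensors, modulus 10 gives roughly 10,000 sensor IDs per partition
create table sensor_stat_p0 partition of sensor_stat_part for values with (modulus 10, remainder 0);
create table sensor_stat_p1 partition of sensor_stat_part for values with (modulus 10, remainder 1);
-- ... create p2 through p9 the same way ...

-- the (sid, crt_time desc) index is created on every partition via the parent
create index on sensor_stat_part (sid, crt_time desc);
The recursive query is then run against each partition concurrently (for example from several application connections), and the per-partition online counts are summed.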
Optimization 2: subquery
If the sensor IDs are maintained in a separate table, a subquery can be used to optimize this case.
create table a(id int primary key);  -- id is the sensor ID
create table b(
  aid int,             -- sensor ID
  crt_time timestamp,  -- report time
  val numeric          -- reported value
);
create index idx_b_1 on b(aid, crt_time desc);  -- index
Insert 100,001 sensor IDs and 100 million sensor report records.
insert into a select generate_series(0,100000);
insert into b select random()*100000, clock_timestamp(), random() from generate_series(1,100000000);
Use a subquery to fetch the latest value reported by each sensor ID.
select (t.b).aid,(t.b).val,(t.b).crt_time
from
(
  select (select b from b where b.aid=a.id order by crt_time desc limit 1)  -- subquery, executed once per row of a; fetches the latest value
  from a limit 1000000000  -- without this LIMIT the plan misbehaves, possibly a bug; it has been reported to the community
) t
where (t.b).aid is not null;  -- keep only sensors that have actually reported into b
To fetch each sensor's latest record within a given time range:
select (t.b).aid,(t.b).val,(t.b).crt_time
from
(
  select
  (
    select b from b
    where b.aid=a.id
    and b.crt_time between ? and ?  -- restrict to the time range
    order by crt_time desc limit 1
  )  -- subquery, executed once per row of a; fetches the latest value
  from a limit 1000000000  -- without this LIMIT the plan misbehaves, possibly a bug; it has been reported to the community
) t
where (t.b).aid is not null;  -- keep only sensors that have actually reported into b
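Under the same heartbeat rule, the online count for a time window is simply the number of sensors with at least one record inside it; a small usage sketch (the timestamps are only illustrative):
select count(*)
from
(
  select
  (
    select b from b
    where b.aid=a.id
    and b.crt_time between '2017-07-17 09:53:00' and '2017-07-17 09:54:00'  -- example window
    order by crt_time desc limit 1
  )
  from a limit 1000000000
) t
where (t.b).aid is not null;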
Timing example 1
explain (analyze,verbose,timing,costs,buffers) select (t.b).aid,(t.b).val,(t.b).crt_time
from
(
  select (select b from b where b.aid=a.id order by crt_time desc limit 1)  -- subquery, executed once per row of a; fetches the latest value
  from a limit 1000000000  -- without this LIMIT the plan misbehaves, possibly a bug; it has been reported to the community
) t
where (t.b).aid is not null;  -- keep only sensors that have actually reported into b
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------
Subquery Scan on t (cost=0.00..191854.32 rows=99500 width=44) (actual time=0.033..827.591 rows=100000 loops=1)
Output: (t.b).aid, (t.b).val, (t.b).crt_time
Filter: ((t.b).aid IS NOT NULL)
Buffers: shared hit=500443
-> Limit (cost=0.00..190854.32 rows=100000 width=32) (actual time=0.032..796.185 rows=100000 loops=1)
Output: ((SubPlan 1))
Buffers: shared hit=500443
-> Seq Scan on postgres.a (cost=0.00..190854.32 rows=100000 width=32) (actual time=0.031..787.322 rows=100000 loops=1)
Output: (SubPlan 1)
Buffers: shared hit=500443
SubPlan 1
-> Limit (cost=0.57..1.89 rows=1 width=55) (actual time=0.007..0.007 rows=1 loops=100000)
Output: b.*, b.crt_time
Buffers: shared hit=500000
-> Index Scan using idx_b_1 on postgres.b (cost=0.57..946.44 rows=713 width=55) (actual time=0.007..0.007 rows=1 loops=100000)
Output: b.*, b.crt_time
Index Cond: (b.aid = a.id)
Buffers: shared hit=500000
Planning time: 0.144 ms
Execution time: 832.539 ms
(20 rows)
Timing example 2
explain (analyze,verbose,timing,costs,buffers) select (t.b).aid,(t.b).val,(t.b).crt_time
from
(
  select
  (
    select b from b
    where b.aid=a.id
    and b.crt_time between '2017-07-17 09:53:00.480416' and '2017-07-17 09:54:00.480416'  -- restrict to the time range
    order by crt_time desc limit 1
  )  -- subquery, executed once per row of a; fetches the latest value
  from a limit 1000000000  -- without this LIMIT the plan misbehaves, possibly a bug; it has been reported to the community
) t
where (t.b).aid is not null;  -- keep only sensors that have actually reported into b
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Subquery Scan on t (cost=0.00..192742.68 rows=99500 width=44) (actual time=0.039..671.069 rows=100000 loops=1)
Output: (t.b).aid, (t.b).val, (t.b).crt_time
Filter: ((t.b).aid IS NOT NULL)
Buffers: shared hit=501263
-> Limit (cost=0.00..191742.68 rows=100000 width=32) (actual time=0.036..643.263 rows=100000 loops=1)
Output: ((SubPlan 1))
Buffers: shared hit=501263
-> Seq Scan on postgres.a (cost=0.00..191742.68 rows=100000 width=32) (actual time=0.035..634.038 rows=100000 loops=1)
Output: (SubPlan 1)
Buffers: shared hit=501263
SubPlan 1
-> Limit (cost=0.57..1.90 rows=1 width=55) (actual time=0.006..0.006 rows=1 loops=100000)
Output: b.*, b.crt_time
Buffers: shared hit=500820
-> Index Scan using idx_b_1 on postgres.b (cost=0.57..134.12 rows=100 width=55) (actual time=0.006..0.006 rows=1 loops=100000)
Output: b.*, b.crt_time
Index Cond: ((b.aid = a.id) AND (b.crt_time >= '2017-07-17 09:53:00.480416'::timestamp without time zone) AND (b.crt_time <= '2017-07-17 09:54:00.480416'::timestamp without time zone))
Buffers: shared hit=500820
Planning time: 0.183 ms
Execution time: 676.006 ms
(20 rows)
Summary
Using the recursive-query technique described in this article, we can compute very fine-grained, real-time statistics on the state of large numbers of tracked objects (a single machine can support roughly 10 billion new records per day with real-time analysis over arbitrary sliding windows).
The results can drive real-time views of the tracked objects, for example:
1. Real-time heat maps.
2. Real-time online/offline counts of sensors (or users), and the maximum/minimum online/offline counts within any sliding window.