PostgreSQL pipelinedb 流计算插件 - IoT应用 - 实时轨迹聚合
背景
IoT场景,车联网场景,共享单车场景,人的行为位点等,终端实时上报的是孤立的位点,我们需要将其补齐成轨迹。
例如共享单车,下单,开锁,生成订单,骑行,关闭订单,关锁。这个过程有一个唯一的订单号,每次上报的位点会包含时间,订单号,位置。
根据订单号,将点聚合为轨迹。
使用pipelinedb插件,可以实时的实现聚合。
例子
以ECS (centos 7.x x64), postgresql 10 为例
1、编译zeromq
wget https://github.com/zeromq/libzmq/releases/download/v4.2.5/zeromq-4.2.5.tar.gz
tar -zxvf zeromq-4.2.5.tar.gz
cd zeromq-4.2.5
./configure
make
make install
2、编译pipelinedb
wget https://github.com/pipelinedb/pipelinedb/archive/1.0.0rev4.tar.gz
tar -zxvf 1.0.0rev4.tar.gz
cd pipelinedb-1.0.0rev4/
vi Makefile
SHLIB_LINK += /usr/local/lib/libzmq.so -lstdc++
. /var/lib/pgsql/env.sh 1925
USE_PGXS=1 make
USE_PGXS=1 make install
3、配置postgresql.conf
max_worker_processes = 512
shared_preload_libraries = 'pipelinedb'
pipelinedb.num_combiners=16
pipelinedb.num_workers=8
pipelinedb.num_queues=4
pipelinedb.continuous_queries_enabled=true
重启
pg_ctl restart -m fast
4、安装插件
postgres=# create extension pipelinedb;
5、创建stream,实时写入轨迹点
CREATE FOREIGN TABLE s1 ( order_id int8, ts timestamp, pos geometry )
SERVER pipelinedb;
6、创建Continue view,实时聚合
CREATE VIEW cv1 WITH (action=materialize ) AS
select order_id, min(ts) min_ts, array_agg(ts||','||st_astext(pos)) as seg
from s1
group by order_id;
激活视图(默认已激活)
select pipelinedb.activate('public.cv1');
7、压测
vi test.sql
\set order_id random(1,100000)
\set x random(70,90)
\set y random(120,125)
insert into s1 (order_id, ts, pos) values (:order_id, clock_timestamp(), st_makepoint(:x+10*random(), :y+10*random()));
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 256 -j 256 -T 120
8、压测结果
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 256
number of threads: 256
duration: 120 s
number of transactions actually processed: 17614607
latency average = 1.740 ms
latency stddev = 1.730 ms
tps = 146550.933776 (including connections establishing)
tps = 146906.482277 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.002 \set order_id random(1,10000000)
0.001 \set x random(70,90)
0.000 \set y random(120,125)
1.742 insert into s1 (order_id, ts, pos) values (:order_id, clock_timestamp(), st_makepoint(:x+10*random(), :y+10*random()));
postgres=# \x
Expanded display is on.
-[ RECORD 17 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
order_id | 8672585
min_ts | 2018-11-01 18:44:08.140027
seg | {"2018-11-01 18:44:08.140027,POINT(78.3615547642112 121.881739947945)","2018-11-01 18:44:11.739248,POINT(80.9645632216707 121.450987955555)"}
-[ RECORD 18 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
order_id | 4011211
min_ts | 2018-11-01 18:44:08.166407
seg | {"2018-11-01 18:44:08.166407,POINT(87.126777020283 132.819293198176)","2018-11-01 18:44:11.524995,POINT(80.482944605872 126.906906872056)"}
-[ RECORD 19 ]---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
order_id | 2468486
min_ts | 2018-11-01 18:44:08.135136
seg | {"2018-11-01 18:44:08.135136,POINT(84.7732630362734 132.659516767599)","2018-11-01 18:44:20.603312,POINT(87.6352122295648 132.18647258915)","2018-11-01 18:44:19.447776,POINT(94.9817024609074 131.295661441982)"}
9、历史轨迹的保留
设置cv生命周期,自动清理老化数据
postgres=# select pipelinedb.set_ttl('cv1', interval '1 hour' , 'min_ts');
-[ RECORD 1 ]-----
set_ttl | (3600,2)
创建目标持久化表
create table cv1_persist (like cv1);
创建时间字段索引(CV1)
postgres=# create index idx_1 on cv1 (min_ts);
CREATE INDEX
ETL形式,将数据从cv抽取到目标持久化表
postgres=# insert into cv1_persist select * from cv1 where min_ts <= '2018-01-01';
INSERT 0 0
参考
https://github.com/pipelinedb/pipelinedb
http://zeromq.org/intro:get-the-software
https://www.pipelinedb.com/