PostgreSQL 独立事件相关性分析 二 - 人车拟合

2 minute read

背景

独立事件相关性分析是一件很有意思的事情,例如

探探软件的擦肩而过功能点,在不同时空与你擦肩而过的人。

舆情分析。

商品最佳销售组合。

安全系统中的人车拟合,对时空数据进行处理,用于司机、乘客、车辆的拟合。

人车拟合

1、建立表结构

create table u_pos (  
  id int8,  
  uid int8,  
  crt_time timestamp,  
  pos geometry  
);  

create table c_pos (  
  id int8,  
  car_id int8,  
  crt_time timestamp,  
  pos geometry  
);  

2、生成测试数据。

以杭州市为例,经纬度范围如下:

东经118°21′-120°30′,北纬29°11′-30°33′ 计算得东经118.35°-120.5°,北纬29.183°-30.55°。  

活跃量假设:

1000万人口,1000万车辆。  
  
人的轨迹数,一天10亿。  
  
车的轨迹数,一天1亿。  

2.1、写入人的活动位置数据,按天分区,保留一年。

for ((i=1;i<=32;i++))  
do  
nohup psql -c "insert into u_pos select id, random()*10000000, '2017-10-01'::date + ((id*2.7648)||' ms')::interval, st_setsrid(st_makepoint(118.35+random()*2.15, 29.183+random()*1.367), 4326) from generate_series(1,31250000) t(id);" >/dev/null 2>&1 &  
done  

采用时序数据中最常用的brin索引。

create index idx_u_pos_1 on u_pos using brin(crt_time);  

建立人+时间的索引。

create index idx_u_pos_2 on u_pos using btree(uid, crt_time);  

2.2、写入车辆的活动位置数据,按天分区,保留一年。

for ((i=1;i<=32;i++))  
do  
nohup psql -c "insert into c_pos select id, random()*10000000, '2017-10-01'::date + ((id*27.648)||' ms')::interval, st_setsrid(st_makepoint(118.35+random()*2.15, 29.183+random()*1.367), 4326) from generate_series(1,3125000) t(id);" >/dev/null 2>&1 &  
done  

采用时序数据中最常用的brin索引。

create index idx_c_pos_1 on c_pos using brin(crt_time);  

建立车+时间的索引。

create index idx_c_pos_2 on c_pos using btree(car_id, crt_time);  

3、求某个时间区间的人车拟合

3.1、车辆,行驶过程中抓到的N个点,返回时间,位置。

select pos, crt_time from c_pos where car_id=? and crt_time between ? and ?;  

返回对应时间区间的N个点附近的人交集

create or replace function merge_car_u(  
  v_car_id int8,       -- 汽车ID  
  s_time timestamp,    -- 搜索范围,开始时间  
  e_time timestamp,    -- 搜索范围,结束时间  
  ts_range interval,   -- 每个汽车轨迹点对应的:目标人出现的时间与汽车出现时间的时间差(前后各放大多少)  
  pos_range float8     -- 每个汽车轨迹点对应的:目标人与汽车的距离  
) returns int8[] as $$  
declare  
  res int8[];  
  tmp int8[];  
  v_pos geometry;  
  v_crt_time timestamp;  
  i int := 0;  
begin  
  for v_pos, v_crt_time in select pos, crt_time from c_pos where car_id=v_car_id and crt_time between s_time and e_time  -- 求轨迹点  
  loop  
    select array_agg(uid) into tmp from u_pos where crt_time between v_crt_time-ts_range and v_crt_time+ts_range and (v_pos <-> pos) < pos_range;  -- 求对应目标的ID  
    if (i <> 0) then  
      select array_agg(unnest) into res from (select unnest(res) intersect select unnest(tmp)) t;  -- 求交集  
    else  
      res := tmp;  
    end if;  
    i := i+1;  
  end loop;  
  return res;  
end;  
$$ language plpgsql strict;  

例子:

postgres=# select * from merge_car_u(1, '2017-10-01 01:00:00', '2017-10-01 04:00:00', '10 s', 0.004);  
            merge_car_u              
-----------------------------------  
 {5481974,5958009,3682524,1313466}  
(1 row)  
  
Time: 232.960 ms  

3.2、人,运动过程中抓到的N个点,返回时间,位置。

返回对应时间区间的N个点附近的车辆的交集

create or replace function merge_u_car(  
  v_uid int8,              -- 人ID  
  s_time timestamp,        -- 搜索范围,开始时间  
  e_time timestamp,        -- 搜索范围,结束时间  
  ts_range interval,       -- 每个人轨迹点对应的:目标车辆出现的时间与人出现时间的时间差(前后各放大多少)  
  pos_range float8         -- 每个人轨迹点对应的:目标车辆与人的距离  
) returns int8[] as $$  
declare  
  res int8[];  
  tmp int8[];  
  v_pos geometry;  
  v_crt_time timestamp;  
  i int := 0;  
begin  
  for v_pos, v_crt_time in select pos, crt_time from u_pos where uid=v_uid and crt_time between s_time and e_time  -- 求轨迹点  
  loop  
    select array_agg(car_id) into tmp from c_pos where crt_time between v_crt_time-ts_range and v_crt_time+ts_range and (v_pos <-> pos) < pos_range;  -- 求对应目标的ID  
    if (i <> 0) then  
      select array_agg(unnest) into res from (select unnest(res) intersect select unnest(tmp)) t;  -- 求交集  
    else  
      res := tmp;  
    end if;  
    i := i+1;  
  end loop;  
  return res;  
end;  
$$ language plpgsql strict;  

例子:

postgres=# select * from merge_u_car(100, '2017-10-01 01:00:00', '2017-10-01 02:00:00', '100 s', 0.2);  
                                                merge_u_car                                                  
-----------------------------------------------------------------------------------------------------------  
 {6214562,6180159,4534165,7824219,6826437,3020910,1463798,2939986,5786345,7233751,2856178,1719127,7763683}  
(1 row)  
  
Time: 96.986 ms  

小结

1、存储、索引优化思路。

时间截断 + 空间排序 存储

例如

(YYYY-MM-DD HH24:MI), (geohash)  

存储修整后,建立以上结构的btree或BRIN索引。

当搜索某个时间点,出现在某个点附近的记录时,可以并行,并且搜索的数据块是比较少的,因为密集存储。

2、其他需求:缺失位置的补齐。某些情况下,可能导致车辆、人的位置信息未采集的情况,例如经过拥堵路段、采集设备死角等。

在位置获取出现空缺的情况下,使用pgrouting,以及路网信息,生成若干条路径,补齐为出现的点。同时估算时间,得到点和经过的时间。

3、其他需求:异常位置纠正。

4、拟合性能,以天为分区。1000万人口,1000万车辆。人的轨迹数,一天10亿。车的轨迹数,一天1亿。

可以做到毫秒级别的拟合响应。

参考

《潘金莲改变了历史之 - PostgreSQL舆情事件分析应用》

《为什么啤酒和纸尿裤最搭 - 用HybridDB/PostgreSQL查询商品营销最佳组合》

Flag Counter

digoal’s 大量PostgreSQL文章入口