How PostgreSQL Gracefully Handles 100+ TB of Daily Data Growth


Background

This article describes and benchmarks PostgreSQL's insert throughput on a mid-to-high-end x86 server, with the target tables carrying an index on a timestamp column, to help enterprise users understand how PostgreSQL performs in this kind of scenario.

This scenario is common in:

Carrier gateway data and financial-industry data: generated in huge volumes, and required to be inserted quickly into a large database for durable storage.

Additionally, if you need streaming real-time processing, see the PostgreSQL-based streaming solution: how is real-time stream processing of one trillion events per day achieved?

《”物联网”流式处理应用 - 用PostgreSQL实时处理(万亿每天)》

TEST CASE

1. Average record length of 360 bytes (a fairly typical size).

2. An index on the timestamp column.

3. Each round inserts 12 TB of data; once 12 TB has been inserted, the data is cleared and insertion starts again. Repeat in a loop.

In practice, a single machine rarely has petabytes of storage; writing 100 TB a day adds up to 3 PB a month. We can therefore use Alibaba Cloud's OSS external-table (OSS_EXT) plugin, together with the OSS massive object store, to hold historical data. A common architecture:

[Architecture diagram: PostgreSQL for recent data, OSS object storage for history]

Usage references:

https://help.aliyun.com/document_detail/44461.html

https://help.aliyun.com/document_detail/35457.html
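As a rough illustration of the external-table approach (a sketch only: the option names follow the Alibaba Cloud docs linked above and may differ by product and version; the endpoint, bucket, credentials, and object path are all placeholders):

create extension if not exists oss_fdw;    
create server ossserver foreign data wrapper oss_fdw options     
  (host 'oss-cn-hangzhou.aliyuncs.com', id '<access_id>', key '<access_key>', bucket 'mybucket');    
-- historical rows archived as CSV objects under hist/ in the bucket    
create foreign table test_hist (crt_time timestamp, info text)     
  server ossserver options (dir 'hist/', format 'csv', delimiter ',');    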

4. The test stops after a full 24 hours.

5. Count the records inserted during the 24 hours.

TEST Results

Twelve rounds of testing were completed in 24 hours, averaging 7,071 seconds per round.

5.06 million rows/s, 1.78 GB/s; 437.2 billion rows and 154 TB of data inserted over the whole day.

Test Hardware Environment

1. x86 server       

2. 3? cores       

3. 5?? GB RAM       

4. Several SSDs, 15 TB total capacity       

Software Environment

1. CentOS 6.x x64       

2. XFS       

3. PostgreSQL 9.5       

System configuration reference:

https://github.com/digoal/pgsql_admin_script/blob/master/pgsql_perf_tuning.md

Database Configuration

See:

《PostgreSQL on Linux 最佳部署手册》

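# custom build: 32 KB data and WAL blocks, 128 GB relation segments, 64 MB WAL segments    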
./configure --prefix=/home/digoal/pgsql9.5.1 --with-blocksize=32 --with-segsize=128 --with-wal-blocksize=32 --with-wal-segsize=64      
make && make install      

For enabling huge pages in PostgreSQL, see:

https://yq.aliyun.com/articles/8482
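A minimal sketch of the OS side (assuming 2 MB huge pages and the 256 GB shared_buffers configured below; the exact page count here is an assumption with some headroom):

# 256 GB / 2 MB = 131072 pages; reserve extra for bookkeeping    
echo "vm.nr_hugepages = 140000" >> /etc/sysctl.conf    
sysctl -p    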

Parameters

listen_addresses = '0.0.0.0'            # what IP address(es) to listen on;    
fsync=on    
port = 1921                             # (change requires restart)    
max_connections = 600                   # (change requires restart)    
superuser_reserved_connections = 13     # (change requires restart)    
unix_socket_directories = '.'   # comma-separated list of directories    
unix_socket_permissions = 0700          # begin with 0 to use octal notation    
tcp_keepalives_idle = 60                # TCP_KEEPIDLE, in seconds;    
tcp_keepalives_interval = 10            # TCP_KEEPINTVL, in seconds;    
tcp_keepalives_count = 10               # TCP_KEEPCNT;    
shared_buffers = 256GB                   # min 128kB    
huge_pages = on                 # on, off, or try    
work_mem = 512MB                                # min 64kB    
maintenance_work_mem = 1GB              # min 1MB    
autovacuum_work_mem = 1GB               # min 1MB, or -1 to use maintenance_work_mem    
dynamic_shared_memory_type = posix      # the default is the first option    
bgwriter_delay = 10ms                   # 10-10000ms between rounds    
bgwriter_lru_maxpages = 1000            # 0-1000 max buffers written/round    
bgwriter_lru_multiplier = 2.0      
synchronous_commit = off                # synchronization level;    
full_page_writes = on                  # recover from partial page writes    
wal_buffers = 2047MB                    # min 32kB, -1 sets based on shared_buffers    
wal_writer_delay = 10ms         # 1-10000 milliseconds    
checkpoint_timeout = 55min              # range 30s-1h    
max_wal_size = 512GB    
checkpoint_completion_target = 0.9      # checkpoint target duration, 0.0 - 1.0    
effective_cache_size = 40GB       
log_destination = 'csvlog'              # Valid values are combinations of    
logging_collector = on          # Enable capturing of stderr and csvlog    
log_directory = 'pg_log'                # directory where log files are written,    
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log' # log file name pattern,    
log_file_mode = 0600                    # creation mode for log files,    
log_truncate_on_rotation = on           # If on, an existing log file with the    
log_checkpoints = off    
log_connections = off    
log_disconnections = off    
log_error_verbosity = verbose           # terse, default, or verbose messages    
log_timezone = 'PRC'    
log_autovacuum_min_duration = 0 # -1 disables, 0 logs all actions and    
datestyle = 'iso, mdy'    
timezone = 'PRC'    
lc_messages = 'C'                       # locale for system error message    
lc_monetary = 'C'                       # locale for monetary formatting    
lc_numeric = 'C'                        # locale for number formatting    
lc_time = 'C'                           # locale for time formatting    
default_text_search_config = 'pg_catalog.english'    
autovacuum=off    

Create the test table:

Each 32 KB block stores 89 records of 360 bytes each.

postgres=# select string_agg(i,'') from (select md5(random()::text) i from generate_series(1,10) t(i)) t(i);    
                               string_agg                                                                           
----------------------------------------------------------------------    
 53d3ec7adbeacc912a45bdd8557b435be848e4b1050dc0f5e46b75703d4745833541b5dabc177db460b6b1493961fc72c478daaaac74bcc89aec4f946a496028d9cff1cc4144f738e01ea36436455c216aa697d87fe1f87ceb49134a687dc69cba34c9951d0c9ce9ca82bba229d56874af40498dca5fd8dfb9c877546db76c35a3362d6bdba6472d3919289b6eaeeab58feb4f6e79592fc1dd8253fd4c588a29    
(1 row)    
    
postgres=# create unlogged table test(crt_time timestamp, info text default '53d3ec7adbeacc912a45bdd8557b435be848e4b1050dc0f5e46b75703d4745833541b5dabc177db460b6b1493961fc72c478daaaac74bcc89aec4f946a496028d9cff1cc4144f738e01ea36436455c216aa697d87fe1f87ceb49134a687dc69cba34c9951d0c9ce9ca82bba229d56874af40498dca5fd8dfb9c877546db76c35a3362d6bdba6472d3919289b6eaeeab58feb4f6e79592fc1dd8253fd4c588a29');    
    
postgres=# alter table test alter column info set storage plain;    
    
postgres=# insert into test select now() from generate_series(1,1000);    
    
postgres=# select ctid from test limit 1000;    
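The ctid values show which block each row lives in; a small aggregate (a sketch using the standard ctid-to-point cast trick) makes the rows-per-block check explicit:

postgres=# select (ctid::text::point)[0]::int as blkno, count(*)     
           from test group by 1 order by 1;    
-- expect 89 rows in every full block (1000 rows span blocks 0..11)    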

Create three tablespace directories on three separate physical block devices, and create the corresponding tablespaces in the database:

tbs1, tbs2, tbs3.
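For example (a sketch; the mount points are placeholders for the three SSD block devices):

create tablespace tbs1 location '/data01/digoal/tbs1';    
create tablespace tbs2 location '/data02/digoal/tbs2';    
create tablespace tbs3 location '/data03/digoal/tbs3';    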

Create many partition tables, to reduce contention on the block extend exclusive lock.

do language plpgsql $$    
declare    
i int;    
sql text;    
begin    
  for i in 1..42 loop    
    sql := 'create unlogged table test'||i||' (like test including all) tablespace tbs1';    
    execute sql;    
    sql := 'create index idx_test'||i||' on test'||i||' using brin (crt_time) with (pages_per_range=512) tablespace tbs1';    
    execute sql;    
  end loop;    
  for i in 43..84 loop    
    sql := 'create unlogged table test'||i||' (like test including all) tablespace tbs2';    
    execute sql;    
    sql := 'create index idx_test'||i||' on test'||i||' using brin (crt_time) with (pages_per_range=512) tablespace tbs2';    
    execute sql;    
  end loop;    
  for i in 85..128 loop    
    sql := 'create unlogged table test'||i||' (like test including all) tablespace tbs3';    
    execute sql;    
    sql := 'create index idx_test'||i||' on test'||i||' using brin (crt_time) with (pages_per_range=512) tablespace tbs3';    
    execute sql;    
  end loop;    
end;     
$$;    

Black technology again: the BRIN index method

The index used here is a BRIN (Block Range INdex), PostgreSQL's "black technology" for streaming IoT-style data: it stores only summary values per range of pages, so it stays tiny and cheap to maintain under heavy ingest.
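A quick way to see why BRIN suits this workload (a sketch; the names are the tables and indexes created above):

-- with pages_per_range=512 there is only one summary tuple per 512 heap pages,    
-- so the BRIN index stays a tiny fraction of the heap it covers    
postgres=# select pg_size_pretty(pg_relation_size('idx_test1')) as brin_size,    
                  pg_size_pretty(pg_relation_size('test1'))     as heap_size;    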

postgres=# \di    
                 List of relations    
 Schema |    Name     | Type  |  Owner   |  Table      
--------+-------------+-------+----------+---------    
 public | idx_test1   | index | postgres | test1    
 public | idx_test10  | index | postgres | test10    
 public | idx_test100 | index | postgres | test100    
 public | idx_test101 | index | postgres | test101    
 public | idx_test102 | index | postgres | test102    
 public | idx_test103 | index | postgres | test103    
 public | idx_test104 | index | postgres | test104    
 public | idx_test105 | index | postgres | test105    
 public | idx_test106 | index | postgres | test106    
......    
......    
 public | idx_test90  | index | postgres | test90    
 public | idx_test91  | index | postgres | test91    
 public | idx_test92  | index | postgres | test92    
 public | idx_test93  | index | postgres | test93    
 public | idx_test94  | index | postgres | test94    
 public | idx_test95  | index | postgres | test95    
 public | idx_test96  | index | postgres | test96    
 public | idx_test97  | index | postgres | test97    
 public | idx_test98  | index | postgres | test98    
 public | idx_test99  | index | postgres | test99    
(128 rows)    

Generate the test scripts. Each connection inserts 178 records per statement; at 89 records per block, that is exactly two 32 KB blocks:

vi test.sql     
insert into test(crt_time) values (now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now());     
    
for ((i=1;i<=128;i++)) do sed "s/test/test$i/" test.sql > ./test$i.sql; done    

Clear the data before starting the test:

do language plpgsql $$      
declare    
i int;    
sql text;    
begin    
  for i in 1..128 loop    
    sql := 'truncate test'||i;    
    execute sql;    
  end loop;    
end;     
$$;    

Test method:

Each round inserts 12 TB of data, controlled as follows:

1. 128 parallel connections, each executing 1,572,864 transactions.

2. 201,326,592 transactions in total (each transaction inserts 178 records).

3. 35,836,133,376 records inserted in total (35.84 billion records), about 12 TB of data (index space extra); the arithmetic is checked below.
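The round size follows directly from these knobs:

128 connections × 1,572,864 transactions = 201,326,592 transactions    
201,326,592 transactions × 178 rows      = 35,836,133,376 rows    
35,836,133,376 rows × 360 bytes          ≈ 12.9e12 bytes ≈ 12 TB of heap    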

Before the next round, write out the logs and TRUNCATE all the data, then repeat the test above.

Continue until 24 hours have elapsed, then output the statistics.

The test script:

vi test.sh    
#!/bin/bash    
    
if [ $# -ne 5 ]; then    
  echo "please use: $0 ip port dbname user pwd"    
  exit 1    
fi    
    
IP=$1    
PORT=$2    
DBNAME=$3    
USER=$4    
PASSWORD=$5    
    
export PGPASSWORD=$PASSWORD    
    
DEP_CMD="psql"    
which $DEP_CMD     
if [ $? -ne 0 ]; then    
  echo -e "dep commands: $DEP_CMD not exist."    
  exit 1    
fi    
    
truncate() {    
psql -h $IP -p $PORT -U $USER $DBNAME <<EOF    
do language plpgsql \$\$      
declare    
i int;    
sql text;    
begin    
  for i in 1..128 loop    
    sql := 'truncate test'||i;    
    execute sql;    
  end loop;    
end;     
\$\$;    
checkpoint;    
\q    
EOF    
}    
    
# truncate data first    
truncate    
    
START=`date +%s`    
echo "`date +%F%T` $START"    
    
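# run rounds back to back; the 24-hour check below ends the loop    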
for ((x=1;x>0;x++))     
do     
# ------------------------------------------------------    
echo "Round $x test start: `date +%F%T` `date +%s`"    
    
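# 128 parallel single-connection pgbench streams, one per partition table    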
for ((i=1;i<=128;i++))     
do     
  pgbench -M prepared -n -r -f ./test$i.sql -h $IP -p $PORT -U $USER $DBNAME -c 1 -j 1 -t 1572864 >>./$i.log 2>&1 &     
done     
    
wait    
echo "Round $x test end: `date +%F%T` `date +%s`"    
# ------------------------------------------------------    
    
if [ $((`date +%s`-$START)) -gt 86400 ]; then    
  echo "end `date +%F%T` `date +%s`"    
  echo "duration second: $((`date +%s`-$START))"    
  exit 0    
fi    
    
echo "Round $x test end, start truncate `date +%F%T` `date +%s`"    
truncate    
echo "Round $x test end, end truncate `date +%F%T` `date +%s`"    
    
done    

Run the test

nohup ./test.sh xxx.xxx.xxx.xxx 1921 postgres postgres postgres >./test.log 2>&1 &    

Test Results

Twelve rounds of testing were completed in 24 hours, averaging 7,071 seconds per round.

5.06 million rows/s (360 bytes per row), 1.78 GB/s; 437.2 billion rows and 154 TB of data inserted over the whole day. (Sanity check: 437.2e9 rows / 86,400 s ≈ 5.06 million rows/s; × 360 bytes/row ≈ 1.8 GB/s.)

Query Performance

postgres=# select min(crt_time),max(crt_time) from test1;    
            min             |            max                 
----------------------------+----------------------------    
 2016-04-08 00:32:26.842728 | 2016-04-08 02:29:41.583367    
(1 row)    
    
postgres=# explain select count(*) from test1 where crt_time between '2016-04-08 00:32:00' and '2016-04-08 00:33:00';    
                                                                            QUERY PLAN                                                                                 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------    
 Aggregate  (cost=1183919.81..1183919.82 rows=1 width=0)    
   ->  Bitmap Heap Scan on test1  (cost=14351.45..1180420.19 rows=1399849 width=0)    
         Recheck Cond: ((crt_time >= '2016-04-08 00:32:00'::timestamp without time zone) AND (crt_time <= '2016-04-08 00:33:00'::timestamp without time zone))    
         ->  Bitmap Index Scan on idx_test1  (cost=0.00..14001.49 rows=1399849 width=0)    
               Index Cond: ((crt_time >= '2016-04-08 00:32:00'::timestamp without time zone) AND (crt_time <= '2016-04-08 00:33:00'::timestamp without time zone))    
(5 rows)    
Time: 0.382 ms    
    
postgres=# select count(*) from test1 where crt_time between '2016-04-08 00:32:00' and '2016-04-08 00:33:00';    
  count      
---------    
 2857968    
(1 row)    
Time: 554.474 ms    

How to gracefully write 100 TB per day

1. Sound environment and database deployment. See

《PostgreSQL on Linux 最佳部署手册》

2. Asynchronous commit (a crash may lose the most recently committed transactions, but cannot corrupt the database)

set synchronous_commit=off;  

3. Disable the autovacuum_enabled and toast.autovacuum_enabled switches on the tables receiving the bulk writes. For tables that do not take massive ingest, do not turn autovacuum off.

postgres=# alter table test_1 set (autovacuum_enabled =off);  
ALTER TABLE  
postgres=# alter table test_1 set (toast.autovacuum_enabled =off);  
ALTER TABLE  

4. Use UNLOGGED TABLE (not recommended for production)

This avoids the XLOG writer lock bottleneck, since unlogged tables bypass WAL.

5. Use 2-4x as many connections as CPU cores

For example, with 32 cores, 64-128 connections are recommended. Fewer cannot saturate the CPUs; more brings no benefit (and instead adds context-switch overhead).

6. Have each connection write to a different table.

This reduces contention on the table's file extend exclusive lock. If this is hard to arrange in the application, PostgreSQL's dynamic SQL functions can do the routing; this is introduced in 《PostgreSQL on ECS多云盘的部署、快照备份和恢复》 and sketched below.
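A hypothetical sketch of that routing idea (the function name ins_route and the modulo-by-backend-pid scheme are illustrative, not the referenced article's exact code):

create or replace function ins_route(v timestamp) returns void as $$    
declare    
  -- pin each backend to one of the 128 partition tables created above    
  suffix int := pg_backend_pid() % 128 + 1;    
begin    
  execute format('insert into test%s(crt_time) values ($1)', suffix) using v;    
end;    
$$ language plpgsql strict;    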

7. Use batched writes (COPY, or multi-row INSERT INTO table VALUES (),(),…;)

Batching reduces round-trip overhead and shortens the code path, improving throughput.
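For instance (a sketch; the CSV path is a placeholder):

-- multi-row INSERT: one round trip and one commit cover many rows    
insert into test1(crt_time) values (now()),(now()),(now());    
-- COPY: the lowest per-row overhead for bulk loading    
copy test1(crt_time) from '/path/to/data.csv' with (format csv);    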

8. For time-series range queries, use a BRIN index rather than a B-tree index. See

《PostgreSQL 物联网黑科技 - 瘦身几百倍的索引(BRIN index)》

9. If your business needs real-time analytics, see 《”物联网”流式处理应用 - 用PostgreSQL实时处理(万亿每天)》.

Another set of test results, and the corresponding test method, can be found in:

《PostgreSQL on ECS多云盘的部署、快照备份和恢复》

With PostgreSQL deployed on ECS virtual machines, the write rate also approaches ten million rows per second, close to one trillion rows per day.

That article's appendix covers the data structure definitions, the bulk-write method, the dynamic functions, and more.

Summary

1. The main application scenario for this case is real-time bulk ingest of big data, e.g. IoT scenarios where huge fleets of sensors generate enormous data volumes.

Likewise, traditional carrier gateways produce very large traffic or business datasets that must be ingested in real time.

On the indexing side, PostgreSQL's BRIN "black technology" is used.

2. Beyond real-time ingest, if you need streaming real-time processing, see the PostgreSQL-based streaming solution.

How is real-time stream processing of one trillion events per day achieved?

《”物联网”流式处理应用 - 用PostgreSQL实时处理(万亿每天)》

3. The bottleneck is still I/O. There are several symptoms: in top, large numbers of processes sit in the D state (uninterruptible sleep, waiting on I/O).

       w: S  --  Process Status    
          The status of the task which can be one of:    
             'D' = uninterruptible sleep    
             'R' = running    
             'S' = sleeping    
             'T' = traced or stopped    
             'Z' = zombie    

Utilization of all block devices reached 100%.

While clearing data:

Device:         rrqm/s   wrqm/s     r/s     w/s   rsec/s   wsec/s avgrq-sz avgqu-sz   await  svctm  %util    
dfa               0.00     0.00 5807.39 167576.65 1464080.93 1340613.23    16.18   535.69    3.02   0.01 116.77    
dfb               0.00     0.00 5975.10 185132.68 1506714.40 1481061.48    15.63   459.46    2.32   0.01 110.62    
dfc               0.00     0.00 5715.56 182584.05 1440771.98 1460672.37    15.41   568.02    2.93   0.01 112.37    

While inserting data:

Device:         rrqm/s   wrqm/s     r/s     w/s   rsec/s   wsec/s avgrq-sz avgqu-sz   await  svctm  %util    
dfa               0.00     0.00    0.00 235936.00     0.00 1887488.00     8.00  2676.34   11.17   0.00  99.10    
dfb               0.00     0.00    0.00 237621.00     0.00 1900968.00     8.00    66.02    0.10   0.00  99.10    
dfc               0.00     0.00    0.00 239830.00     0.00 1918632.00     8.00    10.66    0.04   0.00 101.30    

I/O-level performance problems can be eased by optimizing code (for example, having the PostgreSQL bgwriter write pages out as sequentially as possible), so that the OS layer can merge I/Os. Note that iostat's avgrq-sz is measured in 512-byte sectors, so the 8.00 above means only 4 KB per write request: the size of a single write I/O could still be increased.

A few tools you may find useful: perf, systemtap, gprof.

For a more complete analysis, build PostgreSQL with --enable-profiling to aid diagnosis.

