PostgreSQL 事务快照功能 - Parallel Export consistent data or Parallel Query use snapshot transaction feature

5 minute read

背景

PostgreSQL 9.2 的一个新特性支持事务状态的导出和导入, 以前写过一篇文章测试这项特性,

http://blog.163.com/digoal@126/blog/static/1638770402012416105232835/

本文主要是针对这项特性的一个延展, 利用这个特性可以让PostgreSQL 支持并行的一致性数据导出, 或者并行查询.

首先测试两个事务导出和导入的场景, 从测试结果可以看到多个事务导入同一个事务状态后看到了一致的数据.

同时如果导出事务状态的事务提前结束, 状态可以保持, 不会影响导入这个状态的事务的一致性.

一、导出repeatable read 事务状态.

SESSION A(导出repeatable read事务状态) :

postgres=# drop table snapshot ;  
DROP TABLE  
postgres=# create table snapshot (id int, info text);  
CREATE TABLE  
postgres=# insert into snapshot values (1,'abc');  
INSERT 0 1  
postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# SELECT pg_export_snapshot();  
 pg_export_snapshot   
--------------------  
 000007E9-1  
(1 row)  

SESSION B(修改数据,自动提交) :

postgres=# insert into snapshot values (2,'test');  
INSERT 0 1  

SESSION C(导入repeatable read事务状态, 查看数据) :

postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# SET TRANSACTION SNAPSHOT '000007E9-1';  
SET  
postgres=# select * from snapshot ;  
 id | info   
----+------  
  1 | abc  
(1 row)  

SESSION B(修改数据,自动提交) :

postgres=# insert into snapshot values (3,'test');  
INSERT 0 1  

SESSION D(导入repeatable read事务状态, 查看数据) :

postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# SET TRANSACTION SNAPSHOT '000007E9-1';  
SET  
postgres=# select * from snapshot ;  
 id | info   
----+------  
  1 | abc  
(1 row)  

SESSION A(结束事务, 不会影响已导入事务状态的事务的一致性) :

postgres=# end;  
COMMIT  

SESSION B(修改数据,自动提交) :

postgres=# insert into snapshot values (4,'test');  
INSERT 0 1  

SESSION C(查询数据) :

postgres=# select * from snapshot ;  
 id | info   
----+------  
  1 | abc  
(1 row)  

SESSION D(查询数据) :

postgres=# select * from snapshot ;  
 id | info   
----+------  
  1 | abc  
(1 row)  

二、导出read committed事务状态.

SESSION A(导出read committed事务状态) :

postgres=# drop table snapshot ;  
DROP TABLE  
postgres=# create table snapshot (id int, info text);  
CREATE TABLE  
postgres=# insert into snapshot values (1,'abc');  
INSERT 0 1  
postgres=# begin transaction isolation level read committed;  
BEGIN  
postgres=# SELECT pg_export_snapshot();  
 pg_export_snapshot   
--------------------  
 000007F0-1  
(1 row)  

SESSION B(修改数据,自动提交) :

postgres=# insert into snapshot values (2,'test');  
INSERT 0 1  

SESSION C(导入read committed事务状态, 查看数据) :

postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# SET TRANSACTION SNAPSHOT '000007F0-1';  
SET  
postgres=# select * from snapshot ;  
 id | info   
----+------  
  1 | abc  
(1 row)  

SESSION B(修改数据,自动提交) :

postgres=# insert into snapshot values (3,'test');  
INSERT 0 1  

SESSION D(导入read committed事务状态, 查看数据) :

postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# SET TRANSACTION SNAPSHOT '000007F0-1';  
SET  
postgres=# select * from snapshot ;  
 id | info   
----+------  
  1 | abc  
(1 row)  

SESSION A(结束事务, 不会影响已导入事务状态的事务的一致性) :

postgres=# end;  
COMMIT  

SESSION B(修改数据,自动提交) :

postgres=# insert into snapshot values (4,'test');  
INSERT 0 1  

SESSION C(查询数据) :

postgres=# select * from snapshot ;  
 id | info   
----+------  
  1 | abc  
(1 row)  

SESSION D(查询数据) :

postgres=# select * from snapshot ;  
 id | info   
----+------  
  1 | abc  
(1 row)  

从上面的测试可用看出, 导出事务状态的事务不管是read committed还是repeatable read的, 不影响后续事务导入的结果.

但是需要注意, 以下场景则存在差异, 这也是read committed和repeatable read事务的特性差异, 并不会影响后续多节点导入这个事务状态后的数据一致性观测.

注意pg_export_snapshot()会获得一个事务号.

而begin; 并不会马上获得事务号, 需要在第一条变更数据的语句开启前获得. 或者使用txid_current()直接获得.

1. repeatable read :

SESSION A(启动repeatable read事务) :

postgres=# drop table snapshot ;  
DROP TABLE  
postgres=# create table snapshot (id int, info text);  
CREATE TABLE  
postgres=# insert into snapshot values (1,'abc');  
INSERT 0 1  
postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# select txid_current();  
 txid_current   
--------------  
         2045  
(1 row)  

SESSION B(修改数据) :

postgres=# insert into snapshot values (2,'test');  
INSERT 0 1  

SESSION A(导出repeatable read事务状态) :

postgres=# SELECT pg_export_snapshot();  
 pg_export_snapshot   
--------------------  
 000007FD-1  
(1 row)  

SESISON C(导入repeatable read事务状态, 查看数据) :

postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# SET TRANSACTION SNAPSHOT '000007FD-1';  
SET  
postgres=# select * from snapshot ;  
 id | info   
----+------  
  1 | abc  
(1 row)  

2. read committed :

SESSION A(启动read committed事务) :

postgres=# drop table snapshot ;  
DROP TABLE  
postgres=# create table snapshot (id int, info text);  
CREATE TABLE  
postgres=# insert into snapshot values (1,'abc');  
INSERT 0 1  
postgres=# begin transaction isolation level read committed ;  
BEGIN  
postgres=# select txid_current();  
 txid_current   
--------------  
         2050  
(1 row)  

SESSION B(修改数据) :

postgres=# insert into snapshot values (2,'test');  
INSERT 0 1  

SESSION A(导出read committed事务状态) :

postgres=# SELECT pg_export_snapshot();                        
 pg_export_snapshot   
--------------------  
 00000802-1  
(1 row)  

SESSION C(导入read committed事务状态, 查看数据) :

postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# SET TRANSACTION SNAPSHOT '00000802-1';  
SET  
postgres=# select * from snapshot ;  
 id | info   
----+------  
  1 | abc  
  2 | test  
(2 rows)  

三、并行导出一致性数据.

1. 单节点导出事务状态

2. 多节点导入事务状态, 同时copy数据.

测试 :

测试数据 :

postgres=# create table a(id int primary key, info text, crt_time timestamp);  
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "a_pkey" for table "a"  
CREATE TABLE  
postgres=# create table b(id int primary key, info text, crt_time timestamp);  
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "b_pkey" for table "b"  
CREATE TABLE  
postgres=# insert into a select generate_series(1,1000000), md5(clock_timestamp()::text), clock_timestamp();  
INSERT 0 1000000  
postgres=# insert into b select generate_series(1,1000000), md5(clock_timestamp()::text), clock_timestamp();  
INSERT 0 1000000  

导出事务状态 :

SESSION A :

postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# SELECT pg_export_snapshot();  
 pg_export_snapshot   
--------------------  
 0000080E-1  
(1 row)  

并行导出数据 :

SESSION B :

postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# SET TRANSACTION SNAPSHOT '0000080E-1';  
SET  
postgres=# copy a to '/home/ocz/a.csv' with csv;  
COPY 1000000  

SESSION C :

postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# SET TRANSACTION SNAPSHOT '0000080E-1';  
SET  
postgres=# copy b to '/home/ocz/b.csv' with csv;  
COPY 1000000  

使用以上方法的并行导出用到的shell脚本可以参考本文末尾.

PostgreSQL 9.3可能会将并行导出的功能添加到pg_dump工具中. 如下 :

https://commitfest.postgresql.org/action/patch_view?id=785

http://www.postgresql.org/message-id/flat/CACw0+10wAM_L6iC8KKL5eGzUtVjSpum1MEhaggWx5kU4VYn-QA@mail.gmail.com#CACw0+10wAM_L6iC8KKL5eGzUtVjSpum1MEhaggWx5kU4VYn-QA@mail.gmail.com

四、并行查询与并行导出类似, 也可以借鉴snapshot的特性.

1. 单节点导出事务状态

2. 多节点导入事务状态, 同时按照一定的规则并行查询, 合并查询结果.

参考

1. http://www.postgresql.org/docs/9.2/static/sql-set-transaction.html

2. http://www.postgresql.org/docs/9.2/static/functions-admin.html#FUNCTIONS-SNAPSHOT-SYNCHRONIZATION

3. http://blog.163.com/digoal@126/blog/static/1638770402012416105232835/

4. http://blog.163.com/digoal@126/blog/static/163877040201241134721101/

5. src/backend/utils/time/snapmgr.c

6. src/bin/pg_dump/pg_dump.c

        /*  
         * Start transaction-snapshot mode transaction to dump consistent data.  
         */  
        ExecuteSqlStatement(fout, "BEGIN");  
        if (fout->remoteVersion >= 90100)  
        {  
                if (serializable_deferrable)  
                        ExecuteSqlStatement(fout,  
                                                                "SET TRANSACTION ISOLATION LEVEL "  
                                                                "SERIALIZABLE, READ ONLY, DEFERRABLE");  
                else  
                        ExecuteSqlStatement(fout,  
                                                                "SET TRANSACTION ISOLATION LEVEL "  
                                                                "REPEATABLE READ");  
        }  
        else  
                ExecuteSqlStatement(fout,  
                                                        "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");  

7. http://www.postgresql.org/message-id/flat/CACw0+10wAM_L6iC8KKL5eGzUtVjSpum1MEhaggWx5kU4VYn-QA@mail.gmail.com#CACw0+10wAM_L6iC8KKL5eGzUtVjSpum1MEhaggWx5kU4VYn-QA@mail.gmail.com

8. http://www.depesz.com/2013/03/05/parallel-dumping-of-databases/

9. 并行dump脚本可以参考depesz的进行修改 :

以下用于导出digoal库. PGHOST=sockdir

ocz@db-172-16-3-150-> cat dump.sh  
#!/usr/bin/env bash  
   
# configuration  
PATH=$PATH:$HOME/bin  
export PATH  
export LANG=en_US.utf8  
export PGHOME=/home/ocz/pgsql9.2.1  
export LD_LIBRARY_PATH=/opt/uuid-1.6.2/lib:$PGHOME/lib:/lib64:/usr/lib64:/usr/local/lib64:/lib:/usr/lib:/usr/local/lib  
export DATE=`date +"%Y%m%d%H%M"`  
export PATH=$PGHOME/bin:$PATH:.  
  
export PGDATA=/data05/ocz/pg_root  
export PGPORT=9201  
export PGUSER=postgres  
export PGHOST=$PGDATA  
export PGDATABASE=digoal  
  
# Temporary directory for small helper files  
tmp_dir="$( mktemp -d )"  
trap 'rm -rf "$tmp_dir"' EXIT  
   
# Run master psql  
exec 50> >( exec psql -qAtX )  
master_pid="$!"  
   
# Start transaction  
printf 'BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;\n' >&50  
   
# Get transaction snapshot id  
printf '\o %s/snapshot.id\n' "$tmp_dir" >&50  
printf 'SELECT pg_export_snapshot();\n' >&50  
   
# Get list of tables to dump  
printf '\o %s/tables.list\n' "$tmp_dir" >&50  
printf 'SELECT c.oid::regclass FROM pg_class c join pg_namespace n on c.relnamespace = n.oid WHERE c.relkind = $$r$$ and n.nspname !~ $$^(pg_|information_schema)$$ ORDER BY pg_table_size(c.oid) desc;\n' >&50  
   
# Create file that marks that all is done in master  
printf '\o %s/marker.file\n' "$tmp_dir" >&50  
printf 'SELECT 1;\n' >&50  
printf '\o\n' >&50  
   
# Wait for marker file to appear  
while true  
do  
    if [[ -s "$tmp_dir/marker.file" ]]  
    then  
        break  
    fi  
    sleep 0.1  
done  
   
# Get snapshot id to variable  
snapshot_id="$( < "$tmp_dir/snapshot.id" )"  
   
# Dump each table separately  
while read table_name  
do  
    printf "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;\nSET TRANSACTION SNAPSHOT '%s';\nCOPY %s TO STDOUT;\n" "$snapshot_id" "$table_name" | psql -qAtX > "$table_name.dump" &  
done < "$tmp_dir/tables.list"  
   
# wait for all dumps to finish  
wait  
   
echo "All done."       

Flag Counter

digoal’s 大量PostgreSQL文章入口