PostgreSQL transaction snapshots - parallel export of consistent data and parallel query using the snapshot synchronization feature
Background
A new feature in PostgreSQL 9.2 allows a transaction snapshot to be exported and imported. I tested this feature in an earlier article:
http://blog.163.com/digoal@126/blog/static/1638770402012416105232835/
This article builds on that feature: with it, PostgreSQL can export consistent data in parallel, or run queries in parallel.
First, let's test exporting and importing a snapshot between transactions. The results show that multiple transactions importing the same snapshot all see the same consistent data.
They also show that if the exporting transaction ends early, the snapshot is retained for the sessions that already imported it, so their consistency is not affected.
I. Exporting a repeatable read snapshot
SESSION A (export a repeatable read snapshot):
postgres=# drop table snapshot ;
DROP TABLE
postgres=# create table snapshot (id int, info text);
CREATE TABLE
postgres=# insert into snapshot values (1,'abc');
INSERT 0 1
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# SELECT pg_export_snapshot();
pg_export_snapshot
--------------------
000007E9-1
(1 row)
SESSION B (modify data, autocommit):
postgres=# insert into snapshot values (2,'test');
INSERT 0 1
SESSION C (import the repeatable read snapshot and query):
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# SET TRANSACTION SNAPSHOT '000007E9-1';
SET
postgres=# select * from snapshot ;
id | info
----+------
1 | abc
(1 row)
SESSION B (modify data, autocommit):
postgres=# insert into snapshot values (3,'test');
INSERT 0 1
SESSION D (import the repeatable read snapshot and query):
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# SET TRANSACTION SNAPSHOT '000007E9-1';
SET
postgres=# select * from snapshot ;
id | info
----+------
1 | abc
(1 row)
SESSION A (end the transaction; this does not affect the consistency of transactions that have already imported the snapshot):
postgres=# end;
COMMIT
SESSION B (modify data, autocommit):
postgres=# insert into snapshot values (4,'test');
INSERT 0 1
SESSION C (query):
postgres=# select * from snapshot ;
id | info
----+------
1 | abc
(1 row)
SESSION D (query):
postgres=# select * from snapshot ;
id | info
----+------
1 | abc
(1 row)
II. Exporting a read committed snapshot
SESSION A (export a read committed snapshot):
postgres=# drop table snapshot ;
DROP TABLE
postgres=# create table snapshot (id int, info text);
CREATE TABLE
postgres=# insert into snapshot values (1,'abc');
INSERT 0 1
postgres=# begin transaction isolation level read committed;
BEGIN
postgres=# SELECT pg_export_snapshot();
pg_export_snapshot
--------------------
000007F0-1
(1 row)
SESSION B (modify data, autocommit):
postgres=# insert into snapshot values (2,'test');
INSERT 0 1
SESSION C (import the read committed snapshot and query):
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# SET TRANSACTION SNAPSHOT '000007F0-1';
SET
postgres=# select * from snapshot ;
id | info
----+------
1 | abc
(1 row)
SESSION B (modify data, autocommit):
postgres=# insert into snapshot values (3,'test');
INSERT 0 1
SESSION D (import the read committed snapshot and query):
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# SET TRANSACTION SNAPSHOT '000007F0-1';
SET
postgres=# select * from snapshot ;
id | info
----+------
1 | abc
(1 row)
SESSION A (end the transaction; this does not affect the consistency of transactions that have already imported the snapshot):
postgres=# end;
COMMIT
SESSION B (modify data, autocommit):
postgres=# insert into snapshot values (4,'test');
INSERT 0 1
SESSION C (query):
postgres=# select * from snapshot ;
id | info
----+------
1 | abc
(1 row)
SESSION D (query):
postgres=# select * from snapshot ;
id | info
----+------
1 | abc
(1 row)
The tests above show that it makes no difference whether the exporting transaction runs at read committed or repeatable read: the transactions that later import the snapshot see the same result.
Note, however, that the following scenario does behave differently. This is simply the ordinary difference between read committed and repeatable read (a repeatable read transaction freezes its snapshot at its first query, while at read committed every statement, including pg_export_snapshot(), takes a fresh snapshot), and it does not affect the consistency observed once several sessions have imported the same exported snapshot.
Note that pg_export_snapshot() acquires a transaction ID.
BEGIN, on the other hand, does not assign one right away; a transaction ID is only assigned when the first data-modifying statement starts, or it can be obtained directly with txid_current(). (As the tests below show, the exported snapshot identifier is prefixed with this xid in hexadecimal: 000007FD corresponds to 2045 and 00000802 to 2050.)
1. repeatable read:
SESSION A (start a repeatable read transaction):
postgres=# drop table snapshot ;
DROP TABLE
postgres=# create table snapshot (id int, info text);
CREATE TABLE
postgres=# insert into snapshot values (1,'abc');
INSERT 0 1
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# select txid_current();
txid_current
--------------
2045
(1 row)
SESSION B (modify data):
postgres=# insert into snapshot values (2,'test');
INSERT 0 1
SESSION A (export the repeatable read snapshot):
postgres=# SELECT pg_export_snapshot();
pg_export_snapshot
--------------------
000007FD-1
(1 row)
SESSION C (import the repeatable read snapshot and query):
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# SET TRANSACTION SNAPSHOT '000007FD-1';
SET
postgres=# select * from snapshot ;
id | info
----+------
1 | abc
(1 row)
2. read committed:
SESSION A (start a read committed transaction):
postgres=# drop table snapshot ;
DROP TABLE
postgres=# create table snapshot (id int, info text);
CREATE TABLE
postgres=# insert into snapshot values (1,'abc');
INSERT 0 1
postgres=# begin transaction isolation level read committed ;
BEGIN
postgres=# select txid_current();
txid_current
--------------
2050
(1 row)
SESSION B (modify data):
postgres=# insert into snapshot values (2,'test');
INSERT 0 1
SESSION A (export the read committed snapshot):
postgres=# SELECT pg_export_snapshot();
pg_export_snapshot
--------------------
00000802-1
(1 row)
SESSION C (import the read committed snapshot and query):
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# SET TRANSACTION SNAPSHOT '00000802-1';
SET
postgres=# select * from snapshot ;
id | info
----+------
1 | abc
2 | test
(2 rows)
III. Parallel export of consistent data
1. One session exports the snapshot.
2. Multiple sessions import the snapshot and COPY the data out concurrently.
Test:
Test data:
postgres=# create table a(id int primary key, info text, crt_time timestamp);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "a_pkey" for table "a"
CREATE TABLE
postgres=# create table b(id int primary key, info text, crt_time timestamp);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "b_pkey" for table "b"
CREATE TABLE
postgres=# insert into a select generate_series(1,1000000), md5(clock_timestamp()::text), clock_timestamp();
INSERT 0 1000000
postgres=# insert into b select generate_series(1,1000000), md5(clock_timestamp()::text), clock_timestamp();
INSERT 0 1000000
Export the snapshot:
SESSION A:
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# SELECT pg_export_snapshot();
pg_export_snapshot
--------------------
0000080E-1
(1 row)
Export the data in parallel:
SESSION B:
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# SET TRANSACTION SNAPSHOT '0000080E-1';
SET
postgres=# copy a to '/home/ocz/a.csv' with csv;
COPY 1000000
SESSION C:
postgres=# begin transaction isolation level repeatable read;
BEGIN
postgres=# SET TRANSACTION SNAPSHOT '0000080E-1';
SET
postgres=# copy b to '/home/ocz/b.csv' with csv;
COPY 1000000
A shell script that performs parallel export with this method can be found at the end of this article.
PostgreSQL 9.3 may add parallel export support to the pg_dump tool; see:
https://commitfest.postgresql.org/action/patch_view?id=785
http://www.postgresql.org/message-id/flat/CACw0+10wAM_L6iC8KKL5eGzUtVjSpum1MEhaggWx5kU4VYn-QA@mail.gmail.com#CACw0+10wAM_L6iC8KKL5eGzUtVjSpum1MEhaggWx5kU4VYn-QA@mail.gmail.com
IV. Parallel query: like parallel export, parallel query can also take advantage of the snapshot feature.
1. One session exports the snapshot.
2. Multiple sessions import the snapshot, each runs its share of the query in parallel according to some partitioning rule, and the partial results are merged; see the sketch after this list.
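Below is a minimal sketch of this idea, modeled on the dump script at the end of this article. It is only an illustration, not part of the original tests: it assumes the table a created in section III, connection settings taken from the environment (PGHOST, PGPORT, PGDATABASE, ...), an arbitrary partitioning rule (two id ranges) and a trivial merge step (summing counts).
#!/usr/bin/env bash
# Sketch: parallel query through snapshot import.
# Assumptions: table "a" from the test above exists, and psql can connect
# using the environment (PGHOST/PGPORT/PGDATABASE/...).
tmp_dir="$( mktemp -d )"
trap 'rm -rf "$tmp_dir"' EXIT
# Master session: open a transaction, export a snapshot and keep the
# transaction alive until the workers have imported it.
exec 50> >( exec psql -qAtX )
printf 'BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;\n' >&50
printf '\o %s/snapshot.id\n' "$tmp_dir" >&50
printf 'SELECT pg_export_snapshot();\n' >&50
printf '\o\n' >&50
# Wait until the snapshot id has been written out.
while [[ ! -s "$tmp_dir/snapshot.id" ]]
do
    sleep 0.1
done
snapshot_id="$( < "$tmp_dir/snapshot.id" )"
# Workers: import the same snapshot and each scan a disjoint id range.
i=0
for part in "id <= 500000" "id > 500000"
do
    i=$((i+1))
    printf "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;\nSET TRANSACTION SNAPSHOT '%s';\nSELECT count(*) FROM a WHERE %s;\n" \
        "$snapshot_id" "$part" | psql -qAtX > "$tmp_dir/partial.$i" &
done
wait
# Merge step: here simply sum the partial counts.
cat "$tmp_dir"/partial.* | awk '{ s += $1 } END { print "rows seen:", s }'
# Release the exporting transaction.
printf 'COMMIT;\n' >&50
exec 50>&-
With the million-row table from section III, the two workers should each see 500000 rows of the same consistent snapshot; the same pattern applies to any query that can be split by a predicate and merged afterwards.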
References
1. http://www.postgresql.org/docs/9.2/static/sql-set-transaction.html
2. http://www.postgresql.org/docs/9.2/static/functions-admin.html#FUNCTIONS-SNAPSHOT-SYNCHRONIZATION
3. http://blog.163.com/digoal@126/blog/static/1638770402012416105232835/
4. http://blog.163.com/digoal@126/blog/static/163877040201241134721101/
5. src/backend/utils/time/snapmgr.c
6. src/bin/pg_dump/pg_dump.c
/*
 * Start transaction-snapshot mode transaction to dump consistent data.
 */
ExecuteSqlStatement(fout, "BEGIN");
if (fout->remoteVersion >= 90100)
{
    if (serializable_deferrable)
        ExecuteSqlStatement(fout,
                            "SET TRANSACTION ISOLATION LEVEL "
                            "SERIALIZABLE, READ ONLY, DEFERRABLE");
    else
        ExecuteSqlStatement(fout,
                            "SET TRANSACTION ISOLATION LEVEL "
                            "REPEATABLE READ");
}
else
    ExecuteSqlStatement(fout,
                        "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
7. http://www.postgresql.org/message-id/flat/CACw0+10wAM_L6iC8KKL5eGzUtVjSpum1MEhaggWx5kU4VYn-QA@mail.gmail.com#CACw0+10wAM_L6iC8KKL5eGzUtVjSpum1MEhaggWx5kU4VYn-QA@mail.gmail.com
8. http://www.depesz.com/2013/03/05/parallel-dumping-of-databases/
9. A parallel dump script can be adapted from depesz's. The version below dumps the digoal database, with PGHOST set to the unix socket directory:
ocz@db-172-16-3-150-> cat dump.sh
#!/usr/bin/env bash
# configuration
PATH=$PATH:$HOME/bin
export PATH
export LANG=en_US.utf8
export PGHOME=/home/ocz/pgsql9.2.1
export LD_LIBRARY_PATH=/opt/uuid-1.6.2/lib:$PGHOME/lib:/lib64:/usr/lib64:/usr/local/lib64:/lib:/usr/lib:/usr/local/lib
export DATE=`date +"%Y%m%d%H%M"`
export PATH=$PGHOME/bin:$PATH:.
export PGDATA=/data05/ocz/pg_root
export PGPORT=9201
export PGUSER=postgres
export PGHOST=$PGDATA
export PGDATABASE=digoal
# Temporary directory for small helper files
tmp_dir="$( mktemp -d )"
trap 'rm -rf "$tmp_dir"' EXIT
# Run master psql
exec 50> >( exec psql -qAtX )
master_pid="$!"
# Start transaction
printf 'BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;\n' >&50
# Get transaction snapshot id
printf '\o %s/snapshot.id\n' "$tmp_dir" >&50
printf 'SELECT pg_export_snapshot();\n' >&50
# Get list of tables to dump
printf '\o %s/tables.list\n' "$tmp_dir" >&50
printf 'SELECT c.oid::regclass FROM pg_class c join pg_namespace n on c.relnamespace = n.oid WHERE c.relkind = $$r$$ and n.nspname !~ $$^(pg_|information_schema)$$ ORDER BY pg_table_size(c.oid) desc;\n' >&50
# Create file that marks that all is done in master
printf '\o %s/marker.file\n' "$tmp_dir" >&50
printf 'SELECT 1;\n' >&50
printf '\o\n' >&50
# Wait for marker file to appear
while true
do
    if [[ -s "$tmp_dir/marker.file" ]]
    then
        break
    fi
    sleep 0.1
done
# Get snapshot id to variable
snapshot_id="$( < "$tmp_dir/snapshot.id" )"
# Dump each table separately
while read table_name
do
    printf "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;\nSET TRANSACTION SNAPSHOT '%s';\nCOPY %s TO STDOUT;\n" "$snapshot_id" "$table_name" | psql -qAtX > "$table_name.dump" &
done < "$tmp_dir/tables.list"
# wait for all dumps to finish
wait
echo "All done."
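For completeness, a sketch of the reverse direction: each file produced by the script above is plain COPY text for one table, so it can be loaded back with psql's client-side \copy. This is only an illustration; it assumes the table definitions already exist in the target database and that the *.dump files sit in the current directory.
#!/usr/bin/env bash
# Sketch: reload the per-table dump files written by dump.sh.
# Assumes the target tables already exist and PGDATABASE etc. point at
# the destination database.
for f in *.dump
do
    table_name="${f%.dump}"
    # client-side \copy streams the file through psql to the server
    psql -qAtX -c "\\copy $table_name from '$PWD/$f'" &
done
wait
echo "Restore done."
Since the COPY text format carries no table definitions, the schema has to be created beforehand, for example from a separate pg_dump --schema-only run.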