PostgreSQL 9.5 new feature - pg_dump can use a --snapshot taken by another session (e.g. slot, pg_export_snapshot())

Background

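Logical streaming replication keeps maturing in PostgreSQL 9.5, and it places new requirements on other tools. For example, when a logical replication slot is created through the streaming replication protocol interface (not the SQL interface), the snapshot taken at slot creation is exported automatically. With that snapshot ID we can dump the base data, and by combining it with the changes decoded from WAL we get logical replication.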

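pg_dump is a backup tool, and it now accepts an externally exported snapshot. The purpose is clear: it is mainly meant to work together with logical replication. It can of course also be used with other replication tools, as long as they export a snapshot.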

Allow pg_dump to share a snapshot taken by another session using --snapshot (Simon Riggs, Michael Paquier)

The remote snapshot must have been exported by pg_export_snapshot() or been defined when
creating a logical replication slot.

This can be used by parallel pg_dump to use a consistent snapshot across pg_dump processes.

Test:

postgres=# begin transaction isolation level repeatable read;  
BEGIN  
postgres=# select pg_export_snapshot();  
 pg_export_snapshot   
--------------------  
 0000072C-1  
(1 row)  

Do not end this transaction yet. It can be closed once the backup has started (there is no need to wait for the backup to finish).

pg95@db-172-16-3-150-> pg_dump --snapshot=0000072C-1  

This dumps the data using the exported snapshot.
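
To make the hand-off concrete, here is a minimal Python sketch of the statements a second session would issue to adopt an exported snapshot, together with the matching pg_dump argument list. The function names are hypothetical, and the ID pattern is only an assumption inferred from the IDs shown in this post; other PostgreSQL versions may format snapshot IDs differently.

```python
import re

# Assumption: snapshot IDs look like the ones in this post (hex groups
# separated by dashes, e.g. 0000072C-1).
SNAPSHOT_ID = re.compile(r"^[0-9A-Fa-f]+(-[0-9A-Fa-f]+)+$")


def check_snapshot_id(snapshot_id):
    """Reject anything that does not look like an exported snapshot ID."""
    if not SNAPSHOT_ID.match(snapshot_id):
        raise ValueError("unexpected snapshot id: %r" % snapshot_id)
    return snapshot_id


def import_snapshot_sql(snapshot_id):
    """Statements for a session that wants to see the exported snapshot."""
    check_snapshot_id(snapshot_id)
    return [
        # SET TRANSACTION SNAPSHOT needs REPEATABLE READ or SERIALIZABLE
        # and must run before the first query of the transaction.
        "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;",
        "SET TRANSACTION SNAPSHOT '%s';" % snapshot_id,
    ]


def pg_dump_args(snapshot_id, dbname="postgres"):
    """Argument list for a pg_dump run that reuses the snapshot."""
    check_snapshot_id(snapshot_id)
    return ["pg_dump", "--snapshot=" + snapshot_id, dbname]


for statement in import_snapshot_sql("0000072C-1"):
    print(statement)
print(" ".join(pg_dump_args("0000072C-1")))
```

Validating the ID before interpolating it is a design choice of this sketch: the value ends up inside a SQL string and a command line, so it is worth refusing anything unexpected.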

For logical replication, we need to create the slot through the logical streaming replication protocol and then start the backup.

Example:

vi pg_hba.conf  
# replication privilege.  
local   replication     postgres                                trust  
host    replication     postgres        127.0.0.1/32            trust  
host    replication     postgres        ::1/128                 trust  
pg_ctl reload  

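On the database side, the test_decoding output plugin is used to decode the logical changes recorded in WAL. Logical decoding requires wal_level = logical and max_replication_slots greater than 0.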

Connect to the database using the streaming replication protocol.

pg95@db-172-16-3-150-> psql 'hostaddr=127.0.0.1 port=1922 user=postgres dbname=postgres replication=database'   
psql (9.5devel)  
Type "help" for help.  
postgres=# CREATE_REPLICATION_SLOT ab12 LOGICAL "/opt/pgsql9.5/lib/test_decoding.so";  
 slot_name | consistent_point | snapshot_name |           output_plugin              
-----------+------------------+---------------+------------------------------------  
 ab12      | 7/77B59A00       | 00000736-1    | /opt/pgsql9.5/lib/test_decoding.so  
(1 row)  

Dump using this snapshot.

pg95@db-172-16-3-150-> pg_dump --snapshot=00000736-1  
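
When this step is scripted, the snapshot name has to be pulled out of the CREATE_REPLICATION_SLOT result before pg_dump can be started. The following is a small sketch, assuming the result arrives as the aligned psql text shown above; `parse_psql_row` is a hypothetical helper, not part of any PostgreSQL tool.

```python
def parse_psql_row(header, row):
    """Map column names to values for one aligned psql output row."""
    names = [h.strip() for h in header.split("|")]
    values = [v.strip() for v in row.split("|")]
    if len(names) != len(values):
        raise ValueError("header and row have different column counts")
    return dict(zip(names, values))


# The header and data row copied from the session above.
header = " slot_name | consistent_point | snapshot_name |           output_plugin"
row = " ab12      | 7/77B59A00       | 00000736-1    | /opt/pgsql9.5/lib/test_decoding.so"

slot = parse_psql_row(header, row)
dump_cmd = ["pg_dump", "--snapshot=" + slot["snapshot_name"]]
print(" ".join(dump_cmd))
```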

In addition, pg_recvlogical can be used to start receiving logical changes from this slot:

pg_recvlogical -S ab12 -d postgres -v  -f - --start  
BEGIN 1849  
table public.t1: INSERT: id[integer]:1 c1[text]:null c2[integer]:null c3[timestamp without time zone]:null pk[bigint]:12  
COMMIT 1849  
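
A consumer of this stream has to turn the text lines back into structured changes. Here is a rough Python sketch of such a parser, assuming only the simple `table ...: INSERT: ...` form shown above; lines that test_decoding may emit in other shapes (for example UPDATE lines carrying an old key) are outside what this sketch handles, and `parse_change` is a hypothetical name.

```python
import re

TABLE_LINE = re.compile(
    r"^table (?P<table>\S+): (?P<action>INSERT|UPDATE|DELETE): (?P<cols>.*)$"
)
# name[type]:value, where value is either a quoted string or a bare token.
COLUMN = re.compile(
    r"(?P<name>\w+)\[(?P<type>[^\]]+)\]:(?P<value>'(?:[^']|'')*'|\S+)"
)


def parse_change(line):
    """Return a dict for a row change, or None for BEGIN/COMMIT lines."""
    m = TABLE_LINE.match(line.strip())
    if m is None:
        return None
    columns = []
    for col in COLUMN.finditer(m.group("cols")):
        raw = col.group("value")
        if raw == "null":
            value = None  # unquoted null stands for SQL NULL
        elif raw.startswith("'"):
            value = raw[1:-1].replace("''", "'")
        else:
            value = raw
        columns.append((col.group("name"), col.group("type"), value))
    return {
        "table": m.group("table"),
        "action": m.group("action"),
        "columns": columns,
    }


line = ("table public.t1: INSERT: id[integer]:1 c1[text]:null "
        "c2[integer]:null c3[timestamp without time zone]:null pk[bigint]:12")
change = parse_change(line)
print(change["table"], change["action"], len(change["columns"]))
```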

With the dump (the base backup), the logical changes (as SQL), and a module that executes SQL, SQL-based logical replication can be built.
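
As an illustration of the "execute SQL" part, the sketch below turns one decoded row into an INSERT statement. It is not taken from any existing tool: the helper names are hypothetical, values are passed as quoted literals and left to the server to cast, and the quoting assumes standard_conforming_strings is on.

```python
def quote_ident(name):
    """Double-quote an identifier, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def quote_literal(value):
    """Render None as NULL and everything else as a quoted literal."""
    if value is None:
        return "NULL"
    return "'" + str(value).replace("'", "''") + "'"


def insert_sql(table, columns):
    """Build an INSERT from a table name and (column, value) pairs."""
    qualified = ".".join(quote_ident(part) for part in table.split(".", 1))
    names = ", ".join(quote_ident(name) for name, _ in columns)
    values = ", ".join(quote_literal(value) for _, value in columns)
    return "INSERT INTO %s (%s) VALUES (%s);" % (qualified, names, values)


# The row decoded in transaction 1849 above.
row = [("id", "1"), ("c1", None), ("c2", None), ("c3", None), ("pk", "12")]
print(insert_sql("public.t1", row))
```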

The changes in a slot can also be consumed with SQL functions.

postgres=# select * from pg_replication_slots ;  
-[ RECORD 9 ]+-----------------------------------  
slot_name    | ab12  
plugin       | /opt/pgsql9.5/lib/test_decoding.so  
slot_type    | logical  
datoid       | 13181  
database     | postgres  
active       | f  
active_pid   |   
xmin         |   
catalog_xmin | 1850  
restart_lsn  | 7/77B5A3B0  
postgres=# insert into t1 values (1);  
INSERT 0 1  
postgres=# SELECT * FROM pg_logical_slot_get_changes('ab12', NULL, NULL, 'include-timestamp', '1', 'include-xids', '1');  
  location  | xid  |                                      data  
------------+------+--------------------------------------------------------------------------------  
 7/77B5ABB8 | 1852 | BEGIN 1852  
 7/77B5ABB8 | 1852 | table public.t1: INSERT: id[integer]:1 c1[text]:null c2[integer]:null c3[timestamp without time zone]:null pk[bigint]:15  
 7/77B5AD98 | 1852 | COMMIT 1852 (at 2000-01-01 08:00:00+08)  
(3 rows)  

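Note, however, that the changes in a slot are consumed only once, so one slot corresponds to one consumer. If there are several consumers, use several slots, unless your application is suited to multiple consumers sharing a single slot.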
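
The consume-once behaviour can be pictured with a toy in-memory model. This is only an illustration of the semantics, not how PostgreSQL implements slots; the class and method names are made up, with `get_changes` standing in for pg_logical_slot_get_changes and `peek_changes` for its non-consuming counterpart pg_logical_slot_peek_changes.

```python
class ToySlot:
    """Toy model: a queue of decoded changes with one read position."""

    def __init__(self):
        self._pending = []

    def write(self, change):
        self._pending.append(change)

    def peek_changes(self):
        # Reading without consuming: the same changes stay available.
        return list(self._pending)

    def get_changes(self):
        # Reading and consuming: a second caller sees nothing.
        changes, self._pending = self._pending, []
        return changes


slot = ToySlot()
for change in ("BEGIN 1852", "table public.t1: INSERT: ...", "COMMIT 1852"):
    slot.write(change)

first_peek = slot.peek_changes()
consumer_a = slot.get_changes()
consumer_b = slot.get_changes()
print(len(first_peek), len(consumer_a), len(consumer_b))
```

In this model the second consumer receives nothing, which is why independent consumers normally get their own slot.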

Other notes

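1. PostgreSQL 9.5 adds a new parameter, log_replication_commands. With log_replication_commands = on, the commands received over the streaming replication protocol are written to the server log.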

For example:

2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,1,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: IDENTIFY_SYSTEM",,,,,,,,,"pg_recvlogical"  
2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,2,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: CREATE_REPLICATION_SLOT ""ab1"" LOGICAL ""test_decoding""",,,,,,,,,"pg_recvlogical"  
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,3,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"  
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,4,"idle",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"exported logical decoding snapshot: ""00000732-1"" with 0 transaction IDs",,,,,,,,,"pg_recvlogical"  
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,5,"idle in transaction",2015-06-16 16:36:34 CST,2/5,1842,LOG,00000,"received replication command: START_REPLICATION SLOT ""ab1"" LOGICAL 7/77B58F50",,,,,,,,,"pg_recvlogical"  
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,6,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"starting logical decoding for slot ""ab1""","streaming transactions committing after 7/77B58F50, reading WAL from 7/77B58F18",,,,,,,,"pg_recvlogical"  
2015-06-16 16:36:34.804 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,7,"idle in transaction",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"  
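
To audit which replication commands a client sent, the csvlog lines can be filtered by message prefix. The sketch below assumes the column layout of the csvlog lines in this post, where the message is the 14th field; the function name is hypothetical.

```python
import csv
import io

PREFIX = "received replication command: "
MESSAGE_FIELD = 13  # zero-based position of the message in the lines above


def replication_commands(csvlog_text):
    """Return the replication commands found in a chunk of csvlog text."""
    commands = []
    for fields in csv.reader(io.StringIO(csvlog_text)):
        if len(fields) > MESSAGE_FIELD and fields[MESSAGE_FIELD].startswith(PREFIX):
            commands.append(fields[MESSAGE_FIELD][len(PREFIX):])
    return commands


# Two lines copied from the log excerpt above.
sample = '''2015-06-16 16:36:34.761 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,2,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"received replication command: CREATE_REPLICATION_SLOT ""ab1"" LOGICAL ""test_decoding""",,,,,,,,,"pg_recvlogical"
2015-06-16 16:36:34.793 CST,"postgres","postgres",18389,"[local]",557fe012.47d5,3,"idle",2015-06-16 16:36:34 CST,2/0,0,LOG,00000,"logical decoding found consistent point at 7/77B58F18","There are no running transactions.",,,,,,,,"pg_recvlogical"
'''

for command in replication_commands(sample):
    print(command)
```

Using the csv module rather than splitting on commas matters here, because the message field itself contains doubled quotes.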

2. For details of the streaming replication protocol, see

http://www.postgresql.org/docs/devel/static/protocol-replication.html

References

1. 《PostgreSQL 事务快照功能 - Parallel Export consistent data or Parallel Query use snapshot transaction feature》 (an article on the PostgreSQL transaction snapshot feature)

2. http://www.postgresql.org/docs/devel/static/protocol-replication.html

3. http://www.postgresql.org/docs/devel/static/test-decoding.html

4. http://www.postgresql.org/docs/devel/static/protocol-replication.html

5. Options supported by test_decoding, from contrib/test_decoding/test_decoding.c

include-xids  
include-timestamp  
force-binary  
skip-empty-xacts  
only-local  

As follows:

        foreach(option, ctx->output_plugin_options)  
        {  
                DefElem    *elem = lfirst(option);  
  
                Assert(elem->arg == NULL || IsA(elem->arg, String));  
  
                if (strcmp(elem->defname, "include-xids") == 0)  
                {  
                        /* if option does not provide a value, it means its value is true */  
                        if (elem->arg == NULL)  
                                data->include_xids = true;  
                        else if (!parse_bool(strVal(elem->arg), &data->include_xids))  
                                ereport(ERROR,  
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",  
                                                 strVal(elem->arg), elem->defname)));  
                }  
                else if (strcmp(elem->defname, "include-timestamp") == 0)  
                {  
                        if (elem->arg == NULL)  
                                data->include_timestamp = true;  
                        else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))  
                                ereport(ERROR,  
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",  
                                                 strVal(elem->arg), elem->defname)));  
                }  
                else if (strcmp(elem->defname, "force-binary") == 0)  
                {  
                        bool            force_binary;  
  
                        if (elem->arg == NULL)  
                                continue;  
                        else if (!parse_bool(strVal(elem->arg), &force_binary))  
                                ereport(ERROR,  
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",  
                                                 strVal(elem->arg), elem->defname)));  
                        if (force_binary)  
                                opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;  
                }  
                else if (strcmp(elem->defname, "skip-empty-xacts") == 0)  
                {  
  
                        if (elem->arg == NULL)  
                                data->skip_empty_xacts = true;  
                        else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))  
                                ereport(ERROR,  
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",  
                                                 strVal(elem->arg), elem->defname)));  
                }  
                else if (strcmp(elem->defname, "only-local") == 0)  
                {  
  
                        if (elem->arg == NULL)  
                                data->only_local = true;  
                        else if (!parse_bool(strVal(elem->arg), &data->only_local))  
                                ereport(ERROR,  
                                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
                                  errmsg("could not parse value \"%s\" for parameter \"%s\"",  
                                                 strVal(elem->arg), elem->defname)));  
                }  
                else  
                {  
                        ereport(ERROR,  
                                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),  
                                         errmsg("option \"%s\" = \"%s\" is unknown",  
                                                        elem->defname,  
                                                        elem->arg ? strVal(elem->arg) : "(null)")));  
                }  
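
The rules in this excerpt can be summarised in a few lines of Python. This is a simplified model for reading along, not a port: the `parse_bool` here accepts only a handful of spellings, whereas PostgreSQL's own parse_bool also accepts unambiguous prefixes such as t or f.

```python
TRUE_WORDS = {"true", "on", "yes", "1"}
FALSE_WORDS = {"false", "off", "no", "0"}
KNOWN = ("include-xids", "include-timestamp", "force-binary",
         "skip-empty-xacts", "only-local")


def parse_bool(text):
    """Simplified boolean parser (subset of what PostgreSQL accepts)."""
    word = text.strip().lower()
    if word in TRUE_WORDS:
        return True
    if word in FALSE_WORDS:
        return False
    raise ValueError('could not parse value "%s"' % text)


def parse_options(pairs):
    """pairs: iterable of (name, value_or_None), as in the C loop."""
    settings = {}
    for name, value in pairs:
        if name not in KNOWN:
            shown = value if value is not None else "(null)"
            raise ValueError('option "%s" = "%s" is unknown' % (name, shown))
        if value is None:
            if name == "force-binary":
                continue  # the C code skips force-binary without a value
            settings[name] = True  # a missing value means true
        else:
            settings[name] = parse_bool(value)
    return settings


# The option pairs used in the pg_logical_slot_get_changes call earlier.
print(parse_options([("include-timestamp", "1"), ("include-xids", "1")]))
```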

6. src/backend/replication/walsender.c

When a logical slot is created, a snapshot is exported automatically.

/*  
 * Create a new replication slot.  
 */  
static void  
CreateReplicationSlot(CreateReplicationSlotCmd *cmd)  
{  
......  
        if (cmd->kind == REPLICATION_KIND_LOGICAL)  
        {  
......  
                /*  
                 * Export a plain (not of the snapbuild.c type) snapshot to the user  
                 * that can be imported into another session.  
                 */  
                snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);  
  
......  

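Note that the exported snapshot is released as soon as the next replication command is executed on this connection, so pg_dump must import the snapshot before that happens.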

/*  
 * Execute an incoming replication command.  
 */  
void  
exec_replication_command(const char *cmd_string)  
{  
......  
        /*  
         * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next  
         * command arrives. Clean up the old stuff if there's anything.  
         */  
        SnapBuildClearExportedSnapshot();  
......  
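
The ordering constraint can be illustrated with a toy model of one replication connection. It mirrors only the behaviour visible in the two excerpts above (every incoming command first clears a previously exported snapshot); the class and the snapshot names it produces are invented for the illustration.

```python
class ToyWalSender:
    """Toy model of the snapshot lifetime on a replication connection."""

    def __init__(self):
        self.exported_snapshot = None
        self._counter = 0

    def execute(self, command):
        # As in exec_replication_command: whatever an earlier
        # CREATE_REPLICATION_SLOT exported is cleared first.
        self.exported_snapshot = None
        if command.startswith("CREATE_REPLICATION_SLOT") and " LOGICAL " in command:
            self._counter += 1
            self.exported_snapshot = "toy-snapshot-%d" % self._counter
        return self.exported_snapshot


session = ToyWalSender()
name = session.execute('CREATE_REPLICATION_SLOT ab12 LOGICAL "test_decoding"')
usable_before = session.exported_snapshot  # pg_dump has to start in this window
session.execute("IDENTIFY_SYSTEM")
usable_after = session.exported_snapshot
print(usable_before, usable_after)
```

In practice this means keeping the replication connection idle after CREATE_REPLICATION_SLOT until pg_dump has started, and only then issuing START_REPLICATION or any other command.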
