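Table-level replication (trigger-based) – PostgreSQL general sync and async multi-master replication trigger function

This article was tested on 9.2. In older versions some system catalog column names differ, so the relevant parts of the trigger function must be adjusted; for example, pg_stat_activity.pid was called procpid in older versions.

Data synchronization is an old topic. PostgreSQL offers many ways to do it, for example streaming replication, slony, londiste, and pgpool.

However, all of these need a fair amount of configuration, and streaming replication copies all of the physical data. It also cannot do bidirectional (multi-master) synchronization.

Sometimes you only want to synchronize a small amount of data with a modest change rate to one or more other nodes: a single table, some columns of a table, or even only the rows of a table that match a condition.

You may even need replication that is writable on both sides (which Oracle materialized views cannot provide). The tools above do not cover these simple cases.

The example below uses triggers and dblink to implement all of this, with real-time synchronization. Keep the overhead of triggers in mind, though: this approach does not suit every workload.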
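As a rough sketch of the row-level filtering mentioned above, a trigger's WHEN clause (available since PostgreSQL 9.0) can restrict which rows are handed to the replication function. The trigger name tg_part and the condition pk1 < 1000 are made up for illustration; f_sync_test, loc_test, and the argument list are the ones defined later in this article.

```sql
-- Hypothetical example: forward only newly inserted rows whose pk1 is below 1000.
-- In a WHEN clause, NEW may be referenced by INSERT and UPDATE triggers,
-- OLD by UPDATE and DELETE triggers.
CREATE TRIGGER tg_part
  AFTER INSERT ON loc_test
  FOR EACH ROW
  WHEN (NEW.pk1 < 1000)
  EXECUTE PROCEDURE f_sync_test('dst', 'remote', 'rmt_test', 'aaa_bbb_digoal', 'sync');
```

This sketch covers row filtering only; replicating a subset of columns would need changes inside the trigger function itself.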

(PS: postgres_fdw plus triggers can achieve the same goal and supports the synchronous mode.)
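The postgres_fdw variant could look roughly like the following. This is a minimal one-way, insert-only sketch, assuming PostgreSQL 9.3 or later (where postgres_fdw exists and foreign tables are writable). The names dst_fdw, rmt_test_ft, f_fdw_insert, and tg_fdw and the host address are placeholders, and there is no loop protection, so it should not be installed on both sides as written.

```sql
-- As a superuser in database local.
CREATE EXTENSION postgres_fdw;
CREATE SERVER dst_fdw FOREIGN DATA WRAPPER postgres_fdw
  OPTIONS (host '127.0.0.1', port '9999', dbname 'remote');
CREATE USER MAPPING FOR local SERVER dst_fdw
  OPTIONS (user 'remote', password 'REMOTE321');
GRANT USAGE ON FOREIGN SERVER dst_fdw TO local;

-- As user local: a foreign table that points at remote.rmt_test.
CREATE FOREIGN TABLE local.rmt_test_ft (
  pk1 int, pk2 text, info text, crt_time timestamp(0), mod_time timestamp(0)
) SERVER dst_fdw OPTIONS (schema_name 'remote', table_name 'rmt_test');

CREATE OR REPLACE FUNCTION local.f_fdw_insert() RETURNS trigger AS $$
BEGIN
  -- The remote insert runs as part of the local statement,
  -- so a remote failure aborts the local insert as well.
  INSERT INTO local.rmt_test_ft VALUES (NEW.*);
  RETURN NULL;  -- the return value of an AFTER trigger is ignored
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tg_fdw AFTER INSERT ON local.loc_test
  FOR EACH ROW EXECUTE PROCEDURE local.f_fdw_insert();
```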





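But in an environment with many tables, writing a trigger function for each one is tedious.

Using PostgreSQL 9.2 as an example, this article shows how to create a general-purpose trigger function for multi-master replication.

General-purpose means that a single trigger function handles replication for every table, so no per-table trigger function is needed. This makes replication easier for the DBA to manage.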
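What makes one function reusable across tables is that PL/pgSQL exposes the firing table and the trigger arguments as variables. The demo below is only an illustration; f_show_trigger_ctx, tg_demo, and some_table are made-up names.

```sql
-- Hypothetical demo function: prints the context a generic trigger function can rely on.
CREATE OR REPLACE FUNCTION f_show_trigger_ctx() RETURNS trigger AS $$
BEGIN
  RAISE NOTICE 'op=%, table=%.%, first arg=%',
    TG_OP, TG_TABLE_SCHEMA, TG_TABLE_NAME, TG_ARGV[0];
  RETURN NULL;  -- the return value of an AFTER trigger is ignored
END;
$$ LANGUAGE plpgsql;

-- The same function can be attached to any table, with per-table arguments:
-- CREATE TRIGGER tg_demo AFTER INSERT ON some_table
--   FOR EACH ROW EXECUTE PROCEDURE f_show_trigger_ctx('dst');
```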

Test environment :

Create two users :

postgres=# create role local login encrypted password 'LOCAL321';  
postgres=# create role remote login encrypted password 'REMOTE321';  

Create two databases :

postgres=# create database local owner local;  
postgres=# create database remote owner remote;  

Install the dblink extension :

postgres=# \c local postgres  
You are now connected to database "local" as user "postgres".  
local=# create extension dblink;  
local=# \c remote postgres  
You are now connected to database "remote" as user "postgres".  
remote=# create extension dblink;  

Create the schemas :

local=> \c local local  
local=> create schema local;  
local=> \c remote remote  
You are now connected to database "remote" as user "remote".  
remote=> create schema remote;  

Create the test table in the local database :

This example uses a composite primary key :

remote=> \c local local  
You are now connected to database "local" as user "local".  
local=> create table loc_test (pk1 int, pk2 text, info text, crt_time timestamp(0), mod_time timestamp(0), primary key(pk1,pk2));  
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "loc_test_pkey" for table "loc_test"  
local=> select string_agg(position::text,' ')::int2vector,count(*) from dblink_get_pkey('local.loc_test');  
 string_agg | count   
 1 2        |     2  
(1 row)  
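The position vector and count returned above are exactly what the dblink_build_sql_* functions expect. A small sketch of a direct call follows; it assumes a row with primary key (1, 'abc') already exists in loc_test, because dblink_build_sql_insert reads that local row and raises an error when it is missing.

```sql
select dblink_build_sql_insert(
  'loc_test',           -- local relation
  '1 2'::int2vector,    -- logical positions of the PK columns, as returned above
  2,                    -- number of PK columns
  array['1','abc'],     -- PK values used to look up the local row
  array['1','abc']      -- PK values to place into the generated INSERT
);
```

The result is the text of an INSERT statement, which the trigger function later rewrites to point at the remote schema and table.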

Create the test table in the remote database :

postgres=# \c remote remote  
You are now connected to database "remote" as user "remote".  
remote=> create table rmt_test (pk1 int, pk2 text, info text, crt_time timestamp(0), mod_time timestamp(0), primary key(pk1,pk2));  
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "rmt_test_pkey" for table "rmt_test"  
remote=> select string_agg(position::text,' ')::int2vector,count(*) from dblink_get_pkey('remote.rmt_test');  
 string_agg | count   
 1 2        |     2  
(1 row)  

In the local database, revoke the privileges on pg_user_mappings from public :

local=> \c local postgres  
You are now connected to database "local" as user "postgres".  
local=# revoke all on pg_user_mappings from public;  

In the remote database, revoke the privileges on pg_user_mappings from public :

local=# \c remote postgres  
You are now connected to database "remote" as user "postgres".  
remote=# revoke all on pg_user_mappings from public;  
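One way to confirm the revoke took effect is to ask the server directly. This is only a sketch; the expectation (an assumption, not captured output) is that both calls return false unless SELECT was granted to these roles some other way.

```sql
-- Run as a superuser in each database.
select has_table_privilege('local',  'pg_catalog.pg_user_mappings', 'SELECT');
select has_table_privilege('remote', 'pg_catalog.pg_user_mappings', 'SELECT');
```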

Create the foreign server in the local database :

postgres=# \c local postgres  
You are now connected to database "local" as user "postgres".  
local=# CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;  
local=# CREATE SERVER dst FOREIGN DATA WRAPPER postgresql OPTIONS (hostaddr '', port '9999', dbname 'remote', options '-c tcp_keepalives_idle=60s -c tcp_keepalives_interval=10 -c tcp_keepalives_count=6 -c application_name=aaa_bbb_digoal');  
local=# GRANT USAGE ON FOREIGN SERVER dst TO local;  
local=# CREATE USER MAPPING FOR local SERVER dst OPTIONS (user 'remote', password 'REMOTE321');  

Create the foreign server in the remote database :

local=# \c remote postgres  
You are now connected to database "remote" as user "postgres".  
remote=# CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;  
remote=# CREATE SERVER dst FOREIGN DATA WRAPPER postgresql OPTIONS (hostaddr '', port '9999', dbname 'local', options '-c tcp_keepalives_idle=60s -c tcp_keepalives_interval=10 -c tcp_keepalives_count=6 -c application_name=aaa_bbb_digoal');  
remote=# GRANT USAGE ON FOREIGN SERVER dst TO remote;  
remote=# CREATE USER MAPPING FOR remote SERVER dst OPTIONS (user 'local', password 'LOCAL321');  
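Before building anything on top of the foreign server it can help to check that dblink can connect through it. The sketch below assumes the hostaddr option has been filled in with a reachable address; dst_test is a made-up connection name.

```sql
-- Run in database remote as user remote.
select dblink_connect('dst_test', 'dst');   -- connection name, foreign server name
select *
from dblink('dst_test',
            'select current_user::text, current_database()::text')
  as t(usr text, db text);                  -- should show the mapped user and database
select dblink_disconnect('dst_test');
```

The same check works from the local database as user local.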

In both the local and remote databases, create the table that records failed SQL for asynchronous replication :

create table sync_err_rec(id serial8 primary key, nsp_name name, table_name name, dst_server text, dst_query text, create_time timestamp without time zone);  
NOTICE:  CREATE TABLE will create implicit sequence "sync_err_rec_id_seq" for serial column "sync_err_rec.id"  
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "sync_err_rec_pkey" for table "sync_err_rec"  
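Once asynchronous replication is running, the size and age of this queue are worth watching. A simple monitoring query, offered as a sketch:

```sql
select dst_server,
       count(*)         as pending,
       min(create_time) as oldest
from sync_err_rec
group by dst_server
order by oldest;
```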

In both the local and remote databases, create the function that processes the recorded SQL :

create or replace function deal_sync_err_rec (i_limit int) returns boolean as $$  
declare  
  v_conn_name text;  -- connection name  
  v_conn_status text;  -- return value of dblink_connect(v_conn_name, v_dst_server)  
  v_exec_status text;  -- return value of dblink_exec(v_conn_name, v_dst_query, true|false)  
  v_dst_server text;  -- foreign server, one per call; the failed statements recorded for it are then fetched and processed  
  v_dst_query text;  -- SQL statements recorded in sync_err_rec  
  v_id int8[];  -- primary keys of the sync_err_rec rows in the batch, used to delete them in one statement  
begin  
  -- exit immediately on a standby  
  if pg_is_in_recovery() then  
    raise notice 'this is standby';  
    return false;  
  end if;  
  -- Pick the dst_server of the oldest record and lock that row with FOR UPDATE; errors for this dst_server are processed next.  
  -- The RowExclusiveLock on the table that the trigger function checks for is taken by the DELETE below.  
  select dst_server into v_dst_server from sync_err_rec order by create_time limit 1 for update;  
  -- NULL means there are no records: return  
  if (v_dst_server is NULL) then  
    return true;  
  end if;  
  -- use the value of v_dst_server as the connection name  
  v_conn_name := v_dst_server;  
  -- fetch a batch of records  
  select array_agg(id), string_agg(dst_query, ';') into v_id, v_dst_query from   
    (select id,dst_query from sync_err_rec where dst_server=v_dst_server order by create_time limit i_limit) t;  
  -- delete the corresponding records from sync_err_rec  
  delete from sync_err_rec where id in (select unnest(v_id));  
  -- open the dblink connection if it does not exist yet  
  if ( dblink_get_connections() @> ('{'||v_conn_name||'}')::text[] ) then   
    null;  -- already connected  
  else  
    select * into v_conn_status from dblink_connect(v_conn_name, v_dst_server);  
  end if;  
  -- dblink_exec is called with true here, so a remote error raises an exception and this transaction rolls back  
  select * into v_exec_status from dblink_exec(v_conn_name, v_dst_query, true);  
  -- debug  
  -- raise notice 'v_conn_status:%, v_exec_status:%.', v_conn_status, v_exec_status;  
  return true;  
end;  
$$ language plpgsql;  
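A possible way to use the function by hand is to preview the batch first and then replay it. The preview query below mirrors the selection logic inside deal_sync_err_rec; the batch size 10 is arbitrary.

```sql
-- Preview the statements the next call would replay.
select id, dst_server, dst_query
from sync_err_rec
where dst_server = (select dst_server from sync_err_rec order by create_time limit 1)
order by create_time
limit 10;

-- Replay them. If remote execution fails, dblink_exec raises an error and the
-- DELETE inside the function is rolled back, so the rows stay queued.
select deal_sync_err_rec(10);
```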

In both the local and remote databases, create the trigger function :

CREATE OR REPLACE FUNCTION f_sync_test()  
RETURNS trigger  
LANGUAGE plpgsql  
AS $BODY$  
DECLARE  
  v_dst_server text;  -- foreign server  
  v_conn_name text;  -- connection name; in this example it equals dst_server  
  v_conn_status text;  -- return value of dblink_connect(v_conn_name, v_dst_server)  
  v_nsp_name name := TG_TABLE_SCHEMA;  -- trigger variable: schema of the table that fired the trigger  
  v_table_name name := TG_TABLE_NAME;  -- trigger variable: name of the table that fired the trigger  
  v_dst_nsp_name name;  -- target schema; pass a different schema name to replicate into another schema  
  v_dst_table_name name;  -- target table name  
  v_query text;  -- SQL built by dblink_build_sql_insert, dblink_build_sql_update, or dblink_build_sql_delete, executed remotely with dblink_exec  
  v_query_upd1 text;  -- an update is replicated as a delete plus an insert  
  v_query_upd2 text;  -- an update is replicated as a delete plus an insert  
  v_dst_query text;  -- v_query after rewriting, mainly to substitute the target schema and table name  
  v_dst_query_upd1 text;  -- an update is replicated as a delete plus an insert  
  v_dst_query_upd2 text;  -- an update is replicated as a delete plus an insert  
  v_pk_vector int2vector;  -- logical positions of the replicated table's PK columns, required by the dblink_build_sql_* functions  
  v_pk_cnt int;  -- number of PK columns, required by the dblink_build_sql_* functions  
  v_pk_attname_array name[];  -- PK column names  
  v_pk_att_vals_array text[];  -- PK values  
  tmp_v_pk_att_vals_array text[]; -- temporary PK values  
  v_exec_status text;  -- return value of dblink_exec(v_conn_name, v_dst_query, true|false)  
  -- multi-master replication needs the following variables to tell where a change came from, to prevent infinite trigger loops  
  v_application_name_cli text;  
  v_application_name_check text;  
  v_pg_backend_pid int;  
  v_replica_mode text;  -- replication mode: sync or async  
  i record;  
  y int;  
BEGIN  
  -- for a description of the trigger see http://blog.163.com/digoal@126/blog/static/163877040201321125220134/  
  -- trigger arguments: (v_dst_server, v_dst_nsp_name, v_dst_table_name, v_application_name_check, v_replica_mode)  
  -- i.e. foreign server, remote schema, remote table name, application_name, replication mode  
  v_dst_server := TG_ARGV[0];  -- foreign server  
  v_conn_name := v_dst_server;  -- connection name; in this example it equals dst_server  
  v_dst_nsp_name := TG_ARGV[1];  -- target schema; pass a different schema name to replicate into another schema  
  v_dst_table_name := TG_ARGV[2];  -- target table name  
  v_application_name_check := TG_ARGV[3];  -- application_name, used to prevent infinite trigger loops  
  v_replica_mode := TG_ARGV[4];  -- replication mode: sync or async  
  -- get the backend pid of this session  
  select pg_backend_pid() into v_pg_backend_pid;  
  -- Look up application_name by pid to tell whether this session belongs to the replication program. In 9.1 and earlier the column is pg_stat_activity.procpid.  
  select application_name into v_application_name_cli from pg_stat_activity where pid=v_pg_backend_pid;  
  -- If the session belongs to the replication program, return null immediately; otherwise the triggers would fire each other forever.  
  if (v_application_name_cli = v_application_name_check ) then  
    return null;  
  end if;  
  -- Primary key information. Note: in 9.0 and later the position vector holds logical output positions, which change when a column is dropped; before 9.0 it holds pg_attribute.attnum.  
  -- v_pk_vector, v_pk_cnt, and v_pk_attname_array are obtained with the queries below.  
  -- get v_pk_cnt  
  select array_length(conkey, 1) into v_pk_cnt from pg_constraint where conrelid=TG_RELID and contype='p';  
  -- get v_pk_vector and v_pk_attname_array  
  if substring(version(),12,1) = '9' then  
    -- order by attnum so that rn is the logical column position  
    select string_agg(rn::text, ' ')::int2vector, array_agg(attname) into v_pk_vector, v_pk_attname_array from  
      (select row_number() over(order by attnum) AS rn, attnum, attname from pg_attribute where attnum>0 and not attisdropped and attrelid=TG_RELID) AS t  
      where attnum in  
      (select unnest(conkey) from pg_constraint where conrelid=TG_RELID and contype='p');  
  elsif substring(version(),12,1) = '8' then  
    select string_agg(attnum::text, ' ')::int2vector, array_agg(attname) into v_pk_vector, v_pk_attname_array from pg_attribute   
      where attrelid=TG_RELID  
      and attnum in  
      (select unnest(conkey) from pg_constraint where conrelid=TG_RELID and contype='p');  
  else  
    -- any other version raises an exception  
    raise exception 'PostgreSQL Version not 8 or 9.';  
  end if;  
  y := 0;  
  case TG_OP  
  when 'INSERT' then  
    -- Build the insert statement to run remotely, keyed on the table's primary key. For a multi-column key the values must be collected into a text[] in v_pk_vector order; for compatibility the array FOREACH loop is not used.  
    for i in select * from unnest(v_pk_attname_array) AS t(attname) loop  
      if y = 0 then  
        execute 'select array[('||quote_literal(NEW)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||']::text[]' into v_pk_att_vals_array;  
      else  
        tmp_v_pk_att_vals_array := v_pk_att_vals_array;  
        execute 'select array_append($1, ('||quote_literal(NEW)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||'::text)' into v_pk_att_vals_array using tmp_v_pk_att_vals_array;  
      end if;  
      y := y+1;  
    end loop;  
    select * into v_query from dblink_build_sql_insert(v_nsp_name||'.'||v_table_name, v_pk_vector, v_pk_cnt, v_pk_att_vals_array, v_pk_att_vals_array);  
  when 'DELETE' then  
    -- Build the delete statement to run remotely, keyed on the table's primary key. For a multi-column key the values must be collected into a text[] in v_pk_vector order.  
    for i in select * from unnest(v_pk_attname_array) AS t(attname) loop  
      if y = 0 then  
        execute 'select array[('||quote_literal(OLD)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||']::text[]' into v_pk_att_vals_array;  
      else  
        tmp_v_pk_att_vals_array := v_pk_att_vals_array;  
        execute 'select array_append($1, ('||quote_literal(OLD)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||'::text)' into v_pk_att_vals_array using tmp_v_pk_att_vals_array;  
      end if;  
      y := y+1;  
    end loop;  
    select * into v_query from dblink_build_sql_delete(v_nsp_name||'.'||v_table_name, v_pk_vector, v_pk_cnt, v_pk_att_vals_array);  
  when 'UPDATE' then  
    -- Build the statements that replicate the update, keyed on the table's primary key. For a multi-column key the values must be collected into a text[] in v_pk_vector order.  
    -- dblink_build_sql_update is not used here because the primary key itself may be updated, so the update is split into a delete and an insert.  
    for i in select * from unnest(v_pk_attname_array) AS t(attname) loop  
      if y = 0 then  
        execute 'select array[('||quote_literal(OLD)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||']::text[]' into v_pk_att_vals_array;  
      else  
        tmp_v_pk_att_vals_array := v_pk_att_vals_array;  
        execute 'select array_append($1, ('||quote_literal(OLD)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||'::text)' into v_pk_att_vals_array using tmp_v_pk_att_vals_array;  
      end if;  
      y := y+1;  
    end loop;  
    -- debug  
    -- raise notice '%', v_pk_att_vals_array;  
    select * into v_query_upd1 from dblink_build_sql_delete(v_nsp_name||'.'||v_table_name, v_pk_vector, v_pk_cnt, v_pk_att_vals_array);  
    -- re-initialize before collecting the new key values  
    v_pk_att_vals_array := null;  
    y := 0;  
    for i in select * from unnest(v_pk_attname_array) AS t(attname) loop  
      if y = 0 then  
        execute 'select array[('||quote_literal(NEW)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||']::text[]' into v_pk_att_vals_array;  
      else  
        tmp_v_pk_att_vals_array := v_pk_att_vals_array;  
        execute 'select array_append($1, ('||quote_literal(NEW)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||'::text)' into v_pk_att_vals_array using tmp_v_pk_att_vals_array;  
      end if;  
      y := y+1;  
    end loop;  
    -- debug  
    -- raise notice '%', v_pk_att_vals_array;  
    select * into v_query_upd2 from dblink_build_sql_insert(v_nsp_name||'.'||v_table_name, v_pk_vector, v_pk_cnt, v_pk_att_vals_array, v_pk_att_vals_array);  
  when 'TRUNCATE' then  
    -- Build the truncate statement to run remotely. Note that this is truncate table only; for child tables, create the same trigger on each child table.  
    v_query := 'truncate table only '||v_table_name;  
  end case;  
  -- replace the local table name with the target schema and table name  
  case TG_OP  
  when 'UPDATE' then  
    v_dst_query_upd1 := regexp_replace(v_query_upd1, v_table_name, v_dst_nsp_name||'.'||v_dst_table_name, '');  
    v_dst_query_upd2 := regexp_replace(v_query_upd2, v_table_name, v_dst_nsp_name||'.'||v_dst_table_name, '');  
    v_dst_query := v_dst_query_upd1||';'||v_dst_query_upd2;  
  else  
    v_dst_query := regexp_replace(v_query, v_table_name, v_dst_nsp_name||'.'||v_dst_table_name, '');  
  end case;  
  -- debug  
  -- raise notice 'v_dst_query:%', v_dst_query;  
  -- Asynchronous replication logic. If async mode must guarantee exactly the same remote execution order, with transactional remote SQL, use londiste3 instead.  
  if v_replica_mode = 'async' then  
    -- The logic below guards against remote SQL running out of order; for the reason see http://blog.163.com/digoal@126/blog/static/1638770402012731944439/  
    -- Check whether another session holds a RowExclusiveLock on sync_err_rec. If so, write the SQL into sync_err_rec  
    -- and return NULL, without calling dblink_exec(v_conn_name, v_dst_query, false).  
    perform 1 from pg_locks where   
      relation=(select oid from pg_class where relname='sync_err_rec')   
      and pid != pg_backend_pid()  
      and mode='RowExclusiveLock'  
      limit 1;  
    if found then  
      raise notice 'sync_err_rec in syncing, this sql will insert into sync_err_rec but not replica to remote now.';  
      insert into sync_err_rec (nsp_name, table_name, dst_server, dst_query, create_time)   
        values (v_nsp_name, v_table_name, v_dst_server, v_dst_query, clock_timestamp());  
      return null;  
    end if;  
    -- Check whether sync_err_rec holds records with dst_server=v_dst_server. If so, write the SQL into sync_err_rec  
    -- and return NULL, without calling dblink_exec(v_conn_name, v_dst_query, false).  
    perform 1 from sync_err_rec where dst_server=v_dst_server limit 1;  
    if found then  
      raise notice 'sync_err_rec has record with %, this sql will insert into sync_err_rec but not replica to remote now.', v_dst_server;  
      insert into sync_err_rec (nsp_name, table_name, dst_server, dst_query, create_time)   
        values (v_nsp_name, v_table_name, v_dst_server, v_dst_query, clock_timestamp());  
      return null;  
    -- Strictly speaking an "else raise" (error out) belongs here. With concurrent inserts, an earlier transaction may hit a problem and insert into sync_err_rec without having committed yet,  
    -- while a later transaction sees no record in sync_err_rec and can reach the remote database normally. The later transaction then runs remotely first while the earlier one  
    -- only sits in sync_err_rec, so the local and remote execution orders differ.  
    end if;  
  end if;  
  -- check whether the connection exists; create it if not  
  if ( dblink_get_connections() @> ('{'||v_conn_name||'}')::text[] ) then   
    null;  -- already connected  
  else  
    select * into v_conn_status from dblink_connect(v_conn_name, v_dst_server);  
  end if;  
  -- synchronous or asynchronous replication  
  if v_replica_mode = 'sync' then  
    -- synchronous: a remote error rolls back the local transaction  
    select * into v_exec_status from dblink_exec(v_conn_name, v_dst_query, true);  
  elsif v_replica_mode = 'async' then  
    -- asynchronous: a remote error does not roll back the transaction; dblink_exec just returns the string ERROR  
    select * into v_exec_status from dblink_exec(v_conn_name, v_dst_query, false);  
    if (v_exec_status = 'ERROR') then  
      insert into sync_err_rec (nsp_name, table_name, dst_server, dst_query, create_time)   
        values (v_nsp_name, v_table_name, v_dst_server, v_dst_query, clock_timestamp());  
    end if;  
  else  
    raise exception 'replica mode must sync or async.';  
  end if;  
  -- replication done, return  
  return null;  
EXCEPTION  
-- catch connection exceptions  
  WHEN SQLSTATE '08000'   
    or SQLSTATE '08003'   
    or SQLSTATE '08006'   
    or SQLSTATE '08001'   
    or SQLSTATE '08004'   
    or SQLSTATE '08007'   
    or SQLSTATE '08P01' THEN  
    if v_replica_mode = 'async' then  
      insert into sync_err_rec (nsp_name, table_name, dst_server, dst_query, create_time)   
        values (v_nsp_name, v_table_name, v_dst_server, v_dst_query, clock_timestamp());  
      raise notice 'CONNECTION EXCEPTION, remote SQL write to sync_err_rec';  
      return null;  
    elsif v_replica_mode = 'sync' then  
      raise;  
    else  
      raise exception 'replica mode must sync or async.';  
    end if;  
-- Other exceptions are not recorded in sync_err_rec; this handler only distinguishes synchronous from asynchronous replication.  
END;  
$BODY$ volatile;  
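The loop-prevention check in the function depends only on the session's application_name, so it can be explored by hand. The first query is what the function runs; the current_setting call is a possible alternative and an assumption on my part, not something the function uses.

```sql
-- What the trigger function does (9.2 column names):
select application_name from pg_stat_activity where pid = pg_backend_pid();

-- Possible alternative: application_name is an ordinary setting and can be read directly.
select current_setting('application_name');

-- Simulate a replication session: with this setting, DML in this session is not
-- forwarded by f_sync_test triggers that were created with 'aaa_bbb_digoal'.
set application_name = 'aaa_bbb_digoal';
```

The last statement is also a reminder that any client using this application_name silently bypasses replication, so the value should be treated as reserved.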


Create the triggers in the local database :

\c local local  
CREATE TRIGGER tg1 AFTER DELETE or UPDATE or INSERT ON loc_test FOR EACH ROW EXECUTE PROCEDURE f_sync_test('dst', 'remote', 'rmt_test', 'aaa_bbb_digoal', 'sync');  
CREATE TRIGGER tg2 AFTER TRUNCATE ON loc_test FOR EACH STATEMENT EXECUTE PROCEDURE f_sync_test('dst', 'remote', 'rmt_test', 'aaa_bbb_digoal', 'sync');  

Create the triggers in the remote database :

\c remote remote  
CREATE TRIGGER tg1 AFTER DELETE or UPDATE or INSERT ON rmt_test FOR EACH ROW EXECUTE PROCEDURE f_sync_test('dst', 'local', 'loc_test', 'aaa_bbb_digoal', 'sync');  
CREATE TRIGGER tg2 AFTER TRUNCATE ON rmt_test FOR EACH STATEMENT EXECUTE PROCEDURE f_sync_test('dst', 'local', 'loc_test', 'aaa_bbb_digoal', 'sync');  
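To see which replication triggers exist on a table and whether they are enabled, pg_trigger can be queried. A small sketch for the local side follows (use rmt_test in the remote database):

```sql
-- tgenabled: 'O' = fires in normal sessions, 'D' = disabled.
select tgname, tgenabled
from pg_trigger
where tgrelid = 'loc_test'::regclass
  and not tgisinternal
order by tgname;
```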

Insert :

local=> insert into loc_test select generate_series(1,10),'abc',now(),now();  
INSERT 0 10  
local=> select * from loc_test;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   1 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   2 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   3 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   4 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   5 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   6 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   7 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   8 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   9 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
  10 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
(10 rows)  
local=> \c remote remote  
You are now connected to database "remote" as user "remote".  
remote=> select * from rmt_test ;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   1 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   2 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   3 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   4 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   5 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   6 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   7 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   8 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   9 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
  10 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
(10 rows)  
remote=> insert into rmt_test select generate_series(11,12),'abc',now(),now();  
INSERT 0 2  
remote=> select * from rmt_test;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   1 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   2 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   3 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   4 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   5 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   6 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   7 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   8 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   9 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
  10 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
  11 | abc | 2013-03-11 15:03:01.569531+08 | 2013-03-11 15:03:02 |   
  12 | abc | 2013-03-11 15:03:01.569531+08 | 2013-03-11 15:03:02 |   
(12 rows)  
remote=> \c local local  
You are now connected to database "local" as user "local".  
local=> select * from loc_test;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   1 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   2 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   3 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   4 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   5 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   6 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   7 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   8 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
   9 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
  10 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |   
  11 | abc | 2013-03-11 15:03:01.569531+08 | 2013-03-11 15:03:02 |   
  12 | abc | 2013-03-11 15:03:01.569531+08 | 2013-03-11 15:03:02 |   
(12 rows)  

Update :

local=> update loc_test set pk2='new' where pk1=1;  
local=> select * from loc_test where pk1=1;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   1 | new | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
(1 row)  
local=> \c remote remote  
You are now connected to database "remote" as user "remote".  
remote=> select * from rmt_test where pk1=1;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   1 | new | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
(1 row)  
remote=> select * from rmt_test where pk1=2;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   2 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
(1 row)  
remote=> update rmt_test set pk2='rmt' where pk1=2;  
remote=> select * from rmt_test where pk1=2;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   2 | rmt | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
(1 row)  
remote=> \c local local  
You are now connected to database "local" as user "local".  
local=> select * from loc_test where pk1=2;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   2 | rmt | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
(1 row)  

Delete :

local=> delete from loc_test where pk1<9;  
local=> select * from loc_test;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   9 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
  10 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
  11 | abc | 2013-03-11 15:20:24.136187+08 | 2013-03-11 15:20:24 |   
  12 | abc | 2013-03-11 15:20:24.136187+08 | 2013-03-11 15:20:24 |   
(4 rows)  
local=> \c remote remote  
You are now connected to database "remote" as user "remote".  
remote=> select * from rmt_test;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   9 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
  10 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
  11 | abc | 2013-03-11 15:20:24.136187+08 | 2013-03-11 15:20:24 |   
  12 | abc | 2013-03-11 15:20:24.136187+08 | 2013-03-11 15:20:24 |   
(4 rows)  
remote=> delete from rmt_test where pk1>=11;  
remote=> select * from rmt_test;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   9 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
  10 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
(2 rows)  
remote=> \c local local  
You are now connected to database "local" as user "local".  
local=> select * from loc_test;  
 pk1 | pk2 |             info              |      crt_time       | mod_time   
   9 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
  10 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |   
(2 rows)  


Truncate :

local=> truncate table loc_test ;  
local=> \c remote remote  
You are now connected to database "remote" as user "remote".  
remote=> select * from rmt_test ;  
 pk1 | pk2 | info | crt_time | mod_time   
(0 rows)  


Local database :

Create the asynchronous replication triggers :

local=> CREATE TRIGGER atg1 AFTER DELETE or UPDATE or INSERT ON loc_test FOR EACH ROW EXECUTE PROCEDURE f_sync_test('dst', 'remote', 'rmt_test', 'aaa_bbb_digoal', 'async');  
local=> CREATE TRIGGER atg2 AFTER TRUNCATE ON loc_test FOR EACH STATEMENT EXECUTE PROCEDURE f_sync_test('dst', 'remote', 'rmt_test', 'aaa_bbb_digoal', 'async');  

Disable the synchronous replication triggers :

local=> alter table loc_test DISABLE TRIGGER tg1;  
local=> alter table loc_test DISABLE TRIGGER tg2;  

Remote database :

Create the asynchronous replication triggers :

remote=> CREATE TRIGGER atg1 AFTER DELETE or UPDATE or INSERT ON rmt_test FOR EACH ROW EXECUTE PROCEDURE f_sync_test('dst', 'local', 'loc_test', 'aaa_bbb_digoal', 'async');  
remote=> CREATE TRIGGER atg2 AFTER TRUNCATE ON rmt_test FOR EACH STATEMENT EXECUTE PROCEDURE f_sync_test('dst', 'local', 'loc_test', 'aaa_bbb_digoal', 'async');  

Disable the synchronous replication triggers :

remote=> alter table rmt_test DISABLE TRIGGER tg1;  
remote=> alter table rmt_test DISABLE TRIGGER tg2;  
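Switching a table back to synchronous mode is the reverse operation. The sketch below does it for loc_test in one transaction, so the table is never left with both trigger pairs (or neither) enabled once the change commits; the same statements apply to rmt_test on the remote side.

```sql
begin;
alter table loc_test disable trigger atg1;
alter table loc_test disable trigger atg2;
alter table loc_test enable trigger tg1;
alter table loc_test enable trigger tg2;
commit;
```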

Insert :


local=> insert into loc_test values (9,'abc','digoal',now(),now());  
INSERT 0 1  
local=> select * from loc_test where pk1=9;  
 pk1 | pk2 |  info  |      crt_time       |      mod_time         
   9 | abc | digoal | 2013-03-13 09:24:29 | 2013-03-13 09:24:29  
(1 row)  
local=> \c remote remote  
You are now connected to database "remote" as user "remote".  
remote=> select * from rmt_test where pk1=9;  
 pk1 | pk2 |  info  |      crt_time       |      mod_time         
   9 | abc | digoal | 2013-03-13 09:24:29 | 2013-03-13 09:24:29  
(1 row)  


remote=> insert into rmt_test values (10,'abc','digoal',now(),now());  
INSERT 0 1  
remote=> select * from rmt_test where pk1=10;  
 pk1 | pk2 |  info  |      crt_time       |      mod_time         
  10 | abc | digoal | 2013-03-13 09:26:04 | 2013-03-13 09:26:04  
(1 row)  


remote=> insert into rmt_test values (9,'abc','digoal',now(),now());  
ERROR:  duplicate key value violates unique constraint "rmt_test_pkey"  
DETAIL:  Key (pk1, pk2)=(9, abc) already exists.  
remote=> select * from sync_err_rec;  
 id | nsp_name | table_name | dst_server | dst_query | create_time   
(0 rows)  

Update :


remote=> update rmt_test set pk2='new' where pk1=10;  
remote=> select * from rmt_test where pk1=10;  
 pk1 | pk2 |  info  |      crt_time       |      mod_time         
  10 | new | digoal | 2013-03-13 09:26:04 | 2013-03-13 09:26:04  
(1 row)  
remote=> \c local local  
You are now connected to database "local" as user "local".  
local=> select * from loc_test where pk1=10;  
 pk1 | pk2 |  info  |      crt_time       |      mod_time         
  10 | new | digoal | 2013-03-13 09:26:04 | 2013-03-13 09:26:04  
(1 row)  


local=> update loc_test set pk2='new_loc' where pk1=10;  
local=> select * from loc_test where pk1=10;  
 pk1 |   pk2   |  info  |      crt_time       |      mod_time         
  10 | new_loc | digoal | 2013-03-13 09:26:04 | 2013-03-13 09:26:04  
(1 row)  
local=> \c remote remote  
You are now connected to database "remote" as user "remote".  
remote=> select * from rmt_test where pk1=10;  
 pk1 |   pk2   |  info  |      crt_time       |      mod_time         
  10 | new_loc | digoal | 2013-03-13 09:26:04 | 2013-03-13 09:26:04  
(1 row)  

Edit pg_hba.conf to block remote connections :

vi $PGDATA/pg_hba.conf  
#host all all md5  
pg_ctl reload  

Test with inserts :

The connection error is caught: the statement runs locally, and the remote SQL is written to sync_err_rec for later processing.

local=> insert into loc_test values (11,'abc','digoal',now(),now());  
NOTICE:  CONNECTION EXCEPTION, remote SQL write to sync_err_rec  
INSERT 0 1  
local=> select * from loc_test where pk1=11;  
 pk1 | pk2 |  info  |      crt_time       |      mod_time         
  11 | abc | digoal | 2013-03-13 09:31:20 | 2013-03-13 09:31:20  
(1 row)  
local=> select * from sync_err_rec;  
 id | nsp_name | table_name | dst_server |                                                              dst_query                     
                                           |        create_time           
  7 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('11','abc','digoal','2  
013-03-13 09:31:20','2013-03-13 09:31:20') | 2013-03-13 09:31:20.394119  
(1 row)  
remote=> select * from rmt_test where pk1=11;  
 pk1 | pk2 | info | crt_time | mod_time   
(0 rows)  

When sync_err_rec already holds records for the target foreign server, new changes are not replicated immediately, to preserve the remote execution order.

remote=> \c local local  
You are now connected to database "local" as user "local".  
local=> insert into loc_test values (12,'abc','digoal',now(),now());  
NOTICE:  sync_err_rec has record with dst, this sql will insert into sync_err_rec but not replica to remote now.  
INSERT 0 1  
local=> select * from loc_test where pk1=12;  
 pk1 | pk2 |  info  |      crt_time       |      mod_time         
  12 | abc | digoal | 2013-03-13 09:32:18 | 2013-03-13 09:32:18  
(1 row)  
local=> select * from sync_err_rec;  
 id | nsp_name | table_name | dst_server |                                                              dst_query                     
                                           |        create_time           
  7 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('11','abc','digoal','2  
013-03-13 09:31:20','2013-03-13 09:31:20') | 2013-03-13 09:31:20.394119  
  8 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('12','abc','digoal','2  
013-03-13 09:32:18','2013-03-13 09:32:18') | 2013-03-13 09:32:18.486967  
(2 rows)  

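While deal_sync_err_rec is running, changes are likewise not replicated immediately, to preserve the remote execution order.

Edit pg_hba.conf to allow connections again, then run deal_sync_err_rec to repair the data.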


local=> begin;  
local=> select * from deal_sync_err_rec(10);  
(1 row)  


local=> insert into loc_test values (13,'abc','digoal',now(),now());  
NOTICE:  00000: sync_err_rec in syncing, this sql will insert into sync_err_rec but not replica to remote now.  
LOCATION:  exec_stmt_raise, pl_exec.c:2840  
INSERT 0 1  
Time: 12.985 ms  
local=> select * from sync_err_rec;  
 id | nsp_name | table_name | dst_server |                                                              dst_query                     
                                           |        create_time           
  7 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('11','abc','digoal','2  
013-03-13 09:31:20','2013-03-13 09:31:20') | 2013-03-13 09:31:20.394119  
  8 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('12','abc','digoal','2  
013-03-13 09:32:18','2013-03-13 09:32:18') | 2013-03-13 09:32:18.486967  
  9 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('13','abc','digoal','2  
013-03-13 09:34:32','2013-03-13 09:34:32') | 2013-03-13 09:34:31.6283  
(3 rows)  
Time: 0.439 ms  



local=> end;  
local=> select * from deal_sync_err_rec(10);  
(1 row)  
local=> select * from sync_err_rec;  
 id | nsp_name | table_name | dst_server | dst_query | create_time   
(0 rows)  


After that, remote SQL is replicated normally again :

local=> insert into loc_test values (14,'abc','digoal',now(),now());  
INSERT 0 1  
Time: 14.380 ms  

Compare the hash values of the local and remote tables; they match :

local=> select sum(hashtext(t.*::text)) from loc_test t;  
(1 row)  
Time: 0.865 ms  
local=> \c remote remote  
You are now connected to database "remote" as user "remote".  
remote=> select sum(hashtext(t.*::text)) from rmt_test t;  
(1 row)  
Time: 1.991 ms  
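The two sums can also be compared in a single query through dblink. This is a sketch: cmp is a made-up connection name, and a matching sum of hashes is a cheap sanity check rather than proof that the tables are identical.

```sql
-- Run in database local as user local.
select dblink_connect('cmp', 'dst');
select l.local_sum,
       r.remote_sum,
       (l.local_sum is not distinct from r.remote_sum) as same
from (select sum(hashtext(t.*::text)) as local_sum from loc_test t) l,
     dblink('cmp', 'select sum(hashtext(t.*::text)) from remote.rmt_test t')
       as r(remote_sum int8);
select dblink_disconnect('cmp');
```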


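Notes :

1. application_name is used to prevent infinite trigger loops, so the application_name set in the foreign server options must match the value passed to the triggers created on the remote table.

The triggers in this example are passed aaa_bbb_digoal, so the foreign server created in the local database must also set application_name to aaa_bbb_digoal in its options.  

2. Pay attention to the int2vector primary_key_attnums parameter of dblink_build_sql_insert, dblink_build_sql_delete, and dblink_build_sql_update.

In PostgreSQL 9.0 and later it refers to the logical output position of the PK columns, i.e. their position in the output of select * from loc_test.  
Before PostgreSQL 9.0 it refers to the PK columns' pg_attribute.attnum values.   
So when a column in front of the PK is dropped, the PK's logical position changes; keep this in mind on 9.0 and later.  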

The original documentation text :

As of PostgreSQL 9.0, the attribute numbers in primary_key_attnums are interpreted as logical column numbers, corresponding to the column's position in SELECT * FROM relname.   
Previous versions interpreted the numbers as physical column positions. There is a difference if any column(s) to the left of the indicated column have been dropped during the lifetime of the table.  
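The difference between attnum and the logical position is easy to reproduce on a scratch table (t_drop is a made-up name):

```sql
create table t_drop (junk int, id int primary key, v text);
alter table t_drop drop column junk;

select attnum,
       attname,
       row_number() over (order by attnum) as logical_pos
from pg_attribute
where attrelid = 't_drop'::regclass
  and attnum > 0
  and not attisdropped
order by attnum;
-- id: attnum 2, logical_pos 1
-- v:  attnum 3, logical_pos 2

drop table t_drop;
```

On 9.0 and later, the dblink_build_sql_* functions would need 1 (the logical position) for the id column here, not 2.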

3. For asynchronous replication, sync_err_rec can be processed by calling deal_sync_err_rec periodically from crontab.
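What the scheduled job runs could be as simple as the block below, which drains the queue in batches. This is a sketch: the batch size 100 and the cap of 1000 iterations are arbitrary, and the whole block runs as a single transaction, so a remote failure rolls back every batch replayed in that run.

```sql
do $$
declare
  n int := 0;
begin
  loop
    exit when n >= 1000;                                -- safety cap (arbitrary)
    exit when not exists (select 1 from sync_err_rec);  -- nothing queued
    exit when not deal_sync_err_rec(100);               -- returns false on a standby
    n := n + 1;
  end loop;
end
$$;
```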

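4. Quote values with quote_literal; otherwise special characters can break out of the quoting.

For example, the previous version of the function wrapped values in $Q$. A $Q$ appearing in a PK value then breaks the quoting, as shown below :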

when 'INSERT' then  
    -- Build the insert statement to run remotely, keyed on the table's primary key. For a multi-column key the values must be collected into a text[] in v_pk_vector order; for compatibility the array FOREACH loop is not used.  
    for i in select * from unnest(v_pk_attname_array) AS t(attname) loop  
      if y = 0 then  
        execute 'select array[($Q$'||NEW||'$Q$::'||v_nsp_name||'.'||v_table_name||').'||i.attname||']::text[]' into v_pk_att_vals_array;  
      else  
        tmp_v_pk_att_vals_array := v_pk_att_vals_array;  
        execute 'select array_append($1, ($Q$'||NEW||'$Q$::'||v_nsp_name||'.'||v_table_name||').'||i.attname||'::text)' into v_pk_att_vals_array using tmp_v_pk_att_vals_array;  
      end if;  
      y := y+1;  
    end loop;  
    select * into v_query from dblink_build_sql_insert(v_nsp_name||'.'||v_table_name, v_pk_vector, v_pk_cnt, v_pk_att_vals_array, v_pk_att_vals_array);  

Data that triggers the error :

local=> insert into loc_test values(1,'$Q$','$Q$',now(),now());  
ERROR:  syntax error at or near "."  
LINE 1: ...:01:09","2013-03-14 08:01:09")$Q$::local.loc_test).pk1]::tex...  
QUERY:  select array[($Q$(1,$Q$,$Q$,"2013-03-14 08:01:09","2013-03-14 08:01:09")$Q$::local.loc_test).pk1]::text[]  
CONTEXT:  PL/pgSQL function f_sync_test() line 77 at EXECUTE statement  

After switching to quote_literal it works :

remote=> insert into rmt_test values(100,'$Q$','$Q$',now(),now());  
INSERT 0 1  
Time: 4.086 ms  
remote=> select * from rmt_test ;  
 pk1 |  pk2  |  info  |      crt_time       |      mod_time         
   1 | $Q1$  | $Q1$   | 2013-03-14 08:02:50 | 2013-03-14 08:02:50  
   1 | $Q$   | $Q$    | 2013-03-14 08:08:49 | 2013-03-14 08:08:49  
   1 | $Q'$  | $'Q$   | 2013-03-14 08:09:16 | 2013-03-14 08:09:16  
   1 | $Q$'$ | $Q$'Q$ | 2013-03-14 08:09:43 | 2013-03-14 08:09:43  
   2 | $Q$'  | $Q$'Q$ | 2013-03-14 08:13:45 | 2013-03-14 08:13:45  
(5 rows)  
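The behaviour of quote_literal that makes this safe can be checked directly. It wraps the value in single quotes and doubles any embedded single quote, so neither $Q$ nor ' can end the literal early:

```sql
select quote_literal('$Q$');         -- '$Q$'
select quote_literal('it''s');       -- 'it''s'
select quote_literal(null) is null;  -- t (quote_nullable returns the string NULL instead)
```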


References :

1. http://blog.163.com/digoal@126/blog/static/1638770402013283547959/

2. http://blog.163.com/digoal@126/blog/static/1638770402013211102130526/

3. http://blog.163.com/digoal@126/blog/static/1638770402012731944439/

4. http://blog.163.com/digoal@126/blog/static/1638770402012731203716/

