Table-level replication (based on triggers) – a generic PostgreSQL trigger function for synchronous and asynchronous multi-master replication
Background
This article was tested on PostgreSQL 9.2. Older releases use different column names in some system catalogs, so the relevant parts of the trigger function have to be adapted; for example, pg_stat_activity.pid was called procpid before 9.2.
Data synchronization is an old topic, and PostgreSQL offers quite a few ways to do it, such as streaming replication, Slony, Londiste and pgpool.
All of them, however, need a fair amount of configuration, and streaming replication in particular copies the whole physical cluster. None of them provides bidirectional (multi-master) synchronization either.
Sometimes you only want to synchronize a small amount of data (with a modest change rate): a single table, certain columns of a table, or even only the rows of a table that match some condition, to one or more other nodes.
You may even need replication that is writable on both sides (something Oracle materialized views cannot do). None of this is easy to achieve with the tools above.
The example below uses triggers and dblink to implement all of the above with real-time synchronization (keep in mind the overhead a trigger adds, so this does not suit every workload).
(PS: postgres_fdw plus triggers can achieve the same goal and also supports synchronous mode.)
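To give a rough idea of that variant: postgres_fdw ships with PostgreSQL 9.3 and later. A minimal sketch (my illustration, not part of this article's setup; the server name fdw_dst and foreign table name rmt_test_ft are made up, the connection options mirror the dblink setup used below):
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE SERVER fdw_dst FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '172.16.3.150', port '9999', dbname 'remote');
CREATE USER MAPPING FOR local SERVER fdw_dst OPTIONS (user 'remote', password 'REMOTE321');
-- map the remote table locally; a trigger can then forward DML to it with plain INSERT/UPDATE/DELETE,
-- which runs inside the local transaction and therefore behaves like the synchronous mode shown below.
CREATE FOREIGN TABLE rmt_test_ft (pk1 int, pk2 text, info text, crt_time timestamp(0), mod_time timestamp(0)) SERVER fdw_dst OPTIONS (schema_name 'remote', table_name 'rmt_test');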
Main text
I have written some posts on trigger-based replication before:
http://blog.163.com/digoal@126/blog/static/1638770402012731944439/
http://blog.163.com/digoal@126/blog/static/1638770402012731203716/
But in an environment with many tables, writing a trigger function for each one is tedious.
Using PostgreSQL 9.2 as an example, this article shows how to create a generic trigger function for multi-master replication.
"Generic" means that one trigger function handles replication for every table, instead of one function per table, which makes the replication setup much easier for a DBA to manage.
Test environment:
Create two roles:
postgres=# create role local login encrypted password 'LOCAL321';
CREATE ROLE
postgres=# create role remote login encrypted password 'REMOTE321';
CREATE ROLE
Create two databases:
postgres=# create database local owner local;
CREATE DATABASE
postgres=# create database remote owner remote;
CREATE DATABASE
Install the dblink extension:
postgres=# \c local postgres
You are now connected to database "local" as user "postgres".
local=# create extension dblink;
CREATE EXTENSION
local=# \c remote postgres
You are now connected to database "remote" as user "postgres".
remote=# create extension dblink;
CREATE EXTENSION
Create schemas:
local=> \c local local
local=> create schema local;
CREATE SCHEMA
local=> \c remote remote
You are now connected to database "remote" as user "remote".
remote=> create schema remote;
CREATE SCHEMA
Create the test table in the local database.
This example uses a composite primary key:
remote=> \c local local
You are now connected to database "local" as user "local".
local=> create table loc_test (pk1 int, pk2 text, info text, crt_time timestamp(0), mod_time timestamp(0), primary key(pk1,pk2));
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "loc_test_pkey" for table "loc_test"
CREATE TABLE
local=> select string_agg(position::text,' ')::int2vector,count(*) from dblink_get_pkey('local.loc_test');
string_agg | count
------------+-------
1 2 | 2
(1 row)
Create the test table in the remote database:
postgres=# \c remote remote
You are now connected to database "remote" as user "remote".
remote=> create table rmt_test (pk1 int, pk2 text, info text, crt_time timestamp(0), mod_time timestamp(0), primary key(pk1,pk2));
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "rmt_test_pkey" for table "rmt_test"
CREATE TABLE
remote=> select string_agg(position::text,' ')::int2vector,count(*) from dblink_get_pkey('remote.rmt_test');
string_agg | count
------------+-------
1 2 | 2
(1 row)
Revoke public privileges on pg_user_mappings in the local database:
local=> \c local postgres
You are now connected to database "local" as user "postgres".
local=# revoke all on pg_user_mappings from public;
REVOKE
Revoke public privileges on pg_user_mappings in the remote database:
local=# \c remote postgres
You are now connected to database "remote" as user "postgres".
remote=# revoke all on pg_user_mappings from public;
REVOKE
Create the foreign server in the local database:
postgres=# \c local postgres
You are now connected to database "local" as user "postgres".
local=# CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
CREATE FOREIGN DATA WRAPPER
local=# CREATE SERVER dst FOREIGN DATA WRAPPER postgresql OPTIONS (hostaddr '172.16.3.150', port '9999', dbname 'remote', options '-c tcp_keepalives_idle=60s -c tcp_keepalives_interval=10 -c tcp_keepalives_count=6 -c application_name=aaa_bbb_digoal');
CREATE SERVER
local=# GRANT USAGE ON FOREIGN SERVER dst TO local;
GRANT
local=# CREATE USER MAPPING FOR local SERVER dst OPTIONS (user 'remote', password 'REMOTE321');
CREATE USER MAPPING
Create the foreign server in the remote database:
local=# \c remote postgres
You are now connected to database "remote" as user "postgres".
remote=# CREATE FOREIGN DATA WRAPPER postgresql VALIDATOR postgresql_fdw_validator;
CREATE FOREIGN DATA WRAPPER
remote=# CREATE SERVER dst FOREIGN DATA WRAPPER postgresql OPTIONS (hostaddr '172.16.3.150', port '9999', dbname 'local', options '-c tcp_keepalives_idle=60s -c tcp_keepalives_interval=10 -c tcp_keepalives_count=6 -c application_name=aaa_bbb_digoal');
CREATE SERVER
remote=# GRANT USAGE ON FOREIGN SERVER dst TO remote;
GRANT
remote=# CREATE USER MAPPING FOR remote SERVER dst OPTIONS (user 'local', password 'LOCAL321');
CREATE USER MAPPING
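Before attaching any triggers it is worth verifying that the foreign server and user mapping actually work. A quick check with dblink (the connection name dst_check is arbitrary and only used for this test; the same check can be run on the remote side against its own dst server):
-- run in the local database as user local
select dblink_connect('dst_check', 'dst');
select * from dblink('dst_check', 'select count(*) from remote.rmt_test') as t(cnt int8);
select dblink_disconnect('dst_check');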
In both the local and the remote database, create the table that records SQL statements that failed during asynchronous replication:
create table sync_err_rec(id serial8 primary key, nsp_name name, table_name name, dst_server text, dst_query text, create_time timestamp without time zone);
NOTICE: CREATE TABLE will create implicit sequence "sync_err_rec_id_seq" for serial column "sync_err_rec.id"
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "sync_err_rec_pkey" for table "sync_err_rec"
CREATE TABLE
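Both the trigger function and deal_sync_err_rec below look rows up by dst_server and order them by create_time. An optional index (my addition, not part of the original setup) keeps those lookups cheap if the error backlog ever grows large:
create index idx_sync_err_rec_dst on sync_err_rec (dst_server, create_time);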
In both the local and the remote database, create the function that replays the failed SQL statements recorded for asynchronous replication:
create or replace function deal_sync_err_rec (i_limit int) returns boolean as $$
declare
v_conn_name text; -- connection name
v_conn_status text; -- return value of dblink_connect(v_conn_name, v_dst_server)
v_exec_status text; -- return value of dblink_exec(v_conn_name, v_dst_query, true|false)
v_dst_server text; -- foreign server, one per call; the queued error records for this dst_server are then fetched and replayed.
v_dst_query text; -- SQL statements recorded in sync_err_rec
v_id int8[]; -- sync_err_rec primary keys, collected so the batch can be deleted in one go.
begin
-- exit immediately on a standby database
if pg_is_in_recovery() then
raise notice 'this is standby';
return false;
end if;
-- take the dst_server of the oldest record and lock that row; together with the DELETE below this session holds RowExclusiveLock on sync_err_rec, which the trigger function checks for. The errors recorded for this dst_server are processed next.
select dst_server into v_dst_server from sync_err_rec order by create_time limit 1 for update;
-- nothing queued, return
if (v_dst_server is NULL) then
return true;
end if;
-- use the dst_server value as the connection name
v_conn_name := v_dst_server;
-- fetch a batch of records
select array_agg(id), string_agg(dst_query, ';') into v_id, v_dst_query from
(select id,dst_query from sync_err_rec where dst_server=v_dst_server order by create_time limit i_limit) t;
-- delete the corresponding rows from sync_err_rec
delete from sync_err_rec where id in (select unnest(v_id));
-- establish the dblink connection if it does not exist yet
if ( dblink_get_connections() @> ('{'||v_conn_name||'}')::text[] ) then
else
select * into v_conn_status from dblink_connect(v_conn_name, v_dst_server);
end if;
-- dblink_exec is called with fail_on_error = true here, so a remote error rolls back this transaction (including the delete above).
select * into v_exec_status from dblink_exec(v_conn_name, v_dst_query, true);
-- debug
-- raise notice 'v_conn_status:%, v_exec_status:%.', v_conn_status, v_exec_status;
return true;
END;
$$ language plpgsql;
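A manual invocation looks like this (the batch size 100 is arbitrary). Note that each call replays queued statements for a single dst_server only, namely the one with the oldest record, so with several destinations it may need to be called repeatedly:
select deal_sync_err_rec(100);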
Create the trigger function in both the local and the remote database:
CREATE OR REPLACE FUNCTION f_sync_test()
RETURNS trigger
LANGUAGE plpgsql
AS $BODY$
DECLARE
v_dst_server text; -- foreign server
v_conn_name text; -- connection name; in this example it is the same as dst_server
v_conn_status text; -- return value of dblink_connect(v_conn_name, v_dst_server)
v_nsp_name name := TG_TABLE_SCHEMA; -- trigger variable: schema of the table that fired the trigger
v_table_name name := TG_TABLE_NAME; -- trigger variable: name of the table that fired the trigger
v_dst_nsp_name name; -- target schema; to replicate into a different schema, put its name here
v_dst_table_name name; -- target table name
v_query text; -- SQL built by dblink_build_sql_insert / dblink_build_sql_update / dblink_build_sql_delete, executed remotely via dblink_exec
v_query_upd1 text; -- an UPDATE is split into a delete plus an insert
v_query_upd2 text; -- an UPDATE is split into a delete plus an insert
v_dst_query text; -- v_query after rewriting, i.e. with the target schema and table name substituted
v_dst_query_upd1 text; -- an UPDATE is split into a delete plus an insert
v_dst_query_upd2 text; -- an UPDATE is split into a delete plus an insert
v_pk_vector int2vector; -- logical positions of the replicated table's PK columns, needed by dblink_build_sql_insert, dblink_build_sql_update and dblink_build_sql_delete
v_pk_cnt int; -- number of PK columns of the replicated table, needed by dblink_build_sql_insert, dblink_build_sql_update and dblink_build_sql_delete
v_pk_attname_array name[]; -- PK column names
v_pk_att_vals_array text[]; -- PK values
tmp_v_pk_att_vals_array text[]; -- temporary PK values
v_exec_status text; -- return value of dblink_exec(v_conn_name, v_dst_query, true|false)
-- multi-master replication needs the following to tell where a change originated, to prevent endless trigger loops.
v_application_name_cli text;
v_application_name_check text;
v_pg_backend_pid int;
v_replica_mode text; -- replication mode: synchronous or asynchronous
i record;
y int;
BEGIN
-- see http://blog.163.com/digoal@126/blog/static/163877040201321125220134/ for details on trigger variables
-- trigger arguments are passed in as (v_dst_server, v_dst_nsp_name, v_dst_table_name, v_application_name_check, v_replica_mode)
-- i.e. foreign server, remote schema, remote table name, application_name, replication mode
v_dst_server := TG_ARGV[0]; -- foreign server
v_conn_name := v_dst_server; -- connection name; in this example it is the same as dst_server
v_dst_nsp_name := TG_ARGV[1]; -- target schema; to replicate into a different schema, put its name here
v_dst_table_name := TG_ARGV[2]; -- target table name
v_application_name_check := TG_ARGV[3]; -- application_name, used to prevent endless trigger loops
v_replica_mode := TG_ARGV[4]; -- replication mode: synchronous or asynchronous
-- get the backend pid of this session
select pg_backend_pid() into v_pg_backend_pid;
-- look up the application_name of this pid; it tells us whether the session was opened by the replication connection. On 9.1 and earlier use pg_stat_activity.procpid instead of pid.
select application_name into v_application_name_cli from pg_stat_activity where pid=v_pg_backend_pid;
-- if this session belongs to the replication connection, return null right away, otherwise the triggers would fire in an endless loop.
if (v_application_name_cli = v_application_name_check ) then
return null;
end if;
-- primary key information (note: from 9.0 on the position vector uses the logical column order, which changes when earlier columns are dropped; before 9.0 it is pg_attribute.attnum)
-- v_pk_vector, v_pk_cnt and v_pk_attname_array are obtained with the following SQL.
-- get v_pk_cnt.
select array_length(conkey, 1) into v_pk_cnt from pg_constraint where conrelid=TG_RELID and contype='p';
-- get v_pk_vector and v_pk_attname_array.
if substring(version(),12,1) = '9' then
select string_agg(rn::text, ' ')::int2vector, array_agg(attname) into v_pk_vector, v_pk_attname_array from
(select row_number() over() AS rn, attnum, attname from pg_attribute where attnum>0 and not attisdropped and attrelid=TG_RELID) AS t
where attnum in
(select unnest(conkey) from pg_constraint where conrelid=TG_RELID and contype='p');
elsif substring(version(),12,1) = '8' then
select string_agg(attnum::text, ' ')::int2vector, array_agg(attname) into v_pk_vector, v_pk_attname_array from pg_attribute
where attrelid=TG_RELID
and attnum in
(select unnest(conkey) from pg_constraint where conrelid=TG_RELID and contype='p');
else
-- any other version: raise an exception
raise exception 'PostgreSQL Version not 8 or 9.';
end if;
y := 0;
case TG_OP
when 'INSERT' then
-- build the INSERT statement to run remotely. For a multi-column primary key we need a text[] of the PK values in v_pk_vector order; a plain loop is used instead of FOREACH over the array for compatibility with older releases
for i in select * from unnest(v_pk_attname_array) AS t(attname) loop
if y = 0 then
execute 'select array[('||quote_literal(NEW)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||']::text[]' into v_pk_att_vals_array;
else
tmp_v_pk_att_vals_array := v_pk_att_vals_array;
execute 'select array_append($1, ('||quote_literal(NEW)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||'::text)' into v_pk_att_vals_array using tmp_v_pk_att_vals_array;
end if;
y := y+1;
end loop;
select * into v_query from dblink_build_sql_insert(v_nsp_name||'.'||v_table_name, v_pk_vector, v_pk_cnt, v_pk_att_vals_array, v_pk_att_vals_array);
when 'DELETE' then
-- build the DELETE statement to run remotely. For a multi-column primary key we need a text[] of the PK values in v_pk_vector order
for i in select * from unnest(v_pk_attname_array) AS t(attname) loop
if y = 0 then
execute 'select array[('||quote_literal(OLD)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||']::text[]' into v_pk_att_vals_array;
else
tmp_v_pk_att_vals_array := v_pk_att_vals_array;
execute 'select array_append($1, ('||quote_literal(OLD)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||'::text)' into v_pk_att_vals_array using tmp_v_pk_att_vals_array;
end if;
y := y+1;
end loop;
select * into v_query from dblink_build_sql_delete(v_nsp_name||'.'||v_table_name, v_pk_vector, v_pk_cnt, v_pk_att_vals_array);
when 'UPDATE' then
-- build the UPDATE to run remotely. For a multi-column primary key we need a text[] of the PK values in v_pk_vector order
-- dblink_build_sql_update is not used here because the primary key itself may be updated, so the UPDATE is split into a DELETE (with the OLD PK values) plus an INSERT (with the NEW row).
for i in select * from unnest(v_pk_attname_array) AS t(attname) loop
if y = 0 then
execute 'select array[('||quote_literal(OLD)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||']::text[]' into v_pk_att_vals_array;
else
tmp_v_pk_att_vals_array := v_pk_att_vals_array;
execute 'select array_append($1, ('||quote_literal(OLD)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||'::text)' into v_pk_att_vals_array using tmp_v_pk_att_vals_array;
end if;
y := y+1;
end loop;
-- debug
-- raise notice '%', v_pk_att_vals_array;
select * into v_query_upd1 from dblink_build_sql_delete(v_nsp_name||'.'||v_table_name, v_pk_vector, v_pk_cnt, v_pk_att_vals_array);
-- reset the PK value array (y is not reset; array_append on a NULL array simply starts a new one-element array, so the loop below still works)
v_pk_att_vals_array := null;
for i in select * from unnest(v_pk_attname_array) AS t(attname) loop
if y = 0 then
execute 'select array[('||quote_literal(NEW)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||']::text[]' into v_pk_att_vals_array;
else
tmp_v_pk_att_vals_array := v_pk_att_vals_array;
execute 'select array_append($1, ('||quote_literal(NEW)||'::'||v_nsp_name||'.'||v_table_name||').'||i.attname||'::text)' into v_pk_att_vals_array using tmp_v_pk_att_vals_array;
end if;
y := y+1;
end loop;
-- debug
-- raise notice '%', v_pk_att_vals_array;
select * into v_query_upd2 from dblink_build_sql_insert(v_nsp_name||'.'||v_table_name, v_pk_vector, v_pk_cnt, v_pk_att_vals_array, v_pk_att_vals_array);
when 'TRUNCATE' then
-- build the TRUNCATE statement to run remotely. Note it is TRUNCATE TABLE ONLY; for child tables, create the same kind of trigger on each child.
v_query := 'truncate table only '||v_table_name;
end case;
-- substitute the target schema and table name for the local table name.
case TG_OP
when 'UPDATE' then
v_dst_query_upd1 := regexp_replace(v_query_upd1, v_table_name, v_dst_nsp_name||'.'||v_dst_table_name, '');
v_dst_query_upd2 := regexp_replace(v_query_upd2, v_table_name, v_dst_nsp_name||'.'||v_dst_table_name, '');
v_dst_query := v_dst_query_upd1||';'||v_dst_query_upd2;
else
v_dst_query := regexp_replace(v_query, v_table_name, v_dst_nsp_name||'.'||v_dst_table_name, '');
end case;
-- debug
-- raise notice 'v_dst_query:%', v_dst_query;
-- asynchronous replication logic. If you need strictly identical execution order on the remote side with transactional semantics, use londiste3 instead.
if v_replica_mode = 'async' then
-- the logic below keeps remote SQL from being executed out of order; see http://blog.163.com/digoal@126/blog/static/1638770402012731944439/ for the reasoning
-- check whether another session holds RowExclusiveLock on sync_err_rec. If so, write the SQL into sync_err_rec
-- and return NULL without calling dblink_exec(v_conn_name, v_dst_query, false)
perform 1 from pg_locks where
relation=(select oid from pg_class where relname='sync_err_rec')
and pid != pg_backend_pid()
and mode='RowExclusiveLock'
limit 1;
if found then
raise notice 'sync_err_rec in syncing, this sql will insert into sync_err_rec but not replica to remote now.';
insert into sync_err_rec (nsp_name, table_name, dst_server, dst_query, create_time)
values (v_nsp_name, v_table_name, v_dst_server, v_dst_query, clock_timestamp());
return null;
end if;
-- check whether sync_err_rec already contains records for dst_server=v_dst_server. If so, write the SQL into sync_err_rec
-- and return NULL without calling dblink_exec(v_conn_name, v_dst_query, false)
perform 1 from sync_err_rec where dst_server=v_dst_server limit 1;
if found then
raise notice 'sync_err_rec has record with %, this sql will insert into sync_err_rec but not replica to remote now.', v_dst_server;
insert into sync_err_rec (nsp_name, table_name, dst_server, dst_query, create_time)
values (v_nsp_name, v_table_name, v_dst_server, v_dst_query, clock_timestamp());
return null;
-- strictly speaking there should be an else raise here. With parallel inserts, an earlier transaction may have hit a problem, written to sync_err_rec and not yet committed,
-- while a later transaction does not see that row and can still reach the remote database; the later transaction then executes remotely first while the earlier one only ends up
-- in sync_err_rec, i.e. the local and remote execution order diverge.
end if;
end if;
-- create the dblink connection if it does not exist yet.
if ( dblink_get_connections() @> ('{'||v_conn_name||'}')::text[] ) then
else
select * into v_conn_status from dblink_connect(v_conn_name, v_dst_server);
end if;
-- synchronous or asynchronous replication?
if v_replica_mode = 'sync' then
-- synchronous replication: a remote error rolls back the local transaction.
select * into v_exec_status from dblink_exec(v_conn_name, v_dst_query, true);
elsif v_replica_mode = 'async' then
-- asynchronous replication: a remote error does not roll back the local transaction; dblink_exec just returns the string ERROR
select * into v_exec_status from dblink_exec(v_conn_name, v_dst_query, false);
if (v_exec_status = 'ERROR') then
insert into sync_err_rec (nsp_name, table_name, dst_server, dst_query, create_time)
values (v_nsp_name, v_table_name, v_dst_server, v_dst_query, clock_timestamp());
end if;
else
raise exception 'replica mode must sync or async.';
end if;
-- replication done, return.
return null;
-- catch connection exceptions
exception
WHEN SQLSTATE '08000'
or SQLSTATE '08003'
or SQLSTATE '08006'
or SQLSTATE '08001'
or SQLSTATE '08004'
or SQLSTATE '08007'
or SQLSTATE '08P01' THEN
if v_replica_mode = 'async' then
insert into sync_err_rec (nsp_name, table_name, dst_server, dst_query, create_time)
values (v_nsp_name, v_table_name, v_dst_server, v_dst_query, clock_timestamp());
raise notice 'CONNECTION EXCEPTION, remote SQL write to sync_err_rec';
return null;
elsif v_replica_mode = 'sync' then
raise;
else
raise exception 'replica mode must sync or async.';
end if;
-- other exceptions are not recorded in sync_err_rec; the handler above only distinguishes synchronous from asynchronous replication.
END;
$BODY$ volatile;
Synchronous replication test
Create the triggers in the local database:
\c local local
CREATE TRIGGER tg1 AFTER DELETE or UPDATE or INSERT ON loc_test FOR EACH ROW EXECUTE PROCEDURE f_sync_test('dst', 'remote', 'rmt_test', 'aaa_bbb_digoal', 'sync');
CREATE TRIGGER tg2 AFTER TRUNCATE ON loc_test FOR EACH STATEMENT EXECUTE PROCEDURE f_sync_test('dst', 'remote', 'rmt_test', 'aaa_bbb_digoal', 'sync');
Create the triggers in the remote database:
\c remote remote
CREATE TRIGGER tg1 AFTER DELETE or UPDATE or INSERT ON rmt_test FOR EACH ROW EXECUTE PROCEDURE f_sync_test('dst', 'local', 'loc_test', 'aaa_bbb_digoal', 'sync');
CREATE TRIGGER tg2 AFTER TRUNCATE ON rmt_test FOR EACH STATEMENT EXECUTE PROCEDURE f_sync_test('dst', 'local', 'loc_test', 'aaa_bbb_digoal', 'sync');
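Because f_sync_test reads everything it needs from the TG_* variables and its arguments, replicating an additional table does not require another function, only another pair of triggers. For a hypothetical table t2 (which would have to exist on both sides with the same primary key), the local side would look like:
CREATE TRIGGER tg1 AFTER DELETE or UPDATE or INSERT ON t2 FOR EACH ROW EXECUTE PROCEDURE f_sync_test('dst', 'remote', 't2', 'aaa_bbb_digoal', 'sync');
CREATE TRIGGER tg2 AFTER TRUNCATE ON t2 FOR EACH STATEMENT EXECUTE PROCEDURE f_sync_test('dst', 'remote', 't2', 'aaa_bbb_digoal', 'sync');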
Insert:
local=> insert into loc_test select generate_series(1,10),'abc',now(),now();
INSERT 0 10
local=> select * from loc_test;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
1 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
2 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
3 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
4 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
5 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
6 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
7 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
8 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
9 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
10 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
(10 rows)
local=> \c remote remote
You are now connected to database "remote" as user "remote".
remote=> select * from rmt_test ;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
1 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
2 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
3 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
4 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
5 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
6 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
7 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
8 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
9 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
10 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
(10 rows)
remote=> insert into rmt_test select generate_series(11,12),'abc',now(),now();
INSERT 0 2
remote=> select * from rmt_test;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
1 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
2 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
3 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
4 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
5 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
6 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
7 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
8 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
9 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
10 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
11 | abc | 2013-03-11 15:03:01.569531+08 | 2013-03-11 15:03:02 |
12 | abc | 2013-03-11 15:03:01.569531+08 | 2013-03-11 15:03:02 |
(12 rows)
remote=> \c local local
You are now connected to database "local" as user "local".
local=> select * from loc_test;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
1 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
2 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
3 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
4 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
5 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
6 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
7 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
8 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
9 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
10 | abc | 2013-03-11 15:02:15.769929+08 | 2013-03-11 15:02:16 |
11 | abc | 2013-03-11 15:03:01.569531+08 | 2013-03-11 15:03:02 |
12 | abc | 2013-03-11 15:03:01.569531+08 | 2013-03-11 15:03:02 |
(12 rows)
Update:
local=> update loc_test set pk2='new' where pk1=1;
UPDATE 1
local=> select * from loc_test where pk1=1;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
1 | new | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
(1 row)
local=> \c remote remote
You are now connected to database "remote" as user "remote".
remote=> select * from rmt_test where pk1=1;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
1 | new | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
(1 row)
remote=> select * from rmt_test where pk1=2;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
2 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
(1 row)
remote=> update rmt_test set pk2='rmt' where pk1=2;
UPDATE 1
remote=> select * from rmt_test where pk1=2;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
2 | rmt | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
(1 row)
remote=> \c local local
You are now connected to database "local" as user "local".
local=> select * from loc_test where pk1=2;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
2 | rmt | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
(1 row)
Delete:
local=> delete from loc_test where pk1<9;
DELETE 8
local=> select * from loc_test;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
9 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
10 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
11 | abc | 2013-03-11 15:20:24.136187+08 | 2013-03-11 15:20:24 |
12 | abc | 2013-03-11 15:20:24.136187+08 | 2013-03-11 15:20:24 |
(4 rows)
local=> \c remote remote
You are now connected to database "remote" as user "remote".
remote=> select * from rmt_test;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
9 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
10 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
11 | abc | 2013-03-11 15:20:24.136187+08 | 2013-03-11 15:20:24 |
12 | abc | 2013-03-11 15:20:24.136187+08 | 2013-03-11 15:20:24 |
(4 rows)
remote=> delete from rmt_test where pk1>=11;
DELETE 2
remote=> select * from rmt_test;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
9 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
10 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
(2 rows)
remote=> \c local local
You are now connected to database "local" as user "local".
local=> select * from loc_test;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+-------------------------------+---------------------+----------
9 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
10 | abc | 2013-03-11 15:20:12.791216+08 | 2013-03-11 15:20:13 |
(2 rows)
TRUNCATE test:
local=> truncate table loc_test ;
TRUNCATE TABLE
local=> \c remote remote
You are now connected to database "remote" as user "remote".
remote=> select * from rmt_test ;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+------+----------+----------
(0 rows)
Asynchronous replication test
In the local database:
Create the asynchronous replication triggers:
local=> CREATE TRIGGER atg1 AFTER DELETE or UPDATE or INSERT ON loc_test FOR EACH ROW EXECUTE PROCEDURE f_sync_test('dst', 'remote', 'rmt_test', 'aaa_bbb_digoal', 'async');
CREATE TRIGGER
local=> CREATE TRIGGER atg2 AFTER TRUNCATE ON loc_test FOR EACH STATEMENT EXECUTE PROCEDURE f_sync_test('dst', 'remote', 'rmt_test', 'aaa_bbb_digoal', 'async');
CREATE TRIGGER
Disable the synchronous replication triggers:
local=> alter table loc_test DISABLE TRIGGER tg1;
ALTER TABLE
local=> alter table loc_test DISABLE TRIGGER tg2;
ALTER TABLE
In the remote database:
Create the asynchronous replication triggers:
remote=> CREATE TRIGGER atg1 AFTER DELETE or UPDATE or INSERT ON rmt_test FOR EACH ROW EXECUTE PROCEDURE f_sync_test('dst', 'local', 'loc_test', 'aaa_bbb_digoal', 'async');
remote=> CREATE TRIGGER atg2 AFTER TRUNCATE ON rmt_test FOR EACH STATEMENT EXECUTE PROCEDURE f_sync_test('dst', 'local', 'loc_test', 'aaa_bbb_digoal', 'async');
Disable the synchronous replication triggers:
remote=> alter table rmt_test DISABLE TRIGGER tg1;
ALTER TABLE
remote=> alter table rmt_test DISABLE TRIGGER tg2;
ALTER TABLE
Insert:
Forward direction (local to remote):
local=> insert into loc_test values (9,'abc','digoal',now(),now());
INSERT 0 1
local=> select * from loc_test where pk1=9;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+--------+---------------------+---------------------
9 | abc | digoal | 2013-03-13 09:24:29 | 2013-03-13 09:24:29
(1 row)
local=> \c remote remote
You are now connected to database "remote" as user "remote".
remote=> select * from rmt_test where pk1=9;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+--------+---------------------+---------------------
9 | abc | digoal | 2013-03-13 09:24:29 | 2013-03-13 09:24:29
(1 row)
Reverse direction (remote to local):
remote=> insert into rmt_test values (10,'abc','digoal',now(),now());
INSERT 0 1
remote=> select * from rmt_test where pk1=10;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+--------+---------------------+---------------------
10 | abc | digoal | 2013-03-13 09:26:04 | 2013-03-13 09:26:04
(1 row)
Errors raised locally are not written to sync_err_rec:
remote=> insert into rmt_test values (9,'abc','digoal',now(),now());
ERROR: duplicate key value violates unique constraint "rmt_test_pkey"
DETAIL: Key (pk1, pk2)=(9, abc) already exists.
remote=> select * from sync_err_rec;
id | nsp_name | table_name | dst_server | dst_query | create_time
----+----------+------------+------------+-----------+-------------
(0 rows)
Update:
Update in the reverse direction (remote to local):
remote=> update rmt_test set pk2='new' where pk1=10;
UPDATE 1
remote=> select * from rmt_test where pk1=10;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+--------+---------------------+---------------------
10 | new | digoal | 2013-03-13 09:26:04 | 2013-03-13 09:26:04
(1 row)
remote=> \c local local
You are now connected to database "local" as user "local".
local=> select * from loc_test where pk1=10;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+--------+---------------------+---------------------
10 | new | digoal | 2013-03-13 09:26:04 | 2013-03-13 09:26:04
(1 row)
Update in the forward direction (local to remote):
local=> update loc_test set pk2='new_loc' where pk1=10;
UPDATE 1
local=> select * from loc_test where pk1=10;
pk1 | pk2 | info | crt_time | mod_time
-----+---------+--------+---------------------+---------------------
10 | new_loc | digoal | 2013-03-13 09:26:04 | 2013-03-13 09:26:04
(1 row)
local=> \c remote remote
You are now connected to database "remote" as user "remote".
remote=> select * from rmt_test where pk1=10;
pk1 | pk2 | info | crt_time | mod_time
-----+---------+--------+---------------------+---------------------
10 | new_loc | digoal | 2013-03-13 09:26:04 | 2013-03-13 09:26:04
(1 row)
Error test
Edit pg_hba.conf to block remote connections:
vi $PGDATA/pg_hba.conf
#host all all 0.0.0.0/0 md5
pg_ctl reload
Test using INSERT as an example:
The connection failure is caught, the statement still executes locally, and the remote SQL is written into sync_err_rec for later processing.
local=> insert into loc_test values (11,'abc','digoal',now(),now());
NOTICE: CONNECTION EXCEPTION, remote SQL write to sync_err_rec
INSERT 0 1
local=> select * from loc_test where pk1=11;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+--------+---------------------+---------------------
11 | abc | digoal | 2013-03-13 09:31:20 | 2013-03-13 09:31:20
(1 row)
local=> select * from sync_err_rec;
 id | nsp_name | table_name | dst_server | dst_query                                                                                                                           | create_time
----+----------+------------+------------+-------------------------------------------------------------------------------------------------------------------------------------+----------------------------
  7 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('11','abc','digoal','2013-03-13 09:31:20','2013-03-13 09:31:20') | 2013-03-13 09:31:20.394119
(1 row)
remote=> select * from rmt_test where pk1=11;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+------+----------+----------
(0 rows)
When sync_err_rec already contains records for the target foreign server, new changes are not replicated immediately, to preserve the execution order of the remote SQL.
remote=> \c local local
You are now connected to database "local" as user "local".
local=> insert into loc_test values (12,'abc','digoal',now(),now());
NOTICE: sync_err_rec has record with dst, this sql will insert into sync_err_rec but not replica to remote now.
INSERT 0 1
local=> select * from loc_test where pk1=12;
pk1 | pk2 | info | crt_time | mod_time
-----+-----+--------+---------------------+---------------------
12 | abc | digoal | 2013-03-13 09:32:18 | 2013-03-13 09:32:18
(1 row)
local=> select * from sync_err_rec;
 id | nsp_name | table_name | dst_server | dst_query                                                                                                                           | create_time
----+----------+------------+------------+-------------------------------------------------------------------------------------------------------------------------------------+----------------------------
  7 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('11','abc','digoal','2013-03-13 09:31:20','2013-03-13 09:31:20') | 2013-03-13 09:31:20.394119
  8 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('12','abc','digoal','2013-03-13 09:32:18','2013-03-13 09:32:18') | 2013-03-13 09:32:18.486967
(2 rows)
While deal_sync_err_rec is running, new changes are not replicated immediately either, again to preserve the execution order of the remote SQL.
Edit pg_hba.conf to allow connections again, then run deal_sync_err_rec to repair the data.
SESSION A :
local=> begin;
BEGIN
local=> select * from deal_sync_err_rec(10);
deal_sync_err_rec
-------------------
t
(1 row)
-- keep the transaction open so the lock is not released yet
SESSION B :
local=> insert into loc_test values (13,'abc','digoal',now(),now());
NOTICE: 00000: sync_err_rec in syncing, this sql will insert into sync_err_rec but not replica to remote now.
LOCATION: exec_stmt_raise, pl_exec.c:2840
INSERT 0 1
Time: 12.985 ms
local=> select * from sync_err_rec;
 id | nsp_name | table_name | dst_server | dst_query                                                                                                                           | create_time
----+----------+------------+------------+-------------------------------------------------------------------------------------------------------------------------------------+----------------------------
  7 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('11','abc','digoal','2013-03-13 09:31:20','2013-03-13 09:31:20') | 2013-03-13 09:31:20.394119
  8 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('12','abc','digoal','2013-03-13 09:32:18','2013-03-13 09:32:18') | 2013-03-13 09:32:18.486967
  9 | local    | loc_test   | dst        | INSERT INTO remote.rmt_test(pk1,pk2,info,crt_time,mod_time) VALUES('13','abc','digoal','2013-03-13 09:34:32','2013-03-13 09:34:32') | 2013-03-13 09:34:31.6283
(3 rows)
Time: 0.439 ms
SESSION A :
Finish processing all the queued SQL:
local=> end;
COMMIT
local=> select * from deal_sync_err_rec(10);
deal_sync_err_rec
-------------------
t
(1 row)
local=> select * from sync_err_rec;
id | nsp_name | table_name | dst_server | dst_query | create_time
----+----------+------------+------------+-----------+-------------
(0 rows)
SESSION B :
Once the backlog has been cleared, remote SQL replicates normally again:
local=> insert into loc_test values (14,'abc','digoal',now(),now());
INSERT 0 1
Time: 14.380 ms
The hash values of the local and the remote table match:
local=> select sum(hashtext(t.*::text)) from loc_test t;
sum
-----------
306096063
(1 row)
Time: 0.865 ms
local=> \c remote remote
You are now connected to database "remote" as user "remote".
remote=> select sum(hashtext(t.*::text)) from rmt_test t;
sum
-----------
306096063
(1 row)
Time: 1.991 ms
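If the two sums ever differ, a per-row comparison can locate the mismatching keys. A sketch, run in the local database and reusing the foreign server dst defined above (dblink opens an implicit connection when given a foreign server name):
select coalesce(l.pk1, r.pk1) as pk1, coalesce(l.pk2, r.pk2) as pk2
from loc_test l
full outer join dblink('dst', 'select pk1, pk2, hashtext(t.*::text) as h from remote.rmt_test t') as r(pk1 int, pk2 text, h int)
on (l.pk1 = r.pk1 and l.pk2 = r.pk2)
where l.pk1 is null or r.pk1 is null or hashtext(l.*::text) <> r.h;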
Notes
1. application_name is used to prevent endless trigger loops, so the application_name set in the foreign server's options must match the value passed to the triggers created on the remote table.
For example, the triggers on rmt_test pass aaa_bbb_digoal,
so the application_name option of the foreign server created in the local database must also be aaa_bbb_digoal; this can be verified as shown below.
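For example, the setting can be checked against the catalog (the options string should contain application_name=aaa_bbb_digoal):
select srvname, srvoptions from pg_foreign_server where srvname = 'dst';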
2. Pay attention to the int2vector primary_key_attnums parameter of dblink_build_sql_insert, dblink_build_sql_delete and dblink_build_sql_update.
In PostgreSQL 9.0 and later it refers to the logical output positions of the PK columns, i.e. their positions in select * from loc_test.
In versions before 9.0 it refers to the PK columns' pg_attribute.attnum values.
So once a column to the left of the PK columns has been dropped, their logical positions change; on 9.0 and later you have to account for that,
which is why the trigger function computes the positions the way it does (see the illustration after the quoted documentation below).
The dblink documentation says:
As of PostgreSQL 9.0, the attribute numbers in primary_key_attnums are interpreted as logical column numbers, corresponding to the column's position in SELECT * FROM relname.
Previous versions interpreted the numbers as physical column positions. There is a difference if any column(s) to the left of the indicated column have been dropped during the lifetime of the table.
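A small illustration of that difference (drop_demo is a throwaway table, not part of this article's setup):
create table drop_demo (c0 int, pk1 int, pk2 text, primary key (pk1, pk2));
alter table drop_demo drop column c0;
-- pg_attribute.attnum still shows 2 and 3 for pk1 and pk2,
-- but their logical positions (what 9.0+ expects in primary_key_attnums) are now 1 and 2:
select attnum, attname, row_number() over (order by attnum) as logical_position
from pg_attribute
where attrelid = 'drop_demo'::regclass and attnum > 0 and not attisdropped;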
3. For asynchronous replication, the backlog in sync_err_rec can be processed periodically by calling the deal_sync_err_rec function from crontab, for example:
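An illustrative crontab entry (user, database, batch size, schedule and log path are all assumptions; it also assumes a .pgpass entry or local trust authentication for psql):
* * * * * psql -U local -d local -Atc "select deal_sync_err_rec(100);" >> /tmp/deal_sync_err_rec.log 2>&1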
4. Make sure quote_literal is used to render the row values, otherwise characters may escape the quoting.
For example, an earlier version of this SQL wrapping used $Q$ dollar quoting; when $Q$ shows up in a PK value it breaks out of the quoting, as shown below:
when 'INSERT' then
-- build the INSERT statement to run remotely. For a multi-column primary key we need a text[] of the PK values in v_pk_vector order; a plain loop is used instead of FOREACH over the array for compatibility with older releases
for i in select * from unnest(v_pk_attname_array) AS t(attname) loop
if y = 0 then
execute 'select array[($Q$'||NEW||'$Q$::'||v_nsp_name||'.'||v_table_name||').'||i.attname||']::text[]' into v_pk_att_vals_array;
else
tmp_v_pk_att_vals_array := v_pk_att_vals_array;
execute 'select array_append($1, ($Q$'||NEW||'$Q$::'||v_nsp_name||'.'||v_table_name||').'||i.attname||'::text)' into v_pk_att_vals_array using tmp_v_pk_att_vals_array;
end if;
y := y+1;
end loop;
select * into v_query from dblink_build_sql_insert(v_nsp_name||'.'||v_table_name, v_pk_vector, v_pk_cnt, v_pk_att_vals_array, v_pk_att_vals_array);
Data that triggers the problem:
local=> insert into loc_test values(1,'$Q$','$Q$',now(),now());
ERROR: syntax error at or near "."
LINE 1: ...:01:09","2013-03-14 08:01:09")$Q$::local.loc_test).pk1]::tex...
^
QUERY: select array[($Q$(1,$Q$,$Q$,"2013-03-14 08:01:09","2013-03-14 08:01:09")$Q$::local.loc_test).pk1]::text[]
CONTEXT: PL/pgSQL function f_sync_test() line 77 at EXECUTE statement
After switching to quote_literal it works fine:
remote=> insert into rmt_test values(100,'$Q$','$Q$',now(),now());
INSERT 0 1
Time: 4.086 ms
remote=> select * from rmt_test ;
pk1 | pk2 | info | crt_time | mod_time
-----+-------+--------+---------------------+---------------------
1 | $Q1$ | $Q1$ | 2013-03-14 08:02:50 | 2013-03-14 08:02:50
1 | $Q$ | $Q$ | 2013-03-14 08:08:49 | 2013-03-14 08:08:49
1 | $Q'$ | $'Q$ | 2013-03-14 08:09:16 | 2013-03-14 08:09:16
1 | $Q$'$ | $Q$'Q$ | 2013-03-14 08:09:43 | 2013-03-14 08:09:43
2 | $Q$' | $Q$'Q$ | 2013-03-14 08:13:45 | 2013-03-14 08:13:45
(5 rows)
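For reference, quote_literal doubles any embedded single quote and wraps the whole value in single quotes, so text containing $Q$ (or quotes) can no longer terminate the literal; for example:
select quote_literal('it''s $Q$ safe');  -- returns the text  'it''s $Q$ safe'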
References
1. http://blog.163.com/digoal@126/blog/static/1638770402013283547959/
2. http://blog.163.com/digoal@126/blog/static/1638770402013211102130526/
3. http://blog.163.com/digoal@126/blog/static/1638770402012731944439/
4. http://blog.163.com/digoal@126/blog/static/1638770402012731203716/