Londiste 3 replicate case - 1 上节
背景
前面讲解过Londiste3的安装, 有不了解的朋友可以参考如下BLOG:
今天将介绍一下使用londiste3, 从PostgreSQL 9.1.3 复制表以及序列到PostgreSQL 9.2beta1. (这种场景的复制使用PostgreSQL的流复制是办不到的.目前使用slony也是无法做到的,因为slony和PostgreSQL的版本相关, 目前还没有支持PostgreSQL9.2的版本. 正因为slony对版本要求较高, 相对没有londiste灵活, 所以我选择了londiste作为跨数据库版本的复制工具)
londiste3支持合并复制,级联复制等. 合并复制是只可以把多个数据库的表复制到同一个目标表中, 常用在plproxy分区的数据库集群环境中, 同时londiste还支持部分复制, 即复制的源可以带where条件. 支持源和目标的库名不一样, 表名不一样, schema_name不一样, 支持自定义handler等. 功能非常强大. 这一切都基于PostgreSQL queue技术.
以后再讲解londiste3的合并复制和级联复制等.
一、测试环境 :
源库 :
PostgreSQL 9.1.3
host=172.16.3.176 port=1921 user=digoal_01 dbname=digoal_01
4个表,2个序列
目标库1 :
PostgreSQL 9.2 beta1
host=172.16.3.33 port=1919 user=digoal_01 dbname=digoal_01
复制4个表,2个序列
目标库2 :
PostgreSQL 9.2 beta1
host=172.16.3.33 port=1919 user=digoal_02 dbname=digoal_02
复制2个表,2个序列, 但是只复制userid<1000的记录
二、源库测试模型建立
源库的测试模型基于我前面写的《PostgreSQL性能优化综合案例讲解》, 如下 :
创建测试用户和测试库
postgres@db5-> psql
psql (9.1.3)
Type "help" for help.
postgres=# create role digoal_01 nosuperuser nocreatedb nocreaterole noinherit login encrypted password 'digoal_01';
CREATE ROLE
postgres=# create database digoal_01 encoding 'UTF8' template template0 owner digoal_01;
CREATE DATABASE
postgres=# \c digoal_01 digoal_01
You are now connected to database "digoal_01" as user "digoal_01".
digoal_01=> create schema digoal_01 authorization digoal_01;
CREATE SCHEMA
创建测试表
create table user_info
(userid int,
engname text,
cnname text,
occupation text,
birthday date,
signname text,
email text,
qq numeric,
crt_time timestamp without time zone,
mod_time timestamp without time zone
);
create table user_session
(userid int,
logintime timestamp(0) without time zone,
login_count bigint default 0,
logouttime timestamp(0) without time zone,
online_interval interval default interval '0'
);
create table user_login_rec
(userid int,
login_time timestamp without time zone,
ip inet
);
create table user_logout_rec
(userid int,
logout_time timestamp without time zone,
ip inet
);
初始化数据
insert into user_info (userid,engname,cnname,occupation,birthday,signname,email,qq,crt_time,mod_time)
select generate_series(1,200000),
'digoal.zhou',
'德哥',
'DBA',
'1970-01-01'
,E'公益是一辈子的事, I\'m Digoal.Zhou, Just do it!',
'digoal@126.com',
276732431,
clock_timestamp(),
NULL;
insert into user_session (userid) select generate_series(1,200000);
set work_mem='2048MB';
set maintenance_work_mem='2048MB';
alter table user_info add constraint pk_user_info primary key (userid);
alter table user_session add constraint pk_user_session primary key (userid);
业务函数
模拟用户登录的函数
create or replace function f_user_login
(i_userid int,
OUT o_userid int,
OUT o_engname text,
OUT o_cnname text,
OUT o_occupation text,
OUT o_birthday date,
OUT o_signname text,
OUT o_email text,
OUT o_qq numeric
)
as $BODY$
declare
begin
select userid,engname,cnname,occupation,birthday,signname,email,qq
into o_userid,o_engname,o_cnname,o_occupation,o_birthday,o_signname,o_email,o_qq
from user_info where userid=i_userid;
insert into user_login_rec (userid,login_time,ip) values (i_userid,now(),inet_client_addr());
update user_session set logintime=now(),login_count=login_count+1 where userid=i_userid;
return;
end;
$BODY$
language plpgsql;
模拟用户退出的函数
create or replace function f_user_logout
(i_userid int,
OUT o_result int
)
as $BODY$
declare
begin
insert into user_logout_rec (userid,logout_time,ip) values (i_userid,now(),inet_client_addr());
update user_session set logouttime=now(),online_interval=online_interval+(now()-logintime) where userid=i_userid;
o_result := 0;
return;
exception
when others then
o_result := 1;
return;
end;
$BODY$
language plpgsql;
三、准备目标库1, (注意londiste的复制需要表上有主键.所以两个日志表一会在添加的时候会报错, 因为没有主键)
创建测试用户和测试库
PostgreSQL 9.2beta1
host=172.16.3.33 port=1919 user=digoal_01 dbname=digoal_01
pg92@db-172-16-3-33-> psql postgres postgres
psql (9.2beta1)
Type "help" for help.
postgres=# create role digoal_01 nosuperuser nocreatedb nocreaterole noinherit login encrypted password 'digoal_01';
CREATE ROLE
postgres=# create database digoal_01 encoding 'UTF8' template template0 owner digoal_01;
CREATE DATABASE
postgres=# \c digoal_01 digoal_01
You are now connected to database "digoal_01" as user "digoal_01".
digoal_01=> create schema digoal_01 authorization digoal_01;
CREATE SCHEMA
创建测试表
create table user_info
(userid int,
engname text,
cnname text,
occupation text,
birthday date,
signname text,
email text,
qq numeric,
crt_time timestamp without time zone,
mod_time timestamp without time zone
);
create table user_session
(userid int,
logintime timestamp(0) without time zone,
login_count bigint default 0,
logouttime timestamp(0) without time zone,
online_interval interval default interval '0'
);
create table user_login_rec
(userid int,
login_time timestamp without time zone,
ip inet
);
create table user_logout_rec
(userid int,
logout_time timestamp without time zone,
ip inet
);
alter table user_info add constraint pk_user_info primary key (userid);
alter table user_session add constraint pk_user_session primary key (userid);
四、准备目标库2, (目标库2里面只复制了两个用户相关的表, 都有主键.)
创建测试用户和测试库
PostgreSQL 9.2beta1
host=172.16.3.33 port=1919 user=digoal_02 dbname=digoal_02
pg92@db-172-16-3-33-> psql postgres postgres
psql (9.2beta1)
Type "help" for help.
postgres=# create role digoal_02 nosuperuser nocreatedb nocreaterole noinherit login encrypted password 'digoal_02';
CREATE ROLE
postgres=# create database digoal_02 encoding 'UTF8' template template0 owner digoal_02;
CREATE DATABASE
postgres=# \c digoal_02 digoal_02
You are now connected to database "digoal_02" as user "digoal_02".
digoal_01=> create schema digoal_02 authorization digoal_02;
CREATE SCHEMA
创建测试表
create table user_info1
(userid int,
engname text,
cnname text,
occupation text,
birthday date,
signname text,
email text,
qq numeric,
crt_time timestamp without time zone,
mod_time timestamp without time zone
);
create table user_session1
(userid int,
logintime timestamp(0) without time zone,
login_count bigint default 0,
logouttime timestamp(0) without time zone,
online_interval interval default interval '0'
);
alter table user_info1 add constraint pk_user_info primary key (userid);
alter table user_session1 add constraint pk_user_session primary key (userid);
五、从主库复制到目标库1
本次测试中我们把pgq, provider, consumer进程都放在主库, 当然这些都可以放在其他服务器上, 甚至不在主库和目标库的服务器上都行. 只要允许连接到数据库.
主库服务器上配置如下
配置放londiste配置文件的目录以及日志目录和pid文件目录.
mkdir -p /home/postgres/londiste3/log
mkdir -p /home/postgres/londiste3/pid
创建provider进程的配置文件, 配置文件模板可参考 londiste3 –ini的输出
注意db里面我配置了password, 如果你使用的是trust认证或者使用了.pgpass文件, 这里就不需要配密码了.
vi /home/postgres/londiste3/src_digoal_01.ini
[londiste3]
job_name = src_digoal_01
db = host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres
queue_name = replika
logfile = /home/postgres/londiste3/log/src_digoal_01.log
pidfile = /home/postgres/londiste3/pid/src_digoal_01.pid
配置好provider 后, 就可以创建根节点了.
postgres@db5-> londiste3 -v /home/postgres/londiste3/src_digoal_01.ini create-root src_digoal_01 "host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres"
2012-05-30 21:32:36,847 9378 DEBUG Connect 'new_node' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 [...]'
2012-05-30 21:32:36,853 9378 INFO plpgsql is installed
2012-05-30 21:32:36,853 9378 INFO Installing pgq
2012-05-30 21:32:36,853 9378 INFO Reading from /opt/skytools3.0.2/share/skytools3/pgq.sql
2012-05-30 21:32:37,339 9378 INFO pgq.get_batch_cursor is installed
2012-05-30 21:32:37,339 9378 INFO Installing pgq_ext
2012-05-30 21:32:37,339 9378 INFO Reading from /opt/skytools3.0.2/share/skytools3/pgq_ext.sql
2012-05-30 21:32:37,568 9378 INFO Installing pgq_node
2012-05-30 21:32:37,569 9378 INFO Reading from /opt/skytools3.0.2/share/skytools3/pgq_node.sql
2012-05-30 21:32:37,839 9378 INFO Installing londiste
2012-05-30 21:32:37,839 9378 INFO Reading from /opt/skytools3.0.2/share/skytools3/londiste.sql
2012-05-30 21:32:38,117 9378 INFO londiste.global_add_table is installed
2012-05-30 21:32:38,118 9378 DEBUG exec_query: select * from pgq_node.get_node_info('replika')
2012-05-30 21:32:38,121 9378 INFO Initializing node
2012-05-30 21:32:38,121 9378 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'src_digoal_01', 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres', false)
2012-05-30 21:32:38,122 9378 INFO Location registered
2012-05-30 21:32:38,122 9378 DEBUG exec_cmd: select * from pgq_node.create_node('replika', 'root', 'src_digoal_01', 'src_digoal_01', null, null, null)
2012-05-30 21:32:38,377 9378 INFO Node "src_digoal_01" initialized for queue "replika" with type "root"
2012-05-30 21:32:38,378 9378 INFO Done
根节点创建完, 连接到主库的digoal_01库下面, 会看到多了几个schema: londiste, pgq, pgq_ext, pgq_node 这些都是londiste3套件新增的schema, 用于完成复制所必须的.
postgres=# \c digoal_01 digoal_01
You are now connected to database "digoal_01" as user "digoal_01".
digoal_01=> \dn
List of schemas
Name | Owner
-----------+-----------
digoal_01 | digoal_01
londiste | postgres
pgq | postgres
pgq_ext | postgres
pgq_node | postgres
public | postgres
(6 rows)
以下则是新增的这些schema下面的对象,
digoal_01=> select nspname,relkind,relname from pg_class,pg_namespace where relnamespace=pg_namespace.oid and relnamespace in (select oid from pg_namespace where nspname in ('londiste','pgq','pgq_ext','pgq_node','public','digoal_01')) order by relnamespace,relkind,relname;
nspname | relkind | relname
-----------+---------+-------------------------
digoal_01 | i | pk_user_info
digoal_01 | i | pk_user_session
digoal_01 | r | user_info
digoal_01 | r | user_login_rec
digoal_01 | r | user_logout_rec
digoal_01 | r | user_session
pgq | S | batch_id_seq
pgq | S | consumer_co_id_seq
pgq | S | event_1_id_seq
pgq | S | event_1_tick_seq
pgq | S | queue_queue_id_seq
pgq | S | subscription_sub_id_seq
pgq | i | consumer_name_uq
pgq | i | consumer_pkey
pgq | i | event_1_0_txid_idx
pgq | i | event_1_1_txid_idx
pgq | i | event_1_2_txid_idx
pgq | i | queue_name_uq
pgq | i | queue_pkey
pgq | i | rq_pkey
pgq | i | rq_retry_idx
pgq | i | subscription_batch_idx
pgq | i | subscription_pkey
pgq | i | tick_pkey
pgq | r | consumer
pgq | r | event_1
pgq | r | event_1_0
pgq | r | event_1_1
pgq | r | event_1_2
pgq | r | event_template
pgq | r | queue
pgq | r | retry_queue
pgq | r | subscription
pgq | r | tick
pgq_ext | i | completed_batch_pkey
pgq_ext | i | completed_event_pkey
pgq_ext | i | completed_tick_pkey
pgq_ext | i | partial_batch_pkey
pgq_ext | r | completed_batch
pgq_ext | r | completed_event
pgq_ext | r | completed_tick
pgq_ext | r | partial_batch
pgq_node | i | local_state_pkey
pgq_node | i | node_info_pkey
pgq_node | i | node_location_pkey
pgq_node | i | subscriber_info_pkey
pgq_node | r | local_state
pgq_node | r | node_info
pgq_node | r | node_location
pgq_node | r | subscriber_info
londiste | S | seq_info_nr_seq
londiste | S | table_info_nr_seq
londiste | i | applied_execute_pkey
londiste | i | pending_fkeys_pkey
londiste | i | seq_info_pkey
londiste | i | table_info_pkey
londiste | r | applied_execute
londiste | r | pending_fkeys
londiste | r | seq_info
londiste | r | table_info
以下是新增的函数, 复制, batch, 触发器handle, 注册, 注销, 映射关系等.
digoal_01=> select nspname,lanname,proname from pg_proc,pg_namespace,pg_language where prolang=pg_language.oid and pronamespace=pg_namespace.oid and pronamespace in (select oid from pg_namespace where nspname in ('londiste','pgq','pgq_ext','pgq_node','public','digoal_01')) order by pronamespace,proname;
nspname | lanname | proname
-----------+---------+---------------------------
digoal_01 | plpgsql | f_user_login
digoal_01 | plpgsql | f_user_logout
pgq | plpgsql | _grant_perms_from
pgq | plpgsql | batch_event_sql
pgq | plpgsql | batch_event_tables
pgq | plpgsql | batch_retry
pgq | plpgsql | create_queue
pgq | plpgsql | current_event_table
pgq | plpgsql | drop_queue
pgq | plpgsql | drop_queue
pgq | plpgsql | event_retry
pgq | plpgsql | event_retry
pgq | plpgsql | event_retry_raw
pgq | plpgsql | find_tick_helper
pgq | plpgsql | finish_batch
pgq | plpgsql | force_tick
pgq | plpgsql | get_batch_cursor
pgq | plpgsql | get_batch_cursor
pgq | plpgsql | get_batch_events
pgq | plpgsql | get_batch_info
pgq | plpgsql | get_consumer_info
pgq | plpgsql | get_consumer_info
pgq | plpgsql | get_consumer_info
pgq | plpgsql | get_queue_info
pgq | plpgsql | get_queue_info
pgq | plpgsql | grant_perms
pgq | plpgsql | insert_event
pgq | plpgsql | insert_event
pgq | c | insert_event_raw
pgq | c | logutriga
pgq | plpgsql | maint_operations
pgq | plpgsql | maint_retry_events
pgq | plpgsql | maint_rotate_tables_step1
pgq | plpgsql | maint_rotate_tables_step2
pgq | plpgsql | maint_tables_to_vacuum
pgq | plpgsql | next_batch
pgq | plpgsql | next_batch_custom
pgq | plpgsql | next_batch_info
pgq | plpgsql | register_consumer
pgq | plpgsql | register_consumer_at
pgq | plpgsql | seq_getval
pgq | plpgsql | seq_setval
pgq | plpgsql | set_queue_config
pgq | c | sqltriga
pgq | plpgsql | ticker
pgq | plpgsql | ticker
pgq | plpgsql | ticker
pgq | plpgsql | tune_storage
pgq | plpgsql | unregister_consumer
pgq | plpgsql | upgrade_schema
pgq | plpgsql | version
pgq_ext | plpgsql | get_last_tick
pgq_ext | plpgsql | get_last_tick
pgq_ext | plpgsql | is_batch_done
pgq_ext | plpgsql | is_batch_done
pgq_ext | plpgsql | is_event_done
pgq_ext | plpgsql | is_event_done
pgq_ext | plpgsql | set_batch_done
pgq_ext | plpgsql | set_batch_done
pgq_ext | plpgsql | set_event_done
pgq_ext | plpgsql | set_event_done
pgq_ext | plpgsql | set_last_tick
pgq_ext | plpgsql | set_last_tick
pgq_ext | plpgsql | upgrade_schema
pgq_ext | plpgsql | version
pgq_node | plpgsql | change_consumer_provider
pgq_node | plpgsql | create_node
pgq_node | plpgsql | demote_root
pgq_node | plpgsql | drop_node
pgq_node | plpgsql | get_consumer_info
pgq_node | plpgsql | get_consumer_state
pgq_node | plpgsql | get_node_info
pgq_node | plpgsql | get_queue_locations
pgq_node | plpgsql | get_subscriber_info
pgq_node | plpgsql | get_worker_state
pgq_node | plpgsql | is_leaf_node
pgq_node | plpgsql | is_root_node
pgq_node | plpgsql | maint_watermark
pgq_node | plpgsql | promote_branch
pgq_node | plpgsql | register_consumer
pgq_node | plpgsql | register_location
pgq_node | plpgsql | register_subscriber
pgq_node | plpgsql | set_consumer_completed
pgq_node | plpgsql | set_consumer_error
pgq_node | plpgsql | set_consumer_paused
pgq_node | plpgsql | set_consumer_uptodate
pgq_node | plpgsql | set_global_watermark
pgq_node | plpgsql | set_node_attrs
pgq_node | plpgsql | set_partition_watermark
pgq_node | plpgsql | set_subscriber_watermark
pgq_node | plpgsql | unregister_consumer
pgq_node | plpgsql | unregister_location
pgq_node | plpgsql | unregister_subscriber
pgq_node | plpgsql | upgrade_schema
pgq_node | plpgsql | version
londiste | plpgsql | _coordinate_copy
londiste | plpgsql | drop_table_fkey
londiste | plpgsql | drop_table_triggers
londiste | plpgsql | execute_finish
londiste | plpgsql | execute_start
londiste | plpgsql | find_column_types
londiste | plpgsql | find_rel_oid
londiste | plpgsql | find_seq_oid
londiste | plpgsql | find_table_fkeys
londiste | plpgsql | find_table_oid
londiste | plpgsql | get_seq_list
londiste | plpgsql | get_table_list
londiste | plpgsql | get_table_pending_fkeys
londiste | plpgsql | get_valid_pending_fkeys
londiste | plpgsql | global_add_table
londiste | plpgsql | global_remove_seq
londiste | plpgsql | global_remove_table
londiste | plpgsql | global_update_seq
londiste | sql | is_replica_func
londiste | plpgsql | local_add_seq
londiste | plpgsql | local_add_table
londiste | plpgsql | local_add_table
londiste | plpgsql | local_add_table
londiste | plpgsql | local_add_table
londiste | plpgsql | local_remove_seq
londiste | plpgsql | local_remove_table
londiste | plpgsql | local_set_table_attrs
londiste | plpgsql | local_set_table_state
londiste | plpgsql | local_set_table_struct
londiste | plpgsql | local_show_missing
londiste | plpgsql | make_fqname
londiste | plpgsql | quote_fqname
londiste | plpgsql | restore_table_fkey
londiste | plpgsql | root_check_seqs
londiste | plpgsql | root_check_seqs
londiste | plpgsql | root_notify_change
londiste | plpgsql | split_fqname
londiste | plpgsql | table_info_trigger
londiste | plpgsql | upgrade_schema
londiste | plpgsql | version
(135 rows)
创建完根节点的这些对象, 函数等之后. 如果没有异常, 就可以启动provider的worker (Replay events to subscriber: it is needed to make the replication active as it will start to replay the events.) 进程了.
postgres@db5-> londiste3 -d /home/postgres/londiste3/src_digoal_01.ini worker
其实是一个python进程, 如下
postgres@db5-> ps -ewf|grep python
postgres 27316 1 0 15:53 ? 00:00:00 /opt/python2.7.3/bin/python /opt/skytools3.0.2/bin/londiste3 -d /home/postgres/londiste3/src_digoal_01.ini worker
根配置完了, 就可以辐射出去了, 接下来配置目标1的配置文件 :
同样, 注意db里面我配置了password, 如果你使用的是trust认证或者使用了.pgpass文件, 这里就不需要配密码了.
queue的名字和provider的queue的名字必须一致. job_name和前面的不能一致.
vi /home/postgres/londiste3/dst1_digoal_01.ini
[londiste3]
job_name = dst1_digoal_01
db = host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres
queue_name = replika
logfile = /home/postgres/londiste3/log/dst1_digoal_01.log
pidfile = /home/postgres/londiste3/pid/dst1_digoal_01.pid
配置好后, 创建叶节点, 注意我这里创建的是页节点, 而不是树枝节点, 因为本次不讲级联复制. 如果是级联的话创建的是树枝节点.
postgres@db5-> londiste3 -v /home/postgres/londiste3/dst1_digoal_01.ini create-leaf dst1_digoal_01 "host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres" --provider="host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres"
2012-05-30 21:33:47,316 9412 DEBUG Connect 'new_node' to 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 [...]'
2012-05-30 21:33:47,324 9412 INFO plpgsql is installed
2012-05-30 21:33:47,325 9412 INFO Installing pgq
2012-05-30 21:33:47,325 9412 INFO Reading from /opt/skytools3.0.2/share/skytools3/pgq.sql
2012-05-30 21:33:47,721 9412 INFO pgq.get_batch_cursor is installed
2012-05-30 21:33:47,721 9412 INFO Installing pgq_ext
2012-05-30 21:33:47,722 9412 INFO Reading from /opt/skytools3.0.2/share/skytools3/pgq_ext.sql
2012-05-30 21:33:47,918 9412 INFO Installing pgq_node
2012-05-30 21:33:47,919 9412 INFO Reading from /opt/skytools3.0.2/share/skytools3/pgq_node.sql
2012-05-30 21:33:48,217 9412 INFO Installing londiste
2012-05-30 21:33:48,217 9412 INFO Reading from /opt/skytools3.0.2/share/skytools3/londiste.sql
2012-05-30 21:33:48,430 9412 INFO londiste.global_add_table is installed
2012-05-30 21:33:48,431 9412 DEBUG exec_query: select * from pgq_node.get_node_info('replika')
2012-05-30 21:33:48,434 9412 INFO Initializing node
2012-05-30 21:33:48,434 9412 DEBUG Connect 'root_db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 [...]'
2012-05-30 21:33:48,437 9412 DEBUG exec_query: select * from pgq_node.get_node_info('replika')
2012-05-30 21:33:48,446 9412 DEBUG db='host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres' -- type='root' provider='host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres'
2012-05-30 21:33:48,446 9412 DEBUG exec_query: select * from pgq_node.get_node_info('replika')
2012-05-30 21:33:48,447 9412 DEBUG exec_query: select * from pgq_node.get_queue_locations('replika')
2012-05-30 21:33:48,448 9412 DEBUG Connect 'provider_db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 [...]'
2012-05-30 21:33:48,451 9412 DEBUG exec_query: select node_type, node_name from pgq_node.get_node_info('replika')
2012-05-30 21:33:48,460 9412 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'dst1_digoal_01', 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres', false)
2012-05-30 21:33:48,463 9412 INFO Location registered
2012-05-30 21:33:48,463 9412 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'dst1_digoal_01', 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres', false)
2012-05-30 21:33:48,466 9412 INFO Location registered
2012-05-30 21:33:48,466 9412 DEBUG exec_cmd: select * from pgq_node.register_subscriber('replika', 'dst1_digoal_01', 'dst1_digoal_01', null)
2012-05-30 21:33:48,470 9412 INFO Subscriber registered: dst1_digoal_01
2012-05-30 21:33:48,471 9412 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'dst1_digoal_01', 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres', false)
2012-05-30 21:33:48,472 9412 INFO Location registered
2012-05-30 21:33:48,472 9412 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'src_digoal_01', 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres', 'False')
2012-05-30 21:33:48,473 9412 INFO Location registered
2012-05-30 21:33:48,473 9412 DEBUG exec_cmd: select * from pgq_node.create_node('replika', 'leaf', 'dst1_digoal_01', 'dst1_digoal_01', 'src_digoal_01', '1', null)
2012-05-30 21:33:48,475 9412 INFO Node "dst1_digoal_01" initialized for queue "replika" with type "leaf"
2012-05-30 21:33:48,477 9412 INFO Done
创建完叶节点, 启动subscriber的worker进程 :
postgres@db5-> londiste3 -d /home/postgres/londiste3/dst1_digoal_01.ini worker
可以看到现在系统中有两个python进程, 分别是provider和subscriber的worker进程.
postgres@db5-> ps -ewf|grep python
postgres 27316 1 0 15:53 ? 00:00:00 /opt/python2.7.3/bin/python /opt/skytools3.0.2/bin/londiste3 -d /home/postgres/londiste3/src_digoal_01.ini worker
postgres 27439 1 0 15:57 ? 00:00:00 /opt/python2.7.3/bin/python /opt/skytools3.0.2/bin/londiste3 -d /home/postgres/londiste3/dst1_digoal_01.ini worker
接下来配置pgq的ticker进程需要的配置文件, ticker是用来做batch的.
(Londiste needs a ticker which has to target the P(rovider) database, and can be run from another machine. The common usage is to run the ticker directly on the Provider database host.
Any ticker can host as many queues as you want. Each queue has a unique name and can be used by as many subscribers as needed.
If you have several copies of the same database, you can subscribe to the same queue from several subscribers. If you want to have different subsets of the same source database on several subscribers, you either can have those use the same queue but only a part of the tables in it, or have a queue per set of tables.)
vi /home/postgres/londiste3/pgqd.ini
[pgqd]
base_connstr = host=172.16.3.176 port=1921 user=postgres password=postgres
initial_database = template1
logfile = /home/postgres/londiste3/log/pgqd.log
pidfile = /home/postgres/londiste3/pid/pgqd.pid
启动pgqd进程.
postgres@db5-> pgqd -d /home/postgres/londiste3/pgqd.ini
2012-05-30 21:35:07.497 9446 LOG Starting pgqd 3.0.2
pgqd是一个c程序.
postgres@db5-> ps -ewf|grep pgqd
postgres 27637 1 0 16:01 ? 00:00:00 pgqd -d /home/postgres/londiste3/pgqd.ini
异常注意
使用普通用户连接的话看到src和dst的数据库日志都有报错, set session_replication_role = ‘replica’这个必须超级用户执行,
我以前写过一篇关于这个参数的BLOG, 有兴趣的朋友参考如下 :
《Can session_replication_role used like MySQL’s BlackHole Engine?》
http://blog.163.com/digoal@126/blog/static/163877040201119111234570/
postgres@db5-> less pgqd.log
2012-05-30 15:59:17.110 CST,"digoal_01","digoal_01",18490,"172.16.3.176:33835",4fc5d355.483a,1,"SET",2012-05-30 15:59:17 CST,3/295,0,ERROR,42501,"permission denied to set parameter ""session_replication_role""",,,,,,"set session_replication_role = 'replica'",,,""
londiste的日志也有报错
postgres@db5-> less src_digoal_01.log
2012-05-30 16:03:29,221 27316 WARNING Failure to call pgq_node.set_consumer_error()
2012-05-30 16:03:29,221 27316 ERROR Job src_digoal_01 got error on connection 'db': permission denied to set parameter "session_repl
ication_role". Query: set session_replication_role = 'replica'
Traceback (most recent call last):
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/consumer.py", line 285, in exception_hook
dst_db = self.get_database(self.target_db)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 733, in get_database
return dbc.get_connection(params['isolation_level'], clist)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 954, in get_connection
self.setup_func(self.name, self.conn)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/londiste/playback.py", line 322, in connection_hook
curs.execute("set session_replication_role = 'replica'")
File "/opt/python2.7.3/lib/python2.7/site-packages/psycopg2/extras.py", line 123, in execute
return _cursor.execute(self, query, vars)
ProgrammingError: permission denied to set parameter "session_replication_role"
postgres@db5-> less dst1_digoal_01.log
2012-05-30 16:03:34,347 27439 WARNING Failure to call pgq_node.set_consumer_error()
2012-05-30 16:03:34,347 27439 ERROR Job dst1_digoal_01 got error on connection 'db': permission denied to set parameter "session_rep
lication_role". Query: set session_replication_role = 'replica'
Traceback (most recent call last):
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/consumer.py", line 285, in exception_hook
dst_db = self.get_database(self.target_db)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 733, in get_database
return dbc.get_connection(params['isolation_level'], clist)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 954, in get_connection
self.setup_func(self.name, self.conn)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/londiste/playback.py", line 322, in connection_hook
curs.execute("set session_replication_role = 'replica'")
File "/opt/python2.7.3/lib/python2.7/site-packages/psycopg2/extras.py", line 123, in execute
return _cursor.execute(self, query, vars)
ProgrammingError: permission denied to set parameter "session_replication_role"
因此如果前面配置的是普通用户, 可以修改配置文件, 改成超级用户后reload
postgres@db5-> cat src_digoal_01.ini
[londiste3]
job_name = src_digoal_01
db = host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres
queue_name = replika
logfile = /home/postgres/londiste3/log/src_digoal_01.log
pidfile = /home/postgres/londiste3/pid/src_digoal_01.pid
postgres@db5-> cat dst1_digoal_01.ini
[londiste3]
job_name = dst1_digoal_01
db = host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres
queue_name = replika
logfile = /home/postgres/londiste3/log/dst1_digoal_01.log
pidfile = /home/postgres/londiste3/pid/dst1_digoal_01.pid
postgres@db5-> londiste3 -r /home/postgres/londiste3/src_digoal_01.ini
postgres@db5-> londiste3 -r /home/postgres/londiste3/dst1_digoal_01.ini
正常后, 可以查看当前的复制状态, status用于输出节点树.
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini status
Queue: replika Local node: src_digoal_01
src_digoal_01 (root)
| Tables: 0/0/0
| Lag: 26s, Tick: 14
+--dst1_digoal_01 (leaf)
Tables: 0/0/0
Lag: 26s, Tick: 14
members用于输出这个配置文件下的复制节点member信息.
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini members
Member info on src_digoal_01@replika:
node_name dead node_location
--------------- --------------- ----------------------------------------------------------------------------
dst1_digoal_01 False host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres
src_digoal_01 False host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres
开始添加需要复制的表, 首先是用provider的配置文件添加, 先添加两个带主键的表.
postgres@db5-> londiste3 -v /home/postgres/londiste3/src_digoal_01.ini add-table digoal_01.user_info digoal_01.user_session
2012-05-30 21:37:57,024 9537 DEBUG Connect 'db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 [...]'
2012-05-30 21:37:57,028 9537 DEBUG exec_query: select * from pgq_node.get_node_info('replika')
2012-05-30 21:37:57,036 9537 DEBUG exec_query: select * from pgq_node.get_queue_locations('replika')
2012-05-30 21:37:57,037 9537 DEBUG exec_cmd: select * from pgq_node.get_node_info('replika')
2012-05-30 21:37:57,038 9537 DEBUG 100 Ok
2012-05-30 21:37:57,039 9537 DEBUG Connect 'provider_db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 [...]'
2012-05-30 21:37:57,047 9537 DEBUG exec_cmd: select * from londiste.local_add_table('replika', 'digoal_01.user_info', '[]', null, null)
2012-05-30 21:37:57,064 9537 INFO Table added: digoal_01.user_info
2012-05-30 21:37:57,065 9537 DEBUG exec_cmd: select * from londiste.local_add_table('replika', 'digoal_01.user_session', '[]', null, null)
2012-05-30 21:37:57,067 9537 INFO Table added: digoal_01.user_session
开始添加需要复制的表, 然后是用subscriber的配置文件添加, 先添加两个带主键的表.
postgres@db5-> londiste3 -v /home/postgres/londiste3/dst1_digoal_01.ini add-table digoal_01.user_info digoal_01.user_session
2012-05-30 21:38:08,369 9548 DEBUG Connect 'db' to 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 [...]'
2012-05-30 21:38:08,373 9548 DEBUG exec_query: select * from pgq_node.get_node_info('replika')
2012-05-30 21:38:08,377 9548 DEBUG exec_query: select * from pgq_node.get_queue_locations('replika')
2012-05-30 21:38:08,378 9548 DEBUG exec_cmd: select * from pgq_node.get_node_info('replika')
2012-05-30 21:38:08,379 9548 DEBUG 100 Ok
2012-05-30 21:38:08,380 9548 DEBUG Connect 'provider_db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 [...]'
2012-05-30 21:38:08,392 9548 DEBUG exec_cmd: select * from londiste.local_add_table('replika', 'digoal_01.user_info', '[]', null, null)
2012-05-30 21:38:08,405 9548 INFO Table added: digoal_01.user_info
2012-05-30 21:38:08,406 9548 DEBUG exec_cmd: select * from londiste.local_add_table('replika', 'digoal_01.user_session', '[]', null, null)
2012-05-30 21:38:08,410 9548 INFO Table added: digoal_01.user_session
添加完后, 查看provider下面的加入复制的表
postgres@db5-> londiste3 -v /home/postgres/londiste3/src_digoal_01.ini tables
2012-05-30 21:38:36,499 9563 DEBUG Connect 'db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 [...]'
2012-05-30 21:38:36,503 9563 DEBUG display_table: select table_name, merge_state, table_attrs
from londiste.get_table_list('replika') where local
order by table_name
Tables on node
table_name merge_state table_attrs
---------------------- --------------- ---------------
digoal_01.user_info ok
digoal_01.user_session ok
添加完后, 查看subscriber 1下面的加入复制的表, 看到这里的状态是in-copy的, 表示正在初始化拷贝.
postgres@db5-> londiste3 -v /home/postgres/londiste3/dst1_digoal_01.ini tables
2012-05-30 21:38:41,135 9567 DEBUG Connect 'db' to 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 [...]'
2012-05-30 21:38:41,139 9567 DEBUG display_table: select table_name, merge_state, table_attrs
from londiste.get_table_list('replika') where local
order by table_name
Tables on node
table_name merge_state table_attrs
---------------------- --------------- ---------------
digoal_01.user_info in-copy
digoal_01.user_session None
注意londiste3的add table选项, 表示允许的并行拷贝的表的数量, 默认是1个1个的串行拷贝.
max-parallel-copy=MAX_PARALLEL_COPY
max number of parallel copy processes
另外需要注意的是, 初始化拷贝过程不是一个snapshot的过程, 比如添加的2个表, 即使并行拷贝也不是snapshot的.
还有两个选项是seqs, missing分别用于查看复制的序列和未加入复制的表.
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini seqs
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini missing
Missing objects on node
obj_kind obj_name
--------------- -------------------------
r digoal_01.user_login_rec
r digoal_01.user_logout_rec
接下来添加两个日志表, 因为日志表上没有PK, 所以添加不成功.
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini add-table digoal_01.user_login_rec digoal_01.user_logout_rec
2012-05-30 16:12:25,239 28118 ERROR Primary key missing on table: digoal_01.user_login_rec
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini missing
Missing objects on node
obj_kind obj_name
--------------- -------------------------
r digoal_01.user_login_rec
r digoal_01.user_logout_rec
因此需要在主节点和目标节点上把PK加上去.
postgres@db5-> psql digoal_01 digoal_01
psql (9.1.3)
Type "help" for help.
digoal_01=> alter table digoal_01.user_login_rec add column id serial8 primary key;
NOTICE: ALTER TABLE will create implicit sequence "user_login_rec_id_seq" for serial column "user_login_rec.id"
NOTICE: ALTER TABLE / ADD PRIMARY KEY will create implicit index "user_login_rec_pkey" for table "user_login_rec"
ALTER TABLE
digoal_01=> alter table digoal_01.user_logout_rec add column id serial8 primary key;
NOTICE: ALTER TABLE will create implicit sequence "user_logout_rec_id_seq" for serial column "user_logout_rec.id"
NOTICE: ALTER TABLE / ADD PRIMARY KEY will create implicit index "user_logout_rec_pkey" for table "user_logout_rec"
ALTER TABLE
pg92@db-172-16-3-33-> psql digoal_01 digoal_01
psql (9.2beta1)
Type "help" for help.
digoal_01=> alter table digoal_01.user_login_rec add column id serial8 primary key;
NOTICE: ALTER TABLE will create implicit sequence "user_login_rec_id_seq" for serial column "user_login_rec.id"
NOTICE: ALTER TABLE / ADD PRIMARY KEY will create implicit index "user_login_rec_pkey" for table "user_login_rec"
ALTER TABLE
digoal_01=> alter table digoal_01.user_logout_rec add column id serial8 primary key;
NOTICE: ALTER TABLE will create implicit sequence "user_logout_rec_id_seq" for serial column "user_logout_rec.id"
NOTICE: ALTER TABLE / ADD PRIMARY KEY will create implicit index "user_logout_rec_pkey" for table "user_logout_rec"
ALTER TABLE
加好PK后, 再添加两个日志表就OK了. 记得加完provider还要加subscriber.
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini add-table digoal_01.user_login_rec digoal_01.user_logout_rec
2012-05-31 08:50:47,715 30213 INFO Table added: digoal_01.user_login_rec
2012-05-31 08:50:47,719 30213 INFO Table added: digoal_01.user_logout_rec
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini add-table digoal_01.user_login_rec digoal_01.user_logout_rec
2012-05-31 08:50:53,307 30219 INFO Table added: digoal_01.user_login_rec
2012-05-31 08:50:53,313 30219 INFO Table added: digoal_01.user_logout_rec
-- 新增了两个序列, 需要使用add-seq添加
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini missing
Missing objects on node
obj_kind obj_name
--------------- --------------------------------
S digoal_01.user_login_rec_id_seq
S digoal_01.user_logout_rec_id_seq
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini tables
Tables on node
table_name merge_state table_attrs
------------------------- --------------- ---------------
digoal_01.user_info ok
digoal_01.user_login_rec ok
digoal_01.user_logout_rec ok
digoal_01.user_session ok
添加序列
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini add-seq digoal_01.user_login_rec_id_seq digoal_01.user_logout_rec_id_seq
2012-05-31 08:53:34,666 30314 INFO Sequence added: digoal_01.user_login_rec_id_seq
2012-05-31 08:53:34,668 30314 INFO Sequence added: digoal_01.user_logout_rec_id_seq
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini add-seq digoal_01.user_login_rec_id_seq digoal_01.user_logout_rec_id_seq
2012-05-31 08:53:39,322 30318 INFO Sequence added: digoal_01.user_login_rec_id_seq
2012-05-31 08:53:39,324 30318 INFO Sequence added: digoal_01.user_logout_rec_id_seq
查看添加的序列
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini seqs
Sequences on node
seq_name local last_value
-------------------------------- --------------- ---------------
digoal_01.user_login_rec_id_seq True 4396741
digoal_01.user_logout_rec_id_seq True 30001
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini seqs
Sequences on node
seq_name local last_value
-------------------------------- --------------- ---------------
digoal_01.user_login_rec_id_seq True 4396741
digoal_01.user_logout_rec_id_seq True 30001
其中provider的worker进程只连一个数据库(主库)
consumer的worker进程则连两个数据库(主库和目标库1).
postgres@db5-> netstat -anp|grep python
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 172.16.3.176:42865 172.16.3.176:1921 ESTABLISHED 27316/python
tcp 0 0 172.16.3.176:42869 172.16.3.176:1921 ESTABLISHED 27439/python
tcp 0 0 172.16.3.176:3172 172.16.3.33:1919 ESTABLISHED 27439/python
postgres@db5-> ps -ewf|grep python
postgres 27316 1 0 15:53 ? 00:00:00 /opt/python2.7.3/bin/python /opt/skytools3.0.2/bin/londiste3 -d /home/postgres/londiste3/src_digoal_01.ini worker
postgres 27439 1 0 15:57 ? 00:00:00 /opt/python2.7.3/bin/python /opt/skytools3.0.2/bin/londiste3 -d /home/postgres/londiste3/dst1_digoal_01.ini worker
下节URL
《Londiste 3 replicate case - 1 下节》