Londiste 3 replicate case - 1, Part 1


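Background

I covered the installation of Londiste3 in an earlier post; readers unfamiliar with it can refer to:

"Londiste3 Install"

This post shows how to use londiste3 to replicate tables and sequences from PostgreSQL 9.1.3 to PostgreSQL 9.2beta1. (PostgreSQL streaming replication cannot handle this scenario, because it does not work across major versions. Slony cannot do it at the moment either: Slony is tied to the PostgreSQL version, and no release supporting PostgreSQL 9.2 exists yet. Because Slony is stricter about versions and less flexible than londiste, I chose londiste as the cross-version replication tool.)

londiste3 supports merge replication, cascaded replication, and more. Merge replication means replicating tables from several databases into the same target table; it is commonly used in database clusters partitioned with plproxy. londiste also supports partial replication, where the source side can carry a WHERE condition. The source and target may differ in database name, table name, and schema name, and custom handlers are supported. It is very powerful, and all of it is built on the PostgreSQL queue technology (PgQ).

Merge replication and cascaded replication will be covered in later posts.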

1. Test environment:

Source database:

PostgreSQL 9.1.3  
host=172.16.3.176 port=1921 user=digoal_01 dbname=digoal_01  

4 tables, 2 sequences

Target database 1:

PostgreSQL 9.2 beta1  
host=172.16.3.33 port=1919 user=digoal_01 dbname=digoal_01   

Replicates 4 tables and 2 sequences

Target database 2:

PostgreSQL 9.2 beta1  
host=172.16.3.33 port=1919 user=digoal_02 dbname=digoal_02  

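Replicates 2 tables and 2 sequences, but only the rows with userid<1000

2. Building the test model on the source database

The test model on the source database is based on my earlier series "PostgreSQL Performance Optimization: A Comprehensive Case Study":

"PostgreSQL Performance Optimization: A Comprehensive Case Study - 1"

"PostgreSQL Performance Optimization: A Comprehensive Case Study - 2"

Create the test user and test database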

postgres@db5-> psql  
psql (9.1.3)  
Type "help" for help.  
  
postgres=# create role digoal_01 nosuperuser nocreatedb nocreaterole noinherit login encrypted password 'digoal_01';  
CREATE ROLE  
postgres=# create database digoal_01 encoding 'UTF8' template template0 owner digoal_01;  
CREATE DATABASE  
postgres=# \c digoal_01 digoal_01  
You are now connected to database "digoal_01" as user "digoal_01".  
digoal_01=> create schema digoal_01 authorization digoal_01;  
CREATE SCHEMA  

Create the test tables

create table user_info  
(userid int,  
engname text,  
cnname text,  
occupation text,  
birthday date,  
signname text,  
email text,  
qq numeric,  
crt_time timestamp without time zone,  
mod_time timestamp without time zone  
);  
  
create table user_session  
(userid int,  
logintime timestamp(0) without time zone,  
login_count bigint default 0,  
logouttime timestamp(0) without time zone,  
online_interval interval default interval '0'  
);  
  
create table user_login_rec  
(userid int,  
login_time timestamp without time zone,  
ip inet  
);  
  
create table user_logout_rec  
(userid int,  
logout_time timestamp without time zone,  
ip inet  
);  

Initialize the data

insert into user_info (userid,engname,cnname,occupation,birthday,signname,email,qq,crt_time,mod_time)  
select generate_series(1,200000),  
'digoal.zhou',  
'德哥',  
'DBA',  
'1970-01-01'  
,E'公益是一辈子的事, I\'m Digoal.Zhou, Just do it!',  
'digoal@126.com',  
276732431,  
clock_timestamp(),  
NULL;  
  
insert into user_session (userid) select generate_series(1,200000);  
  
set work_mem='2048MB';  
set maintenance_work_mem='2048MB';  
alter table user_info add constraint pk_user_info primary key (userid);  
alter table user_session add constraint pk_user_session primary key (userid);  

Business functions

Function simulating a user login

create or replace function f_user_login   
(i_userid int,  
OUT o_userid int,  
OUT o_engname text,  
OUT o_cnname text,  
OUT o_occupation text,  
OUT o_birthday date,  
OUT o_signname text,  
OUT o_email text,  
OUT o_qq numeric  
)  
as $BODY$  
declare  
begin  
select userid,engname,cnname,occupation,birthday,signname,email,qq  
into o_userid,o_engname,o_cnname,o_occupation,o_birthday,o_signname,o_email,o_qq  
from user_info where userid=i_userid;  
insert into user_login_rec (userid,login_time,ip) values (i_userid,now(),inet_client_addr());  
update user_session set logintime=now(),login_count=login_count+1 where userid=i_userid;  
return;  
end;  
$BODY$  
language plpgsql;  

Function simulating a user logout

create or replace function f_user_logout  
(i_userid int,  
OUT o_result int  
)  
as $BODY$  
declare  
begin  
insert into user_logout_rec (userid,logout_time,ip) values (i_userid,now(),inet_client_addr());  
update user_session set logouttime=now(),online_interval=online_interval+(now()-logintime) where userid=i_userid;  
o_result := 0;  
return;  
exception   
when others then  
o_result := 1;  
return;  
end;  
$BODY$  
language plpgsql;  

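3. Prepare target database 1 (note that londiste replication requires a primary key on each table, so adding the two log tables later will fail because they have no primary key)

Create the test user and test database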

PostgreSQL 9.2beta1  
host=172.16.3.33 port=1919 user=digoal_01 dbname=digoal_01  
pg92@db-172-16-3-33-> psql postgres postgres  
psql (9.2beta1)  
Type "help" for help.  
  
postgres=# create role digoal_01 nosuperuser nocreatedb nocreaterole noinherit login encrypted password 'digoal_01';  
CREATE ROLE  
postgres=# create database digoal_01 encoding 'UTF8' template template0 owner digoal_01;  
CREATE DATABASE  
postgres=# \c digoal_01 digoal_01  
You are now connected to database "digoal_01" as user "digoal_01".  
digoal_01=> create schema digoal_01 authorization digoal_01;  
CREATE SCHEMA  

Create the test tables

create table user_info  
(userid int,  
engname text,  
cnname text,  
occupation text,  
birthday date,  
signname text,  
email text,  
qq numeric,  
crt_time timestamp without time zone,  
mod_time timestamp without time zone  
);  
  
create table user_session  
(userid int,  
logintime timestamp(0) without time zone,  
login_count bigint default 0,  
logouttime timestamp(0) without time zone,  
online_interval interval default interval '0'  
);  
  
create table user_login_rec  
(userid int,  
login_time timestamp without time zone,  
ip inet  
);  
  
create table user_logout_rec  
(userid int,  
logout_time timestamp without time zone,  
ip inet  
);  
  
alter table user_info add constraint pk_user_info primary key (userid);  
alter table user_session add constraint pk_user_session primary key (userid);  

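4. Prepare target database 2 (only the two user-related tables are replicated to target database 2, and both have primary keys)

Create the test user and test database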

PostgreSQL 9.2beta1  
host=172.16.3.33 port=1919 user=digoal_02 dbname=digoal_02  
pg92@db-172-16-3-33-> psql postgres postgres  
psql (9.2beta1)  
Type "help" for help.  
  
postgres=# create role digoal_02 nosuperuser nocreatedb nocreaterole noinherit login encrypted password 'digoal_02';  
CREATE ROLE  
postgres=# create database digoal_02 encoding 'UTF8' template template0 owner digoal_02;  
CREATE DATABASE  
postgres=# \c digoal_02 digoal_02  
You are now connected to database "digoal_02" as user "digoal_02".  
digoal_02=> create schema digoal_02 authorization digoal_02;  
CREATE SCHEMA  

Create the test tables

create table user_info1  
(userid int,  
engname text,  
cnname text,  
occupation text,  
birthday date,  
signname text,  
email text,  
qq numeric,  
crt_time timestamp without time zone,  
mod_time timestamp without time zone  
);  
  
create table user_session1  
(userid int,  
logintime timestamp(0) without time zone,  
login_count bigint default 0,  
logouttime timestamp(0) without time zone,  
online_interval interval default interval '0'  
);  
  
alter table user_info1 add constraint pk_user_info primary key (userid);  
alter table user_session1 add constraint pk_user_session primary key (userid);  

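5. Replicating from the primary (source) database to target database 1

In this test the pgq, provider, and consumer processes all run on the primary database server. They could just as well run on other servers, even ones that host neither the primary nor the target database, as long as they are allowed to connect to the databases.

The configuration on the primary database server is as follows.

Create the directories for the londiste configuration files, the logs, and the pid files.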

mkdir -p /home/postgres/londiste3/log  
mkdir -p /home/postgres/londiste3/pid  

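Create the configuration file for the provider process; the output of londiste3 --ini can serve as a template.

Note that I put a password in db. If you use trust authentication or a .pgpass file, no password is needed here.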

vi /home/postgres/londiste3/src_digoal_01.ini  
[londiste3]  
job_name = src_digoal_01  
db = host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres  
queue_name = replika  
logfile = /home/postgres/londiste3/log/src_digoal_01.log  
pidfile = /home/postgres/londiste3/pid/src_digoal_01.pid  
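
The provider file above can also be generated by a script, which helps when several nodes share the same layout. The following is a minimal sketch in Python 3 using only the standard library. The helper name build_londiste_ini is hypothetical, the keys are exactly those shown above, and the password is left out on the assumption that a .pgpass file or trust authentication is in use.

```python
import configparser
import io


def build_londiste_ini(job_name, db, queue_name, base_dir):
    """Render a [londiste3] section as text (hypothetical helper)."""
    # interpolation=None keeps any '%' in the connection string literal.
    cfg = configparser.ConfigParser(interpolation=None)
    cfg["londiste3"] = {
        "job_name": job_name,
        "db": db,
        "queue_name": queue_name,
        # Same directory layout as the mkdir commands in this post.
        "logfile": f"{base_dir}/log/{job_name}.log",
        "pidfile": f"{base_dir}/pid/{job_name}.pid",
    }
    buf = io.StringIO()
    cfg.write(buf)
    return buf.getvalue()


if __name__ == "__main__":
    print(build_londiste_ini(
        "src_digoal_01",
        "host=172.16.3.176 port=1921 user=postgres dbname=digoal_01",
        "replika",
        "/home/postgres/londiste3",
    ))
```

Writing the returned text to /home/postgres/londiste3/src_digoal_01.ini gives a file equivalent to the hand-edited one, minus the password.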

Once the provider is configured, the root node can be created.

postgres@db5-> londiste3 -v /home/postgres/londiste3/src_digoal_01.ini create-root src_digoal_01 "host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres"  
2012-05-30 21:32:36,847 9378 DEBUG Connect 'new_node' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01  [...]'  
2012-05-30 21:32:36,853 9378 INFO plpgsql is installed  
2012-05-30 21:32:36,853 9378 INFO Installing pgq  
2012-05-30 21:32:36,853 9378 INFO   Reading from /opt/skytools3.0.2/share/skytools3/pgq.sql  
2012-05-30 21:32:37,339 9378 INFO pgq.get_batch_cursor is installed  
2012-05-30 21:32:37,339 9378 INFO Installing pgq_ext  
2012-05-30 21:32:37,339 9378 INFO   Reading from /opt/skytools3.0.2/share/skytools3/pgq_ext.sql  
2012-05-30 21:32:37,568 9378 INFO Installing pgq_node  
2012-05-30 21:32:37,569 9378 INFO   Reading from /opt/skytools3.0.2/share/skytools3/pgq_node.sql  
2012-05-30 21:32:37,839 9378 INFO Installing londiste  
2012-05-30 21:32:37,839 9378 INFO   Reading from /opt/skytools3.0.2/share/skytools3/londiste.sql  
2012-05-30 21:32:38,117 9378 INFO londiste.global_add_table is installed  
2012-05-30 21:32:38,118 9378 DEBUG exec_query: select * from pgq_node.get_node_info('replika')  
2012-05-30 21:32:38,121 9378 INFO Initializing node  
2012-05-30 21:32:38,121 9378 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'src_digoal_01', 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres', false)  
2012-05-30 21:32:38,122 9378 INFO Location registered  
2012-05-30 21:32:38,122 9378 DEBUG exec_cmd: select * from pgq_node.create_node('replika', 'root', 'src_digoal_01', 'src_digoal_01', null, null, null)  
2012-05-30 21:32:38,377 9378 INFO Node "src_digoal_01" initialized for queue "replika" with type "root"  
2012-05-30 21:32:38,378 9378 INFO Done  

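After the root node is created, connect to the digoal_01 database on the primary and you will see several new schemas: londiste, pgq, pgq_ext, and pgq_node. They are added by the londiste3 suite and are required for replication.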

postgres=# \c digoal_01 digoal_01  
You are now connected to database "digoal_01" as user "digoal_01".  
digoal_01=> \dn  
    List of schemas  
   Name    |   Owner     
-----------+-----------  
 digoal_01 | digoal_01  
 londiste  | postgres  
 pgq       | postgres  
 pgq_ext   | postgres  
 pgq_node  | postgres  
 public    | postgres  
(6 rows)  

The objects in these new schemas are listed below.

digoal_01=> select nspname,relkind,relname from pg_class,pg_namespace where relnamespace=pg_namespace.oid and relnamespace in (select oid from pg_namespace where nspname in ('londiste','pgq','pgq_ext','pgq_node','public','digoal_01')) order by relnamespace,relkind,relname;  
  nspname  | relkind |         relname           
-----------+---------+-------------------------  
 digoal_01 | i       | pk_user_info  
 digoal_01 | i       | pk_user_session  
 digoal_01 | r       | user_info  
 digoal_01 | r       | user_login_rec  
 digoal_01 | r       | user_logout_rec  
 digoal_01 | r       | user_session  
 pgq       | S       | batch_id_seq  
 pgq       | S       | consumer_co_id_seq  
 pgq       | S       | event_1_id_seq  
 pgq       | S       | event_1_tick_seq  
 pgq       | S       | queue_queue_id_seq  
 pgq       | S       | subscription_sub_id_seq  
 pgq       | i       | consumer_name_uq  
 pgq       | i       | consumer_pkey  
 pgq       | i       | event_1_0_txid_idx  
 pgq       | i       | event_1_1_txid_idx  
 pgq       | i       | event_1_2_txid_idx  
 pgq       | i       | queue_name_uq  
 pgq       | i       | queue_pkey  
 pgq       | i       | rq_pkey  
 pgq       | i       | rq_retry_idx  
 pgq       | i       | subscription_batch_idx  
 pgq       | i       | subscription_pkey  
 pgq       | i       | tick_pkey  
 pgq       | r       | consumer  
 pgq       | r       | event_1  
 pgq       | r       | event_1_0  
 pgq       | r       | event_1_1  
 pgq       | r       | event_1_2  
 pgq       | r       | event_template  
 pgq       | r       | queue  
 pgq       | r       | retry_queue  
 pgq       | r       | subscription  
 pgq       | r       | tick  
 pgq_ext   | i       | completed_batch_pkey  
 pgq_ext   | i       | completed_event_pkey  
 pgq_ext   | i       | completed_tick_pkey  
 pgq_ext   | i       | partial_batch_pkey  
 pgq_ext   | r       | completed_batch  
 pgq_ext   | r       | completed_event  
 pgq_ext   | r       | completed_tick  
 pgq_ext   | r       | partial_batch  
 pgq_node  | i       | local_state_pkey  
 pgq_node  | i       | node_info_pkey  
 pgq_node  | i       | node_location_pkey  
 pgq_node  | i       | subscriber_info_pkey  
 pgq_node  | r       | local_state  
 pgq_node  | r       | node_info  
 pgq_node  | r       | node_location  
 pgq_node  | r       | subscriber_info  
 londiste  | S       | seq_info_nr_seq  
 londiste  | S       | table_info_nr_seq  
 londiste  | i       | applied_execute_pkey  
 londiste  | i       | pending_fkeys_pkey  
 londiste  | i       | seq_info_pkey  
 londiste  | i       | table_info_pkey  
 londiste  | r       | applied_execute  
 londiste  | r       | pending_fkeys  
 londiste  | r       | seq_info  
 londiste  | r       | table_info  

The new functions are listed below; they cover replication, batches, trigger handlers, registration, unregistration, mappings, and so on.

digoal_01=> select nspname,lanname,proname from pg_proc,pg_namespace,pg_language where prolang=pg_language.oid and pronamespace=pg_namespace.oid and pronamespace in (select oid from pg_namespace where nspname in ('londiste','pgq','pgq_ext','pgq_node','public','digoal_01')) order by pronamespace,proname;  
  nspname  | lanname |          proname            
-----------+---------+---------------------------  
 digoal_01 | plpgsql | f_user_login  
 digoal_01 | plpgsql | f_user_logout  
 pgq       | plpgsql | _grant_perms_from  
 pgq       | plpgsql | batch_event_sql  
 pgq       | plpgsql | batch_event_tables  
 pgq       | plpgsql | batch_retry  
 pgq       | plpgsql | create_queue  
 pgq       | plpgsql | current_event_table  
 pgq       | plpgsql | drop_queue  
 pgq       | plpgsql | drop_queue  
 pgq       | plpgsql | event_retry  
 pgq       | plpgsql | event_retry  
 pgq       | plpgsql | event_retry_raw  
 pgq       | plpgsql | find_tick_helper  
 pgq       | plpgsql | finish_batch  
 pgq       | plpgsql | force_tick  
 pgq       | plpgsql | get_batch_cursor  
 pgq       | plpgsql | get_batch_cursor  
 pgq       | plpgsql | get_batch_events  
 pgq       | plpgsql | get_batch_info  
 pgq       | plpgsql | get_consumer_info  
 pgq       | plpgsql | get_consumer_info  
 pgq       | plpgsql | get_consumer_info  
 pgq       | plpgsql | get_queue_info  
 pgq       | plpgsql | get_queue_info  
 pgq       | plpgsql | grant_perms  
 pgq       | plpgsql | insert_event  
 pgq       | plpgsql | insert_event  
 pgq       | c       | insert_event_raw  
 pgq       | c       | logutriga  
 pgq       | plpgsql | maint_operations  
 pgq       | plpgsql | maint_retry_events  
 pgq       | plpgsql | maint_rotate_tables_step1  
 pgq       | plpgsql | maint_rotate_tables_step2  
 pgq       | plpgsql | maint_tables_to_vacuum  
 pgq       | plpgsql | next_batch  
 pgq       | plpgsql | next_batch_custom  
 pgq       | plpgsql | next_batch_info  
 pgq       | plpgsql | register_consumer  
 pgq       | plpgsql | register_consumer_at  
 pgq       | plpgsql | seq_getval  
 pgq       | plpgsql | seq_setval  
 pgq       | plpgsql | set_queue_config  
 pgq       | c       | sqltriga  
 pgq       | plpgsql | ticker  
 pgq       | plpgsql | ticker  
 pgq       | plpgsql | ticker  
 pgq       | plpgsql | tune_storage  
 pgq       | plpgsql | unregister_consumer  
 pgq       | plpgsql | upgrade_schema  
 pgq       | plpgsql | version  
 pgq_ext   | plpgsql | get_last_tick  
 pgq_ext   | plpgsql | get_last_tick  
 pgq_ext   | plpgsql | is_batch_done  
 pgq_ext   | plpgsql | is_batch_done  
 pgq_ext   | plpgsql | is_event_done  
 pgq_ext   | plpgsql | is_event_done  
 pgq_ext   | plpgsql | set_batch_done  
 pgq_ext   | plpgsql | set_batch_done  
 pgq_ext   | plpgsql | set_event_done  
 pgq_ext   | plpgsql | set_event_done  
 pgq_ext   | plpgsql | set_last_tick  
 pgq_ext   | plpgsql | set_last_tick  
 pgq_ext   | plpgsql | upgrade_schema  
 pgq_ext   | plpgsql | version  
 pgq_node  | plpgsql | change_consumer_provider  
 pgq_node  | plpgsql | create_node  
 pgq_node  | plpgsql | demote_root  
 pgq_node  | plpgsql | drop_node  
 pgq_node  | plpgsql | get_consumer_info  
 pgq_node  | plpgsql | get_consumer_state  
 pgq_node  | plpgsql | get_node_info  
 pgq_node  | plpgsql | get_queue_locations  
 pgq_node  | plpgsql | get_subscriber_info  
 pgq_node  | plpgsql | get_worker_state  
 pgq_node  | plpgsql | is_leaf_node  
 pgq_node  | plpgsql | is_root_node  
 pgq_node  | plpgsql | maint_watermark  
 pgq_node  | plpgsql | promote_branch  
 pgq_node  | plpgsql | register_consumer  
 pgq_node  | plpgsql | register_location  
 pgq_node  | plpgsql | register_subscriber  
 pgq_node  | plpgsql | set_consumer_completed  
 pgq_node  | plpgsql | set_consumer_error  
 pgq_node  | plpgsql | set_consumer_paused  
 pgq_node  | plpgsql | set_consumer_uptodate  
 pgq_node  | plpgsql | set_global_watermark  
 pgq_node  | plpgsql | set_node_attrs  
 pgq_node  | plpgsql | set_partition_watermark  
 pgq_node  | plpgsql | set_subscriber_watermark  
 pgq_node  | plpgsql | unregister_consumer  
 pgq_node  | plpgsql | unregister_location  
 pgq_node  | plpgsql | unregister_subscriber  
 pgq_node  | plpgsql | upgrade_schema  
 pgq_node  | plpgsql | version  
 londiste  | plpgsql | _coordinate_copy  
 londiste  | plpgsql | drop_table_fkey  
 londiste  | plpgsql | drop_table_triggers  
 londiste  | plpgsql | execute_finish  
 londiste  | plpgsql | execute_start  
 londiste  | plpgsql | find_column_types  
 londiste  | plpgsql | find_rel_oid  
 londiste  | plpgsql | find_seq_oid  
 londiste  | plpgsql | find_table_fkeys  
 londiste  | plpgsql | find_table_oid  
 londiste  | plpgsql | get_seq_list  
 londiste  | plpgsql | get_table_list  
 londiste  | plpgsql | get_table_pending_fkeys  
 londiste  | plpgsql | get_valid_pending_fkeys  
 londiste  | plpgsql | global_add_table  
 londiste  | plpgsql | global_remove_seq  
 londiste  | plpgsql | global_remove_table  
 londiste  | plpgsql | global_update_seq  
 londiste  | sql     | is_replica_func  
 londiste  | plpgsql | local_add_seq  
 londiste  | plpgsql | local_add_table  
 londiste  | plpgsql | local_add_table  
 londiste  | plpgsql | local_add_table  
 londiste  | plpgsql | local_add_table  
 londiste  | plpgsql | local_remove_seq  
 londiste  | plpgsql | local_remove_table  
 londiste  | plpgsql | local_set_table_attrs  
 londiste  | plpgsql | local_set_table_state  
 londiste  | plpgsql | local_set_table_struct  
 londiste  | plpgsql | local_show_missing  
 londiste  | plpgsql | make_fqname  
 londiste  | plpgsql | quote_fqname  
 londiste  | plpgsql | restore_table_fkey  
 londiste  | plpgsql | root_check_seqs  
 londiste  | plpgsql | root_check_seqs  
 londiste  | plpgsql | root_notify_change  
 londiste  | plpgsql | split_fqname  
 londiste  | plpgsql | table_info_trigger  
 londiste  | plpgsql | upgrade_schema  
 londiste  | plpgsql | version  
(135 rows)  

Once the root node's objects and functions have been created without errors, the provider's worker process can be started. (Replay events to subscriber: it is needed to make the replication active as it will start to replay the events.)

postgres@db5-> londiste3 -d /home/postgres/londiste3/src_digoal_01.ini worker  

It is in fact a Python process, as shown below:

postgres@db5-> ps -ewf|grep python  
postgres 27316     1  0 15:53 ?        00:00:00 /opt/python2.7.3/bin/python /opt/skytools3.0.2/bin/londiste3 -d /home/postgres/londiste3/src_digoal_01.ini worker  

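With the root configured, replication can fan out from it. Next, write the configuration file for target 1:

Again, note that I put a password in db. If you use trust authentication or a .pgpass file, no password is needed here.

queue_name must be identical to the provider's queue_name, while job_name must differ from the one used above.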

vi /home/postgres/londiste3/dst1_digoal_01.ini  
[londiste3]  
job_name = dst1_digoal_01  
db = host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres  
queue_name = replika  
logfile = /home/postgres/londiste3/log/dst1_digoal_01.log  
pidfile = /home/postgres/londiste3/pid/dst1_digoal_01.pid  
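
The two rules for the subscriber file (same queue_name as the provider, different job_name) are easy to get wrong when copying files around. The sketch below, in Python 3 with the standard library only, checks a pair of configuration texts against those two rules. The function name check_node_pair is hypothetical and the check covers nothing beyond what this post states.

```python
import configparser


def check_node_pair(provider_ini, subscriber_ini, section="londiste3"):
    """Return a list of problems found between two config texts."""
    prov = configparser.ConfigParser(interpolation=None)
    prov.read_string(provider_ini)
    sub = configparser.ConfigParser(interpolation=None)
    sub.read_string(subscriber_ini)

    problems = []
    # Both nodes must consume the same queue.
    if prov[section]["queue_name"] != sub[section]["queue_name"]:
        problems.append("queue_name differs between provider and subscriber")
    # Each job needs its own name.
    if prov[section]["job_name"] == sub[section]["job_name"]:
        problems.append("job_name must differ between provider and subscriber")
    return problems


PROVIDER = """\
[londiste3]
job_name = src_digoal_01
queue_name = replika
"""

SUBSCRIBER = """\
[londiste3]
job_name = dst1_digoal_01
queue_name = replika
"""

if __name__ == "__main__":
    # An empty list means both rules hold.
    print(check_node_pair(PROVIDER, SUBSCRIBER))
```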

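Once configured, create the leaf node. Note that I create a leaf node here rather than a branch node, because this post does not cover cascaded replication. For cascading, a branch node would be created instead.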

postgres@db5-> londiste3 -v /home/postgres/londiste3/dst1_digoal_01.ini create-leaf dst1_digoal_01 "host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres" --provider="host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres"  
2012-05-30 21:33:47,316 9412 DEBUG Connect 'new_node' to 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01  [...]'  
2012-05-30 21:33:47,324 9412 INFO plpgsql is installed  
2012-05-30 21:33:47,325 9412 INFO Installing pgq  
2012-05-30 21:33:47,325 9412 INFO   Reading from /opt/skytools3.0.2/share/skytools3/pgq.sql  
2012-05-30 21:33:47,721 9412 INFO pgq.get_batch_cursor is installed  
2012-05-30 21:33:47,721 9412 INFO Installing pgq_ext  
2012-05-30 21:33:47,722 9412 INFO   Reading from /opt/skytools3.0.2/share/skytools3/pgq_ext.sql  
2012-05-30 21:33:47,918 9412 INFO Installing pgq_node  
2012-05-30 21:33:47,919 9412 INFO   Reading from /opt/skytools3.0.2/share/skytools3/pgq_node.sql  
2012-05-30 21:33:48,217 9412 INFO Installing londiste  
2012-05-30 21:33:48,217 9412 INFO   Reading from /opt/skytools3.0.2/share/skytools3/londiste.sql  
2012-05-30 21:33:48,430 9412 INFO londiste.global_add_table is installed  
2012-05-30 21:33:48,431 9412 DEBUG exec_query: select * from pgq_node.get_node_info('replika')  
2012-05-30 21:33:48,434 9412 INFO Initializing node  
2012-05-30 21:33:48,434 9412 DEBUG Connect 'root_db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01  [...]'  
2012-05-30 21:33:48,437 9412 DEBUG exec_query: select * from pgq_node.get_node_info('replika')  
2012-05-30 21:33:48,446 9412 DEBUG db='host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres' -- type='root' provider='host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres'  
2012-05-30 21:33:48,446 9412 DEBUG exec_query: select * from pgq_node.get_node_info('replika')  
2012-05-30 21:33:48,447 9412 DEBUG exec_query: select * from pgq_node.get_queue_locations('replika')  
2012-05-30 21:33:48,448 9412 DEBUG Connect 'provider_db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01  [...]'  
2012-05-30 21:33:48,451 9412 DEBUG exec_query: select node_type, node_name from pgq_node.get_node_info('replika')  
2012-05-30 21:33:48,460 9412 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'dst1_digoal_01', 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres', false)  
2012-05-30 21:33:48,463 9412 INFO Location registered  
2012-05-30 21:33:48,463 9412 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'dst1_digoal_01', 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres', false)  
2012-05-30 21:33:48,466 9412 INFO Location registered  
2012-05-30 21:33:48,466 9412 DEBUG exec_cmd: select * from pgq_node.register_subscriber('replika', 'dst1_digoal_01', 'dst1_digoal_01', null)  
2012-05-30 21:33:48,470 9412 INFO Subscriber registered: dst1_digoal_01  
2012-05-30 21:33:48,471 9412 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'dst1_digoal_01', 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres', false)  
2012-05-30 21:33:48,472 9412 INFO Location registered  
2012-05-30 21:33:48,472 9412 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'src_digoal_01', 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres', 'False')  
2012-05-30 21:33:48,473 9412 INFO Location registered  
2012-05-30 21:33:48,473 9412 DEBUG exec_cmd: select * from pgq_node.create_node('replika', 'leaf', 'dst1_digoal_01', 'dst1_digoal_01', 'src_digoal_01', '1', null)  
2012-05-30 21:33:48,475 9412 INFO Node "dst1_digoal_01" initialized for queue "replika" with type "leaf"  
2012-05-30 21:33:48,477 9412 INFO Done  

After the leaf node is created, start the subscriber's worker process:

postgres@db5-> londiste3 -d /home/postgres/londiste3/dst1_digoal_01.ini worker  

The system now has two Python processes: the provider's and the subscriber's worker processes.

postgres@db5-> ps -ewf|grep python  
postgres 27316     1  0 15:53 ?        00:00:00 /opt/python2.7.3/bin/python /opt/skytools3.0.2/bin/londiste3 -d /home/postgres/londiste3/src_digoal_01.ini worker  
postgres 27439     1  0 15:57 ?        00:00:00 /opt/python2.7.3/bin/python /opt/skytools3.0.2/bin/londiste3 -d /home/postgres/londiste3/dst1_digoal_01.ini worker  
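
Instead of grepping the ps output, a script can check each worker through the pidfile named in its configuration. This is a rough sketch in Python 3 for POSIX systems. It assumes the pidfile holds nothing but the process id as decimal text, which is not verified here, and the function name worker_is_running is hypothetical.

```python
import os


def worker_is_running(pidfile):
    """Return True if the pid stored in pidfile belongs to a live process."""
    try:
        with open(pidfile) as fh:
            pid = int(fh.read().strip())
    except (OSError, ValueError):
        return False  # no pid file, or contents are not a plain integer
    try:
        os.kill(pid, 0)  # signal 0 only checks that the process exists
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


if __name__ == "__main__":
    for job in ("src_digoal_01", "dst1_digoal_01"):
        path = f"/home/postgres/londiste3/pid/{job}.pid"
        print(job, worker_is_running(path))
```

A stale pidfile left behind by a crashed worker makes this return False, provided the pid has not been reused by another process.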

Next, write the configuration file for pgq's ticker process. The ticker is what produces the batches.

(Londiste needs a ticker which has to target the P(rovider) database, and can be run from another machine. The common usage is to run the ticker directly on the Provider database host.  
Any ticker can host as many queues as you want. Each queue has a unique name and can be used by as many subscribers as needed.  
If you have several copies of the same database, you can subscribe to the same queue from several subscribers. If you want to have different subsets of the same source database on several subscribers, you either can have those use the same queue but only a part of the tables in it, or have a queue per set of tables.)  
vi /home/postgres/londiste3/pgqd.ini  
[pgqd]  
base_connstr = host=172.16.3.176 port=1921 user=postgres password=postgres  
initial_database = template1  
logfile = /home/postgres/londiste3/log/pgqd.log  
pidfile = /home/postgres/londiste3/pid/pgqd.pid  

Start the pgqd process.

postgres@db5-> pgqd -d /home/postgres/londiste3/pgqd.ini  
2012-05-30 21:35:07.497 9446 LOG Starting pgqd 3.0.2  

pgqd is a C program.

postgres@db5-> ps -ewf|grep pgqd  
postgres 27637     1  0 16:01 ?        00:00:00 pgqd -d /home/postgres/londiste3/pgqd.ini  

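Errors to watch for

If the workers connect as an ordinary user, both the src and dst database logs show errors, because set session_replication_role = 'replica' must be executed by a superuser.

I wrote a post about this parameter earlier; interested readers can refer to:

"Can session_replication_role used like MySQL’s BlackHole Engine?"

http://blog.163.com/digoal@126/blog/static/163877040201119111234570/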

postgres@db5-> less pgqd.log  
2012-05-30 15:59:17.110 CST,"digoal_01","digoal_01",18490,"172.16.3.176:33835",4fc5d355.483a,1,"SET",2012-05-30 15:59:17 CST,3/295,0,ERROR,42501,"permission denied to set parameter ""session_replication_role""",,,,,,"set session_replication_role = 'replica'",,,""  
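
The line above is in PostgreSQL's csvlog format, so the csv module from the Python standard library can split it reliably, including the doubled quotes inside the message. The sketch below assumes the 23-column layout that csvlog uses in PostgreSQL 9.1. The helper name parse_csvlog_line is hypothetical and SAMPLE is the log line shown above.

```python
import csv

# Column order of the csvlog format in PostgreSQL 9.1 (23 columns).
CSVLOG_COLUMNS = [
    "log_time", "user_name", "database_name", "process_id",
    "connection_from", "session_id", "session_line_num", "command_tag",
    "session_start_time", "virtual_transaction_id", "transaction_id",
    "error_severity", "sql_state_code", "message", "detail", "hint",
    "internal_query", "internal_query_pos", "context", "query",
    "query_pos", "location", "application_name",
]

SAMPLE = (
    '2012-05-30 15:59:17.110 CST,"digoal_01","digoal_01",18490,'
    '"172.16.3.176:33835",4fc5d355.483a,1,"SET",2012-05-30 15:59:17 CST,'
    '3/295,0,ERROR,42501,'
    '"permission denied to set parameter ""session_replication_role""",'
    ',,,,,"set session_replication_role = \'replica\'",,,""'
)


def parse_csvlog_line(line):
    """Map one csvlog line to a dict keyed by column name."""
    row = next(csv.reader([line]))
    return dict(zip(CSVLOG_COLUMNS, row))


if __name__ == "__main__":
    rec = parse_csvlog_line(SAMPLE)
    # Show who hit which error while running which statement.
    print(rec["user_name"], rec["error_severity"], rec["sql_state_code"])
    print(rec["message"])
    print(rec["query"])
```

Filtering on sql_state_code 42501 (insufficient privilege) and user_name is a quick way to see which replication job is connecting as a non-superuser.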

The londiste logs show errors as well

postgres@db5-> less src_digoal_01.log   
2012-05-30 16:03:29,221 27316 WARNING Failure to call pgq_node.set_consumer_error()  
2012-05-30 16:03:29,221 27316 ERROR Job src_digoal_01 got error on connection 'db': permission denied to set parameter "session_repl  
ication_role".   Query: set session_replication_role = 'replica'  
Traceback (most recent call last):  
  File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/consumer.py", line 285, in exception_hook  
    dst_db = self.get_database(self.target_db)  
  File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 733, in get_database  
    return dbc.get_connection(params['isolation_level'], clist)  
  File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 954, in get_connection  
    self.setup_func(self.name, self.conn)  
  File "/opt/skytools3.0.2/lib/python2.7/site-packages/londiste/playback.py", line 322, in connection_hook  
    curs.execute("set session_replication_role = 'replica'")  
  File "/opt/python2.7.3/lib/python2.7/site-packages/psycopg2/extras.py", line 123, in execute  
    return _cursor.execute(self, query, vars)  
ProgrammingError: permission denied to set parameter "session_replication_role"  
  
postgres@db5-> less dst1_digoal_01.log   
2012-05-30 16:03:34,347 27439 WARNING Failure to call pgq_node.set_consumer_error()  
2012-05-30 16:03:34,347 27439 ERROR Job dst1_digoal_01 got error on connection 'db': permission denied to set parameter "session_rep  
lication_role".   Query: set session_replication_role = 'replica'  
Traceback (most recent call last):  
  File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/consumer.py", line 285, in exception_hook  
    dst_db = self.get_database(self.target_db)  
  File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 733, in get_database  
    return dbc.get_connection(params['isolation_level'], clist)  
  File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 954, in get_connection  
    self.setup_func(self.name, self.conn)  
  File "/opt/skytools3.0.2/lib/python2.7/site-packages/londiste/playback.py", line 322, in connection_hook  
    curs.execute("set session_replication_role = 'replica'")  
  File "/opt/python2.7.3/lib/python2.7/site-packages/psycopg2/extras.py", line 123, in execute  
    return _cursor.execute(self, query, vars)  
ProgrammingError: permission denied to set parameter "session_replication_role"  

So if an ordinary user was configured earlier, edit the configuration files to use a superuser and then reload:

postgres@db5-> cat src_digoal_01.ini   
[londiste3]  
job_name = src_digoal_01  
db = host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres  
queue_name = replika  
logfile = /home/postgres/londiste3/log/src_digoal_01.log  
pidfile = /home/postgres/londiste3/pid/src_digoal_01.pid  
  
postgres@db5-> cat dst1_digoal_01.ini   
[londiste3]  
job_name = dst1_digoal_01  
db = host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres  
queue_name = replika  
logfile = /home/postgres/londiste3/log/dst1_digoal_01.log  
pidfile = /home/postgres/londiste3/pid/dst1_digoal_01.pid  
  
postgres@db5-> londiste3 -r /home/postgres/londiste3/src_digoal_01.ini  
postgres@db5-> londiste3 -r /home/postgres/londiste3/dst1_digoal_01.ini  

Once things are working, check the current replication state; status prints the node tree.

postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini status  
Queue: replika   Local node: src_digoal_01  
  
src_digoal_01 (root)  
  |                           Tables: 0/0/0  
  |                           Lag: 26s, Tick: 14  
  +--dst1_digoal_01 (leaf)  
                              Tables: 0/0/0  
                              Lag: 26s, Tick: 14  
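
For monitoring, the node tree printed by status can be turned into structured data. The following Python 3 sketch parses text laid out exactly like the output above. The layout is inferred only from this one sample, so other londiste3 versions or deeper trees may need adjustments, and the name parse_status is hypothetical.

```python
import re

# A node line looks like "src_digoal_01 (root)" or "  +--dst1_digoal_01 (leaf)".
NODE_RE = re.compile(
    r"^[\s|+-]*(?P<name>[\w.]+) \((?P<type>root|branch|leaf)\)\s*$"
)
# A lag line looks like "Lag: 26s, Tick: 14", possibly behind tree drawing.
LAG_RE = re.compile(r"Lag:\s*(?P<lag>[^,\s]+),\s*Tick:\s*(?P<tick>\d+)")

SAMPLE = """\
Queue: replika   Local node: src_digoal_01

src_digoal_01 (root)
  |                           Tables: 0/0/0
  |                           Lag: 26s, Tick: 14
  +--dst1_digoal_01 (leaf)
                              Tables: 0/0/0
                              Lag: 26s, Tick: 14
"""


def parse_status(text):
    """Return one dict per node with its type, lag string, and tick."""
    nodes = []
    for line in text.splitlines():
        m = NODE_RE.match(line)
        if m:
            nodes.append({"name": m.group("name"), "type": m.group("type")})
            continue
        m = LAG_RE.search(line)
        if m and nodes:
            # Lag and tick belong to the most recently seen node.
            nodes[-1]["lag"] = m.group("lag")
            nodes[-1]["tick"] = int(m.group("tick"))
    return nodes


if __name__ == "__main__":
    for node in parse_status(SAMPLE):
        print(node)
```

The lag is kept as the raw string (for example 26s) because the sample shows only one form of it.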

members prints the member information of the replication nodes for this configuration file.

postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini members  
Member info on src_digoal_01@replika:  
node_name        dead             node_location  
---------------  ---------------  ----------------------------------------------------------------------------  
dst1_digoal_01   False            host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres  
src_digoal_01    False            host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres  

Now add the tables to replicate. First add them using the provider's configuration file, starting with the two tables that have primary keys.

postgres@db5-> londiste3 -v /home/postgres/londiste3/src_digoal_01.ini add-table digoal_01.user_info digoal_01.user_session  
2012-05-30 21:37:57,024 9537 DEBUG Connect 'db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01  [...]'  
2012-05-30 21:37:57,028 9537 DEBUG exec_query: select * from pgq_node.get_node_info('replika')  
2012-05-30 21:37:57,036 9537 DEBUG exec_query: select * from pgq_node.get_queue_locations('replika')  
2012-05-30 21:37:57,037 9537 DEBUG exec_cmd: select * from pgq_node.get_node_info('replika')  
2012-05-30 21:37:57,038 9537 DEBUG 100 Ok  
2012-05-30 21:37:57,039 9537 DEBUG Connect 'provider_db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01  [...]'  
2012-05-30 21:37:57,047 9537 DEBUG exec_cmd: select * from londiste.local_add_table('replika', 'digoal_01.user_info', '[]', null, null)  
2012-05-30 21:37:57,064 9537 INFO Table added: digoal_01.user_info  
2012-05-30 21:37:57,065 9537 DEBUG exec_cmd: select * from londiste.local_add_table('replika', 'digoal_01.user_session', '[]', null, null)  
2012-05-30 21:37:57,067 9537 INFO Table added: digoal_01.user_session  

Then add them using the subscriber's configuration file, again starting with the two tables that have primary keys.

postgres@db5-> londiste3 -v /home/postgres/londiste3/dst1_digoal_01.ini add-table digoal_01.user_info digoal_01.user_session  
2012-05-30 21:38:08,369 9548 DEBUG Connect 'db' to 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01  [...]'  
2012-05-30 21:38:08,373 9548 DEBUG exec_query: select * from pgq_node.get_node_info('replika')  
2012-05-30 21:38:08,377 9548 DEBUG exec_query: select * from pgq_node.get_queue_locations('replika')  
2012-05-30 21:38:08,378 9548 DEBUG exec_cmd: select * from pgq_node.get_node_info('replika')  
2012-05-30 21:38:08,379 9548 DEBUG 100 Ok  
2012-05-30 21:38:08,380 9548 DEBUG Connect 'provider_db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01  [...]'  
2012-05-30 21:38:08,392 9548 DEBUG exec_cmd: select * from londiste.local_add_table('replika', 'digoal_01.user_info', '[]', null, null)  
2012-05-30 21:38:08,405 9548 INFO Table added: digoal_01.user_info  
2012-05-30 21:38:08,406 9548 DEBUG exec_cmd: select * from londiste.local_add_table('replika', 'digoal_01.user_session', '[]', null, null)  
2012-05-30 21:38:08,410 9548 INFO Table added: digoal_01.user_session  

Once they are added, list the tables registered for replication on the provider.

postgres@db5-> londiste3 -v /home/postgres/londiste3/src_digoal_01.ini tables  
2012-05-30 21:38:36,499 9563 DEBUG Connect 'db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01  [...]'  
2012-05-30 21:38:36,503 9563 DEBUG display_table: select table_name, merge_state, table_attrs  
        from londiste.get_table_list('replika') where local  
        order by table_name  
Tables on node  
table_name              merge_state      table_attrs  
----------------------  ---------------  ---------------  
digoal_01.user_info     ok                 
digoal_01.user_session  ok                 

Then list the tables registered on subscriber 1. The state shown here is in-copy, which means the initial copy is in progress.

postgres@db5-> londiste3 -v /home/postgres/londiste3/dst1_digoal_01.ini tables  
2012-05-30 21:38:41,135 9567 DEBUG Connect 'db' to 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01  [...]'  
2012-05-30 21:38:41,139 9567 DEBUG display_table: select table_name, merge_state, table_attrs  
        from londiste.get_table_list('replika') where local  
        order by table_name  
Tables on node  
table_name              merge_state      table_attrs  
----------------------  ---------------  ---------------  
digoal_01.user_info     in-copy            
digoal_01.user_session  None               
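
To wait for the initial copy to finish, the merge_state column can be checked by a script instead of by eye. The following is a minimal sketch in Python that parses the text printed by the tables command, assuming the column layout shown above. The function names parse_tables and pending_tables are hypothetical helpers, not part of londiste3.

```python
def parse_tables(output):
    """Return {table_name: merge_state} from the text of `londiste3 <ini> tables`."""
    states = {}
    in_body = False
    for line in output.splitlines():
        if line.startswith("---"):
            in_body = True  # data rows start after the dashed separator
            continue
        if not in_body or not line.strip():
            continue
        fields = line.split()
        # table_attrs may be empty, so only the first two columns are relied on
        states[fields[0]] = fields[1] if len(fields) > 1 else None
    return states


def pending_tables(states):
    """Tables whose merge_state is anything other than 'ok'."""
    return sorted(name for name, state in states.items() if state != "ok")


# Sample copied from the subscriber output above.
sample = """Tables on node
table_name              merge_state      table_attrs
----------------------  ---------------  ---------------
digoal_01.user_info     in-copy
digoal_01.user_session  None
"""

print(pending_tables(parse_tables(sample)))
```

An empty list from pending_tables would indicate that every listed table has reached the ok state.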

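Note the following londiste3 add-table option, which sets how many tables may be copied in parallel. By default the tables are copied serially, one at a time.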

max-parallel-copy=MAX_PARALLEL_COPY  
max number of parallel copy processes  

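Also note that the initial copy does not run under a single snapshot. The two tables added here, for example, are not copied from one consistent snapshot, even when they are copied in parallel.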

Two more commands, seqs and missing, list the replicated sequences and the objects that have not been added to replication, respectively.

postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini seqs  
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini missing  
Missing objects on node  
obj_kind         obj_name  
---------------  -------------------------  
r                digoal_01.user_login_rec  
r                digoal_01.user_logout_rec  

Next, add the two log tables. They have no primary key, so the add fails.

postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini add-table digoal_01.user_login_rec digoal_01.user_logout_rec  
2012-05-30 16:12:25,239 28118 ERROR Primary key missing on table: digoal_01.user_login_rec  
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini missing  
Missing objects on node  
obj_kind         obj_name  
---------------  -------------------------  
r                digoal_01.user_login_rec  
r                digoal_01.user_logout_rec  

So a primary key has to be added on both the source node and the target node.

postgres@db5-> psql digoal_01 digoal_01  
psql (9.1.3)  
Type "help" for help.  
digoal_01=> alter table digoal_01.user_login_rec add column id serial8 primary key;  
NOTICE:  ALTER TABLE will create implicit sequence "user_login_rec_id_seq" for serial column "user_login_rec.id"  
NOTICE:  ALTER TABLE / ADD PRIMARY KEY will create implicit index "user_login_rec_pkey" for table "user_login_rec"  
ALTER TABLE  
digoal_01=> alter table digoal_01.user_logout_rec add column id serial8 primary key;  
NOTICE:  ALTER TABLE will create implicit sequence "user_logout_rec_id_seq" for serial column "user_logout_rec.id"  
NOTICE:  ALTER TABLE / ADD PRIMARY KEY will create implicit index "user_logout_rec_pkey" for table "user_logout_rec"  
ALTER TABLE  
  
pg92@db-172-16-3-33-> psql digoal_01 digoal_01  
psql (9.2beta1)  
Type "help" for help.  
  
digoal_01=> alter table digoal_01.user_login_rec add column id serial8 primary key;  
NOTICE:  ALTER TABLE will create implicit sequence "user_login_rec_id_seq" for serial column "user_login_rec.id"  
NOTICE:  ALTER TABLE / ADD PRIMARY KEY will create implicit index "user_login_rec_pkey" for table "user_login_rec"  
ALTER TABLE  
digoal_01=> alter table digoal_01.user_logout_rec add column id serial8 primary key;  
NOTICE:  ALTER TABLE will create implicit sequence "user_logout_rec_id_seq" for serial column "user_logout_rec.id"  
NOTICE:  ALTER TABLE / ADD PRIMARY KEY will create implicit index "user_logout_rec_pkey" for table "user_logout_rec"  
ALTER TABLE  

With the primary keys in place, adding the two log tables succeeds. Remember to add them on the subscriber after adding them on the provider.

postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini add-table digoal_01.user_login_rec digoal_01.user_logout_rec  
2012-05-31 08:50:47,715 30213 INFO Table added: digoal_01.user_login_rec  
2012-05-31 08:50:47,719 30213 INFO Table added: digoal_01.user_logout_rec  
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini add-table digoal_01.user_login_rec digoal_01.user_logout_rec  
2012-05-31 08:50:53,307 30219 INFO Table added: digoal_01.user_login_rec  
2012-05-31 08:50:53,313 30219 INFO Table added: digoal_01.user_logout_rec  
-- Two new sequences were created; they need to be added with add-seq  
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini missing  
Missing objects on node  
obj_kind         obj_name  
---------------  --------------------------------  
S                digoal_01.user_login_rec_id_seq  
S                digoal_01.user_logout_rec_id_seq  
  
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini tables  
Tables on node  
table_name                 merge_state      table_attrs  
-------------------------  ---------------  ---------------  
digoal_01.user_info        ok                 
digoal_01.user_login_rec   ok                 
digoal_01.user_logout_rec  ok                 
digoal_01.user_session     ok   
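
The obj_kind column in the missing output above shows r for tables and S for sequences, which matches the relkind codes in pg_class. As a rough sketch, assuming that two-column layout, the output can be grouped by kind to decide what still needs add-table and what needs add-seq. The name parse_missing is a hypothetical helper, not a londiste3 function.

```python
# Assumption: obj_kind uses the pg_class.relkind letters seen in the output above.
KIND_LABELS = {"r": "table", "S": "sequence"}


def parse_missing(output):
    """Group the rows of `londiste3 <ini> missing` into tables and sequences."""
    result = {"table": [], "sequence": []}
    in_body = False
    for line in output.splitlines():
        if line.startswith("---"):
            in_body = True  # data rows start after the dashed separator
            continue
        fields = line.split()
        if not in_body or len(fields) != 2:
            continue
        kind, name = fields
        label = KIND_LABELS.get(kind)
        if label is not None:
            result[label].append(name)
    return result


# Sample copied from the provider output above.
sample_missing = """Missing objects on node
obj_kind         obj_name
---------------  --------------------------------
S                digoal_01.user_login_rec_id_seq
S                digoal_01.user_logout_rec_id_seq
"""

print(parse_missing(sample_missing))
```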

Add the sequences

postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini add-seq digoal_01.user_login_rec_id_seq digoal_01.user_logout_rec_id_seq  
2012-05-31 08:53:34,666 30314 INFO Sequence added: digoal_01.user_login_rec_id_seq  
2012-05-31 08:53:34,668 30314 INFO Sequence added: digoal_01.user_logout_rec_id_seq  
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini add-seq digoal_01.user_login_rec_id_seq digoal_01.user_logout_rec_id_seq  
2012-05-31 08:53:39,322 30318 INFO Sequence added: digoal_01.user_login_rec_id_seq  
2012-05-31 08:53:39,324 30318 INFO Sequence added: digoal_01.user_logout_rec_id_seq  
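
Both add-table and add-seq follow the same pattern in this walkthrough: run the command with the provider's configuration file first, then with the subscriber's. A small sketch of a command builder that keeps this order is shown below. The function registration_commands is a hypothetical helper, and the paths are the ones used in this article.

```python
def registration_commands(provider_ini, subscriber_inis, subcommand, objects):
    """Build londiste3 argv lists, provider first, then each subscriber."""
    if subcommand not in ("add-table", "add-seq"):
        raise ValueError("unsupported subcommand: %s" % subcommand)
    commands = []
    for ini in [provider_ini] + list(subscriber_inis):
        commands.append(["londiste3", ini, subcommand] + list(objects))
    return commands


cmds = registration_commands(
    "/home/postgres/londiste3/src_digoal_01.ini",
    ["/home/postgres/londiste3/dst1_digoal_01.ini"],
    "add-seq",
    ["digoal_01.user_login_rec_id_seq", "digoal_01.user_logout_rec_id_seq"],
)
for cmd in cmds:
    print(" ".join(cmd))
```

Each argv list could then be handed to subprocess.run(cmd, check=True), so that a failure on the provider stops the run before the subscriber is touched.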

List the added sequences

postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini seqs  
Sequences on node  
seq_name                          local            last_value  
--------------------------------  ---------------  ---------------  
digoal_01.user_login_rec_id_seq   True             4396741  
digoal_01.user_logout_rec_id_seq  True             30001  
  
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini seqs  
Sequences on node  
seq_name                          local            last_value  
--------------------------------  ---------------  ---------------  
digoal_01.user_login_rec_id_seq   True             4396741  
digoal_01.user_logout_rec_id_seq  True             30001  
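
The two seqs listings above show the same last_value on both nodes. As an illustration only, the comparison can be sketched in Python by parsing both outputs, assuming the three-column layout shown here. The names parse_seqs and seq_differences are hypothetical helpers. On a busy system the values may differ briefly while the subscriber catches up, so a difference is not necessarily an error.

```python
def parse_seqs(output):
    """Return {seq_name: last_value} from the text of `londiste3 <ini> seqs`."""
    values = {}
    in_body = False
    for line in output.splitlines():
        if line.startswith("---"):
            in_body = True  # data rows start after the dashed separator
            continue
        fields = line.split()
        if not in_body or len(fields) != 3:
            continue
        name, _local, last_value = fields
        try:
            values[name] = int(last_value)
        except ValueError:
            values[name] = None  # value not reported as a number
    return values


def seq_differences(provider, subscriber):
    """Return {seq_name: (provider_value, subscriber_value)} where they differ."""
    diffs = {}
    for name in sorted(set(provider) | set(subscriber)):
        p, s = provider.get(name), subscriber.get(name)
        if p != s:
            diffs[name] = (p, s)
    return diffs


# Sample copied from the output above (identical on both nodes).
sample_seqs = """Sequences on node
seq_name                          local            last_value
--------------------------------  ---------------  ---------------
digoal_01.user_login_rec_id_seq   True             4396741
digoal_01.user_logout_rec_id_seq  True             30001
"""

provider = parse_seqs(sample_seqs)
subscriber = parse_seqs(sample_seqs)
print(seq_differences(provider, subscriber))
```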

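The provider's worker process connects to only one database (the source database).

The consumer's worker process connects to two databases (the source database and target database 1).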

postgres@db5-> netstat -anp|grep python  
(Not all processes could be identified, non-owned process info  
 will not be shown, you would have to be root to see it all.)  
tcp        0      0 172.16.3.176:42865          172.16.3.176:1921           ESTABLISHED 27316/python          
tcp        0      0 172.16.3.176:42869          172.16.3.176:1921           ESTABLISHED 27439/python          
tcp        0      0 172.16.3.176:3172           172.16.3.33:1919            ESTABLISHED 27439/python          
postgres@db5-> ps -ewf|grep python  
postgres 27316     1  0 15:53 ?        00:00:00 /opt/python2.7.3/bin/python /opt/skytools3.0.2/bin/londiste3 -d /home/postgres/londiste3/src_digoal_01.ini worker  
postgres 27439     1  0 15:57 ?        00:00:00 /opt/python2.7.3/bin/python /opt/skytools3.0.2/bin/londiste3 -d /home/postgres/londiste3/dst1_digoal_01.ini worker  
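
The connection counts can also be read from the netstat output with a short script. The sketch below groups established TCP connections by PID, assuming the column layout of the netstat -anp output shown above. The function connections_by_pid is a hypothetical helper.

```python
from collections import defaultdict


def connections_by_pid(netstat_output, program="python"):
    """Map each PID of `program` to the remote endpoints it is connected to."""
    peers = defaultdict(list)
    for line in netstat_output.splitlines():
        fields = line.split()
        # Expected columns: proto recv-q send-q local foreign state pid/program
        if len(fields) < 7 or not fields[0].startswith("tcp"):
            continue
        if fields[5] != "ESTABLISHED":
            continue
        pid, _, name = fields[6].partition("/")
        if name == program:
            peers[int(pid)].append(fields[4])
    return dict(peers)


# Sample copied from the netstat output above.
sample_netstat = """(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp        0      0 172.16.3.176:42865          172.16.3.176:1921           ESTABLISHED 27316/python
tcp        0      0 172.16.3.176:42869          172.16.3.176:1921           ESTABLISHED 27439/python
tcp        0      0 172.16.3.176:3172           172.16.3.33:1919            ESTABLISHED 27439/python
"""

for pid, remotes in sorted(connections_by_pid(sample_netstat).items()):
    print(pid, remotes)
```

With the sample above, PID 27316 (the provider worker) has one remote endpoint and PID 27439 (the consumer worker) has two, matching the ps listing.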

Next part:

《Londiste 3 replicate case - 1 下节》
