Londiste 3 replicate case - 1 (Part 2)
Background
URL of the previous part:
《Londiste 3 replicate case - 1 (Part 1)》
Next, verify that replication works correctly: run pgbench on the master node to generate a data-modification workload.
postgres@db5-> cat login.sql
\setrandom userid 1 200000
select userid,engname,cnname,occupation,birthday,signname,email,qq from user_info where userid=:userid;
insert into user_login_rec (userid,login_time,ip) values (:userid,now(),inet_client_addr());
update user_session set logintime=now(),login_count=login_count+1 where userid=:userid;
postgres@db5-> cat logout.sql
\setrandom userid 1 200000
insert into user_logout_rec (userid,logout_time,ip) values (:userid,now(),inet_client_addr());
update user_session set logouttime=now(),online_interval=online_interval+(now()-logintime) where userid=:userid;
postgres@db5-> pgbench -M prepared -f ./login.sql -j 2 -c 2 -n -r -h 127.0.0.1 -p 1921 -U digoal_01 -T 120 digoal_01 &
postgres@db5-> pgbench -M prepared -f ./logout.sql -j 2 -c 2 -n -r -h 127.0.0.1 -p 1921 -U digoal_01 -T 120 digoal_01 &
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 2
number of threads: 2
duration: 120 s
number of transactions actually processed: 318962
tps = 2616.506274 (including connections establishing)
tps = 2616.586583 (excluding connections establishing)
statement latencies in milliseconds:
0.002284 \setrandom userid 1 200000
0.112055 select userid,engname,cnname,occupation,birthday,signname,email,qq from user_info where userid=:userid;
0.260081 insert into user_login_rec (userid,login_time,ip) values (:userid,now(),inet_client_addr());
0.378190 update user_session set logintime=now(),login_count=login_count+1 where userid=:userid;
transaction type: Custom query
scaling factor: 1
query mode: prepared
number of clients: 2
number of threads: 2
duration: 120 s
number of transactions actually processed: 620568
tps = 5143.160023 (including connections establishing)
tps = 5143.328868 (excluding connections establishing)
statement latencies in milliseconds:
0.002408 \setrandom userid 1 20000000
0.258112 insert into user_logout_rec (userid,logout_time,ip) values (:userid,now(),inet_client_addr());
0.122814 update user_session set logouttime=now(),online_interval=online_interval+(now()-logintime) where userid=:userid;
With the copy parallelism at its default of 1, only one table is synchronized at a time:
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini tables
Tables on node
table_name merge_state table_attrs
------------------------- --------------- ---------------
digoal_01.user_info wanna-sync:1558
digoal_01.user_login_rec ok
digoal_01.user_logout_rec ok
digoal_01.user_session None
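The same per-table state can also be checked with plain SQL on the subscriber, since londiste keeps it in its own catalog. A minimal sketch, assuming the standard londiste schema installed by create-leaf (table and column names taken from londiste 3's londiste.table_info; verify against your installed version):
-- on the subscriber (e.g. digoal_01 on 172.16.3.33), assuming londiste.table_info exists
select table_name, merge_state, table_attrs
  from londiste.table_info
 where queue_name = 'replika'
 order by nr;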
Next, increase the copy parallelism: add parallel_copies = 16 to dst1_digoal_01.ini and src_digoal_01.ini, then reload:
postgres@db5-> londiste3 -r /home/postgres/londiste3/src_digoal_01.ini
postgres@db5-> londiste3 -r /home/postgres/londiste3/dst1_digoal_01.ini
Now the initial copy completes much faster:
londiste3 /home/postgres/londiste3/dst1_digoal_01.ini tables
Tables on node
table_name merge_state table_attrs
------------------------- --------------- ---------------
digoal_01.user_info ok
digoal_01.user_login_rec ok
digoal_01.user_logout_rec ok
digoal_01.user_session ok
Running compare immediately after pgbench finished: because target database 1 was still applying SQL from the queue, it reported the following.
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini compare
2012-05-31 09:11:03,224 30974 ERROR Consumer lagging too much, cannot proceed
After roughly 150 seconds, compare was run again; synchronization had completed and the checksums matched.
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini compare
2012-05-31 09:13:41,442 31116 INFO Locking digoal_01.user_info
2012-05-31 09:13:41,442 31116 INFO Syncing digoal_01.user_info
2012-05-31 09:13:44,950 31116 INFO Counting digoal_01.user_info
2012-05-31 09:13:46,085 31116 INFO srcdb: 200000 rows, checksum=545503592610
2012-05-31 09:13:46,861 31116 INFO dstdb: 200000 rows, checksum=545503592610
2012-05-31 09:13:46,863 31116 INFO Locking digoal_01.user_session
2012-05-31 09:13:46,863 31116 INFO Syncing digoal_01.user_session
2012-05-31 09:13:49,868 31116 INFO Counting digoal_01.user_session
2012-05-31 09:13:50,642 31116 INFO srcdb: 200000 rows, checksum=-132792841596
2012-05-31 09:13:51,229 31116 INFO dstdb: 200000 rows, checksum=-132792841596
2012-05-31 09:13:51,231 31116 INFO Locking digoal_01.user_login_rec
2012-05-31 09:13:51,231 31116 INFO Syncing digoal_01.user_login_rec
2012-05-31 09:13:54,739 31116 INFO Counting digoal_01.user_login_rec
2012-05-31 09:14:13,845 31116 INFO srcdb: 5007284 rows, checksum=-2059054881703
2012-05-31 09:14:26,583 31116 INFO dstdb: 5007284 rows, checksum=-2059054881703
2012-05-31 09:14:26,585 31116 INFO Locking digoal_01.user_logout_rec
2012-05-31 09:14:26,586 31116 INFO Syncing digoal_01.user_logout_rec
2012-05-31 09:14:30,094 31116 INFO Counting digoal_01.user_logout_rec
2012-05-31 09:14:34,920 31116 INFO srcdb: 1250644 rows, checksum=1378863938106
2012-05-31 09:14:38,163 31116 INFO dstdb: 1250644 rows, checksum=1378863938106
The default compare configuration and the SQL it uses are as follows:
## compare/repair
# max amount of time table can be locked
#lock_timeout = 10
# compare: sql to use
#compare_sql = select count(1) as cnt, sum(hashtext(t.*::text)) as chksum from only _TABLE_ t
#compare_fmt = %(cnt)d rows, checksum=%(chksum)s
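The same checksum query can also be run by hand against either side to spot-check a single table; just substitute the table name for _TABLE_, for example:
-- run on the master and on the target, then compare the two results manually
select count(1) as cnt, sum(hashtext(t.*::text)) as chksum
  from only digoal_01.user_info t;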
Next, test londiste3's ability to handle a network failure, i.e. the target database going away and then coming back. Here I simulate this by shutting down target database 1.
Shut down target database 1:
pg92@db-172-16-3-33-> pg_ctl stop -m fast
waiting for server to shut down.... done
server stopped
Start pgbench against the master:
postgres@db5-> pgbench -M prepared -f ./login.sql -j 2 -c 2 -n -r -h 127.0.0.1 -p 1921 -U digoal_01 -T 120 digoal_01 &
postgres@db5-> pgbench -M prepared -f ./logout.sql -j 2 -c 2 -n -r -h 127.0.0.1 -p 1921 -U digoal_01 -T 120 digoal_01 &
dst1_digoal_01.log shows that target database 1 can no longer be reached:
2012-05-31 09:19:18,509 10706 WARNING Failure to call pgq_node.set_consumer_error()
2012-05-31 09:19:18,509 10706 ERROR Job dst1_digoal_01 crashed: could not connect to server: Connection refused
Is the server running on host "172.16.3.33" and accepting
TCP/IP connections on port 1919?
Traceback (most recent call last):
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/consumer.py", line 285, in exception_hook
dst_db = self.get_database(self.target_db)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 733, in get_database
return dbc.get_connection(params['isolation_level'], clist)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 948, in get_connection
self.conn = skytools.connect_database(self.loc)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/psycopgwrapper.py", line 135, in connect_database
db = _CompatConnection(connstr)
OperationalError: could not connect to server: Connection refused
Is the server running on host "172.16.3.33" and accepting
TCP/IP connections on port 1919?
Check the contents of the queue:
digoal_01=# select * from pgq.event_1_2 limit 10;
ev_id | ev_time | ev_txid | ev_owner | ev_retry | ev_type |
ev_data | ev_extra1 | ev_extra2 | ev_extra3 | ev_extra4
---------+-------------------------------+----------+----------+----------+----------+----------------------------------------------
--------------------------------------------------------------+--------------------------+-----------+-----------+-----------
3188535 | 2012-05-31 08:59:47.064327+08 | 20777539 | | | U:userid | userid=29499&logintime=2012%2d05%2d31+08%3a59
%3a47&login_count=17&logouttime&online_interval=00%3a00%3a00 | digoal_01.user_session | | |
3188541 | 2012-05-31 08:59:47.064719+08 | 20777545 | | | I:id | userid=101464&login_time=2012%2d05%2d31+08%3a
59%3a47.064719&ip=127.0.0.1&id=4424162 | digoal_01.user_login_rec | | |
3188543 | 2012-05-31 08:59:47.064955+08 | 20777547 | | | U:userid | userid=101464&logintime=2012%2d05%2d31+08%3a5
9%3a47&login_count=13&logouttime&online_interval=00%3a00%3a00 | digoal_01.user_session | | |
3188548 | 2012-05-31 08:59:47.065316+08 | 20777551 | | | I:id | userid=56992&login_time=2012%2d05%2d31+08%3a5
9%3a47.065316&ip=127.0.0.1&id=4424164 | digoal_01.user_login_rec | | |
3188551 | 2012-05-31 08:59:47.065573+08 | 20777555 | | | U:userid | userid=56992&logintime=2012%2d05%2d31+08%3a59
%3a47&login_count=25&logouttime&online_interval=00%3a00%3a00 | digoal_01.user_session | | |
3188556 | 2012-05-31 08:59:47.065952+08 | 20777560 | | | I:id | userid=190783&login_time=2012%2d05%2d31+08%3a
59%3a47.065952&ip=127.0.0.1&id=4424166 | digoal_01.user_login_rec | | |
3188558 | 2012-05-31 08:59:47.066187+08 | 20777562 | | | U:userid | userid=190783&logintime=2012%2d05%2d31+08%3a5
9%3a47&login_count=16&logouttime&online_interval=00%3a00%3a00 | digoal_01.user_session | | |
3188563 | 2012-05-31 08:59:47.066566+08 | 20777567 | | | I:id | userid=42506&login_time=2012%2d05%2d31+08%3a5
9%3a47.066566&ip=127.0.0.1&id=4424168 | digoal_01.user_login_rec | | |
3188566 | 2012-05-31 08:59:47.066788+08 | 20777570 | | | U:userid | userid=42506&logintime=2012%2d05%2d31+08%3a59
%3a47&login_count=21&logouttime&online_interval=00%3a00%3a00 | digoal_01.user_session | | |
3188571 | 2012-05-31 08:59:47.067132+08 | 20777575 | | | I:id | userid=23589&login_time=2012%2d05%2d31+08%3a5
9%3a47.067132&ip=127.0.0.1&id=4424170 | digoal_01.user_login_rec | | |
(10 rows)
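While target database 1 is down, the backlog building up on the master can be watched with simple counts on the event tables; note that pgq rotates events across several event_1_* partitions, so a single partition holds only part of the backlog:
-- rows waiting in the partition inspected above
select count(*) from pgq.event_1_2;
-- total across all partitions of queue id 1 (assumes the usual pgq layout
-- where event_1 is the inheritance parent of event_1_*)
select count(*) from pgq.event_1;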
After pgbench finished, start target database 1 again and check whether the data replicates over correctly.
Start target database 1:
pg92@db-172-16-3-33-> pg_ctl start
server starting
Check the subscriber status on target database 1:
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini tables
Tables on node
table_name merge_state table_attrs
------------------------- --------------- ---------------
digoal_01.user_info ok
digoal_01.user_login_rec ok
digoal_01.user_logout_rec ok
digoal_01.user_session ok
Check whether replication SQL is running on target database 1. Note that a single statement contains multiple SQL commands: londiste applies changes in batches. Because target database 1 is configured with track_activity_query_size = 1024, pg_stat_activity.query cannot show the complete statement; in reality a batch contains far more SQL than what is visible here (raising track_activity_query_size, which requires a restart, would show more of each batch).
pg92@db-172-16-3-33-> psql digoal_01 postgres
psql (9.2beta1)
Type "help" for help.
digoal_01=# select client_addr,query from pg_stat_activity where client_addr='172.16.3.176';
client_addr | query
--------------+---------------------------------------------------------------------------------------------------------------------
--------------------------------------------------
172.16.3.176 | update only digoal_01.user_session set online_interval = '00:00:00', login_count = '18', logintime = '2012-05-31 09:
20:17', logouttime = null where userid = '65376';+
| insert into digoal_01.user_logout_rec (ip, userid, logout_time, id) values ('127.0.0.1', '940907', '2012-05-31 09:20
:16.75214', '1575012'); +
| insert into digoal_01.user_login_rec (ip, userid, id, login_time) values ('127.0.0.1', '160682', '5202739', '2012-05
-31 09:20:16.752364'); +
| insert into digoal_01.user_logout_rec (ip, userid, logout_time, id) values ('127.0.0.1', '19330841', '2012-05-31 09:
20:16.752426', '1575013'); +
| insert into digoal_01.user_login_rec (ip, userid, id, login_time) values ('127.0.0.1', '62766', '5202740', '2012-05-
31 09:20:16.752488'); +
| insert into digoal_01.user_logout_rec (ip, userid, logout_time, id) values ('127.0.0.1', '2407021', '2012-05-31 09:2
0:16.752483', '1575014'); +
| update only digoal_01.user_session set online_interval = '00:00:00', login_count = '17', logintime = '2012-05-31 09:
20:17', logouttime = null where userid
(1 row)
About 2 minutes later, run compare to check whether the data is consistent.
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini compare
2012-05-31 09:27:28,895 31555 INFO Locking digoal_01.user_info
2012-05-31 09:27:28,896 31555 INFO Syncing digoal_01.user_info
2012-05-31 09:27:32,407 31555 INFO Counting digoal_01.user_info
2012-05-31 09:27:33,543 31555 INFO srcdb: 200000 rows, checksum=545503592610
2012-05-31 09:27:34,375 31555 INFO dstdb: 200000 rows, checksum=545503592610
2012-05-31 09:27:34,377 31555 INFO Locking digoal_01.user_session
2012-05-31 09:27:34,378 31555 INFO Syncing digoal_01.user_session
2012-05-31 09:27:37,381 31555 INFO Counting digoal_01.user_session
2012-05-31 09:27:38,160 31555 INFO srcdb: 200000 rows, checksum=661833980628
2012-05-31 09:27:38,727 31555 INFO dstdb: 200000 rows, checksum=661833980628
2012-05-31 09:27:38,728 31555 INFO Locking digoal_01.user_login_rec
2012-05-31 09:27:38,729 31555 INFO Syncing digoal_01.user_login_rec
2012-05-31 09:27:41,731 31555 INFO Counting digoal_01.user_login_rec
2012-05-31 09:28:02,303 31555 INFO srcdb: 5342247 rows, checksum=-1165249103392
2012-05-31 09:28:16,048 31555 INFO dstdb: 5342247 rows, checksum=-1165249103392
2012-05-31 09:28:16,049 31555 INFO Locking digoal_01.user_logout_rec
2012-05-31 09:28:16,050 31555 INFO Syncing digoal_01.user_logout_rec
2012-05-31 09:28:19,556 31555 INFO Counting digoal_01.user_logout_rec
2012-05-31 09:28:27,022 31555 INFO srcdb: 1922981 rows, checksum=1998341487944
2012-05-31 09:28:32,027 31555 INFO dstdb: 1922981 rows, checksum=1998341487944
Check whether TRUNCATE is replicated.
On the master:
postgres@db5-> psql digoal_01 digoal_01
psql (9.1.3)
Type "help" for help.
digoal_01=> TRUNCATE user_login_rec ;
TRUNCATE TABLE
digoal_01=> TRUNCATE user_logout_rec ;
TRUNCATE TABLE
The compare results below show that TRUNCATE replicates correctly.
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini compare
2012-05-31 09:29:12,317 31598 INFO Locking digoal_01.user_info
2012-05-31 09:29:12,318 31598 INFO Syncing digoal_01.user_info
2012-05-31 09:29:15,327 31598 INFO Counting digoal_01.user_info
2012-05-31 09:29:16,458 31598 INFO srcdb: 200000 rows, checksum=545503592610
2012-05-31 09:29:17,244 31598 INFO dstdb: 200000 rows, checksum=545503592610
2012-05-31 09:29:17,246 31598 INFO Locking digoal_01.user_session
2012-05-31 09:29:17,246 31598 INFO Syncing digoal_01.user_session
2012-05-31 09:29:20,756 31598 INFO Counting digoal_01.user_session
2012-05-31 09:29:21,531 31598 INFO srcdb: 200000 rows, checksum=661833980628
2012-05-31 09:29:22,040 31598 INFO dstdb: 200000 rows, checksum=661833980628
2012-05-31 09:29:22,042 31598 INFO Locking digoal_01.user_login_rec
2012-05-31 09:29:22,042 31598 INFO Syncing digoal_01.user_login_rec
2012-05-31 09:29:25,549 31598 INFO Counting digoal_01.user_login_rec
2012-05-31 09:29:25,550 31598 INFO srcdb: 0 rows, checksum=None
2012-05-31 09:29:25,551 31598 INFO dstdb: 0 rows, checksum=None
2012-05-31 09:29:25,552 31598 INFO Locking digoal_01.user_logout_rec
2012-05-31 09:29:25,552 31598 INFO Syncing digoal_01.user_logout_rec
2012-05-31 09:29:28,555 31598 INFO Counting digoal_01.user_logout_rec
2012-05-31 09:29:28,556 31598 INFO srcdb: 0 rows, checksum=None
2012-05-31 09:29:28,557 31598 INFO dstdb: 0 rows, checksum=None
Next, simulate abnormal termination of the provider, consumer, and pgqd processes and check whether replication still works correctly.
Stop the provider, consumer, and pgqd processes:
postgres@db5-> pgqd -s /home/postgres/londiste3/pgqd.ini
SIGINT sent
postgres@db5-> londiste3 -s /home/postgres/londiste3/src_digoal_01.ini
postgres@db5-> londiste3 -s /home/postgres/londiste3/dst1_digoal_01.ini
Start the pgbench stress test:
postgres@db5-> pgbench -M prepared -f ./login.sql -j 2 -c 2 -n -r -h 127.0.0.1 -p 1921 -U digoal_01 -T 120 digoal_01 &
postgres@db5-> pgbench -M prepared -f ./logout.sql -j 2 -c 2 -n -r -h 127.0.0.1 -p 1921 -U digoal_01 -T 120 digoal_01 &
After the stress test finished, start the provider, consumer, and pgqd processes:
postgres@db5-> londiste3 -d /home/postgres/londiste3/src_digoal_01.ini worker
postgres@db5-> londiste3 -d /home/postgres/londiste3/dst1_digoal_01.ini worker
postgres@db5-> pgqd -d /home/postgres/londiste3/pgqd.ini
2012-05-31 09:39:22.321 31878 LOG Starting pgqd 3.0.2
About 2 minutes later, compare the master and target database 1; the results below show that replication is working correctly.
postgres@db5-> londiste3 /home/postgres/londiste3/dst1_digoal_01.ini compare
2012-05-31 09:43:45,693 32038 INFO Locking digoal_01.user_info
2012-05-31 09:43:45,694 32038 INFO Syncing digoal_01.user_info
2012-05-31 09:43:49,204 32038 INFO Counting digoal_01.user_info
2012-05-31 09:43:50,334 32038 INFO srcdb: 200000 rows, checksum=545503592610
2012-05-31 09:43:51,123 32038 INFO dstdb: 200000 rows, checksum=545503592610
2012-05-31 09:43:51,124 32038 INFO Locking digoal_01.user_session
2012-05-31 09:43:51,125 32038 INFO Syncing digoal_01.user_session
2012-05-31 09:43:54,133 32038 INFO Counting digoal_01.user_session
2012-05-31 09:43:54,938 32038 INFO srcdb: 200000 rows, checksum=455874670113
2012-05-31 09:43:55,549 32038 INFO dstdb: 200000 rows, checksum=455874670113
2012-05-31 09:43:55,551 32038 INFO Locking digoal_01.user_login_rec
2012-05-31 09:43:55,552 32038 INFO Syncing digoal_01.user_login_rec
2012-05-31 09:43:58,554 32038 INFO Counting digoal_01.user_login_rec
2012-05-31 09:43:59,707 32038 INFO srcdb: 299331 rows, checksum=-307537943288
2012-05-31 09:44:00,474 32038 INFO dstdb: 299331 rows, checksum=-307537943288
2012-05-31 09:44:00,476 32038 INFO Locking digoal_01.user_logout_rec
2012-05-31 09:44:00,476 32038 INFO Syncing digoal_01.user_logout_rec
2012-05-31 09:44:03,480 32038 INFO Counting digoal_01.user_logout_rec
2012-05-31 09:44:05,790 32038 INFO srcdb: 595666 rows, checksum=314887387132
2012-05-31 09:44:07,340 32038 INFO dstdb: 595666 rows, checksum=314887387132
6. Replicating from the master to target database 2
In this test, target database 2 uses a different database name, different table names, and a different schema name from the master.
In addition, only rows matching the condition userid<1000 are replicated.
Create the configuration file:
vi /home/postgres/londiste3/dst2_digoal_02.ini
[londiste3]
job_name = dst2_digoal_02
db = host=172.16.3.33 port=1919 user=postgres dbname=digoal_02 password=postgres
queue_name = replika
logfile = /home/postgres/londiste3/log/dst2_digoal_02.log
pidfile = /home/postgres/londiste3/pid/dst2_digoal_02.pid
parallel_copies = 16
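Because a different schema and different table names are used, target database 2 is assumed to already contain the destination tables with a column layout compatible with the source (their creation is not shown in this article; the sketch below is inferred from the source table definitions shown later, before the c1 column was added):
-- illustrative sketch only; assumed to exist in digoal_02 before add-table / copy
-- (schema digoal_02 assumed to be present)
create table digoal_02.user_info1 (
  userid int primary key,
  engname text,
  cnname text,
  occupation text,
  birthday date,
  signname text,
  email text,
  qq numeric,
  crt_time timestamp without time zone,
  mod_time timestamp without time zone
);
create table digoal_02.user_session1 (
  userid int primary key,
  logintime timestamp(0) without time zone,
  login_count bigint default 0,
  logouttime timestamp(0) without time zone,
  online_interval interval default '00:00:00'
);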
Create the leaf node:
postgres@db5-> londiste3 -v /home/postgres/londiste3/dst2_digoal_02.ini create-leaf dst2_digoal_02 "host=172.16.3.33 port=1919 user=postgres dbname=digoal_02 password=postgres" --provider="host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres"
2012-05-31 09:51:04,882 32287 DEBUG Connect 'new_node' to 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_02 [...]'
2012-05-31 09:51:04,889 32287 INFO plpgsql is installed
2012-05-31 09:51:04,889 32287 INFO Installing pgq
2012-05-31 09:51:04,890 32287 INFO Reading from /opt/skytools3.0.2/share/skytools3/pgq.sql
2012-05-31 09:51:05,268 32287 INFO pgq.get_batch_cursor is installed
2012-05-31 09:51:05,268 32287 INFO Installing pgq_ext
2012-05-31 09:51:05,268 32287 INFO Reading from /opt/skytools3.0.2/share/skytools3/pgq_ext.sql
2012-05-31 09:51:05,419 32287 INFO Installing pgq_node
2012-05-31 09:51:05,419 32287 INFO Reading from /opt/skytools3.0.2/share/skytools3/pgq_node.sql
2012-05-31 09:51:05,615 32287 INFO Installing londiste
2012-05-31 09:51:05,615 32287 INFO Reading from /opt/skytools3.0.2/share/skytools3/londiste.sql
2012-05-31 09:51:05,852 32287 INFO londiste.global_add_table is installed
2012-05-31 09:51:05,853 32287 DEBUG exec_query: select * from pgq_node.get_node_info('replika')
2012-05-31 09:51:05,856 32287 INFO Initializing node
2012-05-31 09:51:05,856 32287 DEBUG Connect 'root_db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 [...]'
2012-05-31 09:51:05,859 32287 DEBUG exec_query: select * from pgq_node.get_node_info('replika')
2012-05-31 09:51:05,868 32287 DEBUG db='host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres' -- type='root' provider='host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres'
2012-05-31 09:51:05,868 32287 DEBUG exec_query: select * from pgq_node.get_node_info('replika')
2012-05-31 09:51:05,870 32287 DEBUG exec_query: select * from pgq_node.get_queue_locations('replika')
2012-05-31 09:51:05,871 32287 DEBUG Connect 'provider_db' to 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 [...]'
2012-05-31 09:51:05,874 32287 DEBUG exec_query: select node_type, node_name from pgq_node.get_node_info('replika')
2012-05-31 09:51:05,883 32287 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'dst2_digoal_02', 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_02 password=postgres', false)
2012-05-31 09:51:05,886 32287 INFO Location registered
2012-05-31 09:51:05,886 32287 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'dst2_digoal_02', 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_02 password=postgres', false)
2012-05-31 09:51:05,889 32287 INFO Location registered
2012-05-31 09:51:05,889 32287 DEBUG exec_cmd: select * from pgq_node.register_subscriber('replika', 'dst2_digoal_02', 'dst2_digoal_02', null)
2012-05-31 09:51:05,893 32287 INFO Subscriber registered: dst2_digoal_02
2012-05-31 09:51:05,894 32287 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'dst2_digoal_02', 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_02 password=postgres', false)
2012-05-31 09:51:05,895 32287 INFO Location registered
2012-05-31 09:51:05,895 32287 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'src_digoal_01', 'host=172.16.3.176 port=1921 user=postgres dbname=digoal_01 password=postgres', 'False')
2012-05-31 09:51:05,896 32287 INFO Location registered
2012-05-31 09:51:05,896 32287 DEBUG exec_cmd: select * from pgq_node.register_location('replika', 'dst1_digoal_01', 'host=172.16.3.33 port=1919 user=postgres dbname=digoal_01 password=postgres', 'False')
2012-05-31 09:51:05,897 32287 INFO Location registered
2012-05-31 09:51:05,897 32287 DEBUG exec_cmd: select * from pgq_node.create_node('replika', 'leaf', 'dst2_digoal_02', 'dst2_digoal_02', 'src_digoal_01', '1944', null)
2012-05-31 09:51:05,900 32287 INFO Node "dst2_digoal_02" initialized for queue "replika" with type "leaf"
2012-05-31 09:51:05,906 32287 INFO Done
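The resulting node registration can be inspected directly on the new leaf with the same pgq_node function that appears in the debug output above:
-- on digoal_02 (the new leaf node)
select * from pgq_node.get_node_info('replika');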
Start the worker process:
postgres@db5-> londiste3 -d /home/postgres/londiste3/dst2_digoal_02.ini worker
Add the tables:
postgres@db5-> londiste3 /home/postgres/londiste3/dst2_digoal_02.ini add-table --dest-table=digoal_02.user_info1 --copy-condition="userid<1000" digoal_01.user_info
2012-05-31 09:53:56,235 32362 INFO Table added: digoal_01.user_info(digoal_02.user_info1)
postgres@db5-> londiste3 /home/postgres/londiste3/dst2_digoal_02.ini add-table --dest-table=digoal_02.user_session1 --copy-condition="userid<1000" digoal_01.user_session
2012-05-31 09:54:26,298 32377 INFO Table added: digoal_01.user_session(digoal_02.user_session1)
Check the replication state:
postgres@db5-> londiste3 /home/postgres/londiste3/dst2_digoal_02.ini tables
Tables on node
table_name merge_state table_attrs
---------------------- --------------- ----------------------------
digoal_01.user_info in-copy {'copy_condition': 'userid<1000'}
digoal_01.user_session in-copy {'copy_condition': 'userid<1000'}
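Once the initial copy has finished, the effect of the copy condition can be spot-checked on target database 2: right after the copy, only rows with userid < 1000 should be present. An illustrative check (not from the original test run):
-- on digoal_02; expected to return 0 right after the initial copy
select count(*) from digoal_02.user_info1 where userid >= 1000;
-- and 999 rows below the cut-off, matching the compare output below
select count(*) from digoal_02.user_info1 where userid < 1000;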
Run compare: because target database 2 replicates only a subset of the rows, the row counts and checksums do not match.
postgres@db5-> londiste3 /home/postgres/londiste3/dst2_digoal_02.ini compare
2012-05-31 09:55:43,611 32418 INFO Locking digoal_01.user_info
2012-05-31 09:55:43,612 32418 INFO Syncing digoal_02.user_info1
2012-05-31 09:55:47,121 32418 INFO Counting digoal_02.user_info1
2012-05-31 09:55:48,259 32418 INFO srcdb: 200000 rows, checksum=545503592610
2012-05-31 09:55:48,264 32418 INFO dstdb: 999 rows, checksum=19444530475
2012-05-31 09:55:48,265 32418 WARNING digoal_02.user_info1: Results do not match!
2012-05-31 09:55:48,266 32418 INFO Locking digoal_01.user_session
2012-05-31 09:55:48,267 32418 INFO Syncing digoal_02.user_session1
2012-05-31 09:55:50,773 32418 INFO Counting digoal_02.user_session1
2012-05-31 09:55:51,556 32418 INFO srcdb: 200000 rows, checksum=455874670113
2012-05-31 09:55:51,559 32418 INFO dstdb: 999 rows, checksum=-69913661777
2012-05-31 09:55:51,559 32418 WARNING digoal_02.user_session1: Results do not match!
There is currently no good general way to get a matching compare result. As a temporary workaround the configuration file can be modified as shown below, but this is not generally usable, because other tables may not have this column or may not use this condition.
The following is only a test; do not use it like this in production. If you really must, write multiple configuration files instead (one dedicated to compare, as shown further below).
postgres@db5-> vi dst2_digoal_02.ini
[londiste3]
job_name = dst2_digoal_02
db = host=172.16.3.33 port=1919 user=postgres dbname=digoal_02 password=postgres
queue_name = replika
logfile = /home/postgres/londiste3/log/dst2_digoal_02.log
pidfile = /home/postgres/londiste3/pid/dst2_digoal_02.pid
parallel_copies = 16
compare_sql = select count(1) as cnt, sum(hashtext(t.*::text)) as chksum from only _TABLE_ t where userid<1000
postgres@db5-> londiste3 -r /home/postgres/londiste3/dst2_digoal_02.ini
postgres@db5-> londiste3 /home/postgres/londiste3/dst2_digoal_02.ini compare
2012-05-31 09:59:09,559 32498 INFO Locking digoal_01.user_info
2012-05-31 09:59:09,560 32498 INFO Syncing digoal_02.user_info1
2012-05-31 09:59:13,568 32498 INFO Counting digoal_02.user_info1
2012-05-31 09:59:13,576 32498 INFO srcdb: 999 rows, checksum=19444530475
2012-05-31 09:59:13,581 32498 INFO dstdb: 999 rows, checksum=19444530475
2012-05-31 09:59:13,582 32498 INFO Locking digoal_01.user_session
2012-05-31 09:59:13,583 32498 INFO Syncing digoal_02.user_session1
2012-05-31 09:59:17,589 32498 INFO Counting digoal_02.user_session1
2012-05-31 09:59:17,597 32498 INFO srcdb: 999 rows, checksum=-69913661777
2012-05-31 09:59:17,601 32498 INFO dstdb: 999 rows, checksum=-69913661777
The multiple-configuration-file approach: use a dedicated configuration file for compare.
First restore dst2_digoal_02.ini and reload it.
postgres@db5-> cat dst2_digoal_02.ini
[londiste3]
job_name = dst2_digoal_02
db = host=172.16.3.33 port=1919 user=postgres dbname=digoal_02 password=postgres
queue_name = replika
logfile = /home/postgres/londiste3/log/dst2_digoal_02.log
pidfile = /home/postgres/londiste3/pid/dst2_digoal_02.pid
parallel_copies = 16
postgres@db5-> londiste3 -r /home/postgres/londiste3/dst2_digoal_02.ini
Create a new configuration file dedicated to comparison:
postgres@db5-> cat dst2_digoal_02_compare.ini
[londiste3]
job_name = dst2_digoal_02
db = host=172.16.3.33 port=1919 user=postgres dbname=digoal_02 password=postgres
queue_name = replika
logfile = /home/postgres/londiste3/log/dst2_digoal_02.log
pidfile = /home/postgres/londiste3/pid/dst2_digoal_02.pid
parallel_copies = 16
compare_sql = select count(1) as cnt, sum(hashtext(t.*::text)) as chksum from only _TABLE_ t where userid<1000
Compare using this configuration file. It is more cumbersome, but it is safe:
postgres@db5-> londiste3 /home/postgres/londiste3/dst2_digoal_02_compare.ini compare
2012-05-31 10:03:55,834 32689 INFO Locking digoal_01.user_info
2012-05-31 10:03:55,835 32689 INFO Syncing digoal_02.user_info1
2012-05-31 10:03:58,339 32689 INFO Counting digoal_02.user_info1
2012-05-31 10:03:58,346 32689 INFO srcdb: 999 rows, checksum=19444530475
2012-05-31 10:03:58,351 32689 INFO dstdb: 999 rows, checksum=19444530475
2012-05-31 10:03:58,353 32689 INFO Locking digoal_01.user_session
2012-05-31 10:03:58,353 32689 INFO Syncing digoal_02.user_session1
2012-05-31 10:04:01,355 32689 INFO Counting digoal_02.user_session1
2012-05-31 10:04:01,363 32689 INFO srcdb: 999 rows, checksum=-69913661777
2012-05-31 10:04:01,367 32689 INFO dstdb: 999 rows, checksum=-69913661777
7. Testing adding a column
Put the column-adding SQL in a file and execute it. When the target database uses the same schema and table names as the master, the DDL replicates correctly; when they differ, it has to be fixed manually, so it is recommended to keep the names consistent wherever possible.
postgres@db5-> vi add_column.sql
begin;
alter table digoal_01.user_info add column c1 int;
alter table digoal_01.user_session add column c1 int;
end;
Execute it with the provider's configuration file:
postgres@db5-> londiste3 /home/postgres/londiste3/src_digoal_01.ini execute ./add_column.sql
2012-05-31 10:11:09,230 482 INFO Executing: add_column.sql
2012-05-31 10:11:09,253 482 INFO Execute finished: add_column.sql
Executing it with a consumer's configuration file fails as follows:
postgres@db5-> londiste3 /home/postgres/londiste3/dst2_digoal_02.ini execute ./add_column.sql
2012-05-31 10:14:52,014 584 ERROR Node is not root node: replika
The master's src_digoal_01.log shows:
2012-05-31 10:11:09,230 482 INFO Executing: add_column.sql
2012-05-31 10:11:09,253 482 INFO Execute finished: add_column.sql
Check whether the c1 column was added correctly on target database 1; since its schema and table names match the master's, the DDL should have replicated.
dst1_digoal_01.log:
2012-05-31 10:11:11,203 31872 INFO Executing: add_column.sql
2012-05-31 10:11:11,224 31872 INFO Execute finished: add_column.sql
pg92@db-172-16-3-33-> psql digoal_01 digoal_01
psql (9.2beta1)
Type "help" for help.
digoal_01=> \d user_info
Table "digoal_01.user_info"
Column | Type | Modifiers
------------+-----------------------------+-----------
userid | integer | not null
engname | text |
cnname | text |
occupation | text |
birthday | date |
signname | text |
email | text |
qq | numeric |
crt_time | timestamp without time zone |
mod_time | timestamp without time zone |
c1 | integer |
Indexes:
"pk_user_info" PRIMARY KEY, btree (userid)
Triggers:
_londiste_replika AFTER INSERT OR DELETE OR UPDATE ON user_info FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('replika', 'deny')
_londiste_replika_truncate AFTER TRUNCATE ON user_info FOR EACH STATEMENT EXECUTE PROCEDURE pgq.sqltriga('replika', 'deny')
digoal_01=> \d user_session
Table "digoal_01.user_session"
Column | Type | Modifiers
-----------------+--------------------------------+------------------------------
userid | integer | not null
logintime | timestamp(0) without time zone |
login_count | bigint | default 0
logouttime | timestamp(0) without time zone |
online_interval | interval | default '00:00:00'::interval
c1 | integer |
Indexes:
"pk_user_session" PRIMARY KEY, btree (userid)
Triggers:
_londiste_replika AFTER INSERT OR DELETE OR UPDATE ON user_session FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('replika', 'deny')
_londiste_replika_truncate AFTER TRUNCATE ON user_session FOR EACH STATEMENT EXECUTE PROCEDURE pgq.sqltriga('replika', 'deny')
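Incidentally, the 'deny' argument visible in the trigger definitions above is what protects the subscriber tables from direct local writes (this relates to the session_replication_role reference at the end of this article); as far as I understand, londiste's own worker is unaffected because it applies changes with session_replication_role set to 'replica', so these ordinary triggers do not fire for it. An illustrative example, not from the original test:
-- on target database 1, in an ordinary session; expected to be rejected by the deny trigger
insert into digoal_01.user_info (userid, engname) values (-1, 'test');
-- the replay connection avoids this by running with session_replication_role = 'replica'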
Target database 2 did not get the c1 column, so it has to be added manually. Because its schema and table names differ from the master's, the following error is reported:
2012-05-31 10:16:50,921 32303 INFO Executing: add_column.sql
2012-05-31 10:16:50,928 32303 ERROR Job dst2_digoal_02 got error on connection 'db': schema "digoal_01" does not exist. Query: alt
er table digoal_01.user_info add column c1 int;
Traceback (most recent call last):
File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 565, in run_func_safely
return func()
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/consumer.py", line 199, in work
return Consumer.work(self)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/consumer.py", line 257, in work
self._launch_process_batch(db, batch_id, ev_list)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/consumer.py", line 286, in _launch_process_batch
self.process_batch(db, batch_id, list)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/consumer.py", line 172, in process_batch
self.process_remote_batch(src_db, tick_id, event_list, dst_db)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/londiste/playback.py", line 368, in process_remote_batch
CascadedWorker.process_remote_batch(self, src_db, tick_id, ev_list, dst_db)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/worker.py", line 155, in process_remote_batch
self.process_remote_event(src_curs, dst_curs, ev)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/londiste/playback.py", line 597, in process_remote_event
self.handle_execute_event(ev, dst_curs)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/londiste/playback.py", line 669, in handle_execute_event
dst_curs.execute(stmt)
File "/opt/python2.7.3/lib/python2.7/site-packages/psycopg2/extras.py", line 123, in execute
return _cursor.execute(self, query, vars)
ProgrammingError: schema "digoal_01" does not exist
Execute it manually:
pg92@db-172-16-3-33-> psql digoal_02 digoal_02
psql (9.2beta1)
Type "help" for help.
digoal_02=> begin;
BEGIN
digoal_02=> alter table digoal_02.user_info1 add column c1 int;
ALTER TABLE
digoal_02=> alter table digoal_02.user_session1 add column c1 int;
ALTER TABLE
digoal_02=> end;
COMMIT
Note also that after the manual fix, the consumer for target database 2 keeps retrying the SQL still sitting in the queue, as the status output shows:
postgres@db5-> londiste3 /home/postgres/londiste3/dst2_digoal_02.ini status
Queue: replika Local node: dst2_digoal_02
src_digoal_01 (root)
| Tables: 4/0/0
| Lag: 18s, Tick: 2238
+--dst1_digoal_01 (leaf)
| Tables: 4/0/0
| Lag: 18s, Tick: 2238
+--dst2_digoal_02 (leaf)
Tables: 2/0/2
Lag: 2h10m42s, Tick: 2021
ERR: dst2_digoal_02: schema "digoal_01" does not exist
The error also keeps appearing in the log, and subsequent changes cannot be applied to target database 2 until this failure is resolved.
2012-05-31 12:28:44,500 32303 INFO Executing: add_column.sql
2012-05-31 12:28:44,507 32303 ERROR Job dst2_digoal_02 got error on connection 'db': schema "digoal_01" does not exist. Query: alt
er table digoal_01.user_info add column c1 int;
Traceback (most recent call last):
File "/opt/skytools3.0.2/lib/python2.7/site-packages/skytools/scripting.py", line 565, in run_func_safely
return func()
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/consumer.py", line 199, in work
return Consumer.work(self)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/consumer.py", line 257, in work
self._launch_process_batch(db, batch_id, ev_list)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/consumer.py", line 286, in _launch_process_batch
self.process_batch(db, batch_id, list)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/consumer.py", line 172, in process_batch
self.process_remote_batch(src_db, tick_id, event_list, dst_db)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/londiste/playback.py", line 368, in process_remote_batch
CascadedWorker.process_remote_batch(self, src_db, tick_id, ev_list, dst_db)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/pgq/cascade/worker.py", line 155, in process_remote_batch
self.process_remote_event(src_curs, dst_curs, ev)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/londiste/playback.py", line 597, in process_remote_event
self.handle_execute_event(ev, dst_curs)
File "/opt/skytools3.0.2/lib/python2.7/site-packages/londiste/playback.py", line 669, in handle_execute_event
dst_curs.execute(stmt)
File "/opt/python2.7.3/lib/python2.7/site-packages/psycopg2/extras.py", line 123, in execute
return _cursor.execute(self, query, vars)
ProgrammingError: schema "digoal_01" does not exist
So how do we resolve this? One option is to have target database 2 skip this tick/event so that the failing SQL is skipped.
The other option is to create a digoal_01 schema with user_info and user_session tables on target database 2 so that the stuck statement can be executed there; once it has gone through, drop the temporary schema.
pg92@db-172-16-3-33-> psql digoal_02 digoal_02
psql (9.2beta1)
Type "help" for help.
digoal_02=> create schema digoal_01;
CREATE SCHEMA
digoal_02=> create table digoal_01.user_info(id int);
CREATE TABLE
digoal_02=> create table digoal_01.user_session(id int);
CREATE TABLE
Check the status again a little later; once it is back to normal, drop the schema.
postgres@db5-> londiste3 /home/postgres/londiste3/dst2_digoal_02.ini status
Queue: replika Local node: dst2_digoal_02
src_digoal_01 (root)
| Tables: 4/0/0
| Lag: 19s, Tick: 2266
+--dst1_digoal_01 (leaf)
| Tables: 4/0/0
| Lag: 19s, Tick: 2266
+--dst2_digoal_02 (leaf)
Tables: 2/0/2
Lag: 19s, Tick: 2266
Drop the temporary schema:
digoal_02=> select * from digoal_01.user_info ;
id | c1
----+----
(0 rows)
digoal_02=> select * from digoal_01.user_session ;
id | c1
----+----
(0 rows)
digoal_02=> drop schema digoal_01 cascade;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table digoal_01.user_info
drop cascades to table digoal_01.user_session
DROP SCHEMA
Always pay attention to londiste's health and resolve failures quickly; otherwise the queue keeps growing and unprocessed ticks pile up.
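One simple way to keep an eye on this from SQL on the root node (a sketch, assuming the standard pgq info functions are available):
-- queue-level statistics: event insert rate, ticker lag, etc.
select * from pgq.get_queue_info('replika');
-- per-consumer view: how far behind each subscriber/consumer is
select * from pgq.get_consumer_info('replika');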
References
《Can session_replication_role used like MySQL’s BlackHole Engine?》
http://blog.163.com/digoal@126/blog/static/163877040201119111234570/
《Londiste 3 replicate case - 1 (Part 1)》
http://skytools.projects.postgresql.org/skytools-3.0/
postgres@db5-> londiste3 --ini
[londiste3]
## Parameters for Londiste ##
# target database
db = dbname=somedb host=127.0.0.1
# how many tables can be copied in parallel
#parallel_copies = 1
# accept only events for locally present tables
#local_only = true
## compare/repair
# max amount of time table can be locked
#lock_timeout = 10
# compare: sql to use
#compare_sql = select count(1) as cnt, sum(hashtext(t.*::text)) as chksum from only _TABLE_ t
#compare_fmt = %(cnt)d rows, checksum=%(chksum)s
## Parameters for pgq.CascadedWorker ##
# how often the root node should push wm downstream (seconds)
#global_wm_publish_period = 300
# how often the nodes should report their wm upstream (seconds)
#local_wm_publish_period = 300
## Parameters for pgq.Consumer ##
# queue name to read from
queue_name =
# override consumer name
#consumer_name = %(job_name)s
# whether to use cursor to fetch events (0 disables)
#pgq_lazy_fetch = 300
# whether to read from source size in autocommmit mode
# not compatible with pgq_lazy_fetch
# the actual user script on top of pgq.Consumer must also support it
#pgq_autocommit = 0
# whether to wait for specified number of events, before
# assigning a batch (0 disables)
#pgq_batch_collect_events = 0
# whether to wait specified amount of time,
# before assigning a batch (postgres interval)
#pgq_batch_collect_interval =
# whether to stay behind queue top (postgres interval)
#pgq_keep_lag =
## Parameters for skytools.DBScript ##
# default lifetime for database connections (in seconds)
#connection_lifetime = 1200
## Parameters for skytools.DBScript ##
# how many seconds to sleep between work loops
# if missing or 0, then instead sleeping, the script will exit
loop_delay = 1.0
# where to log
logfile = ~/log/%(job_name)s.log
# where to write pidfile
pidfile = ~/pid/%(job_name)s.pid
# per-process name to use in logging
#job_name = %(config_name)s
# whether centralized logging should be used
# search-path [ ./skylog.ini, ~/.skylog.ini, /etc/skylog.ini ]
# 0 - disabled
# 1 - enabled, unless non-daemon on console (os.isatty())
# 2 - always enabled
#use_skylog = 0
# how many seconds to sleep after catching a exception
#exception_sleep = 20
postgres@db5-> pgqd --ini
[pgqd]
# where to log
logfile = ~/log/pgqd.log
# pidfile
pidfile = ~/pid/pgqd.pid
## optional parameters ##
# libpq connect string without dbname=
#base_connstr =
# startup db to query other databases
#initial_database = template1
# limit ticker to specific databases
#database_list =
# log into syslog
#syslog = 1
#syslog_ident = pgqd
## optional timeouts ##
# how often to check for new databases
#check_period = 60
# how often to flush retry queue
#retry_period = 30
# how often to do maintentance
#maint_period = 120
# how often to run ticker
#ticker_period = 1
Under the hood, londiste3 simply calls the Python scripts in the skytools library; for example, compare invokes compare.py.
pg92@db-172-16-3-33-> pwd
/opt/skytools3.0.2/lib/python2.7/site-packages/londiste
pg92@db-172-16-3-33-> ll
total 220K
-rw-r--r-- 1 root root 2.8K May 11 02:40 bublin.py
-rw-r--r-- 1 root root 3.5K May 30 14:47 bublin.pyc
-rw-r--r-- 1 root root 1.6K May 11 02:40 compare.py
-rw-r--r-- 1 root root 2.0K May 30 14:47 compare.pyc
-rw-r--r-- 1 root root 6.6K May 11 02:40 handler.py
-rw-r--r-- 1 root root 8.6K May 30 14:47 handler.pyc
drwxr-xr-x 2 root root 4.0K May 30 14:47 handlers
-rw-r--r-- 1 root root 601 May 11 02:40 __init__.py
-rw-r--r-- 1 root root 708 May 30 14:47 __init__.pyc
-rw-r--r-- 1 root root 32K May 11 02:40 playback.py
-rw-r--r-- 1 root root 28K May 30 14:47 playback.pyc
-rw-r--r-- 1 root root 9.8K May 11 02:40 repair.py
-rw-r--r-- 1 root root 11K May 30 14:47 repair.pyc
-rw-r--r-- 1 root root 21K May 11 02:40 setup.py
-rw-r--r-- 1 root root 20K May 30 14:47 setup.pyc
-rw-r--r-- 1 root root 8.5K May 11 02:40 syncer.py
-rw-r--r-- 1 root root 8.4K May 30 14:47 syncer.pyc
-rw-r--r-- 1 root root 8.6K May 11 02:40 table_copy.py
-rw-r--r-- 1 root root 7.1K May 30 14:47 table_copy.pyc
The procedure for adding or removing columns and for adding custom triggers is actually as follows.
Adding a column to a replicated table
This case is handled in a simple process:
add the column on all the subscribers
BEGIN; -- on the provider
add the column on the provider
SELECT londiste.provider_refresh_trigger('queue_name', 'tablename');
COMMIT;
Removing a column from a replicated table
drop column from provider and change trigger in same transaction
look at the lag when londiste has passed the moment of the drop
drop column on subscribers
The trick here is to drop the column on the subscribers only when there's no more event in the queue referencing it.
Adding custom triggers on subscriber side
By default, londiste will consider that the triggers which exist on the subscriber tables are there because you just restore the provider schema there, being as lazy as possible. If you intend to run custom triggers on the subscriber, you have to tell londiste about them as follows:
create the custom trigger on the subscriber
londiste.py p-to-s.ini subscriber add public.T1
londiste.py p-to-s.ini subscriber restore-triggers public.T1
CAUTION: When londiste stops, it will remove the trigger again.
You can even give the restore trigger a specific trigger name, and only this one will get reinstalled.
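Putting the quoted provider-side procedure into concrete SQL (note this uses the Londiste 2-style function named above, londiste.provider_refresh_trigger; in the Londiste 3 test earlier in this article the same effect was achieved with "londiste3 ... execute add_column.sql"):
-- on the provider, for an illustrative replicated table public.T1 in queue 'replika'
BEGIN;
ALTER TABLE public.T1 ADD COLUMN c1 int;
SELECT londiste.provider_refresh_trigger('replika', 'public.T1');
COMMIT;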