PostgreSQL共享存储一写多读(类Oracle RAC架构)

9 minute read

背景

亚马逊推出的Aurora数据库引擎,支持一份存储,一主多读的架构。这个架构和Oracle RAC类似,也是共享存储,但是只有一个实例可以执行写操作,其他实例只能执行读操作。相比传统的基于复制的一主多读,节约了存储和网络带宽的成本。

PostgreSQL共享存储一写多读(类Oracle RAC架构)

PostgreSQL天然支持物理流式复制来构建只读备库,但是并不是共享存储的架构,而是多份存储的架构。

我们可以使用PostgreSQL的hot standby模式来模拟这种共享存储一主多读的架构,但是需要通过内核修改,解决几个问题。

1、hot standby也会对数据库有写的动作,例如recovery时,会修改控制文件,数据文件等等,这些操作是多余的。

还有需要注意的是:

pg_xlog    
pg_log    
pg_clog    
pg_multixact    
postgresql.conf    
recovery.conf    
postmaster.pid    

2、主库内存状态的同步问题。有很多状态是存储在内存中的,所以内存状态也需要同步到备节点。

3、主库在flush dirty page前,原来只需要判断对应的WAL是否已落盘,确保一致性。现在还需要判断其是否已经在备库被apply(构造对应的BUFFER)。否则刷出去的脏页可能被备库读到导致不一致。

备库挂掉后,或者启动时,需要apply到一个点,才允许开放只读。这个点可以和主库来协商,与主库达到catch up状态即可。

为什么要让主库flush dirty page前,确保备库已APPLY到dirty page(head’s wal offset)截止位点的WAL?因为备库的shared buffer有限,回放的数据也可能挤出去,所以通过这种方法可以简单的实现备库的数据一致性。

4、为了让flush后的PAGE能够被备库读到。flush dirty page必须采用direct io,原来调用的是操作系统WRITE接口,实际上可以使用集群文件系统,实现全局读一致性,避免需要改代码来实现direct io。

最终实现一主多备的架构,需要通过改PG内核来实现:

这些文件应该是每个实例对应一份。

postgresql.conf, recovery.conf, postmaster.pid, pg_control    

hot standby不执行实际的恢复操作,但是需要更新自己的内存状态,如当前的OID,XID等等,以及更新自己的pg_control。

在多实例间,要实现主到备节点的OS脏页的同步,数据库shared buffer脏页的同步。

模拟过程1(未修改一行代码)

不改任何代码,在同一主机下启多实例测试,会遇到一些问题。(后面有问题描述,以及如何修改代码来修复这些问题)

1、主实例配置文件:

 # vi postgresql.conf    
listen_addresses='0.0.0.0'    
port=1921    
max_connections=100    
unix_socket_directories='.'    
ssl=on    
ssl_ciphers='EXPORT40'    
shared_buffers=512MB    
huge_pages=try    
max_prepared_transactions=0    
max_stack_depth=100kB    
dynamic_shared_memory_type=posix    
max_files_per_process=500    
wal_level=logical    
fsync=off    
synchronous_commit=off    
wal_sync_method=open_datasync    
full_page_writes=off    
wal_log_hints=off    
wal_buffers=16MB    
wal_writer_delay=10ms    
checkpoint_segments=8    
archive_mode=off    
archive_command='/bin/date'    
max_wal_senders=10    
max_replication_slots=10    
hot_standby=on    
wal_receiver_status_interval=1s    
hot_standby_feedback=off   
enable_bitmapscan=on    
enable_hashagg=on    
enable_hashjoin=on    
enable_indexscan=on    
enable_material=on    
enable_mergejoin=on    
enable_nestloop=on    
enable_seqscan=on    
enable_sort=on    
enable_tidscan=on    
log_destination='csvlog'    
logging_collector=on    
log_directory='pg_log'    
log_truncate_on_rotation=on    
log_rotation_size=10MB    
log_checkpoints=on    
log_connections=on    
log_disconnections=on    
log_duration=off    
log_error_verbosity=verbose    
log_line_prefix='%i    
log_statement='none'    
log_timezone='PRC'    
autovacuum=on    
log_autovacuum_min_duration=0    
autovacuum_vacuum_scale_factor=0.0002    
autovacuum_analyze_scale_factor=0.0001    
datestyle='iso,    
timezone='PRC'    
lc_messages='C'    
lc_monetary='C'    
lc_numeric='C'    
lc_time='C'    
default_text_search_config='pg_catalog.english'    
    
 # vi recovery.done    
recovery_target_timeline='latest'    
standby_mode=on    
primary_conninfo = 'host=127.0.0.1 port=1921 user=postgres keepalives_idle=60'    
    
 # vi pg_hba.conf    
local   replication     postgres                                trust    
host    replication     postgres 127.0.0.1/32            trust    

2、启动主实例。

postgres@digoal-> pg_ctl start    

3、启动只读实例,必须先删除postmaster.pid,这点PostgreSQL新版本加了一个PATCH,如果这个文件被删除,会自动关闭数据库,所以我们需要注意,不要使用最新的PGSQL,或者把这个patch干掉先。

postgres@digoal-> cd $PGDATA    
postgres@digoal-> mv recovery.done recovery.conf    
    
postgres@digoal-> rm -f postmaster.pid    
postgres@digoal-> pg_ctl start -o "-c log_directory=pg_log1922 -c port=1922"    

4、查看当前控制文件状态,只读实例改了控制文件,和前面描述一致。

postgres@digoal-> pg_controldata |grep state    
Database cluster state:               in archive recovery    

5、连到主实例,创建表,插入测试数据。

psql -p 1921    
postgres=# create table test1(id int);    
CREATE TABLE    
postgres=# insert into test1 select generate_series(1,10);    
INSERT 0 10    

6、在只读实例查看插入的数据。

postgres@digoal-> psql -h 127.0.0.1 -p 1922    
postgres=# select * from test1;    
 id    
----    
  1    
  2    
  3    
  4    
  5    
  6    
  7    
  8    
  9    
 10    
(10 rows)    

7、主实例执行检查点后,控制文件状态会改回生产状态。

psql -p 1921    
postgres=# checkpoint;    
CHECKPOINT    
    
postgres@digoal-> pg_controldata |grep state    
Database cluster state:               in production    

8、但是如果在只读实例执行完检查点,又会改回恢复状态。

postgres@digoal-> psql -h 127.0.0.1 -p 1922    
psql (9.4.4)    
postgres=# checkpoint;    
CHECKPOINT    
    
postgres@digoal-> pg_controldata |grep state    
Database cluster state:               in archive recovery    

注意到,上面的例子有1个问题,用流复制的话,会从主节点通过网络拷贝XLOG记录,并覆盖同一份已经写过的XLOG记录的对应的OFFSET,这是一个问题,因为可能会造成主节点看到的数据不一致(比如一个数据块改了多次,只读实例在恢复时将它覆盖到老的版本了,在主实例上看到的就会变成老版本的BLOCK,后面再来改这个问题,禁止只读实例恢复数据)。

另一方面,我们知道PostgreSQL standby会从三个地方(流、pg_xlog、restore_command)读取XLOG进行恢复,所以在共享存储的环境中,我们完全没有必要用流复制的方式,直接从pg_xlog目录读取即可。修改recovery.conf参数,将以下注释

 # primary_conninfo = 'host=127.0.0.1 port=1921 user=postgres keepalives_idle=60'    

9、重启只读实例。

pg_ctl stop -m fast    
postgres@digoal-> pg_ctl start -o "-c log_directory=pg_log1922 -c port=1922"    

10、重新测试数据一致性。

主实例:

postgres=# insert into test1 select generate_series(1,10);    
INSERT 0 10    
postgres=# insert into test1 select generate_series(1,10);    
INSERT 0 10    
postgres=# insert into test1 select generate_series(1,10);    
INSERT 0 10    
postgres=# insert into test1 select generate_series(1,10);    
INSERT 0 10    

只读实例:

postgres=# select count(*) from test1;    
 count    
-------    
    60    
(1 row)    

PostgreSQL共享存储一写多读 DEMO

问题分析

前面的简单测试(未修改PostgreSQL代码),有几个问题需要修改内核来解决:

1、standby还是要执行recovery的操作,recovery产生的write操作会随着只读实例数量的增加而增加。另外recovery有一个好处,解决了脏页的问题,主实例shared buffer中的脏页不需要额外的同步给只读实例了。

但是recovery需要解决一些问题:

回放可能和当前主节点操作同一个data page;或者回放时将块回放到老的状态,而实际上主节点又更新了这个块,造成数据块的不一致。如果此时只读实例关闭,然后立即关闭主实例,数据库再起来时,这个数据块是不一致的;

standby还是会改控制文件;

在同一个$PGDATA下启动实例,首先要删除postmaster.pid;

关闭实例时,已经被删除postmaster.pid的实例,只能通过找到postgres主进程的pid,然后发kill -s 15, 2或3的信号来关闭数据库;

 static void    
 set_mode(char *modeopt)    
 {    
         if (strcmp(modeopt, "s") == 0 || strcmp(modeopt, "smart") == 0)    
         {    
                 shutdown_mode = SMART_MODE;    
                 sig = SIGTERM;    
         }    
         else if (strcmp(modeopt, "f") == 0 || strcmp(modeopt, "fast") == 0)    
         {    
                 shutdown_mode = FAST_MODE;    
                 sig = SIGINT;    
         }    
         else if (strcmp(modeopt, "i") == 0 || strcmp(modeopt, "immediate") == 0)    
         {    
                 shutdown_mode = IMMEDIATE_MODE;    
                 sig = SIGQUIT;    
         }    
         else    
         {    
                 write_stderr(_("%s: unrecognized shutdown mode \"%s\"\n"), progname, modeopt);    
                 do_advice();    
                 exit(1);    
         }    
 }    

2、当主节点删除rel page时,只读实例回放时,会报invalid xlog对应的rel page不存在的错误,这个也是只读实例需要回放日志带来的问题。非常容易重现这个问题,删除一个表即可。

 2015-10-09 13:30:50.776 CST,,,2082,,561750ab.822,20,,2015-10-09 13:29:15 CST,1/0,0,WARNING,01000,"page 8 of relation base/151898/185251 does not exist",,,,,"xlog redo clean: rel 1663/151898/185251; blk 8 remxid 640632117",,,"report_invalid_page, xlogutils.c:67",""    
 2015-10-09 13:30:50.776 CST,,,2082,,561750ab.822,21,,2015-10-09 13:29:15 CST,1/0,0,PANIC,XX000,"WAL contains references to invalid pages",,,,,"xlog redo clean: rel 1663/151898/185251; blk 8 remxid 640632117",,,"log_invalid_page, xlogutils.c:91",""    

这个报错可以先注释这一段来绕过,从而可以演示下去。

src/backend/access/transam/xlogutils.c

 /* Log a reference to an invalid page */    
 static void    
 log_invalid_page(RelFileNode node, ForkNumber forkno, BlockNumber blkno,    
                                  bool present)    
 {    
   //////    
         /*    
          * Once recovery has reached a consistent state, the invalid-page table    
          * should be empty and remain so. If a reference to an invalid page is    
          * found after consistency is reached, PANIC immediately. This might seem    
          * aggressive, but it's better than letting the invalid reference linger    
          * in the hash table until the end of recovery and PANIC there, which    
          * might come only much later if this is a standby server.    
          */    
         //if (reachedConsistency)    
         //{    
         //      report_invalid_page(WARNING, node, forkno, blkno, present);    
         //      elog(PANIC, "WAL contains references to invalid pages");    
         //}    

3、由于本例是在同一个操作系统中演示,所以没有遇到OS的dirty page cache的问题,如果是不同主机的环境,我们需要解决OS dirty page cache 的同步问题,或者消除dirty page cache,如使用direct IO。或者集群文件系统如gfs2。

如果要产品化,至少需要解决以上问题。

PostgreSQL共享存储一写多读 DEMO代码

1、先解决Aurora实例写数据文件、控制文件、检查点的问题。

增加一个启动参数,表示这个实例是否为Aurora实例(即只读实例)

vi src/backend/utils/misc/guc.c

 /******** option records follow ********/    
    
 static struct config_bool ConfigureNamesBool[] =    
 {    
         {    
                 {"aurora", PGC_POSTMASTER, CONN_AUTH_SETTINGS,    
                         gettext_noop("Enables advertising the server via Bonjour."),    
                         NULL    
                 },    
                 &aurora,    
                 false,    
                 NULL, NULL, NULL    
         },    

新增变量

vi src/include/postmaster/postmaster.h

 extern bool aurora;    

禁止Aurora实例更新控制文件

vi src/backend/access/transam/xlog.c

 #include "postmaster/postmaster.h"    
 bool aurora;    
    
 void    
 UpdateControlFile(void)    
 {    
         if (aurora) return;    

禁止Aurora实例启动bgwriter进程

vi src/backend/postmaster/bgwriter.c

 #include "postmaster/postmaster.h"    
 bool  aurora;    
    
 /*    
  * Main entry point for bgwriter process    
  *    
  * This is invoked from AuxiliaryProcessMain, which has already created the    
  * basic execution environment, but not enabled signals yet.    
  */    
 void    
 BackgroundWriterMain(void)    
 {    
   //////    
         pg_usleep(1000000L);    
    
         /*    
          * If an exception is encountered, processing resumes here.    
          *    
          * See notes in postgres.c about the design of this coding.    
          */    
         if (!aurora && sigsetjmp(local_sigjmp_buf, 1) != 0)    
         {    
    
   //////    
                 /*    
                  * Do one cycle of dirty-buffer writing.    
                  */    
                 if (!aurora) {    
                 can_hibernate = BgBufferSync();    
   //////    
                 }    
                 pg_usleep(1000000L);    
         }    
 }    

禁止Aurora实例启动checkpointer进程

vi src/backend/postmaster/checkpointer.c

 #include "postmaster/postmaster.h"    
 bool  aurora;    
   //////    
 /*    
  * Main entry point for checkpointer process    
  *    
  * This is invoked from AuxiliaryProcessMain, which has already created the    
  * basic execution environment, but not enabled signals yet.    
  */    
 void    
 CheckpointerMain(void)    
 {    
   //////    
         /*    
          * Loop forever    
          */    
         for (;;)    
         {    
                 bool            do_checkpoint = false;    
                 int                     flags = 0;    
                 pg_time_t       now;    
                 int                     elapsed_secs;    
                 int                     cur_timeout;    
                 int                     rc;    
    
                 pg_usleep(100000L);    
    
                 /* Clear any already-pending wakeups */    
                 if (!aurora)  ResetLatch(&MyProc->procLatch);    
    
                 /*    
                  * Process any requests or signals received recently.    
                  */    
                 if (!aurora) AbsorbFsyncRequests();    
    
                 if (!aurora && got_SIGHUP)    
                 {    
                         got_SIGHUP = false;    
                         ProcessConfigFile(PGC_SIGHUP);    
    
                         /*    
                          * Checkpointer is the last process to shut down, so we ask it to    
                          * hold the keys for a range of other tasks required most of which    
                          * have nothing to do with checkpointing at all.    
                          *    
                          * For various reasons, some config values can change dynamically    
                          * so the primary copy of them is held in shared memory to make    
                          * sure all backends see the same value.  We make Checkpointer    
                          * responsible for updating the shared memory copy if the    
                          * parameter setting changes because of SIGHUP.    
                          */    
                         UpdateSharedMemoryConfig();    
                 }    
                 if (!aurora && checkpoint_requested)    
                 {    
                         checkpoint_requested = false;    
                         do_checkpoint = true;    
                         BgWriterStats.m_requested_checkpoints++;    
                 }    
                 if (!aurora && shutdown_requested)    
                 {    
                         /*    
                          * From here on, elog(ERROR) should end with exit(1), not send    
                          * control back to the sigsetjmp block above    
                          */    
                         ExitOnAnyError = true;    
                         /* Close down the database */    
                         ShutdownXLOG(0, 0);    
                         /* Normal exit from the checkpointer is here */    
                         proc_exit(0);           /* done */    
                 }    
    
                 /*    
                  * Force a checkpoint if too much time has elapsed since the last one.    
                  * Note that we count a timed checkpoint in stats only when this    
                  * occurs without an external request, but we set the CAUSE_TIME flag    
                  * bit even if there is also an external request.    
                  */    
                 now = (pg_time_t) time(NULL);    
                 elapsed_secs = now - last_checkpoint_time;    
                 if (!aurora && elapsed_secs >= CheckPointTimeout)    
                 {    
                         if (!do_checkpoint)    
                                 BgWriterStats.m_timed_checkpoints++;    
                         do_checkpoint = true;    
                         flags |= CHECKPOINT_CAUSE_TIME;    
                 }    
    
                 /*    
                  * Do a checkpoint if requested.    
                  */    
                 if (!aurora && do_checkpoint)    
                 {    
                         bool            ckpt_performed = false;    
                         bool            do_restartpoint;    
    
                         /* use volatile pointer to prevent code rearrangement */    
                         volatile CheckpointerShmemStruct *cps = CheckpointerShmem;    
    
                         /*    
                          * Check if we should perform a checkpoint or a restartpoint. As a    
                          * side-effect, RecoveryInProgress() initializes TimeLineID if    
                          * it's not set yet.    
                          */    
                         do_restartpoint = RecoveryInProgress();    
    
                         /*    
                          * Atomically fetch the request flags to figure out what kind of a    
                          * checkpoint we should perform, and increase the started-counter    
                          * to acknowledge that we've started a new checkpoint.    
                          */    
                         SpinLockAcquire(&cps->ckpt_lck);    
                         flags |= cps->ckpt_flags;    
                         cps->ckpt_flags = 0;    
                         cps->ckpt_started++;    
                         SpinLockRelease(&cps->ckpt_lck);    
    
                         /*    
                          * The end-of-recovery checkpoint is a real checkpoint that's    
                          * performed while we're still in recovery.    
                          */    
                         if (flags & CHECKPOINT_END_OF_RECOVERY)    
                                 do_restartpoint = false;    
   //////    
                         ckpt_active = false;    
                 }    
    
                 /* Check for archive_timeout and switch xlog files if necessary. */    
                 if (!aurora) CheckArchiveTimeout();    
                 /*    
                  * Send off activity statistics to the stats collector.  (The reason    
                  * why we re-use bgwriter-related code for this is that the bgwriter    
                  * and checkpointer used to be just one process.  It's probably not    
                  * worth the trouble to split the stats support into two independent    
                  * stats message types.)    
                  */    
                 if (!aurora) pgstat_send_bgwriter();    
    
                 /*    
                  * Sleep until we are signaled or it's time for another checkpoint or    
                  * xlog file switch.    
                  */    
                 now = (pg_time_t) time(NULL);    
                 elapsed_secs = now - last_checkpoint_time;    
                 if (elapsed_secs >= CheckPointTimeout)    
                         continue;                       /* no sleep for us ... */    
                 cur_timeout = CheckPointTimeout - elapsed_secs;    
                 if (!aurora && XLogArchiveTimeout > 0 && !RecoveryInProgress())    
                 {    
                         elapsed_secs = now - last_xlog_switch_time;    
                         if (elapsed_secs >= XLogArchiveTimeout)    
                                 continue;               /* no sleep for us ... */    
                         cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);    
                 }    
    
                 if (!aurora) rc = WaitLatch(&MyProc->procLatch,    
                                            WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,    
                                            cur_timeout * 1000L /* convert to ms */ );    
    
                 /*    
                  * Emergency bailout if postmaster has died.  This is to avoid the    
                  * necessity for manual cleanup of all postmaster children.    
                  */    
                 if (rc & WL_POSTMASTER_DEATH)    
                         exit(1);    
         }    
 }    
    
   //////    
 /* SIGINT: set flag to run a normal checkpoint right away */    
 static void    
 ReqCheckpointHandler(SIGNAL_ARGS)    
 {    
         if (aurora)    
            return;    
         int                     save_errno = errno;    
    
         checkpoint_requested = true;    
         if (MyProc)    
                 SetLatch(&MyProc->procLatch);    
    
         errno = save_errno;    
 }    
    
   //////    
 /*    
  * AbsorbFsyncRequests    
  *              Retrieve queued fsync requests and pass them to local smgr.    
  *    
  * This is exported because it must be called during CreateCheckPoint;    
  * we have to be sure we have accepted all pending requests just before    
  * we start fsync'ing.  Since CreateCheckPoint sometimes runs in    
  * non-checkpointer processes, do nothing if not checkpointer.    
  */    
 void    
 AbsorbFsyncRequests(void)    
 {    
         CheckpointerRequest *requests = NULL;    
         CheckpointerRequest *request;    
         int                     n;    
    
         if (!AmCheckpointerProcess() || aurora)    
                 return;    
   //////    

禁止Aurora实例手工调用checkpoint命令

vi src/backend/tcop/utility.c

 #include "postmaster/postmaster.h"    
 bool  aurora;    
    
   //////    
 void    
 standard_ProcessUtility(Node *parsetree,    
                                                 const char *queryString,    
                                                 ProcessUtilityContext context,    
                                                 ParamListInfo params,    
                                                 DestReceiver *dest,    
                                                 char *completionTag)    
 {    
   //////    
                 case T_CheckPointStmt:    
                    if (!superuser() || aurora)    
                                 ereport(ERROR,    
                                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),    
                                                  errmsg("must be superuser to do CHECKPOINT")));    

改完上面的代码,重新编译一下,现在接近一个DEMO了。现在Aurora实例不会更新控制文件,不会写数据文件,不会执行checkpoint,是我们想要的结果。

PS:主库bgwriter, backend process, checkpoint刷脏页时,判断对应脏页是否已在备库APPLY,这部分DEMO里面没有做。

DEMO验证

启动只读实例时,加一个参数aurora=true,表示启动Aurora实例。

pg_ctl start -o "-c log_directory=pg_log1922 -c port=1922 -c aurora=true"    

要产品化,还有很多细节需要考虑,这是一个DEMO,但是已经表明了PostgreSQL是可以的。阿里云RDS的小伙伴们加油!

参考

1、https://aws.amazon.com/cn/rds/aurora/

2、src/backend/access/transam/xlog.c

 /*    
  * Open the WAL segment containing WAL position 'RecPtr'.    
  *    
  * The segment can be fetched via restore_command, or via walreceiver having    
  * streamed the record, or it can already be present in pg_xlog. Checking    
  * pg_xlog is mainly for crash recovery, but it will be polled in standby mode    
  * too, in case someone copies a new segment directly to pg_xlog. That is not    
  * documented or recommended, though.    
  *    
  * If 'fetching_ckpt' is true, we're fetching a checkpoint record, and should    
  * prepare to read WAL starting from RedoStartLSN after this.    
  *    
  * 'RecPtr' might not point to the beginning of the record we're interested    
  * in, it might also point to the page or segment header. In that case,    
  * 'tliRecPtr' is the position of the WAL record we're interested in. It is    
  * used to decide which timeline to stream the requested WAL from.    
  *    
  * If the record is not immediately available, the function returns false    
  * if we're not in standby mode. In standby mode, waits for it to become    
  * available.    
  *    
  * When the requested record becomes available, the function opens the file    
  * containing it (if not open already), and returns true. When end of standby    
  * mode is triggered by the user, and there is no more WAL available, returns    
  * false.    
  */    
 static bool    
 WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,    
                                                         bool fetching_ckpt, XLogRecPtr tliRecPtr)    
 {    
   //////    
         static pg_time_t last_fail_time = 0;    
         pg_time_t       now;    
    
         /*-------    
          * Standby mode is implemented by a state machine:    
          *    
          * 1. Read from either archive or pg_xlog (XLOG_FROM_ARCHIVE), or just    
          *        pg_xlog (XLOG_FROM_XLOG)    
          * 2. Check trigger file    
          * 3. Read from primary server via walreceiver (XLOG_FROM_STREAM)    
          * 4. Rescan timelines    
          * 5. Sleep 5 seconds, and loop back to 1.    
          *    
          * Failure to read from the current source advances the state machine to    
          * the next state.    
          *    
          * 'currentSource' indicates the current state. There are no currentSource    
          * values for "check trigger", "rescan timelines", and "sleep" states,    
          * those actions are taken when reading from the previous source fails, as    
          * part of advancing to the next state.    
          *-------    
          */    

3、src/backend/storage/buffer/bufmgr.c

Flag Counter

digoal’s 大量PostgreSQL文章入口