PostgreSQL How to get upstream node conninfo from standby node

7 minute read

背景

在PostgreSQL的upstream节点可以获得downstream节点的连接信息,当downstream节点主动连接upstream节点时,在upstream节点会有一个sender process负责发送XLOG信息,而这个sender进程其实就是backend process,在pg_stat_activity中可以查看。

pg_stat_replication的视图定义如下:

postgres=# \x  
Expanded display is on.  
postgres=# select pg_get_viewdef('pg_stat_replication');  
-[ RECORD 1 ]--+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
pg_get_viewdef |  SELECT s.pid,  
               |     s.usesysid,  
               |     u.rolname AS usename,  
               |     s.application_name,  
               |     s.client_addr,  
               |     s.client_hostname,  
               |     s.client_port,  
               |     s.backend_start,  
               |     s.backend_xmin,  
               |     w.state,  
               |     w.sent_location,  
               |     w.write_location,  
               |     w.flush_location,  
               |     w.replay_location,  
               |     w.sync_priority,  
               |     w.sync_state  
               |    FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, waiting, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin),  
               |     pg_authid u,  
               |     pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state)  
               |   WHERE ((s.usesysid = u.oid) AND (s.pid = w.pid));  

可以看出这里用到了pg_stat_get_activity来获得sender进程的信息,包含了client_addr。

pg_stat_get_wal_senders则是用来获取sender进程的PID的,当然这里还包含了由downstream节点的walreceiver进程发送过来的wal sent,write,flush,replay的位置以及其他信息等。

好了,那么downstream节点如何获取upstream节点的连接信息呢?

因为downstream节点在pg_stat_activity中没有walreceiver进程的信息,所以需要其他方法来获取upstream节点的信息。

其中一种方法是直接从walreceiver的共享内存中获取。既然downstream节点要连接upstream节点,它必然需要连接信息,这个连接信息存储在downstream节点的$PGDATA/recovery.conf文件中,数据库启动时startup进程会从这个文件中解析到primary_conninfo。

首先分析walreceiver进程,它通过一个动态加载的模块(libpqwalreceiver)来连接upstream节点,如果在recovery.conf中配置了primary_conninfo, startup进程会唤醒它。

startup进程同时需要从recovery.conf中读取primary_conninfo的信息,存储在WalRcvData结构中。

如下:

src/include/replication/walreceiver.h

/*  
 * MAXCONNINFO: maximum size of a connection string.  
 *  
 * XXX: Should this move to pg_config_manual.h?  
 */  
#define MAXCONNINFO             1024  
  
/* Shared memory area for management of walreceiver process */  
typedef struct  
{  
        /*  
         * PID of currently active walreceiver process, its current state and  
         * start time (actually, the time at which it was requested to be  
         * started).  
         */  
        pid_t           pid;  
        WalRcvState walRcvState;  
        pg_time_t       startTime;  
  
        /*  
         * receiveStart and receiveStartTLI indicate the first byte position and  
         * timeline that will be received. When startup process starts the  
         * walreceiver, it sets these to the point where it wants the streaming to  
         * begin.  
         */  
        XLogRecPtr      receiveStart;  
        TimeLineID      receiveStartTLI;  
  
        /*  
         * receivedUpto-1 is the last byte position that has already been  
         * received, and receivedTLI is the timeline it came from.  At the first  
         * startup of walreceiver, these are set to receiveStart and  
         * receiveStartTLI. After that, walreceiver updates these whenever it  
         * flushes the received WAL to disk.  
         */  
        XLogRecPtr      receivedUpto;  
        TimeLineID      receivedTLI;  
  
        /*  
         * latestChunkStart is the starting byte position of the current "batch"  
         * of received WAL.  It's actually the same as the previous value of  
         * receivedUpto before the last flush to disk.  Startup process can use  
         * this to detect whether it's keeping up or not.  
         */  
        XLogRecPtr      latestChunkStart;  
  
        /*  
         * Time of send and receive of any message received.  
         */  
        TimestampTz lastMsgSendTime;  
        TimestampTz lastMsgReceiptTime;  
  
        /*  
         * Latest reported end of WAL on the sender  
         */  
        XLogRecPtr      latestWalEnd;  
        TimestampTz latestWalEndTime;  
  
        /*  
         * connection string; is used for walreceiver to connect with the primary.  
         */  
        char            conninfo[MAXCONNINFO];  
  
        /*  
         * replication slot name; is also used for walreceiver to connect with the  
         * primary  
         */  
        char            slotname[NAMEDATALEN];  
  
        slock_t         mutex;                  /* locks shared variables shown above */  
  
        /*  
         * Latch used by startup process to wake up walreceiver after telling it  
         * where to start streaming (after setting receiveStart and  
         * receiveStartTLI).  
         */  
        Latch           latch;  
} WalRcvData;  
extern WalRcvData *WalRcv;  

hook如下:

......  
/* libpqwalreceiver hooks */  
typedef void (*walrcv_connect_type) (char *conninfo);  
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;  
  
typedef void (*walrcv_identify_system_type) (TimeLineID *primary_tli);  
extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;  
  
typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);  
extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;  
  
typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname);  
extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;  
  
typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);  
extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;  
  
typedef int (*walrcv_receive_type) (int timeout, char **buffer);  
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;  
  
typedef void (*walrcv_send_type) (const char *buffer, int nbytes);  
extern PGDLLIMPORT walrcv_send_type walrcv_send;  
  
typedef void (*walrcv_disconnect_type) (void);  
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;  
  
/* prototypes for functions in walreceiver.c */  
extern void WalReceiverMain(void) __attribute__((noreturn));  
......  

walreceiver模块使用WalRcv中的conninfo,连接到upstream节点。

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

/*  
 * Module load callback  
 */  
void  
_PG_init(void)  
{  
......  
        walrcv_connect = libpqrcv_connect;  
}  
  
/*  
 * Establish the connection to the primary server for XLOG streaming  
 */  
static void  
libpqrcv_connect(char *conninfo)  
{  
        char            conninfo_repl[MAXCONNINFO + 75];  
  
        /*  
         * Connect using deliberately undocumented parameter: replication. The  
         * database name is ignored by the server in replication mode, but specify  
         * "replication" for .pgpass lookup.  
         */  
        snprintf(conninfo_repl, sizeof(conninfo_repl),  
                         "%s dbname=replication replication=true fallback_application_name=walreceiver",  
                         conninfo);  
  
        streamConn = PQconnectdb(conninfo_repl);  
        if (PQstatus(streamConn) != CONNECTION_OK)  
                ereport(ERROR,  
                                (errmsg("could not connect to the primary server: %s",  
                                                PQerrorMessage(streamConn))));  
}  

src/backend/replication/walreceiver.c

/* Main entry point for walreceiver process */  
void  
WalReceiverMain(void)  
{  
......  
        char            conninfo[MAXCONNINFO];  
......  
        /* use volatile pointer to prevent code rearrangement */  
        volatile WalRcvData *walrcv = WalRcv;  
......  
        /* Fetch information required to start streaming */  
        strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);  
......  
        /* Establish the connection to the primary for XLOG streaming */  
        EnableWalRcvImmediateExit();  
        walrcv_connect(conninfo);  
        DisableWalRcvImmediateExit();  
......  

我们可以定义一个函数,来读取WalRcv的信息,即可读取upstream节点的信息:

[root@digoal ~]# vi get_upstream_conninfo.c   
#include "postgres.h"  
#include <assert.h>  
#include "fmgr.h"  
#include "access/xlog.h"  
#include "replication/walreceiver.h"  
#include "utils/elog.h"  
#include "utils/builtins.h"  
  
#ifdef PG_MODULE_MAGIC  
PG_MODULE_MAGIC;  
#endif  
  
PG_FUNCTION_INFO_V1(get_upstream_conninfo);  
  
Datum  
get_upstream_conninfo(PG_FUNCTION_ARGS)  
{  
        assert(PG_NARGS() == 0);  
        if (!RecoveryInProgress())  
                ereport(ERROR,  
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),  
                                 errmsg("recovery is not in progress"),  
                                 errhint("This functions can only be executed during recovery.")));  
  
        /* use volatile pointer to prevent code rearrangement */  
        volatile WalRcvData *walrcv = WalRcv;  
  
        PG_RETURN_TEXT_P(cstring_to_text((char *) walrcv->conninfo));  
}  

编译:

[root@digoal ~]# gcc -O3 -Wall -Wextra -Werror -I /opt/soft_bak/postgresql-9.4.4/src/include -g -fPIC -c ./get_upstream_conninfo.c -o digoal.o  
[root@digoal ~]# gcc -O3 -Wall -Wextra -Werror -I /opt/soft_bak/postgresql-9.4.4/src/include -g -shared digoal.o -o libdigoal.so  

拷贝到$PGHOME/lib

[root@digoal ~]# cp libdigoal.so /opt/pgsql/lib/  

创建函数:

postgres=# \set VERBOSITY verbose  
postgres=# create or replace function get_upstream_conninfo() returns text as '$libdir/libdigoal.so', 'get_upstream_conninfo' language C STRICT;  
CREATE FUNCTION  

在主库执行直接返回错误即可:

postgres=# select get_upstream_conninfo();  
ERROR:  55000: recovery is not in progress  
HINT:  This functions can only be executed during recovery.  
LOCATION:  get_upstream_conninfo, get_upstream_conninfo.c:25  

在downstream节点执行,可以获取到upstream节点的信息:

postgres@digoal-> psql -h 127.0.0.1 -p 1922  
psql (9.4.4)  
Type "help" for help.  
postgres=# \set VERBOSITY verbose  
postgres=# select get_upstream_conninfo();  
                     get_upstream_conninfo                        
----------------------------------------------------------------  
 host=192.168.150.128 port=1921 user=replica keepalives_idle=60  
(1 row)  

还有一种方法是直接读取recovery.conf文件来解析连接信息,如下。

postgres=# select * from regexp_split_to_table(pg_read_file('recovery.conf'),'\n') t(a) where a ~ '^ *primary_conninfo';  
                                                                 a                                                                   
-----------------------------------------------------------------------------------------------------------------------------------  
   primary_conninfo = 'host=192.168.150.128 port=1922 user=replica keepalives_idle=60'           # e.g. 'host=localhost port=5432'  
(1 row)  

XLOG最后接收到的时间戳也可以在walrcv数据结构中获取,这个可以用来判断心跳。

#include "postgres.h"  
#include <assert.h>  
#include "fmgr.h"  
#include "access/xlog.h"  
#include "replication/walreceiver.h"  
#include "utils/elog.h"  
#include "utils/builtins.h"  
#include "utils/timestamp.h"  
  
  
#ifdef PG_MODULE_MAGIC  
PG_MODULE_MAGIC;  
#endif  
  
PG_FUNCTION_INFO_V1(get_upstream_conninfo);  
  
Datum   
get_upstream_conninfo(PG_FUNCTION_ARGS)  
{  
        assert(PG_NARGS() == 0);  
        if (!RecoveryInProgress())  
                ereport(ERROR,  
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),  
                                 errmsg("recovery is not in progress"),  
                                 errhint("This functions can only be executed during recovery.")));  
  
  
        /* use volatile pointer to prevent code rearrangement */  
        volatile WalRcvData *walrcv = WalRcv;  
  
        /* Fetch information required to start streaming */  
        // strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);  
  
        PG_RETURN_TEXT_P(cstring_to_text((char *) walrcv->conninfo));  
}  
  
PG_FUNCTION_INFO_V1(get_rcv_timestamp);  
  
Datum  
get_rcv_timestamp(PG_FUNCTION_ARGS)  
{  
        assert(PG_NARGS() == 0);  
        if (!RecoveryInProgress())  
                ereport(ERROR,  
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),  
                                 errmsg("recovery is not in progress"),  
                                 errhint("This functions can only be executed during recovery.")));  
  
  
        /* use volatile pointer to prevent code rearrangement */  
        volatile WalRcvData *walrcv = WalRcv;  
  
        PG_RETURN_TEXT_P(cstring_to_text(timestamptz_to_str(walrcv->latestWalEndTime)));  
}  
create or replace function get_rcv_timestamp() returns text as '$libdir/libdigoal.so', 'get_rcv_timestamp' language C STRICT;  
  
postgres=# select get_rcv_timestamp();  
       get_rcv_timestamp         
-------------------------------  
 2015-08-03 10:13:22.570627+08  
(1 row)  

参考

1. 解析配置文件的公共函数

src/backend/utils/misc/guc-file.c

/*  
 * Read and parse a single configuration file.  This function recurses  
 * to handle "include" directives.  
 *  
 * Input parameters:  
 *      fp: file pointer from AllocateFile for the configuration file to parse  
 *      config_file: absolute or relative path name of the configuration file  
 *      depth: recursion depth (should be 0 in the outermost call)  
 *      elevel: error logging level to use  
 * Output parameters:  
 *      head_p, tail_p: head and tail of linked list of name/value pairs  
 *  
 * *head_p and *tail_p must be initialized to NULL before calling the outer  
 * recursion level.  On exit, they contain a list of name-value pairs read  
 * from the input file(s).  
 *  
 * Returns TRUE if successful, FALSE if an error occurred.  The error has  
 * already been ereport'd, it is only necessary for the caller to clean up  
 * its own state and release the ConfigVariable list.  
 *  
 * Note: if elevel >= ERROR then an error will not return control to the  
 * caller, so there is no need to check the return value in that case.  
 */  
bool  
ParseConfigFp(FILE *fp, const char *config_file, int depth, int elevel,  
                          ConfigVariable **head_p, ConfigVariable **tail_p)  
{  
......  
}  

2. 读取recovery.conf

src/backend/access/transam/xlog.c

#define RECOVERY_COMMAND_FILE   "recovery.conf"  
  
/*  
 * See if there is a recovery command file (recovery.conf), and if so  
 * read in parameters for archive recovery and XLOG streaming.  
 *  
 * The file is parsed using the main configuration parser.  
 */  
static void  
readRecoveryCommandFile(void)  
{  
  
        for (item = head; item; item = item->next)  
        {  
                if (strcmp(item->name, "restore_command") == 0)  
                {  
                        recoveryRestoreCommand = pstrdup(item->value);  
                        ereport(DEBUG2,  
                                        (errmsg_internal("restore_command = '%s'",  
                                                                         recoveryRestoreCommand)));  
                }  
......  
                else if (strcmp(item->name, "primary_conninfo") == 0)  
                {  
                        PrimaryConnInfo = pstrdup(item->value);  
                        ereport(DEBUG2,  
                                        (errmsg_internal("primary_conninfo = '%s'",  
                                                                         PrimaryConnInfo)));  
                }  
}  

3. 从流复制接收恢复数据库需要的XLOG数据

src/backend/access/transam/xlog.c

static bool  
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,  
                                                        bool fetching_ckpt, XLogRecPtr tliRecPtr)  
{  
......  
                                        if (PrimaryConnInfo)  
                                        {  
                                                XLogRecPtr      ptr;  
                                                TimeLineID      tli;  
  
                                                if (fetching_ckpt)  
                                                {  
                                                        ptr = RedoStartLSN;  
                                                        tli = ControlFile->checkPointCopy.ThisTimeLineID;  
                                                }  
                                                else  
                                                {  
                                                        ptr = tliRecPtr;  
                                                        tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);  
  
                                                        if (curFileTLI > 0 && tli < curFileTLI)  
                                                                elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",  
                                                                         (uint32) (ptr >> 32), (uint32) ptr,  
                                                                         tli, curFileTLI);  
                                                }  
                                                curFileTLI = tli;  
                                                RequestXLogStreaming(tli, ptr, PrimaryConnInfo,  
                                                                                         PrimarySlotName);  
                                                receivedUpto = 0;  
                                        }  

Flag Counter

digoal’s 大量PostgreSQL文章入口