PostgreSQL Distributed Sharded Databases Consistent PITR Thinking

4 minute read

背景

前言, 建议先了解一下PostgreSQL 的PITR再看本文.

PostgreSQL recovery target introduce

PostgreSQL PITR THREE recovery target MODE: name,xid,time USE CASE - 1

PostgreSQL PITR THREE recovery target MODE: name,xid,time USE CASE - 2

最近几天收到一位网友wsxsl的EMAIL, 在他们的应用场景中有跨库事务的需求。

数据库做了sharding后, 就可能会引入跨库的事务. 当然也有不跨库的事务。

有跨库的事务必然会同时要求数据库可以实现全局一致的还原。

shard 后数据库的一致性还原可以这么来理解(首先需要对事务进行分解, 分为跨库事务和 非跨库事务也就是本地事务) :

1. 对于跨库事务必须确保它在各个shard节点中的操作都提交或者都回滚.(all committed, or all abort.)

2. 对于单个shard节点, 必须确保一致性. (本地事务操作的原子性(all committed or all abort), 同时确保时序性(不能出现后提交的事务已恢复但是先提交的事务未恢复).)

以上两点, 第二点PostgreSQL自身的PITR就可以满足这个需求.

第一点, 则是需要处理的重点.

在谈第一点之前, 来看一个图, 跨库事务可能存在的交错现象 :

pic

解释一下上图 :

1

1. ABCDE是五个节点, 其中A为产生全局ID的节点, BCDE为数据节点.

2. 每个节点有各自的时间轴.

3. 格子里面的数字代表全局事务ID(跨库事务), 以及在各自的时间轴上的事务提交位置(其实就是各节点PG_XLOG中的位置).

4. 全局事务ID对应的本地事务ID为节点号_id , 例如全局事务1在B节点的本地事务ID为B1. 图中未标出, 后面讲解需要用到.

5. 灰色格子代表该事务是回滚掉的. 白色格子代表该事务是提交掉的.

6. 同一个时间轴上, 两个事务之间可能还有非跨库事务(本地事务), 这些在图中没有画出, 因为不影响全局一致性还原. 但是大家要知道一下.

注意跨库事务1, 在B节点是第一个提交的. 但是在C节点, 它是在跨库事务2的后面提交的 .

如果以跨库事务1为PITR的目标, 以PostgreSQL recovery.conf中配置recovery_target_xid为例, 如下 :

recovery_target_xid=B1, recovery_target_inclusive=true  
recovery_target_xid=C1, recovery_target_inclusive=true  
recovery_target_xid=D1, recovery_target_inclusive=true  
recovery_target_xid=E1, recovery_target_inclusive=true  

恢复后的全局事务状态 :

B : B1 committed  
C : C2,C1 committed  
D : D1 committed  
E : E2,E1 committed  

整体来看, 全局事务2出现了不一致的现象, 在B,D节点未提交, 但是在C,E节点已提交.

2

还有一种情况是, 跨库事务可能出现在部分节点, 例如 :

跨库事务7 发生在2个节点 : B7, D7;

跨库事务8 发生在2个节点 : E8, C8;

那么这两个跨库事务则不可能存在交叉. 因为他们没有发生在同一个shard节点上的事务.

3

1. 对于上图, 6号和7号跨库事务显然是可以作为PITR的还原目标的, 因为不会出现跨库事务交错.

2. 5号事务其实也可以作为PITR的还原目标, 为什么呢? 因为4号事务是回滚的. 所以也不会有影响.

综上所述, 全局一致性还原的重点在如何找到合适的还原点。下面列举几种方法 :

方法1

计算现成的一致性还原点, 必要的要素 :

1. 唯一的全局事务ID, 这个可以使用统一的shard算法, 也就是说全局事务ID的产生节点也可以做成分布式的, 不会成为瓶颈.

算法举例 :

PostgreSQL sharding有序UUID最佳实践 - serial global uuid stored in 64bit int8

2. 全局事务ID对应的shard节点上的xid, 以及shard节点上该XID COMMIT或ABORT的时间 (是shard节点的时间, 这个时间必须取PG_XLOG中存储的时间). 请参见本文末尾的参考部分.

例如在B,C,D,E节点执行如下SQL, 创建记录GID的信息表.

还原点可以从这个表里面分析出来.

postgres=# create type xstatus as enum ('commit', 'abort');  
CREATE TYPE  
postgres=# create table gid   
postgres-# (gid int8 primary key, -- 跨库事务ID, 由统一的算法产生. 参考前面的算法举例  
postgres(# gxstatus xstatus, -- 跨库事务状态, 业务APP负责写入  
postgres(# nodes text[], -- 跨库事务涉及的节点, 业务APP负责写入  
postgres(# lxid int[], -- 跨库事务节点上对应的本地事务ID, 业务APP负责写入  
postgres(# lxcompletetime timestamptz[], -- 跨库事务节点上对应的本地事务完成时记录在XLogRecord上的时间. 从XLOG中解析, 需更新记录.  
postgres(# lxstatus xstatus[] -- 跨库事务节点上对应的本地事务的状态, 业务APP负责写入  
postgres(# );  
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "gid_pkey" for table "gid"  
CREATE TABLE  

为什么要在数据节点创建这个表呢?

因为全局ID信息如果丢失(假设因为业务APPs Crash), 分两种情况.

1. 该全局ID对应的跨库事务状态是abort的. 也就是数据节点事务全部abort;

这种情况这个全局ID的信息丢失对一致性还原不会造成影响.

2. 该全局ID对应的跨库事务状态是commit的.

这在情况下, 如果该全局ID的信息丢失, 会影响最终的全局一致性还原点解析, 导致数据库逻辑上全局不一致.

所以解决办法是将全局ID数据的写入合并到业务的二阶事务中, 这样能够保证业务的处理和全局ID数据的处理的原子性.

放到数据节点一来省了服务器, 二来减少了交互, 第三由于分布了也降低了全局ID处理产生瓶颈的概率.

如果跨库事务ID的存储也是分布式的, 那么计算全局一致性还原点时, 需要在所有数据中分析.

全局一致性还原点算法 :

1. nodes没有与其他任何记录的nodes相交的记录, 该gid可以作为一致性还原点.

可能性几乎为0

2. nodes与其他记录有相交时的算法, 仅对比相交的node. :

2.1. lxcompletetime[]转换成gtime_range, gtime_range不与其他任何gtime_range相交.

2.2. 如果有相交, 则lxcompletetime[]每个对应元素必须完全相等.

2.3. 过滤abort的记录, 重新执行2.1和2.2.

对于一个跨库事务频繁执行的业务, 第二种方法能找到的一致性还原点也几乎为0. 所以需要修复算法 :

修复算法 :

业务层必须记录事务的undo和redo. 还原到有跨库事务交叉的点时,使用undo或redo修复有交叉的事务。

方法2

主动创建一致性还原点 :

1. 发出指令禁止新发起跨库事务的操作请求, 并等待所有的跨库事务完结.

2. 等待所有的跨库事务完结后.

3. 在所有数据库节点上创建同名的还原点(recovery_target_name). 注意多次创建的还原点的名字不要重复.

4. 释放冻结,允许新发起跨库事务的操作请求(barrier).

具体的实现方法可以参考PostgreS-XC的做法.

使用这种方法对数据库有轻微的堵塞, 堵塞跨库事务的操作. 所以最好加个超时机制. 如果全局不能在一个时间段内达到堵塞状态, 则放弃堵塞.

举例 :

跨库事务, 以两个节点为例 :

所有的跨库事务必须先获得所有涉及节点的一个共享锁, 否则不进行下去.

NODE A :   
begin;  
select 1 from barrier where id=1 for share nowait;  
... 业务SQL.  
  
NODE B :   
begin;  
select 1 from barrier where id=1 for share nowait;  
... 业务SQL.  
插入还原点(所有的sharding节点都必须先获得排他锁, 以此来拒绝新发起的跨库事务, 确保数据逻辑状态一致) :   
  
NODE A :   
begin;  
select 1 from barrier where id=1 for update;  
  
NODE B :   
begin;  
select 1 from barrier where id=1 for update;  
只有当NODE A,B都获得独占锁后方可插入还原点.  
注意控制超时, 不能for update太久, 否则堵塞影响时间就太长了.  
  
NODE A, B :   
select * from pg_create_restore_point('node A,B 相同的唯一值');  
  
NODE A, B :   
释放barrier where id=1 for update.  

方法3, Postgres-XC的做法

与上面类似.参考 :

http://postgres-xc.sourceforge.net/docs/1_0/barriers.html

http://postgres-xc.sourceforge.net/docs/1_0/sql-createbarrier.html

src/backend/pgxc/barrier/barrier.c

src/include/pgxc/barrier.h

参考

1. 《PostgreSQL’s two-phase commit used with dblink example》

2. 《PostgreSQL recovery target introduce》

3. 《PostgreSQL PITR THREE recovery target MODE: name,xid,time USE CASE - 1》

4. 《PostgreSQL PITR THREE recovery target MODE: name,xid,time USE CASE - 2》

5. 《PostgreSQL sharding有序UUID最佳实践 - serial global uuid stored in 64bit int8》

6. http://postgres-xc.sourceforge.net/

7. http://postgres-xc.sourceforge.net/docs/1_0/barriers.html

8. http://postgres-xc.sourceforge.net/docs/1_0/sql-createbarrier.html

9. src/backend/pgxc/barrier/barrier.c

10. src/include/pgxc/barrier.h

11. src/backend/storage/lmgr/README

12. src/backend/storage/lmgr/README.barrier

13. src/include/storage/barrier.h

14.

XLogRecord中的COMMIT/ABORT的时间信息相关代码 :

src/backend/utils/adt/timestamp.c

01262 /*  
01263  * GetCurrentTimestamp -- get the current operating system time  
01264  *  
01265  * Result is in the form of a TimestampTz value, and is expressed to the  
01266  * full precision of the gettimeofday() syscall  
01267  */  
01268 TimestampTz  
01269 GetCurrentTimestamp(void)  
01270 {  
01271     TimestampTz result;  
01272     struct timeval tp;  
01273   
01274     gettimeofday(&tp, NULL);  
01275   
01276     result = (TimestampTz) tp.tv_sec -  
01277         ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);  
01278   
01279 #ifdef HAVE_INT64_TIMESTAMP  
01280     result = (result * USECS_PER_SEC) + tp.tv_usec;  
01281 #else  
01282     result = result + (tp.tv_usec / 1000000.0);  
01283 #endif  
01284   
01285     return result;  
01286 }  

src/backend/access/transam/xact.c

/*  
 * xactStartTimestamp is the value of transaction_timestamp().  
 * stmtStartTimestamp is the value of statement_timestamp().  
 * xactStopTimestamp is the time at which we log a commit or abort WAL record.  
 * These do not change as we enter and exit subtransactions, so we don't  
 * keep them inside the TransactionState stack.  
 */  
static TimestampTz xactStartTimestamp;  
static TimestampTz stmtStartTimestamp;  
static TimestampTz xactStopTimestamp;  
  
/*  
 *      transaction states - transaction state from server perspective  
 */  
typedef enum TransState  
{  
        TRANS_DEFAULT,                          /* idle */  
        TRANS_START,                            /* transaction starting */  
        TRANS_INPROGRESS,                       /* inside a valid transaction */  
        TRANS_COMMIT,                           /* commit in progress */  
        TRANS_ABORT,                            /* abort in progress */  
        TRANS_PREPARE                           /* prepare in progress */  
} TransState;  
  
/*  
 *      GetCurrentTransactionStopTimestamp  
 *  
 * We return current time if the transaction stop time hasn't been set  
 * (which can happen if we decide we don't need to log an XLOG record).  
 */  
TimestampTz  
GetCurrentTransactionStopTimestamp(void)  
{  
        if (xactStopTimestamp != 0)  
                return xactStopTimestamp;  
        return GetCurrentTimestamp();  
}  
  
/*  
 *      SetCurrentTransactionStopTimestamp  
 */  
static inline void  
SetCurrentTransactionStopTimestamp(void)  
{  
        xactStopTimestamp = GetCurrentTimestamp();  
}  
  
/*  
 *      RecordTransactionCommit  
 *  
 * Returns latest XID among xact and its children, or InvalidTransactionId  
 * if the xact has no XID.      (We compute that here just because it's easier.)  
 */  
static TransactionId  
RecordTransactionCommit(void)  
....  
  xlrec.xact_time = xactStopTimestamp;  
  
/*  
 *      RecordTransactionAbort  
 *  
 * Returns latest XID among xact and its children, or InvalidTransactionId  
 * if the xact has no XID.      (We compute that here just because it's easier.)  
 */  
static TransactionId  
RecordTransactionAbort(bool isSubXact)  
....  
        /* Write the ABORT record */  
        if (isSubXact)  
                xlrec.xact_time = GetCurrentTimestamp();  
        else  
        {  
                SetCurrentTransactionStopTimestamp();  
                xlrec.xact_time = xactStopTimestamp;  
        }  

Flag Counter

digoal’s 大量PostgreSQL文章入口