PostgreSQL Oracle 兼容性之 - 函数 自治事务 的写法和实现

4 minute read

背景

使用Oracle的用户,在函数中如果使用了自治事务的话,如果要转到PostgreSQL会遇到很棘手的问题。

因为PostgreSQL的函数是作为一个事务来处理的,要么全部提交,要么全部回滚,

除了exception,每个exception是一个子事务。

因此使用exception可以达到自治事务的目的。

除了使用exception,我们还可以使用dblink实现自治事务。

使用exception实现自治事务

例子1,

使用并行block和嵌套block,来控制子事务层级。

输入参数为block1, block2.1, block2.2, block3.1 。

这些参数代表执行在哪个block出错,出错时对应层级的block的exception会捕获错误,同时处理,然后跳到下一个block继续执行。

如果是外层的block出错,内层还没有被执行的block就没机会执行了。

根据业务需求,调整block层级或嵌套层级,达到目的。

这种用法可以完美的支撑业务的需求。

(除了一种情况不能满足,就是被提交的子事务立刻可以被其他事务可见。这种需求建本文下面的方法,用dblink来满足这种需求即可。)

create or replace function ft(err_level text) returns void as $$  
declare  
begin -- block level 1  
  raise notice 'block level 1';  
  if (err_level='block1') then  
    raise exception '%', err_level;  
  end if;  
  
  begin -- block level 2.1  
    raise notice 'block level 2.1';  -- 请用业务处理SQL代替  
    if (err_level='block2.1') then  
      raise exception '%', err_level;  
    end if;  
  
    begin -- block level 3.1  
      raise notice 'block level 3.1';  
      if (err_level='block3.1') then  
        raise exception '%', err_level;  
      end if;  
      exception when others then  -- you can write catchup any ERROR CODE or ERROR STATE.  
        raise notice 'end block level 3.1';  
    end; -- end block level 3.1  
  
    exception when others then  -- you can write catchup any ERROR CODE or ERROR STATE. 回滚block 2.1的业务处理SQL  
      raise notice 'end block level 2.1';  
  end; -- end block level 2.1  
  
  begin -- block level 2.2  
    raise notice 'block level 2.2';  
    if (err_level='block2.2') then  
      raise exception '%', err_level;  
    end if;  
    exception when others then  -- you can write catchup any ERROR CODE or ERROR STATE.  
      raise notice 'end block level 2.2';  
  end; -- end block level 2.2  
  
  exception when others then  -- you can write catchup any ERROR CODE or ERROR STATE.  
    raise notice 'end block level 1';  
end; -- end block level 1  
$$ language plpgsql;  

测试:

在block 1出错,出错代码后面的代码都不会被执行。

postgres=# select ft('block1');  
NOTICE:  block level 1  
NOTICE:  end block level 1  
 ft   
----  
   
(1 row)  

在block2.1出错,block 2.1内部出错代码后面的代码都不会被执行。但是同级代码如2.2会被执行。

postgres=# select ft('block2.1');  
NOTICE:  block level 1  
NOTICE:  block level 2.1  
NOTICE:  end block level 2.1  
NOTICE:  block level 2.2  
 ft   
----  
   
(1 row)  

在block2.2出错。

postgres=# select ft('block2.2');  
NOTICE:  block level 1  
NOTICE:  block level 2.1  
NOTICE:  block level 3.1  
NOTICE:  block level 2.2  
NOTICE:  end block level 2.2  
 ft   
----  
   
(1 row)  

在block3.1出错。

postgres=# select ft('block3.1');  
NOTICE:  block level 1  
NOTICE:  block level 2.1  
NOTICE:  block level 3.1  
NOTICE:  end block level 3.1  
NOTICE:  block level 2.2  
 ft   
----  
   
(1 row)  

更直观的例子:

drop table tt;  
create table tt(id int primary key, info text);  
insert into tt values(5,'test');  
  
create or replace function ft() returns void as $$  
declare  
begin -- block level 1  
  
  begin -- block level 2.1  
    insert into tt values (1,'test'),(2,'test'),(3,'test');  
    exception when others then   
      raise notice 'rollback block level 2.1';  
  end; -- end block level 2.1  
  
  begin -- block level 2.2  
    insert into tt values (4,'test'),(5,'test'),(6,'test'); -- 主键冲突, 插入失败, 但是不影响后面的block继续执行.  
    exception when others then   
      raise notice 'rollback block level 2.2';  
  end; -- end block level 2.2  
  
  begin -- block level 2.3  
    insert into tt values (7,'test'),(8,'test'),(9,'test');  
    exception when others then   
      raise notice 'rollback block level 2.3';  
  end; -- end block level 2.3  
  
  exception when others then  -- you can write catchup any ERROR CODE or ERROR STATE.  
    raise notice 'rollback block level 1';  
end; -- end block level 1  
$$ language plpgsql;  
  
postgres=# select ft();  
NOTICE:  rollback block level 2.2  
 ft   
----  
   
(1 row)  
  
postgres=# select * from tt;  
 id | info   
----+------  
  5 | test  
  1 | test  
  2 | test  
  3 | test  
  7 | test  
  8 | test  
  9 | test  
(7 rows)  

例子2:

使用dblink,同样需要将需要批量提交的部分写成子函数先。

例如 :

create extension dblink;  
CREATE SERVER fdtest FOREIGN DATA WRAPPER dblink_fdw OPTIONS (hostaddr '127.0.0.1', dbname '函数所在的库名');  
CREATE USER MAPPING FOR 需要调用函数的用户名 SERVER fdtest OPTIONS (user '需要调用函数的用户名', password '用户密码');  
GRANT USAGE ON FOREIGN SERVER fdtest TO 需要调用函数的用户名;  

函数体

declare  
  dblink_block_res1 record;  
  dblink_block_res2 record;  
...  
  dblink_block_resn record;  
...  
  其他变量定义;  
begin  
-- 建立连接  
if ( dblink_connect('myconn', 'fdtest') <> 'OK' ) then  
  raise notice '连接失败';  
  return;  
end if;  
-- block 1,需要流转的变量通过参数传入下面的函数  
  select dblink('myconn', 'select func1($1,$2,...)') into dblink_block_res1;  -- $1,$2,...使用常数替代, 或动态SQL  
-- 中间结果判断  
  if not found then  -- dblink调用异常  
    -- 异常处理  
  else  
    -- dblink对应子事务已提交  
  end if;  
-- block 2,需要流转的变量通过参数传入下面的函数  
  select dblink('myconn', 'select func2($1,$2,...)') into dblink_block_res2;  -- $1,$2,...使用常数替代, 或动态SQL  
-- 中间结果判断  
  if not found then  -- 远程调用异常  
    -- 异常处理  
  else  
    -- dblink对应子事务已提交  
  end if;  
......  
-- block n,需要流转的变量通过参数传入下面的函数  
  select dblink('myconn', 'select funcn($1,$2,...)') into dblink_block_resn;  -- $1,$2,...使用常数替代, 或动态SQL  
-- 中间结果判断  
  if not found then  -- 远程调用异常  
    -- 异常处理  
  else  
    -- dblink对应子事务已提交  
  end if;  
......  
exception when others then  
...  
end;  

使用dblink实现自治事务

https://docs.oracle.com/cd/B19306_01/appdev.102/b14261/sqloperations.htm

http://blog.dalibo.com/2016/08/19/Autonoumous_transactions_support_in_PostgreSQL.html

剥离除了的子事务逻辑,作为一个函数接口

CREATE PROCEDURE log_action (username VARCHAR2, event_date DATE, msg VARCHAR2)
IS
   PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
   INSERT INTO table_tracking VALUES (log_seq.nextval, username, event_date, msg);
   COMMIT;
END log_action;

在PG中,创建对应的函数

CREATE OR REPLACE FUNCTION log_action_atx (
	username text, event_date timestamp, msg text
) RETURNS VOID AS
$body$
BEGIN
   INSERT INTO table_tracking VALUES (nextval('log_seq'), username, event_date, msg);
END;
$body$
LANGUAGE PLPGSQL

在其他函数中,调用dblink来调用以上子事务模块调用的函数,实现这个模块本身的自治事务。

--
-- dblink wrapper to call function log_action as an autonomous transaction
--
CREATE OR REPLACE FUNCTION log_action (
	username text, event_date timestamp, msg text
) RETURNS VOID AS
$body$
DECLARE
        -- Change this to reflect the dblink connection string
        v_conn_str  text := 'port=5432 dbname=testdb host=localhost user=pguser password=pgpass';
        v_query     text;

BEGIN
        v_query := 'SELECT true FROM log_action_atx ( ' || quote_nullable(username) ||
		 ',' || quote_nullable(event_date) || ',' || quote_nullable(msg) || ' )';
        PERFORM * FROM dblink(v_conn_str, v_query) AS p (ret boolean);

END;
$body$
LANGUAGE plpgsql SECURITY DEFINER;

对于子事务调用,还可以使用两阶段提交。

《PostgreSQL’s two-phase commit used with dblink example - 2PC , 两阶段事务》

其他参考地址

http://www.postgresql.org/docs/9.5/static/contrib-dblink-function.html

http://postgresql.nabble.com/Autonomous-Transaction-WIP-td5798928.html

https://lwn.net/Articles/648973/

PostgreSQL 11已支持函数内COMMIT,支持自治事务。

https://www.postgresql.org/docs/devel/static/plpgsql-porting.html

Flag Counter

digoal’s 大量PostgreSQL文章入口