PostgreSQL Oracle 兼容性之 - 函数 自治事务 的写法和实现
背景
使用Oracle的用户,在函数中如果使用了自治事务的话,如果要转到PostgreSQL会遇到很棘手的问题。
因为PostgreSQL的函数是作为一个事务来处理的,要么全部提交,要么全部回滚,
除了exception,每个exception是一个子事务。
因此使用exception可以达到自治事务的目的。
除了使用exception,我们还可以使用dblink实现自治事务。
使用exception实现自治事务
例子1,
使用并行block和嵌套block,来控制子事务层级。
输入参数为block1, block2.1, block2.2, block3.1 。
这些参数代表执行在哪个block出错,出错时对应层级的block的exception会捕获错误,同时处理,然后跳到下一个block继续执行。
如果是外层的block出错,内层还没有被执行的block就没机会执行了。
根据业务需求,调整block层级或嵌套层级,达到目的。
这种用法可以完美的支撑业务的需求。
(除了一种情况不能满足,就是被提交的子事务立刻可以被其他事务可见。这种需求建本文下面的方法,用dblink来满足这种需求即可。)
create or replace function ft(err_level text) returns void as $$
declare
begin -- block level 1
raise notice 'block level 1';
if (err_level='block1') then
raise exception '%', err_level;
end if;
begin -- block level 2.1
raise notice 'block level 2.1'; -- 请用业务处理SQL代替
if (err_level='block2.1') then
raise exception '%', err_level;
end if;
begin -- block level 3.1
raise notice 'block level 3.1';
if (err_level='block3.1') then
raise exception '%', err_level;
end if;
exception when others then -- you can write catchup any ERROR CODE or ERROR STATE.
raise notice 'end block level 3.1';
end; -- end block level 3.1
exception when others then -- you can write catchup any ERROR CODE or ERROR STATE. 回滚block 2.1的业务处理SQL
raise notice 'end block level 2.1';
end; -- end block level 2.1
begin -- block level 2.2
raise notice 'block level 2.2';
if (err_level='block2.2') then
raise exception '%', err_level;
end if;
exception when others then -- you can write catchup any ERROR CODE or ERROR STATE.
raise notice 'end block level 2.2';
end; -- end block level 2.2
exception when others then -- you can write catchup any ERROR CODE or ERROR STATE.
raise notice 'end block level 1';
end; -- end block level 1
$$ language plpgsql;
测试:
在block 1出错,出错代码后面的代码都不会被执行。
postgres=# select ft('block1');
NOTICE: block level 1
NOTICE: end block level 1
ft
----
(1 row)
在block2.1出错,block 2.1内部出错代码后面的代码都不会被执行。但是同级代码如2.2会被执行。
postgres=# select ft('block2.1');
NOTICE: block level 1
NOTICE: block level 2.1
NOTICE: end block level 2.1
NOTICE: block level 2.2
ft
----
(1 row)
在block2.2出错。
postgres=# select ft('block2.2');
NOTICE: block level 1
NOTICE: block level 2.1
NOTICE: block level 3.1
NOTICE: block level 2.2
NOTICE: end block level 2.2
ft
----
(1 row)
在block3.1出错。
postgres=# select ft('block3.1');
NOTICE: block level 1
NOTICE: block level 2.1
NOTICE: block level 3.1
NOTICE: end block level 3.1
NOTICE: block level 2.2
ft
----
(1 row)
更直观的例子:
drop table tt;
create table tt(id int primary key, info text);
insert into tt values(5,'test');
create or replace function ft() returns void as $$
declare
begin -- block level 1
begin -- block level 2.1
insert into tt values (1,'test'),(2,'test'),(3,'test');
exception when others then
raise notice 'rollback block level 2.1';
end; -- end block level 2.1
begin -- block level 2.2
insert into tt values (4,'test'),(5,'test'),(6,'test'); -- 主键冲突, 插入失败, 但是不影响后面的block继续执行.
exception when others then
raise notice 'rollback block level 2.2';
end; -- end block level 2.2
begin -- block level 2.3
insert into tt values (7,'test'),(8,'test'),(9,'test');
exception when others then
raise notice 'rollback block level 2.3';
end; -- end block level 2.3
exception when others then -- you can write catchup any ERROR CODE or ERROR STATE.
raise notice 'rollback block level 1';
end; -- end block level 1
$$ language plpgsql;
postgres=# select ft();
NOTICE: rollback block level 2.2
ft
----
(1 row)
postgres=# select * from tt;
id | info
----+------
5 | test
1 | test
2 | test
3 | test
7 | test
8 | test
9 | test
(7 rows)
例子2:
使用dblink,同样需要将需要批量提交的部分写成子函数先。
例如 :
create extension dblink;
CREATE SERVER fdtest FOREIGN DATA WRAPPER dblink_fdw OPTIONS (hostaddr '127.0.0.1', dbname '函数所在的库名');
CREATE USER MAPPING FOR 需要调用函数的用户名 SERVER fdtest OPTIONS (user '需要调用函数的用户名', password '用户密码');
GRANT USAGE ON FOREIGN SERVER fdtest TO 需要调用函数的用户名;
函数体
declare
dblink_block_res1 record;
dblink_block_res2 record;
...
dblink_block_resn record;
...
其他变量定义;
begin
-- 建立连接
if ( dblink_connect('myconn', 'fdtest') <> 'OK' ) then
raise notice '连接失败';
return;
end if;
-- block 1,需要流转的变量通过参数传入下面的函数
select dblink('myconn', 'select func1($1,$2,...)') into dblink_block_res1; -- $1,$2,...使用常数替代, 或动态SQL
-- 中间结果判断
if not found then -- dblink调用异常
-- 异常处理
else
-- dblink对应子事务已提交
end if;
-- block 2,需要流转的变量通过参数传入下面的函数
select dblink('myconn', 'select func2($1,$2,...)') into dblink_block_res2; -- $1,$2,...使用常数替代, 或动态SQL
-- 中间结果判断
if not found then -- 远程调用异常
-- 异常处理
else
-- dblink对应子事务已提交
end if;
......
-- block n,需要流转的变量通过参数传入下面的函数
select dblink('myconn', 'select funcn($1,$2,...)') into dblink_block_resn; -- $1,$2,...使用常数替代, 或动态SQL
-- 中间结果判断
if not found then -- 远程调用异常
-- 异常处理
else
-- dblink对应子事务已提交
end if;
......
exception when others then
...
end;
使用dblink实现自治事务
https://docs.oracle.com/cd/B19306_01/appdev.102/b14261/sqloperations.htm
http://blog.dalibo.com/2016/08/19/Autonoumous_transactions_support_in_PostgreSQL.html
剥离除了的子事务逻辑,作为一个函数接口
CREATE PROCEDURE log_action (username VARCHAR2, event_date DATE, msg VARCHAR2)
IS
PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
INSERT INTO table_tracking VALUES (log_seq.nextval, username, event_date, msg);
COMMIT;
END log_action;
在PG中,创建对应的函数
CREATE OR REPLACE FUNCTION log_action_atx (
username text, event_date timestamp, msg text
) RETURNS VOID AS
$body$
BEGIN
INSERT INTO table_tracking VALUES (nextval('log_seq'), username, event_date, msg);
END;
$body$
LANGUAGE PLPGSQL
在其他函数中,调用dblink来调用以上子事务模块调用的函数,实现这个模块本身的自治事务。
--
-- dblink wrapper to call function log_action as an autonomous transaction
--
CREATE OR REPLACE FUNCTION log_action (
username text, event_date timestamp, msg text
) RETURNS VOID AS
$body$
DECLARE
-- Change this to reflect the dblink connection string
v_conn_str text := 'port=5432 dbname=testdb host=localhost user=pguser password=pgpass';
v_query text;
BEGIN
v_query := 'SELECT true FROM log_action_atx ( ' || quote_nullable(username) ||
',' || quote_nullable(event_date) || ',' || quote_nullable(msg) || ' )';
PERFORM * FROM dblink(v_conn_str, v_query) AS p (ret boolean);
END;
$body$
LANGUAGE plpgsql SECURITY DEFINER;
对于子事务调用,还可以使用两阶段提交。
《PostgreSQL’s two-phase commit used with dblink example - 2PC , 两阶段事务》
其他参考地址
http://www.postgresql.org/docs/9.5/static/contrib-dblink-function.html
http://postgresql.nabble.com/Autonomous-Transaction-WIP-td5798928.html
https://lwn.net/Articles/648973/
PostgreSQL 11已支持函数内COMMIT,支持自治事务。
https://www.postgresql.org/docs/devel/static/plpgsql-porting.html