在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口
背景
如果业务上需要在数据库中跑LONG SQL,并且不希望跑的过程中因为窗口断开,导致数据库任务用户主动cancel query。有什么方法?
使用DBLINK异步调用是不错的方法,相当于数据库内部建立了连接在后台跑。
方法
1、创建任务表,方便观察任务状态
create table tbl_task (
id serial8 primary key, -- 任务ID
client_info jsonb, -- 客户端描述(usename, datname, search_path, client_addr, client_port)
sql text, -- SQL信息
start_time timestamp, -- 开始时间
end_time timestamp default clock_timestamp(), -- 结束时间
info text -- 描述信息
);
2、创建dblink 插件
create extension dblink;
3、创建生成dblink连接的函数,重复创建不报错。
create or replace function conn(
name, -- dblink名字
text -- 连接串,URL
) returns void as $$
declare
begin
perform dblink_connect($1, $2);
return;
exception when others then
return;
end;
$$ language plpgsql strict;
4、创建异步调用封装函数
create or replace function run_task(
sql text, -- 要执行的SQL
info text, -- 任务描述
conn_name name default 'link_for_task', -- dblink 名字
conn text default format('hostaddr=%s port=%s user=%s dbname=%s application_name=run_task', '127.0.0.1', current_setting('port'), current_user, current_database()) , -- 连接串
client_info jsonb default format('{"client_addr":"%s", "client_pot":"%s", "search_path":"%s", "usename":"%s", "datname":"%s"}', inet_client_addr(), inet_client_port(), replace(current_setting('search_path'),'"','\"'), current_user, current_database())::jsonb -- 客户端信息
) returns void as $$
declare
begin
perform conn(conn_name, conn); -- 连接。
-- perform dblink_get_result(conn_name); -- 消耗掉上一次异步连接的结果,否则会报错。
-- 发送异步DBLINK调用
perform dblink_send_query(conn_name, sql||format('; insert into tbl_task(client_info,sql,start_time,info) values (%L, %L, %L, %L);', client_info, sql, clock_timestamp(),info ));
end;
$$ language plpgsql strict;
5、调用异步调用封装函数
select run_task(
'select count(*) from test', -- 要RUN的SQL
'test dblink async call' -- 任务描述
);
6、查看当前正在跑的后台任务
postgres=# select * from pg_stat_activity where application_name='run_task';
-[ RECORD 1 ]----+---------------------------------------------------------------------------------------------
datid | 13285
datname | postgres
pid | 1510
usesysid | 10
usename | postgres
application_name | run_task
client_addr | 127.0.0.1
client_hostname |
client_port | 55088
backend_start | 2018-06-21 18:04:20.964586+08
xact_start |
query_start | 2018-06-21 18:04:20.967177+08
state_change | 2018-06-21 18:04:20.969363+08
wait_event_type | Client
wait_event | ClientRead
state | idle
backend_xid |
backend_xmin |
query | select count(*) from test; insert into tbl_task(client_info,sql,start_time,info) values (E'{"datname": "postgres", "usename": "postgres", "client_pot": "", "client_addr": "", "search_path": "\\"$user\\", public"}', 'select count(*) from test', '2018-06-21 18:04:20.967118+08', 'test dblink async call')
backend_type | client backend
7、查看任务状态
postgres=# select * from tbl_task;
-[ RECORD 1 ]------------------------------------------------------------------------------------------------------------------------
id | 1
client_info | {"datname": "postgres", "usename": "postgres", "client_pot": "", "client_addr": "", "search_path": "\"$user\", public"}
sql | select count(*) from test
start_time | 2018-06-21 18:07:39.60439
end_time | 2018-06-21 18:07:47.331041
info | test dblink async call
如果是数据库普通用户调用,不支持trust认证
如果是普通用户,请使用密码认证,同时请务必保障pg_hba.conf使用的是密码认证。
比如阿里云RDS PPAS的用户:
select run_task(
'select count(*) from test', -- 要RUN的SQL
'test dblink async call' , -- 任务描述
'link_task',
format('hostaddr=%s port=%s user=%s dbname=%s application_name=run_task, password=%s', '127.0.0.1', current_setting('port'), current_user, current_database(), '当前用户密码')
);
如果是阿里云RDS PG 9.4的用户,内核做过修改,请使用如下方法
select run_task(
'select count(*) from test', -- 要RUN的SQL
'test dblink async call' , -- 任务描述
'link_task',
format('user=%s dbname=%s application_name=run_task, password=%s', current_user, current_database(), '密码')
);
注意
1、DBLINK异步连接会占用连接,算连接数的。
2、单个DBLINK连接,如果异步调用的SQL没有执行完,不能发起第二次请求。
NOTICE: could not send query: another command is already in progress
那么你需要看看这个DBLINK的后台任务是否还在执行(通过前面查询pg_stat_activity的方法, 执行完state状态为idle),如果执行完了,那么执行以下SQL取一下结果,就可以继续使用这个DBLINK NAME发送异步请求了。
select * from dblink_get_result('link_for_task') as t(id text);
或者,你可以不用等这个DBLINK的异步任务执行完,马上想发起另一个异步任务,那么你需要使用一个新的DBLINK NAME。
select run_task(
$$select count(*) from test where c1='abc'$$, -- 要RUN的SQL
'test dblink async call', -- 任务描述
'new_dblink_name' -- 有别于前面已使用的DBLINK NAME
);
3、单个DBLINK连接,如果异步调用的SQL执行完了,调用dblink_get_result后,才能发起第二次请求。
NOTICE: could not send query: another command is already in progress
4、单个DBLINK连接,如果异步调用的SQL没有执行完,调用dblink_get_result时,会等待异步调用执行完,才会有返回。等待过程中堵塞当前调用dblink_get_result的会话。
5、调用DBLINK异步接口的会话如果断开了,那么它发起的dblink异步调用后台任务执行完成后,连接会自动释放。
6、如果SQL中包含单引号,可以使用转义的写法,也可以使用没有符号的写法,不需要转义。
select run_task(
$$select count(*) from test where c1='abc'$$, -- 要RUN的SQL
'test dblink async call' -- 任务描述
);
或
select run_task(
'select count(*) from test where c1=''abc''', -- 要RUN的SQL
'test dblink async call' -- 任务描述
);
或
select run_task(
E'select count(*) from test where c1=\'abc\'', -- 要RUN的SQL
'test dblink async call' -- 任务描述
);
或
美元符号内可以输入任意个字符,成对出现即可。
select run_task(
$_qqq_$select count(*) from test where c1='abc'$_qqq_$, -- 要RUN的SQL
'test dblink async call' -- 任务描述
);
参考
https://www.postgresql.org/docs/10/static/dblink.html
《PostgreSQL 批量导入性能 (采用dblink 异步调用)》