在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口

2 minute read

背景

如果业务上需要在数据库中跑LONG SQL,并且不希望跑的过程中因为窗口断开,导致数据库任务用户主动cancel query。有什么方法?

使用DBLINK异步调用是不错的方法,相当于数据库内部建立了连接在后台跑。

方法

1、创建任务表,方便观察任务状态

create table tbl_task (  
  id serial8 primary key,  -- 任务ID  
  client_info jsonb,       -- 客户端描述(usename, datname, search_path, client_addr, client_port)  
  sql text,                -- SQL信息  
  start_time timestamp,    -- 开始时间  
  end_time timestamp default clock_timestamp(),  -- 结束时间  
  info text                -- 描述信息  
);  

2、创建dblink 插件

create extension dblink;  

3、创建生成dblink连接的函数,重复创建不报错。

create or replace function conn(    
  name,   -- dblink名字    
  text    -- 连接串,URL    
) returns void as $$      
declare      
begin      
  perform dblink_connect($1, $2);     
  return;      
exception when others then      
  return;      
end;      
$$ language plpgsql strict;    

4、创建异步调用封装函数

create or replace function run_task(    
  sql text,       -- 要执行的SQL  
  info text,             -- 任务描述  
  conn_name name  default 'link_for_task', -- dblink 名字  
  conn text  default format('hostaddr=%s port=%s user=%s dbname=%s application_name=run_task', '127.0.0.1', current_setting('port'), current_user, current_database())  ,    -- 连接串   
  client_info jsonb   default format('{"client_addr":"%s", "client_pot":"%s", "search_path":"%s", "usename":"%s", "datname":"%s"}', inet_client_addr(), inet_client_port(), replace(current_setting('search_path'),'"','\"'), current_user, current_database())::jsonb   -- 客户端信息  
) returns void as $$     
declare    
begin    
  perform conn(conn_name,  conn);             -- 连接。       
    -- perform dblink_get_result(conn_name);    -- 消耗掉上一次异步连接的结果,否则会报错。      
      
  -- 发送异步DBLINK调用    
  perform dblink_send_query(conn_name, sql||format('; insert into tbl_task(client_info,sql,start_time,info) values (%L, %L, %L, %L);', client_info, sql, clock_timestamp(),info ));      
end;    
$$ language plpgsql strict;    

5、调用异步调用封装函数

select run_task(  
  'select count(*) from test',   -- 要RUN的SQL  
  'test dblink async call'       -- 任务描述  
);  

6、查看当前正在跑的后台任务

postgres=# select * from pg_stat_activity where application_name='run_task';  
-[ RECORD 1 ]----+---------------------------------------------------------------------------------------------  
datid            | 13285  
datname          | postgres  
pid              | 1510  
usesysid         | 10  
usename          | postgres  
application_name | run_task  
client_addr      | 127.0.0.1  
client_hostname  |   
client_port      | 55088  
backend_start    | 2018-06-21 18:04:20.964586+08  
xact_start       |   
query_start      | 2018-06-21 18:04:20.967177+08  
state_change     | 2018-06-21 18:04:20.969363+08  
wait_event_type  | Client  
wait_event       | ClientRead  
state            | idle  
backend_xid      |   
backend_xmin     |   
query            | select count(*) from test; insert into tbl_task(client_info,sql,start_time,info) values (E'{"datname": "postgres", "usename": "postgres", "client_pot": "", "client_addr": "", "search_path": "\\"$user\\", public"}', 'select count(*) from test', '2018-06-21 18:04:20.967118+08', 'test dblink async call')  
backend_type     | client backend  

7、查看任务状态

postgres=# select * from tbl_task;  
-[ RECORD 1 ]------------------------------------------------------------------------------------------------------------------------  
id          | 1  
client_info | {"datname": "postgres", "usename": "postgres", "client_pot": "", "client_addr": "", "search_path": "\"$user\", public"}  
sql         | select count(*) from test  
start_time  | 2018-06-21 18:07:39.60439  
end_time    | 2018-06-21 18:07:47.331041  
info        | test dblink async call  

如果是数据库普通用户调用,不支持trust认证

如果是普通用户,请使用密码认证,同时请务必保障pg_hba.conf使用的是密码认证。

比如阿里云RDS PPAS的用户:

select run_task(  
  'select count(*) from test',   -- 要RUN的SQL  
  'test dblink async call' ,      -- 任务描述  
  'link_task', 
  format('hostaddr=%s port=%s user=%s dbname=%s application_name=run_task, password=%s', '127.0.0.1', current_setting('port'), current_user, current_database(), '当前用户密码')  
);  

如果是阿里云RDS PG 9.4的用户,内核做过修改,请使用如下方法

select run_task(  
  'select count(*) from test',   -- 要RUN的SQL  
  'test dblink async call' ,      -- 任务描述  
  'link_task', 
  format('user=%s dbname=%s application_name=run_task, password=%s', current_user, current_database(), '密码')  
);  

注意

1、DBLINK异步连接会占用连接,算连接数的。

2、单个DBLINK连接,如果异步调用的SQL没有执行完,不能发起第二次请求。

NOTICE:  could not send query: another command is already in progress

那么你需要看看这个DBLINK的后台任务是否还在执行(通过前面查询pg_stat_activity的方法, 执行完state状态为idle),如果执行完了,那么执行以下SQL取一下结果,就可以继续使用这个DBLINK NAME发送异步请求了。

select * from dblink_get_result('link_for_task') as t(id text);

或者,你可以不用等这个DBLINK的异步任务执行完,马上想发起另一个异步任务,那么你需要使用一个新的DBLINK NAME。

select run_task(
  $$select count(*) from test where c1='abc'$$,   -- 要RUN的SQL
  'test dblink async call',       -- 任务描述
  'new_dblink_name'   -- 有别于前面已使用的DBLINK NAME
);

3、单个DBLINK连接,如果异步调用的SQL执行完了,调用dblink_get_result后,才能发起第二次请求。

NOTICE:  could not send query: another command is already in progress

4、单个DBLINK连接,如果异步调用的SQL没有执行完,调用dblink_get_result时,会等待异步调用执行完,才会有返回。等待过程中堵塞当前调用dblink_get_result的会话。

5、调用DBLINK异步接口的会话如果断开了,那么它发起的dblink异步调用后台任务执行完成后,连接会自动释放。

6、如果SQL中包含单引号,可以使用转义的写法,也可以使用没有符号的写法,不需要转义。

select run_task(
  $$select count(*) from test where c1='abc'$$,   -- 要RUN的SQL
  'test dblink async call'       -- 任务描述
);

select run_task(
  'select count(*) from test where c1=''abc''',   -- 要RUN的SQL
  'test dblink async call'       -- 任务描述
);

select run_task(
  E'select count(*) from test where c1=\'abc\'',   -- 要RUN的SQL
  'test dblink async call'       -- 任务描述
);

美元符号内可以输入任意个字符,成对出现即可。

select run_task(
  $_qqq_$select count(*) from test where c1='abc'$_qqq_$,   -- 要RUN的SQL
  'test dblink async call'       -- 任务描述
);

参考

https://www.postgresql.org/docs/10/static/dblink.html

《PostgreSQL 批量导入性能 (采用dblink 异步调用)》

Flag Counter

digoal’s 大量PostgreSQL文章入口