PostgreSQL 事件触发器应用 - DDL审计记录 + 异步通知(notify)

9 minute read

背景

DDL语句的审计是非常重要的,目前PG的DDL审计记录在日志文件中。不便于查看。

为了让DDL事件记录到表中,方便查看,我们可以通过事件触发器来达到这个效果。

结合PostgreSQL的notify, listen的消息通道功能,在DDL执行后可以立即通知到监听对应CHANNEL的程序。

事件触发器审计DDL操作

事件触发器语法:

Command:     CREATE EVENT TRIGGER    
Description: define a new event trigger    
Syntax:    
CREATE EVENT TRIGGER name    
    ON event    
    [ WHEN filter_variable IN (filter_value [, ... ]) [ AND ... ] ]    
    EXECUTE PROCEDURE function_name()    

1、事件触发器的触发点(event)

目前支持4个触发点(event)

ddl_command_start, ddl_command_end, table_rewrite and sql_drop    

这四个触发点,有3个能捕获到事件发生时的信息。

1.1 ddl_command_end

通过这个函数进行捕获:pg_event_trigger_ddl_commands()

Name Type Description
classid Oid OID of catalog the object belongs in
objid Oid OID of the object in the catalog
objsubid integer Object sub-id (e.g. attribute number for columns)
command_tag text command tag
object_type text Type of the object
schema_name text Name of the schema the object belongs in, if any; otherwise NULL. No quoting is applied.
object_identity text Text rendering of the object identity, schema-qualified. Each and every identifier present in the identity is quoted if necessary.
in_extension bool whether the command is part of an extension script
command pg_ddl_command A complete representation of the command, in internal format. This cannot be output directly, but it can be passed to other functions to obtain different pieces of information about the command.

1.2 sql_drop

通过这个函数进行捕获:pg_event_trigger_dropped_objects()

Name Type Description
classid Oid OID of catalog the object belonged in
objid Oid OID the object had within the catalog
objsubid int32 Object sub-id (e.g. attribute number for columns)
original bool Flag used to identify the root object(s) of the deletion
normal bool Flag indicating that there’s a normal dependency relationship in the dependency graph leading to this object
is_temporary bool Flag indicating that the object was a temporary object.
object_type text Type of the object
schema_name text Name of the schema the object belonged in, if any; otherwise NULL. No quoting is applied.
object_name text Name of the object, if the combination of schema and name can be used as a unique identifier for the object; otherwise NULL. No quoting is applied, and name is never schema-qualified.
object_identity text Text rendering of the object identity, schema-qualified. Each and every identifier present in the identity is quoted if necessary.
address_names text[] An array that, together with object_type and address_args, can be used by the pg_get_object_address() to recreate the object address in a remote server containing an identically named object of the same kind.
address_args text[] Complement for address_names above.

1.3 table_rewrite

通过如下函数进行捕获:

Name Return Type Description
pg_event_trigger_table_rewrite_oid() Oid The OID of the table about to be rewritten.
pg_event_trigger_table_rewrite_reason() int The reason code(s) explaining the reason for rewriting. The exact meaning of the codes is release dependent.

2、创建三个触发点的捕获信息存储表

create schema pgaudit;  
grant USAGE on schema pgaudit to public;  
  
create table pgaudit.audit_ddl_command_end (    
  event text,    
  tag text,    
  username name default current_user,    
  datname name default current_database(),    
  client_addr inet default inet_client_addr(),    
  client_port int default inet_client_port(),    
  crt_time timestamp default now(),    
  classid oid,    
  objid oid,    
  objsubid int,    
  command_tag text,    
  object_type text,    
  schema_name text,    
  object_identity text,    
  is_extension bool,    
  xid bigint default txid_current()  
);    
    
create table pgaudit.audit_sql_drop (    
  event text,    
  tag text,    
  username name default current_user,    
  datname name default current_database(),    
  client_addr inet default inet_client_addr(),    
  client_port int default inet_client_port(),    
  crt_time timestamp default now(),    
  classid oid,    
  objid oid,    
  objsubid int,    
  original bool,    
  normal bool,    
  is_temporary bool,    
  object_type text,    
  schema_name text,    
  object_name text,    
  object_identity text,    
  address_names text[],    
  address_args text[],    
  xid bigint default txid_current()   
);    
    
create table pgaudit.audit_table_rewrite (    
  event text,    
  tag text,    
  username name default current_user,    
  datname name default current_database(),    
  client_addr inet default inet_client_addr(),    
  client_port int default inet_client_port(),    
  crt_time timestamp default now(),    
  table_rewrite_oid oid,    
  table_rewrite_reason int,    
  xid bigint default txid_current()  
);    
    
grant select,update,delete,insert,truncate on pgaudit.audit_ddl_command_end to public;    
grant select,update,delete,insert,truncate on pgaudit.audit_sql_drop to public;    
grant select,update,delete,insert,truncate on pgaudit.audit_table_rewrite to public;    

3、创建三个触发点的事件触发器函数

create or replace function pgaudit.et_ddl_command_end() returns event_trigger as $$    
declare    
begin    
  insert into pgaudit.audit_ddl_command_end (event, tag, classid, objid, objsubid, command_tag, object_type, schema_name, object_identity, is_extension )    
    select TG_EVENT, TG_TAG,      
      classid, objid, objsubid, command_tag, object_type, schema_name, object_identity, in_extension from    
      pg_event_trigger_ddl_commands();    
  exception when others then  --	ERROR:  22004: null values cannot be formatted as an SQL identifier  
  return;    
end;    
$$ language plpgsql strict;    
create or replace function pgaudit.et_sql_drop() returns event_trigger as $$    
declare    
begin    
  insert into pgaudit.audit_sql_drop (event, tag, classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args)    
    select TG_EVENT, TG_TAG,  
      classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args from    
      pg_event_trigger_dropped_objects();    
  exception when others then  --	ERROR:  22004: null values cannot be formatted as an SQL identifier  
  return;    
end;    
$$ language plpgsql strict;    
create or replace function pgaudit.et_table_rewrite() returns event_trigger as $$    
declare    
begin    
  insert into pgaudit.audit_table_rewrite (event, tag, table_rewrite_oid, table_rewrite_reason)     
    select TG_EVENT, TG_TAG,      
      pg_event_trigger_table_rewrite_oid(),    
      pg_event_trigger_table_rewrite_reason();    
  exception when others then  --	ERROR:  22004: null values cannot be formatted as an SQL identifier  
  return;    
end;    
$$ language plpgsql strict;    

4、创建三个触发点的事件触发器

CREATE EVENT TRIGGER et_ddl_command_end on ddl_command_end EXECUTE PROCEDURE pgaudit.et_ddl_command_end();    
    
CREATE EVENT TRIGGER et_sql_drop on sql_drop EXECUTE PROCEDURE pgaudit.et_sql_drop();    
    
CREATE EVENT TRIGGER et_table_rewrite on table_rewrite EXECUTE PROCEDURE pgaudit.et_table_rewrite();    

5、创建普通触发器,当写入审计表时,将结果推送到通道,或raise出来。

5.1、 ddl_command_end

create or replace function pgaudit.tg1() returns trigger as $$  
declare  
  v_class_nsp name;  
  v_class_name name;  
  v_obj json;  
begin  
  select t2.nspname,t1.relname into v_class_nsp,v_class_name from pg_class t1,pg_namespace t2 where t1.oid=NEW.classid and t1.relnamespace=t2.oid;  
  
  execute format('select row_to_json(t) from %I.%I t where oid=%s', v_class_nsp, v_class_name, NEW.objid) into v_obj;  
  
  -- raise notice 'CLASS_NSP:%, CLASS_NAME:%, OBJ:%, CONTENT:%', v_class_nsp, v_class_name, v_obj, row_to_json(NEW);  
    
  perform pg_notify('ddl_event', format('CLASS_NSP:%s, CLASS_NAME:%s, OBJ:%s, CONTENT:%s', v_class_nsp, v_class_name, v_obj, row_to_json(NEW)));  
  return null;  
end;  
$$ language plpgsql strict;  
  
create trigger tg1 after insert on pgaudit.audit_ddl_command_end for each row execute procedure pgaudit.tg1();  

5.2、 sql_drop

create or replace function pgaudit.tg2() returns trigger as $$  
declare  
  v_class_nsp name;  
  v_class_name name;  
  v_obj json;  
begin  
  select t2.nspname,t1.relname into v_class_nsp,v_class_name from pg_class t1,pg_namespace t2 where t1.oid=NEW.classid and t1.relnamespace=t2.oid;  
  
  execute format('select row_to_json(t) from %I.%I t where oid=%s', v_class_nsp, v_class_name, NEW.objid) into v_obj;  
  
  -- raise notice 'CLASS_NSP:%, CLASS_NAME:%, OBJ:%, CONTENT:%', v_class_nsp, v_class_name, v_obj, row_to_json(NEW);  
    
  perform pg_notify('ddl_event', format('CLASS_NSP:%s, CLASS_NAME:%s, OBJ:%s, CONTENT:%s', v_class_nsp, v_class_name, v_obj, row_to_json(NEW)));  
  return null;  
end;  
$$ language plpgsql strict;  
  
create trigger tg2 after insert on pgaudit.audit_sql_drop for each row execute procedure pgaudit.tg2();  

5.3、 table_rewrite

create or replace function pgaudit.tg3() returns trigger as $$  
declare  
begin  
  -- raise notice 'TABLE:%, CONTENT:%', (NEW.table_rewrite_oid)::regclass, row_to_json(NEW);  
    
  perform pg_notify('ddl_event', format('TABLE:%s, CONTENT:%s', (NEW.table_rewrite_oid)::regclass, row_to_json(NEW)));  
  return null;  
end;  
$$ language plpgsql strict;  
  
create trigger tg3 after insert on pgaudit.audit_table_rewrite for each row execute procedure pgaudit.tg3();  

6、模板化

在模板库,执行第二到第四步。

\c template1 postgres    
-- 在模板库,执行第二到第四步。    

7、通过模板创建的数据库,会自动继承这个模板。

postgres=# create database db1 template template1;    
CREATE DATABASE    

8、例子

8.1、建表

postgres=# \c db1 test    
You are now connected to database "db1" as user "test".    
db1=> create table tbl(id int);    
CREATE TABLE    

8.2、写入数据

db1=> insert into tbl select generate_series(1,100);    
INSERT 0 100    

8.3、重写表

db1=> alter table tbl add column info text default 'abc';    
ALTER TABLE    

8.4、删表

db1=> drop table tbl;    
DROP TABLE    

9、查询审计信息

db1=> select * from pgaudit.audit_ddl_command_end ;    
-[ RECORD 1 ]---+---------------------------    
event           | ddl_command_end    
tag             | CREATE TABLE    
username        | test    
datname         | db1    
client_addr     |     
client_port     |     
crt_time        | 2017-09-25 16:05:39.459787    
classid         | 1259    
objid           | 33212    
objsubid        | 0    
command_tag     | CREATE TABLE    
object_type     | table    
schema_name     | public    
object_identity | public.tbl    
is_extension    | f    
-[ RECORD 2 ]---+---------------------------    
event           | ddl_command_end    
tag             | ALTER TABLE    
username        | test    
datname         | db1    
client_addr     |     
client_port     |     
crt_time        | 2017-09-25 16:05:59.781995    
classid         | 1259    
objid           | 33212    
objsubid        | 0    
command_tag     | ALTER TABLE    
object_type     | table    
schema_name     | public    
object_identity | public.tbl    
is_extension    | f    
db1=> select * from pgaudit.audit_sql_drop ;    
-[ RECORD 1 ]---+--------------------------------    
event           | sql_drop    
tag             | DROP TABLE    
username        | test    
datname         | db1    
client_addr     |     
client_port     |     
crt_time        | 2017-09-25 16:06:08.22198    
classid         | 1259    
objid           | 33212    
objsubid        | 0    
original        | t    
normal          | f    
is_temporary    | f    
object_type     | table    
schema_name     | public    
object_name     | tbl    
object_identity | public.tbl    
address_names   | {public,tbl}    
address_args    | {}    
-[ RECORD 2 ]---+--------------------------------    
event           | sql_drop    
tag             | DROP TABLE    
username        | test    
datname         | db1    
client_addr     |     
client_port     |     
crt_time        | 2017-09-25 16:06:08.22198    
classid         | 2604    
objid           | 33215    
objsubid        | 0    
original        | f    
normal          | f    
is_temporary    | f    
object_type     | default value    
schema_name     |     
object_name     |     
object_identity | for public.tbl.info    
address_names   | {public,tbl,info}    
address_args    | {}    
-[ RECORD 3 ]---+--------------------------------    
event           | sql_drop    
tag             | DROP TABLE    
username        | test    
datname         | db1    
client_addr     |     
client_port     |     
crt_time        | 2017-09-25 16:06:08.22198    
classid         | 1247    
objid           | 33214    
objsubid        | 0    
original        | f    
normal          | f    
is_temporary    | f    
object_type     | type    
schema_name     | public    
object_name     | tbl    
object_identity | public.tbl    
address_names   | {public.tbl}    
address_args    | {}    
-[ RECORD 4 ]---+--------------------------------    
event           | sql_drop    
tag             | DROP TABLE    
username        | test    
datname         | db1    
client_addr     |     
client_port     |     
crt_time        | 2017-09-25 16:06:08.22198    
classid         | 1247    
objid           | 33213    
objsubid        | 0    
original        | f    
normal          | f    
is_temporary    | f    
object_type     | type    
schema_name     | public    
object_name     | _tbl    
object_identity | public.tbl[]    
address_names   | {public.tbl[]}    
address_args    | {}    
-[ RECORD 5 ]---+--------------------------------    
event           | sql_drop    
tag             | DROP TABLE    
username        | test    
datname         | db1    
client_addr     |     
client_port     |     
crt_time        | 2017-09-25 16:06:08.22198    
classid         | 1259    
objid           | 33222    
objsubid        | 0    
original        | f    
normal          | f    
is_temporary    | f    
object_type     | toast table    
schema_name     | pg_toast    
object_name     | pg_toast_33212    
object_identity | pg_toast.pg_toast_33212    
address_names   | {pg_toast,pg_toast_33212}    
address_args    | {}    
-[ RECORD 6 ]---+--------------------------------    
event           | sql_drop    
tag             | DROP TABLE    
username        | test    
datname         | db1    
client_addr     |     
client_port     |     
crt_time        | 2017-09-25 16:06:08.22198    
classid         | 1259    
objid           | 33224    
objsubid        | 0    
original        | f    
normal          | f    
is_temporary    | f    
object_type     | index    
schema_name     | pg_toast    
object_name     | pg_toast_33212_index    
object_identity | pg_toast.pg_toast_33212_index    
address_names   | {pg_toast,pg_toast_33212_index}    
address_args    | {}    
-[ RECORD 7 ]---+--------------------------------    
event           | sql_drop    
tag             | DROP TABLE    
username        | test    
datname         | db1    
client_addr     |     
client_port     |     
crt_time        | 2017-09-25 16:06:08.22198    
classid         | 1247    
objid           | 33223    
objsubid        | 0    
original        | f    
normal          | f    
is_temporary    | f    
object_type     | type    
schema_name     | pg_toast    
object_name     | pg_toast_33212    
object_identity | pg_toast.pg_toast_33212    
address_names   | {pg_toast.pg_toast_33212}    
address_args    | {}    
db1=> select * from pgaudit.audit_table_rewrite ;    
     event     |     tag     | username | datname | client_addr | client_port |          crt_time          | table_rewrite_oid | table_rewrite_reason     
---------------+-------------+----------+---------+-------------+-------------+----------------------------+-------------------+----------------------    
 table_rewrite | ALTER TABLE | test     | db1     |             |             | 2017-09-25 16:05:59.781995 |             33212 |                    2    
(1 row)    

10、 使用listen监听异步消息

postgres=# listen ddl_event;  
LISTEN  
  
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_class, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1259","objid":"1596751","objsubid":0,"original":true,"normal":false,"is_temporary":false,"object_type":"table","schema_name":"public","object_name":"a","object_identity":"public.a","address_names":["public","a"],"address_args":[],"xid":372671943}" received from server process with PID 51884.  
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_type, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1247","objid":"1596753","objsubid":0,"original":false,"normal":false,"is_temporary":false,"object_type":"type","schema_name":"public","object_name":"a","object_identity":"public.a","address_names":["public.a"],"address_args":[],"xid":372671943}" received from server process with PID 51884.  
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_type, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1247","objid":"1596752","objsubid":0,"original":false,"normal":false,"is_temporary":false,"object_type":"type","schema_name":"public","object_name":"_a","object_identity":"public.a[]","address_names":["public.a[]"],"address_args":[],"xid":372671943}" received from server process with PID 51884.  
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_class, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1259","objid":"1596754","objsubid":0,"original":false,"normal":false,"is_temporary":false,"object_type":"toast table","schema_name":"pg_toast","object_name":"pg_toast_1596751","object_identity":"pg_toast.pg_toast_1596751","address_names":["pg_toast","pg_toast_1596751"],"address_args":[],"xid":372671943}" received from server process with PID 51884.  
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_class, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1259","objid":"1596756","objsubid":0,"original":false,"normal":false,"is_temporary":false,"object_type":"index","schema_name":"pg_toast","object_name":"pg_toast_1596751_index","object_identity":"pg_toast.pg_toast_1596751_index","address_names":["pg_toast","pg_toast_1596751_index"],"address_args":[],"xid":372671943}" received from server process with PID 51884.  
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_type, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1247","objid":"1596755","objsubid":0,"original":false,"normal":false,"is_temporary":false,"object_type":"type","schema_name":"pg_toast","object_name":"pg_toast_1596751","object_identity":"pg_toast.pg_toast_1596751","address_names":["pg_toast.pg_toast_1596751"],"address_args":[],"xid":372671943}" received from server process with PID 51884.  

PG 9.4的例子

create schema pgaudit;  
grant USAGE on schema pgaudit to public;  
  
create extension hstore SCHEMA pgaudit;    
  
create table pgaudit.audit_ddl_command_end (   
  event text,    
  tag text,    
  username name default current_user,    
  datname name default current_database(),    
  client_addr inet default inet_client_addr(),    
  client_port int default inet_client_port(),    
  crt_time timestamp default now(),    
  ctx pgaudit.hstore,   
  xid bigint default txid_current()   
);     
  
create table pgaudit.audit_sql_drop (    
  event text,    
  tag text,    
  username name default current_user,    
  datname name default current_database(),    
  client_addr inet default inet_client_addr(),    
  client_port int default inet_client_port(),    
  crt_time timestamp default now(),    
  classid oid,    
  objid oid,    
  objsubid int,    
  object_type text,    
  schema_name text,    
  object_name text,    
  object_identity text,   
  xid bigint default txid_current()   
);    
  
grant select,update,delete,insert,truncate on pgaudit.audit_ddl_command_end to public;  
grant select,update,delete,insert,truncate on pgaudit.audit_sql_drop to public;  
  
    
create or replace function pgaudit.ef_ddl_command_end() returns event_trigger as $$    
declare    
  rec pgaudit.hstore;    
begin    
  select pgaudit.hstore(pg_stat_activity.*) into rec from pg_stat_activity where pid=pg_backend_pid();    
  insert into pgaudit.audit_ddl_command_end (event, tag, ctx) values (TG_EVENT, TG_TAG, rec);    
end;    
$$ language plpgsql strict;    
  
  
  
create or replace function pgaudit.ef_sql_drop() returns event_trigger as $$    
declare    
begin    
  insert into pgaudit.audit_sql_drop (event, tag, classid, objid, objsubid, object_type, schema_name, object_name, object_identity)    
    select TG_EVENT, TG_TAG, classid, objid, objsubid, object_type, schema_name, object_name, object_identity from     
      pg_event_trigger_dropped_objects();    
   -- exception when others then    
   --   return;    
end;    
$$ language plpgsql strict;    
  
  
  
create event trigger ef_ddl_command_end on ddl_command_end execute procedure pgaudit.ef_ddl_command_end();    
create event trigger ef_sql_drop on sql_drop execute procedure pgaudit.ef_sql_drop();    

小结

1、本文以PG 10为例,介绍了通过事件触发器,审计DDL的功能。(其他版本可能需要略微修改。)

2、事件触发器的其他应用,例如限制用户执行某些DDL等。

《PostgreSQL Oracle 兼容性之 - 事件触发器实现类似Oracle的回收站功能》

《PostgreSQL 事件触发器 - DDL审计 , DDL逻辑复制 , 打造DDL统一管理入》

《PostgreSQL 事件触发器 - PostgreSQL 9.3 Event Trigger》

参考

https://www.postgresql.org/docs/9.6/static/functions-event-triggers.html

https://www.postgresql.org/docs/devel/static/event-triggers.html

https://www.postgresql.org/docs/devel/static/functions-event-triggers.html

https://www.postgresql.org/docs/devel/static/plpgsql-trigger.html#PLPGSQL-EVENT-TRIGGER

Flag Counter

digoal’s 大量PostgreSQL文章入口