PostgreSQL 事件触发器应用 - DDL审计记录 + 异步通知(notify)
背景
DDL语句的审计是非常重要的,目前PG的DDL审计记录在日志文件中。不便于查看。
为了让DDL事件记录到表中,方便查看,我们可以通过事件触发器来达到这个效果。
结合PostgreSQL的notify, listen的消息通道功能,在DDL执行后可以立即通知到监听对应CHANNEL的程序。
事件触发器审计DDL操作
事件触发器语法:
Command: CREATE EVENT TRIGGER
Description: define a new event trigger
Syntax:
CREATE EVENT TRIGGER name
ON event
[ WHEN filter_variable IN (filter_value [, ... ]) [ AND ... ] ]
EXECUTE PROCEDURE function_name()
1、事件触发器的触发点(event)
目前支持4个触发点(event)
ddl_command_start, ddl_command_end, table_rewrite and sql_drop
这四个触发点,有3个能捕获到事件发生时的信息。
1.1 ddl_command_end
通过这个函数进行捕获:pg_event_trigger_ddl_commands()
Name | Type | Description |
---|---|---|
classid | Oid | OID of catalog the object belongs in |
objid | Oid | OID of the object in the catalog |
objsubid | integer | Object sub-id (e.g. attribute number for columns) |
command_tag | text | command tag |
object_type | text | Type of the object |
schema_name | text | Name of the schema the object belongs in, if any; otherwise NULL. No quoting is applied. |
object_identity | text | Text rendering of the object identity, schema-qualified. Each and every identifier present in the identity is quoted if necessary. |
in_extension | bool | whether the command is part of an extension script |
command | pg_ddl_command | A complete representation of the command, in internal format. This cannot be output directly, but it can be passed to other functions to obtain different pieces of information about the command. |
1.2 sql_drop
通过这个函数进行捕获:pg_event_trigger_dropped_objects()
Name | Type | Description |
---|---|---|
classid | Oid | OID of catalog the object belonged in |
objid | Oid | OID the object had within the catalog |
objsubid | int32 | Object sub-id (e.g. attribute number for columns) |
original | bool | Flag used to identify the root object(s) of the deletion |
normal | bool | Flag indicating that there’s a normal dependency relationship in the dependency graph leading to this object |
is_temporary | bool | Flag indicating that the object was a temporary object. |
object_type | text | Type of the object |
schema_name | text | Name of the schema the object belonged in, if any; otherwise NULL. No quoting is applied. |
object_name | text | Name of the object, if the combination of schema and name can be used as a unique identifier for the object; otherwise NULL. No quoting is applied, and name is never schema-qualified. |
object_identity | text | Text rendering of the object identity, schema-qualified. Each and every identifier present in the identity is quoted if necessary. |
address_names | text[] | An array that, together with object_type and address_args, can be used by the pg_get_object_address() to recreate the object address in a remote server containing an identically named object of the same kind. |
address_args | text[] | Complement for address_names above. |
1.3 table_rewrite
通过如下函数进行捕获:
Name | Return Type | Description |
---|---|---|
pg_event_trigger_table_rewrite_oid() | Oid | The OID of the table about to be rewritten. |
pg_event_trigger_table_rewrite_reason() | int | The reason code(s) explaining the reason for rewriting. The exact meaning of the codes is release dependent. |
2、创建三个触发点的捕获信息存储表
create schema pgaudit;
grant USAGE on schema pgaudit to public;
create table pgaudit.audit_ddl_command_end (
event text,
tag text,
username name default current_user,
datname name default current_database(),
client_addr inet default inet_client_addr(),
client_port int default inet_client_port(),
crt_time timestamp default now(),
classid oid,
objid oid,
objsubid int,
command_tag text,
object_type text,
schema_name text,
object_identity text,
is_extension bool,
xid bigint default txid_current()
);
create table pgaudit.audit_sql_drop (
event text,
tag text,
username name default current_user,
datname name default current_database(),
client_addr inet default inet_client_addr(),
client_port int default inet_client_port(),
crt_time timestamp default now(),
classid oid,
objid oid,
objsubid int,
original bool,
normal bool,
is_temporary bool,
object_type text,
schema_name text,
object_name text,
object_identity text,
address_names text[],
address_args text[],
xid bigint default txid_current()
);
create table pgaudit.audit_table_rewrite (
event text,
tag text,
username name default current_user,
datname name default current_database(),
client_addr inet default inet_client_addr(),
client_port int default inet_client_port(),
crt_time timestamp default now(),
table_rewrite_oid oid,
table_rewrite_reason int,
xid bigint default txid_current()
);
grant select,update,delete,insert,truncate on pgaudit.audit_ddl_command_end to public;
grant select,update,delete,insert,truncate on pgaudit.audit_sql_drop to public;
grant select,update,delete,insert,truncate on pgaudit.audit_table_rewrite to public;
3、创建三个触发点的事件触发器函数
create or replace function pgaudit.et_ddl_command_end() returns event_trigger as $$
declare
begin
insert into pgaudit.audit_ddl_command_end (event, tag, classid, objid, objsubid, command_tag, object_type, schema_name, object_identity, is_extension )
select TG_EVENT, TG_TAG,
classid, objid, objsubid, command_tag, object_type, schema_name, object_identity, in_extension from
pg_event_trigger_ddl_commands();
exception when others then -- ERROR: 22004: null values cannot be formatted as an SQL identifier
return;
end;
$$ language plpgsql strict;
create or replace function pgaudit.et_sql_drop() returns event_trigger as $$
declare
begin
insert into pgaudit.audit_sql_drop (event, tag, classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args)
select TG_EVENT, TG_TAG,
classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args from
pg_event_trigger_dropped_objects();
exception when others then -- ERROR: 22004: null values cannot be formatted as an SQL identifier
return;
end;
$$ language plpgsql strict;
create or replace function pgaudit.et_table_rewrite() returns event_trigger as $$
declare
begin
insert into pgaudit.audit_table_rewrite (event, tag, table_rewrite_oid, table_rewrite_reason)
select TG_EVENT, TG_TAG,
pg_event_trigger_table_rewrite_oid(),
pg_event_trigger_table_rewrite_reason();
exception when others then -- ERROR: 22004: null values cannot be formatted as an SQL identifier
return;
end;
$$ language plpgsql strict;
4、创建三个触发点的事件触发器
CREATE EVENT TRIGGER et_ddl_command_end on ddl_command_end EXECUTE PROCEDURE pgaudit.et_ddl_command_end();
CREATE EVENT TRIGGER et_sql_drop on sql_drop EXECUTE PROCEDURE pgaudit.et_sql_drop();
CREATE EVENT TRIGGER et_table_rewrite on table_rewrite EXECUTE PROCEDURE pgaudit.et_table_rewrite();
5、创建普通触发器,当写入审计表时,将结果推送到通道,或raise出来。
5.1、 ddl_command_end
create or replace function pgaudit.tg1() returns trigger as $$
declare
v_class_nsp name;
v_class_name name;
v_obj json;
begin
select t2.nspname,t1.relname into v_class_nsp,v_class_name from pg_class t1,pg_namespace t2 where t1.oid=NEW.classid and t1.relnamespace=t2.oid;
execute format('select row_to_json(t) from %I.%I t where oid=%s', v_class_nsp, v_class_name, NEW.objid) into v_obj;
-- raise notice 'CLASS_NSP:%, CLASS_NAME:%, OBJ:%, CONTENT:%', v_class_nsp, v_class_name, v_obj, row_to_json(NEW);
perform pg_notify('ddl_event', format('CLASS_NSP:%s, CLASS_NAME:%s, OBJ:%s, CONTENT:%s', v_class_nsp, v_class_name, v_obj, row_to_json(NEW)));
return null;
end;
$$ language plpgsql strict;
create trigger tg1 after insert on pgaudit.audit_ddl_command_end for each row execute procedure pgaudit.tg1();
5.2、 sql_drop
create or replace function pgaudit.tg2() returns trigger as $$
declare
v_class_nsp name;
v_class_name name;
v_obj json;
begin
select t2.nspname,t1.relname into v_class_nsp,v_class_name from pg_class t1,pg_namespace t2 where t1.oid=NEW.classid and t1.relnamespace=t2.oid;
execute format('select row_to_json(t) from %I.%I t where oid=%s', v_class_nsp, v_class_name, NEW.objid) into v_obj;
-- raise notice 'CLASS_NSP:%, CLASS_NAME:%, OBJ:%, CONTENT:%', v_class_nsp, v_class_name, v_obj, row_to_json(NEW);
perform pg_notify('ddl_event', format('CLASS_NSP:%s, CLASS_NAME:%s, OBJ:%s, CONTENT:%s', v_class_nsp, v_class_name, v_obj, row_to_json(NEW)));
return null;
end;
$$ language plpgsql strict;
create trigger tg2 after insert on pgaudit.audit_sql_drop for each row execute procedure pgaudit.tg2();
5.3、 table_rewrite
create or replace function pgaudit.tg3() returns trigger as $$
declare
begin
-- raise notice 'TABLE:%, CONTENT:%', (NEW.table_rewrite_oid)::regclass, row_to_json(NEW);
perform pg_notify('ddl_event', format('TABLE:%s, CONTENT:%s', (NEW.table_rewrite_oid)::regclass, row_to_json(NEW)));
return null;
end;
$$ language plpgsql strict;
create trigger tg3 after insert on pgaudit.audit_table_rewrite for each row execute procedure pgaudit.tg3();
6、模板化
在模板库,执行第二到第四步。
\c template1 postgres
-- 在模板库,执行第二到第四步。
7、通过模板创建的数据库,会自动继承这个模板。
postgres=# create database db1 template template1;
CREATE DATABASE
8、例子
8.1、建表
postgres=# \c db1 test
You are now connected to database "db1" as user "test".
db1=> create table tbl(id int);
CREATE TABLE
8.2、写入数据
db1=> insert into tbl select generate_series(1,100);
INSERT 0 100
8.3、重写表
db1=> alter table tbl add column info text default 'abc';
ALTER TABLE
8.4、删表
db1=> drop table tbl;
DROP TABLE
9、查询审计信息
db1=> select * from pgaudit.audit_ddl_command_end ;
-[ RECORD 1 ]---+---------------------------
event | ddl_command_end
tag | CREATE TABLE
username | test
datname | db1
client_addr |
client_port |
crt_time | 2017-09-25 16:05:39.459787
classid | 1259
objid | 33212
objsubid | 0
command_tag | CREATE TABLE
object_type | table
schema_name | public
object_identity | public.tbl
is_extension | f
-[ RECORD 2 ]---+---------------------------
event | ddl_command_end
tag | ALTER TABLE
username | test
datname | db1
client_addr |
client_port |
crt_time | 2017-09-25 16:05:59.781995
classid | 1259
objid | 33212
objsubid | 0
command_tag | ALTER TABLE
object_type | table
schema_name | public
object_identity | public.tbl
is_extension | f
db1=> select * from pgaudit.audit_sql_drop ;
-[ RECORD 1 ]---+--------------------------------
event | sql_drop
tag | DROP TABLE
username | test
datname | db1
client_addr |
client_port |
crt_time | 2017-09-25 16:06:08.22198
classid | 1259
objid | 33212
objsubid | 0
original | t
normal | f
is_temporary | f
object_type | table
schema_name | public
object_name | tbl
object_identity | public.tbl
address_names | {public,tbl}
address_args | {}
-[ RECORD 2 ]---+--------------------------------
event | sql_drop
tag | DROP TABLE
username | test
datname | db1
client_addr |
client_port |
crt_time | 2017-09-25 16:06:08.22198
classid | 2604
objid | 33215
objsubid | 0
original | f
normal | f
is_temporary | f
object_type | default value
schema_name |
object_name |
object_identity | for public.tbl.info
address_names | {public,tbl,info}
address_args | {}
-[ RECORD 3 ]---+--------------------------------
event | sql_drop
tag | DROP TABLE
username | test
datname | db1
client_addr |
client_port |
crt_time | 2017-09-25 16:06:08.22198
classid | 1247
objid | 33214
objsubid | 0
original | f
normal | f
is_temporary | f
object_type | type
schema_name | public
object_name | tbl
object_identity | public.tbl
address_names | {public.tbl}
address_args | {}
-[ RECORD 4 ]---+--------------------------------
event | sql_drop
tag | DROP TABLE
username | test
datname | db1
client_addr |
client_port |
crt_time | 2017-09-25 16:06:08.22198
classid | 1247
objid | 33213
objsubid | 0
original | f
normal | f
is_temporary | f
object_type | type
schema_name | public
object_name | _tbl
object_identity | public.tbl[]
address_names | {public.tbl[]}
address_args | {}
-[ RECORD 5 ]---+--------------------------------
event | sql_drop
tag | DROP TABLE
username | test
datname | db1
client_addr |
client_port |
crt_time | 2017-09-25 16:06:08.22198
classid | 1259
objid | 33222
objsubid | 0
original | f
normal | f
is_temporary | f
object_type | toast table
schema_name | pg_toast
object_name | pg_toast_33212
object_identity | pg_toast.pg_toast_33212
address_names | {pg_toast,pg_toast_33212}
address_args | {}
-[ RECORD 6 ]---+--------------------------------
event | sql_drop
tag | DROP TABLE
username | test
datname | db1
client_addr |
client_port |
crt_time | 2017-09-25 16:06:08.22198
classid | 1259
objid | 33224
objsubid | 0
original | f
normal | f
is_temporary | f
object_type | index
schema_name | pg_toast
object_name | pg_toast_33212_index
object_identity | pg_toast.pg_toast_33212_index
address_names | {pg_toast,pg_toast_33212_index}
address_args | {}
-[ RECORD 7 ]---+--------------------------------
event | sql_drop
tag | DROP TABLE
username | test
datname | db1
client_addr |
client_port |
crt_time | 2017-09-25 16:06:08.22198
classid | 1247
objid | 33223
objsubid | 0
original | f
normal | f
is_temporary | f
object_type | type
schema_name | pg_toast
object_name | pg_toast_33212
object_identity | pg_toast.pg_toast_33212
address_names | {pg_toast.pg_toast_33212}
address_args | {}
db1=> select * from pgaudit.audit_table_rewrite ;
event | tag | username | datname | client_addr | client_port | crt_time | table_rewrite_oid | table_rewrite_reason
---------------+-------------+----------+---------+-------------+-------------+----------------------------+-------------------+----------------------
table_rewrite | ALTER TABLE | test | db1 | | | 2017-09-25 16:05:59.781995 | 33212 | 2
(1 row)
10、 使用listen监听异步消息
postgres=# listen ddl_event;
LISTEN
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_class, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1259","objid":"1596751","objsubid":0,"original":true,"normal":false,"is_temporary":false,"object_type":"table","schema_name":"public","object_name":"a","object_identity":"public.a","address_names":["public","a"],"address_args":[],"xid":372671943}" received from server process with PID 51884.
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_type, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1247","objid":"1596753","objsubid":0,"original":false,"normal":false,"is_temporary":false,"object_type":"type","schema_name":"public","object_name":"a","object_identity":"public.a","address_names":["public.a"],"address_args":[],"xid":372671943}" received from server process with PID 51884.
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_type, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1247","objid":"1596752","objsubid":0,"original":false,"normal":false,"is_temporary":false,"object_type":"type","schema_name":"public","object_name":"_a","object_identity":"public.a[]","address_names":["public.a[]"],"address_args":[],"xid":372671943}" received from server process with PID 51884.
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_class, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1259","objid":"1596754","objsubid":0,"original":false,"normal":false,"is_temporary":false,"object_type":"toast table","schema_name":"pg_toast","object_name":"pg_toast_1596751","object_identity":"pg_toast.pg_toast_1596751","address_names":["pg_toast","pg_toast_1596751"],"address_args":[],"xid":372671943}" received from server process with PID 51884.
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_class, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1259","objid":"1596756","objsubid":0,"original":false,"normal":false,"is_temporary":false,"object_type":"index","schema_name":"pg_toast","object_name":"pg_toast_1596751_index","object_identity":"pg_toast.pg_toast_1596751_index","address_names":["pg_toast","pg_toast_1596751_index"],"address_args":[],"xid":372671943}" received from server process with PID 51884.
Asynchronous notification "ddl_event" with payload "CLASS_NSP:pg_catalog, CLASS_NAME:pg_type, OBJ:, CONTENT:{"event":"sql_drop","tag":"DROP TABLE","username":"postgres","datname":"postgres","client_addr":null,"client_port":null,"crt_time":"2018-03-13T15:27:22.792129","classid":"1247","objid":"1596755","objsubid":0,"original":false,"normal":false,"is_temporary":false,"object_type":"type","schema_name":"pg_toast","object_name":"pg_toast_1596751","object_identity":"pg_toast.pg_toast_1596751","address_names":["pg_toast.pg_toast_1596751"],"address_args":[],"xid":372671943}" received from server process with PID 51884.
PG 9.4的例子
create schema pgaudit;
grant USAGE on schema pgaudit to public;
create extension hstore SCHEMA pgaudit;
create table pgaudit.audit_ddl_command_end (
event text,
tag text,
username name default current_user,
datname name default current_database(),
client_addr inet default inet_client_addr(),
client_port int default inet_client_port(),
crt_time timestamp default now(),
ctx pgaudit.hstore,
xid bigint default txid_current()
);
create table pgaudit.audit_sql_drop (
event text,
tag text,
username name default current_user,
datname name default current_database(),
client_addr inet default inet_client_addr(),
client_port int default inet_client_port(),
crt_time timestamp default now(),
classid oid,
objid oid,
objsubid int,
object_type text,
schema_name text,
object_name text,
object_identity text,
xid bigint default txid_current()
);
grant select,update,delete,insert,truncate on pgaudit.audit_ddl_command_end to public;
grant select,update,delete,insert,truncate on pgaudit.audit_sql_drop to public;
create or replace function pgaudit.ef_ddl_command_end() returns event_trigger as $$
declare
rec pgaudit.hstore;
begin
select pgaudit.hstore(pg_stat_activity.*) into rec from pg_stat_activity where pid=pg_backend_pid();
insert into pgaudit.audit_ddl_command_end (event, tag, ctx) values (TG_EVENT, TG_TAG, rec);
end;
$$ language plpgsql strict;
create or replace function pgaudit.ef_sql_drop() returns event_trigger as $$
declare
begin
insert into pgaudit.audit_sql_drop (event, tag, classid, objid, objsubid, object_type, schema_name, object_name, object_identity)
select TG_EVENT, TG_TAG, classid, objid, objsubid, object_type, schema_name, object_name, object_identity from
pg_event_trigger_dropped_objects();
-- exception when others then
-- return;
end;
$$ language plpgsql strict;
create event trigger ef_ddl_command_end on ddl_command_end execute procedure pgaudit.ef_ddl_command_end();
create event trigger ef_sql_drop on sql_drop execute procedure pgaudit.ef_sql_drop();
小结
1、本文以PG 10为例,介绍了通过事件触发器,审计DDL的功能。(其他版本可能需要略微修改。)
2、事件触发器的其他应用,例如限制用户执行某些DDL等。
《PostgreSQL Oracle 兼容性之 - 事件触发器实现类似Oracle的回收站功能》
《PostgreSQL 事件触发器 - DDL审计 , DDL逻辑复制 , 打造DDL统一管理入》
《PostgreSQL 事件触发器 - PostgreSQL 9.3 Event Trigger》
参考
https://www.postgresql.org/docs/9.6/static/functions-event-triggers.html
https://www.postgresql.org/docs/devel/static/event-triggers.html
https://www.postgresql.org/docs/devel/static/functions-event-triggers.html
https://www.postgresql.org/docs/devel/static/plpgsql-trigger.html#PLPGSQL-EVENT-TRIGGER