PostgreSQL Event Triggers: DDL Auditing, DDL Logical Replication, and Building a Unified DDL Management Entry Point


Background

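DDL statements are heavyweight operations: they take strong locks, may trigger a table rewrite that consumes significant resources, and can have a large impact (for example DROP and TRUNCATE). They therefore deserve close attention in day-to-day database use.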

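Enterprises usually restrict DDL in some way, for example by forbidding certain users from running it, or by forbidding DDL from sessions that log in over the network.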
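
Such a restriction could be sketched with an event trigger on ddl_command_start. This is only an illustration under assumptions: the function name deny_remote_ddl, the trigger name e_deny_remote_ddl, and the role name app_rw are hypothetical and do not come from the original article.

```sql
-- Hypothetical names: deny_remote_ddl(), e_deny_remote_ddl, role app_rw.
create or replace function deny_remote_ddl() returns event_trigger as $$
begin
  -- inet_client_addr() is NULL for Unix-socket connections,
  -- so a non-NULL value means the session arrived over TCP/IP.
  if inet_client_addr() is not null then
    raise exception 'DDL (%) is not allowed from network sessions (client %)',
      tg_tag, inet_client_addr();
  end if;

  -- Assumed policy: the application role may never run DDL.
  if session_user = 'app_rw' then
    raise exception 'DDL (%) is not allowed for role %', tg_tag, session_user;
  end if;
end;
$$ language plpgsql;

create event trigger e_deny_remote_ddl on ddl_command_start
  execute procedure deny_remote_ddl();
```

Note that a TCP connection to 127.0.0.1 also counts as a network session here. Commands that target event triggers themselves do not fire event triggers, so drop event trigger e_deny_remote_ddl still works from any session that has the privilege.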

DDL also needs to be audited; every DDL statement may have to be written to a log.

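When building a logical-replication standby, DDL statements are not recorded in REDO, so the usual way to replicate DDL is to use an event trigger to record each DDL statement in a table, and then parse the DDL statements out of the REDO generated by that table.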

(Of course, now that PostgreSQL supports custom WAL records, replicating DDL directly is gradually becoming possible.)
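
The table-based approach might look roughly like the sketch below. It assumes a hypothetical log table ddl_log and function log_ddl, and it uses current_query() to obtain the statement text rather than the pg_stat_activity lookup used later in this article; replaying the logged statements on the standby is not shown.

```sql
-- Hypothetical names: ddl_log, log_ddl(), e_log_ddl.
create table ddl_log (
  id        bigserial primary key,
  crt_time  timestamptz default clock_timestamp(),
  usename   name default session_user,
  tag       text,   -- command tag, e.g. ALTER TABLE
  ddl       text    -- statement text as sent by the client
);

create or replace function log_ddl() returns event_trigger as $$
begin
  -- current_query() returns the client's top-level query string;
  -- a multi-statement string is logged as a whole.
  insert into ddl_log (tag, ddl) values (tg_tag, current_query());
end;
$$ language plpgsql;

create event trigger e_log_ddl on ddl_command_end
  execute procedure log_ddl();
```

Because the insert into ddl_log is ordinary DML, it generates REDO like any other row change, which is what a logical decoder would read. The sketch does not capture search_path, so statements with unqualified names may need extra context before they can be replayed elsewhere.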

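Finally, DBAs sometimes want to collect DDL so that it can be audited and executed centrally. When a user issues DDL, it is not actually executed; it is only recorded.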

Event triggers fit these requirements well.

Example

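If you want to keep a record whenever DDL changes a user table's definition, sets a default value, or does anything else that ALTER TABLE can do, an event trigger can do the job.

For example: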

postgres=# create extension hstore;  
  
postgres=# create or replace function ef_alter() returns event_trigger as $$  
declare  
  rec hstore;  
begin  
  select hstore(pg_stat_activity.*) into rec from pg_stat_activity where pid=pg_backend_pid();  -- capture the current session's pg_stat_activity row  
  insert into aud_alter (ctx) values (rec);  
end;  
$$ language plpgsql strict;  
CREATE FUNCTION  
  
postgres=# create event trigger e_alter on ddl_command_end when tag in ('ALTER TABLE') execute procedure ef_alter();  
CREATE EVENT TRIGGER  
  
postgres=# create table aud_alter(id serial primary key, crt_time timestamp default now(), ctx hstore);  
CREATE TABLE  
  
postgres=# create table test(id int);  
CREATE TABLE  
  
postgres=# alter table test alter column id type int8;  
ALTER TABLE  
  
postgres=# select * from aud_alter;  
 id |          crt_time          | ctx
----+----------------------------+------------
  1 | 2014-12-12 05:43:42.840327 | "pid"=>"48406", "datid"=>"12949", "query"=>"alter table test alter column id type int8;", "state"=>"active", "datname"=>"postgres", "usename"=>"postgres", "waiting"=>"f", "usesysid"=>"10", "xact_start"=>"2014-12-12 05:43:42.840327+08", "client_addr"=>NULL, "client_port"=>"-1", "query_start"=>"2014-12-12 05:43:42.840327+08", "state_change"=>"2014-12-12 05:43:42.840331+08", "backend_start"=>"2014-12-12 05:38:37.084733+08", "client_hostname"=>NULL, "application_name"=>"psql"
(1 row)  
  
  
postgres=# select each(ctx) from aud_alter where id=1;  
                         each                            
-------------------------------------------------------  
 (pid,48406)  
 (datid,12949)  
 (query,"alter table test alter column id type int8;")  
 (state,active)  
 (datname,postgres)  
 (usename,postgres)  
 (waiting,f)  
 (usesysid,10)  
 (xact_start,"2014-12-12 05:43:42.840327+08")  
 (client_addr,)  
 (client_port,-1)  
 (query_start,"2014-12-12 05:43:42.840327+08")  
 (state_change,"2014-12-12 05:43:42.840331+08")  
 (backend_start,"2014-12-12 05:38:37.084733+08")  
 (client_hostname,)  
 (application_name,psql)  
(16 rows)  

The query key holds the ALTER TABLE statement that was running at the time.

The record also carries auxiliary information such as the client IP address and port, the user, and the database connected to.
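
Individual fields can be pulled out of the hstore column with the -> operator. The query below is a small sketch against the aud_alter table created above; the column aliases are arbitrary.

```sql
-- Each hstore value comes back as text (or NULL if the key holds NULL).
select id,
       crt_time,
       ctx -> 'usename'     as usename,
       ctx -> 'datname'     as datname,
       ctx -> 'client_addr' as client_addr,
       ctx -> 'client_port' as client_port,
       ctx -> 'query'       as query
from aud_alter
order by id;
```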

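If you do not want the DDL to actually take effect and only want to record it, raise an exception in the trigger function so that the statement is rolled back: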

create or replace function ef_alter() returns event_trigger as $$  
declare  
  rec hstore;  
  v1 int8;  
  v2 oid;  
  v3 text;  
  v4 text;  
  v5 text;  
  v6 text;  
  v7 text;  
  v8 text;  
  v9 text;  
  v10 text;  
  v11 text;  
  v12 text;  
  v13 text;  
  r record;  
begin  
  GET DIAGNOSTICS v1 = ROW_COUNT,  
                  v2 = RESULT_OID,  
                  v3 = PG_CONTEXT;  
  RAISE NOTICE 'ROW_COUNT:%, RESULT_OID:%, PG_CONTEXT:%', v1,v2,v3;  
  
if TG_EVENT='ddl_command_end' then  
    FOR r IN SELECT * FROM pg_event_trigger_ddl_commands() LOOP  
        RAISE NOTICE 'classid:%, objid:%, objsubid:%, command_tag:%, object_type:%, schema_name:%, object_identity:%, in_extension:%',   
	  r.classid, r.objid, r.objsubid, r.command_tag, r.object_type, r.schema_name, r.object_identity, r.in_extension;  -- r.command (type pg_ddl_command) has no text output, so it cannot be printed  
    END LOOP;  
end if;  
  
if TG_EVENT='sql_drop' then  
    FOR r IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP  
        RAISE NOTICE 'classid:%, objid:%, objsubid:%, original:%, normal:%, is_temporary:%, object_type:%, schema_name:%, object_name:%, object_identity:%, address_names:%, address_args:%',   
	  r.classid, r.objid, r.objsubid, r.original, r.normal, r.is_temporary, r.object_type, r.schema_name, r.object_name, r.object_identity, r.address_names, r.address_args;  
    END LOOP;  
end if;  
  
if TG_EVENT='table_rewrite' then  
    RAISE NOTICE 'rewriting table % for reason %',  
                pg_event_trigger_table_rewrite_oid()::regclass,  
                pg_event_trigger_table_rewrite_reason();  
end if;  
  
  raise exception 'error';  
  
  exception when others then  
    select hstore(pg_stat_activity.*) into rec from pg_stat_activity where pid=pg_backend_pid();  -- capture the current session's pg_stat_activity row  
    -- insert into aud_alter (ctx) values (rec);  -- this insert cannot commit, because the transaction is rolled back below.
    -- To persist the record, use dblink instead.
    raise notice '%', rec;  
      
    GET STACKED DIAGNOSTICS v4 = RETURNED_SQLSTATE,  
                            v5 = COLUMN_NAME,  
                            v6 = CONSTRAINT_NAME,  
                            v7 = PG_DATATYPE_NAME,  
                            v8 = MESSAGE_TEXT,  
                            v9 = TABLE_NAME,  
                            v10 = SCHEMA_NAME,  
                            v11 = PG_EXCEPTION_DETAIL,  
                            v12 = PG_EXCEPTION_HINT,  
                            v13 = PG_EXCEPTION_CONTEXT;  
  
    RAISE NOTICE 'RETURNED_SQLSTATE:%, COLUMN_NAME:%, CONSTRAINT_NAME:%, PG_DATATYPE_NAME:%, MESSAGE_TEXT:%, TABLE_NAME:%, SCHEMA_NAME:%, PG_EXCEPTION_DETAIL:%, PG_EXCEPTION_HINT:%, PG_EXCEPTION_CONTEXT:%',   
      v4,v5,v6,v7,v8::text,v9,v10,v11,v12,v13;  
    raise exception 'error';  -- roll back the DDL
end;  
$$ language plpgsql strict;  
  
create event trigger e1 on ddl_command_end  execute procedure ef_alter();  
  
create event trigger e2 on sql_drop  execute procedure ef_alter();  
  
create event trigger e3 on table_rewrite  execute procedure ef_alter();  
  
  
postgres=# drop table image;
NOTICE:  ROW_COUNT:0, RESULT_OID:0, PG_CONTEXT:PL/pgSQL function ef_alter() line 19 at GET DIAGNOSTICS
NOTICE:  classid:1259, objid:253698, objsubid:0, original:t, normal:f, is_temporary:f, object_type:table, schema_name:public, object_name:image, object_identity:public.image, address_names:{public,image}, address_args:{}
NOTICE:  classid:1247, objid:253700, objsubid:0, original:f, normal:f, is_temporary:f, object_type:type, schema_name:public, object_name:image, object_identity:public.image, address_names:{public.image}, address_args:{}
NOTICE:  classid:1247, objid:253699, objsubid:0, original:f, normal:f, is_temporary:f, object_type:type, schema_name:public, object_name:_image, object_identity:public.image[], address_names:{public.image[]}, address_args:{}
NOTICE:  classid:1259, objid:253701, objsubid:0, original:f, normal:f, is_temporary:f, object_type:toast table, schema_name:pg_toast, object_name:pg_toast_253698, object_identity:pg_toast.pg_toast_253698, address_names:{pg_toast,pg_toast_253698}, address_args:{}
NOTICE:  classid:1259, objid:253703, objsubid:0, original:f, normal:f, is_temporary:f, object_type:index, schema_name:pg_toast, object_name:pg_toast_253698_index, object_identity:pg_toast.pg_toast_253698_index, address_names:{pg_toast,pg_toast_253698_index}, address_args:{}
NOTICE:  classid:1247, objid:253702, objsubid:0, original:f, normal:f, is_temporary:f, object_type:type, schema_name:pg_toast, object_name:pg_toast_253698, object_identity:pg_toast.pg_toast_253698, address_names:{pg_toast.pg_toast_253698}, address_args:{}
NOTICE:  "pid"=>"93404", "datid"=>"13269", "query"=>"drop table image;", "state"=>"active", "datname"=>"postgres", "usename"=>"postgres", "usesysid"=>"10", "wait_event"=>NULL, "xact_start"=>"2016-12-07 16:16:56.795366+08", "backend_xid"=>"436698321", "client_addr"=>"127.0.0.1", "client_port"=>"23404", "query_start"=>"2016-12-07 16:16:56.795366+08", "backend_xmin"=>"436698321", "state_change"=>"2016-12-07 16:16:56.79537+08", "backend_start"=>"2016-12-07 16:06:23.205288+08", "client_hostname"=>NULL, "wait_event_type"=>NULL, "application_name"=>"psql"
NOTICE:  RETURNED_SQLSTATE:P0001, COLUMN_NAME:, CONSTRAINT_NAME:, PG_DATATYPE_NAME:, MESSAGE_TEXT:error, TABLE_NAME:, SCHEMA_NAME:, PG_EXCEPTION_DETAIL:, PG_EXCEPTION_HINT:, PG_EXCEPTION_CONTEXT:PL/pgSQL function ef_alter() line 44 at RAISE
ERROR:  error
CONTEXT:  PL/pgSQL function ef_alter() line 64 at RAISE

postgres=# \d image  -- the table was not dropped, but the query text is available from the NOTICE output.  
   Table "public.image"
 Column | Type | Modifiers 
--------+------+-----------
 name   | text | 
 raster | oid  |  
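
The dblink idea mentioned in the function's comments could be sketched as follows. The helper name aud_alter_autonomous is hypothetical, and the sketch assumes that a loopback connection to the current database is permitted without a password (for example a superuser over a local socket with a suitable pg_hba.conf entry) and that the database name needs no quoting in the connection string.

```sql
create extension if not exists dblink;

-- Hypothetical helper: writes the audit row over a separate connection,
-- so the row is committed even though the calling transaction rolls back.
create or replace function aud_alter_autonomous(rec hstore) returns void as $$
begin
  perform dblink_exec(
    'dbname=' || current_database(),
    -- %L quotes the hstore text form as a literal; it is cast back on the remote side.
    format('insert into aud_alter (ctx) values (%L::hstore)', rec::text)
  );
end;
$$ language plpgsql;

-- Inside the exception handler of ef_alter(), the commented-out insert
-- could then be replaced by:
--   perform aud_alter_autonomous(rec);
```

The insert runs in the remote session's own transaction, which is why it survives the final raise exception. If the blocked DDL holds a lock on aud_alter itself, the remote insert would wait on the local transaction, so the audit table is best kept out of the statements being intercepted.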

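If you only want to track a column's type before and after a change, the more rigorous approach is to extract the modified column and its type from the parse tree.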
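
Reading the parse tree requires a C-language trigger function. As a looser alternative that stays in PL/pgSQL, column types can be snapshotted from the catalog after each CREATE TABLE or ALTER TABLE and compared with the previous snapshot. This is a different technique from the parse-tree approach, offered only as a sketch; the names col_type_log, log_col_types, and e_col_types are hypothetical, and pg_event_trigger_ddl_commands() requires PostgreSQL 9.5 or later.

```sql
-- Hypothetical names: col_type_log, log_col_types(), e_col_types.
create table col_type_log (
  id       bigserial primary key,
  crt_time timestamptz default clock_timestamp(),
  relid    oid,
  relname  text,
  attnum   int2,
  attname  name,
  atttype  text
);

create or replace function log_col_types() returns event_trigger as $$
declare
  r record;
begin
  -- Select only the needed columns; the command column cannot be output as text.
  for r in select objid, object_identity
           from pg_event_trigger_ddl_commands()
           where object_type = 'table' loop
    insert into col_type_log (relid, relname, attnum, attname, atttype)
    select a.attrelid, r.object_identity, a.attnum, a.attname,
           format_type(a.atttypid, a.atttypmod)
    from pg_attribute a
    where a.attrelid = r.objid
      and a.attnum > 0            -- skip system columns
      and not a.attisdropped;     -- skip dropped columns
  end loop;
end;
$$ language plpgsql;

create event trigger e_col_types on ddl_command_end
  when tag in ('CREATE TABLE', 'ALTER TABLE')
  execute procedure log_col_types();

-- Compare each snapshot with the previous one for the same column.
select relname, attname, crt_time,
       lag(atttype) over (partition by relid, attnum order by id) as old_type,
       atttype as new_type
from col_type_log;
```

Tables that already exist when the trigger is created have no earlier snapshot, so their first logged change shows a NULL old_type unless an initial snapshot is loaded by hand.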

Reference

http://www.postgresql.org/docs/9.3/static/event-trigger-interface.html

typedef struct EventTriggerData  
{  
    NodeTag     type;  
    const char *event;      /* event name */  
    Node       *parsetree;  /* parse tree */  
    const char *tag;        /* command tag */  
} EventTriggerData;  

References

1. http://www.postgresql.org/docs/9.3/static/event-triggers.html

2. http://www.postgresql.org/docs/9.3/static/sql-createeventtrigger.html

3. src/backend/commands/event_trigger.c

4. src/include/commands/event_trigger.h

5. http://blog.163.com/digoal@126/blog/static/163877040201252575529358/

6. http://stackoverflow.com/questions/23488228/how-to-get-sql-text-from-postgres-event-trigger

7. https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html#PLPGSQL-ERROR-TRAPPING

8. https://www.postgresql.org/docs/9.6/static/plpgsql-statements.html#PLPGSQL-STATEMENTS-DIAGNOSTICS

