PostgreSQL 审计成功事务 - PostgreSQL Fine-Grained Table,Column,Row Level Audit

5 minute read

背景

通过配置用户级或数据库级的参数可以实现用户以及数据库级别的审计, 但是这样的粒度可能还是太粗糙了.

如果需要更细致的审计, 例如针对某些表的操作审计, 某些用户对某些表的审计, 或者仅仅当某个列的值发生变化时才被审计(记录到LOG或表里面, 本文的例子是将审计信息输出到LOG, 使用raise).

这样的需求可以通过触发器来实现.

接下来以PostgreSQL 9.2为例进行讲解.

基础的参数配置

log_destination = 'csvlog'  
logging_collector = on  
log_directory = 'pg_log'  
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'  
log_file_mode = 0600  
log_truncate_on_rotation = on  
log_rotation_age = 1d  
log_rotation_size = 10MB  
log_connections = on  
log_error_verbosity = verbose  
log_timezone = 'PRC'  
log_statement = 'none'  
log_min_duration_statement = -1  

创建测试表 :

digoal=> create table user_account_kb(id int, info text, balance numeric, crt_time timestamp, mod_time timestamp);  
CREATE TABLE  

插入测试数据 :

digoal=> insert into user_account_kb select generate_series(1,10),'test',trunc(100*random()),now(),null;  
INSERT 0 10  
digoal=> select * from user_account_kb ;  
 id | info | balance |          crt_time          | mod_time   
----+------+---------+----------------------------+----------  
  1 | test |      66 | 2013-03-20 10:08:15.969523 |   
  2 | test |      50 | 2013-03-20 10:08:15.969523 |   
  3 | test |      95 | 2013-03-20 10:08:15.969523 |   
  4 | test |      90 | 2013-03-20 10:08:15.969523 |   
  5 | test |      50 | 2013-03-20 10:08:15.969523 |   
  6 | test |      12 | 2013-03-20 10:08:15.969523 |   
  7 | test |      39 | 2013-03-20 10:08:15.969523 |   
  8 | test |      42 | 2013-03-20 10:08:15.969523 |   
  9 | test |       6 | 2013-03-20 10:08:15.969523 |   
 10 | test |      11 | 2013-03-20 10:08:15.969523 |   
(10 rows)  

审计场景1

1. 审计某个表的insert, update, delete, truncate语句.

使用after for each statement触发器.

创建触发器函数

digoal=> create or replace function trace_statement() returns trigger as $$  
declare  
  v_user name;  
  v_db name;  
  v_query text;  
begin  
  select current_user, current_database(), current_query() into v_user, v_db, v_query;  
  raise warning 'user:%, db:%, query:%', v_user, v_db, v_query;  
  return null;  
end;  
$$ language plpgsql;  

创建触发器

digoal=> create trigger tg1 after insert or update or delete or truncate on user_account_kb for each statement execute procedure trace_statement();  
CREATE TRIGGER  

测试插入

digoal=> insert into user_account_kb values(11,'test',100,now(),null);  
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb values(11,'test',100,now(),null);  
INSERT 0 1  
digoal=> select * from user_account_kb where id=11;  
 id | info | balance |          crt_time          | mod_time   
----+------+---------+----------------------------+----------  
 11 | test |     100 | 2013-03-20 10:18:02.495836 |   
(1 row)  

测试更新

digoal=> update user_account_kb set info='new' where id=11;  
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;  
UPDATE 1  
digoal=> select * from user_account_kb where id=11;  
 id | info | balance |          crt_time          | mod_time   
----+------+---------+----------------------------+----------  
 11 | new  |     100 | 2013-03-20 10:18:02.495836 |   
(1 row)  

测试删除

digoal=> delete from user_account_kb where id=11;  
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id=11;  
DELETE 1  

测试truncate

digoal=> begin;  
BEGIN  
digoal=> truncate user_account_kb ;  
WARNING:  user:digoal, db:digoal, query:truncate user_account_kb ;  
TRUNCATE TABLE  
digoal=> rollback;  
ROLLBACK  

注意回滚的操作不会被记录. 即使log_statement = ‘ddl’, 所以rollback没有被记录下来.

这是个弊端. 需要注意. 希望未来的PostgreSQL版本加以改进. 现在的解决办法是修正触发器的触发点, 小结部分会提到.

以上操作的日志输出如下.

2013-03-20 10:18:02.496 CST,"digoal","digoal",4521,"[local]",51491867.11a9,9,"INSERT",2013-03-20 10:01:11 CST,1/229,3355,WARNING,01000,"user:digoal, db:digoal, query:insert into user_account_kb values(11,'test',100,now(),null);",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"  
2013-03-20 10:19:42.980 CST,"digoal","digoal",4521,"[local]",51491867.11a9,10,"UPDATE",2013-03-20 10:01:11 CST,1/233,3356,WARNING,01000,"user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"  
2013-03-20 10:19:53.612 CST,"digoal","digoal",4521,"[local]",51491867.11a9,11,"DELETE",2013-03-20 10:01:11 CST,1/236,3357,WARNING,01000,"user:digoal, db:digoal, query:delete from user_account_kb where id=11;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"  
2013-03-20 10:20:18.361 CST,"digoal","digoal",4521,"[local]",51491867.11a9,12,"TRUNCATE TABLE",2013-03-20 10:01:11 CST,1/237,3358,WARNING,01000,"user:digoal, db:digoal, query:truncate user_account_kb ;",,,,,,,,"exec_stmt_raise, pl_exec.c:2840","psql"  

审计场景2

2. 按用户审计某个表的insert, update, delete, truncate语句.

使用after for each statement when (current_user=’‘)触发器.

删除前面用到的触发器

digoal=> drop trigger tg1 on user_account_kb;  
DROP TRIGGER  

创建触发器, 这次带上when条件

digoal=> create trigger tg1 after insert or update or delete or truncate on user_account_kb for each statement when (current_user='digoal') execute procedure trace_statement();  
CREATE TRIGGER  

测试digoal用户的操作

digoal=> update user_account_kb set info='new' where id=11;  
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id=11;  
UPDATE 0  

测试其他用户的操作, 不被审计

digoal=> \c digoal postgres  
You are now connected to database "digoal" as user "postgres".  
digoal=# update digoal.user_account_kb set info='new' where id=11;  
UPDATE 0  

审计场景3

3. 按条件审计某个表的insert, update, delete语句.

使用after for each row when (new.balance <> old.balance)触发器.

删除前面用到的触发器

digoal=> drop trigger tg1 on user_account_kb;  
DROP TRIGGER  

新建触发器函数

digoal=> create or replace function trace_row() returns trigger as $$                                                              
declare  
  v_user name;  
  v_db name;  
  v_query text;  
begin  
select current_user, current_database(), current_query() into v_user, v_db, v_query;  
case TG_OP  
  when 'UPDATE' then  
    raise warning 'user:%, db:%, query:%, newdata:%, olddata:%', v_user, v_db, v_query, NEW, OLD;  
  when 'INSERT' then  
    raise warning 'user:%, db:%, query:%, newdata:%', v_user, v_db, v_query, NEW;  
  when 'DELETE' then  
    raise warning 'user:%, db:%, query:%, olddata:%', v_user, v_db, v_query, OLD;  
  else  
    null;  
end case;  
return null;  
end;  
$$ language plpgsql;  
CREATE FUNCTION  

新建触发器

digoal=> create trigger tg1 after insert or update or delete on user_account_kb for each row execute procedure trace_row();  
CREATE TRIGGER  

测试插入

digoal=> insert into user_account_kb select * from user_account_kb limit 3;  
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(1,test,66,"2013-03-20 10:08:15.969523",)  
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(2,test,50,"2013-03-20 10:08:15.969523",)  
WARNING:  user:digoal, db:digoal, query:insert into user_account_kb select * from user_account_kb limit 3;, newdata:(3,test,95,"2013-03-20 10:08:15.969523",)  
INSERT 0 3  

测试更新

digoal=> update user_account_kb set info='new' where id<3;  
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(1,new,66,"2013-03-20 10:08:15.969523",), olddata:(1,new,66,"2013-03-20 10:08:15.969523",)  
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(2,new,50,"2013-03-20 10:08:15.969523",), olddata:(2,new,50,"2013-03-20 10:08:15.969523",)  
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(1,new,66,"2013-03-20 10:08:15.969523",), olddata:(1,new,66,"2013-03-20 10:08:15.969523",)  
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new' where id<3;, newdata:(2,new,50,"2013-03-20 10:08:15.969523",), olddata:(2,new,50,"2013-03-20 10:08:15.969523",)  
UPDATE 4  

测试删除

digoal=> delete from user_account_kb where id<3;  
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(1,new,66,"2013-03-20 10:08:15.969523",)  
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(2,new,50,"2013-03-20 10:08:15.969523",)  
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(1,new,66,"2013-03-20 10:08:15.969523",)  
WARNING:  user:digoal, db:digoal, query:delete from user_account_kb where id<3;, olddata:(2,new,50,"2013-03-20 10:08:15.969523",)  
DELETE 4  

审计场景4

基于列的判断审计

删除前面用到的触发器

digoal=> drop trigger tg1 on user_account_kb;  
DROP TRIGGER  

创建触发器, 这里用到when条件, 只有当balance变化时才审计

digoal=> create trigger tg1 after update on user_account_kb for each row when (new.balance<>old.balance) execute procedure trace_row();  
CREATE TRIGGER  

测试

digoal=> update user_account_kb set info='new' where id=4;  
UPDATE 1  
digoal=> update user_account_kb set info='new',balance=balance where id=4;  
UPDATE 1  

balance变化才审计

digoal=> update user_account_kb set info='new',balance=balance-1 where id=4;  
WARNING:  user:digoal, db:digoal, query:update user_account_kb set info='new',balance=balance-1 where id=4;, newdata:(4,new,89,"2013-03-20 10:08:15.969523",), olddata:(4,new,90,"2013-03-20 10:08:15.969523",)  
UPDATE 1  

小结

1. 前面提到ROLLBACK等事务相关的SQL不会被审计到, 所以当SQL执行失败时, LOG已经记录了, 但是没有记录回滚的动作, 所以信息是不完整的, 除非从XLOG/CLOG中取出对应的XID是提交还是回滚.

为了使记录在LOG中的语句一定是提交的, 那么需要调整一下触发器的创建方法, 使得回滚的事务中所有的SQL都不被审计.

如下,

触发器只有在提交时才会触发, 回滚不触发. (使用constraint来创建触发器)

digoal=> create constraint trigger tg1 after update on user_account_kb DEFERRABLE INITIALLY deferred for each row when (new.balance<>old.balance) execute procedure trace_row();  
CREATE TRIGGER  
digoal=> begin;  
BEGIN  
digoal=> update user_account_kb set balance=balance+1 where id=1;  
UPDATE 0  
digoal=> update user_account_kb set balance=balance+1 where id=4;  
UPDATE 1  
digoal=> end;  
WARNING:  user:digoal, db:digoal, query:end;, newdata:(4,new,90,"2013-03-20 10:08:15.969523",), olddata:(4,new,89,"2013-03-20 10:08:15.969523",)  
COMMIT  
digoal=> begin;  
BEGIN  
digoal=> update user_account_kb set balance=balance+1 where id=4;  
UPDATE 1  
digoal=> rollback;  
ROLLBACK  

注意以上方法只有after … for each row才能被用到.

When the CONSTRAINT option is specified, this command creates a constraint trigger. This is the same as a regular trigger except that the timing of the trigger firing can be adjusted using SET CONSTRAINTS.   
  
Constraint triggers must be AFTER ROW triggers. They can be fired either at the end of the statement causing the triggering event, or at the end of the containing transaction; in the latter case they are said to be deferred.   
  
A pending deferred-trigger firing can also be forced to happen immediately by using SET CONSTRAINTS.   
  
Constraint triggers are expected to raise an exception when the constraints they implement are violated.  

参考

1. http://blog.163.com/digoal@126/blog/static/16387704020132208241607/

2. http://blog.163.com/digoal@126/blog/static/1638770402013283547959/

3. http://blog.163.com/digoal@126/blog/static/1638770402013211102130526/

4. http://blog.163.com/digoal@126/blog/static/163877040201252575529358/

5. http://blog.163.com/digoal@126/blog/static/16387704020132131361949/

6. http://www.postgresql.org/docs/9.2/static/sql-createtrigger.html

Flag Counter

digoal’s 大量PostgreSQL文章入口