您现在的位置是:亿华云 > IT科技类资讯

自定义SQL实现PostgreSQL安全审计

亿华云2025-10-03 20:27:31【IT科技类资讯】0人已围观

简介数据审计是一个跟踪表内容随时间变化的系统,在现在安全合规方面数据审计是必须要的功能之一。PostgreSQL作为一个强大现代的开源关系数据库,也有一个相关插件PGAudit可以提供审计功能。关于PGA

数据审计是自定一个跟踪表内容随时间变化的系统,在现在安全合规方面数据审计是实现审计必须要的功能之一。PostgreSQL作为一个强大现代的安全开源关系数据库,也有一个相关插件PGAudit可以提供审计功能。自定

关于PGAudit插件以后有机会可以详细介绍,实现审计本文我们介绍一个简单SQL语句实现的安全数据集审计功能。

概述

最终实现效果为:

创建一个示例表:

create extension supa_audit cascade;

create table public.account(

id int primary key,自定

name text not null

);

启用审计:

select audit.enable_tracking(public.account::regclass);

增改删操作:

insert into public.account(id, name)

values (1, Chongchong);

update public.account set name = CC where id = 1;

delete from public.account where id = 1;

清空表:

truncate table public.account;

查看审计日志:

select * from audit.record_history

请注意,record_id和old_record_id在更新行时保持不变,实现审计这样就可以轻松查询单行的安全历史记录。

要关闭审计追踪,自定只需执行:

select audit.disable_tracking(public.account::regclass);

实现

首先创建一个名为audit schema为审计用:

create schema if not exists audit;

记录存储

接下来,实现审计需要一个表来跟踪插入、安全更新和删除。自定

传统上,实现审计使用audit schema并附加了一些元数据列,安全如提交的时间戳。

该解决方案存在一些维护挑战:

对表启用审计需要数据库迁移当源表的模式改变时,审计表的模式也必须改变

为此使用PostgreSQL的无模式JSONB数据类型将每条记录的数据存储在单个列中的。这种方法的另一个好处是允许将多个表的亿华云审计历史存储在一个审计表中。

create table audit.record_version(

id bigserial primary key,

record_id uuid,

old_record_id uuid,

op varchar(8) not null,

ts timestamptz not null default now(),

table_oid oid not null,

table_schema name not null,

table_name name not null,

record jsonb,

old_record jsonb

);

查询和索引

查询性能很重要,如果不能快速查询日志,则该审计日志没有多大实际意义。为了提高查询的性能,需要对最常用的查询涉及字段创建索引。

时间范围内查询

对于时间范围,需要一个索引ts。 由于审计表仅用于插入记录,其中ts列插入操作时间,其值ts自然是升序排列。PostgreSQL的内置BRIN索引可以利用值和物理位置之间的相关性来生成一个索引,该索引在规模上比默认值(BTREE索引)小数百倍,并且查找时间更快。

create index record_version_ts

on audit.record_version

using brin(ts);

对于表查询,包含了一个 table_oid跟踪PostgreSQL内部数字表标识符的列。可以为该列添加索引而不是table_schema和 able_name列,最小化索引大小并提供更好的性能。

create index record_version_table_oid

on audit.record_version

using btree(table_oid);

记录唯一标识

将每一行的数据存储为的缺点之一jsonb是基于列值的过滤变得非常低效。如果想快速查找一行的历史记录,云服务器提供商需要为每一行提取和索引一个唯一标识符。

对于全局唯一标识符,使用以下结构:

[table_oid, primary_key_value_1, primary_key_value_2, ...]

并将该数组散列为UUID v5以获得有效的可索引UUID类型,以识别对数据更改具有鲁棒性的行。

使用一个实用函数来查找记录的主键列名:

create or replace function audit.primary_key_columns(entity_oid oid)

returns text[]

stable

security definer

language sql

as $$

-- Looks up the names of a tables primary key columns

select

coalesce(

array_agg(pa.attname::text order by pa.attnum),

array[]::text[]

) column_names

from

pg_index pi

join pg_attribute pa

on pi.indrelid = pa.attrelid

and pa.attnum = any(pi.indkey)

where

indrelid = $1

and indisprimary

$$;

另一个为table_oid和主键,将结果转换为记录的UUID。

create or replace function audit.to_record_id(

entity_oid oid,

pkey_cols text[],

rec jsonb

)

returns uuid

stable

language sql

as $$

select

case

when rec is null then null

-- if no primary key exists, use a random uuid

when pkey_cols = array[]::text[] then uuid_generate_v4()

else (

select

uuid_generate_v5(

fd62bc3d-8d6e-43c2-919c-802ba3762271,

(

jsonb_build_array(to_jsonb($1))

|| jsonb_agg($3 ->> key_)

)::text

)

from

unnest($2) x(key_)

)

end

$$;

最后,索引record_id和old_record_id包含这些用于快速查询的唯一标识符的列。

create index record_version_record_id

on audit.record_version(record_id)

where record_id is not null;

create index record_version_old_record_id

on audit.record_version(record_id)

where old_record_id is not null;触发器

为了让审计功能真正起作用,需要在最终用户不对其事务进行任何更改的情况下插入记录给审计表。为此,设置一个触发器在数据更改时触发,为每个插入/更新/删除的行为触发一次触发器。

create or replace function audit.insert_update_delete_trigger()

returns trigger

security definer

language plpgsql

as $$

declare

pkey_cols text[] = audit.primary_key_columns(TG_RELID);

record_jsonb jsonb = to_jsonb(new);

record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, record_jsonb);

old_record_jsonb jsonb = to_jsonb(old);

old_record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, old_record_jsonb);

begin

insert into audit.record_version(

record_id,

old_record_id,

op,

table_oid,

table_schema,

table_name,

record,

old_record

)

select

record_id,

old_record_id,

TG_OP,

TG_RELID,

TG_TABLE_SCHEMA,

TG_TABLE_NAME,

record_jsonb,

old_record_jsonb;

return coalesce(new, old);

end;

$$;

API

将公开的用于对表启用审计的API:

select audit.enable_tracking(<schema>.<table>::regclass);

禁用跟踪:

select audit.disable_tracking(<schema>.<table>::regclass);

这些函数根据请求由表注册审计触发器:

create or replace function audit.enable_tracking(regclass)

returns void

volatile

security definer

language plpgsql

as $$

declare

statement_row text = format(

create trigger audit_i_u_d

before insert or update or delete

on %I

for each row

execute procedure audit.insert_update_delete_trigger();,

$1

);

pkey_cols text[] = audit.primary_key_columns($1);

begin

if pkey_cols = array[]::text[] then

raise exception Table % can not be audited because it has no primary key, $1;

end if;

if not exists(select 1 from pg_trigger where tgrelid = $1 and tgname = audit_i_u_d) then

execute statement_row;

end if;

end;

$$;

create or replace function audit.disable_tracking(regclass)

returns void

volatile

security definer

language plpgsql

as $$

declare

statement_row text = format(

drop trigger if exists audit_i_u_d on %I;,

$1

);

begin

execute statement_row;

end;

$$;

性能开销

开启审计表后会降低插入、更新和删除的吞吐量。但是在吞吐量低于每秒1000次写入的情况下,其开销通常可以忽略不计。对于写入频率较高的表,建议使用pgAudit。站群服务器

总结

通过简单纯sql语句就实现了Postgresql数据库的安全审计,总体上算起来实现才150行sql语句。大家可以自己手动尝试一下,主要是搞清楚其原理,如果生产环境中有需求还是建议用pgAudit。

很赞哦!(8)