diff options
| -rw-r--r-- | contrib/dbtran/Makefile | 17 | ||||
| -rw-r--r-- | contrib/dbtran/bind-triggers.sql | 29 | ||||
| -rw-r--r-- | contrib/dbtran/dbtran--1.0.sql | 349 | ||||
| -rw-r--r-- | contrib/dbtran/dbtran--unpackaged--1.0.sql | 76 | ||||
| -rw-r--r-- | contrib/dbtran/dbtran--unpackaged.sql | 373 | ||||
| -rw-r--r-- | contrib/dbtran/dbtran.c | 416 | ||||
| -rw-r--r-- | contrib/dbtran/dbtran.control | 6 | ||||
| -rw-r--r-- | src/backend/catalog/heap.c | 5 | ||||
| -rw-r--r-- | src/backend/storage/lmgr/predicate.c | 1 | ||||
| -rw-r--r-- | src/backend/utils/adt/trigfuncs.c | 159 | ||||
| -rw-r--r-- | src/include/catalog/pg_proc.h | 2 | ||||
| -rw-r--r-- | src/include/utils/builtins.h | 1 |
12 files changed, 1433 insertions, 1 deletions
diff --git a/contrib/dbtran/Makefile b/contrib/dbtran/Makefile new file mode 100644 index 0000000000..9c826f403c --- /dev/null +++ b/contrib/dbtran/Makefile @@ -0,0 +1,17 @@ +# contrib/dbtran/Makefile + +MODULES = dbtran + +EXTENSION = dbtran +DATA = dbtran--unpackaged.sql dbtran--1.0.sql dbtran--unpackaged--1.0.sql + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/dbtran +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/dbtran/bind-triggers.sql b/contrib/dbtran/bind-triggers.sql new file mode 100644 index 0000000000..d7d660a5f5 --- /dev/null +++ b/contrib/dbtran/bind-triggers.sql @@ -0,0 +1,29 @@ +DO $$DECLARE r text; +BEGIN + FOR r IN + SELECT +'CREATE TRIGGER "' || r.name || '_640_dbtran_trig" + AFTER INSERT OR UPDATE OR DELETE ON "' || r.name || '" + FOR EACH ROW EXECUTE PROCEDURE dbtran.capture_replication_data();' as stmt + FROM "Relation" r + JOIN pg_class c on (c.relname = r.name) + WHERE r."isExportEnabled" + AND c.relkind = 'r' + LOOP + EXECUTE r; + END LOOP; +END$$; + + +DO $$DECLARE r text; +BEGIN + FOR r IN + SELECT 'DROP TRIGGER "' || r.name || '_640_dbtran_trig" ON "' || r.name || '";' + FROM "Relation" r + JOIN pg_class c on (c.relname = r.name) + WHERE r."isExportEnabled" + AND c.relkind = 'r' + LOOP + EXECUTE r; + END LOOP; +END$$; diff --git a/contrib/dbtran/dbtran--1.0.sql b/contrib/dbtran/dbtran--1.0.sql new file mode 100644 index 0000000000..99d95e488a --- /dev/null +++ b/contrib/dbtran/dbtran--1.0.sql @@ -0,0 +1,349 @@ +/* contrib/dbtran/dbtran--unpackaged.sql */ + +create table "DbTranSummary" +( + "countyNo" "CountyNoT" not null, + "backendPid" int not null, + "tranStart" "TimestampT" not null, + "userId" "UserIdT", + "queryName" "QueryNameT", + "sourceRef" varchar(255), + "functionalArea" varchar(50), + "tranEnd" "TimestampT", + "tranImageSeqNo" "TranImageSeqNoT", + "runDuration" float, + primary key ("backendPid", "tranStart"), + unique ("tranImageSeqNo") +); +grant select on dbtran."DbTranSummary" to cc, viewer; + +create table "DbTranOp" +( + "countyNo" "CountyNoT" not null, + "backendPid" int not null, + "tranStart" "TimestampT" not null, + "logRecordSeqNo" "LogRecordSeqNoT" not null, + "operation" "OperationT" not null, + "tableName" "TableNameT" not null, + primary key ("backendPid", "tranStart", "logRecordSeqNo") +); +grant select on dbtran."DbTranOp" to cc, viewer; + +create table "DbTranOpValue" +( + "countyNo" "CountyNoT" not null, + "backendPid" int not null, + "tranStart" "TimestampT" not null, + "logRecordSeqNo" "LogRecordSeqNoT" not null, + "columnName" "ColumnNameT" not null, + "isAfter" "BooleanT" not null, + "textValue" text, + "intValue" bigint, + "numericValue" numeric, + "binaryValue" bytea, + "booleanValue" "BooleanT", + "dateValue" "DateT", + "timeValue" "TimeT", + "timestampValue" "TimestampT", + primary key ("backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter") +); +grant select on dbtran."DbTranOpValue" to cc; + +create table "DbTranLock" +( + "dummy" int +); +grant select on dbtran."DbTranLock" to cc, viewer; + +create sequence "DbTranSeq"; +grant select on sequence "DbTranSeq" to cc, viewer; + + +create or replace function "GetTranCountyNo"() + returns "CountyNoT" + language sql + stable +as $GetTranCountyNo$ + select "countyNo" from public."ControlRecord" order by "countyNo" limit 1; +$GetTranCountyNo$; +grant execute on function "GetTranCountyNo"() to cc, viewer; + +create or replace function "EnsureDbTranSummaryRowExists"() + returns void + language plpgsql + security definer +as $EnsureDbTranSummaryRowExists$ +begin + if not exists (select * from dbtran."DbTranSummary" + where "backendPid" = pg_backend_pid() + and "tranStart" = transaction_timestamp()) + then + insert into dbtran."DbTranSummary" + ("countyNo", "backendPid", "tranStart") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp()); + end if; +end; +$EnsureDbTranSummaryRowExists$; + +create or replace function "DbTranSummarySetInfo" + ( + "userId" text, + "queryName" text, + "sourceRef" text, + "functionalArea" text + ) + returns void + language plpgsql + security definer +as $DbTranSummarySetInfo$ +begin + perform dbtran."EnsureDbTranSummaryRowExists"(); + update dbtran."DbTranSummary" + set "userId" = $1, + "queryName" = $2, + "sourceRef" = $3, + "functionalArea" = $4 + where "backendPid" = pg_backend_pid() + and "tranStart" = transaction_timestamp(); +end; +$DbTranSummarySetInfo$; +grant execute on function "DbTranSummarySetInfo" + ( + "userId" text, + "queryName" text, + "sourceRef" text, + "functionalArea" text + ) to cc; + +create or replace function "FinishDbTranSummaryRow"() + returns trigger + language plpgsql + security definer +as $FinishDbTranSummaryRow$ +declare + endtime timestamp with time zone; + transeqno_from_time bigint; + transeqno_from_seq bigint; +begin + + if tg_when <> 'AFTER' or tg_op <> 'INSERT' or tg_level <> 'ROW' then + raise exception 'FinishDbTranSummaryRow: must be run as a trigger after insert to row'; + end if; + + if tg_table_name <> 'DbTranSummary' or tg_table_schema <> 'dbtran' then + raise exception 'FinishDbTranSummaryRow: may only be use for a trigger on dbtran."DbTranSummary"'; + end if; + + -- We want the larger of the last seqno + 1 or the current timestamp. + lock table dbtran."DbTranLock" in exclusive mode; + endtime := clock_timestamp(); + transeqno_from_time := extract(epoch from endtime) * 1000 + + extract(milliseconds from endtime); + transeqno_from_seq = nextval('"DbTranSeq"'); + if transeqno_from_time > transeqno_from_seq then + perform setval('"DbTranSeq"', transeqno_from_time); + transeqno_from_seq := transeqno_from_time; + end if; + + update dbtran."DbTranSummary" + set "tranEnd" = endtime, + "runDuration" = extract(microsecond from (endtime - transaction_timestamp())) / 1000.0::float, + "tranImageSeqNo" = transeqno_from_seq::numeric + where "backendPid" = pg_backend_pid() + and "tranStart" = transaction_timestamp(); + + return null; -- AFTER trigger; value doesn't matter. +end; +$FinishDbTranSummaryRow$; + +create constraint trigger finish_tran_summary + after insert on dbtran."DbTranSummary" + deferrable initially deferred + for each row execute procedure dbtran."FinishDbTranSummaryRow"(); + +create or replace function "DbTranOpInsert"("op" "OperationT", "table" "TableNameT") + returns "LogRecordSeqNoT" + language plpgsql + security definer +as $DbTranOpInsert$ +declare + "seqno" "LogRecordSeqNoT"; +begin + select coalesce(max("logRecordSeqNo"),0) + 1 + into "seqno" + from dbtran."DbTranOp" + where "backendPid" = pg_backend_pid() + and "tranStart" = transaction_timestamp(); + insert into dbtran."DbTranOp" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "operation", "tableName") + values + (dbtran."GetTranCountyNo"(), + pg_catalog.pg_backend_pid(), + pg_catalog.transaction_timestamp(), + "seqno", "op", "table"); + return seqno; +end; +$DbTranOpInsert$; + +create or replace function "DbTranOpValueInsertText" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "text" text + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertText$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "textValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "text"); +end; +$DbTranOpValueInsertText$; + +create or replace function "DbTranOpValueInsertInt" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "int" bigint + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertInt$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "intValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "int"); +end; +$DbTranOpValueInsertInt$; + +create or replace function "DbTranOpValueInsertNumeric" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "numeric" numeric + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertNumeric$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "numericValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "numeric"); +end; +$DbTranOpValueInsertNumeric$; + +create or replace function "DbTranOpValueInsertBinary" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "binary" bytea + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertBinary$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "binaryValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "binary"); +end; +$DbTranOpValueInsertBinary$; + +create or replace function "DbTranOpValueInsertBoolean" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "bool" "BooleanT" + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertBoolean$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "booleanValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "bool"); +end; +$DbTranOpValueInsertBoolean$; + +create or replace function "DbTranOpValueInsertDate" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "date" "DateT" + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertDate$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "dateValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "date"); +end; +$DbTranOpValueInsertDate$; + +create or replace function "DbTranOpValueInsertTime" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "time" "TimeT" + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertTime$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "timeValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "time"); +end; +$DbTranOpValueInsertTime$; + +create or replace function "DbTranOpValueInsertTimestamp" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "ts" "TimestampT" + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertTimestamp$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "timestampValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "ts"); +end; +$DbTranOpValueInsertTimestamp$; + +reset role; +create or replace function capture_replication_data() + returns trigger + language C + security definer + as 'dbtran', 'capture_replication_data'; +set role ccowner; +set search_path = public; + diff --git a/contrib/dbtran/dbtran--unpackaged--1.0.sql b/contrib/dbtran/dbtran--unpackaged--1.0.sql new file mode 100644 index 0000000000..09988453d3 --- /dev/null +++ b/contrib/dbtran/dbtran--unpackaged--1.0.sql @@ -0,0 +1,76 @@ +/* contrib/tablefunc/tablefunc--unpackaged--1.0.sql */ + +ALTER EXTENSION dbtran ADD table "DbTranSummary"; +ALTER EXTENSION dbtran ADD table "DbTranOp"; +ALTER EXTENSION dbtran ADD table "DbTranOpValue"; +ALTER EXTENSION dbtran ADD table "DbTranLock"; +ALTER EXTENSION dbtran ADD sequence "DbTranSeq"; +ALTER EXTENSION dbtran ADD function "GetTranCountyNo"(); +ALTER EXTENSION dbtran ADD function "EnsureDbTranSummaryRowExists"(); +ALTER EXTENSION dbtran ADD function "DbTranSummarySetInfo" + ( + "userId" text, + "queryName" text, + "sourceRef" text, + "functionalArea" text + ); +ALTER EXTENSION dbtran ADD function "FinishDbTranSummaryRow"() +ALTER EXTENSION dbtran ADD constraint trigger finish_tran_summary +ALTER EXTENSION dbtran ADD function "DbTranOpInsert"("op" "OperationT", "table" "TableNameT") +ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertText" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "text" text + ); +ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertInt" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "int" bigint + ); +ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertNumeric" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "numeric" numeric + ); +ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertBinary" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "binary" bytea + ); +ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertBoolean" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "bool" "BooleanT" + ); +ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertDate" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "date" "DateT" + ); +ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertTime" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "time" "TimeT" + ); +ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertTimestamp" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "ts" "TimestampT" + ); +ALTER EXTENSION dbtran ADD function capture_replication_data(); diff --git a/contrib/dbtran/dbtran--unpackaged.sql b/contrib/dbtran/dbtran--unpackaged.sql new file mode 100644 index 0000000000..83372bf319 --- /dev/null +++ b/contrib/dbtran/dbtran--unpackaged.sql @@ -0,0 +1,373 @@ +/* contrib/dbtran/dbtran--unpackaged.sql */ + +-- +-- Set up dbtran in a pre-9.1 unpackaged environment. +-- + +create schema dbtran; +revoke create on schema dbtran from public; +grant usage, create on schema dbtran to ccowner; +grant usage on schema dbtran to cc, viewer; + +set role ccowner; +set search_path = dbtran, public; + +create domain "LogRecordSeqNoT" integer; +create domain "OperationT" varchar(1); +create domain "TableNameT" varchar(18); +create domain "ColumnNameT" varchar(18); + +drop table if exists "DbTranSummary"; +drop table if exists "DbTranSummary"; +create table "DbTranSummary" +( + "countyNo" "CountyNoT" not null, + "backendPid" int not null, + "tranStart" "TimestampT" not null, + "userId" "UserIdT", + "queryName" "QueryNameT", + "sourceRef" varchar(255), + "functionalArea" varchar(50), + "tranEnd" "TimestampT", + "tranImageSeqNo" "TranImageSeqNoT", + "runDuration" float, + primary key ("backendPid", "tranStart"), + unique ("tranImageSeqNo") +); +grant select on dbtran."DbTranSummary" to cc, viewer; + +drop table if exists "DbTranOp"; +drop table if exists "DbTranOp"; +create table "DbTranOp" +( + "countyNo" "CountyNoT" not null, + "backendPid" int not null, + "tranStart" "TimestampT" not null, + "logRecordSeqNo" "LogRecordSeqNoT" not null, + "operation" "OperationT" not null, + "tableName" "TableNameT" not null, + primary key ("backendPid", "tranStart", "logRecordSeqNo") +); +grant select on dbtran."DbTranOp" to cc, viewer; + +drop table if exists "DbTranOpValue"; +drop table if exists "DbTranOpValue"; +create table "DbTranOpValue" +( + "countyNo" "CountyNoT" not null, + "backendPid" int not null, + "tranStart" "TimestampT" not null, + "logRecordSeqNo" "LogRecordSeqNoT" not null, + "columnName" "ColumnNameT" not null, + "isAfter" "BooleanT" not null, + "textValue" text, + "intValue" bigint, + "numericValue" numeric, + "binaryValue" bytea, + "booleanValue" "BooleanT", + "dateValue" "DateT", + "timeValue" "TimeT", + "timestampValue" "TimestampT", + primary key ("backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter") +); +grant select on dbtran."DbTranOpValue" to cc; + +drop table if exists "DbTranLock"; +drop table if exists "DbTranLock"; +create table "DbTranLock" +( + "dummy" int +); +grant select on dbtran."DbTranLock" to cc, viewer; + +create sequence "DbTranSeq"; +grant select on sequence "DbTranSeq" to cc, viewer; + + +create or replace function "GetTranCountyNo"() + returns "CountyNoT" + language sql + stable +as $GetTranCountyNo$ + select "countyNo" from public."ControlRecord" order by "countyNo" limit 1; +$GetTranCountyNo$; +grant execute on function "GetTranCountyNo"() to cc, viewer; + +create or replace function "EnsureDbTranSummaryRowExists"() + returns void + language plpgsql + security definer +as $EnsureDbTranSummaryRowExists$ +begin + if not exists (select * from dbtran."DbTranSummary" + where "backendPid" = pg_backend_pid() + and "tranStart" = transaction_timestamp()) + then + insert into dbtran."DbTranSummary" + ("countyNo", "backendPid", "tranStart") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp()); + end if; +end; +$EnsureDbTranSummaryRowExists$; + +create or replace function "DbTranSummarySetInfo" + ( + "userId" text, + "queryName" text, + "sourceRef" text, + "functionalArea" text + ) + returns void + language plpgsql + security definer +as $DbTranSummarySetInfo$ +begin + perform dbtran."EnsureDbTranSummaryRowExists"(); + update dbtran."DbTranSummary" + set "userId" = $1, + "queryName" = $2, + "sourceRef" = $3, + "functionalArea" = $4 + where "backendPid" = pg_backend_pid() + and "tranStart" = transaction_timestamp(); +end; +$DbTranSummarySetInfo$; +grant execute on function "DbTranSummarySetInfo" + ( + "userId" text, + "queryName" text, + "sourceRef" text, + "functionalArea" text + ) to cc; + +create or replace function "FinishDbTranSummaryRow"() + returns trigger + language plpgsql + security definer +as $FinishDbTranSummaryRow$ +declare + endtime timestamp with time zone; + transeqno_from_time bigint; + transeqno_from_seq bigint; +begin + + if tg_when <> 'AFTER' or tg_op <> 'INSERT' or tg_level <> 'ROW' then + raise exception 'FinishDbTranSummaryRow: must be run as a trigger after insert to row'; + end if; + + if tg_table_name <> 'DbTranSummary' or tg_table_schema <> 'dbtran' then + raise exception 'FinishDbTranSummaryRow: may only be use for a trigger on dbtran."DbTranSummary"'; + end if; + + -- We want the larger of the last seqno + 1 or the current timestamp. + lock table dbtran."DbTranLock" in exclusive mode; + endtime := clock_timestamp(); + transeqno_from_time := floor(extract(epoch from endtime) * 1000); + transeqno_from_seq = nextval('"DbTranSeq"'); + if transeqno_from_time > transeqno_from_seq then + perform setval('"DbTranSeq"', transeqno_from_time); + transeqno_from_seq := transeqno_from_time; + end if; + + update dbtran."DbTranSummary" + set "tranEnd" = endtime, + "runDuration" = extract(microsecond from (endtime - transaction_timestamp())) / 1000.0::float, + "tranImageSeqNo" = transeqno_from_seq::numeric + where "backendPid" = pg_backend_pid() + and "tranStart" = transaction_timestamp(); + + return null; -- AFTER trigger; value doesn't matter. +end; +$FinishDbTranSummaryRow$; + +create constraint trigger finish_tran_summary + after insert on dbtran."DbTranSummary" + deferrable initially deferred + for each row execute procedure dbtran."FinishDbTranSummaryRow"(); + +create or replace function "DbTranOpInsert"("op" "OperationT", "table" "TableNameT") + returns "LogRecordSeqNoT" + language plpgsql + security definer +as $DbTranOpInsert$ +declare + "seqno" "LogRecordSeqNoT"; +begin + select coalesce(max("logRecordSeqNo"),0) + 1 + into "seqno" + from dbtran."DbTranOp" + where "backendPid" = pg_backend_pid() + and "tranStart" = transaction_timestamp(); + insert into dbtran."DbTranOp" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "operation", "tableName") + values + (dbtran."GetTranCountyNo"(), + pg_catalog.pg_backend_pid(), + pg_catalog.transaction_timestamp(), + "seqno", "op", "table"); + return seqno; +end; +$DbTranOpInsert$; + +create or replace function "DbTranOpValueInsertText" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "text" text + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertText$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "textValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "text"); +end; +$DbTranOpValueInsertText$; + +create or replace function "DbTranOpValueInsertInt" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "int" bigint + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertInt$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "intValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "int"); +end; +$DbTranOpValueInsertInt$; + +create or replace function "DbTranOpValueInsertNumeric" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "numeric" numeric + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertNumeric$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "numericValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "numeric"); +end; +$DbTranOpValueInsertNumeric$; + +create or replace function "DbTranOpValueInsertBinary" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "binary" bytea + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertBinary$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "binaryValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "binary"); +end; +$DbTranOpValueInsertBinary$; + +create or replace function "DbTranOpValueInsertBoolean" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "bool" "BooleanT" + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertBoolean$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "booleanValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "bool"); +end; +$DbTranOpValueInsertBoolean$; + +create or replace function "DbTranOpValueInsertDate" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "date" "DateT" + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertDate$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "dateValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "date"); +end; +$DbTranOpValueInsertDate$; + +create or replace function "DbTranOpValueInsertTime" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "time" "TimeT" + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertTime$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "timeValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "time"); +end; +$DbTranOpValueInsertTime$; + +create or replace function "DbTranOpValueInsertTimestamp" + ( + "seqno" "LogRecordSeqNoT", + "column" "ColumnNameT", + "after" "BooleanT", + "ts" "TimestampT" + ) + returns void + language plpgsql + security definer +as $DbTranOpValueInsertTimestamp$ +begin + insert into dbtran."DbTranOpValue" + ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "timestampValue") + values + ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "ts"); +end; +$DbTranOpValueInsertTimestamp$; + +reset role; +create or replace function capture_replication_data() + returns trigger + language C + security definer + as 'dbtran', 'capture_replication_data'; +set role ccowner; +set search_path = public; + diff --git a/contrib/dbtran/dbtran.c b/contrib/dbtran/dbtran.c new file mode 100644 index 0000000000..7ba9f05039 --- /dev/null +++ b/contrib/dbtran/dbtran.c @@ -0,0 +1,416 @@ +/*------------------------------------------------------------------------- + * + * dbtran.c + * C code for capturing replication data in DbTranXxx tables. + * + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "catalog/pg_type.h" +#include "commands/trigger.h" +#include "executor/spi.h" +#include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/syscache.h" + +PG_MODULE_MAGIC; + + +static void generate_dbtran_plans(); +static void write_one_dbtran_value(Datum value, const TupleDesc tupdesc, int i, + Datum *args, const char *nulls); +extern Datum capture_replication_data(PG_FUNCTION_ARGS); + + + +/* ---------- + * Global data + * ---------- + */ +static SPIPlanPtr plan_ensuredbtransummary = NULL; +static const char *query_ensuredbtransummary = "SELECT \"EnsureDbTranSummaryRowExists\"()"; +static SPIPlanPtr plan_insertop = NULL; +static const char *query_insertop = "SELECT \"DbTranOpInsert\"($1, $2)"; +static SPIPlanPtr plan_insertvaltext = NULL; +static const char *query_insertvaltext = "SELECT \"DbTranOpValueInsertText\"($1, $2, $3, $4)"; +static SPIPlanPtr plan_insertvalint = NULL; +static const char *query_insertvalint = "SELECT \"DbTranOpValueInsertInt\"($1, $2, $3, $4)"; +static SPIPlanPtr plan_insertvalnumeric = NULL; +static const char *query_insertvalnumeric = "SELECT \"DbTranOpValueInsertNumeric\"($1, $2, $3, $4)"; +static SPIPlanPtr plan_insertvalbinary = NULL; +static const char *query_insertvalbinary = "SELECT \"DbTranOpValueInsertBinary\"($1, $2, $3, $4)"; +static SPIPlanPtr plan_insertvalboolean = NULL; +static const char *query_insertvalboolean = "SELECT \"DbTranOpValueInsertBoolean\"($1, $2, $3, $4)"; +static SPIPlanPtr plan_insertvaldate = NULL; +static const char *query_insertvaldate = "SELECT \"DbTranOpValueInsertDate\"($1, $2, $3, $4)"; +static SPIPlanPtr plan_insertvaltime = NULL; +static const char *query_insertvaltime = "SELECT \"DbTranOpValueInsertTime\"($1, $2, $3, $4)"; +static SPIPlanPtr plan_insertvaltimestamp = NULL; +static const char *query_insertvaltimestamp = "SELECT \"DbTranOpValueInsertTimestamp\"($1, $2, $3, $4)"; + +#define VALUE_BEFORE BoolGetDatum(false) +#define VALUE_AFTER BoolGetDatum(true) + +#define dbtran_plans_missing() (plan_insertvaltimestamp == NULL) + +static void +generate_dbtran_plans() +{ + Oid argtypes[4]; + SPIPlanPtr plan; + + if (plan_ensuredbtransummary == NULL) + { + plan = SPI_prepare(query_ensuredbtransummary, 0, NULL); + if (plan == NULL) + elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_ensuredbtransummary); + plan_ensuredbtransummary = SPI_saveplan(plan); + } + + if (plan_insertop == NULL) + { + argtypes[0] = TEXTOID; + argtypes[1] = TEXTOID; + plan = SPI_prepare(query_insertop, 2, argtypes); + if (plan == NULL) + elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertop); + plan_insertop = SPI_saveplan(plan); + } + + if (plan_insertvaltext == NULL) + { + argtypes[0] = INT4OID; + argtypes[1] = TEXTOID; + argtypes[2] = BOOLOID; + argtypes[3] = TEXTOID; + plan = SPI_prepare(query_insertvaltext, 4, argtypes); + if (plan == NULL) + elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvaltext); + plan_insertvaltext = SPI_saveplan(plan); + } + + if (plan_insertvalint == NULL) + { + argtypes[0] = INT4OID; + argtypes[1] = TEXTOID; + argtypes[2] = BOOLOID; + argtypes[3] = INT8OID; + plan = SPI_prepare(query_insertvalint, 4, argtypes); + if (plan == NULL) + elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvalint); + plan_insertvalint = SPI_saveplan(plan); + } + + if (plan_insertvalnumeric == NULL) + { + argtypes[0] = INT4OID; + argtypes[1] = TEXTOID; + argtypes[2] = BOOLOID; + argtypes[3] = NUMERICOID; + plan = SPI_prepare(query_insertvalnumeric, 4, argtypes); + if (plan == NULL) + elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvalnumeric); + plan_insertvalnumeric = SPI_saveplan(plan); + } + + if (plan_insertvalbinary == NULL) + { + argtypes[0] = INT4OID; + argtypes[1] = TEXTOID; + argtypes[2] = BOOLOID; + argtypes[3] = BYTEAOID; + plan = SPI_prepare(query_insertvalbinary, 4, argtypes); + if (plan == NULL) + elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvalbinary); + plan_insertvalbinary = SPI_saveplan(plan); + } + + if (plan_insertvalboolean == NULL) + { + argtypes[0] = INT4OID; + argtypes[1] = TEXTOID; + argtypes[2] = BOOLOID; + argtypes[3] = BOOLOID; + plan = SPI_prepare(query_insertvalboolean, 4, argtypes); + if (plan == NULL) + elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvalboolean); + plan_insertvalboolean = SPI_saveplan(plan); + } + + if (plan_insertvaldate == NULL) + { + argtypes[0] = INT4OID; + argtypes[1] = TEXTOID; + argtypes[2] = BOOLOID; + argtypes[3] = DATEOID; + plan = SPI_prepare(query_insertvaldate, 4, argtypes); + if (plan == NULL) + elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvaldate); + plan_insertvaldate = SPI_saveplan(plan); + } + + if (plan_insertvaltime == NULL) + { + argtypes[0] = INT4OID; + argtypes[1] = TEXTOID; + argtypes[2] = BOOLOID; + argtypes[3] = TIMEOID; + plan = SPI_prepare(query_insertvaltime, 4, argtypes); + if (plan == NULL) + elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvaltime); + plan_insertvaltime = SPI_saveplan(plan); + } + + if (plan_insertvaltimestamp == NULL) + { + argtypes[0] = INT4OID; + argtypes[1] = TEXTOID; + argtypes[2] = BOOLOID; + argtypes[3] = TIMESTAMPTZOID; + plan = SPI_prepare(query_insertvaltimestamp, 4, argtypes); + if (plan == NULL) + elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvaltimestamp); + plan_insertvaltimestamp = SPI_saveplan(plan); + } +} + +static void +write_one_dbtran_value(Datum value, const TupleDesc tupdesc, int i, + Datum *args, const char *nulls) +{ + int spirc; + HeapTuple tuple; + Form_pg_type typeTup; + Oid typid = tupdesc->attrs[i]->atttypid; + + tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "capture_replication_data: cache lookup failed for type %u", typid); + + typeTup = (Form_pg_type) GETSTRUCT(tuple); + if (typeTup->typbasetype != 0) + typid = typeTup->typbasetype; + + ReleaseSysCache(tuple); + + switch (typid) + { + case TEXTOID: + args[3] = value; + spirc = SPI_execp(plan_insertvaltext, args, nulls, 4); + break; + case VARCHAROID: + case CHAROID: + args[3] = PointerGetDatum(value); + spirc = SPI_execp(plan_insertvaltext, args, nulls, 4); + break; + case INT8OID: + args[3] = value; + spirc = SPI_execp(plan_insertvalint, args, nulls, 4); + break; + case INT2OID: + args[3] = Int64GetDatum(DatumGetInt16(value)); + spirc = SPI_execp(plan_insertvalint, args, nulls, 4); + break; + case INT4OID: + args[3] = Int64GetDatum(DatumGetInt32(value)); + spirc = SPI_execp(plan_insertvalint, args, nulls, 4); + break; + case NUMERICOID: + args[3] = value; + spirc = SPI_execp(plan_insertvalnumeric, args, nulls, 4); + break; + case BYTEAOID: + args[3] = value; + spirc = SPI_execp(plan_insertvalbinary, args, nulls, 4); + break; + case BOOLOID: + args[3] = value; + spirc = SPI_execp(plan_insertvalboolean, args, nulls, 4); + break; + case DATEOID: + args[3] = value; + spirc = SPI_execp(plan_insertvaldate, args, nulls, 4); + break; + case TIMEOID: + args[3] = value; + spirc = SPI_execp(plan_insertvaltime, args, nulls, 4); + break; + case TIMESTAMPTZOID: + args[3] = value; + spirc = SPI_execp(plan_insertvaltimestamp, args, nulls, 4); + break; + default: + elog(ERROR, "capture_replication_data: unrecognized type"); + spirc = -1; /* quiet compiler */ + } + + if (spirc < 0) + elog(ERROR, "capture_replication_data: SPI error %i while writing value", spirc); +} + + +PG_FUNCTION_INFO_V1(capture_replication_data); + +Datum +capture_replication_data(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + int rc; + char operation[2]; + Trigger *trigger; + int nargs; + HeapTuple newtuple, + oldtuple; + Relation rel; + TupleDesc tupdesc; + int spirc; + Datum args[4]; + char nulls[4]; + bool isnull; + int i; + + /* make sure it's called as a trigger */ + if (!CALLED_AS_TRIGGER(fcinfo)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("capture_replication_data: must be called as trigger"))); + + /* and that it's called after triggering DML */ + if (TRIGGER_FIRED_BEFORE(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("capture_replication_data: must be called after triggering action"))); + + /* and that it's called for each row */ + if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("capture_replication_data: must be called for each row"))); + + operation[1] = '\0'; + oldtuple = NULL; + newtuple = NULL; + if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + { + operation[0] = 'I'; + newtuple = trigdata->tg_trigtuple; + } + else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + { + operation[0] = 'U'; + oldtuple = trigdata->tg_trigtuple; + newtuple = trigdata->tg_newtuple; + } + else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) + { + operation[0] = 'D'; + oldtuple = trigdata->tg_trigtuple; + } + else + { + elog(ERROR, "capture_replication_data: trigger fired by unrecognized operation"); + } + + trigger = trigdata->tg_trigger; + nargs = trigger->tgnargs; + if (nargs > 0) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("capture_replication_data: must not be called with any parameters"))); + + /* get relation data */ + rel = trigdata->tg_relation; + tupdesc = rel->rd_att; + + /* Connect to the SPI manager. */ + if ((rc = SPI_connect()) < 0) + elog(ERROR, "capture_replication_data: SPI_connect() failed"); + + /* Make sure plans are all prepared. */ + if (dbtran_plans_missing()) + generate_dbtran_plans(); + + /* Make sure we've got a summary row. */ + spirc = SPI_execp(plan_ensuredbtransummary, 0, NULL, 0); + if (spirc != SPI_OK_SELECT) + elog(ERROR, "capture_replication_data: failed to insert transaction summary"); + + /* Insert a new operation row. */ + args[0] = CStringGetTextDatum(operation); + args[1] = CStringGetTextDatum(RelationGetRelationName(rel)); + spirc = SPI_execp(plan_insertop, args, NULL, 2); + if (spirc != SPI_OK_SELECT || SPI_processed != 1) + elog(ERROR, "capture_replication_data: failed to insert transaction operation"); + + /* Set those things which will be the same for all value roes below. */ + args[0] = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull); + Assert(!isnull); + nulls[0] = ' '; + nulls[1] = ' '; + nulls[2] = ' '; + + for (i = 0; i < tupdesc->natts; i++) + { + Datum before; + bool beforeIsNull; + Datum after; + bool afterIsNull; + + /* Skip dropped columns. */ + if (tupdesc->attrs[i]->attisdropped) + continue; + + /* We need column name regardless of operation. */ + args[1] = CStringGetTextDatum(SPI_fname(tupdesc, i + 1)); + + /* If there is a non-null "before" value, write it. */ + if (operation[0] == 'U' || operation[0] == 'D') + { + before = SPI_getbinval(oldtuple, tupdesc, i + 1, &beforeIsNull); + if (!beforeIsNull) + { + args[2] = VALUE_BEFORE; + nulls[3] = ' '; + write_one_dbtran_value(before, tupdesc, i, args, nulls); + } + } + else + { + before = BoolGetDatum(true); /* quiet compiler warning */ + beforeIsNull = true; + } + + /* If there is a non-null on insert or a different value on update, write it. */ + if (operation[0] == 'I' || operation[0] == 'U') + { + after = SPI_getbinval(newtuple, tupdesc, i + 1, &afterIsNull); + if (afterIsNull && beforeIsNull) + continue; + if (afterIsNull != beforeIsNull) + { + args[2] = VALUE_AFTER; + nulls[3] = afterIsNull ? 'n' : ' '; + write_one_dbtran_value(after, tupdesc, i, args, nulls); + continue; + } + /* Update with before and after both not null. Are they different? */ + if (!datumIsEqual(before, after, tupdesc->attrs[i]->attbyval, + tupdesc->attrs[i]->attlen)) + { + args[2] = VALUE_AFTER; + nulls[3] = ' '; + write_one_dbtran_value(after, tupdesc, i, args, nulls); + } + } + } + + SPI_finish(); + + return PointerGetDatum(NULL); +} diff --git a/contrib/dbtran/dbtran.control b/contrib/dbtran/dbtran.control new file mode 100644 index 0000000000..792056f464 --- /dev/null +++ b/contrib/dbtran/dbtran.control @@ -0,0 +1,6 @@ +# dbtran extension +comment = 'capture triggers for DbTranXxx tables' +default_version = '1.0' +module_pathname = '$libdir/dbtran' +relocatable = false +schema = dbtran diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 4399493625..423b56ac4c 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -1712,6 +1712,11 @@ heap_drop_with_catalog(Oid relid) } /* + * Clean up any remaining predicate locks on the relation. + */ + DropAllPredicateLocksFromTable(rel); + + /* * Close relcache entry, but *keep* AccessExclusiveLock on the relation * until transaction commit. This ensures no one else will try to do * something with the doomed relation. diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 8e7a7f001d..c324bceef5 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -194,6 +194,7 @@ #include "storage/procarray.h" #include "utils/rel.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" #include "utils/tqual.h" /* Uncomment the next line to test the graceful degradation code. */ diff --git a/src/backend/utils/adt/trigfuncs.c b/src/backend/utils/adt/trigfuncs.c index 474878de7d..fa3ada8ba0 100644 --- a/src/backend/utils/adt/trigfuncs.c +++ b/src/backend/utils/adt/trigfuncs.c @@ -13,8 +13,10 @@ */ #include "postgres.h" -#include "access/htup.h" +#include "catalog/pg_type.h" +#include "commands/async.h" #include "commands/trigger.h" +#include "executor/spi.h" #include "utils/builtins.h" #include "utils/rel.h" @@ -94,3 +96,158 @@ suppress_redundant_updates_trigger(PG_FUNCTION_ARGS) return PointerGetDatum(rettuple); } + + +/* + * Copy from s (for source) to r (for result), wrapping with q (quote) + * characters and doubling any quote characters found. + */ +static char * +strcpy_quoted(char *r, const char *s, const char q) +{ + *r++ = q; + while (*s) + { + if (*s == q) + *r++ = q; + *r++ = *s; + s++; + } + *r++ = q; + return r; +} + +/* + * triggered_change_notification + * + * This trigger function will send a notification of data modification with + * primary key values. The channel will be "tcn" unless the trigger is + * created with a parameter, in which case that parameter will be used. + */ +Datum +triggered_change_notification(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + Trigger *trigger; + int nargs; + HeapTuple trigtuple; + Relation rel; + TupleDesc tupdesc; + char *channel; + char operation; + bool foundPK; + char *payload = palloc(8 * 1024); + + List *indexoidlist; + ListCell *indexoidscan; + + /* make sure it's called as a trigger */ + if (!CALLED_AS_TRIGGER(fcinfo)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called as trigger"))); + + /* and that it's called after the change */ + if (!TRIGGER_FIRED_AFTER(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called after the change"))); + + /* and that it's called for each row */ + if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event)) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called for each row"))); + + if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + operation = 'I'; + else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + operation = 'U'; + else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) + operation = 'D'; + else + { + elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation"); + operation = 'X'; /* silence compiler warning */ + } + + trigger = trigdata->tg_trigger; + nargs = trigger->tgnargs; + if (nargs > 1) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must not be called with more than one parameter"))); + + if (nargs == 0) + channel = "tcn"; + else + channel = trigger->tgargs[0]; + + /* get tuple data */ + trigtuple = trigdata->tg_trigtuple; + rel = trigdata->tg_relation; + tupdesc = rel->rd_att; + + foundPK = false; + + /* + * Get the list of index OIDs for the table from the relcache, and look up + * each one in the pg_index syscache until we find one marked primary key + * (hopefully there isn't more than one such). + */ + indexoidlist = RelationGetIndexList(rel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + HeapTuple indexTuple; + Form_pg_index index; + + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) /* should not happen */ + elog(ERROR, "triggered_change_notification: cache lookup failed for index %u", indexoid); + index = (Form_pg_index) GETSTRUCT(indexTuple); + /* we're only interested if it is the primary key */ + if (index->indisprimary) + { + int numatts = index->indnatts; + char *p; + + if (numatts > 0) + { + int i; + + foundPK = true; + + p = strcpy_quoted(payload, RelationGetRelationName(rel), '"'); + *p++ = ','; + *p++ = operation; + + for (i = 0; i < numatts; i++) + { + int colno = index->indkey.values[i]; + + *p++ = ','; + p = strcpy_quoted(p, NameStr(tupdesc->attrs[colno - 1]->attname), '"'); + *p++ = '='; + p = strcpy_quoted(p, SPI_getvalue(trigtuple, tupdesc, colno), '\''); + } + *p = '\0'; + + Async_Notify(channel, payload); + } + ReleaseSysCache(indexTuple); + break; + } + ReleaseSysCache(indexTuple); + } + + list_free(indexoidlist); + + if (!foundPK) + ereport(ERROR, + (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED), + errmsg("triggered_change_notification: must be called on a table with a primary key"))); + + return PointerGetDatum(NULL); /* after trigger; value doesn't matter */ +} diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 96f43fe0b1..c53c1b2760 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -1299,6 +1299,8 @@ DESCR("convert oid to int8"); DATA(insert OID = 1291 ( suppress_redundant_updates_trigger PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ suppress_redundant_updates_trigger _null_ _null_ _null_ )); DESCR("trigger to suppress updates when new and old records match"); +DATA(insert OID = 2650 ( triggered_change_notification PGNSP PGUID 12 1 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ triggered_change_notification _null_ _null_ _null_ )); +DESCR("trigger function to send change notification with primary key in payload"); DATA(insert OID = 1292 ( tideq PGNSP PGUID 12 1 0 0 0 f f f t f i 2 0 16 "27 27" _null_ _null_ _null_ _null_ tideq _null_ _null_ _null_ )); DATA(insert OID = 1293 ( currtid PGNSP PGUID 12 1 0 0 0 f f f t f v 2 0 27 "26 27" _null_ _null_ _null_ _null_ currtid_byreloid _null_ _null_ _null_ )); diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index 8a1c82ef72..7f5cedb112 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -964,6 +964,7 @@ extern Datum RI_FKey_setdefault_upd(PG_FUNCTION_ARGS); /* trigfuncs.c */ extern Datum suppress_redundant_updates_trigger(PG_FUNCTION_ARGS); +extern Datum triggered_change_notification(PG_FUNCTION_ARGS); /* encoding support functions */ extern Datum getdatabaseencoding(PG_FUNCTION_ARGS); |
