summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/dbtran/Makefile17
-rw-r--r--contrib/dbtran/bind-triggers.sql29
-rw-r--r--contrib/dbtran/dbtran--1.0.sql349
-rw-r--r--contrib/dbtran/dbtran--unpackaged--1.0.sql76
-rw-r--r--contrib/dbtran/dbtran--unpackaged.sql373
-rw-r--r--contrib/dbtran/dbtran.c416
-rw-r--r--contrib/dbtran/dbtran.control6
-rw-r--r--src/backend/catalog/heap.c5
-rw-r--r--src/backend/storage/lmgr/predicate.c1
-rw-r--r--src/backend/utils/adt/trigfuncs.c159
-rw-r--r--src/include/catalog/pg_proc.h2
-rw-r--r--src/include/utils/builtins.h1
12 files changed, 1433 insertions, 1 deletions
diff --git a/contrib/dbtran/Makefile b/contrib/dbtran/Makefile
new file mode 100644
index 0000000000..9c826f403c
--- /dev/null
+++ b/contrib/dbtran/Makefile
@@ -0,0 +1,17 @@
+# contrib/dbtran/Makefile
+
+MODULES = dbtran
+
+EXTENSION = dbtran
+DATA = dbtran--unpackaged.sql dbtran--1.0.sql dbtran--unpackaged--1.0.sql
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/dbtran
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/dbtran/bind-triggers.sql b/contrib/dbtran/bind-triggers.sql
new file mode 100644
index 0000000000..d7d660a5f5
--- /dev/null
+++ b/contrib/dbtran/bind-triggers.sql
@@ -0,0 +1,29 @@
+DO $$DECLARE r text;
+BEGIN
+ FOR r IN
+ SELECT
+'CREATE TRIGGER "' || r.name || '_640_dbtran_trig"
+ AFTER INSERT OR UPDATE OR DELETE ON "' || r.name || '"
+ FOR EACH ROW EXECUTE PROCEDURE dbtran.capture_replication_data();' as stmt
+ FROM "Relation" r
+ JOIN pg_class c on (c.relname = r.name)
+ WHERE r."isExportEnabled"
+ AND c.relkind = 'r'
+ LOOP
+ EXECUTE r;
+ END LOOP;
+END$$;
+
+
+DO $$DECLARE r text;
+BEGIN
+ FOR r IN
+ SELECT 'DROP TRIGGER "' || r.name || '_640_dbtran_trig" ON "' || r.name || '";'
+ FROM "Relation" r
+ JOIN pg_class c on (c.relname = r.name)
+ WHERE r."isExportEnabled"
+ AND c.relkind = 'r'
+ LOOP
+ EXECUTE r;
+ END LOOP;
+END$$;
diff --git a/contrib/dbtran/dbtran--1.0.sql b/contrib/dbtran/dbtran--1.0.sql
new file mode 100644
index 0000000000..99d95e488a
--- /dev/null
+++ b/contrib/dbtran/dbtran--1.0.sql
@@ -0,0 +1,349 @@
+/* contrib/dbtran/dbtran--unpackaged.sql */
+
+create table "DbTranSummary"
+(
+ "countyNo" "CountyNoT" not null,
+ "backendPid" int not null,
+ "tranStart" "TimestampT" not null,
+ "userId" "UserIdT",
+ "queryName" "QueryNameT",
+ "sourceRef" varchar(255),
+ "functionalArea" varchar(50),
+ "tranEnd" "TimestampT",
+ "tranImageSeqNo" "TranImageSeqNoT",
+ "runDuration" float,
+ primary key ("backendPid", "tranStart"),
+ unique ("tranImageSeqNo")
+);
+grant select on dbtran."DbTranSummary" to cc, viewer;
+
+create table "DbTranOp"
+(
+ "countyNo" "CountyNoT" not null,
+ "backendPid" int not null,
+ "tranStart" "TimestampT" not null,
+ "logRecordSeqNo" "LogRecordSeqNoT" not null,
+ "operation" "OperationT" not null,
+ "tableName" "TableNameT" not null,
+ primary key ("backendPid", "tranStart", "logRecordSeqNo")
+);
+grant select on dbtran."DbTranOp" to cc, viewer;
+
+create table "DbTranOpValue"
+(
+ "countyNo" "CountyNoT" not null,
+ "backendPid" int not null,
+ "tranStart" "TimestampT" not null,
+ "logRecordSeqNo" "LogRecordSeqNoT" not null,
+ "columnName" "ColumnNameT" not null,
+ "isAfter" "BooleanT" not null,
+ "textValue" text,
+ "intValue" bigint,
+ "numericValue" numeric,
+ "binaryValue" bytea,
+ "booleanValue" "BooleanT",
+ "dateValue" "DateT",
+ "timeValue" "TimeT",
+ "timestampValue" "TimestampT",
+ primary key ("backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter")
+);
+grant select on dbtran."DbTranOpValue" to cc;
+
+create table "DbTranLock"
+(
+ "dummy" int
+);
+grant select on dbtran."DbTranLock" to cc, viewer;
+
+create sequence "DbTranSeq";
+grant select on sequence "DbTranSeq" to cc, viewer;
+
+
+create or replace function "GetTranCountyNo"()
+ returns "CountyNoT"
+ language sql
+ stable
+as $GetTranCountyNo$
+ select "countyNo" from public."ControlRecord" order by "countyNo" limit 1;
+$GetTranCountyNo$;
+grant execute on function "GetTranCountyNo"() to cc, viewer;
+
+create or replace function "EnsureDbTranSummaryRowExists"()
+ returns void
+ language plpgsql
+ security definer
+as $EnsureDbTranSummaryRowExists$
+begin
+ if not exists (select * from dbtran."DbTranSummary"
+ where "backendPid" = pg_backend_pid()
+ and "tranStart" = transaction_timestamp())
+ then
+ insert into dbtran."DbTranSummary"
+ ("countyNo", "backendPid", "tranStart")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp());
+ end if;
+end;
+$EnsureDbTranSummaryRowExists$;
+
+create or replace function "DbTranSummarySetInfo"
+ (
+ "userId" text,
+ "queryName" text,
+ "sourceRef" text,
+ "functionalArea" text
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranSummarySetInfo$
+begin
+ perform dbtran."EnsureDbTranSummaryRowExists"();
+ update dbtran."DbTranSummary"
+ set "userId" = $1,
+ "queryName" = $2,
+ "sourceRef" = $3,
+ "functionalArea" = $4
+ where "backendPid" = pg_backend_pid()
+ and "tranStart" = transaction_timestamp();
+end;
+$DbTranSummarySetInfo$;
+grant execute on function "DbTranSummarySetInfo"
+ (
+ "userId" text,
+ "queryName" text,
+ "sourceRef" text,
+ "functionalArea" text
+ ) to cc;
+
+create or replace function "FinishDbTranSummaryRow"()
+ returns trigger
+ language plpgsql
+ security definer
+as $FinishDbTranSummaryRow$
+declare
+ endtime timestamp with time zone;
+ transeqno_from_time bigint;
+ transeqno_from_seq bigint;
+begin
+
+ if tg_when <> 'AFTER' or tg_op <> 'INSERT' or tg_level <> 'ROW' then
+ raise exception 'FinishDbTranSummaryRow: must be run as a trigger after insert to row';
+ end if;
+
+ if tg_table_name <> 'DbTranSummary' or tg_table_schema <> 'dbtran' then
+ raise exception 'FinishDbTranSummaryRow: may only be use for a trigger on dbtran."DbTranSummary"';
+ end if;
+
+ -- We want the larger of the last seqno + 1 or the current timestamp.
+ lock table dbtran."DbTranLock" in exclusive mode;
+ endtime := clock_timestamp();
+ transeqno_from_time := extract(epoch from endtime) * 1000
+ + extract(milliseconds from endtime);
+ transeqno_from_seq = nextval('"DbTranSeq"');
+ if transeqno_from_time > transeqno_from_seq then
+ perform setval('"DbTranSeq"', transeqno_from_time);
+ transeqno_from_seq := transeqno_from_time;
+ end if;
+
+ update dbtran."DbTranSummary"
+ set "tranEnd" = endtime,
+ "runDuration" = extract(microsecond from (endtime - transaction_timestamp())) / 1000.0::float,
+ "tranImageSeqNo" = transeqno_from_seq::numeric
+ where "backendPid" = pg_backend_pid()
+ and "tranStart" = transaction_timestamp();
+
+ return null; -- AFTER trigger; value doesn't matter.
+end;
+$FinishDbTranSummaryRow$;
+
+create constraint trigger finish_tran_summary
+ after insert on dbtran."DbTranSummary"
+ deferrable initially deferred
+ for each row execute procedure dbtran."FinishDbTranSummaryRow"();
+
+create or replace function "DbTranOpInsert"("op" "OperationT", "table" "TableNameT")
+ returns "LogRecordSeqNoT"
+ language plpgsql
+ security definer
+as $DbTranOpInsert$
+declare
+ "seqno" "LogRecordSeqNoT";
+begin
+ select coalesce(max("logRecordSeqNo"),0) + 1
+ into "seqno"
+ from dbtran."DbTranOp"
+ where "backendPid" = pg_backend_pid()
+ and "tranStart" = transaction_timestamp();
+ insert into dbtran."DbTranOp"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "operation", "tableName")
+ values
+ (dbtran."GetTranCountyNo"(),
+ pg_catalog.pg_backend_pid(),
+ pg_catalog.transaction_timestamp(),
+ "seqno", "op", "table");
+ return seqno;
+end;
+$DbTranOpInsert$;
+
+create or replace function "DbTranOpValueInsertText"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "text" text
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertText$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "textValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "text");
+end;
+$DbTranOpValueInsertText$;
+
+create or replace function "DbTranOpValueInsertInt"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "int" bigint
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertInt$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "intValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "int");
+end;
+$DbTranOpValueInsertInt$;
+
+create or replace function "DbTranOpValueInsertNumeric"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "numeric" numeric
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertNumeric$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "numericValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "numeric");
+end;
+$DbTranOpValueInsertNumeric$;
+
+create or replace function "DbTranOpValueInsertBinary"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "binary" bytea
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertBinary$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "binaryValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "binary");
+end;
+$DbTranOpValueInsertBinary$;
+
+create or replace function "DbTranOpValueInsertBoolean"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "bool" "BooleanT"
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertBoolean$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "booleanValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "bool");
+end;
+$DbTranOpValueInsertBoolean$;
+
+create or replace function "DbTranOpValueInsertDate"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "date" "DateT"
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertDate$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "dateValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "date");
+end;
+$DbTranOpValueInsertDate$;
+
+create or replace function "DbTranOpValueInsertTime"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "time" "TimeT"
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertTime$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "timeValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "time");
+end;
+$DbTranOpValueInsertTime$;
+
+create or replace function "DbTranOpValueInsertTimestamp"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "ts" "TimestampT"
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertTimestamp$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "timestampValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "ts");
+end;
+$DbTranOpValueInsertTimestamp$;
+
+reset role;
+create or replace function capture_replication_data()
+ returns trigger
+ language C
+ security definer
+ as 'dbtran', 'capture_replication_data';
+set role ccowner;
+set search_path = public;
+
diff --git a/contrib/dbtran/dbtran--unpackaged--1.0.sql b/contrib/dbtran/dbtran--unpackaged--1.0.sql
new file mode 100644
index 0000000000..09988453d3
--- /dev/null
+++ b/contrib/dbtran/dbtran--unpackaged--1.0.sql
@@ -0,0 +1,76 @@
+/* contrib/tablefunc/tablefunc--unpackaged--1.0.sql */
+
+ALTER EXTENSION dbtran ADD table "DbTranSummary";
+ALTER EXTENSION dbtran ADD table "DbTranOp";
+ALTER EXTENSION dbtran ADD table "DbTranOpValue";
+ALTER EXTENSION dbtran ADD table "DbTranLock";
+ALTER EXTENSION dbtran ADD sequence "DbTranSeq";
+ALTER EXTENSION dbtran ADD function "GetTranCountyNo"();
+ALTER EXTENSION dbtran ADD function "EnsureDbTranSummaryRowExists"();
+ALTER EXTENSION dbtran ADD function "DbTranSummarySetInfo"
+ (
+ "userId" text,
+ "queryName" text,
+ "sourceRef" text,
+ "functionalArea" text
+ );
+ALTER EXTENSION dbtran ADD function "FinishDbTranSummaryRow"()
+ALTER EXTENSION dbtran ADD constraint trigger finish_tran_summary
+ALTER EXTENSION dbtran ADD function "DbTranOpInsert"("op" "OperationT", "table" "TableNameT")
+ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertText"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "text" text
+ );
+ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertInt"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "int" bigint
+ );
+ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertNumeric"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "numeric" numeric
+ );
+ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertBinary"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "binary" bytea
+ );
+ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertBoolean"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "bool" "BooleanT"
+ );
+ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertDate"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "date" "DateT"
+ );
+ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertTime"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "time" "TimeT"
+ );
+ALTER EXTENSION dbtran ADD function "DbTranOpValueInsertTimestamp"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "ts" "TimestampT"
+ );
+ALTER EXTENSION dbtran ADD function capture_replication_data();
diff --git a/contrib/dbtran/dbtran--unpackaged.sql b/contrib/dbtran/dbtran--unpackaged.sql
new file mode 100644
index 0000000000..83372bf319
--- /dev/null
+++ b/contrib/dbtran/dbtran--unpackaged.sql
@@ -0,0 +1,373 @@
+/* contrib/dbtran/dbtran--unpackaged.sql */
+
+--
+-- Set up dbtran in a pre-9.1 unpackaged environment.
+--
+
+create schema dbtran;
+revoke create on schema dbtran from public;
+grant usage, create on schema dbtran to ccowner;
+grant usage on schema dbtran to cc, viewer;
+
+set role ccowner;
+set search_path = dbtran, public;
+
+create domain "LogRecordSeqNoT" integer;
+create domain "OperationT" varchar(1);
+create domain "TableNameT" varchar(18);
+create domain "ColumnNameT" varchar(18);
+
+drop table if exists "DbTranSummary";
+drop table if exists "DbTranSummary";
+create table "DbTranSummary"
+(
+ "countyNo" "CountyNoT" not null,
+ "backendPid" int not null,
+ "tranStart" "TimestampT" not null,
+ "userId" "UserIdT",
+ "queryName" "QueryNameT",
+ "sourceRef" varchar(255),
+ "functionalArea" varchar(50),
+ "tranEnd" "TimestampT",
+ "tranImageSeqNo" "TranImageSeqNoT",
+ "runDuration" float,
+ primary key ("backendPid", "tranStart"),
+ unique ("tranImageSeqNo")
+);
+grant select on dbtran."DbTranSummary" to cc, viewer;
+
+drop table if exists "DbTranOp";
+drop table if exists "DbTranOp";
+create table "DbTranOp"
+(
+ "countyNo" "CountyNoT" not null,
+ "backendPid" int not null,
+ "tranStart" "TimestampT" not null,
+ "logRecordSeqNo" "LogRecordSeqNoT" not null,
+ "operation" "OperationT" not null,
+ "tableName" "TableNameT" not null,
+ primary key ("backendPid", "tranStart", "logRecordSeqNo")
+);
+grant select on dbtran."DbTranOp" to cc, viewer;
+
+drop table if exists "DbTranOpValue";
+drop table if exists "DbTranOpValue";
+create table "DbTranOpValue"
+(
+ "countyNo" "CountyNoT" not null,
+ "backendPid" int not null,
+ "tranStart" "TimestampT" not null,
+ "logRecordSeqNo" "LogRecordSeqNoT" not null,
+ "columnName" "ColumnNameT" not null,
+ "isAfter" "BooleanT" not null,
+ "textValue" text,
+ "intValue" bigint,
+ "numericValue" numeric,
+ "binaryValue" bytea,
+ "booleanValue" "BooleanT",
+ "dateValue" "DateT",
+ "timeValue" "TimeT",
+ "timestampValue" "TimestampT",
+ primary key ("backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter")
+);
+grant select on dbtran."DbTranOpValue" to cc;
+
+drop table if exists "DbTranLock";
+drop table if exists "DbTranLock";
+create table "DbTranLock"
+(
+ "dummy" int
+);
+grant select on dbtran."DbTranLock" to cc, viewer;
+
+create sequence "DbTranSeq";
+grant select on sequence "DbTranSeq" to cc, viewer;
+
+
+create or replace function "GetTranCountyNo"()
+ returns "CountyNoT"
+ language sql
+ stable
+as $GetTranCountyNo$
+ select "countyNo" from public."ControlRecord" order by "countyNo" limit 1;
+$GetTranCountyNo$;
+grant execute on function "GetTranCountyNo"() to cc, viewer;
+
+create or replace function "EnsureDbTranSummaryRowExists"()
+ returns void
+ language plpgsql
+ security definer
+as $EnsureDbTranSummaryRowExists$
+begin
+ if not exists (select * from dbtran."DbTranSummary"
+ where "backendPid" = pg_backend_pid()
+ and "tranStart" = transaction_timestamp())
+ then
+ insert into dbtran."DbTranSummary"
+ ("countyNo", "backendPid", "tranStart")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp());
+ end if;
+end;
+$EnsureDbTranSummaryRowExists$;
+
+create or replace function "DbTranSummarySetInfo"
+ (
+ "userId" text,
+ "queryName" text,
+ "sourceRef" text,
+ "functionalArea" text
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranSummarySetInfo$
+begin
+ perform dbtran."EnsureDbTranSummaryRowExists"();
+ update dbtran."DbTranSummary"
+ set "userId" = $1,
+ "queryName" = $2,
+ "sourceRef" = $3,
+ "functionalArea" = $4
+ where "backendPid" = pg_backend_pid()
+ and "tranStart" = transaction_timestamp();
+end;
+$DbTranSummarySetInfo$;
+grant execute on function "DbTranSummarySetInfo"
+ (
+ "userId" text,
+ "queryName" text,
+ "sourceRef" text,
+ "functionalArea" text
+ ) to cc;
+
+create or replace function "FinishDbTranSummaryRow"()
+ returns trigger
+ language plpgsql
+ security definer
+as $FinishDbTranSummaryRow$
+declare
+ endtime timestamp with time zone;
+ transeqno_from_time bigint;
+ transeqno_from_seq bigint;
+begin
+
+ if tg_when <> 'AFTER' or tg_op <> 'INSERT' or tg_level <> 'ROW' then
+ raise exception 'FinishDbTranSummaryRow: must be run as a trigger after insert to row';
+ end if;
+
+ if tg_table_name <> 'DbTranSummary' or tg_table_schema <> 'dbtran' then
+ raise exception 'FinishDbTranSummaryRow: may only be use for a trigger on dbtran."DbTranSummary"';
+ end if;
+
+ -- We want the larger of the last seqno + 1 or the current timestamp.
+ lock table dbtran."DbTranLock" in exclusive mode;
+ endtime := clock_timestamp();
+ transeqno_from_time := floor(extract(epoch from endtime) * 1000);
+ transeqno_from_seq = nextval('"DbTranSeq"');
+ if transeqno_from_time > transeqno_from_seq then
+ perform setval('"DbTranSeq"', transeqno_from_time);
+ transeqno_from_seq := transeqno_from_time;
+ end if;
+
+ update dbtran."DbTranSummary"
+ set "tranEnd" = endtime,
+ "runDuration" = extract(microsecond from (endtime - transaction_timestamp())) / 1000.0::float,
+ "tranImageSeqNo" = transeqno_from_seq::numeric
+ where "backendPid" = pg_backend_pid()
+ and "tranStart" = transaction_timestamp();
+
+ return null; -- AFTER trigger; value doesn't matter.
+end;
+$FinishDbTranSummaryRow$;
+
+create constraint trigger finish_tran_summary
+ after insert on dbtran."DbTranSummary"
+ deferrable initially deferred
+ for each row execute procedure dbtran."FinishDbTranSummaryRow"();
+
+create or replace function "DbTranOpInsert"("op" "OperationT", "table" "TableNameT")
+ returns "LogRecordSeqNoT"
+ language plpgsql
+ security definer
+as $DbTranOpInsert$
+declare
+ "seqno" "LogRecordSeqNoT";
+begin
+ select coalesce(max("logRecordSeqNo"),0) + 1
+ into "seqno"
+ from dbtran."DbTranOp"
+ where "backendPid" = pg_backend_pid()
+ and "tranStart" = transaction_timestamp();
+ insert into dbtran."DbTranOp"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "operation", "tableName")
+ values
+ (dbtran."GetTranCountyNo"(),
+ pg_catalog.pg_backend_pid(),
+ pg_catalog.transaction_timestamp(),
+ "seqno", "op", "table");
+ return seqno;
+end;
+$DbTranOpInsert$;
+
+create or replace function "DbTranOpValueInsertText"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "text" text
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertText$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "textValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "text");
+end;
+$DbTranOpValueInsertText$;
+
+create or replace function "DbTranOpValueInsertInt"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "int" bigint
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertInt$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "intValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "int");
+end;
+$DbTranOpValueInsertInt$;
+
+create or replace function "DbTranOpValueInsertNumeric"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "numeric" numeric
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertNumeric$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "numericValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "numeric");
+end;
+$DbTranOpValueInsertNumeric$;
+
+create or replace function "DbTranOpValueInsertBinary"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "binary" bytea
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertBinary$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "binaryValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "binary");
+end;
+$DbTranOpValueInsertBinary$;
+
+create or replace function "DbTranOpValueInsertBoolean"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "bool" "BooleanT"
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertBoolean$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "booleanValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "bool");
+end;
+$DbTranOpValueInsertBoolean$;
+
+create or replace function "DbTranOpValueInsertDate"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "date" "DateT"
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertDate$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "dateValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "date");
+end;
+$DbTranOpValueInsertDate$;
+
+create or replace function "DbTranOpValueInsertTime"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "time" "TimeT"
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertTime$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "timeValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "time");
+end;
+$DbTranOpValueInsertTime$;
+
+create or replace function "DbTranOpValueInsertTimestamp"
+ (
+ "seqno" "LogRecordSeqNoT",
+ "column" "ColumnNameT",
+ "after" "BooleanT",
+ "ts" "TimestampT"
+ )
+ returns void
+ language plpgsql
+ security definer
+as $DbTranOpValueInsertTimestamp$
+begin
+ insert into dbtran."DbTranOpValue"
+ ("countyNo", "backendPid", "tranStart", "logRecordSeqNo", "columnName", "isAfter", "timestampValue")
+ values
+ ("GetTranCountyNo"(), pg_backend_pid(), transaction_timestamp(), "seqno", "column", "after", "ts");
+end;
+$DbTranOpValueInsertTimestamp$;
+
+reset role;
+create or replace function capture_replication_data()
+ returns trigger
+ language C
+ security definer
+ as 'dbtran', 'capture_replication_data';
+set role ccowner;
+set search_path = public;
+
diff --git a/contrib/dbtran/dbtran.c b/contrib/dbtran/dbtran.c
new file mode 100644
index 0000000000..7ba9f05039
--- /dev/null
+++ b/contrib/dbtran/dbtran.c
@@ -0,0 +1,416 @@
+/*-------------------------------------------------------------------------
+ *
+ * dbtran.c
+ * C code for capturing replication data in DbTranXxx tables.
+ *
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_type.h"
+#include "commands/trigger.h"
+#include "executor/spi.h"
+#include "utils/builtins.h"
+#include "utils/datum.h"
+#include "utils/syscache.h"
+
+PG_MODULE_MAGIC;
+
+
+static void generate_dbtran_plans();
+static void write_one_dbtran_value(Datum value, const TupleDesc tupdesc, int i,
+ Datum *args, const char *nulls);
+extern Datum capture_replication_data(PG_FUNCTION_ARGS);
+
+
+
+/* ----------
+ * Global data
+ * ----------
+ */
+static SPIPlanPtr plan_ensuredbtransummary = NULL;
+static const char *query_ensuredbtransummary = "SELECT \"EnsureDbTranSummaryRowExists\"()";
+static SPIPlanPtr plan_insertop = NULL;
+static const char *query_insertop = "SELECT \"DbTranOpInsert\"($1, $2)";
+static SPIPlanPtr plan_insertvaltext = NULL;
+static const char *query_insertvaltext = "SELECT \"DbTranOpValueInsertText\"($1, $2, $3, $4)";
+static SPIPlanPtr plan_insertvalint = NULL;
+static const char *query_insertvalint = "SELECT \"DbTranOpValueInsertInt\"($1, $2, $3, $4)";
+static SPIPlanPtr plan_insertvalnumeric = NULL;
+static const char *query_insertvalnumeric = "SELECT \"DbTranOpValueInsertNumeric\"($1, $2, $3, $4)";
+static SPIPlanPtr plan_insertvalbinary = NULL;
+static const char *query_insertvalbinary = "SELECT \"DbTranOpValueInsertBinary\"($1, $2, $3, $4)";
+static SPIPlanPtr plan_insertvalboolean = NULL;
+static const char *query_insertvalboolean = "SELECT \"DbTranOpValueInsertBoolean\"($1, $2, $3, $4)";
+static SPIPlanPtr plan_insertvaldate = NULL;
+static const char *query_insertvaldate = "SELECT \"DbTranOpValueInsertDate\"($1, $2, $3, $4)";
+static SPIPlanPtr plan_insertvaltime = NULL;
+static const char *query_insertvaltime = "SELECT \"DbTranOpValueInsertTime\"($1, $2, $3, $4)";
+static SPIPlanPtr plan_insertvaltimestamp = NULL;
+static const char *query_insertvaltimestamp = "SELECT \"DbTranOpValueInsertTimestamp\"($1, $2, $3, $4)";
+
+#define VALUE_BEFORE BoolGetDatum(false)
+#define VALUE_AFTER BoolGetDatum(true)
+
+#define dbtran_plans_missing() (plan_insertvaltimestamp == NULL)
+
+static void
+generate_dbtran_plans()
+{
+ Oid argtypes[4];
+ SPIPlanPtr plan;
+
+ if (plan_ensuredbtransummary == NULL)
+ {
+ plan = SPI_prepare(query_ensuredbtransummary, 0, NULL);
+ if (plan == NULL)
+ elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_ensuredbtransummary);
+ plan_ensuredbtransummary = SPI_saveplan(plan);
+ }
+
+ if (plan_insertop == NULL)
+ {
+ argtypes[0] = TEXTOID;
+ argtypes[1] = TEXTOID;
+ plan = SPI_prepare(query_insertop, 2, argtypes);
+ if (plan == NULL)
+ elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertop);
+ plan_insertop = SPI_saveplan(plan);
+ }
+
+ if (plan_insertvaltext == NULL)
+ {
+ argtypes[0] = INT4OID;
+ argtypes[1] = TEXTOID;
+ argtypes[2] = BOOLOID;
+ argtypes[3] = TEXTOID;
+ plan = SPI_prepare(query_insertvaltext, 4, argtypes);
+ if (plan == NULL)
+ elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvaltext);
+ plan_insertvaltext = SPI_saveplan(plan);
+ }
+
+ if (plan_insertvalint == NULL)
+ {
+ argtypes[0] = INT4OID;
+ argtypes[1] = TEXTOID;
+ argtypes[2] = BOOLOID;
+ argtypes[3] = INT8OID;
+ plan = SPI_prepare(query_insertvalint, 4, argtypes);
+ if (plan == NULL)
+ elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvalint);
+ plan_insertvalint = SPI_saveplan(plan);
+ }
+
+ if (plan_insertvalnumeric == NULL)
+ {
+ argtypes[0] = INT4OID;
+ argtypes[1] = TEXTOID;
+ argtypes[2] = BOOLOID;
+ argtypes[3] = NUMERICOID;
+ plan = SPI_prepare(query_insertvalnumeric, 4, argtypes);
+ if (plan == NULL)
+ elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvalnumeric);
+ plan_insertvalnumeric = SPI_saveplan(plan);
+ }
+
+ if (plan_insertvalbinary == NULL)
+ {
+ argtypes[0] = INT4OID;
+ argtypes[1] = TEXTOID;
+ argtypes[2] = BOOLOID;
+ argtypes[3] = BYTEAOID;
+ plan = SPI_prepare(query_insertvalbinary, 4, argtypes);
+ if (plan == NULL)
+ elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvalbinary);
+ plan_insertvalbinary = SPI_saveplan(plan);
+ }
+
+ if (plan_insertvalboolean == NULL)
+ {
+ argtypes[0] = INT4OID;
+ argtypes[1] = TEXTOID;
+ argtypes[2] = BOOLOID;
+ argtypes[3] = BOOLOID;
+ plan = SPI_prepare(query_insertvalboolean, 4, argtypes);
+ if (plan == NULL)
+ elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvalboolean);
+ plan_insertvalboolean = SPI_saveplan(plan);
+ }
+
+ if (plan_insertvaldate == NULL)
+ {
+ argtypes[0] = INT4OID;
+ argtypes[1] = TEXTOID;
+ argtypes[2] = BOOLOID;
+ argtypes[3] = DATEOID;
+ plan = SPI_prepare(query_insertvaldate, 4, argtypes);
+ if (plan == NULL)
+ elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvaldate);
+ plan_insertvaldate = SPI_saveplan(plan);
+ }
+
+ if (plan_insertvaltime == NULL)
+ {
+ argtypes[0] = INT4OID;
+ argtypes[1] = TEXTOID;
+ argtypes[2] = BOOLOID;
+ argtypes[3] = TIMEOID;
+ plan = SPI_prepare(query_insertvaltime, 4, argtypes);
+ if (plan == NULL)
+ elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvaltime);
+ plan_insertvaltime = SPI_saveplan(plan);
+ }
+
+ if (plan_insertvaltimestamp == NULL)
+ {
+ argtypes[0] = INT4OID;
+ argtypes[1] = TEXTOID;
+ argtypes[2] = BOOLOID;
+ argtypes[3] = TIMESTAMPTZOID;
+ plan = SPI_prepare(query_insertvaltimestamp, 4, argtypes);
+ if (plan == NULL)
+ elog(ERROR, "capture_replication_data: SPI_prepare failed for \"%s\"", query_insertvaltimestamp);
+ plan_insertvaltimestamp = SPI_saveplan(plan);
+ }
+}
+
+static void
+write_one_dbtran_value(Datum value, const TupleDesc tupdesc, int i,
+ Datum *args, const char *nulls)
+{
+ int spirc;
+ HeapTuple tuple;
+ Form_pg_type typeTup;
+ Oid typid = tupdesc->attrs[i]->atttypid;
+
+ tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "capture_replication_data: cache lookup failed for type %u", typid);
+
+ typeTup = (Form_pg_type) GETSTRUCT(tuple);
+ if (typeTup->typbasetype != 0)
+ typid = typeTup->typbasetype;
+
+ ReleaseSysCache(tuple);
+
+ switch (typid)
+ {
+ case TEXTOID:
+ args[3] = value;
+ spirc = SPI_execp(plan_insertvaltext, args, nulls, 4);
+ break;
+ case VARCHAROID:
+ case CHAROID:
+ args[3] = PointerGetDatum(value);
+ spirc = SPI_execp(plan_insertvaltext, args, nulls, 4);
+ break;
+ case INT8OID:
+ args[3] = value;
+ spirc = SPI_execp(plan_insertvalint, args, nulls, 4);
+ break;
+ case INT2OID:
+ args[3] = Int64GetDatum(DatumGetInt16(value));
+ spirc = SPI_execp(plan_insertvalint, args, nulls, 4);
+ break;
+ case INT4OID:
+ args[3] = Int64GetDatum(DatumGetInt32(value));
+ spirc = SPI_execp(plan_insertvalint, args, nulls, 4);
+ break;
+ case NUMERICOID:
+ args[3] = value;
+ spirc = SPI_execp(plan_insertvalnumeric, args, nulls, 4);
+ break;
+ case BYTEAOID:
+ args[3] = value;
+ spirc = SPI_execp(plan_insertvalbinary, args, nulls, 4);
+ break;
+ case BOOLOID:
+ args[3] = value;
+ spirc = SPI_execp(plan_insertvalboolean, args, nulls, 4);
+ break;
+ case DATEOID:
+ args[3] = value;
+ spirc = SPI_execp(plan_insertvaldate, args, nulls, 4);
+ break;
+ case TIMEOID:
+ args[3] = value;
+ spirc = SPI_execp(plan_insertvaltime, args, nulls, 4);
+ break;
+ case TIMESTAMPTZOID:
+ args[3] = value;
+ spirc = SPI_execp(plan_insertvaltimestamp, args, nulls, 4);
+ break;
+ default:
+ elog(ERROR, "capture_replication_data: unrecognized type");
+ spirc = -1; /* quiet compiler */
+ }
+
+ if (spirc < 0)
+ elog(ERROR, "capture_replication_data: SPI error %i while writing value", spirc);
+}
+
+
+PG_FUNCTION_INFO_V1(capture_replication_data);
+
+Datum
+capture_replication_data(PG_FUNCTION_ARGS)
+{
+ TriggerData *trigdata = (TriggerData *) fcinfo->context;
+ int rc;
+ char operation[2];
+ Trigger *trigger;
+ int nargs;
+ HeapTuple newtuple,
+ oldtuple;
+ Relation rel;
+ TupleDesc tupdesc;
+ int spirc;
+ Datum args[4];
+ char nulls[4];
+ bool isnull;
+ int i;
+
+ /* make sure it's called as a trigger */
+ if (!CALLED_AS_TRIGGER(fcinfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
+ errmsg("capture_replication_data: must be called as trigger")));
+
+ /* and that it's called after triggering DML */
+ if (TRIGGER_FIRED_BEFORE(trigdata->tg_event))
+ ereport(ERROR,
+ (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
+ errmsg("capture_replication_data: must be called after triggering action")));
+
+ /* and that it's called for each row */
+ if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
+ ereport(ERROR,
+ (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
+ errmsg("capture_replication_data: must be called for each row")));
+
+ operation[1] = '\0';
+ oldtuple = NULL;
+ newtuple = NULL;
+ if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
+ {
+ operation[0] = 'I';
+ newtuple = trigdata->tg_trigtuple;
+ }
+ else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
+ {
+ operation[0] = 'U';
+ oldtuple = trigdata->tg_trigtuple;
+ newtuple = trigdata->tg_newtuple;
+ }
+ else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
+ {
+ operation[0] = 'D';
+ oldtuple = trigdata->tg_trigtuple;
+ }
+ else
+ {
+ elog(ERROR, "capture_replication_data: trigger fired by unrecognized operation");
+ }
+
+ trigger = trigdata->tg_trigger;
+ nargs = trigger->tgnargs;
+ if (nargs > 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
+ errmsg("capture_replication_data: must not be called with any parameters")));
+
+ /* get relation data */
+ rel = trigdata->tg_relation;
+ tupdesc = rel->rd_att;
+
+ /* Connect to the SPI manager. */
+ if ((rc = SPI_connect()) < 0)
+ elog(ERROR, "capture_replication_data: SPI_connect() failed");
+
+ /* Make sure plans are all prepared. */
+ if (dbtran_plans_missing())
+ generate_dbtran_plans();
+
+ /* Make sure we've got a summary row. */
+ spirc = SPI_execp(plan_ensuredbtransummary, 0, NULL, 0);
+ if (spirc != SPI_OK_SELECT)
+ elog(ERROR, "capture_replication_data: failed to insert transaction summary");
+
+ /* Insert a new operation row. */
+ args[0] = CStringGetTextDatum(operation);
+ args[1] = CStringGetTextDatum(RelationGetRelationName(rel));
+ spirc = SPI_execp(plan_insertop, args, NULL, 2);
+ if (spirc != SPI_OK_SELECT || SPI_processed != 1)
+ elog(ERROR, "capture_replication_data: failed to insert transaction operation");
+
+ /* Set those things which will be the same for all value roes below. */
+ args[0] = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);
+ Assert(!isnull);
+ nulls[0] = ' ';
+ nulls[1] = ' ';
+ nulls[2] = ' ';
+
+ for (i = 0; i < tupdesc->natts; i++)
+ {
+ Datum before;
+ bool beforeIsNull;
+ Datum after;
+ bool afterIsNull;
+
+ /* Skip dropped columns. */
+ if (tupdesc->attrs[i]->attisdropped)
+ continue;
+
+ /* We need column name regardless of operation. */
+ args[1] = CStringGetTextDatum(SPI_fname(tupdesc, i + 1));
+
+ /* If there is a non-null "before" value, write it. */
+ if (operation[0] == 'U' || operation[0] == 'D')
+ {
+ before = SPI_getbinval(oldtuple, tupdesc, i + 1, &beforeIsNull);
+ if (!beforeIsNull)
+ {
+ args[2] = VALUE_BEFORE;
+ nulls[3] = ' ';
+ write_one_dbtran_value(before, tupdesc, i, args, nulls);
+ }
+ }
+ else
+ {
+ before = BoolGetDatum(true); /* quiet compiler warning */
+ beforeIsNull = true;
+ }
+
+ /* If there is a non-null on insert or a different value on update, write it. */
+ if (operation[0] == 'I' || operation[0] == 'U')
+ {
+ after = SPI_getbinval(newtuple, tupdesc, i + 1, &afterIsNull);
+ if (afterIsNull && beforeIsNull)
+ continue;
+ if (afterIsNull != beforeIsNull)
+ {
+ args[2] = VALUE_AFTER;
+ nulls[3] = afterIsNull ? 'n' : ' ';
+ write_one_dbtran_value(after, tupdesc, i, args, nulls);
+ continue;
+ }
+ /* Update with before and after both not null. Are they different? */
+ if (!datumIsEqual(before, after, tupdesc->attrs[i]->attbyval,
+ tupdesc->attrs[i]->attlen))
+ {
+ args[2] = VALUE_AFTER;
+ nulls[3] = ' ';
+ write_one_dbtran_value(after, tupdesc, i, args, nulls);
+ }
+ }
+ }
+
+ SPI_finish();
+
+ return PointerGetDatum(NULL);
+}
diff --git a/contrib/dbtran/dbtran.control b/contrib/dbtran/dbtran.control
new file mode 100644
index 0000000000..792056f464
--- /dev/null
+++ b/contrib/dbtran/dbtran.control
@@ -0,0 +1,6 @@
+# dbtran extension
+comment = 'capture triggers for DbTranXxx tables'
+default_version = '1.0'
+module_pathname = '$libdir/dbtran'
+relocatable = false
+schema = dbtran
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 4399493625..423b56ac4c 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -1712,6 +1712,11 @@ heap_drop_with_catalog(Oid relid)
}
/*
+ * Clean up any remaining predicate locks on the relation.
+ */
+ DropAllPredicateLocksFromTable(rel);
+
+ /*
* Close relcache entry, but *keep* AccessExclusiveLock on the relation
* until transaction commit. This ensures no one else will try to do
* something with the doomed relation.
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 8e7a7f001d..c324bceef5 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -194,6 +194,7 @@
#include "storage/procarray.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
+#include "utils/syscache.h"
#include "utils/tqual.h"
/* Uncomment the next line to test the graceful degradation code. */
diff --git a/src/backend/utils/adt/trigfuncs.c b/src/backend/utils/adt/trigfuncs.c
index 474878de7d..fa3ada8ba0 100644
--- a/src/backend/utils/adt/trigfuncs.c
+++ b/src/backend/utils/adt/trigfuncs.c
@@ -13,8 +13,10 @@
*/
#include "postgres.h"
-#include "access/htup.h"
+#include "catalog/pg_type.h"
+#include "commands/async.h"
#include "commands/trigger.h"
+#include "executor/spi.h"
#include "utils/builtins.h"
#include "utils/rel.h"
@@ -94,3 +96,158 @@ suppress_redundant_updates_trigger(PG_FUNCTION_ARGS)
return PointerGetDatum(rettuple);
}
+
+
+/*
+ * Copy from s (for source) to r (for result), wrapping with q (quote)
+ * characters and doubling any quote characters found.
+ */
+static char *
+strcpy_quoted(char *r, const char *s, const char q)
+{
+ *r++ = q;
+ while (*s)
+ {
+ if (*s == q)
+ *r++ = q;
+ *r++ = *s;
+ s++;
+ }
+ *r++ = q;
+ return r;
+}
+
+/*
+ * triggered_change_notification
+ *
+ * This trigger function will send a notification of data modification with
+ * primary key values. The channel will be "tcn" unless the trigger is
+ * created with a parameter, in which case that parameter will be used.
+ */
+Datum
+triggered_change_notification(PG_FUNCTION_ARGS)
+{
+ TriggerData *trigdata = (TriggerData *) fcinfo->context;
+ Trigger *trigger;
+ int nargs;
+ HeapTuple trigtuple;
+ Relation rel;
+ TupleDesc tupdesc;
+ char *channel;
+ char operation;
+ bool foundPK;
+ char *payload = palloc(8 * 1024);
+
+ List *indexoidlist;
+ ListCell *indexoidscan;
+
+ /* make sure it's called as a trigger */
+ if (!CALLED_AS_TRIGGER(fcinfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
+ errmsg("triggered_change_notification: must be called as trigger")));
+
+ /* and that it's called after the change */
+ if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
+ ereport(ERROR,
+ (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
+ errmsg("triggered_change_notification: must be called after the change")));
+
+ /* and that it's called for each row */
+ if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
+ ereport(ERROR,
+ (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
+ errmsg("triggered_change_notification: must be called for each row")));
+
+ if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
+ operation = 'I';
+ else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
+ operation = 'U';
+ else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
+ operation = 'D';
+ else
+ {
+ elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
+ operation = 'X'; /* silence compiler warning */
+ }
+
+ trigger = trigdata->tg_trigger;
+ nargs = trigger->tgnargs;
+ if (nargs > 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
+ errmsg("triggered_change_notification: must not be called with more than one parameter")));
+
+ if (nargs == 0)
+ channel = "tcn";
+ else
+ channel = trigger->tgargs[0];
+
+ /* get tuple data */
+ trigtuple = trigdata->tg_trigtuple;
+ rel = trigdata->tg_relation;
+ tupdesc = rel->rd_att;
+
+ foundPK = false;
+
+ /*
+ * Get the list of index OIDs for the table from the relcache, and look up
+ * each one in the pg_index syscache until we find one marked primary key
+ * (hopefully there isn't more than one such).
+ */
+ indexoidlist = RelationGetIndexList(rel);
+
+ foreach(indexoidscan, indexoidlist)
+ {
+ Oid indexoid = lfirst_oid(indexoidscan);
+ HeapTuple indexTuple;
+ Form_pg_index index;
+
+ indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+ if (!HeapTupleIsValid(indexTuple)) /* should not happen */
+ elog(ERROR, "triggered_change_notification: cache lookup failed for index %u", indexoid);
+ index = (Form_pg_index) GETSTRUCT(indexTuple);
+ /* we're only interested if it is the primary key */
+ if (index->indisprimary)
+ {
+ int numatts = index->indnatts;
+ char *p;
+
+ if (numatts > 0)
+ {
+ int i;
+
+ foundPK = true;
+
+ p = strcpy_quoted(payload, RelationGetRelationName(rel), '"');
+ *p++ = ',';
+ *p++ = operation;
+
+ for (i = 0; i < numatts; i++)
+ {
+ int colno = index->indkey.values[i];
+
+ *p++ = ',';
+ p = strcpy_quoted(p, NameStr(tupdesc->attrs[colno - 1]->attname), '"');
+ *p++ = '=';
+ p = strcpy_quoted(p, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
+ }
+ *p = '\0';
+
+ Async_Notify(channel, payload);
+ }
+ ReleaseSysCache(indexTuple);
+ break;
+ }
+ ReleaseSysCache(indexTuple);
+ }
+
+ list_free(indexoidlist);
+
+ if (!foundPK)
+ ereport(ERROR,
+ (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
+ errmsg("triggered_change_notification: must be called on a table with a primary key")));
+
+ return PointerGetDatum(NULL); /* after trigger; value doesn't matter */
+}
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 96f43fe0b1..c53c1b2760 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -1299,6 +1299,8 @@ DESCR("convert oid to int8");
DATA(insert OID = 1291 ( suppress_redundant_updates_trigger PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ suppress_redundant_updates_trigger _null_ _null_ _null_ ));
DESCR("trigger to suppress updates when new and old records match");
+DATA(insert OID = 2650 ( triggered_change_notification PGNSP PGUID 12 1 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ triggered_change_notification _null_ _null_ _null_ ));
+DESCR("trigger function to send change notification with primary key in payload");
DATA(insert OID = 1292 ( tideq PGNSP PGUID 12 1 0 0 0 f f f t f i 2 0 16 "27 27" _null_ _null_ _null_ _null_ tideq _null_ _null_ _null_ ));
DATA(insert OID = 1293 ( currtid PGNSP PGUID 12 1 0 0 0 f f f t f v 2 0 27 "26 27" _null_ _null_ _null_ _null_ currtid_byreloid _null_ _null_ _null_ ));
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 8a1c82ef72..7f5cedb112 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -964,6 +964,7 @@ extern Datum RI_FKey_setdefault_upd(PG_FUNCTION_ARGS);
/* trigfuncs.c */
extern Datum suppress_redundant_updates_trigger(PG_FUNCTION_ARGS);
+extern Datum triggered_change_notification(PG_FUNCTION_ARGS);
/* encoding support functions */
extern Datum getdatabaseencoding(PG_FUNCTION_ARGS);