From b82ea9800fb7eb56509db90c0405985f7b8f3bb2 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Wed, 12 Nov 2025 10:43:19 +0530 Subject: [PATCH] Add configurable conflict log table for Logical Replication This patch adds a feature to provide a structured, queryable record of all logical replication conflicts. The current approach of logging conflicts as plain text in the server logs makes it difficult to query, analyze, and use for external monitoring and automation. This patch addresses these limitations by introducing a configurable conflict_log_table option in the CREATE SUBSCRIPTION command. Key design decisions include: User-Managed Table: The conflict log is stored in a user-managed table rather than a system catalog. Structured Data: Conflict details, including the original and remote tuples, are stored in JSON columns, providing a flexible format to accommodate different table schemas. Comprehensive Information: The log table captures essential attributes such as local and remote transaction IDs, LSNs, commit timestamps, and conflict type, providing a complete record for post-mortem analysis. This feature will make logical replication conflicts easier to monitor and manage, significantly improving the overall resilience and operability of replication setups. The conflict log tables will not be included in a publication, even if the publication is configured to include ALL TABLES or ALL TABLES IN SCHEMA. Note: A single remote tuple may conflict with multiple local tuples when conflict type is CT_MULTIPLE_UNIQUE_CONFLICTS, so for handling this case we create a single row in conflict log table with respect to each remote conflict tuple even if it conflicts with multiple local tuples and we store the multiple conflict tuples as a single JSON array element in format as [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ] We can extract the elements of the local tuples from the conflict log table row as given in below example. SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid, local_conflicts[1] ->> 'tuple' AS local_tuple FROM myschema.conflict_log_history2; remote_xid | relname | remote_origin | local_xid | local_tuple ------------+----------+---------------+-----------+--------------------- 760 | test | pg_16406 | 771 | {"a":1,"b":10} 765 | conf_tab | pg_16406 | 775 | {"a":2,"b":2,"c":2} --- src/backend/catalog/pg_publication.c | 26 +- src/backend/commands/subscriptioncmds.c | 281 +++++++++- src/backend/replication/logical/conflict.c | 579 ++++++++++++++++++++- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/worker.c | 37 +- src/backend/utils/cache/lsyscache.c | 38 ++ src/bin/psql/describe.c | 24 +- src/bin/psql/tab-complete.in.c | 8 +- src/include/catalog/pg_subscription.h | 5 + src/include/commands/subscriptioncmds.h | 2 + src/include/replication/conflict.h | 32 ++ src/include/replication/worker_internal.h | 7 + src/include/utils/lsyscache.h | 1 + src/test/regress/expected/subscription.out | 360 +++++++++---- src/test/regress/sql/subscription.sql | 108 ++++ 15 files changed, 1381 insertions(+), 128 deletions(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 7aa3f1799240..9f84e02b7eff 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -31,6 +31,7 @@ #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/publicationcmds.h" +#include "commands/subscriptioncmds.h" #include "funcapi.h" #include "utils/array.h" #include "utils/builtins.h" @@ -85,6 +86,15 @@ check_publication_add_relation(Relation targetrel) errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("This operation is not supported for unlogged tables."))); + + /* Can't be conflict log table */ + if (IsConflictLogTable(RelationGetRelid(targetrel))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s.%s\" to publication", + get_namespace_name(RelationGetNamespace(targetrel)), + RelationGetRelationName(targetrel)), + errdetail("This operation is not supported for conflict log tables."))); } /* @@ -145,6 +155,13 @@ is_publishable_class(Oid relid, Form_pg_class reltuple) /* * Another variant of is_publishable_class(), taking a Relation. + * + * Note: Conflict log tables are not publishable. However, we intentionally + * skip this check here because this function is called for every change and + * performing this check during every change publication is costly. To ensure + * unpublishable entries are ignored without incurring performance overhead, + * tuples inserted into the conflict log table uses the HEAP_INSERT_NO_LOGICAL + * flag. This allows the decoding layer to bypass these entries automatically. */ bool is_publishable_relation(Relation rel) @@ -169,7 +186,10 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); if (!HeapTupleIsValid(tuple)) PG_RETURN_NULL(); - result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)); + + /* Subscription conflict log tables are not published */ + result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)) && + !IsConflictLogTable(relid); ReleaseSysCache(tuple); PG_RETURN_BOOL(result); } @@ -890,7 +910,9 @@ GetAllPublicationRelations(char relkind, bool pubviaroot) Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; + /* Subscription conflict log tables are not published */ if (is_publishable_class(relid, relForm) && + !IsConflictLogTable(relid) && !(relForm->relispartition && pubviaroot)) result = lappend_oid(result, relid); } @@ -1018,7 +1040,7 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) Oid relid = relForm->oid; char relkind; - if (!is_publishable_class(relid, relForm)) + if (!is_publishable_class(relid, relForm) || IsConflictLogTable(relid)) continue; relkind = get_rel_relkind(relid); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index abbcaff0838b..eb3fe068ddbe 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -15,16 +15,19 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "access/heapam.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" +#include "catalog/heap.h" #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/pg_am_d.h" #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" #include "catalog/pg_subscription.h" @@ -34,9 +37,12 @@ #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "executor/spi.h" +#include "funcapi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" +#include "replication/conflict.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "replication/origin.h" @@ -47,10 +53,12 @@ #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" +#include "utils/regproc.h" #include "utils/syscache.h" /* @@ -75,6 +83,7 @@ #define SUBOPT_MAX_RETENTION_DURATION 0x00008000 #define SUBOPT_LSN 0x00010000 #define SUBOPT_ORIGIN 0x00020000 +#define SUBOPT_CONFLICT_LOG_TABLE 0x00040000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -103,6 +112,7 @@ typedef struct SubOpts bool retaindeadtuples; int32 maxretention; char *origin; + char *conflictlogtable; XLogRecPtr lsn; } SubOpts; @@ -135,7 +145,9 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel); - +static void create_conflict_log_table(Oid namespaceId, char *conflictrel, + Oid subid); +static void drop_conflict_log_table(Oid namespaceId, char *conflictrel); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -191,6 +203,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->maxretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_TABLE)) + opts->conflictlogtable = NULL; /* Parse options */ foreach(lc, stmt_options) @@ -402,6 +416,19 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_TABLE) && + strcmp(defel->defname, "conflict_log_table") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_TABLE)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_CONFLICT_LOG_TABLE; + opts->conflictlogtable = defGetString(defel); + + /* Setting conflict_log_table = NONE is treated as no table. */ + if (strcmp(opts->conflictlogtable, "none") == 0) + opts->conflictlogtable = NULL; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -599,6 +626,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bits32 supported_opts; SubOpts opts = {0}; AclResult aclresult; + Oid conflictlogtable_nspid = InvalidOid; + char *conflictlogtable = NULL; /* * Parse and check options. @@ -612,7 +641,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_RETAIN_DEAD_TUPLES | - SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN); + SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN | + SUBOPT_CONFLICT_LOG_TABLE); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -747,6 +777,25 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + /* + * If a conflict log table name is specified, parse the schema and table + * name from the string. Store the namespace OID and the table name in + * the pg_subscription catalog tuple. + */ + if (opts.conflictlogtable) + { + List *names = stringToQualifiedNameList(opts.conflictlogtable, NULL); + + conflictlogtable_nspid = + QualifiedNameGetCreationNamespace(names, &conflictlogtable); + values[Anum_pg_subscription_subconflictlognspid - 1] = + ObjectIdGetDatum(conflictlogtable_nspid); + values[Anum_pg_subscription_subconflictlogtable - 1] = + CStringGetTextDatum(conflictlogtable); + } + else + nulls[Anum_pg_subscription_subconflictlogtable - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -768,6 +817,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); + /* If a conflict log table name is given then create the table. */ + if (opts.conflictlogtable) + create_conflict_log_table(conflictlogtable_nspid, conflictlogtable, + subid); + /* * Connect to remote side to execute requested commands and fetch table * and sequence info. @@ -1410,7 +1464,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_MAX_RETENTION_DURATION | - SUBOPT_ORIGIN); + SUBOPT_ORIGIN | + SUBOPT_CONFLICT_LOG_TABLE); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1665,6 +1720,73 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, origin = opts.origin; } + if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_TABLE)) + { + Oid nspid = InvalidOid; + char *relname = NULL; + Oid old_nspid = InvalidOid; + char *old_relname = NULL; + List *names = NIL; + + /* Fetch the eixsting conflict table table information. */ + old_relname = + get_subscription_conflict_log_table(subid, &old_nspid); + if (opts.conflictlogtable != NULL) + { + names = stringToQualifiedNameList(opts.conflictlogtable, + NULL); + nspid = QualifiedNameGetCreationNamespace(names, &relname); + } + + /* + * If the subscription already uses this conflict log table + * and it exists, just issue a notice. + */ + if (old_relname != NULL && relname != NULL && + strcmp(old_relname, relname) == 0 && + old_nspid == nspid && + OidIsValid(get_relname_relid(relname, nspid))) + { + char *nspname = get_namespace_name(nspid); + + ereport(NOTICE, + (errmsg("\"%s.%s\" is already in use as the conflict log table for this subscription", + nspname, relname))); + pfree(nspname); + } + else + { + /* Need to drop the old table if one was set. */ + if (old_relname != NULL) + drop_conflict_log_table(old_nspid, old_relname); + + /* + * Need to create a new table if a new name was + * provided. + */ + if (relname != NULL) + create_conflict_log_table(nspid, relname, subid); + + values[Anum_pg_subscription_subconflictlognspid - 1] = + ObjectIdGetDatum(nspid); + + if (relname != NULL) + values[Anum_pg_subscription_subconflictlogtable - 1] = + CStringGetTextDatum(relname); + else + nulls[Anum_pg_subscription_subconflictlogtable - 1] = + true; + + replaces[Anum_pg_subscription_subconflictlognspid - 1] = + true; + replaces[Anum_pg_subscription_subconflictlogtable - 1] = + true; + } + + if (old_relname != NULL) + pfree(old_relname); + } + update_tuple = true; break; } @@ -2027,6 +2149,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) Form_pg_subscription form; List *rstates; bool must_use_password; + Oid conflictlogtable_nsp = InvalidOid; + char *conflictlogtable = NULL; /* * The launcher may concurrently start a new worker for this subscription. @@ -2110,6 +2234,20 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ObjectAddressSet(myself, SubscriptionRelationId, subid); EventTriggerSQLDropAddObject(&myself, true, true); + /* Fetch the conflict log table information. */ + conflictlogtable = + get_subscription_conflict_log_table(subid, &conflictlogtable_nsp); + + /* + * If the subscription had a conflict log table, drop it now. This happens + * before deleting the subscription tuple. + */ + if (conflictlogtable) + { + drop_conflict_log_table(conflictlogtable_nsp, conflictlogtable); + pfree(conflictlogtable); + } + /* Remove the tuple from catalog. */ CatalogTupleDelete(rel, &tup->t_self); @@ -3188,3 +3326,140 @@ defGetStreamingMode(DefElem *def) def->defname))); return LOGICALREP_STREAM_OFF; /* keep compiler quiet */ } + +/* + * Builds the TupleDesc for the conflict log table. + */ +static TupleDesc +create_conflict_log_table_tupdesc(void) +{ + TupleDesc tupdesc; + int i; + + tupdesc = CreateTemplateTupleDesc(MAX_CONFLICT_ATTR_NUM); + + for (i = 0; i < MAX_CONFLICT_ATTR_NUM; i++) + { + Oid type_oid = ConflictLogSchema[i].atttypid; + + /* Special handling for the JSON array type for proper TupleDescInitEntry call */ + if (type_oid == JSONARRAYOID) + type_oid = get_array_type(JSONOID); + + TupleDescInitEntry(tupdesc, i + 1, + ConflictLogSchema[i].attname, + type_oid, + -1, 0); + } + + return BlessTupleDesc(tupdesc); +} + +/* + * Create conflict log table. + * + * The subscription owner becomes the owner of this table and has all + * privileges on it. + */ +static void +create_conflict_log_table(Oid namespaceId, char *conflictrel, Oid subid) +{ + TupleDesc tupdesc; + + /* Report an error if the specified conflict log table already exists. */ + if (OidIsValid(get_relname_relid(conflictrel, namespaceId))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("cannot create conflict log table \"%s.%s\" because a table with that name already exists", + get_namespace_name(namespaceId), conflictrel), + errhint("Use a different name for the conflict log table or drop the existing table."))); + + /* Build the tuple descriptor for the new table. */ + tupdesc = create_conflict_log_table_tupdesc(); + + /* Create conflict log table. */ + heap_create_with_catalog(conflictrel, + namespaceId, + 0, + InvalidOid, + InvalidOid, + InvalidOid, + GetUserId(), + HEAP_TABLE_AM_OID, + tupdesc, + NIL, + RELKIND_RELATION, + RELPERSISTENCE_PERMANENT, + false, + false, + ONCOMMIT_NOOP, + (Datum) 0, + false, + false, + false, + InvalidOid, + NULL); + + /* Release tuple descriptor memory. */ + FreeTupleDesc(tupdesc); +} + +/* + * Drop the conflict log table. + */ +static void +drop_conflict_log_table(Oid namespaceId, char *conflictrel) +{ + Oid relid; + ObjectAddress object; + + relid = get_relname_relid(conflictrel, namespaceId); + + /* Return quietly if the table does not exist (e.g., user dropped it) */ + if (!OidIsValid(relid)) + return; + + /* Create the object address for the table. */ + ObjectAddressSet(object, RelationRelationId, relid); + + /* + * Perform the deletion. Using DROP_CASCADE ensures the deletion of + * dependent objects. + */ + performDeletion(&object, DROP_CASCADE, 0); +} + +/* + * Check if the specified relation is used as a conflict log table by any + * subscription. + */ +bool +IsConflictLogTable(Oid relid) +{ + Relation rel; + TableScanDesc scan; + HeapTuple tup; + bool is_clt = false; + + rel = table_open(SubscriptionRelationId, AccessShareLock); + scan = table_beginscan_catalog(rel, 0, NULL); + + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) + { + Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup); + Oid nspid; + char *relname; + + relname = get_subscription_conflict_log_table(subform->oid, &nspid); + if (relname && relid == get_relname_relid(relname, nspid)) + { + is_clt = true; + break; + } + } + + table_endscan(scan); + table_close(rel, AccessShareLock); + + return is_clt; +} diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 166955922650..f5de424bf7e4 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -15,13 +15,31 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "access/heapam.h" +#include "access/htup.h" +#include "access/skey.h" #include "access/tableam.h" +#include "access/table.h" +#include "catalog/pg_attribute.h" +#include "catalog/indexing.h" +#include "catalog/namespace.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_type.h" #include "executor/executor.h" +#include "executor/spi.h" +#include "funcapi.h" #include "pgstat.h" #include "replication/conflict.h" #include "replication/worker_internal.h" #include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/pg_lsn.h" +#include "utils/array.h" +#include "utils/jsonb.h" + +#define MAX_LOCAL_CONFLICT_INFO_ATTRS 5 static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", @@ -50,8 +68,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid); +static void build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull); static char *build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid); +static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot); +static Datum tuple_table_slot_to_indextup_json(EState *estate, + Relation localrel, + Oid replica_index, + TupleTableSlot *slot); +static TupleDesc build_conflict_tupledesc(void); +static Datum build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples); +static void prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot); /* * Get the xmin and commit timestamp data (origin and timestamp) associated @@ -106,6 +143,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, TupleTableSlot *remoteslot, List *conflicttuples) { Relation localrel = relinfo->ri_RelationDesc; + Relation conflictlogrel = GetConflictLogTableRel(); StringInfoData err_detail; initStringInfo(&err_detail); @@ -120,6 +158,37 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, conflicttuple->ts, &err_detail); + /* Insert conflict details to conflict log table. */ + if (conflictlogrel) + { + if (ValidateConflictLogTable(conflictlogrel)) + { + /* + * Prepare the conflict log tuple. If the error level is below + * ERROR, insert it immediately. Otherwise, defer the insertion to + * a new transaction after the current one aborts, ensuring the + * insertion of the log tuple is not rolled back. + */ + prepare_conflict_log_tuple(estate, + relinfo->ri_RelationDesc, + conflictlogrel, + type, + searchslot, + conflicttuples, + remoteslot); + if (elevel < ERROR) + InsertConflictLogTuple(conflictlogrel); + } + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Conflict log table \"%s.%s\" structure changed, skipping insertion", + get_namespace_name(RelationGetNamespace(conflictlogrel)), + RelationGetRelationName(conflictlogrel))); + + table_close(conflictlogrel, RowExclusiveLock); + } + pgstat_report_subscription_conflict(MySubscription->oid, type); ereport(elevel, @@ -162,6 +231,141 @@ InitConflictIndexes(ResultRelInfo *relInfo) relInfo->ri_onConflictArbiterIndexes = uniqueIndexes; } +/* + * GetConflictLogTableRel + * + * Get the information of the specific conflict log table defined in + * pg_subscription and opens the relation for insertion. The caller is + * responsible for closing the returned relation handle. + */ +Relation +GetConflictLogTableRel(void) +{ + Oid nspid; + Oid conflictlogrelid; + Relation conflictlogrel = NULL; + char *conflictlogtable; + + /* If conflict log table is not set for the subscription just return. */ + conflictlogtable = get_subscription_conflict_log_table( + MyLogicalRepWorker->subid, &nspid); + if (conflictlogtable == NULL) + return NULL; + + conflictlogrelid = get_relname_relid(conflictlogtable, nspid); + if (OidIsValid(conflictlogrelid)) + conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock); + + /* Conflict log table is dropped or not accessible. */ + if (conflictlogrel == NULL) + ereport(WARNING, + (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("conflict log table \"%s.%s\" does not exist", + get_namespace_name(nspid), conflictlogtable))); + + pfree(conflictlogtable); + + return conflictlogrel; +} + +/* + * InsertConflictLogTuple + * + * Insert conflict log tuple into the conflict log table. It uses + * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding + * of the tuple inserted into the conflict log table. + */ +void +InsertConflictLogTuple(Relation conflictlogrel) +{ + int options = HEAP_INSERT_NO_LOGICAL; + + /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */ + Assert(MyLogicalRepWorker->conflict_log_tuple != NULL); + + heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple, + GetCurrentCommandId(true), options, NULL); + + /* Free conflict log tuple. */ + heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); + MyLogicalRepWorker->conflict_log_tuple = NULL; +} + +/* + * ValidateConflictLogTable - Validate conflict log table + * + * Validate whether the conflict log table is still suitable for considering as + * conflict log table. + */ +bool +ValidateConflictLogTable(Relation rel) +{ + Relation pg_attribute; + HeapTuple atup; + ScanKeyData scankey; + SysScanDesc scan; + Form_pg_attribute attForm; + int attcnt = 0; + bool tbl_ok = true; + + /* + * Check whether the table definition including its column names, data + * types, and column ordering meets the requirements for conflict log + * table. + */ + pg_attribute = table_open(AttributeRelationId, AccessShareLock); + ScanKeyInit(&scankey, + Anum_pg_attribute_attrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + + scan = systable_beginscan(pg_attribute, AttributeRelidNumIndexId, true, + SnapshotSelf, 1, &scankey); + + /* We only need to check up to MAX_CONFLICT_ATTR_NUM attributes */ + while (HeapTupleIsValid(atup = systable_getnext(scan))) + { + const ConflictLogColumnDef *expected; + int schema_idx; + + attForm = (Form_pg_attribute) GETSTRUCT(atup); + + /* Skip system columns and dropped columns */ + if (attForm->attnum < 1 || attForm->attisdropped) + continue; + + attcnt++; + + /* attnum 1 corresponds to index 0 in ConflictLogSchema */ + schema_idx = attForm->attnum - 1; + + /* Check against the central schema definition */ + if (schema_idx >= MAX_CONFLICT_ATTR_NUM) + { + /* Found an extra column beyond the required set */ + tbl_ok = false; + break; + } + + expected = &ConflictLogSchema[schema_idx]; + + if (attForm->atttypid != expected->atttypid || + strcmp(NameStr(attForm->attname), expected->attname) != 0) + { + tbl_ok = false; + break; + } + } + + systable_endscan(scan); + table_close(pg_attribute, AccessShareLock); + + if (attcnt != MAX_CONFLICT_ATTR_NUM || !tbl_ok) + return false; + + return true; +} + /* * Add SQLSTATE error code to the current conflict report. */ @@ -472,6 +676,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, return tuple_value.data; } +/* + * Helper function to extract the "raw" index key Datums and their null flags + * from a TupleTableSlot, given an already open index descriptor. + * This is the reusable core logic. + */ +static void +build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull) +{ + TupleTableSlot *tableslot = slot; + + /* + * If the slot is a virtual slot, copy it into a heap tuple slot as + * FormIndexDatum only works with heap tuple slots. + */ + if (TTS_IS_VIRTUAL(slot)) + { + /* Slot is created within the EState's tuple table */ + tableslot = table_slot_create(localrel, &estate->es_tupleTable); + tableslot = ExecCopySlot(tableslot, slot); + } + + /* + * Initialize ecxt_scantuple for potential use in FormIndexDatum + */ + GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + + /* Form the index datums */ + FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, + isnull); +} + /* * Helper functions to construct a string describing the contents of an index * entry. See BuildIndexValueDescription for details. @@ -487,41 +725,336 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Relation indexDesc; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; - TupleTableSlot *tableslot = slot; - if (!tableslot) + if (!slot) return NULL; Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); indexDesc = index_open(indexoid, NoLock); - /* - * If the slot is a virtual slot, copy it into a heap tuple slot as - * FormIndexDatum only works with heap tuple slots. - */ - if (TTS_IS_VIRTUAL(slot)) + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + + index_value = BuildIndexValueDescription(indexDesc, values, isnull); + + index_close(indexDesc, NoLock); + + return index_value; +} + +/* + * tuple_table_slot_to_json_datum + * + * Helper function to convert a TupleTableSlot to Jsonb. + */ +static Datum +tuple_table_slot_to_json_datum(TupleTableSlot *slot) +{ + HeapTuple tuple; + Datum datum; + Datum json; + + Assert(slot != NULL); + + tuple = ExecCopySlotHeapTuple(slot); + datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor); + + json = DirectFunctionCall1(row_to_json, datum); + heap_freetuple(tuple); + + return json; +} + +/* + * tuple_table_slot_to_indextup_json + * + * Fetch replica identity key from the tuple table slot and convert into a + * jsonb datum. + */ +static Datum +tuple_table_slot_to_indextup_json(EState *estate, Relation localrel, + Oid indexid, TupleTableSlot *slot) +{ + Relation indexDesc; + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + HeapTuple tuple; + TupleDesc tupdesc; + Datum datum; + + Assert(slot != NULL); + + Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true)); + + indexDesc = index_open(indexid, NoLock); + + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + tupdesc = RelationGetDescr(indexDesc); + + /* Bless the tupdesc so it can be looked up by row_to_json. */ + BlessTupleDesc(tupdesc); + + /* Form the replica identity tuple. */ + tuple = heap_form_tuple(tupdesc, values, isnull); + datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + index_close(indexDesc, NoLock); + + /* Convert to a JSONB datum. */ + return DirectFunctionCall1(row_to_json, datum); +} + +/* + * Initialize the tuple descriptor for local conflict info. + */ +static TupleDesc +build_conflict_tupledesc(void) +{ + TupleDesc tupdesc; + int attno = 1; + + tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS); + + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "xid", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "commit_ts", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "origin", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "key", + JSONOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno, "tuple", + JSONOID, -1, 0); + + BlessTupleDesc(tupdesc); + + Assert(attno == MAX_LOCAL_CONFLICT_INFO_ATTRS); + + return tupdesc; +} + +/* + * Builds the local conflicts JSONB array column from the list of + * ConflictTupleInfo objects. + * + * Example output structure: + * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ] + */ +static Datum +build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples) +{ + ListCell *lc; + List *json_datums = NIL; /* List to hold the row_to_json results (type json) */ + Datum *json_datum_array; + bool *json_null_array; + Datum json_array_datum; + int num_conflicts; + int i; + int16 typlen; + bool typbyval; + char typalign; + TupleDesc tupdesc; + + /* Build local conflicts tuple descriptor. */ + tupdesc = build_conflict_tupledesc(); + + /* Process local conflict tuple list and prepare an array of JSON. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) { - tableslot = table_slot_create(localrel, &estate->es_tupleTable); - tableslot = ExecCopySlot(tableslot, slot); + Datum values[MAX_LOCAL_CONFLICT_INFO_ATTRS]; + bool nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS]; + char *origin_name = NULL; + HeapTuple tuple; + Datum json_datum; + int attno; + + memset(values, 0, sizeof(Datum) * MAX_LOCAL_CONFLICT_INFO_ATTRS); + memset(nulls, 0, sizeof(bool) * MAX_LOCAL_CONFLICT_INFO_ATTRS); + + attno = 0; + values[attno++] = TransactionIdGetDatum(conflicttuple->xmin); + + if (conflicttuple->ts) + values[attno++] = TimestampTzGetDatum(conflicttuple->ts); + else + nulls[attno++] = true; + + if (conflicttuple->origin != InvalidRepOriginId) + replorigin_by_oid(conflicttuple->origin, true, &origin_name); + + /* Store empty string if origin name for the tuple is NULL. */ + if (origin_name != NULL) + values[attno++] = CStringGetTextDatum(origin_name); + else + nulls[attno++] = true; + + /* + * Add the conflicting key values in the case of a unique constraint + * violation. + */ + if (conflict_type == CT_INSERT_EXISTS || + conflict_type == CT_UPDATE_EXISTS || + conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS) + { + Oid indexoid = conflicttuple->indexoid; + + Assert(OidIsValid(indexoid) && conflicttuple->slot && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, + true)); + values[attno++] = + tuple_table_slot_to_indextup_json(estate, rel, + indexoid, + conflicttuple->slot); + } + else + nulls[attno++] = true; + + /* Convert conflicting tuple to JSON datum. */ + if (conflicttuple->slot) + values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot); + else + nulls[attno] = true; + + Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + json_datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + /* + * Build the higher level JSON datum in format described in function + * header. + */ + json_datum = DirectFunctionCall1(row_to_json, json_datum); + + /* Done with the temporary tuple. */ + heap_freetuple(tuple); + + /* Add to the array element. */ + json_datums = lappend(json_datums, (void *) json_datum); } - /* - * Initialize ecxt_scantuple for potential use in FormIndexDatum when - * index expressions are present. - */ - GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + num_conflicts = list_length(json_datums); - /* - * The values/nulls arrays passed to BuildIndexValueDescription should be - * the results of FormIndexDatum, which are the "raw" input to the index - * AM. - */ - FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull); + json_datum_array = (Datum *) palloc(num_conflicts * sizeof(Datum)); + json_null_array = (bool *) palloc0(num_conflicts * sizeof(bool)); - index_value = BuildIndexValueDescription(indexDesc, values, isnull); + i = 0; + foreach(lc, json_datums) + { + json_datum_array[i] = (Datum) lfirst(lc); + i++; + } - index_close(indexDesc, NoLock); + /* Construct the json[] array Datum. */ + get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign); + json_array_datum = PointerGetDatum(construct_array(json_datum_array, + num_conflicts, + JSONOID, + typlen, + typbyval, + typalign)); + pfree(json_datum_array); + pfree(json_null_array); + + return json_array_datum; +} - return index_value; +/* + * prepare_conflict_log_tuple + * + * This routine prepares a tuple detailing a conflict encountered during + * logical replication. The prepared tuple will be stored in + * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the + * conflict log table by calling InsertConflictLogTuple. + */ +static void +prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot) +{ + Datum values[MAX_CONFLICT_ATTR_NUM]; + bool nulls[MAX_CONFLICT_ATTR_NUM]; + int attno; + char *remote_origin = NULL; + MemoryContext oldctx; + + Assert(MyLogicalRepWorker->conflict_log_tuple == NULL); + + /* Initialize values and nulls arrays. */ + memset(values, 0, sizeof(Datum) * MAX_CONFLICT_ATTR_NUM); + memset(nulls, 0, sizeof(bool) * MAX_CONFLICT_ATTR_NUM); + + /* Populate the values and nulls arrays. */ + attno = 0; + values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel)); + + values[attno++] = + CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel))); + + values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel)); + + values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]); + + if (TransactionIdIsValid(remote_xid)) + values[attno++] = TransactionIdGetDatum(remote_xid); + else + nulls[attno++] = true; + + values[attno++] = LSNGetDatum(remote_final_lsn); + + if (remote_commit_ts > 0) + values[attno++] = TimestampTzGetDatum(remote_commit_ts); + else + nulls[attno++] = true; + + if (replorigin_session_origin != InvalidRepOriginId) + replorigin_by_oid(replorigin_session_origin, true, &remote_origin); + + if (remote_origin != NULL) + values[attno++] = CStringGetTextDatum(remote_origin); + else + nulls[attno++] = true; + + if (!TupIsNull(searchslot)) + { + Oid replica_index = GetRelationIdentityOrPK(rel); + + /* + * If the table has a valid replica identity index, build the index + * json datum from key value. Otherwise, construct it from the complete + * tuple in REPLICA IDENTITY FULL cases. + */ + if (OidIsValid(replica_index)) + values[attno++] = tuple_table_slot_to_indextup_json(estate, rel, + replica_index, + searchslot); + else + values[attno++] = tuple_table_slot_to_json_datum(searchslot); + } + else + nulls[attno++] = true; + + if (!TupIsNull(remoteslot)) + values[attno++] = tuple_table_slot_to_json_datum(remoteslot); + else + nulls[attno++] = true; + + values[attno] = build_local_conflicts_json_array(estate, rel, + conflict_type, + conflicttuples); + + Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM); + + oldctx = MemoryContextSwitchTo(ApplyContext); + MyLogicalRepWorker->conflict_log_tuple = + heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls); + MemoryContextSwitchTo(oldctx); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 3991e1495d4c..bc7e1d9ebde6 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -477,6 +477,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, worker->oldest_nonremovable_xid = retain_dead_tuples ? MyReplicationSlot->data.xmin : InvalidTransactionId; + worker->conflict_log_tuple = NULL; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fc64476a9ef1..e8f7ab3d5d67 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -482,7 +482,9 @@ static bool MySubscriptionValid = false; static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; -static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +TransactionId remote_xid = InvalidTransactionId; +TimestampTz remote_commit_ts = 0; /* fields valid only when processing streamed transaction */ static bool in_streamed_transaction = false; @@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s) set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; + remote_commit_ts = begin_data.committime; + remote_xid = begin_data.xid; maybe_start_skipping_changes(begin_data.final_lsn); @@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s) /* extract XID of the top-level transaction */ stream_xid = logicalrep_read_stream_start(s, &first_segment); + remote_xid = stream_xid; + remote_final_lsn = InvalidXLogRecPtr; + remote_commit_ts = 0; + if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -5609,6 +5617,33 @@ start_apply(XLogRecPtr origin_startpos) pgstat_report_subscription_error(MySubscription->oid, MyLogicalRepWorker->type); + /* + * Insert any pending conflict log tuple under a new transaction. + */ + if (MyLogicalRepWorker->conflict_log_tuple != NULL) + { + Relation conflictlogrel; + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Open conflict log table and insert the tuple. */ + conflictlogrel = GetConflictLogTableRel(); + if (ValidateConflictLogTable(conflictlogrel)) + InsertConflictLogTuple(conflictlogrel); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Conflict log table \"%s.%s\" structure changed, skipping insertion", + get_namespace_name(RelationGetNamespace(conflictlogrel)), + RelationGetRelationName(conflictlogrel))); + MyLogicalRepWorker->conflict_log_tuple = NULL; + table_close(conflictlogrel, RowExclusiveLock); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + PG_RE_THROW(); } } diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index 5aa7a26d95c3..e1132e47adca 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -3879,3 +3879,41 @@ get_subscription_name(Oid subid, bool missing_ok) return subname; } + +/* + * get_subscription_conflict_log_table + * + * Get conflict log table name and namespace id from subscription. + */ +char * +get_subscription_conflict_log_table(Oid subid, Oid *nspid) +{ + HeapTuple tup; + Datum datum; + bool isnull; + char *relname = NULL; + Form_pg_subscription subform; + + *nspid = InvalidOid; + + tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + return NULL; + + subform = (Form_pg_subscription) GETSTRUCT(tup); + + /* Get conflict log table name. */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subconflictlogtable, + &isnull); + if (!isnull) + { + *nspid = subform->subconflictlognspid; + relname = pstrdup(TextDatumGetCString(datum)); + } + + ReleaseSysCache(tup); + return relname; +} diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 36f245028429..906167fe466d 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6806,7 +6806,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false, false, false, false}; + false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6900,15 +6900,25 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subskiplsn AS \"%s\"\n", gettext_noop("Skip LSN")); + + /* Conflict log table is only supported in v19 and higher */ + if (pset.sversion >= 190000) + appendPQExpBuffer(&buf, + ", (CASE\n" + " WHEN subconflictlogtable IS NULL THEN NULL\n" + " ELSE pg_catalog.quote_ident(n.nspname) || '.' ||" + " pg_catalog.quote_ident(subconflictlogtable::text)\n" + "END) AS \"%s\"\n", + gettext_noop("Conflict log table")); } /* Only display subscriptions in current database. */ - appendPQExpBufferStr(&buf, - "FROM pg_catalog.pg_subscription\n" - "WHERE subdbid = (SELECT oid\n" - " FROM pg_catalog.pg_database\n" - " WHERE datname = pg_catalog.current_database())"); - + appendPQExpBuffer(&buf, + "FROM pg_catalog.pg_subscription " + "LEFT JOIN pg_catalog.pg_namespace AS n ON subconflictlognspid = n.oid\n" + "WHERE subdbid = (SELECT oid\n" + " FROM pg_catalog.pg_database\n" + " WHERE datname = pg_catalog.current_database())"); if (!validateSQLNamePattern(&buf, pattern, true, false, NULL, "subname", NULL, NULL, diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 20d7a65c614e..6ab4ab8857de 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2344,8 +2344,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", - "max_retention_duration", "origin", + COMPLETE_WITH("binary", "conflict_log_table", "disable_on_error", + "failover", "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); @@ -3814,8 +3814,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("WITH ("); /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) - COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", + COMPLETE_WITH("binary", "conflict_log_table", "connect", "copy_data", + "create_slot", "disable_on_error", "enabled", "failover", "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 55cb9b1eefae..f4526c15ec3a 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -80,6 +80,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subretaindeadtuples; /* True if dead tuples useful for * conflict detection are retained */ + Oid subconflictlognspid; /* Namespace Oid in which the conflict + * log table is created. */ int32 submaxretention; /* The maximum duration (in milliseconds) * for which information useful for @@ -105,6 +107,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW /* Only publish data originating from the specified origin */ text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY); + + /* Conflict log table name if specified */ + text subconflictlogtable; #endif } FormData_pg_subscription; diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index fb4e26a51a4d..6c062b0991f6 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -36,4 +36,6 @@ extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, bool retention_active, bool max_retention_set); +extern bool IsConflictLogTable(Oid relid); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index c8fbf9e51b8d..c7e67bd300ed 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -9,9 +9,12 @@ #ifndef CONFLICT_H #define CONFLICT_H +#include "access/htup.h" #include "access/xlogdefs.h" +#include "catalog/pg_type.h" #include "nodes/pg_list.h" #include "utils/timestamp.h" +#include "utils/relcache.h" /* Avoid including execnodes.h here */ typedef struct EState EState; @@ -79,6 +82,32 @@ typedef struct ConflictTupleInfo * conflicting local row occurred */ } ConflictTupleInfo; +/* Structure to hold metadata for one column of the conflict log table */ +typedef struct ConflictLogColumnDef +{ + const char *attname; /* Column name */ + Oid atttypid; /* Data type OID */ +} ConflictLogColumnDef; + +/* The single source of truth for the conflict log table schema */ +static const ConflictLogColumnDef ConflictLogSchema[] = +{ + { .attname = "relid", .atttypid = OIDOID }, + { .attname = "schemaname", .atttypid = TEXTOID }, + { .attname = "relname", .atttypid = TEXTOID }, + { .attname = "conflict_type", .atttypid = TEXTOID }, + { .attname = "remote_xid", .atttypid = XIDOID }, + { .attname = "remote_commit_lsn",.atttypid = LSNOID }, + { .attname = "remote_commit_ts", .atttypid = TIMESTAMPTZOID }, + { .attname = "remote_origin", .atttypid = TEXTOID }, + { .attname = "replica_identity", .atttypid = JSONOID }, + { .attname = "remote_tuple", .atttypid = JSONOID }, + { .attname = "local_conflicts", .atttypid = JSONARRAYOID } +}; + +/* Define the count using the array size */ +#define MAX_CONFLICT_ATTR_NUM (sizeof(ConflictLogSchema) / sizeof(ConflictLogSchema[0])) + extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, @@ -89,4 +118,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, List *conflicttuples); extern void InitConflictIndexes(ResultRelInfo *relInfo); +extern Relation GetConflictLogTableRel(void); +extern void InsertConflictLogTuple(Relation conflictlogrel); +extern bool ValidateConflictLogTable(Relation rel); #endif diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f081619f1513..711c04c7297c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -101,6 +101,9 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; + /* A conflict log tuple that is prepared but not yet inserted. */ + HeapTuple conflict_log_tuple; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; +extern XLogRecPtr remote_final_lsn; +extern TimestampTz remote_commit_ts; +extern TransactionId remote_xid; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 50fb149e9ac9..3bebf04bf51c 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -210,6 +210,7 @@ extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); extern Oid get_subscription_oid(const char *subname, bool missing_ok); extern char *get_subscription_name(Oid subid, bool missing_ok); +extern char *get_subscription_conflict_log_table(Oid subid, Oid *nspid); #define type_is_array(typid) (get_element_type(typid) != InvalidOid) /* type_is_array_domain accepts both plain arrays and domains over arrays */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 327d1e7731f9..bab2d0ea9543 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 | (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 | (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -433,10 +433,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -450,19 +450,19 @@ NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabl WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) -- ok ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -517,7 +517,191 @@ COMMIT; -- ok, owning it is enough for this stuff ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- +-- CONFLICT LOG TABLE TESTS +-- +SET SESSION AUTHORIZATION 'regress_subscription_user'; +-- fail - conflict_log_table specified when table already exists +CREATE TABLE public.regress_conflict_log_temp (id int); +CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log_temp'); +ERROR: cannot create conflict log table "public.regress_conflict_log_temp" because a table with that name already exists +HINT: Use a different name for the conflict log table or drop the existing table. +DROP TABLE public.regress_conflict_log_temp; +-- ok - conflict_log_table creation with CREATE SUBSCRIPTION +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1'); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +-- check metadata in pg_subscription +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + subname | subconflictlogtable | is_public_schema +------------------------+-----------------------+------------------ + regress_conflict_test1 | regress_conflict_log1 | t +(1 row) + +-- check if the table exists and has the correct schema (11 columns) +SELECT count(*) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attnum > 0; + count +------- + 11 +(1 row) + +-- check a specific column type (e.g., key_tuple should be JSON) +SELECT format_type(atttypid, atttypmod) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attname = 'key_tuple'; + format_type +------------- + json +(1 row) + +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +------------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+----------------------- + regress_conflict_test1 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | regress_conflict_log1 +(1 row) + +-- ok - adding conflict_log_table with ALTER SUBSCRIPTION +CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log2'); +-- check metadata after ALTER +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogtable | is_public_schema +------------------------+-----------------------+------------------ + regress_conflict_test2 | regress_conflict_log2 | t +(1 row) + +-- ok - change the conflict log table name for an existing subscription that already had one +CREATE SCHEMA clt; +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3'); +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogtable | is_public_schema +------------------------+-----------------------+------------------ + regress_conflict_test2 | regress_conflict_log3 | f +(1 row) + +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +------------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+----------------------- + regress_conflict_test1 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | regress_conflict_log1 + regress_conflict_test2 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | regress_conflict_log3 +(2 rows) + +-- check the new table was created and the old table was dropped +SELECT count(*) FROM pg_class WHERE relname = 'regress_conflict_log2'; + count +------- + 0 +(1 row) + +SELECT count(*) FROM pg_attribute WHERE attrelid = 'clt.regress_conflict_log3'::regclass AND attnum > 0; + count +------- + 11 +(1 row) + +-- ok (NOTICE) - set conflict_log_table to one already used by this subscription +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3'); +NOTICE: "clt.regress_conflict_log3" is already in use as the conflict log table for this subscription +-- fail - try to publish the conflict_log_table +CREATE PUBLICATION pub FOR TABLE clt.regress_conflict_log3; +ERROR: cannot add relation "clt.regress_conflict_log3" to publication +DETAIL: This operation is not supported for conflict log tables. +-- suppress warning that depends on wal_level +SET client_min_messages = 'ERROR'; +-- ok - conflict_log_table should not be published with ALL TABLE +CREATE PUBLICATION pub FOR TABLES IN SCHEMA clt; +SELECT * FROM pg_publication_tables WHERE pubname = 'pub'; + pubname | schemaname | tablename | attnames | rowfilter +---------+------------+-----------+----------+----------- +(0 rows) + +\dt+ clt.regress_conflict_log3 + List of tables + Schema | Name | Type | Owner | Persistence | Size | Description +--------+-----------------------+-------+---------------------------+-------------+---------+------------- + clt | regress_conflict_log3 | table | regress_subscription_user | permanent | 0 bytes | +(1 row) + +DROP PUBLICATION pub; +-- fail - set conflict_log_table to one already used by a different subscription +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1'); +ERROR: cannot create conflict log table "public.regress_conflict_log1" because a table with that name already exists +HINT: Use a different name for the conflict log table or drop the existing table. +-- ok - dropping subscription also drops the log table +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; +-- should return NULL, meaning the table was dropped +SELECT to_regclass('public.regress_conflict_log1'); + to_regclass +------------- + +(1 row) + +-- ok - dropping subscription when the log table was manually dropped first +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1'); +DROP TABLE public.regress_conflict_log1; +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; +-- should return NULL, meaning the subscription was dropped successfully +SELECT subname FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + subname +--------- +(0 rows) + +-- ok - create subscription with conflict_log_table = NONE +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = NONE); +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogtable +------------------------+----------------------- + regress_conflict_test2 | regress_conflict_log3 +(1 row) + +--ok - alter subscription with valid conflict log table name +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1'); +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogtable +------------------------+----------------------- + regress_conflict_test2 | regress_conflict_log1 +(1 row) + +--ok - pg_relation_is_publishable should return false for conflict log table +SELECT pg_relation_is_publishable('public.regress_conflict_log1'); + pg_relation_is_publishable +---------------------------- + f +(1 row) + +--ok - alter subscription with conflict log table = NONE +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = NONE); +-- should return NULL, meaning the table was dropped +SELECT to_regclass('public.regress_conflict_log1'); + to_regclass +------------- + +(1 row) + +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogtable +------------------------+--------------------- + regress_conflict_test2 | +(1 row) + +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; +-- Clean up remaining test subscription +ALTER SUBSCRIPTION regress_conflict_test2 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test2; RESET SESSION AUTHORIZATION; +DROP SCHEMA clt; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; DROP ROLE regress_subscription_user3; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index ef0c298d2df7..416847f081b1 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -365,7 +365,115 @@ COMMIT; ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- +-- CONFLICT LOG TABLE TESTS +-- + +SET SESSION AUTHORIZATION 'regress_subscription_user'; + +-- fail - conflict_log_table specified when table already exists +CREATE TABLE public.regress_conflict_log_temp (id int); +CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log_temp'); +DROP TABLE public.regress_conflict_log_temp; + +-- ok - conflict_log_table creation with CREATE SUBSCRIPTION +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1'); + +-- check metadata in pg_subscription +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + +-- check if the table exists and has the correct schema (11 columns) +SELECT count(*) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attnum > 0; + +-- check a specific column type (e.g., key_tuple should be JSON) +SELECT format_type(atttypid, atttypmod) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attname = 'key_tuple'; + +\dRs+ + +-- ok - adding conflict_log_table with ALTER SUBSCRIPTION +CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log2'); + +-- check metadata after ALTER +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- ok - change the conflict log table name for an existing subscription that already had one +CREATE SCHEMA clt; +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3'); +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; +\dRs+ + +-- check the new table was created and the old table was dropped +SELECT count(*) FROM pg_class WHERE relname = 'regress_conflict_log2'; +SELECT count(*) FROM pg_attribute WHERE attrelid = 'clt.regress_conflict_log3'::regclass AND attnum > 0; + +-- ok (NOTICE) - set conflict_log_table to one already used by this subscription +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3'); + +-- fail - try to publish the conflict_log_table +CREATE PUBLICATION pub FOR TABLE clt.regress_conflict_log3; + +-- suppress warning that depends on wal_level +SET client_min_messages = 'ERROR'; + +-- ok - conflict_log_table should not be published with ALL TABLE +CREATE PUBLICATION pub FOR TABLES IN SCHEMA clt; +SELECT * FROM pg_publication_tables WHERE pubname = 'pub'; +\dt+ clt.regress_conflict_log3 +DROP PUBLICATION pub; + +-- fail - set conflict_log_table to one already used by a different subscription +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1'); + +-- ok - dropping subscription also drops the log table +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; + +-- should return NULL, meaning the table was dropped +SELECT to_regclass('public.regress_conflict_log1'); + +-- ok - dropping subscription when the log table was manually dropped first +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1'); +DROP TABLE public.regress_conflict_log1; +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; + +-- should return NULL, meaning the subscription was dropped successfully +SELECT subname FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + +-- ok - create subscription with conflict_log_table = NONE +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = NONE); +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; +--ok - alter subscription with valid conflict log table name +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1'); +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +--ok - pg_relation_is_publishable should return false for conflict log table +SELECT pg_relation_is_publishable('public.regress_conflict_log1'); + +--ok - alter subscription with conflict log table = NONE +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = NONE); + +-- should return NULL, meaning the table was dropped +SELECT to_regclass('public.regress_conflict_log1'); +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; + +-- Clean up remaining test subscription +ALTER SUBSCRIPTION regress_conflict_test2 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test2; + RESET SESSION AUTHORIZATION; +DROP SCHEMA clt; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; DROP ROLE regress_subscription_user3;