diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 7aa3f1799240..9f84e02b7eff 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -31,6 +31,7 @@ #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/publicationcmds.h" +#include "commands/subscriptioncmds.h" #include "funcapi.h" #include "utils/array.h" #include "utils/builtins.h" @@ -85,6 +86,15 @@ check_publication_add_relation(Relation targetrel) errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("This operation is not supported for unlogged tables."))); + + /* Can't be conflict log table */ + if (IsConflictLogTable(RelationGetRelid(targetrel))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s.%s\" to publication", + get_namespace_name(RelationGetNamespace(targetrel)), + RelationGetRelationName(targetrel)), + errdetail("This operation is not supported for conflict log tables."))); } /* @@ -145,6 +155,13 @@ is_publishable_class(Oid relid, Form_pg_class reltuple) /* * Another variant of is_publishable_class(), taking a Relation. + * + * Note: Conflict log tables are not publishable. However, we intentionally + * skip this check here because this function is called for every change and + * performing this check during every change publication is costly. To ensure + * unpublishable entries are ignored without incurring performance overhead, + * tuples inserted into the conflict log table uses the HEAP_INSERT_NO_LOGICAL + * flag. This allows the decoding layer to bypass these entries automatically. */ bool is_publishable_relation(Relation rel) @@ -169,7 +186,10 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); if (!HeapTupleIsValid(tuple)) PG_RETURN_NULL(); - result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)); + + /* Subscription conflict log tables are not published */ + result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)) && + !IsConflictLogTable(relid); ReleaseSysCache(tuple); PG_RETURN_BOOL(result); } @@ -890,7 +910,9 @@ GetAllPublicationRelations(char relkind, bool pubviaroot) Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; + /* Subscription conflict log tables are not published */ if (is_publishable_class(relid, relForm) && + !IsConflictLogTable(relid) && !(relForm->relispartition && pubviaroot)) result = lappend_oid(result, relid); } @@ -1018,7 +1040,7 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt) Oid relid = relForm->oid; char relkind; - if (!is_publishable_class(relid, relForm)) + if (!is_publishable_class(relid, relForm) || IsConflictLogTable(relid)) continue; relkind = get_rel_relkind(relid); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index abbcaff0838b..eb3fe068ddbe 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -15,16 +15,19 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "access/heapam.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" +#include "catalog/heap.h" #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/pg_am_d.h" #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" #include "catalog/pg_subscription.h" @@ -34,9 +37,12 @@ #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "executor/spi.h" +#include "funcapi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" +#include "replication/conflict.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "replication/origin.h" @@ -47,10 +53,12 @@ #include "storage/lmgr.h" #include "utils/acl.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" +#include "utils/regproc.h" #include "utils/syscache.h" /* @@ -75,6 +83,7 @@ #define SUBOPT_MAX_RETENTION_DURATION 0x00008000 #define SUBOPT_LSN 0x00010000 #define SUBOPT_ORIGIN 0x00020000 +#define SUBOPT_CONFLICT_LOG_TABLE 0x00040000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -103,6 +112,7 @@ typedef struct SubOpts bool retaindeadtuples; int32 maxretention; char *origin; + char *conflictlogtable; XLogRecPtr lsn; } SubOpts; @@ -135,7 +145,9 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel); - +static void create_conflict_log_table(Oid namespaceId, char *conflictrel, + Oid subid); +static void drop_conflict_log_table(Oid namespaceId, char *conflictrel); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -191,6 +203,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->maxretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_TABLE)) + opts->conflictlogtable = NULL; /* Parse options */ foreach(lc, stmt_options) @@ -402,6 +416,19 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_TABLE) && + strcmp(defel->defname, "conflict_log_table") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_TABLE)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_CONFLICT_LOG_TABLE; + opts->conflictlogtable = defGetString(defel); + + /* Setting conflict_log_table = NONE is treated as no table. */ + if (strcmp(opts->conflictlogtable, "none") == 0) + opts->conflictlogtable = NULL; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -599,6 +626,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bits32 supported_opts; SubOpts opts = {0}; AclResult aclresult; + Oid conflictlogtable_nspid = InvalidOid; + char *conflictlogtable = NULL; /* * Parse and check options. @@ -612,7 +641,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_RETAIN_DEAD_TUPLES | - SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN); + SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN | + SUBOPT_CONFLICT_LOG_TABLE); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -747,6 +777,25 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + /* + * If a conflict log table name is specified, parse the schema and table + * name from the string. Store the namespace OID and the table name in + * the pg_subscription catalog tuple. + */ + if (opts.conflictlogtable) + { + List *names = stringToQualifiedNameList(opts.conflictlogtable, NULL); + + conflictlogtable_nspid = + QualifiedNameGetCreationNamespace(names, &conflictlogtable); + values[Anum_pg_subscription_subconflictlognspid - 1] = + ObjectIdGetDatum(conflictlogtable_nspid); + values[Anum_pg_subscription_subconflictlogtable - 1] = + CStringGetTextDatum(conflictlogtable); + } + else + nulls[Anum_pg_subscription_subconflictlogtable - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -768,6 +817,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname)); replorigin_create(originname); + /* If a conflict log table name is given then create the table. */ + if (opts.conflictlogtable) + create_conflict_log_table(conflictlogtable_nspid, conflictlogtable, + subid); + /* * Connect to remote side to execute requested commands and fetch table * and sequence info. @@ -1410,7 +1464,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_MAX_RETENTION_DURATION | - SUBOPT_ORIGIN); + SUBOPT_ORIGIN | + SUBOPT_CONFLICT_LOG_TABLE); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1665,6 +1720,73 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, origin = opts.origin; } + if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_TABLE)) + { + Oid nspid = InvalidOid; + char *relname = NULL; + Oid old_nspid = InvalidOid; + char *old_relname = NULL; + List *names = NIL; + + /* Fetch the eixsting conflict table table information. */ + old_relname = + get_subscription_conflict_log_table(subid, &old_nspid); + if (opts.conflictlogtable != NULL) + { + names = stringToQualifiedNameList(opts.conflictlogtable, + NULL); + nspid = QualifiedNameGetCreationNamespace(names, &relname); + } + + /* + * If the subscription already uses this conflict log table + * and it exists, just issue a notice. + */ + if (old_relname != NULL && relname != NULL && + strcmp(old_relname, relname) == 0 && + old_nspid == nspid && + OidIsValid(get_relname_relid(relname, nspid))) + { + char *nspname = get_namespace_name(nspid); + + ereport(NOTICE, + (errmsg("\"%s.%s\" is already in use as the conflict log table for this subscription", + nspname, relname))); + pfree(nspname); + } + else + { + /* Need to drop the old table if one was set. */ + if (old_relname != NULL) + drop_conflict_log_table(old_nspid, old_relname); + + /* + * Need to create a new table if a new name was + * provided. + */ + if (relname != NULL) + create_conflict_log_table(nspid, relname, subid); + + values[Anum_pg_subscription_subconflictlognspid - 1] = + ObjectIdGetDatum(nspid); + + if (relname != NULL) + values[Anum_pg_subscription_subconflictlogtable - 1] = + CStringGetTextDatum(relname); + else + nulls[Anum_pg_subscription_subconflictlogtable - 1] = + true; + + replaces[Anum_pg_subscription_subconflictlognspid - 1] = + true; + replaces[Anum_pg_subscription_subconflictlogtable - 1] = + true; + } + + if (old_relname != NULL) + pfree(old_relname); + } + update_tuple = true; break; } @@ -2027,6 +2149,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) Form_pg_subscription form; List *rstates; bool must_use_password; + Oid conflictlogtable_nsp = InvalidOid; + char *conflictlogtable = NULL; /* * The launcher may concurrently start a new worker for this subscription. @@ -2110,6 +2234,20 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ObjectAddressSet(myself, SubscriptionRelationId, subid); EventTriggerSQLDropAddObject(&myself, true, true); + /* Fetch the conflict log table information. */ + conflictlogtable = + get_subscription_conflict_log_table(subid, &conflictlogtable_nsp); + + /* + * If the subscription had a conflict log table, drop it now. This happens + * before deleting the subscription tuple. + */ + if (conflictlogtable) + { + drop_conflict_log_table(conflictlogtable_nsp, conflictlogtable); + pfree(conflictlogtable); + } + /* Remove the tuple from catalog. */ CatalogTupleDelete(rel, &tup->t_self); @@ -3188,3 +3326,140 @@ defGetStreamingMode(DefElem *def) def->defname))); return LOGICALREP_STREAM_OFF; /* keep compiler quiet */ } + +/* + * Builds the TupleDesc for the conflict log table. + */ +static TupleDesc +create_conflict_log_table_tupdesc(void) +{ + TupleDesc tupdesc; + int i; + + tupdesc = CreateTemplateTupleDesc(MAX_CONFLICT_ATTR_NUM); + + for (i = 0; i < MAX_CONFLICT_ATTR_NUM; i++) + { + Oid type_oid = ConflictLogSchema[i].atttypid; + + /* Special handling for the JSON array type for proper TupleDescInitEntry call */ + if (type_oid == JSONARRAYOID) + type_oid = get_array_type(JSONOID); + + TupleDescInitEntry(tupdesc, i + 1, + ConflictLogSchema[i].attname, + type_oid, + -1, 0); + } + + return BlessTupleDesc(tupdesc); +} + +/* + * Create conflict log table. + * + * The subscription owner becomes the owner of this table and has all + * privileges on it. + */ +static void +create_conflict_log_table(Oid namespaceId, char *conflictrel, Oid subid) +{ + TupleDesc tupdesc; + + /* Report an error if the specified conflict log table already exists. */ + if (OidIsValid(get_relname_relid(conflictrel, namespaceId))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("cannot create conflict log table \"%s.%s\" because a table with that name already exists", + get_namespace_name(namespaceId), conflictrel), + errhint("Use a different name for the conflict log table or drop the existing table."))); + + /* Build the tuple descriptor for the new table. */ + tupdesc = create_conflict_log_table_tupdesc(); + + /* Create conflict log table. */ + heap_create_with_catalog(conflictrel, + namespaceId, + 0, + InvalidOid, + InvalidOid, + InvalidOid, + GetUserId(), + HEAP_TABLE_AM_OID, + tupdesc, + NIL, + RELKIND_RELATION, + RELPERSISTENCE_PERMANENT, + false, + false, + ONCOMMIT_NOOP, + (Datum) 0, + false, + false, + false, + InvalidOid, + NULL); + + /* Release tuple descriptor memory. */ + FreeTupleDesc(tupdesc); +} + +/* + * Drop the conflict log table. + */ +static void +drop_conflict_log_table(Oid namespaceId, char *conflictrel) +{ + Oid relid; + ObjectAddress object; + + relid = get_relname_relid(conflictrel, namespaceId); + + /* Return quietly if the table does not exist (e.g., user dropped it) */ + if (!OidIsValid(relid)) + return; + + /* Create the object address for the table. */ + ObjectAddressSet(object, RelationRelationId, relid); + + /* + * Perform the deletion. Using DROP_CASCADE ensures the deletion of + * dependent objects. + */ + performDeletion(&object, DROP_CASCADE, 0); +} + +/* + * Check if the specified relation is used as a conflict log table by any + * subscription. + */ +bool +IsConflictLogTable(Oid relid) +{ + Relation rel; + TableScanDesc scan; + HeapTuple tup; + bool is_clt = false; + + rel = table_open(SubscriptionRelationId, AccessShareLock); + scan = table_beginscan_catalog(rel, 0, NULL); + + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) + { + Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup); + Oid nspid; + char *relname; + + relname = get_subscription_conflict_log_table(subform->oid, &nspid); + if (relname && relid == get_relname_relid(relname, nspid)) + { + is_clt = true; + break; + } + } + + table_endscan(scan); + table_close(rel, AccessShareLock); + + return is_clt; +} diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 166955922650..f5de424bf7e4 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -15,13 +15,31 @@ #include "postgres.h" #include "access/commit_ts.h" +#include "access/heapam.h" +#include "access/htup.h" +#include "access/skey.h" #include "access/tableam.h" +#include "access/table.h" +#include "catalog/pg_attribute.h" +#include "catalog/indexing.h" +#include "catalog/namespace.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_type.h" #include "executor/executor.h" +#include "executor/spi.h" +#include "funcapi.h" #include "pgstat.h" #include "replication/conflict.h" #include "replication/worker_internal.h" #include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/pg_lsn.h" +#include "utils/array.h" +#include "utils/jsonb.h" + +#define MAX_LOCAL_CONFLICT_INFO_ATTRS 5 static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", @@ -50,8 +68,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *localslot, TupleTableSlot *remoteslot, Oid indexoid); +static void build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull); static char *build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid); +static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot); +static Datum tuple_table_slot_to_indextup_json(EState *estate, + Relation localrel, + Oid replica_index, + TupleTableSlot *slot); +static TupleDesc build_conflict_tupledesc(void); +static Datum build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples); +static void prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot); /* * Get the xmin and commit timestamp data (origin and timestamp) associated @@ -106,6 +143,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, TupleTableSlot *remoteslot, List *conflicttuples) { Relation localrel = relinfo->ri_RelationDesc; + Relation conflictlogrel = GetConflictLogTableRel(); StringInfoData err_detail; initStringInfo(&err_detail); @@ -120,6 +158,37 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, conflicttuple->ts, &err_detail); + /* Insert conflict details to conflict log table. */ + if (conflictlogrel) + { + if (ValidateConflictLogTable(conflictlogrel)) + { + /* + * Prepare the conflict log tuple. If the error level is below + * ERROR, insert it immediately. Otherwise, defer the insertion to + * a new transaction after the current one aborts, ensuring the + * insertion of the log tuple is not rolled back. + */ + prepare_conflict_log_tuple(estate, + relinfo->ri_RelationDesc, + conflictlogrel, + type, + searchslot, + conflicttuples, + remoteslot); + if (elevel < ERROR) + InsertConflictLogTuple(conflictlogrel); + } + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Conflict log table \"%s.%s\" structure changed, skipping insertion", + get_namespace_name(RelationGetNamespace(conflictlogrel)), + RelationGetRelationName(conflictlogrel))); + + table_close(conflictlogrel, RowExclusiveLock); + } + pgstat_report_subscription_conflict(MySubscription->oid, type); ereport(elevel, @@ -162,6 +231,141 @@ InitConflictIndexes(ResultRelInfo *relInfo) relInfo->ri_onConflictArbiterIndexes = uniqueIndexes; } +/* + * GetConflictLogTableRel + * + * Get the information of the specific conflict log table defined in + * pg_subscription and opens the relation for insertion. The caller is + * responsible for closing the returned relation handle. + */ +Relation +GetConflictLogTableRel(void) +{ + Oid nspid; + Oid conflictlogrelid; + Relation conflictlogrel = NULL; + char *conflictlogtable; + + /* If conflict log table is not set for the subscription just return. */ + conflictlogtable = get_subscription_conflict_log_table( + MyLogicalRepWorker->subid, &nspid); + if (conflictlogtable == NULL) + return NULL; + + conflictlogrelid = get_relname_relid(conflictlogtable, nspid); + if (OidIsValid(conflictlogrelid)) + conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock); + + /* Conflict log table is dropped or not accessible. */ + if (conflictlogrel == NULL) + ereport(WARNING, + (errcode(ERRCODE_UNDEFINED_TABLE), + errmsg("conflict log table \"%s.%s\" does not exist", + get_namespace_name(nspid), conflictlogtable))); + + pfree(conflictlogtable); + + return conflictlogrel; +} + +/* + * InsertConflictLogTuple + * + * Insert conflict log tuple into the conflict log table. It uses + * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding + * of the tuple inserted into the conflict log table. + */ +void +InsertConflictLogTuple(Relation conflictlogrel) +{ + int options = HEAP_INSERT_NO_LOGICAL; + + /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */ + Assert(MyLogicalRepWorker->conflict_log_tuple != NULL); + + heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple, + GetCurrentCommandId(true), options, NULL); + + /* Free conflict log tuple. */ + heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); + MyLogicalRepWorker->conflict_log_tuple = NULL; +} + +/* + * ValidateConflictLogTable - Validate conflict log table + * + * Validate whether the conflict log table is still suitable for considering as + * conflict log table. + */ +bool +ValidateConflictLogTable(Relation rel) +{ + Relation pg_attribute; + HeapTuple atup; + ScanKeyData scankey; + SysScanDesc scan; + Form_pg_attribute attForm; + int attcnt = 0; + bool tbl_ok = true; + + /* + * Check whether the table definition including its column names, data + * types, and column ordering meets the requirements for conflict log + * table. + */ + pg_attribute = table_open(AttributeRelationId, AccessShareLock); + ScanKeyInit(&scankey, + Anum_pg_attribute_attrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(RelationGetRelid(rel))); + + scan = systable_beginscan(pg_attribute, AttributeRelidNumIndexId, true, + SnapshotSelf, 1, &scankey); + + /* We only need to check up to MAX_CONFLICT_ATTR_NUM attributes */ + while (HeapTupleIsValid(atup = systable_getnext(scan))) + { + const ConflictLogColumnDef *expected; + int schema_idx; + + attForm = (Form_pg_attribute) GETSTRUCT(atup); + + /* Skip system columns and dropped columns */ + if (attForm->attnum < 1 || attForm->attisdropped) + continue; + + attcnt++; + + /* attnum 1 corresponds to index 0 in ConflictLogSchema */ + schema_idx = attForm->attnum - 1; + + /* Check against the central schema definition */ + if (schema_idx >= MAX_CONFLICT_ATTR_NUM) + { + /* Found an extra column beyond the required set */ + tbl_ok = false; + break; + } + + expected = &ConflictLogSchema[schema_idx]; + + if (attForm->atttypid != expected->atttypid || + strcmp(NameStr(attForm->attname), expected->attname) != 0) + { + tbl_ok = false; + break; + } + } + + systable_endscan(scan); + table_close(pg_attribute, AccessShareLock); + + if (attcnt != MAX_CONFLICT_ATTR_NUM || !tbl_ok) + return false; + + return true; +} + /* * Add SQLSTATE error code to the current conflict report. */ @@ -472,6 +676,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo, return tuple_value.data; } +/* + * Helper function to extract the "raw" index key Datums and their null flags + * from a TupleTableSlot, given an already open index descriptor. + * This is the reusable core logic. + */ +static void +build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull) +{ + TupleTableSlot *tableslot = slot; + + /* + * If the slot is a virtual slot, copy it into a heap tuple slot as + * FormIndexDatum only works with heap tuple slots. + */ + if (TTS_IS_VIRTUAL(slot)) + { + /* Slot is created within the EState's tuple table */ + tableslot = table_slot_create(localrel, &estate->es_tupleTable); + tableslot = ExecCopySlot(tableslot, slot); + } + + /* + * Initialize ecxt_scantuple for potential use in FormIndexDatum + */ + GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + + /* Form the index datums */ + FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, + isnull); +} + /* * Helper functions to construct a string describing the contents of an index * entry. See BuildIndexValueDescription for details. @@ -487,41 +725,336 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Relation indexDesc; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; - TupleTableSlot *tableslot = slot; - if (!tableslot) + if (!slot) return NULL; Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); indexDesc = index_open(indexoid, NoLock); - /* - * If the slot is a virtual slot, copy it into a heap tuple slot as - * FormIndexDatum only works with heap tuple slots. - */ - if (TTS_IS_VIRTUAL(slot)) + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + + index_value = BuildIndexValueDescription(indexDesc, values, isnull); + + index_close(indexDesc, NoLock); + + return index_value; +} + +/* + * tuple_table_slot_to_json_datum + * + * Helper function to convert a TupleTableSlot to Jsonb. + */ +static Datum +tuple_table_slot_to_json_datum(TupleTableSlot *slot) +{ + HeapTuple tuple; + Datum datum; + Datum json; + + Assert(slot != NULL); + + tuple = ExecCopySlotHeapTuple(slot); + datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor); + + json = DirectFunctionCall1(row_to_json, datum); + heap_freetuple(tuple); + + return json; +} + +/* + * tuple_table_slot_to_indextup_json + * + * Fetch replica identity key from the tuple table slot and convert into a + * jsonb datum. + */ +static Datum +tuple_table_slot_to_indextup_json(EState *estate, Relation localrel, + Oid indexid, TupleTableSlot *slot) +{ + Relation indexDesc; + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + HeapTuple tuple; + TupleDesc tupdesc; + Datum datum; + + Assert(slot != NULL); + + Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true)); + + indexDesc = index_open(indexid, NoLock); + + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + tupdesc = RelationGetDescr(indexDesc); + + /* Bless the tupdesc so it can be looked up by row_to_json. */ + BlessTupleDesc(tupdesc); + + /* Form the replica identity tuple. */ + tuple = heap_form_tuple(tupdesc, values, isnull); + datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + index_close(indexDesc, NoLock); + + /* Convert to a JSONB datum. */ + return DirectFunctionCall1(row_to_json, datum); +} + +/* + * Initialize the tuple descriptor for local conflict info. + */ +static TupleDesc +build_conflict_tupledesc(void) +{ + TupleDesc tupdesc; + int attno = 1; + + tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS); + + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "xid", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "commit_ts", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "origin", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "key", + JSONOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) attno, "tuple", + JSONOID, -1, 0); + + BlessTupleDesc(tupdesc); + + Assert(attno == MAX_LOCAL_CONFLICT_INFO_ATTRS); + + return tupdesc; +} + +/* + * Builds the local conflicts JSONB array column from the list of + * ConflictTupleInfo objects. + * + * Example output structure: + * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ] + */ +static Datum +build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples) +{ + ListCell *lc; + List *json_datums = NIL; /* List to hold the row_to_json results (type json) */ + Datum *json_datum_array; + bool *json_null_array; + Datum json_array_datum; + int num_conflicts; + int i; + int16 typlen; + bool typbyval; + char typalign; + TupleDesc tupdesc; + + /* Build local conflicts tuple descriptor. */ + tupdesc = build_conflict_tupledesc(); + + /* Process local conflict tuple list and prepare an array of JSON. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) { - tableslot = table_slot_create(localrel, &estate->es_tupleTable); - tableslot = ExecCopySlot(tableslot, slot); + Datum values[MAX_LOCAL_CONFLICT_INFO_ATTRS]; + bool nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS]; + char *origin_name = NULL; + HeapTuple tuple; + Datum json_datum; + int attno; + + memset(values, 0, sizeof(Datum) * MAX_LOCAL_CONFLICT_INFO_ATTRS); + memset(nulls, 0, sizeof(bool) * MAX_LOCAL_CONFLICT_INFO_ATTRS); + + attno = 0; + values[attno++] = TransactionIdGetDatum(conflicttuple->xmin); + + if (conflicttuple->ts) + values[attno++] = TimestampTzGetDatum(conflicttuple->ts); + else + nulls[attno++] = true; + + if (conflicttuple->origin != InvalidRepOriginId) + replorigin_by_oid(conflicttuple->origin, true, &origin_name); + + /* Store empty string if origin name for the tuple is NULL. */ + if (origin_name != NULL) + values[attno++] = CStringGetTextDatum(origin_name); + else + nulls[attno++] = true; + + /* + * Add the conflicting key values in the case of a unique constraint + * violation. + */ + if (conflict_type == CT_INSERT_EXISTS || + conflict_type == CT_UPDATE_EXISTS || + conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS) + { + Oid indexoid = conflicttuple->indexoid; + + Assert(OidIsValid(indexoid) && conflicttuple->slot && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, + true)); + values[attno++] = + tuple_table_slot_to_indextup_json(estate, rel, + indexoid, + conflicttuple->slot); + } + else + nulls[attno++] = true; + + /* Convert conflicting tuple to JSON datum. */ + if (conflicttuple->slot) + values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot); + else + nulls[attno] = true; + + Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + json_datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + /* + * Build the higher level JSON datum in format described in function + * header. + */ + json_datum = DirectFunctionCall1(row_to_json, json_datum); + + /* Done with the temporary tuple. */ + heap_freetuple(tuple); + + /* Add to the array element. */ + json_datums = lappend(json_datums, (void *) json_datum); } - /* - * Initialize ecxt_scantuple for potential use in FormIndexDatum when - * index expressions are present. - */ - GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + num_conflicts = list_length(json_datums); - /* - * The values/nulls arrays passed to BuildIndexValueDescription should be - * the results of FormIndexDatum, which are the "raw" input to the index - * AM. - */ - FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull); + json_datum_array = (Datum *) palloc(num_conflicts * sizeof(Datum)); + json_null_array = (bool *) palloc0(num_conflicts * sizeof(bool)); - index_value = BuildIndexValueDescription(indexDesc, values, isnull); + i = 0; + foreach(lc, json_datums) + { + json_datum_array[i] = (Datum) lfirst(lc); + i++; + } - index_close(indexDesc, NoLock); + /* Construct the json[] array Datum. */ + get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign); + json_array_datum = PointerGetDatum(construct_array(json_datum_array, + num_conflicts, + JSONOID, + typlen, + typbyval, + typalign)); + pfree(json_datum_array); + pfree(json_null_array); + + return json_array_datum; +} - return index_value; +/* + * prepare_conflict_log_tuple + * + * This routine prepares a tuple detailing a conflict encountered during + * logical replication. The prepared tuple will be stored in + * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the + * conflict log table by calling InsertConflictLogTuple. + */ +static void +prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot) +{ + Datum values[MAX_CONFLICT_ATTR_NUM]; + bool nulls[MAX_CONFLICT_ATTR_NUM]; + int attno; + char *remote_origin = NULL; + MemoryContext oldctx; + + Assert(MyLogicalRepWorker->conflict_log_tuple == NULL); + + /* Initialize values and nulls arrays. */ + memset(values, 0, sizeof(Datum) * MAX_CONFLICT_ATTR_NUM); + memset(nulls, 0, sizeof(bool) * MAX_CONFLICT_ATTR_NUM); + + /* Populate the values and nulls arrays. */ + attno = 0; + values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel)); + + values[attno++] = + CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel))); + + values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel)); + + values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]); + + if (TransactionIdIsValid(remote_xid)) + values[attno++] = TransactionIdGetDatum(remote_xid); + else + nulls[attno++] = true; + + values[attno++] = LSNGetDatum(remote_final_lsn); + + if (remote_commit_ts > 0) + values[attno++] = TimestampTzGetDatum(remote_commit_ts); + else + nulls[attno++] = true; + + if (replorigin_session_origin != InvalidRepOriginId) + replorigin_by_oid(replorigin_session_origin, true, &remote_origin); + + if (remote_origin != NULL) + values[attno++] = CStringGetTextDatum(remote_origin); + else + nulls[attno++] = true; + + if (!TupIsNull(searchslot)) + { + Oid replica_index = GetRelationIdentityOrPK(rel); + + /* + * If the table has a valid replica identity index, build the index + * json datum from key value. Otherwise, construct it from the complete + * tuple in REPLICA IDENTITY FULL cases. + */ + if (OidIsValid(replica_index)) + values[attno++] = tuple_table_slot_to_indextup_json(estate, rel, + replica_index, + searchslot); + else + values[attno++] = tuple_table_slot_to_json_datum(searchslot); + } + else + nulls[attno++] = true; + + if (!TupIsNull(remoteslot)) + values[attno++] = tuple_table_slot_to_json_datum(remoteslot); + else + nulls[attno++] = true; + + values[attno] = build_local_conflicts_json_array(estate, rel, + conflict_type, + conflicttuples); + + Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM); + + oldctx = MemoryContextSwitchTo(ApplyContext); + MyLogicalRepWorker->conflict_log_tuple = + heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls); + MemoryContextSwitchTo(oldctx); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 3991e1495d4c..bc7e1d9ebde6 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -477,6 +477,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, worker->oldest_nonremovable_xid = retain_dead_tuples ? MyReplicationSlot->data.xmin : InvalidTransactionId; + worker->conflict_log_tuple = NULL; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fc64476a9ef1..e8f7ab3d5d67 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -482,7 +482,9 @@ static bool MySubscriptionValid = false; static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; -static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +TransactionId remote_xid = InvalidTransactionId; +TimestampTz remote_commit_ts = 0; /* fields valid only when processing streamed transaction */ static bool in_streamed_transaction = false; @@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s) set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; + remote_commit_ts = begin_data.committime; + remote_xid = begin_data.xid; maybe_start_skipping_changes(begin_data.final_lsn); @@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s) /* extract XID of the top-level transaction */ stream_xid = logicalrep_read_stream_start(s, &first_segment); + remote_xid = stream_xid; + remote_final_lsn = InvalidXLogRecPtr; + remote_commit_ts = 0; + if (!TransactionIdIsValid(stream_xid)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -5609,6 +5617,33 @@ start_apply(XLogRecPtr origin_startpos) pgstat_report_subscription_error(MySubscription->oid, MyLogicalRepWorker->type); + /* + * Insert any pending conflict log tuple under a new transaction. + */ + if (MyLogicalRepWorker->conflict_log_tuple != NULL) + { + Relation conflictlogrel; + + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Open conflict log table and insert the tuple. */ + conflictlogrel = GetConflictLogTableRel(); + if (ValidateConflictLogTable(conflictlogrel)) + InsertConflictLogTuple(conflictlogrel); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Conflict log table \"%s.%s\" structure changed, skipping insertion", + get_namespace_name(RelationGetNamespace(conflictlogrel)), + RelationGetRelationName(conflictlogrel))); + MyLogicalRepWorker->conflict_log_tuple = NULL; + table_close(conflictlogrel, RowExclusiveLock); + + PopActiveSnapshot(); + CommitTransactionCommand(); + } + PG_RE_THROW(); } } diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index 5aa7a26d95c3..e1132e47adca 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -3879,3 +3879,41 @@ get_subscription_name(Oid subid, bool missing_ok) return subname; } + +/* + * get_subscription_conflict_log_table + * + * Get conflict log table name and namespace id from subscription. + */ +char * +get_subscription_conflict_log_table(Oid subid, Oid *nspid) +{ + HeapTuple tup; + Datum datum; + bool isnull; + char *relname = NULL; + Form_pg_subscription subform; + + *nspid = InvalidOid; + + tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + return NULL; + + subform = (Form_pg_subscription) GETSTRUCT(tup); + + /* Get conflict log table name. */ + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subconflictlogtable, + &isnull); + if (!isnull) + { + *nspid = subform->subconflictlognspid; + relname = pstrdup(TextDatumGetCString(datum)); + } + + ReleaseSysCache(tup); + return relname; +} diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 36f245028429..906167fe466d 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6806,7 +6806,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false, false, false, false}; + false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6900,15 +6900,25 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subskiplsn AS \"%s\"\n", gettext_noop("Skip LSN")); + + /* Conflict log table is only supported in v19 and higher */ + if (pset.sversion >= 190000) + appendPQExpBuffer(&buf, + ", (CASE\n" + " WHEN subconflictlogtable IS NULL THEN NULL\n" + " ELSE pg_catalog.quote_ident(n.nspname) || '.' ||" + " pg_catalog.quote_ident(subconflictlogtable::text)\n" + "END) AS \"%s\"\n", + gettext_noop("Conflict log table")); } /* Only display subscriptions in current database. */ - appendPQExpBufferStr(&buf, - "FROM pg_catalog.pg_subscription\n" - "WHERE subdbid = (SELECT oid\n" - " FROM pg_catalog.pg_database\n" - " WHERE datname = pg_catalog.current_database())"); - + appendPQExpBuffer(&buf, + "FROM pg_catalog.pg_subscription " + "LEFT JOIN pg_catalog.pg_namespace AS n ON subconflictlognspid = n.oid\n" + "WHERE subdbid = (SELECT oid\n" + " FROM pg_catalog.pg_database\n" + " WHERE datname = pg_catalog.current_database())"); if (!validateSQLNamePattern(&buf, pattern, true, false, NULL, "subname", NULL, NULL, diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 20d7a65c614e..6ab4ab8857de 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2344,8 +2344,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", - "max_retention_duration", "origin", + COMPLETE_WITH("binary", "conflict_log_table", "disable_on_error", + "failover", "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); @@ -3814,8 +3814,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("WITH ("); /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) - COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", + COMPLETE_WITH("binary", "conflict_log_table", "connect", "copy_data", + "create_slot", "disable_on_error", "enabled", "failover", "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 55cb9b1eefae..f4526c15ec3a 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -80,6 +80,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subretaindeadtuples; /* True if dead tuples useful for * conflict detection are retained */ + Oid subconflictlognspid; /* Namespace Oid in which the conflict + * log table is created. */ int32 submaxretention; /* The maximum duration (in milliseconds) * for which information useful for @@ -105,6 +107,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW /* Only publish data originating from the specified origin */ text suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY); + + /* Conflict log table name if specified */ + text subconflictlogtable; #endif } FormData_pg_subscription; diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index fb4e26a51a4d..6c062b0991f6 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -36,4 +36,6 @@ extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, bool retention_active, bool max_retention_set); +extern bool IsConflictLogTable(Oid relid); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index c8fbf9e51b8d..c7e67bd300ed 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -9,9 +9,12 @@ #ifndef CONFLICT_H #define CONFLICT_H +#include "access/htup.h" #include "access/xlogdefs.h" +#include "catalog/pg_type.h" #include "nodes/pg_list.h" #include "utils/timestamp.h" +#include "utils/relcache.h" /* Avoid including execnodes.h here */ typedef struct EState EState; @@ -79,6 +82,32 @@ typedef struct ConflictTupleInfo * conflicting local row occurred */ } ConflictTupleInfo; +/* Structure to hold metadata for one column of the conflict log table */ +typedef struct ConflictLogColumnDef +{ + const char *attname; /* Column name */ + Oid atttypid; /* Data type OID */ +} ConflictLogColumnDef; + +/* The single source of truth for the conflict log table schema */ +static const ConflictLogColumnDef ConflictLogSchema[] = +{ + { .attname = "relid", .atttypid = OIDOID }, + { .attname = "schemaname", .atttypid = TEXTOID }, + { .attname = "relname", .atttypid = TEXTOID }, + { .attname = "conflict_type", .atttypid = TEXTOID }, + { .attname = "remote_xid", .atttypid = XIDOID }, + { .attname = "remote_commit_lsn",.atttypid = LSNOID }, + { .attname = "remote_commit_ts", .atttypid = TIMESTAMPTZOID }, + { .attname = "remote_origin", .atttypid = TEXTOID }, + { .attname = "replica_identity", .atttypid = JSONOID }, + { .attname = "remote_tuple", .atttypid = JSONOID }, + { .attname = "local_conflicts", .atttypid = JSONARRAYOID } +}; + +/* Define the count using the array size */ +#define MAX_CONFLICT_ATTR_NUM (sizeof(ConflictLogSchema) / sizeof(ConflictLogSchema[0])) + extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, RepOriginId *localorigin, @@ -89,4 +118,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, List *conflicttuples); extern void InitConflictIndexes(ResultRelInfo *relInfo); +extern Relation GetConflictLogTableRel(void); +extern void InsertConflictLogTuple(Relation conflictlogrel); +extern bool ValidateConflictLogTable(Relation rel); #endif diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f081619f1513..711c04c7297c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -101,6 +101,9 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; + /* A conflict log tuple that is prepared but not yet inserted. */ + HeapTuple conflict_log_tuple; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; +extern XLogRecPtr remote_final_lsn; +extern TimestampTz remote_commit_ts; +extern TransactionId remote_xid; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 50fb149e9ac9..3bebf04bf51c 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -210,6 +210,7 @@ extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); extern Oid get_subscription_oid(const char *subname, bool missing_ok); extern char *get_subscription_name(Oid subid, bool missing_ok); +extern char *get_subscription_conflict_log_table(Oid subid, Oid *nspid); #define type_is_array(typid) (get_element_type(typid) != InvalidOid) /* type_is_array_domain accepts both plain arrays and domains over arrays */ diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 327d1e7731f9..bab2d0ea9543 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00012345 | (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | 0/00000000 | (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+-------------------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 0/00000000 | (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -433,10 +433,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -450,19 +450,19 @@ NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabl WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) -- ok ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+-------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -517,7 +517,191 @@ COMMIT; -- ok, owning it is enough for this stuff ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- +-- CONFLICT LOG TABLE TESTS +-- +SET SESSION AUTHORIZATION 'regress_subscription_user'; +-- fail - conflict_log_table specified when table already exists +CREATE TABLE public.regress_conflict_log_temp (id int); +CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log_temp'); +ERROR: cannot create conflict log table "public.regress_conflict_log_temp" because a table with that name already exists +HINT: Use a different name for the conflict log table or drop the existing table. +DROP TABLE public.regress_conflict_log_temp; +-- ok - conflict_log_table creation with CREATE SUBSCRIPTION +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1'); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +-- check metadata in pg_subscription +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + subname | subconflictlogtable | is_public_schema +------------------------+-----------------------+------------------ + regress_conflict_test1 | regress_conflict_log1 | t +(1 row) + +-- check if the table exists and has the correct schema (11 columns) +SELECT count(*) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attnum > 0; + count +------- + 11 +(1 row) + +-- check a specific column type (e.g., key_tuple should be JSON) +SELECT format_type(atttypid, atttypmod) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attname = 'key_tuple'; + format_type +------------- + json +(1 row) + +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +------------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+----------------------- + regress_conflict_test1 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | regress_conflict_log1 +(1 row) + +-- ok - adding conflict_log_table with ALTER SUBSCRIPTION +CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log2'); +-- check metadata after ALTER +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogtable | is_public_schema +------------------------+-----------------------+------------------ + regress_conflict_test2 | regress_conflict_log2 | t +(1 row) + +-- ok - change the conflict log table name for an existing subscription that already had one +CREATE SCHEMA clt; +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3'); +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogtable | is_public_schema +------------------------+-----------------------+------------------ + regress_conflict_test2 | regress_conflict_log3 | f +(1 row) + +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Skip LSN | Conflict log table +------------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+----------------------- + regress_conflict_test1 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | regress_conflict_log1 + regress_conflict_test2 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | 0/00000000 | regress_conflict_log3 +(2 rows) + +-- check the new table was created and the old table was dropped +SELECT count(*) FROM pg_class WHERE relname = 'regress_conflict_log2'; + count +------- + 0 +(1 row) + +SELECT count(*) FROM pg_attribute WHERE attrelid = 'clt.regress_conflict_log3'::regclass AND attnum > 0; + count +------- + 11 +(1 row) + +-- ok (NOTICE) - set conflict_log_table to one already used by this subscription +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3'); +NOTICE: "clt.regress_conflict_log3" is already in use as the conflict log table for this subscription +-- fail - try to publish the conflict_log_table +CREATE PUBLICATION pub FOR TABLE clt.regress_conflict_log3; +ERROR: cannot add relation "clt.regress_conflict_log3" to publication +DETAIL: This operation is not supported for conflict log tables. +-- suppress warning that depends on wal_level +SET client_min_messages = 'ERROR'; +-- ok - conflict_log_table should not be published with ALL TABLE +CREATE PUBLICATION pub FOR TABLES IN SCHEMA clt; +SELECT * FROM pg_publication_tables WHERE pubname = 'pub'; + pubname | schemaname | tablename | attnames | rowfilter +---------+------------+-----------+----------+----------- +(0 rows) + +\dt+ clt.regress_conflict_log3 + List of tables + Schema | Name | Type | Owner | Persistence | Size | Description +--------+-----------------------+-------+---------------------------+-------------+---------+------------- + clt | regress_conflict_log3 | table | regress_subscription_user | permanent | 0 bytes | +(1 row) + +DROP PUBLICATION pub; +-- fail - set conflict_log_table to one already used by a different subscription +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1'); +ERROR: cannot create conflict log table "public.regress_conflict_log1" because a table with that name already exists +HINT: Use a different name for the conflict log table or drop the existing table. +-- ok - dropping subscription also drops the log table +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; +-- should return NULL, meaning the table was dropped +SELECT to_regclass('public.regress_conflict_log1'); + to_regclass +------------- + +(1 row) + +-- ok - dropping subscription when the log table was manually dropped first +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1'); +DROP TABLE public.regress_conflict_log1; +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; +-- should return NULL, meaning the subscription was dropped successfully +SELECT subname FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + subname +--------- +(0 rows) + +-- ok - create subscription with conflict_log_table = NONE +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = NONE); +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogtable +------------------------+----------------------- + regress_conflict_test2 | regress_conflict_log3 +(1 row) + +--ok - alter subscription with valid conflict log table name +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1'); +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogtable +------------------------+----------------------- + regress_conflict_test2 | regress_conflict_log1 +(1 row) + +--ok - pg_relation_is_publishable should return false for conflict log table +SELECT pg_relation_is_publishable('public.regress_conflict_log1'); + pg_relation_is_publishable +---------------------------- + f +(1 row) + +--ok - alter subscription with conflict log table = NONE +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = NONE); +-- should return NULL, meaning the table was dropped +SELECT to_regclass('public.regress_conflict_log1'); + to_regclass +------------- + +(1 row) + +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogtable +------------------------+--------------------- + regress_conflict_test2 | +(1 row) + +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; +-- Clean up remaining test subscription +ALTER SUBSCRIPTION regress_conflict_test2 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test2; RESET SESSION AUTHORIZATION; +DROP SCHEMA clt; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; DROP ROLE regress_subscription_user3; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index ef0c298d2df7..416847f081b1 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -365,7 +365,115 @@ COMMIT; ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- +-- CONFLICT LOG TABLE TESTS +-- + +SET SESSION AUTHORIZATION 'regress_subscription_user'; + +-- fail - conflict_log_table specified when table already exists +CREATE TABLE public.regress_conflict_log_temp (id int); +CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log_temp'); +DROP TABLE public.regress_conflict_log_temp; + +-- ok - conflict_log_table creation with CREATE SUBSCRIPTION +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1'); + +-- check metadata in pg_subscription +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + +-- check if the table exists and has the correct schema (11 columns) +SELECT count(*) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attnum > 0; + +-- check a specific column type (e.g., key_tuple should be JSON) +SELECT format_type(atttypid, atttypmod) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attname = 'key_tuple'; + +\dRs+ + +-- ok - adding conflict_log_table with ALTER SUBSCRIPTION +CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log2'); + +-- check metadata after ALTER +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- ok - change the conflict log table name for an existing subscription that already had one +CREATE SCHEMA clt; +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3'); +SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; +\dRs+ + +-- check the new table was created and the old table was dropped +SELECT count(*) FROM pg_class WHERE relname = 'regress_conflict_log2'; +SELECT count(*) FROM pg_attribute WHERE attrelid = 'clt.regress_conflict_log3'::regclass AND attnum > 0; + +-- ok (NOTICE) - set conflict_log_table to one already used by this subscription +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3'); + +-- fail - try to publish the conflict_log_table +CREATE PUBLICATION pub FOR TABLE clt.regress_conflict_log3; + +-- suppress warning that depends on wal_level +SET client_min_messages = 'ERROR'; + +-- ok - conflict_log_table should not be published with ALL TABLE +CREATE PUBLICATION pub FOR TABLES IN SCHEMA clt; +SELECT * FROM pg_publication_tables WHERE pubname = 'pub'; +\dt+ clt.regress_conflict_log3 +DROP PUBLICATION pub; + +-- fail - set conflict_log_table to one already used by a different subscription +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1'); + +-- ok - dropping subscription also drops the log table +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; + +-- should return NULL, meaning the table was dropped +SELECT to_regclass('public.regress_conflict_log1'); + +-- ok - dropping subscription when the log table was manually dropped first +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1'); +DROP TABLE public.regress_conflict_log1; +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; + +-- should return NULL, meaning the subscription was dropped successfully +SELECT subname FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + +-- ok - create subscription with conflict_log_table = NONE +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = NONE); +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; +--ok - alter subscription with valid conflict log table name +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1'); +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +--ok - pg_relation_is_publishable should return false for conflict log table +SELECT pg_relation_is_publishable('public.regress_conflict_log1'); + +--ok - alter subscription with conflict log table = NONE +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = NONE); + +-- should return NULL, meaning the table was dropped +SELECT to_regclass('public.regress_conflict_log1'); +SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test1; + +-- Clean up remaining test subscription +ALTER SUBSCRIPTION regress_conflict_test2 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test2; + RESET SESSION AUTHORIZATION; +DROP SCHEMA clt; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; DROP ROLE regress_subscription_user3;