diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 2fc634429802..a4d32de58ec8 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6581,6 +6581,15 @@ SCRAM-SHA-256$<iteration count>:&l if there is no publication qualifying condition. + + + prexcept bool + + + True if the relation must be excluded + + + prattrs int2vector diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index aa013f348d4d..c420469feaae 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2550,10 +2550,12 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER - To add tables to a publication, the user must have ownership rights on the - table. To add all tables in schema to a publication, the user must be a - superuser. To create a publication that publishes all tables, all tables in - schema, or all sequences automatically, the user must be a superuser. + To create a publication using FOR ALL TABLES, + FOR ALL SEQUENCES or + FOR TABLES IN SCHEMA, the user must be a superuser. To add + ALL TABLES or TABLES IN SCHEMA to a + publication, the user must be a superuser. To add tables to a publication, + the user must have ownership rights on the table. diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 75a508bebfa4..528660e011ad 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -32,12 +32,16 @@ CREATE PUBLICATION name and publication_all_object is one of: - ALL TABLES + ALL TABLES [ EXCEPT [ TABLE ] ( table_exception_object [, ... ] ) ] ALL SEQUENCES and table_and_columns is: [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] + +where table_exception_object is: + + [ ONLY ] table_name [ * ] @@ -164,7 +168,9 @@ CREATE PUBLICATION name Marks the publication as one that replicates changes for all tables in - the database, including tables created in the future. + the database, including tables created in the future. If + EXCEPT TABLE is specified, then exclude replicating + the changes for the specified tables. @@ -184,6 +190,35 @@ CREATE PUBLICATION name + + EXCEPT TABLE + + + This clause specifies a list of tables to be excluded from the + publication. It can only be used with FOR ALL TABLES. + If ONLY is specified before the table name, only + that table is excluded from the publication. If ONLY is + not specified, the table and all its descendant tables (if any) are + excluded. Optionally, * can be specified after the + table name to explicitly indicate that descendant tables are excluded. + This does not apply to a partitioned table, however. + + + The partitioned table or its partitions are excluded from the publication + based on the parameter publish_via_partition_root. + When publish_via_partition_root is set to + true, specifying a root partitioned table in + EXCEPT TABLE excludes it and all its partitions from + replication. Specifying a leaf partition has no effect, as its changes are + still replicated via the root partitioned table. When + publish_via_partition_root is set to + false, specifying a root partitioned table has no + effect, as changes are replicated via the leaf partitions. Specifying a + leaf partition excludes only that partition from replication. + + + + WITH ( publication_parameter [= value] [, ... ] ) @@ -487,6 +522,23 @@ CREATE PUBLICATION all_sequences FOR ALL SEQUENCES; all sequences for synchronization: CREATE PUBLICATION all_tables_sequences FOR ALL TABLES, ALL SEQUENCES; + + + + + Create a publication that publishes all changes in all the tables except + users and departments: + +CREATE PUBLICATION all_tables_except FOR ALL TABLES EXCEPT (users, departments); + + + + + Create a publication that publishes all changes in all sequences and all the + tables except tables users and + departments: + +CREATE PUBLICATION all_sequences_tables_except FOR ALL SEQUENCES, ALL TABLES EXCEPT (users, departments); diff --git a/doc/src/sgml/ref/psql-ref.sgml b/doc/src/sgml/ref/psql-ref.sgml index f56c70263e07..f1b3ce380b6c 100644 --- a/doc/src/sgml/ref/psql-ref.sgml +++ b/doc/src/sgml/ref/psql-ref.sgml @@ -2103,8 +2103,9 @@ SELECT $1 \parse stmt1 listed. If x is appended to the command name, the results are displayed in expanded mode. - If + is appended to the command name, the tables and - schemas associated with each publication are shown as well. + If + is appended to the command name, the tables, + excluded tables, and schemas associated with each publication are shown + as well. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 7aa3f1799240..473880a3b46b 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -354,7 +354,8 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, * ancestor is at the end of the list. */ Oid -GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level) +GetTopMostAncestorInPublication(Oid puboid, List *ancestors, + int *ancestor_level, bool puballtables) { ListCell *lc; Oid topmost_relid = InvalidOid; @@ -366,32 +367,43 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level foreach(lc, ancestors) { Oid ancestor = lfirst_oid(lc); - List *apubids = GetRelationPublications(ancestor); - List *aschemaPubids = NIL; + List *apubids = NIL; + List *aexceptpubids = NIL; + List *aschemapubids = NIL; + bool set_top = false; + + GetRelationPublications(ancestor, &apubids, &aexceptpubids); level++; - if (list_member_oid(apubids, puboid)) + /* check if member of table publications */ + set_top = list_member_oid(apubids, puboid); + if (!set_top) { - topmost_relid = ancestor; + aschemapubids = GetSchemaPublications(get_rel_namespace(ancestor)); - if (ancestor_level) - *ancestor_level = level; + /* check if member of schema publications */ + set_top = list_member_oid(aschemapubids, puboid); + + /* + * If the publication is all tables publication and the table is + * not part of exception tables. + */ + if (!set_top && puballtables) + set_top = !list_member_oid(aexceptpubids, puboid); } - else + + if (set_top) { - aschemaPubids = GetSchemaPublications(get_rel_namespace(ancestor)); - if (list_member_oid(aschemaPubids, puboid)) - { - topmost_relid = ancestor; + topmost_relid = ancestor; - if (ancestor_level) - *ancestor_level = level; - } + if (ancestor_level) + *ancestor_level = level; } list_free(apubids); - list_free(aschemaPubids); + list_free(aschemapubids); + list_free(aexceptpubids); } return topmost_relid; @@ -466,6 +478,26 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, RelationGetRelationName(targetrel), pub->name))); } + /* + * Check when a partition is excluded via EXCEPT TABLE while the + * publication has publish_via_partition_root = true. + */ + if (pub->alltables && pub->pubviaroot && pri->except && + targetrel->rd_rel->relispartition) + ereport(WARNING, + (errmsg("partition \"%s\" might be replicated as publish_via_partition_root is \"%s\"", + RelationGetRelationName(targetrel), "true"))); + + /* + * Check when a partitioned table is excluded via EXCEPT TABLE while the + * publication has publish_via_partition_root = false. + */ + if (pub->alltables && !pub->pubviaroot && pri->except && + targetrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + ereport(WARNING, + (errmsg("partitioned table \"%s\" might be replicated as publish_via_partition_root is \"%s\"", + RelationGetRelationName(targetrel), "false"))); + check_publication_add_relation(targetrel); /* Validate and translate column names into a Bitmapset of attnums. */ @@ -482,6 +514,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, ObjectIdGetDatum(pubid); values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + values[Anum_pg_publication_rel_prexcept - 1] = + BoolGetDatum(pri->except); /* Add qualifications, if available */ if (pri->whereClause != NULL) @@ -749,35 +783,59 @@ publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists) return myself; } -/* Gets list of publication oids for a relation */ -List * -GetRelationPublications(Oid relid) +/* + * Get the list of publication oids associated with a specified relation. + * pubids is filled with the list of publication oids the relation is part of. + * except_pubids is filled with the list of publication oids the relation is + * excluded from. + * + * This function returns true if the relation is part of any publication. + */ +bool +GetRelationPublications(Oid relid, List **pubids, List **except_pubids) { - List *result = NIL; CatCList *pubrellist; - int i; + bool found = false; /* Find all publications associated with the relation. */ pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP, ObjectIdGetDatum(relid)); - for (i = 0; i < pubrellist->n_members; i++) + for (int i = 0; i < pubrellist->n_members; i++) { HeapTuple tup = &pubrellist->members[i]->tuple; - Oid pubid = ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid; + Form_pg_publication_rel pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + Oid pubid = pubrel->prpubid; - result = lappend_oid(result, pubid); + if (pubrel->prexcept) + { + if (except_pubids) + *except_pubids = lappend_oid(*except_pubids, pubid); + } + else + { + if (pubids) + *pubids = lappend_oid(*pubids, pubid); + found = true; + } } ReleaseSysCacheList(pubrellist); - return result; + return found; } /* - * Gets list of relation oids for a publication. + * Return the list of relation OIDs for a publication. + * + * For a FOR ALL TABLES publication, this returns the list of tables that were + * explicitly excluded via an EXCEPT TABLE clause. + * + * For a FOR TABLE publication, this returns the list of tables explicitly + * included in the publication. * - * This should only be used FOR TABLE publications, the FOR ALL TABLES/SEQUENCES - * should use GetAllPublicationRelations(). + * Publications declared with FOR ALL TABLES or FOR ALL SEQUENCES should use + * GetAllPublicationRelations() to obtain the complete set of tables covered by + * the publication. */ List * GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) @@ -864,15 +922,23 @@ GetAllTablesPublications(void) * partitioned tables, we must exclude partitions in favor of including the * root partitioned tables. This is not applicable to FOR ALL SEQUENCES * publication. + * + * The list does not include relations that are explicitly excluded via the + * EXCEPT TABLE clause of the publication specified by pubid. */ List * -GetAllPublicationRelations(char relkind, bool pubviaroot) +GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot) { Relation classRel; ScanKeyData key[1]; TableScanDesc scan; HeapTuple tuple; List *result = NIL; + List *exceptlist; + + exceptlist = GetPublicationRelations(pubid, pubviaroot ? + PUBLICATION_PART_ALL : + PUBLICATION_PART_ROOT); Assert(!(relkind == RELKIND_SEQUENCE && pubviaroot)); @@ -891,7 +957,8 @@ GetAllPublicationRelations(char relkind, bool pubviaroot) Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && - !(relForm->relispartition && pubviaroot)) + !(relForm->relispartition && pubviaroot) && + !list_member_oid(exceptlist, relid)) result = lappend_oid(result, relid); } @@ -912,7 +979,8 @@ GetAllPublicationRelations(char relkind, bool pubviaroot) Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && - !relForm->relispartition) + !relForm->relispartition && + !list_member_oid(exceptlist, relid)) result = lappend_oid(result, relid); } @@ -1168,7 +1236,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * those. Otherwise, get the partitioned table itself. */ if (pub_elem->alltables) - pub_elem_tables = GetAllPublicationRelations(RELKIND_RELATION, + pub_elem_tables = GetAllPublicationRelations(pub_elem->oid, + RELKIND_RELATION, pub_elem->pubviaroot); else { @@ -1367,7 +1436,7 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS) publication = GetPublicationByName(pubname, false); if (publication->allsequences) - sequences = GetAllPublicationRelations(RELKIND_SEQUENCE, false); + sequences = GetAllPublicationRelations(publication->oid, RELKIND_SEQUENCE, false); funcctx->user_fctx = sequences; diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index a19835089505..790f9e6948e0 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -170,6 +170,38 @@ parse_publication_options(ParseState *pstate, } } +/* + * Convert the PublicationObjSpec list which is part of + * PublicationAllObjSpecType list into PublicationTable list. + */ +static void +ObjectsInAllPublicationToOids(List *puballobjspec_list, + ParseState *pstate, List **rels) +{ + if (!puballobjspec_list) + return; + + foreach_ptr(PublicationAllObjSpec, puballobj, puballobjspec_list) + { + switch (puballobj->pubobjtype) + { + case PUBLICATION_ALL_SEQUENCES: + break; + case PUBLICATION_ALL_TABLES: + foreach_ptr(PublicationObjSpec, pubobj, puballobj->except_objects) + { + pubobj->pubtable->except = true; + *rels = lappend(*rels, pubobj->pubtable); + } + break; + default: + elog(ERROR, "invalid publication object type %d", + puballobj->pubobjtype); + break; + } + } +} + /* * Convert the PublicationObjSpecType list into schema oid list and * PublicationTable list. @@ -194,6 +226,8 @@ ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate, switch (pubobj->pubobjtype) { case PUBLICATIONOBJ_TABLE: + case PUBLICATIONOBJ_EXCEPT_TABLE: + pubobj->pubtable->except = (pubobj->pubobjtype == PUBLICATIONOBJ_EXCEPT_TABLE); *rels = lappend(*rels, pubobj->pubtable); break; case PUBLICATIONOBJ_TABLES_IN_SCHEMA: @@ -268,7 +302,7 @@ contain_invalid_rfcolumn_walker(Node *node, rf_context *context) */ bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, - bool pubviaroot) + bool pubviaroot, bool puballtables) { HeapTuple rftuple; Oid relid = RelationGetRelid(relation); @@ -295,7 +329,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, if (pubviaroot && relation->rd_rel->relispartition) { publish_as_relid - = GetTopMostAncestorInPublication(pubid, ancestors, NULL); + = GetTopMostAncestorInPublication(pubid, ancestors, NULL, + puballtables); if (!OidIsValid(publish_as_relid)) publish_as_relid = relid; @@ -354,8 +389,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, */ bool pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, - bool pubviaroot, char pubgencols_type, - bool *invalid_column_list, + bool puballtables, bool pubviaroot, + char pubgencols_type, bool *invalid_column_list, bool *invalid_gen_col) { Oid relid = RelationGetRelid(relation); @@ -379,7 +414,8 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, */ if (pubviaroot && relation->rd_rel->relispartition) { - publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL); + publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, + NULL, puballtables); if (!OidIsValid(publish_as_relid)) publish_as_relid = relid; @@ -923,16 +959,9 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Make the changes visible. */ CommandCounterIncrement(); - /* Associate objects with the publication. */ if (stmt->for_all_tables) - { - /* - * Invalidate relcache so that publication info is rebuilt. Sequences - * publication doesn't require invalidation, as replica identity - * checks don't apply to them. - */ - CacheInvalidateRelcacheAll(); - } + ObjectsInAllPublicationToOids(stmt->pubobjects, pstate, &relations); + else if (!stmt->for_all_sequences) { ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations, @@ -944,22 +973,6 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR TABLES IN SCHEMA publication")); - if (relations != NIL) - { - List *rels; - - rels = OpenTableList(relations); - TransformPubWhereClauses(rels, pstate->p_sourcetext, - publish_via_partition_root); - - CheckPubRelationColumnList(stmt->pubname, rels, - schemaidlist != NIL, - publish_via_partition_root); - - PublicationAddTables(puboid, rels, true, NULL); - CloseTableList(rels); - } - if (schemaidlist != NIL) { /* @@ -971,8 +984,37 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) } } + /* + * If publication is for ALL TABLES and relations is not empty, it means + * that there are some relations to be excluded from the publication. + * Else, relations is the list of relations to be added to the + * publication. + */ + if (relations != NIL) + { + List *rels; + + rels = OpenTableList(relations); + TransformPubWhereClauses(rels, pstate->p_sourcetext, + publish_via_partition_root); + + CheckPubRelationColumnList(stmt->pubname, rels, + schemaidlist != NIL, + publish_via_partition_root); + + PublicationAddTables(puboid, rels, true, NULL); + CloseTableList(rels); + } + table_close(rel, RowExclusiveLock); + /* Associate objects with the publication. */ + if (stmt->for_all_tables) + { + /* Invalidate relcache so that publication info is rebuilt. */ + CacheInvalidateRelcacheAll(); + } + InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); if (wal_level != WAL_LEVEL_LOGICAL) @@ -1348,6 +1390,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, oldrel = palloc_object(PublicationRelInfo); oldrel->whereClause = NULL; oldrel->columns = NIL; + oldrel->except = false; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1761,6 +1804,7 @@ OpenTableList(List *tables) pub_rel->relation = rel; pub_rel->whereClause = t->whereClause; pub_rel->columns = t->columns; + pub_rel->except = t->except; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); @@ -1833,6 +1877,7 @@ OpenTableList(List *tables) /* child inherits column list from parent */ pub_rel->columns = t->columns; + pub_rel->except = t->except; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 1c9ef53be205..85080d5aced2 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8651,7 +8651,7 @@ ATExecSetExpression(AlteredTableInfo *tab, Relation rel, const char *colName, * expressions. */ if (attgenerated == ATTRIBUTE_GENERATED_VIRTUAL && - GetRelationPublications(RelationGetRelid(rel)) != NIL) + GetRelationPublications(RelationGetRelid(rel), NULL, NULL)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("ALTER TABLE / SET EXPRESSION is not supported for virtual generated columns in tables that are part of a publication"), @@ -18846,7 +18846,7 @@ ATPrepChangePersistence(AlteredTableInfo *tab, Relation rel, bool toLogged) * UNLOGGED, as UNLOGGED tables can't be published. */ if (!toLogged && - GetRelationPublications(RelationGetRelid(rel)) != NIL) + GetRelationPublications(RelationGetRelid(rel), NULL, NULL)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot change table \"%s\" to unlogged because it is part of a publication", diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 7856ce9d78fc..c523512a0972 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -454,6 +454,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list drop_option_list pub_obj_list pub_all_obj_type_list + pub_except_obj_list opt_except_clause %type returning_clause %type returning_option @@ -591,6 +592,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type var_value zone_value %type auth_ident RoleSpec opt_granted_by %type PublicationObjSpec +%type PublicationExceptObjSpec %type PublicationAllObjSpec %type unreserved_keyword type_func_name_keyword @@ -10761,6 +10763,7 @@ CreatePublicationStmt: CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; + n->pubobjects = $5; preprocess_pub_all_objtype_list($5, &n->for_all_tables, &n->for_all_sequences, yyscanner); @@ -10801,6 +10804,7 @@ PublicationObjSpec: $$->pubtable->relation = $2; $$->pubtable->columns = $3; $$->pubtable->whereClause = $4; + $$->location = @1; } | TABLES IN_P SCHEMA ColId { @@ -10877,10 +10881,13 @@ pub_obj_list: PublicationObjSpec ; PublicationAllObjSpec: - ALL TABLES + ALL TABLES opt_except_clause { $$ = makeNode(PublicationAllObjSpec); $$->pubobjtype = PUBLICATION_ALL_TABLES; + $$->except_objects = $3; + if($$->except_objects != NULL) + preprocess_pubobj_list($$->except_objects, yyscanner); $$->location = @1; } | ALL SEQUENCES @@ -10897,6 +10904,31 @@ pub_all_obj_type_list: PublicationAllObjSpec { $$ = lappend($1, $3); } ; +/* + * ALL TABLES EXCEPT ( table_name [, ...] ) specification + */ +PublicationExceptObjSpec: + relation_expr + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_EXCEPT_TABLE; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->except = true; + $$->pubtable->relation = $1; + $$->location = @1; + } + ; + +pub_except_obj_list: PublicationExceptObjSpec + { $$ = list_make1($1); } + | pub_except_obj_list ',' PublicationExceptObjSpec + { $$ = lappend($1, $3); } + ; + +opt_except_clause: + EXCEPT opt_table '(' pub_except_obj_list ')' { $$ = $4; } + | /*EMPTY*/ { $$ = NIL; } + ; /***************************************************************************** * diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 787998abb8a2..d042da7b3476 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -2084,7 +2084,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) if (!entry->replicate_valid) { Oid schemaId = get_rel_namespace(relid); - List *pubids = GetRelationPublications(relid); + List *pubids = NIL; + List *exceptTablePubids = NIL; /* * We don't acquire a lock on the namespace system table as we build @@ -2099,6 +2100,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) char relkind = get_rel_relkind(relid); List *rel_publications = NIL; + GetRelationPublications(relid, &pubids, &exceptTablePubids); + /* Reload publications if needed before use. */ if (!publications_valid) { @@ -2195,22 +2198,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) Oid pub_relid = relid; int ancestor_level = 0; - /* - * If this is a FOR ALL TABLES publication, pick the partition - * root and set the ancestor level accordingly. - */ - if (pub->alltables) - { - publish = true; - if (pub->pubviaroot && am_partition) - { - List *ancestors = get_partition_ancestors(relid); - - pub_relid = llast_oid(ancestors); - ancestor_level = list_length(ancestors); - } - } - if (!publish) { bool ancestor_published = false; @@ -2229,7 +2216,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) ancestor = GetTopMostAncestorInPublication(pub->oid, ancestors, - &level); + &level, + pub->alltables); if (ancestor != InvalidOid) { @@ -2244,6 +2232,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) if (list_member_oid(pubids, pub->oid) || list_member_oid(schemaPubids, pub->oid) || + (pub->alltables && + !list_member_oid(exceptTablePubids, pub->oid)) || ancestor_published) publish = true; } @@ -2322,6 +2312,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) list_free(pubids); list_free(schemaPubids); + list_free(exceptTablePubids); list_free(rel_publications); entry->replicate_valid = true; diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 2d0cb7bcfd4a..b3461151df6c 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5793,7 +5793,9 @@ RelationGetExclusionInfo(Relation indexRelation, void RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) { - List *puboids; + List *puboids = NIL; + List *exceptpuboids = NIL; + List *alltablespuboids; ListCell *lc; MemoryContext oldcxt; Oid schemaid; @@ -5831,7 +5833,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) pubdesc->gencols_valid_for_delete = true; /* Fetch the publication membership info. */ - puboids = GetRelationPublications(relid); + GetRelationPublications(relid, &puboids, &exceptpuboids); schemaid = RelationGetNamespace(relation); puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); @@ -5843,16 +5845,25 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) foreach(lc, ancestors) { Oid ancestor = lfirst_oid(lc); + List *ancestor_puboids = NIL; + List *ancestor_exceptpuboids = NIL; - puboids = list_concat_unique_oid(puboids, - GetRelationPublications(ancestor)); + GetRelationPublications(ancestor, &ancestor_puboids, + &ancestor_exceptpuboids); + + puboids = list_concat_unique_oid(puboids, ancestor_puboids); schemaid = get_rel_namespace(ancestor); puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + exceptpuboids = list_concat_unique_oid(exceptpuboids, + ancestor_exceptpuboids); } } - puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); + alltablespuboids = GetAllTablesPublications(); + puboids = list_concat_unique_oid(puboids, + list_difference_oid(alltablespuboids, + exceptpuboids)); foreach(lc, puboids) { Oid pubid = lfirst_oid(lc); @@ -5883,7 +5894,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) if (!pubform->puballtables && (pubform->pubupdate || pubform->pubdelete) && pub_rf_contains_invalid_column(pubid, relation, ancestors, - pubform->pubviaroot)) + pubform->pubviaroot, pubform->puballtables)) { if (pubform->pubupdate) pubdesc->rf_valid_for_update = false; @@ -5899,6 +5910,7 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) */ if ((pubform->pubupdate || pubform->pubdelete) && pub_contains_invalid_column(pubid, relation, ancestors, + pubform->puballtables, pubform->pubviaroot, pubform->pubgencols, &invalid_column_list, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 24ad201af2f9..b04f4d72494a 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -186,6 +186,8 @@ static SimpleOidList extension_include_oids = {NULL, NULL}; static SimpleStringList extension_exclude_patterns = {NULL, NULL}; static SimpleOidList extension_exclude_oids = {NULL, NULL}; +static SimplePtrList exceptinfo = {NULL, NULL}; + static const CatalogId nilCatalogId = {0, 0}; /* override for standard extra_float_digits setting */ @@ -4676,7 +4678,33 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) if (pubinfo->puballtables && pubinfo->puballsequences) appendPQExpBufferStr(query, " FOR ALL TABLES, ALL SEQUENCES"); else if (pubinfo->puballtables) + { + bool first_tbl = true; appendPQExpBufferStr(query, " FOR ALL TABLES"); + + /* Include exception tables if the publication has EXCEPT TABLEs */ + for (SimplePtrListCell *cell = exceptinfo.head; cell; cell = cell->next) + { + PublicationRelInfo *pubrinfo = (PublicationRelInfo *) cell->ptr; + TableInfo *tbinfo; + + if (pubinfo == pubrinfo->publication) + { + tbinfo = pubrinfo->pubtable; + + if (first_tbl) + { + appendPQExpBufferStr(query, " EXCEPT TABLE ("); + first_tbl = false; + } + else + appendPQExpBufferStr(query, ", "); + appendPQExpBuffer(query, "ONLY %s", fmtQualifiedDumpable(tbinfo)); + } + } + if (!first_tbl) + appendPQExpBufferStr(query, ")"); + } else if (pubinfo->puballsequences) appendPQExpBufferStr(query, " FOR ALL SEQUENCES"); @@ -4845,6 +4873,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_prrelid; int i_prrelqual; int i_prattrs; + int i_prexcept; int i, j, ntups; @@ -4856,8 +4885,16 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* Collect all publication membership info. */ if (fout->remoteVersion >= 150000) + { + appendPQExpBufferStr(query, + "SELECT tableoid, oid, prpubid, prrelid,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, " prexcept,\n"); + else + appendPQExpBufferStr(query, " false AS prexcept,\n"); + appendPQExpBufferStr(query, - "SELECT tableoid, oid, prpubid, prrelid, " "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual, " "(CASE\n" " WHEN pr.prattrs IS NOT NULL THEN\n" @@ -4868,6 +4905,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" " ELSE NULL END) prattrs " "FROM pg_catalog.pg_publication_rel pr"); + } else appendPQExpBufferStr(query, "SELECT tableoid, oid, prpubid, prrelid, " @@ -4883,6 +4921,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_prrelid = PQfnumber(res, "prrelid"); i_prrelqual = PQfnumber(res, "prrelqual"); i_prattrs = PQfnumber(res, "prattrs"); + i_prexcept = PQfnumber(res, "prexcept"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4894,6 +4933,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) Oid prrelid = atooid(PQgetvalue(res, i, i_prrelid)); PublicationInfo *pubinfo; TableInfo *tbinfo; + bool prexcept = strcmp(PQgetvalue(res, i, i_prexcept), "t") == 0; /* * Ignore any entries for which we aren't interested in either the @@ -4907,7 +4947,11 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) continue; /* OK, make a DumpableObject for this relationship */ - pubrinfo[j].dobj.objType = DO_PUBLICATION_REL; + if (prexcept) + pubrinfo[j].dobj.objType = DO_PUBLICATION_EXCEPT_REL; + else + pubrinfo[j].dobj.objType = DO_PUBLICATION_REL; + pubrinfo[j].dobj.catId.tableoid = atooid(PQgetvalue(res, i, i_tableoid)); pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid)); @@ -4948,6 +4992,9 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* Decide whether we want to dump it */ selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout); + if (prexcept) + simple_ptr_list_append(&exceptinfo, &pubrinfo[j]); + j++; } @@ -11826,6 +11873,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) case DO_PUBLICATION: dumpPublication(fout, (const PublicationInfo *) dobj); break; + case DO_PUBLICATION_EXCEPT_REL: + /* will be dumped in dumpPublication */ + break; case DO_PUBLICATION_REL: dumpPublicationTable(fout, (const PublicationRelInfo *) dobj); break; @@ -20196,6 +20246,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_DEFAULT_ACL: case DO_POLICY: case DO_PUBLICATION: + case DO_PUBLICATION_EXCEPT_REL: case DO_PUBLICATION_REL: case DO_PUBLICATION_TABLE_IN_SCHEMA: case DO_SUBSCRIPTION: diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 72a00e1bc202..723b5575c536 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -81,6 +81,7 @@ typedef enum DO_REFRESH_MATVIEW, DO_POLICY, DO_PUBLICATION, + DO_PUBLICATION_EXCEPT_REL, DO_PUBLICATION_REL, DO_PUBLICATION_TABLE_IN_SCHEMA, DO_REL_STATS, diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 164c76e08640..6ebeb9c96a1c 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -92,6 +92,7 @@ enum dbObjectTypePriorities PRIO_FK_CONSTRAINT, PRIO_POLICY, PRIO_PUBLICATION, + PRIO_PUBLICATION_EXCEPT_REL, PRIO_PUBLICATION_REL, PRIO_PUBLICATION_TABLE_IN_SCHEMA, PRIO_SUBSCRIPTION, @@ -147,6 +148,7 @@ static const int dbObjectTypePriority[] = [DO_REFRESH_MATVIEW] = PRIO_REFRESH_MATVIEW, [DO_POLICY] = PRIO_POLICY, [DO_PUBLICATION] = PRIO_PUBLICATION, + [DO_PUBLICATION_EXCEPT_REL] = PRIO_PUBLICATION_EXCEPT_REL, [DO_PUBLICATION_REL] = PRIO_PUBLICATION_REL, [DO_PUBLICATION_TABLE_IN_SCHEMA] = PRIO_PUBLICATION_TABLE_IN_SCHEMA, [DO_REL_STATS] = PRIO_STATISTICS_DATA_DATA, @@ -432,7 +434,8 @@ DOTypeNameCompare(const void *p1, const void *p2) if (cmpval != 0) return cmpval; } - else if (obj1->objType == DO_PUBLICATION_REL) + else if (obj1->objType == DO_PUBLICATION_REL || + obj1->objType == DO_PUBLICATION_EXCEPT_REL) { PublicationRelInfo *probj1 = *(PublicationRelInfo *const *) p1; PublicationRelInfo *probj2 = *(PublicationRelInfo *const *) p2; @@ -1715,6 +1718,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) "PUBLICATION (ID %d OID %u)", obj->dumpId, obj->catId.oid); return; + case DO_PUBLICATION_EXCEPT_REL: + snprintf(buf, bufsize, + "PUBLICATION EXCEPT TABLE (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; case DO_PUBLICATION_REL: snprintf(buf, bufsize, "PUBLICATION TABLE (ID %d OID %u)", diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index e33aa95f6ffc..db18a40744c1 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -3170,6 +3170,36 @@ like => { %full_runs, section_post_data => 1, }, }, + 'CREATE PUBLICATION pub8' => { + create_order => 50, + create_sql => + 'CREATE PUBLICATION pub8 FOR ALL TABLES EXCEPT (dump_test.test_table);', + regexp => qr/^ + \QCREATE PUBLICATION pub8 FOR ALL TABLES EXCEPT TABLE (ONLY dump_test.test_table) WITH (publish = 'insert, update, delete, truncate');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + + 'CREATE PUBLICATION pub9' => { + create_order => 50, + create_sql => + 'CREATE PUBLICATION pub9 FOR ALL TABLES EXCEPT TABLE (dump_test.test_table, dump_test.test_second_table);', + regexp => qr/^ + \QCREATE PUBLICATION pub9 FOR ALL TABLES EXCEPT TABLE (ONLY dump_test.test_table, ONLY dump_test.test_second_table) WITH (publish = 'insert, update, delete, truncate');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + + 'CREATE PUBLICATION pub10' => { + create_order => 50, + create_sql => + 'CREATE PUBLICATION pub10 FOR ALL TABLES EXCEPT TABLE (dump_test.test_table_generated);', + regexp => qr/^ + \QCREATE PUBLICATION pub10 FOR ALL TABLES EXCEPT TABLE (ONLY dump_test.test_table_generated, ONLY dump_test.test_table_generated_child2, ONLY dump_test.test_table_generated_child1) WITH (publish = 'insert, update, delete, truncate');\E + /xm, + like => { %full_runs, section_post_data => 1, }, + }, + 'CREATE SUBSCRIPTION sub1' => { create_order => 50, create_sql => 'CREATE SUBSCRIPTION sub1 @@ -5163,7 +5193,7 @@ # # Either "all_runs" should be set or there should be a "like" list, # even if it is empty. (This makes the test more self-documenting.) - if (!defined($tests{$test}->{all_runs}) + if ( !defined($tests{$test}->{all_runs}) && !defined($tests{$test}->{like})) { die "missing \"like\" in test \"$test\""; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 36f245028429..50b1d4353599 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -3073,17 +3073,34 @@ describeOneTableDetails(const char *schemaname, " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" " ELSE NULL END) " "FROM pg_catalog.pg_publication p\n" - " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" - " JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n" - "WHERE pr.prrelid = '%s'\n" + " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + " JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n" + "WHERE pr.prrelid = '%s'\n", + oid, oid, oid); + + if (pset.sversion >= 190000) + appendPQExpBufferStr(&buf, " AND NOT pr.prexcept\n"); + + appendPQExpBuffer(&buf, "UNION\n" "SELECT pubname\n" - " , NULL\n" - " , NULL\n" + " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" - "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" - "ORDER BY 1;", - oid, oid, oid, oid); + "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n", + oid); + + if (pset.sversion >= 190000) + appendPQExpBuffer(&buf, + " AND NOT EXISTS (\n" + " SELECT 1\n" + " FROM pg_catalog.pg_publication_rel pr\n" + " JOIN pg_catalog.pg_class pc\n" + " ON pr.prrelid = pc.oid\n" + " WHERE pr.prrelid = '%s' AND pr.prpubid = p.oid)\n", + oid); + + appendPQExpBufferStr(&buf, "ORDER BY 1;"); } else { @@ -6753,8 +6770,12 @@ describePublications(const char *pattern) " pg_catalog.pg_publication_rel pr\n" "WHERE c.relnamespace = n.oid\n" " AND c.oid = pr.prrelid\n" - " AND pr.prpubid = '%s'\n" - "ORDER BY 1,2", pubid); + " AND pr.prpubid = '%s'\n", pubid); + + if (pset.sversion >= 190000) + appendPQExpBuffer(&buf, " AND NOT pr.prexcept\n"); + + appendPQExpBuffer(&buf, "ORDER BY 1,2"); if (!addFooterToPublicationDesc(&buf, _("Tables:"), false, &cont)) goto error_return; @@ -6772,6 +6793,23 @@ describePublications(const char *pattern) goto error_return; } } + else + { + if (pset.sversion >= 190000) + { + /* Get the excluded tables for the specified publication */ + printfPQExpBuffer(&buf, + "SELECT concat(c.relnamespace::regnamespace, '.', c.relname)\n" + "FROM pg_catalog.pg_class c\n" + " JOIN pg_catalog.pg_publication_rel pr ON c.oid = pr.prrelid\n" + "WHERE pr.prpubid = '%s'\n" + " AND pr.prexcept\n" + "ORDER BY 1", pubid); + if (!addFooterToPublicationDesc(&buf, _("Except tables:"), + true, &cont)) + goto error_return; + } + } printTable(&cont, pset.queryFout, false, pset.logfile); printTableCleanup(&cont); diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 20d7a65c614e..45e7a9cbfd39 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -3623,7 +3623,17 @@ match_previous_words(int pattern_id, else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL")) COMPLETE_WITH("TABLES", "SEQUENCES"); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES")) - COMPLETE_WITH("WITH ("); + COMPLETE_WITH("EXCEPT TABLE (", "WITH ("); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT")) + COMPLETE_WITH("TABLE ("); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT", "TABLE")) + COMPLETE_WITH("("); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT", "TABLE", "(")) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT", "TABLE", "(", MatchAnyN) && ends_with(prev_wd, ',')) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "EXCEPT", "TABLE", "(", MatchAnyN) && !ends_with(prev_wd, ',')) + COMPLETE_WITH(")"); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLES")) COMPLETE_WITH("IN SCHEMA"); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE", MatchAny) && !ends_with(prev_wd, ',')) diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 22f48bb89758..c3a5e278a03a 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -146,11 +146,12 @@ typedef struct PublicationRelInfo Relation relation; Node *whereClause; List *columns; + bool except; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); -extern List *GetRelationPublications(Oid relid); +extern bool GetRelationPublications(Oid relid, List **pubids, List **except_pubids); /*--------- * Expected values for pub_partopt parameter of GetPublicationRelations(), @@ -170,7 +171,7 @@ typedef enum PublicationPartOpt extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); -extern List *GetAllPublicationRelations(char relkind, bool pubviaroot); +extern List *GetAllPublicationRelations(Oid pubid, char relkind, bool pubviaroot); extern List *GetPublicationSchemas(Oid pubid); extern List *GetSchemaPublications(Oid schemaid); extern List *GetSchemaPublicationRelations(Oid schemaid, @@ -181,7 +182,7 @@ extern List *GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, Oid relid); extern Oid GetTopMostAncestorInPublication(Oid puboid, List *ancestors, - int *ancestor_level); + int *ancestor_level, bool puballtables); extern bool is_publishable_relation(Relation rel); extern bool is_schema_publication(Oid pubid); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index 92cc36dfdf69..e7d7f3ba85c9 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -31,6 +31,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ + bool prexcept BKI_DEFAULT(f); /* exclude the relation */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ pg_node_tree prqual; /* qualifications */ diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index f90cf1ef896e..4a170994f762 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -32,10 +32,11 @@ extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); extern void InvalidatePublicationRels(List *relids); extern bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, - List *ancestors, bool pubviaroot); + List *ancestors, bool pubviaroot, + bool puballtables); extern bool pub_contains_invalid_column(Oid pubid, Relation relation, - List *ancestors, bool pubviaroot, - char pubgencols_type, + List *ancestors, bool puballtables, + bool pubviaroot, char pubgencols_type, bool *invalid_column_list, bool *invalid_gen_col); extern void InvalidatePubRelSyncCache(Oid pubid, bool puballtables); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index d14294a4eceb..a14ecedb27f3 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4271,6 +4271,7 @@ typedef struct PublicationTable RangeVar *relation; /* relation to be published */ Node *whereClause; /* qualifications */ List *columns; /* List of columns in a publication table */ + bool except; /* exclude the relation */ } PublicationTable; /* @@ -4279,6 +4280,7 @@ typedef struct PublicationTable typedef enum PublicationObjSpecType { PUBLICATIONOBJ_TABLE, /* A table */ + PUBLICATIONOBJ_EXCEPT_TABLE, /* A table to be excluded */ PUBLICATIONOBJ_TABLES_IN_SCHEMA, /* All tables in schema */ PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA, /* All tables in first element of * search_path */ @@ -4307,6 +4309,7 @@ typedef struct PublicationAllObjSpec { NodeTag type; PublicationAllObjType pubobjtype; /* type of this publication object */ + List *except_objects; /* List of publication object to be excluded */ ParseLoc location; /* token location, or -1 if unknown */ } PublicationAllObjSpec; @@ -4326,6 +4329,7 @@ typedef enum AlterPublicationAction AP_AddObjects, /* add objects to publication */ AP_DropObjects, /* remove objects from publication */ AP_SetObjects, /* set list of objects */ + AP_Reset, /* reset the publication */ } AlterPublicationAction; typedef struct AlterPublicationStmt @@ -4341,6 +4345,7 @@ typedef struct AlterPublicationStmt * objects. */ List *pubobjects; /* Optional list of publication objects */ + bool for_all_tables; /* Special publication for all tables in db */ AlterPublicationAction action; /* What action to perform with the given * objects */ } AlterPublicationStmt; diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index e72d1308967e..ef469c761d0c 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -213,13 +213,40 @@ Not-null constraints: regress_publication_user | t | f | t | t | f | f | none | f (1 row) +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_foralltables_excepttable FOR ALL TABLES EXCEPT TABLE (testpub_tbl1, testpub_tbl2); +-- specify EXCEPT without TABLE +CREATE PUBLICATION testpub_foralltables_excepttable1 FOR ALL TABLES EXCEPT (testpub_tbl1); +RESET client_min_messages; +\dRp+ testpub_foralltables_excepttable + Publication testpub_foralltables_excepttable + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl1" + "public.testpub_tbl2" + +\dRp+ testpub_foralltables_excepttable1 + Publication testpub_foralltables_excepttable1 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl1" + DROP TABLE testpub_tbl2; -DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema; +DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema, testpub_foralltables_excepttable, testpub_foralltables_excepttable1; CREATE TABLE testpub_tbl3 (a int); CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; +CREATE PUBLICATION testpub5 FOR ALL TABLES EXCEPT TABLE (testpub_tbl3); +-- EXCEPT with wildcard: exclude table and all descendants +CREATE PUBLICATION testpub6 FOR ALL TABLES EXCEPT TABLE (testpub_tbl3*); +-- EXCEPT with ONLY: exclude table but not descendants +CREATE PUBLICATION testpub7 FOR ALL TABLES EXCEPT TABLE (ONLY testpub_tbl3); RESET client_min_messages; \dRp+ testpub3 Publication testpub3 @@ -238,8 +265,34 @@ Tables: Tables: "public.testpub_tbl3" +\dRp+ testpub5 + Publication testpub5 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl3" + "public.testpub_tbl3a" + +\dRp+ testpub6 + Publication testpub6 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl3" + "public.testpub_tbl3a" + +\dRp+ testpub7 + Publication testpub7 + Owner | All tables | All sequences | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------------------------+------------+---------------+---------+---------+---------+-----------+-------------------+---------- + regress_publication_user | t | f | t | t | t | t | none | f +Except tables: + "public.testpub_tbl3" + DROP TABLE testpub_tbl3, testpub_tbl3a; -DROP PUBLICATION testpub3, testpub4; +DROP PUBLICATION testpub3, testpub4, testpub5, testpub6, testpub7; --- Tests for publications with SEQUENCES CREATE SEQUENCE regress_pub_seq0; CREATE SEQUENCE pub_test.regress_pub_seq1; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 00390aecd476..651b3b12030c 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -105,20 +105,37 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall \d+ testpub_tbl2 \dRp+ testpub_foralltables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_foralltables_excepttable FOR ALL TABLES EXCEPT TABLE (testpub_tbl1, testpub_tbl2); +-- specify EXCEPT without TABLE +CREATE PUBLICATION testpub_foralltables_excepttable1 FOR ALL TABLES EXCEPT (testpub_tbl1); +RESET client_min_messages; + +\dRp+ testpub_foralltables_excepttable +\dRp+ testpub_foralltables_excepttable1 + DROP TABLE testpub_tbl2; -DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema; +DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_for_tbl_schema, testpub_foralltables_excepttable, testpub_foralltables_excepttable1; CREATE TABLE testpub_tbl3 (a int); CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); SET client_min_messages = 'ERROR'; CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; +CREATE PUBLICATION testpub5 FOR ALL TABLES EXCEPT TABLE (testpub_tbl3); +-- EXCEPT with wildcard: exclude table and all descendants +CREATE PUBLICATION testpub6 FOR ALL TABLES EXCEPT TABLE (testpub_tbl3*); +-- EXCEPT with ONLY: exclude table but not descendants +CREATE PUBLICATION testpub7 FOR ALL TABLES EXCEPT TABLE (ONLY testpub_tbl3); RESET client_min_messages; \dRp+ testpub3 \dRp+ testpub4 +\dRp+ testpub5 +\dRp+ testpub6 +\dRp+ testpub7 DROP TABLE testpub_tbl3, testpub_tbl3a; -DROP PUBLICATION testpub3, testpub4; +DROP PUBLICATION testpub3, testpub4, testpub5, testpub6, testpub7; --- Tests for publications with SEQUENCES CREATE SEQUENCE regress_pub_seq0; diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index 85d10a89994e..b8e5c54c314d 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -46,6 +46,7 @@ tests += { 't/034_temporal.pl', 't/035_conflicts.pl', 't/036_sequences.pl', + 't/037_rep_changes_except_table.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/037_rep_changes_except_table.pl b/src/test/subscription/t/037_rep_changes_except_table.pl new file mode 100644 index 000000000000..09174b7d5d73 --- /dev/null +++ b/src/test/subscription/t/037_rep_changes_except_table.pl @@ -0,0 +1,215 @@ + +# Copyright (c) 2021-2025, PostgreSQL Global Development Group + +# Logical replication tests for EXCEPT TABLE publications +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Initialize subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# ============================================ +# EXCEPT TABLE test cases for normal tables +# ============================================ +# Create schemas and tables on publisher +$node_publisher->safe_psql( + 'postgres', qq( + CREATE SCHEMA sch1; + CREATE TABLE sch1.tab1 AS SELECT generate_series(1,10) AS a; +)); + +# Create schemas and tables on subscriber +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE SCHEMA sch1; + CREATE TABLE sch1.tab1 (a int); +)); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_schema FOR ALL TABLES EXCEPT TABLE (sch1.tab1)" +); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, + 'tap_sub_schema'); + +# Check the table data does not sync for excluded table +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM sch1.tab1"); +is($result, qq(0||), + 'check there is no initial data copied for the excluded table'); + +# Verify that data inserted to the excluded table is not replicated. +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.tab1 VALUES(generate_series(11,20))"); + +$node_publisher->wait_for_catchup('tap_sub_schema'); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM sch1.tab1"); +is($result, qq(0||), 'check replicated inserts on subscriber'); + +# cleanup +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_schema"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_schema"); + +# ============================================ +# EXCEPT TABLE test cases for partition tables +# ============================================ +# Check behavior of EXCEPT TABLE together with publish_via_partition_root +# when applied to a partitioned table and its partitions. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE sch1.t1(a int) PARTITION BY RANGE(a); + CREATE TABLE sch1.part1 PARTITION OF sch1.t1 FOR VALUES FROM (0) TO (5); + CREATE TABLE sch1.part2 PARTITION OF sch1.t1 FOR VALUES FROM (6) TO (10); + INSERT INTO sch1.t1 VALUES (1), (6); +)); + +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE TABLE sch1.t1(a int); + CREATE TABLE sch1.part1(a int); + CREATE TABLE sch1.part2(a int); +)); + +# EXCEPT TABLE (sch1.part1) with publish_via_partition_root = false +# Excluding a partition while publish_via_partition_root = false prevents +# replication of rows inserted into the partitioned table. +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.part1)" +); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" +); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.t1 VALUES (2), (7);"); +$node_publisher->wait_for_catchup('tap_sub_part'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is($result, qq(), 'check rows on partitioned table'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); +is($result, qq(), 'check rows on excluded partition'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2"); +is( $result, qq(6 +7), 'check rows on other partition'); + +$node_publisher->safe_psql('postgres', "TRUNCATE sch1.t1"); +$node_publisher->wait_for_catchup('tap_sub_part'); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_part"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_part;"); + +# EXCEPT TABLE (sch1.t1) with publish_via_partition_root = false +# Excluding the partitioned table while publish_via_partition_root = false +# still allows rows inserted into its partitions to be replicated. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.t1); + INSERT INTO sch1.t1 VALUES (1), (6); +)); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" +); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.t1 VALUES (2), (7);"); +$node_publisher->wait_for_catchup('tap_sub_part'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is($result, qq(), 'check rows on partitioned table'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); +is( $result, qq(1 +2), 'check rows on first partition'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2"); +is( $result, qq(6 +7), 'check rows on second partition'); + +$node_publisher->safe_psql('postgres', "TRUNCATE sch1.t1"); +$node_publisher->wait_for_catchup('tap_sub_part'); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_part"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_part;"); + +# EXCEPT TABLE (sch1.t1) with publish_via_partition_root = true +# When the partitioned table is excluded and publish_via_partition_root is true, +# no rows from the table or its partitions are replicated. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.t1) WITH (publish_via_partition_root); + INSERT INTO sch1.t1 VALUES (1), (6); +)); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" +); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.t1 VALUES (2), (7);"); +$node_publisher->wait_for_catchup('tap_sub_part'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1"); +is($result, qq(), 'check rows on partitioned table'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); +is($result, qq(), 'check rows on first partition'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2"); +is($result, qq(), 'check rows on second partition'); + +$node_publisher->safe_psql('postgres', "TRUNCATE sch1.t1"); +$node_publisher->wait_for_catchup('tap_sub_part'); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_part"); +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_part;"); + +# EXCEPT TABLE (sch1.part1) with publish_via_partition_root = true +# When a partition is excluded but publish_via_partition_root is true, +# rows published through the partitioned table can still be replicated. +$node_publisher->safe_psql( + 'postgres', qq( + CREATE PUBLICATION tap_pub_part FOR ALL TABLES EXCEPT TABLE (sch1.part1) WITH (publish_via_partition_root); + INSERT INTO sch1.t1 VALUES (1), (6) +)); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_part CONNECTION '$publisher_connstr' PUBLICATION tap_pub_part" +); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_part'); +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.t1 VALUES (2), (7);"); +$node_publisher->wait_for_catchup('tap_sub_part'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1 ORDER BY a"); +is( $result, qq(1 +2 +6 +7), 'check rows on partitioned table'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part1"); +is($result, qq(), 'check rows on excluded partition'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.part2"); +is($result, qq(), 'check rows on other partition'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); + +done_testing();