diff options
Diffstat (limited to 'src/bin/pg_basebackup/pg_createsubscriber.c')
| -rw-r--r-- | src/bin/pg_basebackup/pg_createsubscriber.c | 95 |
1 files changed, 75 insertions, 20 deletions
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index ef6deec14af..41a649297c7 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -116,6 +116,7 @@ static void stop_standby_server(const char *datadir); static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt); static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); +static bool find_publication(PGconn *conn, const char *pubname, const char *dbname); static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication); static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo); @@ -764,6 +765,39 @@ generate_object_name(PGconn *conn) } /* + * Does the publication exist in the specified database? + */ +static bool +find_publication(PGconn *conn, const char *pubname, const char *dbname) +{ + PQExpBuffer str = createPQExpBuffer(); + PGresult *res; + bool found = false; + char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname)); + + appendPQExpBuffer(str, + "SELECT 1 FROM pg_catalog.pg_publication " + "WHERE pubname = %s", + pubname_esc); + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not find publication \"%s\" in database \"%s\": %s", + pubname, dbname, PQerrorMessage(conn)); + disconnect_database(conn, true); + } + + if (PQntuples(res) == 1) + found = true; + + PQclear(res); + PQfreemem(pubname_esc); + destroyPQExpBuffer(str); + + return found; +} + +/* * Create the publications and replication slots in preparation for logical * replication. Returns the LSN from latest replication slot. It will be the * replication start point that is used to adjust the subscriptions (see @@ -799,13 +833,25 @@ setup_publisher(struct LogicalRepInfo *dbinfo) if (num_replslots == 0) dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname); - /* - * Create publication on publisher. This step should be executed - * *before* promoting the subscriber to avoid any transactions between - * consistent LSN and the new publication rows (such transactions - * wouldn't see the new publication rows resulting in an error). - */ - create_publication(conn, &dbinfo[i]); + if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname)) + { + /* Reuse existing publication on publisher. */ + pg_log_info("use existing publication \"%s\" in database \"%s\"", + dbinfo[i].pubname, dbinfo[i].dbname); + /* Don't remove pre-existing publication if an error occurs. */ + dbinfo[i].made_publication = false; + } + else + { + /* + * Create publication on publisher. This step should be executed + * *before* promoting the subscriber to avoid any transactions + * between consistent LSN and the new publication rows (such + * transactions wouldn't see the new publication rows resulting in + * an error). + */ + create_publication(conn, &dbinfo[i]); + } /* Create replication slot on publisher */ if (lsn) @@ -1749,11 +1795,10 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname, /* * Retrieve and drop the publications. * - * Since the publications were created before the consistent LSN, they - * remain on the subscriber even after the physical replica is - * promoted. Remove these publications from the subscriber because - * they have no use. Additionally, if requested, drop all pre-existing - * publications. + * Publications copied during physical replication remain on the subscriber + * after promotion. If --clean=publications is specified, drop all existing + * publications in the subscriber database. Otherwise, only drop publications + * that were created by pg_createsubscriber during this operation. */ static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo) @@ -1785,14 +1830,24 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo) PQclear(res); } - - /* - * In dry-run mode, we don't create publications, but we still try to drop - * those to provide necessary information to the user. - */ - if (!drop_all_pubs || dry_run) - drop_publication(conn, dbinfo->pubname, dbinfo->dbname, - &dbinfo->made_publication); + else + { + /* Drop publication only if it was created by this tool */ + if (dbinfo->made_publication) + { + drop_publication(conn, dbinfo->pubname, dbinfo->dbname, + &dbinfo->made_publication); + } + else + { + if (dry_run) + pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"", + dbinfo->pubname, dbinfo->dbname); + else + pg_log_info("preserve existing publication \"%s\" in database \"%s\"", + dbinfo->pubname, dbinfo->dbname); + } + } } /* |
