summaryrefslogtreecommitdiff
path: root/src/bin/pg_basebackup/pg_createsubscriber.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_basebackup/pg_createsubscriber.c')
-rw-r--r--src/bin/pg_basebackup/pg_createsubscriber.c95
1 files changed, 75 insertions, 20 deletions
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index ef6deec14af..41a649297c7 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -116,6 +116,7 @@ static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery(const char *conninfo,
const struct CreateSubscriberOptions *opt);
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
static void drop_publication(PGconn *conn, const char *pubname,
const char *dbname, bool *made_publication);
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -764,6 +765,39 @@ generate_object_name(PGconn *conn)
}
/*
+ * Does the publication exist in the specified database?
+ */
+static bool
+find_publication(PGconn *conn, const char *pubname, const char *dbname)
+{
+ PQExpBuffer str = createPQExpBuffer();
+ PGresult *res;
+ bool found = false;
+ char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
+
+ appendPQExpBuffer(str,
+ "SELECT 1 FROM pg_catalog.pg_publication "
+ "WHERE pubname = %s",
+ pubname_esc);
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
+ pubname, dbname, PQerrorMessage(conn));
+ disconnect_database(conn, true);
+ }
+
+ if (PQntuples(res) == 1)
+ found = true;
+
+ PQclear(res);
+ PQfreemem(pubname_esc);
+ destroyPQExpBuffer(str);
+
+ return found;
+}
+
+/*
* Create the publications and replication slots in preparation for logical
* replication. Returns the LSN from latest replication slot. It will be the
* replication start point that is used to adjust the subscriptions (see
@@ -799,13 +833,25 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
if (num_replslots == 0)
dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
- /*
- * Create publication on publisher. This step should be executed
- * *before* promoting the subscriber to avoid any transactions between
- * consistent LSN and the new publication rows (such transactions
- * wouldn't see the new publication rows resulting in an error).
- */
- create_publication(conn, &dbinfo[i]);
+ if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
+ {
+ /* Reuse existing publication on publisher. */
+ pg_log_info("use existing publication \"%s\" in database \"%s\"",
+ dbinfo[i].pubname, dbinfo[i].dbname);
+ /* Don't remove pre-existing publication if an error occurs. */
+ dbinfo[i].made_publication = false;
+ }
+ else
+ {
+ /*
+ * Create publication on publisher. This step should be executed
+ * *before* promoting the subscriber to avoid any transactions
+ * between consistent LSN and the new publication rows (such
+ * transactions wouldn't see the new publication rows resulting in
+ * an error).
+ */
+ create_publication(conn, &dbinfo[i]);
+ }
/* Create replication slot on publisher */
if (lsn)
@@ -1749,11 +1795,10 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname,
/*
* Retrieve and drop the publications.
*
- * Since the publications were created before the consistent LSN, they
- * remain on the subscriber even after the physical replica is
- * promoted. Remove these publications from the subscriber because
- * they have no use. Additionally, if requested, drop all pre-existing
- * publications.
+ * Publications copied during physical replication remain on the subscriber
+ * after promotion. If --clean=publications is specified, drop all existing
+ * publications in the subscriber database. Otherwise, only drop publications
+ * that were created by pg_createsubscriber during this operation.
*/
static void
check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
@@ -1785,14 +1830,24 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
PQclear(res);
}
-
- /*
- * In dry-run mode, we don't create publications, but we still try to drop
- * those to provide necessary information to the user.
- */
- if (!drop_all_pubs || dry_run)
- drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
- &dbinfo->made_publication);
+ else
+ {
+ /* Drop publication only if it was created by this tool */
+ if (dbinfo->made_publication)
+ {
+ drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+ &dbinfo->made_publication);
+ }
+ else
+ {
+ if (dry_run)
+ pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
+ else
+ pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
+ }
+ }
}
/*