summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bin/pg_basebackup/pg_createsubscriber.c95
-rw-r--r--src/bin/pg_basebackup/t/040_pg_createsubscriber.pl54
-rw-r--r--src/test/postmaster/t/002_connection_limits.pl5
-rw-r--r--src/test/postmaster/t/003_start_stop.pl7
4 files changed, 135 insertions, 26 deletions
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index ef6deec14af..41a649297c7 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -116,6 +116,7 @@ static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery(const char *conninfo,
const struct CreateSubscriberOptions *opt);
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
static void drop_publication(PGconn *conn, const char *pubname,
const char *dbname, bool *made_publication);
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -764,6 +765,39 @@ generate_object_name(PGconn *conn)
}
/*
+ * Does the publication exist in the specified database?
+ */
+static bool
+find_publication(PGconn *conn, const char *pubname, const char *dbname)
+{
+ PQExpBuffer str = createPQExpBuffer();
+ PGresult *res;
+ bool found = false;
+ char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
+
+ appendPQExpBuffer(str,
+ "SELECT 1 FROM pg_catalog.pg_publication "
+ "WHERE pubname = %s",
+ pubname_esc);
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
+ pubname, dbname, PQerrorMessage(conn));
+ disconnect_database(conn, true);
+ }
+
+ if (PQntuples(res) == 1)
+ found = true;
+
+ PQclear(res);
+ PQfreemem(pubname_esc);
+ destroyPQExpBuffer(str);
+
+ return found;
+}
+
+/*
* Create the publications and replication slots in preparation for logical
* replication. Returns the LSN from latest replication slot. It will be the
* replication start point that is used to adjust the subscriptions (see
@@ -799,13 +833,25 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
if (num_replslots == 0)
dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
- /*
- * Create publication on publisher. This step should be executed
- * *before* promoting the subscriber to avoid any transactions between
- * consistent LSN and the new publication rows (such transactions
- * wouldn't see the new publication rows resulting in an error).
- */
- create_publication(conn, &dbinfo[i]);
+ if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
+ {
+ /* Reuse existing publication on publisher. */
+ pg_log_info("use existing publication \"%s\" in database \"%s\"",
+ dbinfo[i].pubname, dbinfo[i].dbname);
+ /* Don't remove pre-existing publication if an error occurs. */
+ dbinfo[i].made_publication = false;
+ }
+ else
+ {
+ /*
+ * Create publication on publisher. This step should be executed
+ * *before* promoting the subscriber to avoid any transactions
+ * between consistent LSN and the new publication rows (such
+ * transactions wouldn't see the new publication rows resulting in
+ * an error).
+ */
+ create_publication(conn, &dbinfo[i]);
+ }
/* Create replication slot on publisher */
if (lsn)
@@ -1749,11 +1795,10 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname,
/*
* Retrieve and drop the publications.
*
- * Since the publications were created before the consistent LSN, they
- * remain on the subscriber even after the physical replica is
- * promoted. Remove these publications from the subscriber because
- * they have no use. Additionally, if requested, drop all pre-existing
- * publications.
+ * Publications copied during physical replication remain on the subscriber
+ * after promotion. If --clean=publications is specified, drop all existing
+ * publications in the subscriber database. Otherwise, only drop publications
+ * that were created by pg_createsubscriber during this operation.
*/
static void
check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
@@ -1785,14 +1830,24 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
PQclear(res);
}
-
- /*
- * In dry-run mode, we don't create publications, but we still try to drop
- * those to provide necessary information to the user.
- */
- if (!drop_all_pubs || dry_run)
- drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
- &dbinfo->made_publication);
+ else
+ {
+ /* Drop publication only if it was created by this tool */
+ if (dbinfo->made_publication)
+ {
+ drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+ &dbinfo->made_publication);
+ }
+ else
+ {
+ if (dry_run)
+ pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
+ else
+ pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
+ }
+ }
}
/*
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 3d6086dc489..9e0db6cd099 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -443,10 +443,17 @@ is(scalar(() = $stderr =~ /would create the replication slot/g),
is(scalar(() = $stderr =~ /would create subscription/g),
3, "verify subscriptions are created for all databases");
+# Create a user-defined publication, and a table that is not a member of that
+# publication.
+$node_p->safe_psql($db1, qq(
+ CREATE PUBLICATION test_pub3 FOR TABLE tbl1;
+ CREATE TABLE not_replicated (a int);
+));
+
# Run pg_createsubscriber on node S. --verbose is used twice
# to show more information.
-# In passing, also test the --enable-two-phase option and
-# --clean option
+#
+# Test two phase and clean options. Use pre-existing publication.
command_ok(
[
'pg_createsubscriber',
@@ -456,7 +463,7 @@ command_ok(
'--publisher-server' => $node_p->connstr($db1),
'--socketdir' => $node_s->host,
'--subscriber-port' => $node_s->port,
- '--publication' => 'pub1',
+ '--publication' => 'test_pub3',
'--publication' => 'pub2',
'--replication-slot' => 'replslot1',
'--replication-slot' => 'replslot2',
@@ -478,13 +485,16 @@ is($result, qq(0),
# Insert rows on P
$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('third row')");
$node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
+$node_p->safe_psql($db1, "INSERT INTO not_replicated VALUES(0)");
# Start subscriber
$node_s->start;
# Confirm publications are removed from the subscriber node
-is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
- '0', 'all publications on subscriber have been removed');
+is($node_s->safe_psql($db1, 'SELECT COUNT(*) FROM pg_publication'),
+ '0', 'all publications were removed from db1');
+is($node_s->safe_psql($db2, 'SELECT COUNT(*) FROM pg_publication'),
+ '0', 'all publications were removed from db2');
# Verify that all subtwophase states are pending or enabled,
# e.g. there are no subscriptions where subtwophase is disabled ('d')
@@ -525,6 +535,9 @@ is( $result, qq(first row
second row
third row),
"logical replication works in database $db1");
+$result = $node_s->safe_psql($db1, 'SELECT * FROM not_replicated');
+is($result, qq(),
+ "table is not replicated in database $db1");
# Check result in database $db2
$result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2');
@@ -537,6 +550,37 @@ my $sysid_s = $node_s->safe_psql('postgres',
'SELECT system_identifier FROM pg_control_system()');
isnt($sysid_p, $sysid_s, 'system identifier was changed');
+# Verify that pub2 was created in $db2
+is($node_p->safe_psql($db2, "SELECT COUNT(*) FROM pg_publication WHERE pubname = 'pub2'"),
+ '1', "publication pub2 was created in $db2");
+
+# Get subscription and publication names
+$result = $node_s->safe_psql(
+ 'postgres', qq(
+ SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+ ORDER BY subpublications;
+));
+like(
+ $result,
+ qr/^pg_createsubscriber_\d+_[0-9a-f]+ \|\{pub2\}\n
+ pg_createsubscriber_\d+_[0-9a-f]+ \|\{test_pub3\}$/x,
+ 'subscription and publication names are ok');
+
+# Verify that the correct publications are being used
+$result = $node_s->safe_psql(
+ 'postgres', qq(
+ SELECT d.datname, s.subpublications
+ FROM pg_subscription s
+ JOIN pg_database d ON d.oid = s.subdbid
+ WHERE subname ~ '^pg_createsubscriber_'
+ ORDER BY s.subdbid
+ )
+);
+
+is($result, qq($db1|{test_pub3}
+$db2|{pub2}),
+ "subscriptions use the correct publications");
+
# clean up
$node_p->teardown_node;
$node_s->teardown_node;
diff --git a/src/test/postmaster/t/002_connection_limits.pl b/src/test/postmaster/t/002_connection_limits.pl
index 4a7fb16261f..2fc821ad0b4 100644
--- a/src/test/postmaster/t/002_connection_limits.pl
+++ b/src/test/postmaster/t/002_connection_limits.pl
@@ -74,6 +74,11 @@ sub connect_fails_wait
ok(1, "$test_name: client backend process exited");
}
+# Restart the server to ensure that any backends launched for the
+# initialization steps are gone. Otherwise they could still be using
+# up connection slots and mess with our expectations.
+$node->restart;
+
my @sessions = ();
my @raw_connections = ();
diff --git a/src/test/postmaster/t/003_start_stop.pl b/src/test/postmaster/t/003_start_stop.pl
index 58e7ba6cc42..25d6f667217 100644
--- a/src/test/postmaster/t/003_start_stop.pl
+++ b/src/test/postmaster/t/003_start_stop.pl
@@ -46,6 +46,11 @@ if (!$node->raw_connect_works())
plan skip_all => "this test requires working raw_connect()";
}
+# Restart the server to ensure that the backend launched for
+# raw_connect_works() is gone. Otherwise, it might free up the
+# connection slot later, when we expect all the slots to be in use.
+$node->restart;
+
my @raw_connections = ();
# Open a lot of TCP (or Unix domain socket) connections to use up all
@@ -81,7 +86,7 @@ for (my $i = 0; $i <= 20; $i++)
# clients already" instead of "role does not exist" error. Test that
# to ensure that we have used up all the slots.
$node->connect_fails("dbname=postgres user=invalid_user",
- "connect ",
+ "connection is rejected when all slots are in use",
expected_stderr => qr/FATAL: sorry, too many clients already/);
# Open one more connection, to really ensure that we have at least one