summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/ltree/crc32.c46
-rw-r--r--contrib/ltree/lquery_op.c39
-rw-r--r--doc/src/sgml/ref/pg_createsubscriber.sgml8
-rw-r--r--src/backend/access/heap/vacuumlazy.c3
-rw-r--r--src/backend/parser/scansup.c36
-rw-r--r--src/backend/utils/activity/pgstat_relation.c11
-rw-r--r--src/backend/utils/adt/pg_locale.c20
-rw-r--r--src/backend/utils/adt/pg_locale_builtin.c2
-rw-r--r--src/backend/utils/adt/pg_locale_icu.c75
-rw-r--r--src/backend/utils/adt/pg_locale_libc.c33
-rw-r--r--src/backend/utils/misc/injection_point.c3
-rw-r--r--src/bin/pg_basebackup/pg_createsubscriber.c95
-rw-r--r--src/bin/pg_basebackup/t/040_pg_createsubscriber.pl54
-rw-r--r--src/include/pgstat.h4
-rw-r--r--src/include/utils/pg_locale.h6
15 files changed, 361 insertions, 74 deletions
diff --git a/contrib/ltree/crc32.c b/contrib/ltree/crc32.c
index 134f46a805e..3918d4a0ec2 100644
--- a/contrib/ltree/crc32.c
+++ b/contrib/ltree/crc32.c
@@ -10,31 +10,61 @@
#include "postgres.h"
#include "ltree.h"
+#include "crc32.h"
+#include "utils/pg_crc.h"
#ifdef LOWER_NODE
-#include <ctype.h>
-#define TOLOWER(x) tolower((unsigned char) (x))
-#else
-#define TOLOWER(x) (x)
+#include "utils/pg_locale.h"
#endif
-#include "crc32.h"
-#include "utils/pg_crc.h"
+#ifdef LOWER_NODE
unsigned int
ltree_crc32_sz(const char *buf, int size)
{
pg_crc32 crc;
const char *p = buf;
+ static pg_locale_t locale = NULL;
+
+ if (!locale)
+ locale = pg_database_locale();
INIT_TRADITIONAL_CRC32(crc);
while (size > 0)
{
- char c = (char) TOLOWER(*p);
+ char foldstr[UNICODE_CASEMAP_BUFSZ];
+ int srclen = pg_mblen(p);
+ size_t foldlen;
+
+ /* fold one codepoint at a time */
+ foldlen = pg_strfold(foldstr, UNICODE_CASEMAP_BUFSZ, p, srclen,
+ locale);
+
+ COMP_TRADITIONAL_CRC32(crc, foldstr, foldlen);
+
+ size -= srclen;
+ p += srclen;
+ }
+ FIN_TRADITIONAL_CRC32(crc);
+ return (unsigned int) crc;
+}
+
+#else
- COMP_TRADITIONAL_CRC32(crc, &c, 1);
+unsigned int
+ltree_crc32_sz(const char *buf, int size)
+{
+ pg_crc32 crc;
+ const char *p = buf;
+
+ INIT_TRADITIONAL_CRC32(crc);
+ while (size > 0)
+ {
+ COMP_TRADITIONAL_CRC32(crc, p, 1);
size--;
p++;
}
FIN_TRADITIONAL_CRC32(crc);
return (unsigned int) crc;
}
+
+#endif /* !LOWER_NODE */
diff --git a/contrib/ltree/lquery_op.c b/contrib/ltree/lquery_op.c
index 0b39d64a839..a28ddbf40de 100644
--- a/contrib/ltree/lquery_op.c
+++ b/contrib/ltree/lquery_op.c
@@ -93,11 +93,44 @@ ltree_prefix_eq(const char *a, size_t a_sz, const char *b, size_t b_sz)
bool
ltree_prefix_eq_ci(const char *a, size_t a_sz, const char *b, size_t b_sz)
{
- char *al = str_tolower(a, a_sz, DEFAULT_COLLATION_OID);
- char *bl = str_tolower(b, b_sz, DEFAULT_COLLATION_OID);
+ static pg_locale_t locale = NULL;
+ size_t al_sz = a_sz + 1;
+ size_t al_len;
+ char *al = palloc(al_sz);
+ size_t bl_sz = b_sz + 1;
+ size_t bl_len;
+ char *bl = palloc(bl_sz);
bool res;
- res = (strncmp(al, bl, a_sz) == 0);
+ if (!locale)
+ locale = pg_database_locale();
+
+ /* casefold both a and b */
+
+ al_len = pg_strfold(al, al_sz, a, a_sz, locale);
+ if (al_len + 1 > al_sz)
+ {
+ /* grow buffer if needed and retry */
+ al_sz = al_len + 1;
+ al = repalloc(al, al_sz);
+ al_len = pg_strfold(al, al_sz, a, a_sz, locale);
+ Assert(al_len + 1 <= al_sz);
+ }
+
+ bl_len = pg_strfold(bl, bl_sz, b, b_sz, locale);
+ if (bl_len + 1 > bl_sz)
+ {
+ /* grow buffer if needed and retry */
+ bl_sz = bl_len + 1;
+ bl = repalloc(bl, bl_sz);
+ bl_len = pg_strfold(bl, bl_sz, b, b_sz, locale);
+ Assert(bl_len + 1 <= bl_sz);
+ }
+
+ if (al_len > bl_len)
+ res = false;
+ else
+ res = (strncmp(al, bl, al_len) == 0);
pfree(al);
pfree(bl);
diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index bb9cc72576c..5a62187b189 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -285,6 +285,14 @@ PostgreSQL documentation
a generated name is assigned to the publication name. This option cannot
be used together with <option>--all</option>.
</para>
+ <para>
+ If a specified publication already exists on the publisher, it is reused.
+ It is useful to partially replicate the database if the specified
+ publication includes a list of tables. If the publication does not exist,
+ it is automatically created with <literal>FOR ALL TABLES</literal>. Use
+ <option>--dry-run</option> option to preview which publications will be
+ reused and which will be created.
+ </para>
</listitem>
</varlistentry>
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 62035b7f9c3..30778a15639 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -961,8 +961,7 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
* soon in cases where the failsafe prevented significant amounts of heap
* vacuuming.
*/
- pgstat_report_vacuum(RelationGetRelid(rel),
- rel->rd_rel->relisshared,
+ pgstat_report_vacuum(rel,
Max(vacrel->new_live_tuples, 0),
vacrel->recently_dead_tuples +
vacrel->missed_dead_tuples,
diff --git a/src/backend/parser/scansup.c b/src/backend/parser/scansup.c
index 2feb2b6cf5a..d63cb865260 100644
--- a/src/backend/parser/scansup.c
+++ b/src/backend/parser/scansup.c
@@ -18,6 +18,7 @@
#include "mb/pg_wchar.h"
#include "parser/scansup.h"
+#include "utils/pg_locale.h"
/*
@@ -46,35 +47,22 @@ char *
downcase_identifier(const char *ident, int len, bool warn, bool truncate)
{
char *result;
- int i;
- bool enc_is_single_byte;
-
- result = palloc(len + 1);
- enc_is_single_byte = pg_database_encoding_max_length() == 1;
+ size_t needed pg_attribute_unused();
/*
- * SQL99 specifies Unicode-aware case normalization, which we don't yet
- * have the infrastructure for. Instead we use tolower() to provide a
- * locale-aware translation. However, there are some locales where this
- * is not right either (eg, Turkish may do strange things with 'i' and
- * 'I'). Our current compromise is to use tolower() for characters with
- * the high bit set, as long as they aren't part of a multi-byte
- * character, and use an ASCII-only downcasing for 7-bit characters.
+ * Preserves string length.
+ *
+ * NB: if we decide to support Unicode-aware identifier case folding, then
+ * we need to account for a change in string length.
*/
- for (i = 0; i < len; i++)
- {
- unsigned char ch = (unsigned char) ident[i];
+ result = palloc(len + 1);
- if (ch >= 'A' && ch <= 'Z')
- ch += 'a' - 'A';
- else if (enc_is_single_byte && IS_HIGHBIT_SET(ch) && isupper(ch))
- ch = tolower(ch);
- result[i] = (char) ch;
- }
- result[i] = '\0';
+ needed = pg_downcase_ident(result, len + 1, ident, len);
+ Assert(needed == len);
+ Assert(result[len] == '\0');
- if (i >= NAMEDATALEN && truncate)
- truncate_identifier(result, i, warn);
+ if (len >= NAMEDATALEN && truncate)
+ truncate_identifier(result, len, warn);
return result;
}
diff --git a/src/backend/utils/activity/pgstat_relation.c b/src/backend/utils/activity/pgstat_relation.c
index b90754f8578..55a10c299db 100644
--- a/src/backend/utils/activity/pgstat_relation.c
+++ b/src/backend/utils/activity/pgstat_relation.c
@@ -207,14 +207,13 @@ pgstat_drop_relation(Relation rel)
* Report that the table was just vacuumed and flush IO statistics.
*/
void
-pgstat_report_vacuum(Oid tableoid, bool shared,
- PgStat_Counter livetuples, PgStat_Counter deadtuples,
- TimestampTz starttime)
+pgstat_report_vacuum(Relation rel, PgStat_Counter livetuples,
+ PgStat_Counter deadtuples, TimestampTz starttime)
{
PgStat_EntryRef *entry_ref;
PgStatShared_Relation *shtabentry;
PgStat_StatTabEntry *tabentry;
- Oid dboid = (shared ? InvalidOid : MyDatabaseId);
+ Oid dboid = (rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId);
TimestampTz ts;
PgStat_Counter elapsedtime;
@@ -226,8 +225,8 @@ pgstat_report_vacuum(Oid tableoid, bool shared,
elapsedtime = TimestampDifferenceMilliseconds(starttime, ts);
/* block acquiring lock for the same reason as pgstat_report_autovac() */
- entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_RELATION,
- dboid, tableoid, false);
+ entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_RELATION, dboid,
+ RelationGetRelid(rel), false);
shtabentry = (PgStatShared_Relation *) entry_ref->shared_stats;
tabentry = &shtabentry->stats;
diff --git a/src/backend/utils/adt/pg_locale.c b/src/backend/utils/adt/pg_locale.c
index 8a3796aa5d0..ee08ac045b7 100644
--- a/src/backend/utils/adt/pg_locale.c
+++ b/src/backend/utils/adt/pg_locale.c
@@ -1353,6 +1353,26 @@ pg_strfold(char *dst, size_t dstsize, const char *src, ssize_t srclen,
}
/*
+ * Lowercase an identifier using the database default locale.
+ *
+ * For historical reasons, does not use ordinary locale behavior. Should only
+ * be used for identifiers. XXX: can we make this equivalent to
+ * pg_strfold(..., default_locale)?
+ */
+size_t
+pg_downcase_ident(char *dst, size_t dstsize, const char *src, ssize_t srclen)
+{
+ pg_locale_t locale = default_locale;
+
+ if (locale == NULL || locale->ctype == NULL ||
+ locale->ctype->downcase_ident == NULL)
+ return strlower_c(dst, dstsize, src, srclen);
+ else
+ return locale->ctype->downcase_ident(dst, dstsize, src, srclen,
+ locale);
+}
+
+/*
* pg_strcoll
*
* Like pg_strncoll for NUL-terminated input strings.
diff --git a/src/backend/utils/adt/pg_locale_builtin.c b/src/backend/utils/adt/pg_locale_builtin.c
index 0c2920112bb..145b4641b1b 100644
--- a/src/backend/utils/adt/pg_locale_builtin.c
+++ b/src/backend/utils/adt/pg_locale_builtin.c
@@ -208,6 +208,8 @@ static const struct ctype_methods ctype_methods_builtin = {
.strtitle = strtitle_builtin,
.strupper = strupper_builtin,
.strfold = strfold_builtin,
+ /* uses plain ASCII semantics for historical reasons */
+ .downcase_ident = NULL,
.wc_isdigit = wc_isdigit_builtin,
.wc_isalpha = wc_isalpha_builtin,
.wc_isalnum = wc_isalnum_builtin,
diff --git a/src/backend/utils/adt/pg_locale_icu.c b/src/backend/utils/adt/pg_locale_icu.c
index 18d026deda8..43d44fe43bd 100644
--- a/src/backend/utils/adt/pg_locale_icu.c
+++ b/src/backend/utils/adt/pg_locale_icu.c
@@ -61,6 +61,8 @@ static size_t strupper_icu(char *dest, size_t destsize, const char *src,
ssize_t srclen, pg_locale_t locale);
static size_t strfold_icu(char *dest, size_t destsize, const char *src,
ssize_t srclen, pg_locale_t locale);
+static size_t downcase_ident_icu(char *dst, size_t dstsize, const char *src,
+ ssize_t srclen, pg_locale_t locale);
static int strncoll_icu(const char *arg1, ssize_t len1,
const char *arg2, ssize_t len2,
pg_locale_t locale);
@@ -123,7 +125,7 @@ static int32_t u_strFoldCase_default(UChar *dest, int32_t destCapacity,
/*
* XXX: many of the functions below rely on casts directly from pg_wchar to
- * UChar32, which is correct for the UTF-8 encoding, but not in general.
+ * UChar32, which is correct for UTF-8 and LATIN1, but not in general.
*/
static pg_wchar
@@ -227,6 +229,7 @@ static const struct ctype_methods ctype_methods_icu = {
.strtitle = strtitle_icu,
.strupper = strupper_icu,
.strfold = strfold_icu,
+ .downcase_ident = downcase_ident_icu,
.wc_isdigit = wc_isdigit_icu,
.wc_isalpha = wc_isalpha_icu,
.wc_isalnum = wc_isalnum_icu,
@@ -241,6 +244,29 @@ static const struct ctype_methods ctype_methods_icu = {
.wc_toupper = toupper_icu,
.wc_tolower = tolower_icu,
};
+
+/*
+ * ICU still depends on libc for compatibility with certain historical
+ * behavior for single-byte encodings. See downcase_ident_icu().
+ *
+ * XXX: consider fixing by decoding the single byte into a code point, and
+ * using u_tolower().
+ */
+static locale_t
+make_libc_ctype_locale(const char *ctype)
+{
+ locale_t loc;
+
+#ifndef WIN32
+ loc = newlocale(LC_CTYPE_MASK, ctype, NULL);
+#else
+ loc = _create_locale(LC_ALL, ctype);
+#endif
+ if (!loc)
+ report_newlocale_failure(ctype);
+
+ return loc;
+}
#endif
pg_locale_t
@@ -251,6 +277,7 @@ create_pg_locale_icu(Oid collid, MemoryContext context)
const char *iculocstr;
const char *icurules = NULL;
UCollator *collator;
+ locale_t loc = (locale_t) 0;
pg_locale_t result;
if (collid == DEFAULT_COLLATION_OID)
@@ -273,6 +300,18 @@ create_pg_locale_icu(Oid collid, MemoryContext context)
if (!isnull)
icurules = TextDatumGetCString(datum);
+ /* libc only needed for default locale and single-byte encoding */
+ if (pg_database_encoding_max_length() == 1)
+ {
+ const char *ctype;
+
+ datum = SysCacheGetAttrNotNull(DATABASEOID, tp,
+ Anum_pg_database_datctype);
+ ctype = TextDatumGetCString(datum);
+
+ loc = make_libc_ctype_locale(ctype);
+ }
+
ReleaseSysCache(tp);
}
else
@@ -303,6 +342,7 @@ create_pg_locale_icu(Oid collid, MemoryContext context)
result = MemoryContextAllocZero(context, sizeof(struct pg_locale_struct));
result->icu.locale = MemoryContextStrdup(context, iculocstr);
result->icu.ucol = collator;
+ result->icu.lt = loc;
result->deterministic = deterministic;
result->collate_is_c = false;
result->ctype_is_c = false;
@@ -565,6 +605,39 @@ strfold_icu(char *dest, size_t destsize, const char *src, ssize_t srclen,
}
/*
+ * For historical compatibility, behavior is not multibyte-aware.
+ *
+ * NB: uses libc tolower() for single-byte encodings (also for historical
+ * compatibility), and therefore relies on the global LC_CTYPE setting.
+ */
+static size_t
+downcase_ident_icu(char *dst, size_t dstsize, const char *src,
+ ssize_t srclen, pg_locale_t locale)
+{
+ int i;
+ bool libc_lower;
+ locale_t lt = locale->icu.lt;
+
+ libc_lower = lt && (pg_database_encoding_max_length() == 1);
+
+ for (i = 0; i < srclen && i < dstsize; i++)
+ {
+ unsigned char ch = (unsigned char) src[i];
+
+ if (ch >= 'A' && ch <= 'Z')
+ ch = pg_ascii_tolower(ch);
+ else if (libc_lower && IS_HIGHBIT_SET(ch) && isupper_l(ch, lt))
+ ch = tolower_l(ch, lt);
+ dst[i] = (char) ch;
+ }
+
+ if (i < dstsize)
+ dst[i] = '\0';
+
+ return srclen;
+}
+
+/*
* strncoll_icu_utf8
*
* Call ucol_strcollUTF8() or ucol_strcoll() as appropriate for the given
diff --git a/src/backend/utils/adt/pg_locale_libc.c b/src/backend/utils/adt/pg_locale_libc.c
index 3baa5816b5f..ab6117aaace 100644
--- a/src/backend/utils/adt/pg_locale_libc.c
+++ b/src/backend/utils/adt/pg_locale_libc.c
@@ -318,12 +318,41 @@ tolower_libc_mb(pg_wchar wc, pg_locale_t locale)
return wc;
}
+/*
+ * Characters A..Z always downcase to a..z, even in the Turkish
+ * locale. Characters beyond 127 use tolower().
+ */
+static size_t
+downcase_ident_libc_sb(char *dst, size_t dstsize, const char *src,
+ ssize_t srclen, pg_locale_t locale)
+{
+ locale_t loc = locale->lt;
+ int i;
+
+ for (i = 0; i < srclen && i < dstsize; i++)
+ {
+ unsigned char ch = (unsigned char) src[i];
+
+ if (ch >= 'A' && ch <= 'Z')
+ ch = pg_ascii_tolower(ch);
+ else if (IS_HIGHBIT_SET(ch) && isupper_l(ch, loc))
+ ch = tolower_l(ch, loc);
+ dst[i] = (char) ch;
+ }
+
+ if (i < dstsize)
+ dst[i] = '\0';
+
+ return srclen;
+}
+
static const struct ctype_methods ctype_methods_libc_sb = {
.strlower = strlower_libc_sb,
.strtitle = strtitle_libc_sb,
.strupper = strupper_libc_sb,
/* in libc, casefolding is the same as lowercasing */
.strfold = strlower_libc_sb,
+ .downcase_ident = downcase_ident_libc_sb,
.wc_isdigit = wc_isdigit_libc_sb,
.wc_isalpha = wc_isalpha_libc_sb,
.wc_isalnum = wc_isalnum_libc_sb,
@@ -349,6 +378,8 @@ static const struct ctype_methods ctype_methods_libc_other_mb = {
.strupper = strupper_libc_mb,
/* in libc, casefolding is the same as lowercasing */
.strfold = strlower_libc_mb,
+ /* uses plain ASCII semantics for historical reasons */
+ .downcase_ident = NULL,
.wc_isdigit = wc_isdigit_libc_sb,
.wc_isalpha = wc_isalpha_libc_sb,
.wc_isalnum = wc_isalnum_libc_sb,
@@ -370,6 +401,8 @@ static const struct ctype_methods ctype_methods_libc_utf8 = {
.strupper = strupper_libc_mb,
/* in libc, casefolding is the same as lowercasing */
.strfold = strlower_libc_mb,
+ /* uses plain ASCII semantics for historical reasons */
+ .downcase_ident = NULL,
.wc_isdigit = wc_isdigit_libc_mb,
.wc_isalpha = wc_isalpha_libc_mb,
.wc_isalnum = wc_isalnum_libc_mb,
diff --git a/src/backend/utils/misc/injection_point.c b/src/backend/utils/misc/injection_point.c
index 54a9fe8e163..4945da458b1 100644
--- a/src/backend/utils/misc/injection_point.c
+++ b/src/backend/utils/misc/injection_point.c
@@ -331,11 +331,8 @@ InjectionPointAttach(const char *name,
/* Save the entry */
strlcpy(entry->name, name, sizeof(entry->name));
- entry->name[INJ_NAME_MAXLEN - 1] = '\0';
strlcpy(entry->library, library, sizeof(entry->library));
- entry->library[INJ_LIB_MAXLEN - 1] = '\0';
strlcpy(entry->function, function, sizeof(entry->function));
- entry->function[INJ_FUNC_MAXLEN - 1] = '\0';
if (private_data != NULL)
memcpy(entry->private_data, private_data, private_data_size);
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index ef6deec14af..41a649297c7 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -116,6 +116,7 @@ static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery(const char *conninfo,
const struct CreateSubscriberOptions *opt);
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
static void drop_publication(PGconn *conn, const char *pubname,
const char *dbname, bool *made_publication);
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -764,6 +765,39 @@ generate_object_name(PGconn *conn)
}
/*
+ * Does the publication exist in the specified database?
+ */
+static bool
+find_publication(PGconn *conn, const char *pubname, const char *dbname)
+{
+ PQExpBuffer str = createPQExpBuffer();
+ PGresult *res;
+ bool found = false;
+ char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
+
+ appendPQExpBuffer(str,
+ "SELECT 1 FROM pg_catalog.pg_publication "
+ "WHERE pubname = %s",
+ pubname_esc);
+ res = PQexec(conn, str->data);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
+ pubname, dbname, PQerrorMessage(conn));
+ disconnect_database(conn, true);
+ }
+
+ if (PQntuples(res) == 1)
+ found = true;
+
+ PQclear(res);
+ PQfreemem(pubname_esc);
+ destroyPQExpBuffer(str);
+
+ return found;
+}
+
+/*
* Create the publications and replication slots in preparation for logical
* replication. Returns the LSN from latest replication slot. It will be the
* replication start point that is used to adjust the subscriptions (see
@@ -799,13 +833,25 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
if (num_replslots == 0)
dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
- /*
- * Create publication on publisher. This step should be executed
- * *before* promoting the subscriber to avoid any transactions between
- * consistent LSN and the new publication rows (such transactions
- * wouldn't see the new publication rows resulting in an error).
- */
- create_publication(conn, &dbinfo[i]);
+ if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
+ {
+ /* Reuse existing publication on publisher. */
+ pg_log_info("use existing publication \"%s\" in database \"%s\"",
+ dbinfo[i].pubname, dbinfo[i].dbname);
+ /* Don't remove pre-existing publication if an error occurs. */
+ dbinfo[i].made_publication = false;
+ }
+ else
+ {
+ /*
+ * Create publication on publisher. This step should be executed
+ * *before* promoting the subscriber to avoid any transactions
+ * between consistent LSN and the new publication rows (such
+ * transactions wouldn't see the new publication rows resulting in
+ * an error).
+ */
+ create_publication(conn, &dbinfo[i]);
+ }
/* Create replication slot on publisher */
if (lsn)
@@ -1749,11 +1795,10 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname,
/*
* Retrieve and drop the publications.
*
- * Since the publications were created before the consistent LSN, they
- * remain on the subscriber even after the physical replica is
- * promoted. Remove these publications from the subscriber because
- * they have no use. Additionally, if requested, drop all pre-existing
- * publications.
+ * Publications copied during physical replication remain on the subscriber
+ * after promotion. If --clean=publications is specified, drop all existing
+ * publications in the subscriber database. Otherwise, only drop publications
+ * that were created by pg_createsubscriber during this operation.
*/
static void
check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
@@ -1785,14 +1830,24 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
PQclear(res);
}
-
- /*
- * In dry-run mode, we don't create publications, but we still try to drop
- * those to provide necessary information to the user.
- */
- if (!drop_all_pubs || dry_run)
- drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
- &dbinfo->made_publication);
+ else
+ {
+ /* Drop publication only if it was created by this tool */
+ if (dbinfo->made_publication)
+ {
+ drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+ &dbinfo->made_publication);
+ }
+ else
+ {
+ if (dry_run)
+ pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
+ else
+ pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
+ }
+ }
}
/*
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 3d6086dc489..9e0db6cd099 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -443,10 +443,17 @@ is(scalar(() = $stderr =~ /would create the replication slot/g),
is(scalar(() = $stderr =~ /would create subscription/g),
3, "verify subscriptions are created for all databases");
+# Create a user-defined publication, and a table that is not a member of that
+# publication.
+$node_p->safe_psql($db1, qq(
+ CREATE PUBLICATION test_pub3 FOR TABLE tbl1;
+ CREATE TABLE not_replicated (a int);
+));
+
# Run pg_createsubscriber on node S. --verbose is used twice
# to show more information.
-# In passing, also test the --enable-two-phase option and
-# --clean option
+#
+# Test two phase and clean options. Use pre-existing publication.
command_ok(
[
'pg_createsubscriber',
@@ -456,7 +463,7 @@ command_ok(
'--publisher-server' => $node_p->connstr($db1),
'--socketdir' => $node_s->host,
'--subscriber-port' => $node_s->port,
- '--publication' => 'pub1',
+ '--publication' => 'test_pub3',
'--publication' => 'pub2',
'--replication-slot' => 'replslot1',
'--replication-slot' => 'replslot2',
@@ -478,13 +485,16 @@ is($result, qq(0),
# Insert rows on P
$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('third row')");
$node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
+$node_p->safe_psql($db1, "INSERT INTO not_replicated VALUES(0)");
# Start subscriber
$node_s->start;
# Confirm publications are removed from the subscriber node
-is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
- '0', 'all publications on subscriber have been removed');
+is($node_s->safe_psql($db1, 'SELECT COUNT(*) FROM pg_publication'),
+ '0', 'all publications were removed from db1');
+is($node_s->safe_psql($db2, 'SELECT COUNT(*) FROM pg_publication'),
+ '0', 'all publications were removed from db2');
# Verify that all subtwophase states are pending or enabled,
# e.g. there are no subscriptions where subtwophase is disabled ('d')
@@ -525,6 +535,9 @@ is( $result, qq(first row
second row
third row),
"logical replication works in database $db1");
+$result = $node_s->safe_psql($db1, 'SELECT * FROM not_replicated');
+is($result, qq(),
+ "table is not replicated in database $db1");
# Check result in database $db2
$result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2');
@@ -537,6 +550,37 @@ my $sysid_s = $node_s->safe_psql('postgres',
'SELECT system_identifier FROM pg_control_system()');
isnt($sysid_p, $sysid_s, 'system identifier was changed');
+# Verify that pub2 was created in $db2
+is($node_p->safe_psql($db2, "SELECT COUNT(*) FROM pg_publication WHERE pubname = 'pub2'"),
+ '1', "publication pub2 was created in $db2");
+
+# Get subscription and publication names
+$result = $node_s->safe_psql(
+ 'postgres', qq(
+ SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+ ORDER BY subpublications;
+));
+like(
+ $result,
+ qr/^pg_createsubscriber_\d+_[0-9a-f]+ \|\{pub2\}\n
+ pg_createsubscriber_\d+_[0-9a-f]+ \|\{test_pub3\}$/x,
+ 'subscription and publication names are ok');
+
+# Verify that the correct publications are being used
+$result = $node_s->safe_psql(
+ 'postgres', qq(
+ SELECT d.datname, s.subpublications
+ FROM pg_subscription s
+ JOIN pg_database d ON d.oid = s.subdbid
+ WHERE subname ~ '^pg_createsubscriber_'
+ ORDER BY s.subdbid
+ )
+);
+
+is($result, qq($db1|{test_pub3}
+$db2|{pub2}),
+ "subscriptions use the correct publications");
+
# clean up
$node_p->teardown_node;
$node_s->teardown_node;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f23dd5870da..6714363144a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -669,8 +669,8 @@ extern void pgstat_init_relation(Relation rel);
extern void pgstat_assoc_relation(Relation rel);
extern void pgstat_unlink_relation(Relation rel);
-extern void pgstat_report_vacuum(Oid tableoid, bool shared,
- PgStat_Counter livetuples, PgStat_Counter deadtuples,
+extern void pgstat_report_vacuum(Relation rel, PgStat_Counter livetuples,
+ PgStat_Counter deadtuples,
TimestampTz starttime);
extern void pgstat_report_analyze(Relation rel,
PgStat_Counter livetuples, PgStat_Counter deadtuples,
diff --git a/src/include/utils/pg_locale.h b/src/include/utils/pg_locale.h
index 6cf1985963d..86016b9344e 100644
--- a/src/include/utils/pg_locale.h
+++ b/src/include/utils/pg_locale.h
@@ -110,6 +110,9 @@ struct ctype_methods
size_t (*strfold) (char *dest, size_t destsize,
const char *src, ssize_t srclen,
pg_locale_t locale);
+ size_t (*downcase_ident) (char *dest, size_t destsize,
+ const char *src, ssize_t srclen,
+ pg_locale_t locale);
/* required */
bool (*wc_isdigit) (pg_wchar wc, pg_locale_t locale);
@@ -164,6 +167,7 @@ struct pg_locale_struct
{
const char *locale;
UCollator *ucol;
+ locale_t lt;
} icu;
#endif
};
@@ -187,6 +191,8 @@ extern size_t pg_strupper(char *dst, size_t dstsize,
extern size_t pg_strfold(char *dst, size_t dstsize,
const char *src, ssize_t srclen,
pg_locale_t locale);
+extern size_t pg_downcase_ident(char *dst, size_t dstsize,
+ const char *src, ssize_t srclen);
extern int pg_strcoll(const char *arg1, const char *arg2, pg_locale_t locale);
extern int pg_strncoll(const char *arg1, ssize_t len1,
const char *arg2, ssize_t len2, pg_locale_t locale);