diff options
| -rw-r--r-- | contrib/ltree/crc32.c | 46 | ||||
| -rw-r--r-- | contrib/ltree/lquery_op.c | 39 | ||||
| -rw-r--r-- | doc/src/sgml/ref/pg_createsubscriber.sgml | 8 | ||||
| -rw-r--r-- | src/backend/access/heap/vacuumlazy.c | 3 | ||||
| -rw-r--r-- | src/backend/parser/scansup.c | 36 | ||||
| -rw-r--r-- | src/backend/utils/activity/pgstat_relation.c | 11 | ||||
| -rw-r--r-- | src/backend/utils/adt/pg_locale.c | 20 | ||||
| -rw-r--r-- | src/backend/utils/adt/pg_locale_builtin.c | 2 | ||||
| -rw-r--r-- | src/backend/utils/adt/pg_locale_icu.c | 75 | ||||
| -rw-r--r-- | src/backend/utils/adt/pg_locale_libc.c | 33 | ||||
| -rw-r--r-- | src/backend/utils/misc/injection_point.c | 3 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/pg_createsubscriber.c | 95 | ||||
| -rw-r--r-- | src/bin/pg_basebackup/t/040_pg_createsubscriber.pl | 54 | ||||
| -rw-r--r-- | src/include/pgstat.h | 4 | ||||
| -rw-r--r-- | src/include/utils/pg_locale.h | 6 |
15 files changed, 361 insertions, 74 deletions
diff --git a/contrib/ltree/crc32.c b/contrib/ltree/crc32.c index 134f46a805e..3918d4a0ec2 100644 --- a/contrib/ltree/crc32.c +++ b/contrib/ltree/crc32.c @@ -10,31 +10,61 @@ #include "postgres.h" #include "ltree.h" +#include "crc32.h" +#include "utils/pg_crc.h" #ifdef LOWER_NODE -#include <ctype.h> -#define TOLOWER(x) tolower((unsigned char) (x)) -#else -#define TOLOWER(x) (x) +#include "utils/pg_locale.h" #endif -#include "crc32.h" -#include "utils/pg_crc.h" +#ifdef LOWER_NODE unsigned int ltree_crc32_sz(const char *buf, int size) { pg_crc32 crc; const char *p = buf; + static pg_locale_t locale = NULL; + + if (!locale) + locale = pg_database_locale(); INIT_TRADITIONAL_CRC32(crc); while (size > 0) { - char c = (char) TOLOWER(*p); + char foldstr[UNICODE_CASEMAP_BUFSZ]; + int srclen = pg_mblen(p); + size_t foldlen; + + /* fold one codepoint at a time */ + foldlen = pg_strfold(foldstr, UNICODE_CASEMAP_BUFSZ, p, srclen, + locale); + + COMP_TRADITIONAL_CRC32(crc, foldstr, foldlen); + + size -= srclen; + p += srclen; + } + FIN_TRADITIONAL_CRC32(crc); + return (unsigned int) crc; +} + +#else - COMP_TRADITIONAL_CRC32(crc, &c, 1); +unsigned int +ltree_crc32_sz(const char *buf, int size) +{ + pg_crc32 crc; + const char *p = buf; + + INIT_TRADITIONAL_CRC32(crc); + while (size > 0) + { + COMP_TRADITIONAL_CRC32(crc, p, 1); size--; p++; } FIN_TRADITIONAL_CRC32(crc); return (unsigned int) crc; } + +#endif /* !LOWER_NODE */ diff --git a/contrib/ltree/lquery_op.c b/contrib/ltree/lquery_op.c index 0b39d64a839..a28ddbf40de 100644 --- a/contrib/ltree/lquery_op.c +++ b/contrib/ltree/lquery_op.c @@ -93,11 +93,44 @@ ltree_prefix_eq(const char *a, size_t a_sz, const char *b, size_t b_sz) bool ltree_prefix_eq_ci(const char *a, size_t a_sz, const char *b, size_t b_sz) { - char *al = str_tolower(a, a_sz, DEFAULT_COLLATION_OID); - char *bl = str_tolower(b, b_sz, DEFAULT_COLLATION_OID); + static pg_locale_t locale = NULL; + size_t al_sz = a_sz + 1; + size_t al_len; + char *al = palloc(al_sz); + size_t bl_sz = b_sz + 1; + size_t bl_len; + char *bl = palloc(bl_sz); bool res; - res = (strncmp(al, bl, a_sz) == 0); + if (!locale) + locale = pg_database_locale(); + + /* casefold both a and b */ + + al_len = pg_strfold(al, al_sz, a, a_sz, locale); + if (al_len + 1 > al_sz) + { + /* grow buffer if needed and retry */ + al_sz = al_len + 1; + al = repalloc(al, al_sz); + al_len = pg_strfold(al, al_sz, a, a_sz, locale); + Assert(al_len + 1 <= al_sz); + } + + bl_len = pg_strfold(bl, bl_sz, b, b_sz, locale); + if (bl_len + 1 > bl_sz) + { + /* grow buffer if needed and retry */ + bl_sz = bl_len + 1; + bl = repalloc(bl, bl_sz); + bl_len = pg_strfold(bl, bl_sz, b, b_sz, locale); + Assert(bl_len + 1 <= bl_sz); + } + + if (al_len > bl_len) + res = false; + else + res = (strncmp(al, bl, al_len) == 0); pfree(al); pfree(bl); diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index bb9cc72576c..5a62187b189 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -285,6 +285,14 @@ PostgreSQL documentation a generated name is assigned to the publication name. This option cannot be used together with <option>--all</option>. </para> + <para> + If a specified publication already exists on the publisher, it is reused. + It is useful to partially replicate the database if the specified + publication includes a list of tables. If the publication does not exist, + it is automatically created with <literal>FOR ALL TABLES</literal>. Use + <option>--dry-run</option> option to preview which publications will be + reused and which will be created. + </para> </listitem> </varlistentry> diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 62035b7f9c3..30778a15639 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -961,8 +961,7 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, * soon in cases where the failsafe prevented significant amounts of heap * vacuuming. */ - pgstat_report_vacuum(RelationGetRelid(rel), - rel->rd_rel->relisshared, + pgstat_report_vacuum(rel, Max(vacrel->new_live_tuples, 0), vacrel->recently_dead_tuples + vacrel->missed_dead_tuples, diff --git a/src/backend/parser/scansup.c b/src/backend/parser/scansup.c index 2feb2b6cf5a..d63cb865260 100644 --- a/src/backend/parser/scansup.c +++ b/src/backend/parser/scansup.c @@ -18,6 +18,7 @@ #include "mb/pg_wchar.h" #include "parser/scansup.h" +#include "utils/pg_locale.h" /* @@ -46,35 +47,22 @@ char * downcase_identifier(const char *ident, int len, bool warn, bool truncate) { char *result; - int i; - bool enc_is_single_byte; - - result = palloc(len + 1); - enc_is_single_byte = pg_database_encoding_max_length() == 1; + size_t needed pg_attribute_unused(); /* - * SQL99 specifies Unicode-aware case normalization, which we don't yet - * have the infrastructure for. Instead we use tolower() to provide a - * locale-aware translation. However, there are some locales where this - * is not right either (eg, Turkish may do strange things with 'i' and - * 'I'). Our current compromise is to use tolower() for characters with - * the high bit set, as long as they aren't part of a multi-byte - * character, and use an ASCII-only downcasing for 7-bit characters. + * Preserves string length. + * + * NB: if we decide to support Unicode-aware identifier case folding, then + * we need to account for a change in string length. */ - for (i = 0; i < len; i++) - { - unsigned char ch = (unsigned char) ident[i]; + result = palloc(len + 1); - if (ch >= 'A' && ch <= 'Z') - ch += 'a' - 'A'; - else if (enc_is_single_byte && IS_HIGHBIT_SET(ch) && isupper(ch)) - ch = tolower(ch); - result[i] = (char) ch; - } - result[i] = '\0'; + needed = pg_downcase_ident(result, len + 1, ident, len); + Assert(needed == len); + Assert(result[len] == '\0'); - if (i >= NAMEDATALEN && truncate) - truncate_identifier(result, i, warn); + if (len >= NAMEDATALEN && truncate) + truncate_identifier(result, len, warn); return result; } diff --git a/src/backend/utils/activity/pgstat_relation.c b/src/backend/utils/activity/pgstat_relation.c index b90754f8578..55a10c299db 100644 --- a/src/backend/utils/activity/pgstat_relation.c +++ b/src/backend/utils/activity/pgstat_relation.c @@ -207,14 +207,13 @@ pgstat_drop_relation(Relation rel) * Report that the table was just vacuumed and flush IO statistics. */ void -pgstat_report_vacuum(Oid tableoid, bool shared, - PgStat_Counter livetuples, PgStat_Counter deadtuples, - TimestampTz starttime) +pgstat_report_vacuum(Relation rel, PgStat_Counter livetuples, + PgStat_Counter deadtuples, TimestampTz starttime) { PgStat_EntryRef *entry_ref; PgStatShared_Relation *shtabentry; PgStat_StatTabEntry *tabentry; - Oid dboid = (shared ? InvalidOid : MyDatabaseId); + Oid dboid = (rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId); TimestampTz ts; PgStat_Counter elapsedtime; @@ -226,8 +225,8 @@ pgstat_report_vacuum(Oid tableoid, bool shared, elapsedtime = TimestampDifferenceMilliseconds(starttime, ts); /* block acquiring lock for the same reason as pgstat_report_autovac() */ - entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_RELATION, - dboid, tableoid, false); + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_RELATION, dboid, + RelationGetRelid(rel), false); shtabentry = (PgStatShared_Relation *) entry_ref->shared_stats; tabentry = &shtabentry->stats; diff --git a/src/backend/utils/adt/pg_locale.c b/src/backend/utils/adt/pg_locale.c index 8a3796aa5d0..ee08ac045b7 100644 --- a/src/backend/utils/adt/pg_locale.c +++ b/src/backend/utils/adt/pg_locale.c @@ -1353,6 +1353,26 @@ pg_strfold(char *dst, size_t dstsize, const char *src, ssize_t srclen, } /* + * Lowercase an identifier using the database default locale. + * + * For historical reasons, does not use ordinary locale behavior. Should only + * be used for identifiers. XXX: can we make this equivalent to + * pg_strfold(..., default_locale)? + */ +size_t +pg_downcase_ident(char *dst, size_t dstsize, const char *src, ssize_t srclen) +{ + pg_locale_t locale = default_locale; + + if (locale == NULL || locale->ctype == NULL || + locale->ctype->downcase_ident == NULL) + return strlower_c(dst, dstsize, src, srclen); + else + return locale->ctype->downcase_ident(dst, dstsize, src, srclen, + locale); +} + +/* * pg_strcoll * * Like pg_strncoll for NUL-terminated input strings. diff --git a/src/backend/utils/adt/pg_locale_builtin.c b/src/backend/utils/adt/pg_locale_builtin.c index 0c2920112bb..145b4641b1b 100644 --- a/src/backend/utils/adt/pg_locale_builtin.c +++ b/src/backend/utils/adt/pg_locale_builtin.c @@ -208,6 +208,8 @@ static const struct ctype_methods ctype_methods_builtin = { .strtitle = strtitle_builtin, .strupper = strupper_builtin, .strfold = strfold_builtin, + /* uses plain ASCII semantics for historical reasons */ + .downcase_ident = NULL, .wc_isdigit = wc_isdigit_builtin, .wc_isalpha = wc_isalpha_builtin, .wc_isalnum = wc_isalnum_builtin, diff --git a/src/backend/utils/adt/pg_locale_icu.c b/src/backend/utils/adt/pg_locale_icu.c index 18d026deda8..43d44fe43bd 100644 --- a/src/backend/utils/adt/pg_locale_icu.c +++ b/src/backend/utils/adt/pg_locale_icu.c @@ -61,6 +61,8 @@ static size_t strupper_icu(char *dest, size_t destsize, const char *src, ssize_t srclen, pg_locale_t locale); static size_t strfold_icu(char *dest, size_t destsize, const char *src, ssize_t srclen, pg_locale_t locale); +static size_t downcase_ident_icu(char *dst, size_t dstsize, const char *src, + ssize_t srclen, pg_locale_t locale); static int strncoll_icu(const char *arg1, ssize_t len1, const char *arg2, ssize_t len2, pg_locale_t locale); @@ -123,7 +125,7 @@ static int32_t u_strFoldCase_default(UChar *dest, int32_t destCapacity, /* * XXX: many of the functions below rely on casts directly from pg_wchar to - * UChar32, which is correct for the UTF-8 encoding, but not in general. + * UChar32, which is correct for UTF-8 and LATIN1, but not in general. */ static pg_wchar @@ -227,6 +229,7 @@ static const struct ctype_methods ctype_methods_icu = { .strtitle = strtitle_icu, .strupper = strupper_icu, .strfold = strfold_icu, + .downcase_ident = downcase_ident_icu, .wc_isdigit = wc_isdigit_icu, .wc_isalpha = wc_isalpha_icu, .wc_isalnum = wc_isalnum_icu, @@ -241,6 +244,29 @@ static const struct ctype_methods ctype_methods_icu = { .wc_toupper = toupper_icu, .wc_tolower = tolower_icu, }; + +/* + * ICU still depends on libc for compatibility with certain historical + * behavior for single-byte encodings. See downcase_ident_icu(). + * + * XXX: consider fixing by decoding the single byte into a code point, and + * using u_tolower(). + */ +static locale_t +make_libc_ctype_locale(const char *ctype) +{ + locale_t loc; + +#ifndef WIN32 + loc = newlocale(LC_CTYPE_MASK, ctype, NULL); +#else + loc = _create_locale(LC_ALL, ctype); +#endif + if (!loc) + report_newlocale_failure(ctype); + + return loc; +} #endif pg_locale_t @@ -251,6 +277,7 @@ create_pg_locale_icu(Oid collid, MemoryContext context) const char *iculocstr; const char *icurules = NULL; UCollator *collator; + locale_t loc = (locale_t) 0; pg_locale_t result; if (collid == DEFAULT_COLLATION_OID) @@ -273,6 +300,18 @@ create_pg_locale_icu(Oid collid, MemoryContext context) if (!isnull) icurules = TextDatumGetCString(datum); + /* libc only needed for default locale and single-byte encoding */ + if (pg_database_encoding_max_length() == 1) + { + const char *ctype; + + datum = SysCacheGetAttrNotNull(DATABASEOID, tp, + Anum_pg_database_datctype); + ctype = TextDatumGetCString(datum); + + loc = make_libc_ctype_locale(ctype); + } + ReleaseSysCache(tp); } else @@ -303,6 +342,7 @@ create_pg_locale_icu(Oid collid, MemoryContext context) result = MemoryContextAllocZero(context, sizeof(struct pg_locale_struct)); result->icu.locale = MemoryContextStrdup(context, iculocstr); result->icu.ucol = collator; + result->icu.lt = loc; result->deterministic = deterministic; result->collate_is_c = false; result->ctype_is_c = false; @@ -565,6 +605,39 @@ strfold_icu(char *dest, size_t destsize, const char *src, ssize_t srclen, } /* + * For historical compatibility, behavior is not multibyte-aware. + * + * NB: uses libc tolower() for single-byte encodings (also for historical + * compatibility), and therefore relies on the global LC_CTYPE setting. + */ +static size_t +downcase_ident_icu(char *dst, size_t dstsize, const char *src, + ssize_t srclen, pg_locale_t locale) +{ + int i; + bool libc_lower; + locale_t lt = locale->icu.lt; + + libc_lower = lt && (pg_database_encoding_max_length() == 1); + + for (i = 0; i < srclen && i < dstsize; i++) + { + unsigned char ch = (unsigned char) src[i]; + + if (ch >= 'A' && ch <= 'Z') + ch = pg_ascii_tolower(ch); + else if (libc_lower && IS_HIGHBIT_SET(ch) && isupper_l(ch, lt)) + ch = tolower_l(ch, lt); + dst[i] = (char) ch; + } + + if (i < dstsize) + dst[i] = '\0'; + + return srclen; +} + +/* * strncoll_icu_utf8 * * Call ucol_strcollUTF8() or ucol_strcoll() as appropriate for the given diff --git a/src/backend/utils/adt/pg_locale_libc.c b/src/backend/utils/adt/pg_locale_libc.c index 3baa5816b5f..ab6117aaace 100644 --- a/src/backend/utils/adt/pg_locale_libc.c +++ b/src/backend/utils/adt/pg_locale_libc.c @@ -318,12 +318,41 @@ tolower_libc_mb(pg_wchar wc, pg_locale_t locale) return wc; } +/* + * Characters A..Z always downcase to a..z, even in the Turkish + * locale. Characters beyond 127 use tolower(). + */ +static size_t +downcase_ident_libc_sb(char *dst, size_t dstsize, const char *src, + ssize_t srclen, pg_locale_t locale) +{ + locale_t loc = locale->lt; + int i; + + for (i = 0; i < srclen && i < dstsize; i++) + { + unsigned char ch = (unsigned char) src[i]; + + if (ch >= 'A' && ch <= 'Z') + ch = pg_ascii_tolower(ch); + else if (IS_HIGHBIT_SET(ch) && isupper_l(ch, loc)) + ch = tolower_l(ch, loc); + dst[i] = (char) ch; + } + + if (i < dstsize) + dst[i] = '\0'; + + return srclen; +} + static const struct ctype_methods ctype_methods_libc_sb = { .strlower = strlower_libc_sb, .strtitle = strtitle_libc_sb, .strupper = strupper_libc_sb, /* in libc, casefolding is the same as lowercasing */ .strfold = strlower_libc_sb, + .downcase_ident = downcase_ident_libc_sb, .wc_isdigit = wc_isdigit_libc_sb, .wc_isalpha = wc_isalpha_libc_sb, .wc_isalnum = wc_isalnum_libc_sb, @@ -349,6 +378,8 @@ static const struct ctype_methods ctype_methods_libc_other_mb = { .strupper = strupper_libc_mb, /* in libc, casefolding is the same as lowercasing */ .strfold = strlower_libc_mb, + /* uses plain ASCII semantics for historical reasons */ + .downcase_ident = NULL, .wc_isdigit = wc_isdigit_libc_sb, .wc_isalpha = wc_isalpha_libc_sb, .wc_isalnum = wc_isalnum_libc_sb, @@ -370,6 +401,8 @@ static const struct ctype_methods ctype_methods_libc_utf8 = { .strupper = strupper_libc_mb, /* in libc, casefolding is the same as lowercasing */ .strfold = strlower_libc_mb, + /* uses plain ASCII semantics for historical reasons */ + .downcase_ident = NULL, .wc_isdigit = wc_isdigit_libc_mb, .wc_isalpha = wc_isalpha_libc_mb, .wc_isalnum = wc_isalnum_libc_mb, diff --git a/src/backend/utils/misc/injection_point.c b/src/backend/utils/misc/injection_point.c index 54a9fe8e163..4945da458b1 100644 --- a/src/backend/utils/misc/injection_point.c +++ b/src/backend/utils/misc/injection_point.c @@ -331,11 +331,8 @@ InjectionPointAttach(const char *name, /* Save the entry */ strlcpy(entry->name, name, sizeof(entry->name)); - entry->name[INJ_NAME_MAXLEN - 1] = '\0'; strlcpy(entry->library, library, sizeof(entry->library)); - entry->library[INJ_LIB_MAXLEN - 1] = '\0'; strlcpy(entry->function, function, sizeof(entry->function)); - entry->function[INJ_FUNC_MAXLEN - 1] = '\0'; if (private_data != NULL) memcpy(entry->private_data, private_data, private_data_size); diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index ef6deec14af..41a649297c7 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -116,6 +116,7 @@ static void stop_standby_server(const char *datadir); static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt); static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); +static bool find_publication(PGconn *conn, const char *pubname, const char *dbname); static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication); static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo); @@ -764,6 +765,39 @@ generate_object_name(PGconn *conn) } /* + * Does the publication exist in the specified database? + */ +static bool +find_publication(PGconn *conn, const char *pubname, const char *dbname) +{ + PQExpBuffer str = createPQExpBuffer(); + PGresult *res; + bool found = false; + char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname)); + + appendPQExpBuffer(str, + "SELECT 1 FROM pg_catalog.pg_publication " + "WHERE pubname = %s", + pubname_esc); + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not find publication \"%s\" in database \"%s\": %s", + pubname, dbname, PQerrorMessage(conn)); + disconnect_database(conn, true); + } + + if (PQntuples(res) == 1) + found = true; + + PQclear(res); + PQfreemem(pubname_esc); + destroyPQExpBuffer(str); + + return found; +} + +/* * Create the publications and replication slots in preparation for logical * replication. Returns the LSN from latest replication slot. It will be the * replication start point that is used to adjust the subscriptions (see @@ -799,13 +833,25 @@ setup_publisher(struct LogicalRepInfo *dbinfo) if (num_replslots == 0) dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname); - /* - * Create publication on publisher. This step should be executed - * *before* promoting the subscriber to avoid any transactions between - * consistent LSN and the new publication rows (such transactions - * wouldn't see the new publication rows resulting in an error). - */ - create_publication(conn, &dbinfo[i]); + if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname)) + { + /* Reuse existing publication on publisher. */ + pg_log_info("use existing publication \"%s\" in database \"%s\"", + dbinfo[i].pubname, dbinfo[i].dbname); + /* Don't remove pre-existing publication if an error occurs. */ + dbinfo[i].made_publication = false; + } + else + { + /* + * Create publication on publisher. This step should be executed + * *before* promoting the subscriber to avoid any transactions + * between consistent LSN and the new publication rows (such + * transactions wouldn't see the new publication rows resulting in + * an error). + */ + create_publication(conn, &dbinfo[i]); + } /* Create replication slot on publisher */ if (lsn) @@ -1749,11 +1795,10 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname, /* * Retrieve and drop the publications. * - * Since the publications were created before the consistent LSN, they - * remain on the subscriber even after the physical replica is - * promoted. Remove these publications from the subscriber because - * they have no use. Additionally, if requested, drop all pre-existing - * publications. + * Publications copied during physical replication remain on the subscriber + * after promotion. If --clean=publications is specified, drop all existing + * publications in the subscriber database. Otherwise, only drop publications + * that were created by pg_createsubscriber during this operation. */ static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo) @@ -1785,14 +1830,24 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo) PQclear(res); } - - /* - * In dry-run mode, we don't create publications, but we still try to drop - * those to provide necessary information to the user. - */ - if (!drop_all_pubs || dry_run) - drop_publication(conn, dbinfo->pubname, dbinfo->dbname, - &dbinfo->made_publication); + else + { + /* Drop publication only if it was created by this tool */ + if (dbinfo->made_publication) + { + drop_publication(conn, dbinfo->pubname, dbinfo->dbname, + &dbinfo->made_publication); + } + else + { + if (dry_run) + pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"", + dbinfo->pubname, dbinfo->dbname); + else + pg_log_info("preserve existing publication \"%s\" in database \"%s\"", + dbinfo->pubname, dbinfo->dbname); + } + } } /* diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 3d6086dc489..9e0db6cd099 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -443,10 +443,17 @@ is(scalar(() = $stderr =~ /would create the replication slot/g), is(scalar(() = $stderr =~ /would create subscription/g), 3, "verify subscriptions are created for all databases"); +# Create a user-defined publication, and a table that is not a member of that +# publication. +$node_p->safe_psql($db1, qq( + CREATE PUBLICATION test_pub3 FOR TABLE tbl1; + CREATE TABLE not_replicated (a int); +)); + # Run pg_createsubscriber on node S. --verbose is used twice # to show more information. -# In passing, also test the --enable-two-phase option and -# --clean option +# +# Test two phase and clean options. Use pre-existing publication. command_ok( [ 'pg_createsubscriber', @@ -456,7 +463,7 @@ command_ok( '--publisher-server' => $node_p->connstr($db1), '--socketdir' => $node_s->host, '--subscriber-port' => $node_s->port, - '--publication' => 'pub1', + '--publication' => 'test_pub3', '--publication' => 'pub2', '--replication-slot' => 'replslot1', '--replication-slot' => 'replslot2', @@ -478,13 +485,16 @@ is($result, qq(0), # Insert rows on P $node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('third row')"); $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')"); +$node_p->safe_psql($db1, "INSERT INTO not_replicated VALUES(0)"); # Start subscriber $node_s->start; # Confirm publications are removed from the subscriber node -is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"), - '0', 'all publications on subscriber have been removed'); +is($node_s->safe_psql($db1, 'SELECT COUNT(*) FROM pg_publication'), + '0', 'all publications were removed from db1'); +is($node_s->safe_psql($db2, 'SELECT COUNT(*) FROM pg_publication'), + '0', 'all publications were removed from db2'); # Verify that all subtwophase states are pending or enabled, # e.g. there are no subscriptions where subtwophase is disabled ('d') @@ -525,6 +535,9 @@ is( $result, qq(first row second row third row), "logical replication works in database $db1"); +$result = $node_s->safe_psql($db1, 'SELECT * FROM not_replicated'); +is($result, qq(), + "table is not replicated in database $db1"); # Check result in database $db2 $result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2'); @@ -537,6 +550,37 @@ my $sysid_s = $node_s->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()'); isnt($sysid_p, $sysid_s, 'system identifier was changed'); +# Verify that pub2 was created in $db2 +is($node_p->safe_psql($db2, "SELECT COUNT(*) FROM pg_publication WHERE pubname = 'pub2'"), + '1', "publication pub2 was created in $db2"); + +# Get subscription and publication names +$result = $node_s->safe_psql( + 'postgres', qq( + SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_' + ORDER BY subpublications; +)); +like( + $result, + qr/^pg_createsubscriber_\d+_[0-9a-f]+ \|\{pub2\}\n + pg_createsubscriber_\d+_[0-9a-f]+ \|\{test_pub3\}$/x, + 'subscription and publication names are ok'); + +# Verify that the correct publications are being used +$result = $node_s->safe_psql( + 'postgres', qq( + SELECT d.datname, s.subpublications + FROM pg_subscription s + JOIN pg_database d ON d.oid = s.subdbid + WHERE subname ~ '^pg_createsubscriber_' + ORDER BY s.subdbid + ) +); + +is($result, qq($db1|{test_pub3} +$db2|{pub2}), + "subscriptions use the correct publications"); + # clean up $node_p->teardown_node; $node_s->teardown_node; diff --git a/src/include/pgstat.h b/src/include/pgstat.h index f23dd5870da..6714363144a 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -669,8 +669,8 @@ extern void pgstat_init_relation(Relation rel); extern void pgstat_assoc_relation(Relation rel); extern void pgstat_unlink_relation(Relation rel); -extern void pgstat_report_vacuum(Oid tableoid, bool shared, - PgStat_Counter livetuples, PgStat_Counter deadtuples, +extern void pgstat_report_vacuum(Relation rel, PgStat_Counter livetuples, + PgStat_Counter deadtuples, TimestampTz starttime); extern void pgstat_report_analyze(Relation rel, PgStat_Counter livetuples, PgStat_Counter deadtuples, diff --git a/src/include/utils/pg_locale.h b/src/include/utils/pg_locale.h index 6cf1985963d..86016b9344e 100644 --- a/src/include/utils/pg_locale.h +++ b/src/include/utils/pg_locale.h @@ -110,6 +110,9 @@ struct ctype_methods size_t (*strfold) (char *dest, size_t destsize, const char *src, ssize_t srclen, pg_locale_t locale); + size_t (*downcase_ident) (char *dest, size_t destsize, + const char *src, ssize_t srclen, + pg_locale_t locale); /* required */ bool (*wc_isdigit) (pg_wchar wc, pg_locale_t locale); @@ -164,6 +167,7 @@ struct pg_locale_struct { const char *locale; UCollator *ucol; + locale_t lt; } icu; #endif }; @@ -187,6 +191,8 @@ extern size_t pg_strupper(char *dst, size_t dstsize, extern size_t pg_strfold(char *dst, size_t dstsize, const char *src, ssize_t srclen, pg_locale_t locale); +extern size_t pg_downcase_ident(char *dst, size_t dstsize, + const char *src, ssize_t srclen); extern int pg_strcoll(const char *arg1, const char *arg2, pg_locale_t locale); extern int pg_strncoll(const char *arg1, ssize_t len1, const char *arg2, ssize_t len2, pg_locale_t locale); |
