124#define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200
125#define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000
130#define SLOTSYNC_RESTART_INTERVAL_SEC 10
207 bool *found_consistent_snapshot,
208 bool *remote_slot_precedes)
211 bool updated_xmin_or_lsn =
false;
212 bool updated_config =
false;
217 if (found_consistent_snapshot)
218 *found_consistent_snapshot =
false;
220 if (remote_slot_precedes)
221 *remote_slot_precedes =
false;
257 errmsg(
"could not synchronize replication slot \"%s\"",
259 errdetail(
"Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
265 if (remote_slot_precedes)
266 *remote_slot_precedes =
true;
308 if (found_consistent_snapshot)
309 *found_consistent_snapshot =
true;
314 found_consistent_snapshot);
319 errmsg_internal(
"synchronized confirmed_flush for slot \"%s\" differs from remote slot",
329 if (found_consistent_snapshot && !(*found_consistent_snapshot))
333 updated_xmin_or_lsn =
true;
358 updated_config =
true;
372 if (updated_config || updated_xmin_or_lsn)
384 if (updated_xmin_or_lsn)
394 return updated_config || updated_xmin_or_lsn;
416 local_slots =
lappend(local_slots, s);
435 bool remote_exists =
false;
436 bool locally_invalidated =
false;
442 remote_exists =
true;
449 locally_invalidated =
458 return (remote_exists && !locally_invalidated);
515 synced_slot = local_slot->in_use && local_slot->data.synced;
528 errmsg(
"dropped replication slot \"%s\" of database with OID %u",
529 NameStr(local_slot->data.name),
530 local_slot->data.database));
579 if (oldest_segno == 1)
588 segno, oldest_segno);
597 if (segno >= oldest_segno)
618 bool *slot_persistence_pending)
621 bool found_consistent_snapshot =
false;
622 bool remote_slot_precedes =
false;
626 &found_consistent_snapshot,
627 &remote_slot_precedes);
633 if (remote_slot_precedes)
646 if (slot_persistence_pending)
647 *slot_persistence_pending =
true;
656 if (!found_consistent_snapshot)
659 errmsg(
"could not synchronize replication slot \"%s\"", remote_slot->
name),
660 errdetail(
"Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
664 if (slot_persistence_pending)
665 *slot_persistence_pending =
true;
673 errmsg(
"newly created replication slot \"%s\" is sync-ready now",
697 bool *slot_persistence_pending)
701 bool slot_updated =
false;
715 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
716 errmsg(
"exiting from slot synchronization because same"
717 " name slot \"%s\" already exists on the standby",
783 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
784 errmsg(
"skipping slot synchronization because the received slot sync"
785 " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
800 slot_persistence_pending);
817 errdetail_internal(
"Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
885 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
886 errmsg(
"skipping slot synchronization because the received slot sync"
887 " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
898 slot_persistence_pending);
920#define SLOTSYNC_COLUMN_COUNT 10
922 LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
931 "SELECT slot_name, plugin, confirmed_flush_lsn,"
932 " restart_lsn, catalog_xmin, two_phase,"
933 " two_phase_at, failover,"
934 " database, invalidation_reason"
935 " FROM pg_catalog.pg_replication_slots"
936 " WHERE failover and NOT temporary");
938 if (slot_names !=
NIL)
940 bool first_slot =
true;
963 errmsg(
"could not fetch failover logical slots info from the primary server: %s",
1037 remote_slot_list =
lappend(remote_slot_list, remote_slot);
1044 return remote_slot_list;
1061 bool *slot_persistence_pending)
1063 bool some_slot_updated =
false;
1081 slot_persistence_pending);
1086 return some_slot_updated;
1098#define PRIMARY_INFO_OUTPUT_COL_COUNT 2
1104 bool remote_in_recovery;
1105 bool primary_slot_valid;
1106 bool started_tx =
false;
1110 "SELECT pg_is_in_recovery(), count(*) = 1"
1111 " FROM pg_catalog.pg_replication_slots"
1112 " WHERE slot_type='physical' AND slot_name=%s",
1127 errmsg(
"could not fetch primary slot name \"%s\" info from the primary server: %s",
1129 errhint(
"Check if \"primary_slot_name\" is configured correctly."));
1134 "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
1146 if (remote_in_recovery)
1148 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1149 errmsg(
"cannot synchronize replication slots from a standby server"));
1154 if (!primary_slot_valid)
1156 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1158 errmsg(
"replication slot \"%s\" specified by \"%s\" does not exist on primary server",
1185 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1191 errmsg(
"replication slot synchronization requires \"%s\" to be specified in \"%s\"",
1192 "dbname",
"primary_conninfo"));
1209 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1210 errmsg(
"replication slot synchronization requires \"wal_level\" >= \"logical\""));
1223 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1225 errmsg(
"replication slot synchronization requires \"%s\" to be set",
"primary_slot_name"));
1237 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1239 errmsg(
"replication slot synchronization requires \"%s\" to be enabled",
1240 "hot_standby_feedback"));
1251 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1253 errmsg(
"replication slot synchronization requires \"%s\" to be set",
1254 "primary_conninfo"));
1274 bool conninfo_changed;
1275 bool primary_slotname_changed;
1277 bool parameter_changed =
false;
1279 if (is_slotsync_worker)
1285 conninfo_changed = strcmp(old_primary_conninfo,
PrimaryConnInfo) != 0;
1286 primary_slotname_changed = strcmp(old_primary_slotname,
PrimarySlotName) != 0;
1287 pfree(old_primary_conninfo);
1288 pfree(old_primary_slotname);
1292 if (is_slotsync_worker)
1296 errmsg(
"replication slot synchronization worker will stop because \"%s\" is disabled",
1297 "sync_replication_slots"));
1302 parameter_changed =
true;
1306 if (conninfo_changed ||
1307 primary_slotname_changed ||
1311 if (is_slotsync_worker)
1314 errmsg(
"replication slot synchronization worker will restart because of a parameter change"));
1326 parameter_changed =
true;
1334 if (parameter_changed)
1336 Assert(!is_slotsync_worker);
1338 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1339 errmsg(
"replication slot synchronization will stop because of a parameter change"));
1357 errmsg(
"replication slot synchronization worker will stop because promotion is triggered"));
1368 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1369 errmsg(
"replication slot synchronization will stop because promotion is triggered"));
1446 if (!some_slot_updated)
1466 WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
1485 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1486 errmsg(
"cannot synchronize replication slots concurrently"));
1534 sigjmp_buf local_sigjmp_buf;
1537 Assert(startup_data_len == 0);
1567 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
1662 errcode(ERRCODE_CONNECTION_FAILURE),
1663 errmsg(
"synchronization worker \"%s\" could not connect to the primary server: %s",
1687 bool some_slot_updated =
false;
1688 bool started_tx =
false;
1782 pid_t sync_process_pid;
1818 10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
1855 time_t curtime = time(NULL);
1863 curtime < SlotSyncCtx->last_start_time ||
1960 slot_name =
pstrdup(remote_slot->name);
1961 slot_names =
lappend(slot_names, slot_name);
1995 bool slot_persistence_pending =
false;
1996 bool some_slot_updated =
false;
2018 &slot_persistence_pending);
2024 if (slot_names ==
NIL && slot_persistence_pending)
2031 if (!slot_persistence_pending)
TimestampTz GetCurrentTimestamp(void)
Datum now(PG_FUNCTION_ARGS)
#define TextDatumGetCString(d)
Oid get_database_oid(const char *dbname, bool missing_ok)
void load_file(const char *filename, bool restricted)
int errmsg_internal(const char *fmt,...)
void EmitErrorReport(void)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
sigjmp_buf * PG_exception_stack
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
const TupleTableSlotOps TTSOpsMinimalTuple
#define palloc0_object(type)
void ProcessConfigFile(GucContext context)
void SetConfigOption(const char *name, const char *value, GucContext context, GucSource source)
Assert(PointerIsAligned(start, uint64))
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
#define PG_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
#define PG_END_ENSURE_ERROR_CLEANUP(cleanup_function, arg)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
char * pstrdup(const char *in)
void pfree(void *pointer)
#define GetProcessingMode()
#define CHECK_FOR_INTERRUPTS()
#define AmLogicalSlotSyncWorkerProcess()
#define HOLD_INTERRUPTS()
#define SetProcessingMode(mode)
BackendType MyBackendType
void namestrcpy(Name name, const char *str)
#define foreach_ptr(type, var, lst)
static XLogRecPtr DatumGetLSN(Datum X)
void pgstat_report_replslotsync(ReplicationSlot *slot)
void FloatExceptionHandler(SIGNAL_ARGS)
void StatementCancelHandler(SIGNAL_ARGS)
static bool DatumGetBool(Datum X)
static Datum PointerGetDatum(const void *X)
static Pointer DatumGetPointer(Datum X)
static TransactionId DatumGetTransactionId(Datum X)
void InitPostgres(const char *in_dbname, Oid dboid, const char *username, Oid useroid, bits32 flags, char *out_dbname)
TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
void init_ps_display(const char *fixed_part)
char * quote_literal_cstr(const char *rawstr)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
void ReplicationSlotDropAcquired(void)
void ReplicationSlotMarkDirty(void)
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
void ReplicationSlotPersist(void)
ReplicationSlot * MyReplicationSlot
void ReplicationSlotSave(void)
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
void ReplicationSlotRelease(void)
int max_replication_slots
ReplicationSlotCtlData * ReplicationSlotCtl
void ReplicationSlotsComputeRequiredLSN(void)
void ReplicationSlotCleanup(bool synced_only)
ReplicationSlotInvalidationCause
#define SlotIsLogical(slot)
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
@ SS_SKIP_WAL_NOT_FLUSHED
@ SS_SKIP_NO_CONSISTENT_SNAPSHOT
@ SS_SKIP_WAL_OR_ROWS_REMOVED
static List * get_local_synced_slots(void)
#define MIN_SLOTSYNC_WORKER_NAPTIME_MS
#define PRIMARY_INFO_OUTPUT_COL_COUNT
static void slotsync_worker_disconnect(int code, Datum arg)
void SyncReplicationSlots(WalReceiverConn *wrconn)
static bool local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
static void drop_local_obsolete_slots(List *remote_slot_list)
static void reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
static void update_slotsync_skip_stats(SlotSyncSkipReason skip_reason)
void ShutDownSlotSync(void)
bool sync_replication_slots
static bool synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *slot_persistence_pending)
static SlotSyncCtxStruct * SlotSyncCtx
static void slotsync_failure_callback(int code, Datum arg)
#define SLOTSYNC_COLUMN_COUNT
static List * extract_slot_names(List *remote_slots)
#define SLOTSYNC_RESTART_INTERVAL_SEC
char * CheckAndGetDbnameFromConninfo(void)
static bool syncing_slots
struct RemoteSlot RemoteSlot
static void ProcessSlotSyncInterrupts(void)
struct SlotSyncCtxStruct SlotSyncCtxStruct
#define MAX_SLOTSYNC_WORKER_NAPTIME_MS
static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *slot_persistence_pending)
bool SlotSyncWorkerCanRestart(void)
static void wait_for_slot_activity(bool some_slot_updated)
static void slotsync_reread_config(void)
static void reset_syncing_flag(void)
void SlotSyncShmemInit(void)
static bool update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, bool *found_consistent_snapshot, bool *remote_slot_precedes)
static void slotsync_worker_onexit(int code, Datum arg)
static void update_synced_slots_inactive_since(void)
bool ValidateSlotSyncParams(int elevel)
static void validate_remote_info(WalReceiverConn *wrconn)
static void check_and_set_sync_info(pid_t sync_process_pid)
bool IsSyncingReplicationSlots(void)
void ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
static List * fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names)
Size SlotSyncShmemSize(void)
static bool synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list, bool *slot_persistence_pending)
bool SnapBuildSnapshotExists(XLogRecPtr lsn)
#define SpinLockInit(lock)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
ReplicationSlotInvalidationCause invalidated
TransactionId catalog_xmin
ReplicationSlot replication_slots[1]
TransactionId catalog_xmin
XLogRecPtr confirmed_flush
ReplicationSlotPersistency persistency
ReplicationSlotInvalidationCause invalidated
TransactionId effective_catalog_xmin
SlotSyncSkipReason slotsync_skip_reason
ReplicationSlotPersistentData data
Tuplestorestate * tuplestore
void InitializeTimeouts(void)
static bool TransactionIdFollows(TransactionId id1, TransactionId id2)
#define InvalidTransactionId
#define TransactionIdIsValid(xid)
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
#define WL_EXIT_ON_PM_DEATH
static WalReceiverConn * wrconn
bool hot_standby_feedback
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_get_dbname_from_conninfo(conninfo)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_disconnect(conn)
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
bool IsTransactionState(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
XLogSegNo XLogGetLastRemovedSegno(void)
XLogSegNo XLogGetOldestSegno(TimeLineID tli)
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr