From 19c33a93ef305d9b55bf8fee0ba4fe4e28e3782f Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Tue, 25 Nov 2025 17:58:28 +0800 Subject: [PATCH 1/4] Extend xlogwait infrastructure with write and flush wait types Add support for waiting on WAL write and flush LSNs in addition to the existing replay LSN wait type. This provides the foundation for extending the WAIT FOR command with MODE parameter. Key changes: - Add WAIT_LSN_TYPE_WRITE and WAIT_LSN_TYPE_FLUSH_STANDBY to WaitLSNType - Add GetCurrentLSNForWaitType() to retrieve current LSN - Add new wait events WAIT_EVENT_WAIT_FOR_WAL_WRITE and WAIT_EVENT_WAIT_FOR_WAL_FLUSH for pg_stat_activity visibility - Update WaitForLSN() to use GetCurrentLSNForWaitType() internally --- src/backend/access/transam/xlog.c | 2 +- src/backend/access/transam/xlogrecovery.c | 4 +- src/backend/access/transam/xlogwait.c | 84 ++++++++++++++----- src/backend/commands/wait.c | 2 +- .../utils/activity/wait_event_names.txt | 3 +- src/include/access/xlogwait.h | 13 ++- 6 files changed, 81 insertions(+), 27 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6a5640df51af..30a64fe4b408 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6241,7 +6241,7 @@ StartupXLOG(void) * Wake up all waiters for replay LSN. They need to report an error that * recovery was ended before reaching the target LSN. */ - WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr); + WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, InvalidXLogRecPtr); /* * Shutdown the recovery environment. This must occur after diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index ae2398d6975b..f97aaa67645b 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1846,8 +1846,8 @@ PerformWalRecovery(void) */ if (waitLSNState && (XLogRecoveryCtl->lastReplayedEndRecPtr >= - pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY]))) - WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr); + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY_STANDBY]))) + WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, XLogRecoveryCtl->lastReplayedEndRecPtr); /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c index 84613fc39c7c..0663a459502e 100644 --- a/src/backend/access/transam/xlogwait.c +++ b/src/backend/access/transam/xlogwait.c @@ -12,25 +12,30 @@ * This file implements waiting for WAL operations to reach specific LSNs * on both physical standby and primary servers. The core idea is simple: * every process that wants to wait publishes the LSN it needs to the - * shared memory, and the appropriate process (startup on standby, or - * WAL writer/backend on primary) wakes it once that LSN has been reached. + * shared memory, and the appropriate process (startup on standby, + * walreceiver on standby, or WAL writer/backend on primary) wakes it + * once that LSN has been reached. * * The shared memory used by this module comprises a procInfos * per-backend array with the information of the awaited LSN for each * of the backend processes. The elements of that array are organized - * into a pairing heap waitersHeap, which allows for very fast finding - * of the least awaited LSN. + * into pairing heaps (waitersHeap), one for each WaitLSNType, which + * allows for very fast finding of the least awaited LSN for each type. * - * In addition, the least-awaited LSN is cached as minWaitedLSN. The - * waiter process publishes information about itself to the shared - * memory and waits on the latch until it is woken up by the appropriate - * process, standby is promoted, or the postmaster dies. Then, it cleans - * information about itself in the shared memory. + * In addition, the least-awaited LSN for each type is cached in the + * minWaitedLSN array. The waiter process publishes information about + * itself to the shared memory and waits on the latch until it is woken + * up by the appropriate process, standby is promoted, or the postmaster + * dies. Then, it cleans information about itself in the shared memory. * - * On standby servers: After replaying a WAL record, the startup process - * first performs a fast path check minWaitedLSN > replayLSN. If this - * check is negative, it checks waitersHeap and wakes up the backend - * whose awaited LSNs are reached. + * On standby servers: + * - After replaying a WAL record, the startup process performs a fast + * path check minWaitedLSN[REPLAY] > replayLSN. If this check is + * negative, it checks waitersHeap[REPLAY] and wakes up the backends + * whose awaited LSNs are reached. + * - After receiving WAL, the walreceiver process performs similar checks + * against the flush and write LSNs, waking up waiters in the FLUSH + * and WRITE heaps respectively. * * On primary servers: After flushing WAL, the WAL writer or backend * process performs a similar check against the flush LSN and wakes up @@ -49,6 +54,7 @@ #include "access/xlogwait.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/walreceiver.h" #include "storage/latch.h" #include "storage/proc.h" #include "storage/shmem.h" @@ -62,6 +68,48 @@ static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, struct WaitLSNState *waitLSNState = NULL; +/* + * Wait event for each WaitLSNType, used with WaitLatch() to report + * the wait in pg_stat_activity. + */ +static const uint32 WaitLSNWaitEvents[] = { + [WAIT_LSN_TYPE_REPLAY_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY, + [WAIT_LSN_TYPE_WRITE_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_WRITE, + [WAIT_LSN_TYPE_FLUSH_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH, + [WAIT_LSN_TYPE_FLUSH_PRIMARY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH, +}; + +StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT, + "WaitLSNWaitEvents must match WaitLSNType enum"); + +/* + * Get the current LSN for the specified wait type. + */ +XLogRecPtr +GetCurrentLSNForWaitType(WaitLSNType lsnType) +{ + switch (lsnType) + { + case WAIT_LSN_TYPE_REPLAY_STANDBY: + return GetXLogReplayRecPtr(NULL); + + case WAIT_LSN_TYPE_WRITE_STANDBY: + return GetWalRcvWriteRecPtr(); + + case WAIT_LSN_TYPE_FLUSH_STANDBY: + return GetWalRcvFlushRecPtr(NULL, NULL); + + case WAIT_LSN_TYPE_FLUSH_PRIMARY: + return GetFlushRecPtr(NULL); + + case WAIT_LSN_TYPE_COUNT: + break; + } + + elog(ERROR, "invalid LSN wait type: %d", lsnType); + pg_unreachable(); +} + /* Report the amount of shared memory space needed for WaitLSNState. */ Size WaitLSNShmemSize(void) @@ -341,13 +389,11 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout) int rc; long delay_ms = -1; - if (lsnType == WAIT_LSN_TYPE_REPLAY) - currentLSN = GetXLogReplayRecPtr(NULL); - else - currentLSN = GetFlushRecPtr(NULL); + /* Get current LSN for the wait type */ + currentLSN = GetCurrentLSNForWaitType(lsnType); /* Check that recovery is still in-progress */ - if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress()) + if (lsnType != WAIT_LSN_TYPE_FLUSH_PRIMARY && !RecoveryInProgress()) { /* * Recovery was ended, but check if target LSN was already @@ -376,7 +422,7 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout) CHECK_FOR_INTERRUPTS(); rc = WaitLatch(MyLatch, wake_events, delay_ms, - (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH); + WaitLSNWaitEvents[lsnType]); /* * Emergency bailout if postmaster has died. This is to avoid the diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c index a37bddaefb27..43b37095afbf 100644 --- a/src/backend/commands/wait.c +++ b/src/backend/commands/wait.c @@ -140,7 +140,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) */ Assert(MyProc->xmin == InvalidTransactionId); - waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout); + waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY_STANDBY, lsn, timeout); /* * Process the result of WaitForLSN(). Throw appropriate error if needed. diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index f39830dbb344..58548acb8aa8 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -89,8 +89,9 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." -WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary." +WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary or standby." WAIT_FOR_WAL_REPLAY "Waiting for WAL replay to reach a target LSN on a standby." +WAIT_FOR_WAL_WRITE "Waiting for WAL write to reach a target LSN on a standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h index e607441d6187..9721a7a71955 100644 --- a/src/include/access/xlogwait.h +++ b/src/include/access/xlogwait.h @@ -35,9 +35,15 @@ typedef enum */ typedef enum WaitLSNType { - WAIT_LSN_TYPE_REPLAY = 0, /* Waiting for replay on standby */ - WAIT_LSN_TYPE_FLUSH = 1, /* Waiting for flush on primary */ - WAIT_LSN_TYPE_COUNT = 2 + /* Standby wait types (walreceiver/startup wakes) */ + WAIT_LSN_TYPE_REPLAY_STANDBY = 0, + WAIT_LSN_TYPE_WRITE_STANDBY = 1, + WAIT_LSN_TYPE_FLUSH_STANDBY = 2, + + /* Primary wait types (WAL writer/backends wake) */ + WAIT_LSN_TYPE_FLUSH_PRIMARY = 3, + + WAIT_LSN_TYPE_COUNT = 4 } WaitLSNType; /* @@ -96,6 +102,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState; extern Size WaitLSNShmemSize(void); extern void WaitLSNShmemInit(void); +extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType); extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN); extern void WaitLSNCleanup(void); extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, From b6ca0f235916d520d0f728dfa8fd8398f8f2a333 Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Tue, 25 Nov 2025 19:11:54 +0800 Subject: [PATCH 2/4] Add MODE parameter to WAIT FOR LSN command Extend the WAIT FOR LSN command with an optional MODE parameter that specifies which LSN type to wait for: WAIT FOR LSN '' [MODE { REPLAY | WRITE | FLUSH }] [WITH (...)] - REPLAY (default): Wait for WAL to be replayed to the specified LSN - WRITE: Wait for WAL to be written (received) to the specified LSN - FLUSH: Wait for WAL to be flushed to disk at the specified LSN The default mode is REPLAY, matching the original behavior when MODE is not specified. This follows the pattern used by LOCK command where the mode parameter is optional with a sensible default. The WRITE and FLUSH modes are useful for scenarios where applications need to ensure WAL has been received or persisted on the standby without necessarily waiting for replay to complete. Also includes: - Documentation updates for the new syntax and refactoring of existing WAIT FOR command documentation - Test coverage for all three modes including mixed concurrent waiters - Wakeup logic in walreceiver for WRITE/FLUSH waiters --- doc/src/sgml/ref/wait_for.sgml | 192 ++++++++++++---- src/backend/access/transam/xlog.c | 6 +- src/backend/commands/wait.c | 64 +++++- src/backend/parser/gram.y | 21 +- src/backend/replication/walreceiver.c | 19 ++ src/include/nodes/parsenodes.h | 11 + src/include/parser/kwlist.h | 2 + src/test/recovery/t/049_wait_for_lsn.pl | 294 ++++++++++++++++++++++-- 8 files changed, 522 insertions(+), 87 deletions(-) diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml index 3b8e842d1de9..28c68678315a 100644 --- a/doc/src/sgml/ref/wait_for.sgml +++ b/doc/src/sgml/ref/wait_for.sgml @@ -16,12 +16,13 @@ PostgreSQL documentation WAIT FOR - wait for target LSN to be replayed, optionally with a timeout + wait for WAL to reach a target LSN on a replica -WAIT FOR LSN 'lsn' [ WITH ( option [, ...] ) ] +WAIT FOR LSN 'lsn' [ MODE { REPLAY | FLUSH | WRITE } ] + [ WITH ( option [, ...] ) ] where option can be: @@ -34,20 +35,22 @@ WAIT FOR LSN 'lsn' [ WITH ( Description - Waits until recovery replays lsn. - If no timeout is specified or it is set to - zero, this command waits indefinitely for the - lsn. - On timeout, or if the server is promoted before - lsn is reached, an error is emitted, - unless NO_THROW is specified in the WITH clause. - If NO_THROW is specified, then the command - doesn't throw errors. + Waits until the specified lsn is reached + according to the specified mode, + which determines whether to wait for WAL to be written, flushed, or replayed. + If no timeout is specified or it is set to + zero, this command waits indefinitely for the + lsn. + On timeout, or if the server is promoted before + lsn is reached, an error is emitted, + unless NO_THROW is specified in the WITH clause. + If NO_THROW is specified, then the command + doesn't throw errors. - The possible return values are success, - timeout, and not in recovery. + The possible return values are success, + timeout, and not in recovery. @@ -64,6 +67,61 @@ WAIT FOR LSN 'lsn' [ WITH ( + + MODE + + + Specifies the type of LSN processing to wait for. If not specified, + the default is REPLAY. The valid modes are: + + + + + REPLAY + + + Wait for the LSN to be replayed (applied to the database). + After successful completion, pg_last_wal_replay_lsn() + will return a value greater than or equal to the target LSN. + + + + + + FLUSH + + + Wait for the WAL containing the LSN to be received from the + primary and flushed to disk. This provides a durability guarantee + without waiting for the WAL to be applied. After successful + completion, pg_last_wal_receive_lsn() + will return a value greater than or equal to the target LSN. + This value is also available as the flushed_lsn + column in + pg_stat_wal_receiver. + + + + + + WRITE + + + Wait for the WAL containing the LSN to be received from the + primary and written to disk, but not yet flushed. This is faster + than FLUSH but provides weaker durability + guarantees since the data may still be in operating system buffers. + After successful completion, the written_lsn + column in + pg_stat_wal_receiver will show + a value greater than or equal to the target LSN. + + + + + + + WITH ( option [, ...] ) @@ -135,9 +193,12 @@ WAIT FOR LSN 'lsn' [ WITH ( This return value denotes that the database server is not in a recovery - state. This might mean either the database server was not in recovery - at the moment of receiving the command, or it was promoted before - reaching the target lsn. + state. This might mean either the database server was not in recovery + at the moment of receiving the command (i.e., executed on a primary), + or it was promoted before reaching the target lsn. + In the promotion case, this status indicates a timeline change occurred, + and the application should re-evaluate whether the target LSN is still + relevant. @@ -148,25 +209,33 @@ WAIT FOR LSN 'lsn' [ WITH ( Notes - WAIT FOR command waits till - lsn to be replayed on standby. - That is, after this command execution, the value returned by - pg_last_wal_replay_lsn should be greater or equal - to the lsn value. This is useful to achieve - read-your-writes-consistency, while using async replica for reads and - primary for writes. In that case, the lsn of the last - modification should be stored on the client application side or the - connection pooler side. + WAIT FOR waits until the specified + lsn is reached according to the specified + mode. The REPLAY mode waits + for the LSN to be replayed (applied to the database), which is useful + to achieve read-your-writes consistency while using an async replica + for reads and the primary for writes. The FLUSH mode + waits for the WAL to be flushed to durable storage on the replica, + providing a durability guarantee without waiting for replay. The + WRITE mode waits for the WAL to be written to the + operating system, which is faster than flush but provides weaker + durability guarantees. In all cases, the LSN of the + last modification should be stored on the client application side or + the connection pooler side. - WAIT FOR command should be called on standby. - If a user runs WAIT FOR on primary, it - will error out unless NO_THROW is specified in the WITH clause. - However, if WAIT FOR is - called on primary promoted from standby and lsn - was already replayed, then the WAIT FOR command just - exits immediately. + WAIT FOR should be called on a standby. + If a user runs WAIT FOR on the primary, it + will error out unless NO_THROW is specified + in the WITH clause. However, if WAIT FOR is + called on a primary promoted from standby and lsn + was already reached, then the WAIT FOR command + just exits immediately. If the replica is promoted while waiting, + the command will return not in recovery (or throw + an error if NO_THROW is not specified). Promotion + creates a new timeline, and the LSN being waited for may refer to + WAL from the old timeline. @@ -175,21 +244,21 @@ WAIT FOR LSN 'lsn' [ WITH ( Examples - You can use WAIT FOR command to wait for - the pg_lsn value. For example, an application could update - the movie table and get the lsn after - changes just made. This example uses pg_current_wal_insert_lsn - on primary server to get the lsn given that - synchronous_commit could be set to - off. + You can use WAIT FOR command to wait for + the pg_lsn value. For example, an application could update + the movie table and get the lsn after + changes just made. This example uses pg_current_wal_insert_lsn + on primary server to get the lsn given that + synchronous_commit could be set to + off. postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama'; UPDATE 100 postgres=# SELECT pg_current_wal_insert_lsn(); -pg_current_wal_insert_lsn --------------------- -0/306EE20 + pg_current_wal_insert_lsn +--------------------------- + 0/306EE20 (1 row) @@ -198,9 +267,9 @@ pg_current_wal_insert_lsn changes made on primary should be guaranteed to be visible on replica. -postgres=# WAIT FOR LSN '0/306EE20'; +postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY; status --------- +--------- success (1 row) postgres=# SELECT * FROM movie WHERE genre = 'Drama'; @@ -211,21 +280,46 @@ postgres=# SELECT * FROM movie WHERE genre = 'Drama'; - If the target LSN is not reached before the timeout, the error is thrown. + Wait for flush (data durable on replica): -postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s'); +postgres=# WAIT FOR LSN '0/306EE20' MODE FLUSH; + status +--------- + success +(1 row) + + + + + Wait for write with timeout: + + +postgres=# WAIT FOR LSN '0/306EE20' MODE WRITE WITH (TIMEOUT '100ms', NO_THROW); + status +--------- + success +(1 row) + + + + + If the target LSN is not reached before the timeout, an error is thrown: + + +postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY WITH (TIMEOUT '0.1s'); ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60 The same example uses WAIT FOR with - NO_THROW option. + NO_THROW option: + -postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW); +postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY WITH (TIMEOUT '100ms', NO_THROW); status --------- +--------- timeout (1 row) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 30a64fe4b408..597e662c8a27 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6238,10 +6238,12 @@ StartupXLOG(void) LWLockRelease(ControlFileLock); /* - * Wake up all waiters for replay LSN. They need to report an error that - * recovery was ended before reaching the target LSN. + * Wake up all waiters. They need to report an error that recovery was + * ended before reaching the target LSN. */ WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, InvalidXLogRecPtr); + WaitLSNWakeup(WAIT_LSN_TYPE_WRITE_STANDBY, InvalidXLogRecPtr); + WaitLSNWakeup(WAIT_LSN_TYPE_FLUSH_STANDBY, InvalidXLogRecPtr); /* * Shutdown the recovery environment. This must occur after diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c index 43b37095afbf..05ad84fdb5bf 100644 --- a/src/backend/commands/wait.c +++ b/src/backend/commands/wait.c @@ -2,7 +2,7 @@ * * wait.c * Implements WAIT FOR, which allows waiting for events such as - * time passing or LSN having been replayed on replica. + * time passing or LSN having been replayed, flushed, or written. * * Portions Copyright (c) 2025, PostgreSQL Global Development Group * @@ -15,6 +15,7 @@ #include +#include "access/xlog.h" #include "access/xlogrecovery.h" #include "access/xlogwait.h" #include "commands/defrem.h" @@ -28,12 +29,28 @@ #include "utils/snapmgr.h" +/* + * Type descriptor for WAIT FOR LSN wait types, used for error messages. + */ +typedef struct WaitLSNTypeDesc +{ + const char *noun; /* "replay", "flush", "write" */ + const char *verb; /* "replayed", "flushed", "written" */ +} WaitLSNTypeDesc; + +static const WaitLSNTypeDesc WaitLSNTypeDescs[] = { + [WAIT_LSN_TYPE_REPLAY_STANDBY] = {"replay", "replayed"}, + [WAIT_LSN_TYPE_WRITE_STANDBY] = {"write", "written"}, + [WAIT_LSN_TYPE_FLUSH_STANDBY] = {"flush", "flushed"}, +}; + void ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) { XLogRecPtr lsn; int64 timeout = 0; WaitLSNResult waitLSNResult; + WaitLSNType lsnType; bool throw = true; TupleDesc tupdesc; TupOutputState *tstate; @@ -45,6 +62,22 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(stmt->lsn_literal))); + /* Convert parse-time WaitLSNMode to runtime WaitLSNType */ + switch (stmt->mode) + { + case WAIT_LSN_MODE_REPLAY: + lsnType = WAIT_LSN_TYPE_REPLAY_STANDBY; + break; + case WAIT_LSN_MODE_WRITE: + lsnType = WAIT_LSN_TYPE_WRITE_STANDBY; + break; + case WAIT_LSN_MODE_FLUSH: + lsnType = WAIT_LSN_TYPE_FLUSH_STANDBY; + break; + default: + elog(ERROR, "unrecognized wait mode: %d", stmt->mode); + } + foreach_node(DefElem, defel, stmt->options) { if (strcmp(defel->defname, "timeout") == 0) @@ -107,8 +140,8 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) } /* - * We are going to wait for the LSN replay. We should first care that we - * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. + * We are going to wait for the LSN. We should first care that we don't + * hold a snapshot and correspondingly our MyProc->xmin is invalid. * Otherwise, our snapshot could prevent the replay of WAL records * implying a kind of self-deadlock. This is the reason why WAIT FOR is a * command, not a procedure or function. @@ -140,7 +173,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) */ Assert(MyProc->xmin == InvalidTransactionId); - waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY_STANDBY, lsn, timeout); + waitLSNResult = WaitForLSN(lsnType, lsn, timeout); /* * Process the result of WaitForLSN(). Throw appropriate error if needed. @@ -154,11 +187,18 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) case WAIT_LSN_RESULT_TIMEOUT: if (throw) + { + const WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType]; + XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType); + ereport(ERROR, errcode(ERRCODE_QUERY_CANCELED), - errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X", + errmsg("timed out while waiting for target LSN %X/%08X to be %s; current %s LSN %X/%08X", LSN_FORMAT_ARGS(lsn), - LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + desc->verb, + desc->noun, + LSN_FORMAT_ARGS(currentLSN))); + } else result = "timeout"; break; @@ -166,20 +206,26 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) case WAIT_LSN_RESULT_NOT_IN_RECOVERY: if (throw) { + const WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType]; + XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType); + if (PromoteIsTriggered()) { ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is not in progress"), - errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.", + errdetail("Recovery ended before target LSN %X/%08X was %s; last %s LSN %X/%08X.", LSN_FORMAT_ARGS(lsn), - LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + desc->verb, + desc->noun, + LSN_FORMAT_ARGS(currentLSN))); } else ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is not in progress"), - errhint("Waiting for the replay LSN can only be executed during recovery.")); + errhint("Waiting for the %s LSN can only be executed during recovery.", + desc->noun)); } else result = "not in recovery"; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 7856ce9d78fc..5ae66fe79f0a 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -640,6 +640,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type window_definition over_clause window_specification opt_frame_clause frame_extent frame_bound %type null_treatment opt_window_exclusion_clause +%type opt_wait_lsn_mode %type opt_existing_window_name %type opt_if_not_exists %type opt_unique_null_treatment @@ -729,7 +730,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); ESCAPE EVENT EXCEPT EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXPRESSION EXTENSION EXTERNAL EXTRACT - FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FOLLOWING FOR + FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FLUSH FOLLOWING FOR FORCE FOREIGN FORMAT FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS @@ -770,7 +771,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); QUOTE QUOTES RANGE READ REAL REASSIGN RECURSIVE REF_P REFERENCES REFERENCING - REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA + REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLAY REPLICA RESET RESPECT_P RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP ROUTINE ROUTINES ROW ROWS RULE @@ -16489,15 +16490,23 @@ xml_passing_mech: *****************************************************************************/ WaitStmt: - WAIT FOR LSN_P Sconst opt_wait_with_clause + WAIT FOR LSN_P Sconst opt_wait_lsn_mode opt_wait_with_clause { WaitStmt *n = makeNode(WaitStmt); n->lsn_literal = $4; - n->options = $5; + n->mode = $5; + n->options = $6; $$ = (Node *) n; } ; +opt_wait_lsn_mode: + MODE REPLAY { $$ = WAIT_LSN_MODE_REPLAY; } + | MODE FLUSH { $$ = WAIT_LSN_MODE_FLUSH; } + | MODE WRITE { $$ = WAIT_LSN_MODE_WRITE; } + | /*EMPTY*/ { $$ = WAIT_LSN_MODE_REPLAY; } + ; + opt_wait_with_clause: WITH '(' utility_option_list ')' { $$ = $3; } | /*EMPTY*/ { $$ = NIL; } @@ -17937,6 +17946,7 @@ unreserved_keyword: | FILTER | FINALIZE | FIRST_P + | FLUSH | FOLLOWING | FORCE | FORMAT @@ -18071,6 +18081,7 @@ unreserved_keyword: | RENAME | REPEATABLE | REPLACE + | REPLAY | REPLICA | RESET | RESPECT_P @@ -18524,6 +18535,7 @@ bare_label_keyword: | FINALIZE | FIRST_P | FLOAT_P + | FLUSH | FOLLOWING | FORCE | FOREIGN @@ -18706,6 +18718,7 @@ bare_label_keyword: | RENAME | REPEATABLE | REPLACE + | REPLAY | REPLICA | RESET | RESTART diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ac802ae85b48..60b9ba5b345d 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -57,6 +57,7 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xlogrecovery.h" +#include "access/xlogwait.h" #include "catalog/pg_authid.h" #include "funcapi.h" #include "libpq/pqformat.h" @@ -965,6 +966,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) /* Update shared-memory status */ pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); + /* + * If we wrote an LSN that someone was waiting for then walk over the + * shared memory array and set latches to notify the waiters. + */ + if (waitLSNState && + (LogstreamResult.Write >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_WRITE_STANDBY]))) + WaitLSNWakeup(WAIT_LSN_TYPE_WRITE_STANDBY, LogstreamResult.Write); + /* * Close the current segment if it's fully written up in the last cycle of * the loop, to create its archive notification file soon. Otherwise WAL @@ -1004,6 +1014,15 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) } SpinLockRelease(&walrcv->mutex); + /* + * If we flushed an LSN that someone was waiting for then walk over + * the shared memory array and set latches to notify the waiters. + */ + if (waitLSNState && + (LogstreamResult.Flush >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_FLUSH_STANDBY]))) + WaitLSNWakeup(WAIT_LSN_TYPE_FLUSH_STANDBY, LogstreamResult.Flush); + /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); if (AllowCascadeReplication()) diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index d14294a4eceb..bbaf3242ccbd 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4385,10 +4385,21 @@ typedef struct DropSubscriptionStmt DropBehavior behavior; /* RESTRICT or CASCADE behavior */ } DropSubscriptionStmt; +/* + * WaitLSNMode - MODE parameter for WAIT FOR command + */ +typedef enum WaitLSNMode +{ + WAIT_LSN_MODE_REPLAY, /* Wait for LSN replay on standby */ + WAIT_LSN_MODE_WRITE, /* Wait for LSN write on standby */ + WAIT_LSN_MODE_FLUSH /* Wait for LSN flush on standby */ +} WaitLSNMode; + typedef struct WaitStmt { NodeTag type; char *lsn_literal; /* LSN string from grammar */ + WaitLSNMode mode; /* Wait mode: REPLAY/FLUSH/WRITE */ List *options; /* List of DefElem nodes */ } WaitStmt; diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 5d4fe27ef962..7ad8b11b725e 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -176,6 +176,7 @@ PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD, AS_LABEL) PG_KEYWORD("finalize", FINALIZE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD, BARE_LABEL) +PG_KEYWORD("flush", FLUSH, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("following", FOLLOWING, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("for", FOR, RESERVED_KEYWORD, AS_LABEL) PG_KEYWORD("force", FORCE, UNRESERVED_KEYWORD, BARE_LABEL) @@ -378,6 +379,7 @@ PG_KEYWORD("release", RELEASE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("rename", RENAME, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("repeatable", REPEATABLE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("replace", REPLACE, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("replay", REPLAY, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("replica", REPLICA, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("reset", RESET, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("respect", RESPECT_P, UNRESERVED_KEYWORD, AS_LABEL) diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl index e0ddb06a2f04..df7b563cfbb8 100644 --- a/src/test/recovery/t/049_wait_for_lsn.pl +++ b/src/test/recovery/t/049_wait_for_lsn.pl @@ -1,4 +1,4 @@ -# Checks waiting for the LSN replay on standby using +# Checks waiting for the LSN replay/write/flush on standby using # the WAIT FOR command. use strict; use warnings FATAL => 'all'; @@ -7,6 +7,38 @@ use PostgreSQL::Test::Utils; use Test::More; +# Helper functions to control walreceiver for testing wait conditions. +# These allow us to stop WAL streaming so waiters block, then resume it. +my $saved_primary_conninfo; + +sub stop_walreceiver +{ + my ($node) = @_; + $saved_primary_conninfo = $node->safe_psql('postgres', + "SELECT setting FROM pg_settings WHERE name = 'primary_conninfo'"); + $node->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET primary_conninfo = ''; + SELECT pg_reload_conf(); + ]); + + $node->poll_query_until('postgres', + "SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);"); +} + +sub resume_walreceiver +{ + my ($node) = @_; + $node->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET primary_conninfo = '$saved_primary_conninfo'; + SELECT pg_reload_conf(); + ]); + + $node->poll_query_until('postgres', + "SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);"); +} + # Initialize primary node my $node_primary = PostgreSQL::Test::Cluster->new('primary'); $node_primary->init(allows_streaming => 1); @@ -62,7 +94,34 @@ ok((split("\n", $output))[-1] eq 30, "standby reached the same LSN as primary"); -# 3. Check that waiting for unreachable LSN triggers the timeout. The +# 3. Check that WAIT FOR works with WRITE and FLUSH modes. +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(31, 40))"); +my $lsn_write = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn_write}' MODE WRITE WITH (timeout '1d'); + SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${lsn_write}'::pg_lsn); +]); + +ok((split("\n", $output))[-1] >= 0, + "standby wrote WAL up to target LSN after WAIT FOR MODE WRITE"); + +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(41, 50))"); +my $lsn_flush = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn_flush}' MODE FLUSH WITH (timeout '1d'); + SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '${lsn_flush}'::pg_lsn); +]); + +ok((split("\n", $output))[-1] >= 0, + "standby flushed WAL up to target LSN after WAIT FOR MODE FLUSH"); + +# 4. Check that waiting for unreachable LSN triggers the timeout. The # unreachable LSN must be well in advance. So WAL records issued by # the concurrent autovacuum could not affect that. my $lsn3 = @@ -88,7 +147,7 @@ WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]); ok($output eq "timeout", "WAIT FOR returns correct status after timeout"); -# 4. Check that WAIT FOR triggers an error if called on primary, +# 5. Check that WAIT FOR triggers an error if called on primary, # within another function, or inside a transaction with an isolation level # higher than READ COMMITTED. @@ -125,7 +184,7 @@ /WAIT FOR must be only called without an active or registered snapshot/, "get an error when running within another function"); -# 5. Check parameter validation error cases on standby before promotion +# 6. Check parameter validation error cases on standby before promotion my $test_lsn = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); @@ -208,7 +267,7 @@ ok( $stderr =~ /option "invalid_option" not recognized/, "get error for invalid WITH clause option"); -# 6. Check the scenario of multiple LSN waiters. We make 5 background +# 7a. Check the scenario of multiple REPLAY waiters. We make 5 background # psql sessions each waiting for a corresponding insertion. When waiting is # finished, stored procedures logs if there are visible as many rows as # should be. @@ -226,7 +285,9 @@ \$\$ LANGUAGE plpgsql; ]); + $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();"); + my @psql_sessions; for (my $i = 0; $i < 5; $i++) { @@ -239,10 +300,11 @@ $psql_sessions[$i]->query_until( qr/start/, qq[ \\echo start - WAIT FOR LSN '${lsn}'; + WAIT FOR LSN '${lsn}' MODE REPLAY; SELECT log_count(${i}); ]); } + my $log_offset = -s $node_standby->logfile; $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();"); for (my $i = 0; $i < 5; $i++) @@ -251,23 +313,199 @@ $psql_sessions[$i]->quit; } -ok(1, 'multiple LSN waiters reported consistent data'); +ok(1, 'multiple REPLAY waiters reported consistent data'); + +# 7b. Check the scenario of multiple WRITE waiters. +# Stop walreceiver to ensure waiters actually block. +stop_walreceiver($node_standby); + +# Generate WAL on primary (standby won't receive it yet) +my @write_lsns; +for (my $i = 0; $i < 5; $i++) +{ + $node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (100 + ${i});"); + $write_lsns[$i] = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn()"); +} + +# Start WRITE waiters (they will block since walreceiver is stopped) +my @write_sessions; +for (my $i = 0; $i < 5; $i++) +{ + $write_sessions[$i] = $node_standby->background_psql('postgres'); + $write_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '$write_lsns[$i]' MODE WRITE WITH (timeout '1d'); + DO \$\$ BEGIN RAISE LOG 'write_done %', $i; END \$\$; + ]); +} + +# Verify waiters are blocked +$node_standby->poll_query_until('postgres', + "SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalWrite'" +); + +# Restore walreceiver to unblock waiters +my $write_log_offset = -s $node_standby->logfile; +resume_walreceiver($node_standby); + +# Wait for all waiters to complete and close sessions +for (my $i = 0; $i < 5; $i++) +{ + $node_standby->wait_for_log("write_done $i", $write_log_offset); + $write_sessions[$i]->quit; +} + +# Verify on standby that WAL was written up to the target LSN +$output = $node_standby->safe_psql('postgres', + "SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '$write_lsns[4]'::pg_lsn);"); + +ok($output >= 0, + "multiple WRITE waiters: standby wrote WAL up to target LSN"); + +# 7c. Check the scenario of multiple FLUSH waiters. +# Stop walreceiver to ensure waiters actually block. +stop_walreceiver($node_standby); + +# Generate WAL on primary (standby won't receive it yet) +my @flush_lsns; +for (my $i = 0; $i < 5; $i++) +{ + $node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (200 + ${i});"); + $flush_lsns[$i] = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn()"); +} + +# Start FLUSH waiters (they will block since walreceiver is stopped) +my @flush_sessions; +for (my $i = 0; $i < 5; $i++) +{ + $flush_sessions[$i] = $node_standby->background_psql('postgres'); + $flush_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '$flush_lsns[$i]' MODE FLUSH WITH (timeout '1d'); + DO \$\$ BEGIN RAISE LOG 'flush_done %', $i; END \$\$; + ]); +} + +# Verify waiters are blocked +$node_standby->poll_query_until('postgres', + "SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalFlush'" +); + +# Restore walreceiver to unblock waiters +my $flush_log_offset = -s $node_standby->logfile; +resume_walreceiver($node_standby); + +# Wait for all waiters to complete and close sessions +for (my $i = 0; $i < 5; $i++) +{ + $node_standby->wait_for_log("flush_done $i", $flush_log_offset); + $flush_sessions[$i]->quit; +} + +# Verify on standby that WAL was flushed up to the target LSN +$output = $node_standby->safe_psql('postgres', + "SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '$flush_lsns[4]'::pg_lsn);" +); + +ok($output >= 0, + "multiple FLUSH waiters: standby flushed WAL up to target LSN"); + +# 7d. Check the scenario of mixed mode waiters (REPLAY, WRITE, FLUSH) +# running concurrently. We start 6 sessions: 2 for each mode, all waiting +# for the same target LSN. We stop the walreceiver and pause replay to +# ensure all waiters block. Then we resume replay and restart the +# walreceiver to verify they unblock and complete correctly. + +# Stop walreceiver first to ensure we can control the flow without hanging +# (stopping it after pausing replay can hang if the startup process is paused). +stop_walreceiver($node_standby); + +# Pause replay +$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();"); + +# Generate WAL on primary +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(301, 310));"); +my $mixed_target_lsn = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); + +# Start 6 waiters: 2 for each mode +my @mixed_sessions; +my @mixed_modes = ('REPLAY', 'WRITE', 'FLUSH'); +for (my $i = 0; $i < 6; $i++) +{ + $mixed_sessions[$i] = $node_standby->background_psql('postgres'); + $mixed_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '${mixed_target_lsn}' MODE $mixed_modes[$i % 3] WITH (timeout '1d'); + DO \$\$ BEGIN RAISE LOG 'mixed_done %', $i; END \$\$; + ]); +} + +# Verify all waiters are blocked +$node_standby->poll_query_until('postgres', + "SELECT count(*) = 6 FROM pg_stat_activity WHERE wait_event LIKE 'WaitForWal%'" +); -# 7. Check that the standby promotion terminates the wait on LSN. Start -# waiting for an unreachable LSN then promote. Check the log for the relevant -# error message. Also, check that waiting for already replayed LSN doesn't -# cause an error even after promotion. +# Resume replay (waiters should still be blocked as no WAL has arrived) +my $mixed_log_offset = -s $node_standby->logfile; +$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();"); +$node_standby->poll_query_until('postgres', + "SELECT NOT pg_is_wal_replay_paused();"); + +# Restore walreceiver to allow WAL to arrive +resume_walreceiver($node_standby); + +# Wait for all sessions to complete and close them +for (my $i = 0; $i < 6; $i++) +{ + $node_standby->wait_for_log("mixed_done $i", $mixed_log_offset); + $mixed_sessions[$i]->quit; +} + +# Verify all modes reached the target LSN +$output = $node_standby->safe_psql( + 'postgres', qq[ + SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${mixed_target_lsn}'::pg_lsn) >= 0 AND + pg_lsn_cmp(pg_last_wal_receive_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0 AND + pg_lsn_cmp(pg_last_wal_replay_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0; +]); + +ok($output eq 't', + "mixed mode waiters: all modes completed and reached target LSN"); + +# 8. Check that the standby promotion terminates all wait modes. Start +# waiting for unreachable LSNs with REPLAY, WRITE, and FLUSH modes, then +# promote. Check the log for the relevant error messages. Also, check that +# waiting for already replayed LSN doesn't cause an error even after promotion. my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn() + 10000000000"); + my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); -my $psql_session = $node_standby->background_psql('postgres'); -$psql_session->query_until( - qr/start/, qq[ - \\echo start - WAIT FOR LSN '${lsn4}'; -]); + +# Start background sessions waiting for unreachable LSN with all modes +my @wait_modes = ('REPLAY', 'WRITE', 'FLUSH'); +my @wait_sessions; +for (my $i = 0; $i < 3; $i++) +{ + $wait_sessions[$i] = $node_standby->background_psql('postgres'); + $wait_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '${lsn4}' MODE $wait_modes[$i]; + ]); +} # Make sure standby will be promoted at least at the primary insert LSN we # have just observed. Use pg_switch_wal() to force the insert LSN to be @@ -277,17 +515,24 @@ $log_offset = -s $node_standby->logfile; $node_standby->promote; -$node_standby->wait_for_log('recovery is not in progress', $log_offset); -ok(1, 'got error after standby promote'); +# Wait for all three sessions to get the error (each mode has distinct message) +$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was written/, + $log_offset); +$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was flushed/, + $log_offset); +$node_standby->wait_for_log( + qr/Recovery ended before target LSN.*was replayed/, $log_offset); -$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';"); +ok(1, 'promotion interrupted all wait modes'); + +$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}' MODE REPLAY;"); ok(1, 'wait for already replayed LSN exits immediately even after promotion'); $output = $node_standby->safe_psql( 'postgres', qq[ - WAIT FOR LSN '${lsn4}' WITH (timeout '10ms', no_throw);]); + WAIT FOR LSN '${lsn4}' MODE REPLAY WITH (timeout '10ms', no_throw);]); ok($output eq "not in recovery", "WAIT FOR returns correct status after standby promotion"); @@ -295,8 +540,11 @@ $node_standby->stop; $node_primary->stop; -# If we send \q with $psql_session->quit the command can be sent to the session +# If we send \q with $session->quit the command can be sent to the session # already closed. So \q is in initial script, here we only finish IPC::Run. -$psql_session->{run}->finish; +for (my $i = 0; $i < 3; $i++) +{ + $wait_sessions[$i]->{run}->finish; +} done_testing(); From 488388a3532855f5c97cb7af59cffc515c04a98b Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Tue, 25 Nov 2025 19:18:06 +0800 Subject: [PATCH 3/4] Add tab completion for WAIT FOR LSN MODE parameter Update psql tab completion to support the optional MODE parameter in WAIT FOR LSN command. After specifying an LSN value, completion now offers both MODE and WITH keywords since MODE defaults to REPLAY. --- src/bin/psql/tab-complete.in.c | 39 ++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 20d7a65c614e..fcb9f19faef7 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -5313,10 +5313,11 @@ match_previous_words(int pattern_id, COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_vacuumables); /* - * WAIT FOR LSN '' [ WITH ( option [, ...] ) ] + * WAIT FOR LSN '' [ MODE { REPLAY | FLUSH | WRITE } ] [ WITH ( option [, ...] ) ] * where option can be: * TIMEOUT '' * NO_THROW + * MODE defaults to REPLAY if not specified. */ else if (Matches("WAIT")) COMPLETE_WITH("FOR"); @@ -5325,25 +5326,41 @@ match_previous_words(int pattern_id, else if (Matches("WAIT", "FOR", "LSN")) /* No completion for LSN value - user must provide manually */ ; + + /* + * After LSN value, offer MODE (optional) or WITH, since MODE defaults to + * REPLAY + */ else if (Matches("WAIT", "FOR", "LSN", MatchAny)) + COMPLETE_WITH("MODE", "WITH"); + else if (Matches("WAIT", "FOR", "LSN", MatchAny, "MODE")) + COMPLETE_WITH("REPLAY", "FLUSH", "WRITE"); + else if (Matches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny)) COMPLETE_WITH("WITH"); + /* WITH directly after LSN (using default REPLAY mode) */ else if (Matches("WAIT", "FOR", "LSN", MatchAny, "WITH")) COMPLETE_WITH("("); + else if (Matches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny, "WITH")) + COMPLETE_WITH("("); + + /* + * Handle parenthesized option list (both with and without explicit MODE). + * This fires when we're in an unfinished parenthesized option list. + * get_previous_words treats a completed parenthesized option list as one + * word, so the above test is correct. timeout takes a string value, + * no_throw takes no value. We don't offer completions for these values. + */ else if (HeadMatches("WAIT", "FOR", "LSN", MatchAny, "WITH", "(*") && !HeadMatches("WAIT", "FOR", "LSN", MatchAny, "WITH", "(*)")) { - /* - * This fires if we're in an unfinished parenthesized option list. - * get_previous_words treats a completed parenthesized option list as - * one word, so the above test is correct. - */ if (ends_with(prev_wd, '(') || ends_with(prev_wd, ',')) COMPLETE_WITH("timeout", "no_throw"); - - /* - * timeout takes a string value, no_throw takes no value. We don't - * offer completions for these values. - */ + } + else if (HeadMatches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny, "WITH", "(*") && + !HeadMatches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny, "WITH", "(*)")) + { + if (ends_with(prev_wd, '(') || ends_with(prev_wd, ',')) + COMPLETE_WITH("timeout", "no_throw"); } /* WITH [RECURSIVE] */ From 87ac09a3dcac118a54cf6ff6e4657b0050256e22 Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Tue, 25 Nov 2025 19:20:18 +0800 Subject: [PATCH 4/4] Use WAIT FOR LSN in PostgreSQL::Test::Cluster::wait_for_catchup() Replace polling-based catchup waiting with WAIT FOR LSN command when running on a standby server. This is more efficient than repeatedly querying pg_stat_replication as the WAIT FOR command uses the latch- based wakeup mechanism. The optimization applies when: - The node is in recovery (standby server) - The mode is 'replay', 'write', or 'flush' (not 'sent') For 'sent' mode or when running on a primary, the function falls back to the original polling approach since WAIT FOR LSN is only available during recovery. --- src/test/perl/PostgreSQL/Test/Cluster.pm | 35 ++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm index 295988b8b877..eec8233b5158 100644 --- a/src/test/perl/PostgreSQL/Test/Cluster.pm +++ b/src/test/perl/PostgreSQL/Test/Cluster.pm @@ -3335,6 +3335,9 @@ sub wait_for_catchup $mode = defined($mode) ? $mode : 'replay'; my %valid_modes = ('sent' => 1, 'write' => 1, 'flush' => 1, 'replay' => 1); + my $isrecovery = + $self->safe_psql('postgres', "SELECT pg_is_in_recovery()"); + chomp($isrecovery); croak "unknown mode $mode for 'wait_for_catchup', valid modes are " . join(', ', keys(%valid_modes)) unless exists($valid_modes{$mode}); @@ -3347,9 +3350,6 @@ sub wait_for_catchup } if (!defined($target_lsn)) { - my $isrecovery = - $self->safe_psql('postgres', "SELECT pg_is_in_recovery()"); - chomp($isrecovery); if ($isrecovery eq 't') { $target_lsn = $self->lsn('replay'); @@ -3367,6 +3367,35 @@ sub wait_for_catchup . $self->name . "\n"; # Before release 12 walreceiver just set the application name to # "walreceiver" + + # Use WAIT FOR LSN when in recovery for supported modes (replay, write, flush) + # This is more efficient than polling pg_stat_replication + if (($mode ne 'sent') && ($isrecovery eq 't')) + { + my $timeout = $PostgreSQL::Test::Utils::timeout_default; + # Map mode names to WAIT FOR LSN MODE values (uppercase) + my $wait_mode = uc($mode); + my $query = + qq[WAIT FOR LSN '${target_lsn}' MODE ${wait_mode} WITH (timeout '${timeout}s', no_throw);]; + my $output = $self->safe_psql('postgres', $query); + chomp($output); + + if ($output ne 'success') + { + # Fetch additional detail for debugging purposes + $query = qq[SELECT * FROM pg_catalog.pg_stat_replication]; + my $details = $self->safe_psql('postgres', $query); + diag qq(WAIT FOR LSN failed with status: +${output}); + diag qq(Last pg_stat_replication contents: +${details}); + croak "failed waiting for catchup"; + } + print "done\n"; + return; + } + + # Polling for 'sent' mode or when not in recovery (WAIT FOR LSN not applicable) my $query = qq[SELECT '$target_lsn' <= ${mode}_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name IN ('$standby_name', 'walreceiver')];