diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml
index 3b8e842d1de9..28c68678315a 100644
--- a/doc/src/sgml/ref/wait_for.sgml
+++ b/doc/src/sgml/ref/wait_for.sgml
@@ -16,12 +16,13 @@ PostgreSQL documentation
WAIT FOR
- wait for target LSN to be replayed, optionally with a timeout
+ wait for WAL to reach a target LSN on a replica
-WAIT FOR LSN 'lsn' [ WITH ( option [, ...] ) ]
+WAIT FOR LSN 'lsn' [ MODE { REPLAY | FLUSH | WRITE } ]
+ [ WITH ( option [, ...] ) ]
where option can be:
@@ -34,20 +35,22 @@ WAIT FOR LSN 'lsn' [ WITH ( Description
- Waits until recovery replays lsn.
- If no timeout is specified or it is set to
- zero, this command waits indefinitely for the
- lsn.
- On timeout, or if the server is promoted before
- lsn is reached, an error is emitted,
- unless NO_THROW is specified in the WITH clause.
- If NO_THROW is specified, then the command
- doesn't throw errors.
+ Waits until the specified lsn is reached
+ according to the specified mode,
+ which determines whether to wait for WAL to be written, flushed, or replayed.
+ If no timeout is specified or it is set to
+ zero, this command waits indefinitely for the
+ lsn.
+ On timeout, or if the server is promoted before
+ lsn is reached, an error is emitted,
+ unless NO_THROW is specified in the WITH clause.
+ If NO_THROW is specified, then the command
+ doesn't throw errors.
- The possible return values are success,
- timeout, and not in recovery.
+ The possible return values are success,
+ timeout, and not in recovery.
@@ -64,6 +67,61 @@ WAIT FOR LSN 'lsn' [ WITH (
+
+ MODE
+
+
+ Specifies the type of LSN processing to wait for. If not specified,
+ the default is REPLAY. The valid modes are:
+
+
+
+
+ REPLAY
+
+
+ Wait for the LSN to be replayed (applied to the database).
+ After successful completion, pg_last_wal_replay_lsn()
+ will return a value greater than or equal to the target LSN.
+
+
+
+
+
+ FLUSH
+
+
+ Wait for the WAL containing the LSN to be received from the
+ primary and flushed to disk. This provides a durability guarantee
+ without waiting for the WAL to be applied. After successful
+ completion, pg_last_wal_receive_lsn()
+ will return a value greater than or equal to the target LSN.
+ This value is also available as the flushed_lsn
+ column in
+ pg_stat_wal_receiver.
+
+
+
+
+
+ WRITE
+
+
+ Wait for the WAL containing the LSN to be received from the
+ primary and written to disk, but not yet flushed. This is faster
+ than FLUSH but provides weaker durability
+ guarantees since the data may still be in operating system buffers.
+ After successful completion, the written_lsn
+ column in
+ pg_stat_wal_receiver will show
+ a value greater than or equal to the target LSN.
+
+
+
+
+
+
+
WITH ( option [, ...] )
@@ -135,9 +193,12 @@ WAIT FOR LSN 'lsn' [ WITH (
This return value denotes that the database server is not in a recovery
- state. This might mean either the database server was not in recovery
- at the moment of receiving the command, or it was promoted before
- reaching the target lsn.
+ state. This might mean either the database server was not in recovery
+ at the moment of receiving the command (i.e., executed on a primary),
+ or it was promoted before reaching the target lsn.
+ In the promotion case, this status indicates a timeline change occurred,
+ and the application should re-evaluate whether the target LSN is still
+ relevant.
@@ -148,25 +209,33 @@ WAIT FOR LSN 'lsn' [ WITH ( Notes
- WAIT FOR command waits till
- lsn to be replayed on standby.
- That is, after this command execution, the value returned by
- pg_last_wal_replay_lsn should be greater or equal
- to the lsn value. This is useful to achieve
- read-your-writes-consistency, while using async replica for reads and
- primary for writes. In that case, the lsn of the last
- modification should be stored on the client application side or the
- connection pooler side.
+ WAIT FOR waits until the specified
+ lsn is reached according to the specified
+ mode. The REPLAY mode waits
+ for the LSN to be replayed (applied to the database), which is useful
+ to achieve read-your-writes consistency while using an async replica
+ for reads and the primary for writes. The FLUSH mode
+ waits for the WAL to be flushed to durable storage on the replica,
+ providing a durability guarantee without waiting for replay. The
+ WRITE mode waits for the WAL to be written to the
+ operating system, which is faster than flush but provides weaker
+ durability guarantees. In all cases, the LSN of the
+ last modification should be stored on the client application side or
+ the connection pooler side.
- WAIT FOR command should be called on standby.
- If a user runs WAIT FOR on primary, it
- will error out unless NO_THROW is specified in the WITH clause.
- However, if WAIT FOR is
- called on primary promoted from standby and lsn
- was already replayed, then the WAIT FOR command just
- exits immediately.
+ WAIT FOR should be called on a standby.
+ If a user runs WAIT FOR on the primary, it
+ will error out unless NO_THROW is specified
+ in the WITH clause. However, if WAIT FOR is
+ called on a primary promoted from standby and lsn
+ was already reached, then the WAIT FOR command
+ just exits immediately. If the replica is promoted while waiting,
+ the command will return not in recovery (or throw
+ an error if NO_THROW is not specified). Promotion
+ creates a new timeline, and the LSN being waited for may refer to
+ WAL from the old timeline.
@@ -175,21 +244,21 @@ WAIT FOR LSN 'lsn' [ WITH ( Examples
- You can use WAIT FOR command to wait for
- the pg_lsn value. For example, an application could update
- the movie table and get the lsn after
- changes just made. This example uses pg_current_wal_insert_lsn
- on primary server to get the lsn given that
- synchronous_commit could be set to
- off.
+ You can use WAIT FOR command to wait for
+ the pg_lsn value. For example, an application could update
+ the movie table and get the lsn after
+ changes just made. This example uses pg_current_wal_insert_lsn
+ on primary server to get the lsn given that
+ synchronous_commit could be set to
+ off.
postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
UPDATE 100
postgres=# SELECT pg_current_wal_insert_lsn();
-pg_current_wal_insert_lsn
---------------------
-0/306EE20
+ pg_current_wal_insert_lsn
+---------------------------
+ 0/306EE20
(1 row)
@@ -198,9 +267,9 @@ pg_current_wal_insert_lsn
changes made on primary should be guaranteed to be visible on replica.
-postgres=# WAIT FOR LSN '0/306EE20';
+postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY;
status
---------
+---------
success
(1 row)
postgres=# SELECT * FROM movie WHERE genre = 'Drama';
@@ -211,21 +280,46 @@ postgres=# SELECT * FROM movie WHERE genre = 'Drama';
- If the target LSN is not reached before the timeout, the error is thrown.
+ Wait for flush (data durable on replica):
-postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s');
+postgres=# WAIT FOR LSN '0/306EE20' MODE FLUSH;
+ status
+---------
+ success
+(1 row)
+
+
+
+
+ Wait for write with timeout:
+
+
+postgres=# WAIT FOR LSN '0/306EE20' MODE WRITE WITH (TIMEOUT '100ms', NO_THROW);
+ status
+---------
+ success
+(1 row)
+
+
+
+
+ If the target LSN is not reached before the timeout, an error is thrown:
+
+
+postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY WITH (TIMEOUT '0.1s');
ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60
The same example uses WAIT FOR with
- NO_THROW option.
+ NO_THROW option:
+
-postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW);
+postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY WITH (TIMEOUT '100ms', NO_THROW);
status
---------
+---------
timeout
(1 row)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6a5640df51af..597e662c8a27 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6238,10 +6238,12 @@ StartupXLOG(void)
LWLockRelease(ControlFileLock);
/*
- * Wake up all waiters for replay LSN. They need to report an error that
- * recovery was ended before reaching the target LSN.
+ * Wake up all waiters. They need to report an error that recovery was
+ * ended before reaching the target LSN.
*/
- WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
+ WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, InvalidXLogRecPtr);
+ WaitLSNWakeup(WAIT_LSN_TYPE_WRITE_STANDBY, InvalidXLogRecPtr);
+ WaitLSNWakeup(WAIT_LSN_TYPE_FLUSH_STANDBY, InvalidXLogRecPtr);
/*
* Shutdown the recovery environment. This must occur after
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index ae2398d6975b..f97aaa67645b 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1846,8 +1846,8 @@ PerformWalRecovery(void)
*/
if (waitLSNState &&
(XLogRecoveryCtl->lastReplayedEndRecPtr >=
- pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY])))
- WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
+ pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY_STANDBY])))
+ WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, XLogRecoveryCtl->lastReplayedEndRecPtr);
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
index 84613fc39c7c..0663a459502e 100644
--- a/src/backend/access/transam/xlogwait.c
+++ b/src/backend/access/transam/xlogwait.c
@@ -12,25 +12,30 @@
* This file implements waiting for WAL operations to reach specific LSNs
* on both physical standby and primary servers. The core idea is simple:
* every process that wants to wait publishes the LSN it needs to the
- * shared memory, and the appropriate process (startup on standby, or
- * WAL writer/backend on primary) wakes it once that LSN has been reached.
+ * shared memory, and the appropriate process (startup on standby,
+ * walreceiver on standby, or WAL writer/backend on primary) wakes it
+ * once that LSN has been reached.
*
* The shared memory used by this module comprises a procInfos
* per-backend array with the information of the awaited LSN for each
* of the backend processes. The elements of that array are organized
- * into a pairing heap waitersHeap, which allows for very fast finding
- * of the least awaited LSN.
+ * into pairing heaps (waitersHeap), one for each WaitLSNType, which
+ * allows for very fast finding of the least awaited LSN for each type.
*
- * In addition, the least-awaited LSN is cached as minWaitedLSN. The
- * waiter process publishes information about itself to the shared
- * memory and waits on the latch until it is woken up by the appropriate
- * process, standby is promoted, or the postmaster dies. Then, it cleans
- * information about itself in the shared memory.
+ * In addition, the least-awaited LSN for each type is cached in the
+ * minWaitedLSN array. The waiter process publishes information about
+ * itself to the shared memory and waits on the latch until it is woken
+ * up by the appropriate process, standby is promoted, or the postmaster
+ * dies. Then, it cleans information about itself in the shared memory.
*
- * On standby servers: After replaying a WAL record, the startup process
- * first performs a fast path check minWaitedLSN > replayLSN. If this
- * check is negative, it checks waitersHeap and wakes up the backend
- * whose awaited LSNs are reached.
+ * On standby servers:
+ * - After replaying a WAL record, the startup process performs a fast
+ * path check minWaitedLSN[REPLAY] > replayLSN. If this check is
+ * negative, it checks waitersHeap[REPLAY] and wakes up the backends
+ * whose awaited LSNs are reached.
+ * - After receiving WAL, the walreceiver process performs similar checks
+ * against the flush and write LSNs, waking up waiters in the FLUSH
+ * and WRITE heaps respectively.
*
* On primary servers: After flushing WAL, the WAL writer or backend
* process performs a similar check against the flush LSN and wakes up
@@ -49,6 +54,7 @@
#include "access/xlogwait.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/walreceiver.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
@@ -62,6 +68,48 @@ static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
struct WaitLSNState *waitLSNState = NULL;
+/*
+ * Wait event for each WaitLSNType, used with WaitLatch() to report
+ * the wait in pg_stat_activity.
+ */
+static const uint32 WaitLSNWaitEvents[] = {
+ [WAIT_LSN_TYPE_REPLAY_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
+ [WAIT_LSN_TYPE_WRITE_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
+ [WAIT_LSN_TYPE_FLUSH_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+ [WAIT_LSN_TYPE_FLUSH_PRIMARY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+};
+
+StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
+ "WaitLSNWaitEvents must match WaitLSNType enum");
+
+/*
+ * Get the current LSN for the specified wait type.
+ */
+XLogRecPtr
+GetCurrentLSNForWaitType(WaitLSNType lsnType)
+{
+ switch (lsnType)
+ {
+ case WAIT_LSN_TYPE_REPLAY_STANDBY:
+ return GetXLogReplayRecPtr(NULL);
+
+ case WAIT_LSN_TYPE_WRITE_STANDBY:
+ return GetWalRcvWriteRecPtr();
+
+ case WAIT_LSN_TYPE_FLUSH_STANDBY:
+ return GetWalRcvFlushRecPtr(NULL, NULL);
+
+ case WAIT_LSN_TYPE_FLUSH_PRIMARY:
+ return GetFlushRecPtr(NULL);
+
+ case WAIT_LSN_TYPE_COUNT:
+ break;
+ }
+
+ elog(ERROR, "invalid LSN wait type: %d", lsnType);
+ pg_unreachable();
+}
+
/* Report the amount of shared memory space needed for WaitLSNState. */
Size
WaitLSNShmemSize(void)
@@ -341,13 +389,11 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
int rc;
long delay_ms = -1;
- if (lsnType == WAIT_LSN_TYPE_REPLAY)
- currentLSN = GetXLogReplayRecPtr(NULL);
- else
- currentLSN = GetFlushRecPtr(NULL);
+ /* Get current LSN for the wait type */
+ currentLSN = GetCurrentLSNForWaitType(lsnType);
/* Check that recovery is still in-progress */
- if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
+ if (lsnType != WAIT_LSN_TYPE_FLUSH_PRIMARY && !RecoveryInProgress())
{
/*
* Recovery was ended, but check if target LSN was already
@@ -376,7 +422,7 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
CHECK_FOR_INTERRUPTS();
rc = WaitLatch(MyLatch, wake_events, delay_ms,
- (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+ WaitLSNWaitEvents[lsnType]);
/*
* Emergency bailout if postmaster has died. This is to avoid the
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
index a37bddaefb27..05ad84fdb5bf 100644
--- a/src/backend/commands/wait.c
+++ b/src/backend/commands/wait.c
@@ -2,7 +2,7 @@
*
* wait.c
* Implements WAIT FOR, which allows waiting for events such as
- * time passing or LSN having been replayed on replica.
+ * time passing or LSN having been replayed, flushed, or written.
*
* Portions Copyright (c) 2025, PostgreSQL Global Development Group
*
@@ -15,6 +15,7 @@
#include
+#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "commands/defrem.h"
@@ -28,12 +29,28 @@
#include "utils/snapmgr.h"
+/*
+ * Type descriptor for WAIT FOR LSN wait types, used for error messages.
+ */
+typedef struct WaitLSNTypeDesc
+{
+ const char *noun; /* "replay", "flush", "write" */
+ const char *verb; /* "replayed", "flushed", "written" */
+} WaitLSNTypeDesc;
+
+static const WaitLSNTypeDesc WaitLSNTypeDescs[] = {
+ [WAIT_LSN_TYPE_REPLAY_STANDBY] = {"replay", "replayed"},
+ [WAIT_LSN_TYPE_WRITE_STANDBY] = {"write", "written"},
+ [WAIT_LSN_TYPE_FLUSH_STANDBY] = {"flush", "flushed"},
+};
+
void
ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
{
XLogRecPtr lsn;
int64 timeout = 0;
WaitLSNResult waitLSNResult;
+ WaitLSNType lsnType;
bool throw = true;
TupleDesc tupdesc;
TupOutputState *tstate;
@@ -45,6 +62,22 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
CStringGetDatum(stmt->lsn_literal)));
+ /* Convert parse-time WaitLSNMode to runtime WaitLSNType */
+ switch (stmt->mode)
+ {
+ case WAIT_LSN_MODE_REPLAY:
+ lsnType = WAIT_LSN_TYPE_REPLAY_STANDBY;
+ break;
+ case WAIT_LSN_MODE_WRITE:
+ lsnType = WAIT_LSN_TYPE_WRITE_STANDBY;
+ break;
+ case WAIT_LSN_MODE_FLUSH:
+ lsnType = WAIT_LSN_TYPE_FLUSH_STANDBY;
+ break;
+ default:
+ elog(ERROR, "unrecognized wait mode: %d", stmt->mode);
+ }
+
foreach_node(DefElem, defel, stmt->options)
{
if (strcmp(defel->defname, "timeout") == 0)
@@ -107,8 +140,8 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
}
/*
- * We are going to wait for the LSN replay. We should first care that we
- * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+ * We are going to wait for the LSN. We should first care that we don't
+ * hold a snapshot and correspondingly our MyProc->xmin is invalid.
* Otherwise, our snapshot could prevent the replay of WAL records
* implying a kind of self-deadlock. This is the reason why WAIT FOR is a
* command, not a procedure or function.
@@ -140,7 +173,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
*/
Assert(MyProc->xmin == InvalidTransactionId);
- waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+ waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
/*
* Process the result of WaitForLSN(). Throw appropriate error if needed.
@@ -154,11 +187,18 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
case WAIT_LSN_RESULT_TIMEOUT:
if (throw)
+ {
+ const WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType];
+ XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
+
ereport(ERROR,
errcode(ERRCODE_QUERY_CANCELED),
- errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
+ errmsg("timed out while waiting for target LSN %X/%08X to be %s; current %s LSN %X/%08X",
LSN_FORMAT_ARGS(lsn),
- LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+ desc->verb,
+ desc->noun,
+ LSN_FORMAT_ARGS(currentLSN)));
+ }
else
result = "timeout";
break;
@@ -166,20 +206,26 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
if (throw)
{
+ const WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType];
+ XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
+
if (PromoteIsTriggered())
{
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is not in progress"),
- errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
+ errdetail("Recovery ended before target LSN %X/%08X was %s; last %s LSN %X/%08X.",
LSN_FORMAT_ARGS(lsn),
- LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+ desc->verb,
+ desc->noun,
+ LSN_FORMAT_ARGS(currentLSN)));
}
else
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is not in progress"),
- errhint("Waiting for the replay LSN can only be executed during recovery."));
+ errhint("Waiting for the %s LSN can only be executed during recovery.",
+ desc->noun));
}
else
result = "not in recovery";
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7856ce9d78fc..5ae66fe79f0a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -640,6 +640,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type window_definition over_clause window_specification
opt_frame_clause frame_extent frame_bound
%type null_treatment opt_window_exclusion_clause
+%type opt_wait_lsn_mode
%type opt_existing_window_name
%type opt_if_not_exists
%type opt_unique_null_treatment
@@ -729,7 +730,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
ESCAPE EVENT EXCEPT EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
EXPRESSION EXTENSION EXTERNAL EXTRACT
- FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FOLLOWING FOR
+ FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FLUSH FOLLOWING FOR
FORCE FOREIGN FORMAT FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -770,7 +771,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
QUOTE QUOTES
RANGE READ REAL REASSIGN RECURSIVE REF_P REFERENCES REFERENCING
- REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA
+ REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLAY REPLICA
RESET RESPECT_P RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP
ROUTINE ROUTINES ROW ROWS RULE
@@ -16489,15 +16490,23 @@ xml_passing_mech:
*****************************************************************************/
WaitStmt:
- WAIT FOR LSN_P Sconst opt_wait_with_clause
+ WAIT FOR LSN_P Sconst opt_wait_lsn_mode opt_wait_with_clause
{
WaitStmt *n = makeNode(WaitStmt);
n->lsn_literal = $4;
- n->options = $5;
+ n->mode = $5;
+ n->options = $6;
$$ = (Node *) n;
}
;
+opt_wait_lsn_mode:
+ MODE REPLAY { $$ = WAIT_LSN_MODE_REPLAY; }
+ | MODE FLUSH { $$ = WAIT_LSN_MODE_FLUSH; }
+ | MODE WRITE { $$ = WAIT_LSN_MODE_WRITE; }
+ | /*EMPTY*/ { $$ = WAIT_LSN_MODE_REPLAY; }
+ ;
+
opt_wait_with_clause:
WITH '(' utility_option_list ')' { $$ = $3; }
| /*EMPTY*/ { $$ = NIL; }
@@ -17937,6 +17946,7 @@ unreserved_keyword:
| FILTER
| FINALIZE
| FIRST_P
+ | FLUSH
| FOLLOWING
| FORCE
| FORMAT
@@ -18071,6 +18081,7 @@ unreserved_keyword:
| RENAME
| REPEATABLE
| REPLACE
+ | REPLAY
| REPLICA
| RESET
| RESPECT_P
@@ -18524,6 +18535,7 @@ bare_label_keyword:
| FINALIZE
| FIRST_P
| FLOAT_P
+ | FLUSH
| FOLLOWING
| FORCE
| FOREIGN
@@ -18706,6 +18718,7 @@ bare_label_keyword:
| RENAME
| REPEATABLE
| REPLACE
+ | REPLAY
| REPLICA
| RESET
| RESTART
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ac802ae85b48..60b9ba5b345d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -57,6 +57,7 @@
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
#include "catalog/pg_authid.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
@@ -965,6 +966,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
/* Update shared-memory status */
pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
+ /*
+ * If we wrote an LSN that someone was waiting for then walk over the
+ * shared memory array and set latches to notify the waiters.
+ */
+ if (waitLSNState &&
+ (LogstreamResult.Write >=
+ pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_WRITE_STANDBY])))
+ WaitLSNWakeup(WAIT_LSN_TYPE_WRITE_STANDBY, LogstreamResult.Write);
+
/*
* Close the current segment if it's fully written up in the last cycle of
* the loop, to create its archive notification file soon. Otherwise WAL
@@ -1004,6 +1014,15 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
}
SpinLockRelease(&walrcv->mutex);
+ /*
+ * If we flushed an LSN that someone was waiting for then walk over
+ * the shared memory array and set latches to notify the waiters.
+ */
+ if (waitLSNState &&
+ (LogstreamResult.Flush >=
+ pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_FLUSH_STANDBY])))
+ WaitLSNWakeup(WAIT_LSN_TYPE_FLUSH_STANDBY, LogstreamResult.Flush);
+
/* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery();
if (AllowCascadeReplication())
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index f39830dbb344..58548acb8aa8 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -89,8 +89,9 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem
LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server."
SSL_OPEN_SERVER "Waiting for SSL while attempting connection."
WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby."
-WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary."
+WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary or standby."
WAIT_FOR_WAL_REPLAY "Waiting for WAL replay to reach a target LSN on a standby."
+WAIT_FOR_WAL_WRITE "Waiting for WAL write to reach a target LSN on a standby."
WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process."
WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 20d7a65c614e..fcb9f19faef7 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -5313,10 +5313,11 @@ match_previous_words(int pattern_id,
COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_vacuumables);
/*
- * WAIT FOR LSN '' [ WITH ( option [, ...] ) ]
+ * WAIT FOR LSN '' [ MODE { REPLAY | FLUSH | WRITE } ] [ WITH ( option [, ...] ) ]
* where option can be:
* TIMEOUT ''
* NO_THROW
+ * MODE defaults to REPLAY if not specified.
*/
else if (Matches("WAIT"))
COMPLETE_WITH("FOR");
@@ -5325,25 +5326,41 @@ match_previous_words(int pattern_id,
else if (Matches("WAIT", "FOR", "LSN"))
/* No completion for LSN value - user must provide manually */
;
+
+ /*
+ * After LSN value, offer MODE (optional) or WITH, since MODE defaults to
+ * REPLAY
+ */
else if (Matches("WAIT", "FOR", "LSN", MatchAny))
+ COMPLETE_WITH("MODE", "WITH");
+ else if (Matches("WAIT", "FOR", "LSN", MatchAny, "MODE"))
+ COMPLETE_WITH("REPLAY", "FLUSH", "WRITE");
+ else if (Matches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny))
COMPLETE_WITH("WITH");
+ /* WITH directly after LSN (using default REPLAY mode) */
else if (Matches("WAIT", "FOR", "LSN", MatchAny, "WITH"))
COMPLETE_WITH("(");
+ else if (Matches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny, "WITH"))
+ COMPLETE_WITH("(");
+
+ /*
+ * Handle parenthesized option list (both with and without explicit MODE).
+ * This fires when we're in an unfinished parenthesized option list.
+ * get_previous_words treats a completed parenthesized option list as one
+ * word, so the above test is correct. timeout takes a string value,
+ * no_throw takes no value. We don't offer completions for these values.
+ */
else if (HeadMatches("WAIT", "FOR", "LSN", MatchAny, "WITH", "(*") &&
!HeadMatches("WAIT", "FOR", "LSN", MatchAny, "WITH", "(*)"))
{
- /*
- * This fires if we're in an unfinished parenthesized option list.
- * get_previous_words treats a completed parenthesized option list as
- * one word, so the above test is correct.
- */
if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
COMPLETE_WITH("timeout", "no_throw");
-
- /*
- * timeout takes a string value, no_throw takes no value. We don't
- * offer completions for these values.
- */
+ }
+ else if (HeadMatches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny, "WITH", "(*") &&
+ !HeadMatches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny, "WITH", "(*)"))
+ {
+ if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
+ COMPLETE_WITH("timeout", "no_throw");
}
/* WITH [RECURSIVE] */
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
index e607441d6187..9721a7a71955 100644
--- a/src/include/access/xlogwait.h
+++ b/src/include/access/xlogwait.h
@@ -35,9 +35,15 @@ typedef enum
*/
typedef enum WaitLSNType
{
- WAIT_LSN_TYPE_REPLAY = 0, /* Waiting for replay on standby */
- WAIT_LSN_TYPE_FLUSH = 1, /* Waiting for flush on primary */
- WAIT_LSN_TYPE_COUNT = 2
+ /* Standby wait types (walreceiver/startup wakes) */
+ WAIT_LSN_TYPE_REPLAY_STANDBY = 0,
+ WAIT_LSN_TYPE_WRITE_STANDBY = 1,
+ WAIT_LSN_TYPE_FLUSH_STANDBY = 2,
+
+ /* Primary wait types (WAL writer/backends wake) */
+ WAIT_LSN_TYPE_FLUSH_PRIMARY = 3,
+
+ WAIT_LSN_TYPE_COUNT = 4
} WaitLSNType;
/*
@@ -96,6 +102,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState;
extern Size WaitLSNShmemSize(void);
extern void WaitLSNShmemInit(void);
+extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType);
extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
extern void WaitLSNCleanup(void);
extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index d14294a4eceb..bbaf3242ccbd 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4385,10 +4385,21 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+/*
+ * WaitLSNMode - MODE parameter for WAIT FOR command
+ */
+typedef enum WaitLSNMode
+{
+ WAIT_LSN_MODE_REPLAY, /* Wait for LSN replay on standby */
+ WAIT_LSN_MODE_WRITE, /* Wait for LSN write on standby */
+ WAIT_LSN_MODE_FLUSH /* Wait for LSN flush on standby */
+} WaitLSNMode;
+
typedef struct WaitStmt
{
NodeTag type;
char *lsn_literal; /* LSN string from grammar */
+ WaitLSNMode mode; /* Wait mode: REPLAY/FLUSH/WRITE */
List *options; /* List of DefElem nodes */
} WaitStmt;
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 5d4fe27ef962..7ad8b11b725e 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -176,6 +176,7 @@ PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD, AS_LABEL)
PG_KEYWORD("finalize", FINALIZE, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD, BARE_LABEL)
+PG_KEYWORD("flush", FLUSH, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("following", FOLLOWING, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("for", FOR, RESERVED_KEYWORD, AS_LABEL)
PG_KEYWORD("force", FORCE, UNRESERVED_KEYWORD, BARE_LABEL)
@@ -378,6 +379,7 @@ PG_KEYWORD("release", RELEASE, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("rename", RENAME, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("repeatable", REPEATABLE, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("replace", REPLACE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("replay", REPLAY, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("replica", REPLICA, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("reset", RESET, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("respect", RESPECT_P, UNRESERVED_KEYWORD, AS_LABEL)
diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 295988b8b877..eec8233b5158 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3335,6 +3335,9 @@ sub wait_for_catchup
$mode = defined($mode) ? $mode : 'replay';
my %valid_modes =
('sent' => 1, 'write' => 1, 'flush' => 1, 'replay' => 1);
+ my $isrecovery =
+ $self->safe_psql('postgres', "SELECT pg_is_in_recovery()");
+ chomp($isrecovery);
croak "unknown mode $mode for 'wait_for_catchup', valid modes are "
. join(', ', keys(%valid_modes))
unless exists($valid_modes{$mode});
@@ -3347,9 +3350,6 @@ sub wait_for_catchup
}
if (!defined($target_lsn))
{
- my $isrecovery =
- $self->safe_psql('postgres', "SELECT pg_is_in_recovery()");
- chomp($isrecovery);
if ($isrecovery eq 't')
{
$target_lsn = $self->lsn('replay');
@@ -3367,6 +3367,35 @@ sub wait_for_catchup
. $self->name . "\n";
# Before release 12 walreceiver just set the application name to
# "walreceiver"
+
+ # Use WAIT FOR LSN when in recovery for supported modes (replay, write, flush)
+ # This is more efficient than polling pg_stat_replication
+ if (($mode ne 'sent') && ($isrecovery eq 't'))
+ {
+ my $timeout = $PostgreSQL::Test::Utils::timeout_default;
+ # Map mode names to WAIT FOR LSN MODE values (uppercase)
+ my $wait_mode = uc($mode);
+ my $query =
+ qq[WAIT FOR LSN '${target_lsn}' MODE ${wait_mode} WITH (timeout '${timeout}s', no_throw);];
+ my $output = $self->safe_psql('postgres', $query);
+ chomp($output);
+
+ if ($output ne 'success')
+ {
+ # Fetch additional detail for debugging purposes
+ $query = qq[SELECT * FROM pg_catalog.pg_stat_replication];
+ my $details = $self->safe_psql('postgres', $query);
+ diag qq(WAIT FOR LSN failed with status:
+${output});
+ diag qq(Last pg_stat_replication contents:
+${details});
+ croak "failed waiting for catchup";
+ }
+ print "done\n";
+ return;
+ }
+
+ # Polling for 'sent' mode or when not in recovery (WAIT FOR LSN not applicable)
my $query = qq[SELECT '$target_lsn' <= ${mode}_lsn AND state = 'streaming'
FROM pg_catalog.pg_stat_replication
WHERE application_name IN ('$standby_name', 'walreceiver')];
diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl
index e0ddb06a2f04..df7b563cfbb8 100644
--- a/src/test/recovery/t/049_wait_for_lsn.pl
+++ b/src/test/recovery/t/049_wait_for_lsn.pl
@@ -1,4 +1,4 @@
-# Checks waiting for the LSN replay on standby using
+# Checks waiting for the LSN replay/write/flush on standby using
# the WAIT FOR command.
use strict;
use warnings FATAL => 'all';
@@ -7,6 +7,38 @@
use PostgreSQL::Test::Utils;
use Test::More;
+# Helper functions to control walreceiver for testing wait conditions.
+# These allow us to stop WAL streaming so waiters block, then resume it.
+my $saved_primary_conninfo;
+
+sub stop_walreceiver
+{
+ my ($node) = @_;
+ $saved_primary_conninfo = $node->safe_psql('postgres',
+ "SELECT setting FROM pg_settings WHERE name = 'primary_conninfo'");
+ $node->safe_psql(
+ 'postgres', qq[
+ ALTER SYSTEM SET primary_conninfo = '';
+ SELECT pg_reload_conf();
+ ]);
+
+ $node->poll_query_until('postgres',
+ "SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+}
+
+sub resume_walreceiver
+{
+ my ($node) = @_;
+ $node->safe_psql(
+ 'postgres', qq[
+ ALTER SYSTEM SET primary_conninfo = '$saved_primary_conninfo';
+ SELECT pg_reload_conf();
+ ]);
+
+ $node->poll_query_until('postgres',
+ "SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+}
+
# Initialize primary node
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
$node_primary->init(allows_streaming => 1);
@@ -62,7 +94,34 @@
ok((split("\n", $output))[-1] eq 30,
"standby reached the same LSN as primary");
-# 3. Check that waiting for unreachable LSN triggers the timeout. The
+# 3. Check that WAIT FOR works with WRITE and FLUSH modes.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn_write =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn_write}' MODE WRITE WITH (timeout '1d');
+ SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${lsn_write}'::pg_lsn);
+]);
+
+ok((split("\n", $output))[-1] >= 0,
+ "standby wrote WAL up to target LSN after WAIT FOR MODE WRITE");
+
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn_flush =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn_flush}' MODE FLUSH WITH (timeout '1d');
+ SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '${lsn_flush}'::pg_lsn);
+]);
+
+ok((split("\n", $output))[-1] >= 0,
+ "standby flushed WAL up to target LSN after WAIT FOR MODE FLUSH");
+
+# 4. Check that waiting for unreachable LSN triggers the timeout. The
# unreachable LSN must be well in advance. So WAL records issued by
# the concurrent autovacuum could not affect that.
my $lsn3 =
@@ -88,7 +147,7 @@
WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]);
ok($output eq "timeout", "WAIT FOR returns correct status after timeout");
-# 4. Check that WAIT FOR triggers an error if called on primary,
+# 5. Check that WAIT FOR triggers an error if called on primary,
# within another function, or inside a transaction with an isolation level
# higher than READ COMMITTED.
@@ -125,7 +184,7 @@
/WAIT FOR must be only called without an active or registered snapshot/,
"get an error when running within another function");
-# 5. Check parameter validation error cases on standby before promotion
+# 6. Check parameter validation error cases on standby before promotion
my $test_lsn =
$node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
@@ -208,7 +267,7 @@
ok( $stderr =~ /option "invalid_option" not recognized/,
"get error for invalid WITH clause option");
-# 6. Check the scenario of multiple LSN waiters. We make 5 background
+# 7a. Check the scenario of multiple REPLAY waiters. We make 5 background
# psql sessions each waiting for a corresponding insertion. When waiting is
# finished, stored procedures logs if there are visible as many rows as
# should be.
@@ -226,7 +285,9 @@
\$\$
LANGUAGE plpgsql;
]);
+
$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+
my @psql_sessions;
for (my $i = 0; $i < 5; $i++)
{
@@ -239,10 +300,11 @@
$psql_sessions[$i]->query_until(
qr/start/, qq[
\\echo start
- WAIT FOR LSN '${lsn}';
+ WAIT FOR LSN '${lsn}' MODE REPLAY;
SELECT log_count(${i});
]);
}
+
my $log_offset = -s $node_standby->logfile;
$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
for (my $i = 0; $i < 5; $i++)
@@ -251,23 +313,199 @@
$psql_sessions[$i]->quit;
}
-ok(1, 'multiple LSN waiters reported consistent data');
+ok(1, 'multiple REPLAY waiters reported consistent data');
+
+# 7b. Check the scenario of multiple WRITE waiters.
+# Stop walreceiver to ensure waiters actually block.
+stop_walreceiver($node_standby);
+
+# Generate WAL on primary (standby won't receive it yet)
+my @write_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (100 + ${i});");
+ $write_lsns[$i] =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn()");
+}
+
+# Start WRITE waiters (they will block since walreceiver is stopped)
+my @write_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+ $write_sessions[$i] = $node_standby->background_psql('postgres');
+ $write_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '$write_lsns[$i]' MODE WRITE WITH (timeout '1d');
+ DO \$\$ BEGIN RAISE LOG 'write_done %', $i; END \$\$;
+ ]);
+}
+
+# Verify waiters are blocked
+$node_standby->poll_query_until('postgres',
+ "SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalWrite'"
+);
+
+# Restore walreceiver to unblock waiters
+my $write_log_offset = -s $node_standby->logfile;
+resume_walreceiver($node_standby);
+
+# Wait for all waiters to complete and close sessions
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_standby->wait_for_log("write_done $i", $write_log_offset);
+ $write_sessions[$i]->quit;
+}
+
+# Verify on standby that WAL was written up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '$write_lsns[4]'::pg_lsn);");
+
+ok($output >= 0,
+ "multiple WRITE waiters: standby wrote WAL up to target LSN");
+
+# 7c. Check the scenario of multiple FLUSH waiters.
+# Stop walreceiver to ensure waiters actually block.
+stop_walreceiver($node_standby);
+
+# Generate WAL on primary (standby won't receive it yet)
+my @flush_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (200 + ${i});");
+ $flush_lsns[$i] =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn()");
+}
+
+# Start FLUSH waiters (they will block since walreceiver is stopped)
+my @flush_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+ $flush_sessions[$i] = $node_standby->background_psql('postgres');
+ $flush_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '$flush_lsns[$i]' MODE FLUSH WITH (timeout '1d');
+ DO \$\$ BEGIN RAISE LOG 'flush_done %', $i; END \$\$;
+ ]);
+}
+
+# Verify waiters are blocked
+$node_standby->poll_query_until('postgres',
+ "SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalFlush'"
+);
+
+# Restore walreceiver to unblock waiters
+my $flush_log_offset = -s $node_standby->logfile;
+resume_walreceiver($node_standby);
+
+# Wait for all waiters to complete and close sessions
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_standby->wait_for_log("flush_done $i", $flush_log_offset);
+ $flush_sessions[$i]->quit;
+}
+
+# Verify on standby that WAL was flushed up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+ "SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '$flush_lsns[4]'::pg_lsn);"
+);
+
+ok($output >= 0,
+ "multiple FLUSH waiters: standby flushed WAL up to target LSN");
+
+# 7d. Check the scenario of mixed mode waiters (REPLAY, WRITE, FLUSH)
+# running concurrently. We start 6 sessions: 2 for each mode, all waiting
+# for the same target LSN. We stop the walreceiver and pause replay to
+# ensure all waiters block. Then we resume replay and restart the
+# walreceiver to verify they unblock and complete correctly.
+
+# Stop walreceiver first to ensure we can control the flow without hanging
+# (stopping it after pausing replay can hang if the startup process is paused).
+stop_walreceiver($node_standby);
+
+# Pause replay
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+
+# Generate WAL on primary
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(301, 310));");
+my $mixed_target_lsn =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+
+# Start 6 waiters: 2 for each mode
+my @mixed_sessions;
+my @mixed_modes = ('REPLAY', 'WRITE', 'FLUSH');
+for (my $i = 0; $i < 6; $i++)
+{
+ $mixed_sessions[$i] = $node_standby->background_psql('postgres');
+ $mixed_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '${mixed_target_lsn}' MODE $mixed_modes[$i % 3] WITH (timeout '1d');
+ DO \$\$ BEGIN RAISE LOG 'mixed_done %', $i; END \$\$;
+ ]);
+}
+
+# Verify all waiters are blocked
+$node_standby->poll_query_until('postgres',
+ "SELECT count(*) = 6 FROM pg_stat_activity WHERE wait_event LIKE 'WaitForWal%'"
+);
-# 7. Check that the standby promotion terminates the wait on LSN. Start
-# waiting for an unreachable LSN then promote. Check the log for the relevant
-# error message. Also, check that waiting for already replayed LSN doesn't
-# cause an error even after promotion.
+# Resume replay (waiters should still be blocked as no WAL has arrived)
+my $mixed_log_offset = -s $node_standby->logfile;
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
+$node_standby->poll_query_until('postgres',
+ "SELECT NOT pg_is_wal_replay_paused();");
+
+# Restore walreceiver to allow WAL to arrive
+resume_walreceiver($node_standby);
+
+# Wait for all sessions to complete and close them
+for (my $i = 0; $i < 6; $i++)
+{
+ $node_standby->wait_for_log("mixed_done $i", $mixed_log_offset);
+ $mixed_sessions[$i]->quit;
+}
+
+# Verify all modes reached the target LSN
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${mixed_target_lsn}'::pg_lsn) >= 0 AND
+ pg_lsn_cmp(pg_last_wal_receive_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0 AND
+ pg_lsn_cmp(pg_last_wal_replay_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0;
+]);
+
+ok($output eq 't',
+ "mixed mode waiters: all modes completed and reached target LSN");
+
+# 8. Check that the standby promotion terminates all wait modes. Start
+# waiting for unreachable LSNs with REPLAY, WRITE, and FLUSH modes, then
+# promote. Check the log for the relevant error messages. Also, check that
+# waiting for already replayed LSN doesn't cause an error even after promotion.
my $lsn4 =
$node_primary->safe_psql('postgres',
"SELECT pg_current_wal_insert_lsn() + 10000000000");
+
my $lsn5 =
$node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
-my $psql_session = $node_standby->background_psql('postgres');
-$psql_session->query_until(
- qr/start/, qq[
- \\echo start
- WAIT FOR LSN '${lsn4}';
-]);
+
+# Start background sessions waiting for unreachable LSN with all modes
+my @wait_modes = ('REPLAY', 'WRITE', 'FLUSH');
+my @wait_sessions;
+for (my $i = 0; $i < 3; $i++)
+{
+ $wait_sessions[$i] = $node_standby->background_psql('postgres');
+ $wait_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '${lsn4}' MODE $wait_modes[$i];
+ ]);
+}
# Make sure standby will be promoted at least at the primary insert LSN we
# have just observed. Use pg_switch_wal() to force the insert LSN to be
@@ -277,17 +515,24 @@
$log_offset = -s $node_standby->logfile;
$node_standby->promote;
-$node_standby->wait_for_log('recovery is not in progress', $log_offset);
-ok(1, 'got error after standby promote');
+# Wait for all three sessions to get the error (each mode has distinct message)
+$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was written/,
+ $log_offset);
+$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was flushed/,
+ $log_offset);
+$node_standby->wait_for_log(
+ qr/Recovery ended before target LSN.*was replayed/, $log_offset);
-$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';");
+ok(1, 'promotion interrupted all wait modes');
+
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}' MODE REPLAY;");
ok(1, 'wait for already replayed LSN exits immediately even after promotion');
$output = $node_standby->safe_psql(
'postgres', qq[
- WAIT FOR LSN '${lsn4}' WITH (timeout '10ms', no_throw);]);
+ WAIT FOR LSN '${lsn4}' MODE REPLAY WITH (timeout '10ms', no_throw);]);
ok($output eq "not in recovery",
"WAIT FOR returns correct status after standby promotion");
@@ -295,8 +540,11 @@
$node_standby->stop;
$node_primary->stop;
-# If we send \q with $psql_session->quit the command can be sent to the session
+# If we send \q with $session->quit the command can be sent to the session
# already closed. So \q is in initial script, here we only finish IPC::Run.
-$psql_session->{run}->finish;
+for (my $i = 0; $i < 3; $i++)
+{
+ $wait_sessions[$i]->{run}->finish;
+}
done_testing();