diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml index 3b8e842d1de9..28c68678315a 100644 --- a/doc/src/sgml/ref/wait_for.sgml +++ b/doc/src/sgml/ref/wait_for.sgml @@ -16,12 +16,13 @@ PostgreSQL documentation WAIT FOR - wait for target LSN to be replayed, optionally with a timeout + wait for WAL to reach a target LSN on a replica -WAIT FOR LSN 'lsn' [ WITH ( option [, ...] ) ] +WAIT FOR LSN 'lsn' [ MODE { REPLAY | FLUSH | WRITE } ] + [ WITH ( option [, ...] ) ] where option can be: @@ -34,20 +35,22 @@ WAIT FOR LSN 'lsn' [ WITH ( Description - Waits until recovery replays lsn. - If no timeout is specified or it is set to - zero, this command waits indefinitely for the - lsn. - On timeout, or if the server is promoted before - lsn is reached, an error is emitted, - unless NO_THROW is specified in the WITH clause. - If NO_THROW is specified, then the command - doesn't throw errors. + Waits until the specified lsn is reached + according to the specified mode, + which determines whether to wait for WAL to be written, flushed, or replayed. + If no timeout is specified or it is set to + zero, this command waits indefinitely for the + lsn. + On timeout, or if the server is promoted before + lsn is reached, an error is emitted, + unless NO_THROW is specified in the WITH clause. + If NO_THROW is specified, then the command + doesn't throw errors. - The possible return values are success, - timeout, and not in recovery. + The possible return values are success, + timeout, and not in recovery. @@ -64,6 +67,61 @@ WAIT FOR LSN 'lsn' [ WITH ( + + MODE + + + Specifies the type of LSN processing to wait for. If not specified, + the default is REPLAY. The valid modes are: + + + + + REPLAY + + + Wait for the LSN to be replayed (applied to the database). + After successful completion, pg_last_wal_replay_lsn() + will return a value greater than or equal to the target LSN. + + + + + + FLUSH + + + Wait for the WAL containing the LSN to be received from the + primary and flushed to disk. This provides a durability guarantee + without waiting for the WAL to be applied. After successful + completion, pg_last_wal_receive_lsn() + will return a value greater than or equal to the target LSN. + This value is also available as the flushed_lsn + column in + pg_stat_wal_receiver. + + + + + + WRITE + + + Wait for the WAL containing the LSN to be received from the + primary and written to disk, but not yet flushed. This is faster + than FLUSH but provides weaker durability + guarantees since the data may still be in operating system buffers. + After successful completion, the written_lsn + column in + pg_stat_wal_receiver will show + a value greater than or equal to the target LSN. + + + + + + + WITH ( option [, ...] ) @@ -135,9 +193,12 @@ WAIT FOR LSN 'lsn' [ WITH ( This return value denotes that the database server is not in a recovery - state. This might mean either the database server was not in recovery - at the moment of receiving the command, or it was promoted before - reaching the target lsn. + state. This might mean either the database server was not in recovery + at the moment of receiving the command (i.e., executed on a primary), + or it was promoted before reaching the target lsn. + In the promotion case, this status indicates a timeline change occurred, + and the application should re-evaluate whether the target LSN is still + relevant. @@ -148,25 +209,33 @@ WAIT FOR LSN 'lsn' [ WITH ( Notes - WAIT FOR command waits till - lsn to be replayed on standby. - That is, after this command execution, the value returned by - pg_last_wal_replay_lsn should be greater or equal - to the lsn value. This is useful to achieve - read-your-writes-consistency, while using async replica for reads and - primary for writes. In that case, the lsn of the last - modification should be stored on the client application side or the - connection pooler side. + WAIT FOR waits until the specified + lsn is reached according to the specified + mode. The REPLAY mode waits + for the LSN to be replayed (applied to the database), which is useful + to achieve read-your-writes consistency while using an async replica + for reads and the primary for writes. The FLUSH mode + waits for the WAL to be flushed to durable storage on the replica, + providing a durability guarantee without waiting for replay. The + WRITE mode waits for the WAL to be written to the + operating system, which is faster than flush but provides weaker + durability guarantees. In all cases, the LSN of the + last modification should be stored on the client application side or + the connection pooler side. - WAIT FOR command should be called on standby. - If a user runs WAIT FOR on primary, it - will error out unless NO_THROW is specified in the WITH clause. - However, if WAIT FOR is - called on primary promoted from standby and lsn - was already replayed, then the WAIT FOR command just - exits immediately. + WAIT FOR should be called on a standby. + If a user runs WAIT FOR on the primary, it + will error out unless NO_THROW is specified + in the WITH clause. However, if WAIT FOR is + called on a primary promoted from standby and lsn + was already reached, then the WAIT FOR command + just exits immediately. If the replica is promoted while waiting, + the command will return not in recovery (or throw + an error if NO_THROW is not specified). Promotion + creates a new timeline, and the LSN being waited for may refer to + WAL from the old timeline. @@ -175,21 +244,21 @@ WAIT FOR LSN 'lsn' [ WITH ( Examples - You can use WAIT FOR command to wait for - the pg_lsn value. For example, an application could update - the movie table and get the lsn after - changes just made. This example uses pg_current_wal_insert_lsn - on primary server to get the lsn given that - synchronous_commit could be set to - off. + You can use WAIT FOR command to wait for + the pg_lsn value. For example, an application could update + the movie table and get the lsn after + changes just made. This example uses pg_current_wal_insert_lsn + on primary server to get the lsn given that + synchronous_commit could be set to + off. postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama'; UPDATE 100 postgres=# SELECT pg_current_wal_insert_lsn(); -pg_current_wal_insert_lsn --------------------- -0/306EE20 + pg_current_wal_insert_lsn +--------------------------- + 0/306EE20 (1 row) @@ -198,9 +267,9 @@ pg_current_wal_insert_lsn changes made on primary should be guaranteed to be visible on replica. -postgres=# WAIT FOR LSN '0/306EE20'; +postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY; status --------- +--------- success (1 row) postgres=# SELECT * FROM movie WHERE genre = 'Drama'; @@ -211,21 +280,46 @@ postgres=# SELECT * FROM movie WHERE genre = 'Drama'; - If the target LSN is not reached before the timeout, the error is thrown. + Wait for flush (data durable on replica): -postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s'); +postgres=# WAIT FOR LSN '0/306EE20' MODE FLUSH; + status +--------- + success +(1 row) + + + + + Wait for write with timeout: + + +postgres=# WAIT FOR LSN '0/306EE20' MODE WRITE WITH (TIMEOUT '100ms', NO_THROW); + status +--------- + success +(1 row) + + + + + If the target LSN is not reached before the timeout, an error is thrown: + + +postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY WITH (TIMEOUT '0.1s'); ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60 The same example uses WAIT FOR with - NO_THROW option. + NO_THROW option: + -postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW); +postgres=# WAIT FOR LSN '0/306EE20' MODE REPLAY WITH (TIMEOUT '100ms', NO_THROW); status --------- +--------- timeout (1 row) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6a5640df51af..597e662c8a27 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6238,10 +6238,12 @@ StartupXLOG(void) LWLockRelease(ControlFileLock); /* - * Wake up all waiters for replay LSN. They need to report an error that - * recovery was ended before reaching the target LSN. + * Wake up all waiters. They need to report an error that recovery was + * ended before reaching the target LSN. */ - WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr); + WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, InvalidXLogRecPtr); + WaitLSNWakeup(WAIT_LSN_TYPE_WRITE_STANDBY, InvalidXLogRecPtr); + WaitLSNWakeup(WAIT_LSN_TYPE_FLUSH_STANDBY, InvalidXLogRecPtr); /* * Shutdown the recovery environment. This must occur after diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index ae2398d6975b..f97aaa67645b 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1846,8 +1846,8 @@ PerformWalRecovery(void) */ if (waitLSNState && (XLogRecoveryCtl->lastReplayedEndRecPtr >= - pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY]))) - WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr); + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY_STANDBY]))) + WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, XLogRecoveryCtl->lastReplayedEndRecPtr); /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c index 84613fc39c7c..0663a459502e 100644 --- a/src/backend/access/transam/xlogwait.c +++ b/src/backend/access/transam/xlogwait.c @@ -12,25 +12,30 @@ * This file implements waiting for WAL operations to reach specific LSNs * on both physical standby and primary servers. The core idea is simple: * every process that wants to wait publishes the LSN it needs to the - * shared memory, and the appropriate process (startup on standby, or - * WAL writer/backend on primary) wakes it once that LSN has been reached. + * shared memory, and the appropriate process (startup on standby, + * walreceiver on standby, or WAL writer/backend on primary) wakes it + * once that LSN has been reached. * * The shared memory used by this module comprises a procInfos * per-backend array with the information of the awaited LSN for each * of the backend processes. The elements of that array are organized - * into a pairing heap waitersHeap, which allows for very fast finding - * of the least awaited LSN. + * into pairing heaps (waitersHeap), one for each WaitLSNType, which + * allows for very fast finding of the least awaited LSN for each type. * - * In addition, the least-awaited LSN is cached as minWaitedLSN. The - * waiter process publishes information about itself to the shared - * memory and waits on the latch until it is woken up by the appropriate - * process, standby is promoted, or the postmaster dies. Then, it cleans - * information about itself in the shared memory. + * In addition, the least-awaited LSN for each type is cached in the + * minWaitedLSN array. The waiter process publishes information about + * itself to the shared memory and waits on the latch until it is woken + * up by the appropriate process, standby is promoted, or the postmaster + * dies. Then, it cleans information about itself in the shared memory. * - * On standby servers: After replaying a WAL record, the startup process - * first performs a fast path check minWaitedLSN > replayLSN. If this - * check is negative, it checks waitersHeap and wakes up the backend - * whose awaited LSNs are reached. + * On standby servers: + * - After replaying a WAL record, the startup process performs a fast + * path check minWaitedLSN[REPLAY] > replayLSN. If this check is + * negative, it checks waitersHeap[REPLAY] and wakes up the backends + * whose awaited LSNs are reached. + * - After receiving WAL, the walreceiver process performs similar checks + * against the flush and write LSNs, waking up waiters in the FLUSH + * and WRITE heaps respectively. * * On primary servers: After flushing WAL, the WAL writer or backend * process performs a similar check against the flush LSN and wakes up @@ -49,6 +54,7 @@ #include "access/xlogwait.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/walreceiver.h" #include "storage/latch.h" #include "storage/proc.h" #include "storage/shmem.h" @@ -62,6 +68,48 @@ static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, struct WaitLSNState *waitLSNState = NULL; +/* + * Wait event for each WaitLSNType, used with WaitLatch() to report + * the wait in pg_stat_activity. + */ +static const uint32 WaitLSNWaitEvents[] = { + [WAIT_LSN_TYPE_REPLAY_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY, + [WAIT_LSN_TYPE_WRITE_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_WRITE, + [WAIT_LSN_TYPE_FLUSH_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH, + [WAIT_LSN_TYPE_FLUSH_PRIMARY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH, +}; + +StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT, + "WaitLSNWaitEvents must match WaitLSNType enum"); + +/* + * Get the current LSN for the specified wait type. + */ +XLogRecPtr +GetCurrentLSNForWaitType(WaitLSNType lsnType) +{ + switch (lsnType) + { + case WAIT_LSN_TYPE_REPLAY_STANDBY: + return GetXLogReplayRecPtr(NULL); + + case WAIT_LSN_TYPE_WRITE_STANDBY: + return GetWalRcvWriteRecPtr(); + + case WAIT_LSN_TYPE_FLUSH_STANDBY: + return GetWalRcvFlushRecPtr(NULL, NULL); + + case WAIT_LSN_TYPE_FLUSH_PRIMARY: + return GetFlushRecPtr(NULL); + + case WAIT_LSN_TYPE_COUNT: + break; + } + + elog(ERROR, "invalid LSN wait type: %d", lsnType); + pg_unreachable(); +} + /* Report the amount of shared memory space needed for WaitLSNState. */ Size WaitLSNShmemSize(void) @@ -341,13 +389,11 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout) int rc; long delay_ms = -1; - if (lsnType == WAIT_LSN_TYPE_REPLAY) - currentLSN = GetXLogReplayRecPtr(NULL); - else - currentLSN = GetFlushRecPtr(NULL); + /* Get current LSN for the wait type */ + currentLSN = GetCurrentLSNForWaitType(lsnType); /* Check that recovery is still in-progress */ - if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress()) + if (lsnType != WAIT_LSN_TYPE_FLUSH_PRIMARY && !RecoveryInProgress()) { /* * Recovery was ended, but check if target LSN was already @@ -376,7 +422,7 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout) CHECK_FOR_INTERRUPTS(); rc = WaitLatch(MyLatch, wake_events, delay_ms, - (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH); + WaitLSNWaitEvents[lsnType]); /* * Emergency bailout if postmaster has died. This is to avoid the diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c index a37bddaefb27..05ad84fdb5bf 100644 --- a/src/backend/commands/wait.c +++ b/src/backend/commands/wait.c @@ -2,7 +2,7 @@ * * wait.c * Implements WAIT FOR, which allows waiting for events such as - * time passing or LSN having been replayed on replica. + * time passing or LSN having been replayed, flushed, or written. * * Portions Copyright (c) 2025, PostgreSQL Global Development Group * @@ -15,6 +15,7 @@ #include +#include "access/xlog.h" #include "access/xlogrecovery.h" #include "access/xlogwait.h" #include "commands/defrem.h" @@ -28,12 +29,28 @@ #include "utils/snapmgr.h" +/* + * Type descriptor for WAIT FOR LSN wait types, used for error messages. + */ +typedef struct WaitLSNTypeDesc +{ + const char *noun; /* "replay", "flush", "write" */ + const char *verb; /* "replayed", "flushed", "written" */ +} WaitLSNTypeDesc; + +static const WaitLSNTypeDesc WaitLSNTypeDescs[] = { + [WAIT_LSN_TYPE_REPLAY_STANDBY] = {"replay", "replayed"}, + [WAIT_LSN_TYPE_WRITE_STANDBY] = {"write", "written"}, + [WAIT_LSN_TYPE_FLUSH_STANDBY] = {"flush", "flushed"}, +}; + void ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) { XLogRecPtr lsn; int64 timeout = 0; WaitLSNResult waitLSNResult; + WaitLSNType lsnType; bool throw = true; TupleDesc tupdesc; TupOutputState *tstate; @@ -45,6 +62,22 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(stmt->lsn_literal))); + /* Convert parse-time WaitLSNMode to runtime WaitLSNType */ + switch (stmt->mode) + { + case WAIT_LSN_MODE_REPLAY: + lsnType = WAIT_LSN_TYPE_REPLAY_STANDBY; + break; + case WAIT_LSN_MODE_WRITE: + lsnType = WAIT_LSN_TYPE_WRITE_STANDBY; + break; + case WAIT_LSN_MODE_FLUSH: + lsnType = WAIT_LSN_TYPE_FLUSH_STANDBY; + break; + default: + elog(ERROR, "unrecognized wait mode: %d", stmt->mode); + } + foreach_node(DefElem, defel, stmt->options) { if (strcmp(defel->defname, "timeout") == 0) @@ -107,8 +140,8 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) } /* - * We are going to wait for the LSN replay. We should first care that we - * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. + * We are going to wait for the LSN. We should first care that we don't + * hold a snapshot and correspondingly our MyProc->xmin is invalid. * Otherwise, our snapshot could prevent the replay of WAL records * implying a kind of self-deadlock. This is the reason why WAIT FOR is a * command, not a procedure or function. @@ -140,7 +173,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) */ Assert(MyProc->xmin == InvalidTransactionId); - waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout); + waitLSNResult = WaitForLSN(lsnType, lsn, timeout); /* * Process the result of WaitForLSN(). Throw appropriate error if needed. @@ -154,11 +187,18 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) case WAIT_LSN_RESULT_TIMEOUT: if (throw) + { + const WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType]; + XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType); + ereport(ERROR, errcode(ERRCODE_QUERY_CANCELED), - errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X", + errmsg("timed out while waiting for target LSN %X/%08X to be %s; current %s LSN %X/%08X", LSN_FORMAT_ARGS(lsn), - LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + desc->verb, + desc->noun, + LSN_FORMAT_ARGS(currentLSN))); + } else result = "timeout"; break; @@ -166,20 +206,26 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) case WAIT_LSN_RESULT_NOT_IN_RECOVERY: if (throw) { + const WaitLSNTypeDesc *desc = &WaitLSNTypeDescs[lsnType]; + XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType); + if (PromoteIsTriggered()) { ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is not in progress"), - errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.", + errdetail("Recovery ended before target LSN %X/%08X was %s; last %s LSN %X/%08X.", LSN_FORMAT_ARGS(lsn), - LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + desc->verb, + desc->noun, + LSN_FORMAT_ARGS(currentLSN))); } else ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("recovery is not in progress"), - errhint("Waiting for the replay LSN can only be executed during recovery.")); + errhint("Waiting for the %s LSN can only be executed during recovery.", + desc->noun)); } else result = "not in recovery"; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 7856ce9d78fc..5ae66fe79f0a 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -640,6 +640,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type window_definition over_clause window_specification opt_frame_clause frame_extent frame_bound %type null_treatment opt_window_exclusion_clause +%type opt_wait_lsn_mode %type opt_existing_window_name %type opt_if_not_exists %type opt_unique_null_treatment @@ -729,7 +730,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); ESCAPE EVENT EXCEPT EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXPRESSION EXTENSION EXTERNAL EXTRACT - FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FOLLOWING FOR + FALSE_P FAMILY FETCH FILTER FINALIZE FIRST_P FLOAT_P FLUSH FOLLOWING FOR FORCE FOREIGN FORMAT FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS @@ -770,7 +771,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); QUOTE QUOTES RANGE READ REAL REASSIGN RECURSIVE REF_P REFERENCES REFERENCING - REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA + REFRESH REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLAY REPLICA RESET RESPECT_P RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP ROUTINE ROUTINES ROW ROWS RULE @@ -16489,15 +16490,23 @@ xml_passing_mech: *****************************************************************************/ WaitStmt: - WAIT FOR LSN_P Sconst opt_wait_with_clause + WAIT FOR LSN_P Sconst opt_wait_lsn_mode opt_wait_with_clause { WaitStmt *n = makeNode(WaitStmt); n->lsn_literal = $4; - n->options = $5; + n->mode = $5; + n->options = $6; $$ = (Node *) n; } ; +opt_wait_lsn_mode: + MODE REPLAY { $$ = WAIT_LSN_MODE_REPLAY; } + | MODE FLUSH { $$ = WAIT_LSN_MODE_FLUSH; } + | MODE WRITE { $$ = WAIT_LSN_MODE_WRITE; } + | /*EMPTY*/ { $$ = WAIT_LSN_MODE_REPLAY; } + ; + opt_wait_with_clause: WITH '(' utility_option_list ')' { $$ = $3; } | /*EMPTY*/ { $$ = NIL; } @@ -17937,6 +17946,7 @@ unreserved_keyword: | FILTER | FINALIZE | FIRST_P + | FLUSH | FOLLOWING | FORCE | FORMAT @@ -18071,6 +18081,7 @@ unreserved_keyword: | RENAME | REPEATABLE | REPLACE + | REPLAY | REPLICA | RESET | RESPECT_P @@ -18524,6 +18535,7 @@ bare_label_keyword: | FINALIZE | FIRST_P | FLOAT_P + | FLUSH | FOLLOWING | FORCE | FOREIGN @@ -18706,6 +18718,7 @@ bare_label_keyword: | RENAME | REPEATABLE | REPLACE + | REPLAY | REPLICA | RESET | RESTART diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ac802ae85b48..60b9ba5b345d 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -57,6 +57,7 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xlogrecovery.h" +#include "access/xlogwait.h" #include "catalog/pg_authid.h" #include "funcapi.h" #include "libpq/pqformat.h" @@ -965,6 +966,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) /* Update shared-memory status */ pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); + /* + * If we wrote an LSN that someone was waiting for then walk over the + * shared memory array and set latches to notify the waiters. + */ + if (waitLSNState && + (LogstreamResult.Write >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_WRITE_STANDBY]))) + WaitLSNWakeup(WAIT_LSN_TYPE_WRITE_STANDBY, LogstreamResult.Write); + /* * Close the current segment if it's fully written up in the last cycle of * the loop, to create its archive notification file soon. Otherwise WAL @@ -1004,6 +1014,15 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) } SpinLockRelease(&walrcv->mutex); + /* + * If we flushed an LSN that someone was waiting for then walk over + * the shared memory array and set latches to notify the waiters. + */ + if (waitLSNState && + (LogstreamResult.Flush >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_FLUSH_STANDBY]))) + WaitLSNWakeup(WAIT_LSN_TYPE_FLUSH_STANDBY, LogstreamResult.Flush); + /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); if (AllowCascadeReplication()) diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index f39830dbb344..58548acb8aa8 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -89,8 +89,9 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby." -WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary." +WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary or standby." WAIT_FOR_WAL_REPLAY "Waiting for WAL replay to reach a target LSN on a standby." +WAIT_FOR_WAL_WRITE "Waiting for WAL write to reach a target LSN on a standby." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 20d7a65c614e..fcb9f19faef7 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -5313,10 +5313,11 @@ match_previous_words(int pattern_id, COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_vacuumables); /* - * WAIT FOR LSN '' [ WITH ( option [, ...] ) ] + * WAIT FOR LSN '' [ MODE { REPLAY | FLUSH | WRITE } ] [ WITH ( option [, ...] ) ] * where option can be: * TIMEOUT '' * NO_THROW + * MODE defaults to REPLAY if not specified. */ else if (Matches("WAIT")) COMPLETE_WITH("FOR"); @@ -5325,25 +5326,41 @@ match_previous_words(int pattern_id, else if (Matches("WAIT", "FOR", "LSN")) /* No completion for LSN value - user must provide manually */ ; + + /* + * After LSN value, offer MODE (optional) or WITH, since MODE defaults to + * REPLAY + */ else if (Matches("WAIT", "FOR", "LSN", MatchAny)) + COMPLETE_WITH("MODE", "WITH"); + else if (Matches("WAIT", "FOR", "LSN", MatchAny, "MODE")) + COMPLETE_WITH("REPLAY", "FLUSH", "WRITE"); + else if (Matches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny)) COMPLETE_WITH("WITH"); + /* WITH directly after LSN (using default REPLAY mode) */ else if (Matches("WAIT", "FOR", "LSN", MatchAny, "WITH")) COMPLETE_WITH("("); + else if (Matches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny, "WITH")) + COMPLETE_WITH("("); + + /* + * Handle parenthesized option list (both with and without explicit MODE). + * This fires when we're in an unfinished parenthesized option list. + * get_previous_words treats a completed parenthesized option list as one + * word, so the above test is correct. timeout takes a string value, + * no_throw takes no value. We don't offer completions for these values. + */ else if (HeadMatches("WAIT", "FOR", "LSN", MatchAny, "WITH", "(*") && !HeadMatches("WAIT", "FOR", "LSN", MatchAny, "WITH", "(*)")) { - /* - * This fires if we're in an unfinished parenthesized option list. - * get_previous_words treats a completed parenthesized option list as - * one word, so the above test is correct. - */ if (ends_with(prev_wd, '(') || ends_with(prev_wd, ',')) COMPLETE_WITH("timeout", "no_throw"); - - /* - * timeout takes a string value, no_throw takes no value. We don't - * offer completions for these values. - */ + } + else if (HeadMatches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny, "WITH", "(*") && + !HeadMatches("WAIT", "FOR", "LSN", MatchAny, "MODE", MatchAny, "WITH", "(*)")) + { + if (ends_with(prev_wd, '(') || ends_with(prev_wd, ',')) + COMPLETE_WITH("timeout", "no_throw"); } /* WITH [RECURSIVE] */ diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h index e607441d6187..9721a7a71955 100644 --- a/src/include/access/xlogwait.h +++ b/src/include/access/xlogwait.h @@ -35,9 +35,15 @@ typedef enum */ typedef enum WaitLSNType { - WAIT_LSN_TYPE_REPLAY = 0, /* Waiting for replay on standby */ - WAIT_LSN_TYPE_FLUSH = 1, /* Waiting for flush on primary */ - WAIT_LSN_TYPE_COUNT = 2 + /* Standby wait types (walreceiver/startup wakes) */ + WAIT_LSN_TYPE_REPLAY_STANDBY = 0, + WAIT_LSN_TYPE_WRITE_STANDBY = 1, + WAIT_LSN_TYPE_FLUSH_STANDBY = 2, + + /* Primary wait types (WAL writer/backends wake) */ + WAIT_LSN_TYPE_FLUSH_PRIMARY = 3, + + WAIT_LSN_TYPE_COUNT = 4 } WaitLSNType; /* @@ -96,6 +102,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState; extern Size WaitLSNShmemSize(void); extern void WaitLSNShmemInit(void); +extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType); extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN); extern void WaitLSNCleanup(void); extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index d14294a4eceb..bbaf3242ccbd 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4385,10 +4385,21 @@ typedef struct DropSubscriptionStmt DropBehavior behavior; /* RESTRICT or CASCADE behavior */ } DropSubscriptionStmt; +/* + * WaitLSNMode - MODE parameter for WAIT FOR command + */ +typedef enum WaitLSNMode +{ + WAIT_LSN_MODE_REPLAY, /* Wait for LSN replay on standby */ + WAIT_LSN_MODE_WRITE, /* Wait for LSN write on standby */ + WAIT_LSN_MODE_FLUSH /* Wait for LSN flush on standby */ +} WaitLSNMode; + typedef struct WaitStmt { NodeTag type; char *lsn_literal; /* LSN string from grammar */ + WaitLSNMode mode; /* Wait mode: REPLAY/FLUSH/WRITE */ List *options; /* List of DefElem nodes */ } WaitStmt; diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 5d4fe27ef962..7ad8b11b725e 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -176,6 +176,7 @@ PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD, AS_LABEL) PG_KEYWORD("finalize", FINALIZE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD, BARE_LABEL) +PG_KEYWORD("flush", FLUSH, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("following", FOLLOWING, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("for", FOR, RESERVED_KEYWORD, AS_LABEL) PG_KEYWORD("force", FORCE, UNRESERVED_KEYWORD, BARE_LABEL) @@ -378,6 +379,7 @@ PG_KEYWORD("release", RELEASE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("rename", RENAME, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("repeatable", REPEATABLE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("replace", REPLACE, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("replay", REPLAY, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("replica", REPLICA, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("reset", RESET, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("respect", RESPECT_P, UNRESERVED_KEYWORD, AS_LABEL) diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm index 295988b8b877..eec8233b5158 100644 --- a/src/test/perl/PostgreSQL/Test/Cluster.pm +++ b/src/test/perl/PostgreSQL/Test/Cluster.pm @@ -3335,6 +3335,9 @@ sub wait_for_catchup $mode = defined($mode) ? $mode : 'replay'; my %valid_modes = ('sent' => 1, 'write' => 1, 'flush' => 1, 'replay' => 1); + my $isrecovery = + $self->safe_psql('postgres', "SELECT pg_is_in_recovery()"); + chomp($isrecovery); croak "unknown mode $mode for 'wait_for_catchup', valid modes are " . join(', ', keys(%valid_modes)) unless exists($valid_modes{$mode}); @@ -3347,9 +3350,6 @@ sub wait_for_catchup } if (!defined($target_lsn)) { - my $isrecovery = - $self->safe_psql('postgres', "SELECT pg_is_in_recovery()"); - chomp($isrecovery); if ($isrecovery eq 't') { $target_lsn = $self->lsn('replay'); @@ -3367,6 +3367,35 @@ sub wait_for_catchup . $self->name . "\n"; # Before release 12 walreceiver just set the application name to # "walreceiver" + + # Use WAIT FOR LSN when in recovery for supported modes (replay, write, flush) + # This is more efficient than polling pg_stat_replication + if (($mode ne 'sent') && ($isrecovery eq 't')) + { + my $timeout = $PostgreSQL::Test::Utils::timeout_default; + # Map mode names to WAIT FOR LSN MODE values (uppercase) + my $wait_mode = uc($mode); + my $query = + qq[WAIT FOR LSN '${target_lsn}' MODE ${wait_mode} WITH (timeout '${timeout}s', no_throw);]; + my $output = $self->safe_psql('postgres', $query); + chomp($output); + + if ($output ne 'success') + { + # Fetch additional detail for debugging purposes + $query = qq[SELECT * FROM pg_catalog.pg_stat_replication]; + my $details = $self->safe_psql('postgres', $query); + diag qq(WAIT FOR LSN failed with status: +${output}); + diag qq(Last pg_stat_replication contents: +${details}); + croak "failed waiting for catchup"; + } + print "done\n"; + return; + } + + # Polling for 'sent' mode or when not in recovery (WAIT FOR LSN not applicable) my $query = qq[SELECT '$target_lsn' <= ${mode}_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name IN ('$standby_name', 'walreceiver')]; diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl index e0ddb06a2f04..df7b563cfbb8 100644 --- a/src/test/recovery/t/049_wait_for_lsn.pl +++ b/src/test/recovery/t/049_wait_for_lsn.pl @@ -1,4 +1,4 @@ -# Checks waiting for the LSN replay on standby using +# Checks waiting for the LSN replay/write/flush on standby using # the WAIT FOR command. use strict; use warnings FATAL => 'all'; @@ -7,6 +7,38 @@ use PostgreSQL::Test::Utils; use Test::More; +# Helper functions to control walreceiver for testing wait conditions. +# These allow us to stop WAL streaming so waiters block, then resume it. +my $saved_primary_conninfo; + +sub stop_walreceiver +{ + my ($node) = @_; + $saved_primary_conninfo = $node->safe_psql('postgres', + "SELECT setting FROM pg_settings WHERE name = 'primary_conninfo'"); + $node->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET primary_conninfo = ''; + SELECT pg_reload_conf(); + ]); + + $node->poll_query_until('postgres', + "SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);"); +} + +sub resume_walreceiver +{ + my ($node) = @_; + $node->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET primary_conninfo = '$saved_primary_conninfo'; + SELECT pg_reload_conf(); + ]); + + $node->poll_query_until('postgres', + "SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);"); +} + # Initialize primary node my $node_primary = PostgreSQL::Test::Cluster->new('primary'); $node_primary->init(allows_streaming => 1); @@ -62,7 +94,34 @@ ok((split("\n", $output))[-1] eq 30, "standby reached the same LSN as primary"); -# 3. Check that waiting for unreachable LSN triggers the timeout. The +# 3. Check that WAIT FOR works with WRITE and FLUSH modes. +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(31, 40))"); +my $lsn_write = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn_write}' MODE WRITE WITH (timeout '1d'); + SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${lsn_write}'::pg_lsn); +]); + +ok((split("\n", $output))[-1] >= 0, + "standby wrote WAL up to target LSN after WAIT FOR MODE WRITE"); + +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(41, 50))"); +my $lsn_flush = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn_flush}' MODE FLUSH WITH (timeout '1d'); + SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '${lsn_flush}'::pg_lsn); +]); + +ok((split("\n", $output))[-1] >= 0, + "standby flushed WAL up to target LSN after WAIT FOR MODE FLUSH"); + +# 4. Check that waiting for unreachable LSN triggers the timeout. The # unreachable LSN must be well in advance. So WAL records issued by # the concurrent autovacuum could not affect that. my $lsn3 = @@ -88,7 +147,7 @@ WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]); ok($output eq "timeout", "WAIT FOR returns correct status after timeout"); -# 4. Check that WAIT FOR triggers an error if called on primary, +# 5. Check that WAIT FOR triggers an error if called on primary, # within another function, or inside a transaction with an isolation level # higher than READ COMMITTED. @@ -125,7 +184,7 @@ /WAIT FOR must be only called without an active or registered snapshot/, "get an error when running within another function"); -# 5. Check parameter validation error cases on standby before promotion +# 6. Check parameter validation error cases on standby before promotion my $test_lsn = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); @@ -208,7 +267,7 @@ ok( $stderr =~ /option "invalid_option" not recognized/, "get error for invalid WITH clause option"); -# 6. Check the scenario of multiple LSN waiters. We make 5 background +# 7a. Check the scenario of multiple REPLAY waiters. We make 5 background # psql sessions each waiting for a corresponding insertion. When waiting is # finished, stored procedures logs if there are visible as many rows as # should be. @@ -226,7 +285,9 @@ \$\$ LANGUAGE plpgsql; ]); + $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();"); + my @psql_sessions; for (my $i = 0; $i < 5; $i++) { @@ -239,10 +300,11 @@ $psql_sessions[$i]->query_until( qr/start/, qq[ \\echo start - WAIT FOR LSN '${lsn}'; + WAIT FOR LSN '${lsn}' MODE REPLAY; SELECT log_count(${i}); ]); } + my $log_offset = -s $node_standby->logfile; $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();"); for (my $i = 0; $i < 5; $i++) @@ -251,23 +313,199 @@ $psql_sessions[$i]->quit; } -ok(1, 'multiple LSN waiters reported consistent data'); +ok(1, 'multiple REPLAY waiters reported consistent data'); + +# 7b. Check the scenario of multiple WRITE waiters. +# Stop walreceiver to ensure waiters actually block. +stop_walreceiver($node_standby); + +# Generate WAL on primary (standby won't receive it yet) +my @write_lsns; +for (my $i = 0; $i < 5; $i++) +{ + $node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (100 + ${i});"); + $write_lsns[$i] = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn()"); +} + +# Start WRITE waiters (they will block since walreceiver is stopped) +my @write_sessions; +for (my $i = 0; $i < 5; $i++) +{ + $write_sessions[$i] = $node_standby->background_psql('postgres'); + $write_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '$write_lsns[$i]' MODE WRITE WITH (timeout '1d'); + DO \$\$ BEGIN RAISE LOG 'write_done %', $i; END \$\$; + ]); +} + +# Verify waiters are blocked +$node_standby->poll_query_until('postgres', + "SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalWrite'" +); + +# Restore walreceiver to unblock waiters +my $write_log_offset = -s $node_standby->logfile; +resume_walreceiver($node_standby); + +# Wait for all waiters to complete and close sessions +for (my $i = 0; $i < 5; $i++) +{ + $node_standby->wait_for_log("write_done $i", $write_log_offset); + $write_sessions[$i]->quit; +} + +# Verify on standby that WAL was written up to the target LSN +$output = $node_standby->safe_psql('postgres', + "SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '$write_lsns[4]'::pg_lsn);"); + +ok($output >= 0, + "multiple WRITE waiters: standby wrote WAL up to target LSN"); + +# 7c. Check the scenario of multiple FLUSH waiters. +# Stop walreceiver to ensure waiters actually block. +stop_walreceiver($node_standby); + +# Generate WAL on primary (standby won't receive it yet) +my @flush_lsns; +for (my $i = 0; $i < 5; $i++) +{ + $node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (200 + ${i});"); + $flush_lsns[$i] = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn()"); +} + +# Start FLUSH waiters (they will block since walreceiver is stopped) +my @flush_sessions; +for (my $i = 0; $i < 5; $i++) +{ + $flush_sessions[$i] = $node_standby->background_psql('postgres'); + $flush_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '$flush_lsns[$i]' MODE FLUSH WITH (timeout '1d'); + DO \$\$ BEGIN RAISE LOG 'flush_done %', $i; END \$\$; + ]); +} + +# Verify waiters are blocked +$node_standby->poll_query_until('postgres', + "SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalFlush'" +); + +# Restore walreceiver to unblock waiters +my $flush_log_offset = -s $node_standby->logfile; +resume_walreceiver($node_standby); + +# Wait for all waiters to complete and close sessions +for (my $i = 0; $i < 5; $i++) +{ + $node_standby->wait_for_log("flush_done $i", $flush_log_offset); + $flush_sessions[$i]->quit; +} + +# Verify on standby that WAL was flushed up to the target LSN +$output = $node_standby->safe_psql('postgres', + "SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '$flush_lsns[4]'::pg_lsn);" +); + +ok($output >= 0, + "multiple FLUSH waiters: standby flushed WAL up to target LSN"); + +# 7d. Check the scenario of mixed mode waiters (REPLAY, WRITE, FLUSH) +# running concurrently. We start 6 sessions: 2 for each mode, all waiting +# for the same target LSN. We stop the walreceiver and pause replay to +# ensure all waiters block. Then we resume replay and restart the +# walreceiver to verify they unblock and complete correctly. + +# Stop walreceiver first to ensure we can control the flow without hanging +# (stopping it after pausing replay can hang if the startup process is paused). +stop_walreceiver($node_standby); + +# Pause replay +$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();"); + +# Generate WAL on primary +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(301, 310));"); +my $mixed_target_lsn = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); + +# Start 6 waiters: 2 for each mode +my @mixed_sessions; +my @mixed_modes = ('REPLAY', 'WRITE', 'FLUSH'); +for (my $i = 0; $i < 6; $i++) +{ + $mixed_sessions[$i] = $node_standby->background_psql('postgres'); + $mixed_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '${mixed_target_lsn}' MODE $mixed_modes[$i % 3] WITH (timeout '1d'); + DO \$\$ BEGIN RAISE LOG 'mixed_done %', $i; END \$\$; + ]); +} + +# Verify all waiters are blocked +$node_standby->poll_query_until('postgres', + "SELECT count(*) = 6 FROM pg_stat_activity WHERE wait_event LIKE 'WaitForWal%'" +); -# 7. Check that the standby promotion terminates the wait on LSN. Start -# waiting for an unreachable LSN then promote. Check the log for the relevant -# error message. Also, check that waiting for already replayed LSN doesn't -# cause an error even after promotion. +# Resume replay (waiters should still be blocked as no WAL has arrived) +my $mixed_log_offset = -s $node_standby->logfile; +$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();"); +$node_standby->poll_query_until('postgres', + "SELECT NOT pg_is_wal_replay_paused();"); + +# Restore walreceiver to allow WAL to arrive +resume_walreceiver($node_standby); + +# Wait for all sessions to complete and close them +for (my $i = 0; $i < 6; $i++) +{ + $node_standby->wait_for_log("mixed_done $i", $mixed_log_offset); + $mixed_sessions[$i]->quit; +} + +# Verify all modes reached the target LSN +$output = $node_standby->safe_psql( + 'postgres', qq[ + SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${mixed_target_lsn}'::pg_lsn) >= 0 AND + pg_lsn_cmp(pg_last_wal_receive_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0 AND + pg_lsn_cmp(pg_last_wal_replay_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0; +]); + +ok($output eq 't', + "mixed mode waiters: all modes completed and reached target LSN"); + +# 8. Check that the standby promotion terminates all wait modes. Start +# waiting for unreachable LSNs with REPLAY, WRITE, and FLUSH modes, then +# promote. Check the log for the relevant error messages. Also, check that +# waiting for already replayed LSN doesn't cause an error even after promotion. my $lsn4 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn() + 10000000000"); + my $lsn5 = $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); -my $psql_session = $node_standby->background_psql('postgres'); -$psql_session->query_until( - qr/start/, qq[ - \\echo start - WAIT FOR LSN '${lsn4}'; -]); + +# Start background sessions waiting for unreachable LSN with all modes +my @wait_modes = ('REPLAY', 'WRITE', 'FLUSH'); +my @wait_sessions; +for (my $i = 0; $i < 3; $i++) +{ + $wait_sessions[$i] = $node_standby->background_psql('postgres'); + $wait_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '${lsn4}' MODE $wait_modes[$i]; + ]); +} # Make sure standby will be promoted at least at the primary insert LSN we # have just observed. Use pg_switch_wal() to force the insert LSN to be @@ -277,17 +515,24 @@ $log_offset = -s $node_standby->logfile; $node_standby->promote; -$node_standby->wait_for_log('recovery is not in progress', $log_offset); -ok(1, 'got error after standby promote'); +# Wait for all three sessions to get the error (each mode has distinct message) +$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was written/, + $log_offset); +$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was flushed/, + $log_offset); +$node_standby->wait_for_log( + qr/Recovery ended before target LSN.*was replayed/, $log_offset); -$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';"); +ok(1, 'promotion interrupted all wait modes'); + +$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}' MODE REPLAY;"); ok(1, 'wait for already replayed LSN exits immediately even after promotion'); $output = $node_standby->safe_psql( 'postgres', qq[ - WAIT FOR LSN '${lsn4}' WITH (timeout '10ms', no_throw);]); + WAIT FOR LSN '${lsn4}' MODE REPLAY WITH (timeout '10ms', no_throw);]); ok($output eq "not in recovery", "WAIT FOR returns correct status after standby promotion"); @@ -295,8 +540,11 @@ $node_standby->stop; $node_primary->stop; -# If we send \q with $psql_session->quit the command can be sent to the session +# If we send \q with $session->quit the command can be sent to the session # already closed. So \q is in initial script, here we only finish IPC::Run. -$psql_session->{run}->finish; +for (my $i = 0; $i < 3; $i++) +{ + $wait_sessions[$i]->{run}->finish; +} done_testing();