Skip to content

Commit 19c33a9

Browse files
alterego655Commitfest Bot
authored andcommitted
Extend xlogwait infrastructure with write and flush wait types
Add support for waiting on WAL write and flush LSNs in addition to the existing replay LSN wait type. This provides the foundation for extending the WAIT FOR command with MODE parameter. Key changes: - Add WAIT_LSN_TYPE_WRITE and WAIT_LSN_TYPE_FLUSH_STANDBY to WaitLSNType - Add GetCurrentLSNForWaitType() to retrieve current LSN - Add new wait events WAIT_EVENT_WAIT_FOR_WAL_WRITE and WAIT_EVENT_WAIT_FOR_WAL_FLUSH for pg_stat_activity visibility - Update WaitForLSN() to use GetCurrentLSNForWaitType() internally
1 parent 4f7dacc commit 19c33a9

File tree

6 files changed

+81
-27
lines changed

6 files changed

+81
-27
lines changed

src/backend/access/transam/xlog.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6241,7 +6241,7 @@ StartupXLOG(void)
62416241
* Wake up all waiters for replay LSN. They need to report an error that
62426242
* recovery was ended before reaching the target LSN.
62436243
*/
6244-
WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
6244+
WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, InvalidXLogRecPtr);
62456245

62466246
/*
62476247
* Shutdown the recovery environment. This must occur after

src/backend/access/transam/xlogrecovery.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1846,8 +1846,8 @@ PerformWalRecovery(void)
18461846
*/
18471847
if (waitLSNState &&
18481848
(XLogRecoveryCtl->lastReplayedEndRecPtr >=
1849-
pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY])))
1850-
WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
1849+
pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY_STANDBY])))
1850+
WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY_STANDBY, XLogRecoveryCtl->lastReplayedEndRecPtr);
18511851

18521852
/* Else, try to fetch the next WAL record */
18531853
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);

src/backend/access/transam/xlogwait.c

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,30 @@
1212
* This file implements waiting for WAL operations to reach specific LSNs
1313
* on both physical standby and primary servers. The core idea is simple:
1414
* every process that wants to wait publishes the LSN it needs to the
15-
* shared memory, and the appropriate process (startup on standby, or
16-
* WAL writer/backend on primary) wakes it once that LSN has been reached.
15+
* shared memory, and the appropriate process (startup on standby,
16+
* walreceiver on standby, or WAL writer/backend on primary) wakes it
17+
* once that LSN has been reached.
1718
*
1819
* The shared memory used by this module comprises a procInfos
1920
* per-backend array with the information of the awaited LSN for each
2021
* of the backend processes. The elements of that array are organized
21-
* into a pairing heap waitersHeap, which allows for very fast finding
22-
* of the least awaited LSN.
22+
* into pairing heaps (waitersHeap), one for each WaitLSNType, which
23+
* allows for very fast finding of the least awaited LSN for each type.
2324
*
24-
* In addition, the least-awaited LSN is cached as minWaitedLSN. The
25-
* waiter process publishes information about itself to the shared
26-
* memory and waits on the latch until it is woken up by the appropriate
27-
* process, standby is promoted, or the postmaster dies. Then, it cleans
28-
* information about itself in the shared memory.
25+
* In addition, the least-awaited LSN for each type is cached in the
26+
* minWaitedLSN array. The waiter process publishes information about
27+
* itself to the shared memory and waits on the latch until it is woken
28+
* up by the appropriate process, standby is promoted, or the postmaster
29+
* dies. Then, it cleans information about itself in the shared memory.
2930
*
30-
* On standby servers: After replaying a WAL record, the startup process
31-
* first performs a fast path check minWaitedLSN > replayLSN. If this
32-
* check is negative, it checks waitersHeap and wakes up the backend
33-
* whose awaited LSNs are reached.
31+
* On standby servers:
32+
* - After replaying a WAL record, the startup process performs a fast
33+
* path check minWaitedLSN[REPLAY] > replayLSN. If this check is
34+
* negative, it checks waitersHeap[REPLAY] and wakes up the backends
35+
* whose awaited LSNs are reached.
36+
* - After receiving WAL, the walreceiver process performs similar checks
37+
* against the flush and write LSNs, waking up waiters in the FLUSH
38+
* and WRITE heaps respectively.
3439
*
3540
* On primary servers: After flushing WAL, the WAL writer or backend
3641
* process performs a similar check against the flush LSN and wakes up
@@ -49,6 +54,7 @@
4954
#include "access/xlogwait.h"
5055
#include "miscadmin.h"
5156
#include "pgstat.h"
57+
#include "replication/walreceiver.h"
5258
#include "storage/latch.h"
5359
#include "storage/proc.h"
5460
#include "storage/shmem.h"
@@ -62,6 +68,48 @@ static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
6268

6369
struct WaitLSNState *waitLSNState = NULL;
6470

71+
/*
72+
* Wait event for each WaitLSNType, used with WaitLatch() to report
73+
* the wait in pg_stat_activity.
74+
*/
75+
static const uint32 WaitLSNWaitEvents[] = {
76+
[WAIT_LSN_TYPE_REPLAY_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
77+
[WAIT_LSN_TYPE_WRITE_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
78+
[WAIT_LSN_TYPE_FLUSH_STANDBY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
79+
[WAIT_LSN_TYPE_FLUSH_PRIMARY] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
80+
};
81+
82+
StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
83+
"WaitLSNWaitEvents must match WaitLSNType enum");
84+
85+
/*
86+
* Get the current LSN for the specified wait type.
87+
*/
88+
XLogRecPtr
89+
GetCurrentLSNForWaitType(WaitLSNType lsnType)
90+
{
91+
switch (lsnType)
92+
{
93+
case WAIT_LSN_TYPE_REPLAY_STANDBY:
94+
return GetXLogReplayRecPtr(NULL);
95+
96+
case WAIT_LSN_TYPE_WRITE_STANDBY:
97+
return GetWalRcvWriteRecPtr();
98+
99+
case WAIT_LSN_TYPE_FLUSH_STANDBY:
100+
return GetWalRcvFlushRecPtr(NULL, NULL);
101+
102+
case WAIT_LSN_TYPE_FLUSH_PRIMARY:
103+
return GetFlushRecPtr(NULL);
104+
105+
case WAIT_LSN_TYPE_COUNT:
106+
break;
107+
}
108+
109+
elog(ERROR, "invalid LSN wait type: %d", lsnType);
110+
pg_unreachable();
111+
}
112+
65113
/* Report the amount of shared memory space needed for WaitLSNState. */
66114
Size
67115
WaitLSNShmemSize(void)
@@ -341,13 +389,11 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
341389
int rc;
342390
long delay_ms = -1;
343391

344-
if (lsnType == WAIT_LSN_TYPE_REPLAY)
345-
currentLSN = GetXLogReplayRecPtr(NULL);
346-
else
347-
currentLSN = GetFlushRecPtr(NULL);
392+
/* Get current LSN for the wait type */
393+
currentLSN = GetCurrentLSNForWaitType(lsnType);
348394

349395
/* Check that recovery is still in-progress */
350-
if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
396+
if (lsnType != WAIT_LSN_TYPE_FLUSH_PRIMARY && !RecoveryInProgress())
351397
{
352398
/*
353399
* Recovery was ended, but check if target LSN was already
@@ -376,7 +422,7 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
376422
CHECK_FOR_INTERRUPTS();
377423

378424
rc = WaitLatch(MyLatch, wake_events, delay_ms,
379-
(lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
425+
WaitLSNWaitEvents[lsnType]);
380426

381427
/*
382428
* Emergency bailout if postmaster has died. This is to avoid the

src/backend/commands/wait.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
140140
*/
141141
Assert(MyProc->xmin == InvalidTransactionId);
142142

143-
waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
143+
waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY_STANDBY, lsn, timeout);
144144

145145
/*
146146
* Process the result of WaitForLSN(). Throw appropriate error if needed.

src/backend/utils/activity/wait_event_names.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,9 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem
8989
LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server."
9090
SSL_OPEN_SERVER "Waiting for SSL while attempting connection."
9191
WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby."
92-
WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary."
92+
WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary or standby."
9393
WAIT_FOR_WAL_REPLAY "Waiting for WAL replay to reach a target LSN on a standby."
94+
WAIT_FOR_WAL_WRITE "Waiting for WAL write to reach a target LSN on a standby."
9495
WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process."
9596
WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
9697

src/include/access/xlogwait.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,15 @@ typedef enum
3535
*/
3636
typedef enum WaitLSNType
3737
{
38-
WAIT_LSN_TYPE_REPLAY = 0, /* Waiting for replay on standby */
39-
WAIT_LSN_TYPE_FLUSH = 1, /* Waiting for flush on primary */
40-
WAIT_LSN_TYPE_COUNT = 2
38+
/* Standby wait types (walreceiver/startup wakes) */
39+
WAIT_LSN_TYPE_REPLAY_STANDBY = 0,
40+
WAIT_LSN_TYPE_WRITE_STANDBY = 1,
41+
WAIT_LSN_TYPE_FLUSH_STANDBY = 2,
42+
43+
/* Primary wait types (WAL writer/backends wake) */
44+
WAIT_LSN_TYPE_FLUSH_PRIMARY = 3,
45+
46+
WAIT_LSN_TYPE_COUNT = 4
4147
} WaitLSNType;
4248

4349
/*
@@ -96,6 +102,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState;
96102

97103
extern Size WaitLSNShmemSize(void);
98104
extern void WaitLSNShmemInit(void);
105+
extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType);
99106
extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
100107
extern void WaitLSNCleanup(void);
101108
extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,

0 commit comments

Comments
 (0)