diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index e78912cd141a..c89cc5c7e642 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -68,10 +68,6 @@ #include "utils/timestamp.h" -/* Note: these two macros only work on shared buffers, not local ones! */ -#define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ)) -#define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr))) - /* Note: this macro only works on local buffers, not shared ones! */ #define LocalBufHdrGetBlock(bufHdr) \ LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)] @@ -517,9 +513,9 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy, static void PinBuffer_Locked(BufferDesc *buf); static void UnpinBuffer(BufferDesc *buf); static void UnpinBufferNoOwner(BufferDesc *buf); +static uint32 CheckpointerMaxBatchSize(void); static void BufferSync(int flags); -static int SyncOneBuffer(int buf_id, bool skip_recently_used, - WritebackContext *wb_context); +static int SyncOneBuffer(int buf_id, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); static void AbortBufferIO(Buffer buffer); static void shared_buffer_write_error_callback(void *arg); @@ -535,8 +531,29 @@ static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_c static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); + static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); +static BufferDesc *NextStrategyBufToFlush(BufferAccessStrategy strategy, + Buffer sweep_end, + XLogRecPtr *lsn, int *sweep_cursor); +static void FindFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end, + BufferDesc *batch_start, + uint32 max_batch_size, + BufWriteBatch *batch, + int *sweep_cursor); +static bool BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn); +static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require, + RelFileLocator *rlocator, bool skip_pinned, + XLogRecPtr *max_lsn); +static bool PrepareFlushBuffer(BufferDesc *bufdesc, + XLogRecPtr *lsn); +static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, + IOObject io_object, IOContext io_context, + XLogRecPtr buffer_lsn); +static void CleanVictimBuffer(BufferAccessStrategy strategy, + BufferDesc *bufdesc, + bool from_ring, IOContext io_context); static void FindAndDropRelationBuffers(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber nForkBlock, @@ -2331,125 +2348,112 @@ GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context) ReservePrivateRefCountEntry(); ResourceOwnerEnlarge(CurrentResourceOwner); - /* we return here if a prospective victim buffer gets used concurrently */ -again: - - /* - * Select a victim buffer. The buffer is returned pinned and owned by - * this backend. - */ - buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring); - buf = BufferDescriptorGetBuffer(buf_hdr); - - /* - * We shouldn't have any other pins for this buffer. - */ - CheckBufferIsPinnedOnce(buf); - - /* - * If the buffer was dirty, try to write it out. There is a race - * condition here, in that someone might dirty it after we released the - * buffer header lock above, or even while we are writing it out (since - * our share-lock won't prevent hint-bit updates). We will recheck the - * dirty bit after re-locking the buffer header. 
- */ - if (buf_state & BM_DIRTY) + /* Select a victim buffer using an optimistic locking scheme. */ + for (;;) { - LWLock *content_lock; - Assert(buf_state & BM_TAG_VALID); - Assert(buf_state & BM_VALID); + /* Attempt to claim a victim buffer. Buffer is returned pinned. */ + buf_hdr = StrategyGetBuffer(strategy, &buf_state, &from_ring); + buf = BufferDescriptorGetBuffer(buf_hdr); /* - * We need a share-lock on the buffer contents to write it out (else - * we might write invalid data, eg because someone else is compacting - * the page contents while we write). We must use a conditional lock - * acquisition here to avoid deadlock. Even though the buffer was not - * pinned (and therefore surely not locked) when StrategyGetBuffer - * returned it, someone else could have pinned and exclusive-locked it - * by the time we get here. If we try to get the lock unconditionally, - * we'd block waiting for them; if they later block waiting for us, - * deadlock ensues. (This has been observed to happen when two - * backends are both trying to split btree index pages, and the second - * one just happens to be trying to split the page the first one got - * from StrategyGetBuffer.) + * We shouldn't have any other pins for this buffer. */ - content_lock = BufferDescriptorGetContentLock(buf_hdr); - if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) - { - /* - * Someone else has locked the buffer, so give it up and loop back - * to get another one. - */ - UnpinBuffer(buf_hdr); - goto again; - } + CheckBufferIsPinnedOnce(buf); /* - * If using a nondefault strategy, and writing the buffer would - * require a WAL flush, let the strategy decide whether to go ahead - * and write/reuse the buffer or to choose another victim. We need a - * lock to inspect the page LSN, so this can't be done inside - * StrategyGetBuffer. + * If the buffer was dirty, try to write it out. There is a race + * condition here, in that someone might dirty it after we released + * the buffer header lock above, or even while we are writing it out + * (since our share-lock won't prevent hint-bit updates). We will + * recheck the dirty bit after re-locking the buffer header. */ - if (strategy != NULL) + if (buf_state & BM_DIRTY) { - XLogRecPtr lsn; + LWLock *content_lock; - /* Read the LSN while holding buffer header lock */ - buf_state = LockBufHdr(buf_hdr); - lsn = BufferGetLSN(buf_hdr); - UnlockBufHdr(buf_hdr); + Assert(buf_state & BM_TAG_VALID); + Assert(buf_state & BM_VALID); - if (XLogNeedsFlush(lsn) - && StrategyRejectBuffer(strategy, buf_hdr, from_ring)) + /* + * We need a share-lock on the buffer contents to write it out + * (else we might write invalid data, eg because someone else is + * compacting the page contents while we write). We must use a + * conditional lock acquisition here to avoid deadlock. Even + * though the buffer was not pinned (and therefore surely not + * locked) when StrategyGetBuffer returned it, someone else could + * have pinned and exclusive-locked it by the time we get here. If + * we try to get the lock unconditionally, we'd block waiting for + * them; if they later block waiting for us, deadlock ensues. + * (This has been observed to happen when two backends are both + * trying to split btree index pages, and the second one just + * happens to be trying to split the page the first one got from + * StrategyGetBuffer.) 
+ */ + content_lock = BufferDescriptorGetContentLock(buf_hdr); + if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + { + /* + * Someone else has locked the buffer, so give it up and loop + * back to get another one. + */ + UnpinBuffer(buf_hdr); + continue; + } + + /* + * If using a nondefault strategy, and writing the buffer would + * require a WAL flush, let the strategy decide whether to go + * ahead and write/reuse the buffer or to choose another victim. + * We need the content lock to inspect the page LSN, so this can't + * be done inside StrategyGetBuffer. + */ + if (StrategyRejectBuffer(strategy, buf_hdr, from_ring)) { LWLockRelease(content_lock); UnpinBuffer(buf_hdr); - goto again; + continue; } - } - /* OK, do the I/O */ - FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context); - LWLockRelease(content_lock); + /* Content lock is released inside CleanVictimBuffer */ + CleanVictimBuffer(strategy, buf_hdr, from_ring, io_context); + } - ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, - &buf_hdr->tag); - } + if (buf_state & BM_VALID) + { + /* + * When a BufferAccessStrategy is in use, blocks evicted from + * shared buffers are counted as IOOP_EVICT in the corresponding + * context (e.g. IOCONTEXT_BULKWRITE). Shared buffers are evicted + * by a strategy in two cases: 1) while initially claiming buffers + * for the strategy ring 2) to replace an existing strategy ring + * buffer because it is pinned or in use and cannot be reused. + * + * Blocks evicted from buffers already in the strategy ring are + * counted as IOOP_REUSE in the corresponding strategy context. + * + * At this point, we can accurately count evictions and reuses, + * because we have successfully claimed the valid buffer. + * Previously, we may have been forced to release the buffer due + * to concurrent pinners or erroring out. + */ + pgstat_count_io_op(IOOBJECT_RELATION, io_context, + from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0); + } - if (buf_state & BM_VALID) - { /* - * When a BufferAccessStrategy is in use, blocks evicted from shared - * buffers are counted as IOOP_EVICT in the corresponding context - * (e.g. IOCONTEXT_BULKWRITE). Shared buffers are evicted by a - * strategy in two cases: 1) while initially claiming buffers for the - * strategy ring 2) to replace an existing strategy ring buffer - * because it is pinned or in use and cannot be reused. - * - * Blocks evicted from buffers already in the strategy ring are - * counted as IOOP_REUSE in the corresponding strategy context. - * - * At this point, we can accurately count evictions and reuses, - * because we have successfully claimed the valid buffer. Previously, - * we may have been forced to release the buffer due to concurrent - * pinners or erroring out. + * If the buffer has an entry in the buffer mapping table, delete it. + * This can fail because another backend could have pinned or dirtied + * the buffer. */ - pgstat_count_io_op(IOOBJECT_RELATION, io_context, - from_ring ? IOOP_REUSE : IOOP_EVICT, 1, 0); - } + if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr)) + { + UnpinBuffer(buf_hdr); + continue; + } - /* - * If the buffer has an entry in the buffer mapping table, delete it. This - * can fail because another backend could have pinned or dirtied the - * buffer. 
- */ - if ((buf_state & BM_TAG_VALID) && !InvalidateVictimBuffer(buf_hdr)) - { - UnpinBuffer(buf_hdr); - goto again; + break; } /* a final set of sanity checks */ @@ -3342,7 +3346,6 @@ TrackNewBufferPin(Buffer buf) static void BufferSync(int flags) { - uint32 buf_state; int buf_id; int num_to_scan; int num_spaces; @@ -3354,6 +3357,8 @@ BufferSync(int flags) int i; uint32 mask = BM_DIRTY; WritebackContext wb_context; + uint32 max_batch_size; + BufWriteBatch batch; /* * Unless this is a shutdown checkpoint or we have been explicitly told, @@ -3385,6 +3390,7 @@ BufferSync(int flags) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); uint32 set_bits = 0; + uint32 buf_state; /* * Header spinlock is enough to examine BM_DIRTY, see comment in @@ -3401,6 +3407,7 @@ BufferSync(int flags) item = &CkptBufferIds[num_to_scan++]; item->buf_id = buf_id; item->tsId = bufHdr->tag.spcOid; + item->dbId = bufHdr->tag.dbOid; item->relNumber = BufTagGetRelNumber(&bufHdr->tag); item->forkNum = BufTagGetForkNum(&bufHdr->tag); item->blockNum = bufHdr->tag.blockNum; @@ -3526,48 +3533,199 @@ BufferSync(int flags) */ num_processed = 0; num_written = 0; + max_batch_size = CheckpointerMaxBatchSize(); while (!binaryheap_empty(ts_heap)) { + BlockNumber limit = max_batch_size; BufferDesc *bufHdr = NULL; CkptTsStatus *ts_stat = (CkptTsStatus *) DatumGetPointer(binaryheap_first(ts_heap)); + int ts_end = ts_stat->index - ts_stat->num_scanned + ts_stat->num_to_scan; + int processed = 0; - buf_id = CkptBufferIds[ts_stat->index].buf_id; - Assert(buf_id != -1); + batch.start = InvalidBlockNumber; + batch.max_lsn = InvalidXLogRecPtr; + batch.n = 0; - bufHdr = GetBufferDescriptor(buf_id); + while (batch.n < limit) + { + uint32 buf_state; + XLogRecPtr lsn = InvalidXLogRecPtr; + LWLock *content_lock; + CkptSortItem item; - num_processed++; + if (ProcSignalBarrierPending) + ProcessProcSignalBarrier(); - /* - * We don't need to acquire the lock here, because we're only looking - * at a single bit. It's possible that someone else writes the buffer - * and clears the flag right after we check, but that doesn't matter - * since SyncOneBuffer will then do nothing. However, there is a - * further race condition: it's conceivable that between the time we - * examine the bit here and the time SyncOneBuffer acquires the lock, - * someone else not only wrote the buffer but replaced it with another - * page and dirtied it. In that improbable case, SyncOneBuffer will - * write the buffer though we didn't need to. It doesn't seem worth - * guarding against this, though. - */ - if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) - { - if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) + /* Check if we are done with this tablespace */ + if (ts_stat->index + processed >= ts_end) + break; + + item = CkptBufferIds[ts_stat->index + processed]; + + buf_id = item.buf_id; + Assert(buf_id != -1); + + bufHdr = GetBufferDescriptor(buf_id); + + /* + * If this is the first block of the batch, then check if we need + * to open a new relation. Open the relation now because we have + * to determine the maximum IO size based on how many blocks + * remain in the file. 
+ */ + if (!BlockNumberIsValid(batch.start)) + { + Assert(batch.max_lsn == InvalidXLogRecPtr && batch.n == 0); + batch.rlocator.spcOid = item.tsId; + batch.rlocator.dbOid = item.dbId; + batch.rlocator.relNumber = item.relNumber; + batch.forkno = item.forkNum; + batch.start = item.blockNum; + batch.reln = smgropen(batch.rlocator, INVALID_PROC_NUMBER); + limit = smgrmaxcombine(batch.reln, batch.forkno, batch.start); + limit = Min(max_batch_size, limit); + limit = Min(GetAdditionalPinLimit(), limit); + /* Guarantee progress */ + limit = Max(limit, 1); + } + + /* + * Once we hit blocks from the next relation or fork of the + * relation, break out of the loop and issue the IO we've built up + * so far. It is important that we don't increment processed + * because we want to start the next IO with this item. + */ + if (item.dbId != batch.rlocator.dbOid) + break; + + if (item.relNumber != batch.rlocator.relNumber) + break; + + if (item.forkNum != batch.forkno) + break; + + Assert(item.tsId == batch.rlocator.spcOid); + + /* + * If the next block is not contiguous, we can't include it in the + * IO we will issue. Break out of the loop and issue what we have + * so far. Do not count this item as processed -- otherwise we + * will end up skipping it. + */ + if (item.blockNum != batch.start + batch.n) + break; + + /* + * We don't need to acquire the lock here, because we're only + * looking at a few bits. It's possible that someone else writes + * the buffer and clears the flag right after we check, but that + * doesn't matter since StartBufferIO will then return false. + * + * If the buffer doesn't need checkpointing, don't include it in + * the batch we are building. And if the buffer doesn't need + * flushing, we're done with the item, so count it as processed + * and break out of the loop to issue the IO so far. + */ + buf_state = pg_atomic_read_u32(&bufHdr->state); + if ((buf_state & (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY)) != + (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY)) + { + processed++; + break; + } + + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + PinBuffer(bufHdr, NULL, false); + + /* + * There is a race condition here: it's conceivable that between + * the time we examine the buffer header for BM_CHECKPOINT_NEEDED + * above and when we are now acquiring the lock that, someone else + * not only wrote the buffer but replaced it with another page and + * dirtied it. In that improbable case, we will write the buffer + * though we didn't need to. It doesn't seem worth guarding + * against this, though. + */ + content_lock = BufferDescriptorGetContentLock(bufHdr); + + /* + * We are willing to wait for the content lock on the first IO in + * the batch. However, for subsequent IOs, waiting could lead to + * deadlock. We have to eventually flush all eligible buffers, + * though. So, if we fail to acquire the lock on a subsequent + * buffer, we break out and issue the IO we've built up so far. + * Then we come back and start a new IO with that buffer as the + * starting buffer. As such, we must not count the item as + * processed if we end up failing to acquire the content lock. + */ + if (batch.n == 0) + LWLockAcquire(content_lock, LW_SHARED); + else if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + { + UnpinBuffer(bufHdr); + break; + } + + /* + * If the buffer doesn't need IO, count the item as processed, + * release the buffer, and break out of the loop to issue the IO + * we have built up so far. 
+ */ + if (!StartBufferIO(bufHdr, false, true)) { - TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); - PendingCheckpointerStats.buffers_written++; - num_written++; + processed++; + LWLockRelease(content_lock); + UnpinBuffer(bufHdr); + break; } + + /* + * Lock buffer header lock before examining LSN because we only + * have a shared lock on the buffer. + */ + buf_state = LockBufHdr(bufHdr); + lsn = BufferGetLSN(bufHdr); + UnlockBufHdrExt(bufHdr, buf_state, 0, BM_JUST_DIRTIED, 0); + + /* + * Keep track of the max LSN so that we can be sure to flush + * enough WAL before flushing data from the buffers. See comment + * in DoFlushBuffer() for more on why we don't consider the LSNs + * of unlogged relations. + */ + if (buf_state & BM_PERMANENT && lsn > batch.max_lsn) + batch.max_lsn = lsn; + + batch.bufdescs[batch.n++] = bufHdr; + processed++; } /* * Measure progress independent of actually having to flush the buffer - * - otherwise writing become unbalanced. + * - otherwise writing becomes unbalanced. */ - ts_stat->progress += ts_stat->progress_slice; - ts_stat->num_scanned++; - ts_stat->index++; + num_processed += processed; + ts_stat->progress += ts_stat->progress_slice * processed; + ts_stat->num_scanned += processed; + ts_stat->index += processed; + + /* + * If we built up an IO, issue it. There's a chance we didn't find any + * items referencing buffers that needed flushing this time, but we + * still want to check if we should update the heap if we examined and + * processed the items. + */ + if (batch.n > 0) + { + FlushBufferBatch(&batch, IOCONTEXT_NORMAL); + CompleteWriteBatchIO(&batch, IOCONTEXT_NORMAL, &wb_context); + + TRACE_POSTGRESQL_BUFFER_BATCH_SYNC_WRITTEN(batch.n); + PendingCheckpointerStats.buffers_written += batch.n; + num_written += batch.n; + } /* Have all the buffers from the tablespace been processed? */ if (ts_stat->num_scanned == ts_stat->num_to_scan) @@ -3841,8 +3999,7 @@ BgBufferSync(WritebackContext *wb_context) /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int sync_state = SyncOneBuffer(next_to_clean, true, - wb_context); + int sync_state = SyncOneBuffer(next_to_clean, wb_context); if (++next_to_clean >= NBuffers) { @@ -3905,8 +4062,8 @@ BgBufferSync(WritebackContext *wb_context) /* * SyncOneBuffer -- process a single buffer during syncing. * - * If skip_recently_used is true, we don't write currently-pinned buffers, nor - * buffers marked recently used, as these are not replacement candidates. + * We don't write currently-pinned buffers, nor buffers marked recently used, + * as these are not replacement candidates. * * Returns a bitmask containing the following flag bits: * BUF_WRITTEN: we wrote the buffer. @@ -3917,53 +4074,71 @@ BgBufferSync(WritebackContext *wb_context) * after locking it, but we don't care all that much.) */ static int -SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) +SyncOneBuffer(int buf_id, WritebackContext *wb_context) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); int result = 0; + uint32 old_buf_state; uint32 buf_state; BufferTag tag; - /* Make sure we can handle the pin */ - ReservePrivateRefCountEntry(); - ResourceOwnerEnlarge(CurrentResourceOwner); - /* - * Check whether buffer needs writing. 
- * - * We can make this check without taking the buffer content lock so long - * as we mark pages dirty in access methods *before* logging changes with - * XLogInsert(): if someone marks the buffer dirty just after our check we - * don't worry because our checkpoint.redo points before log record for - * upcoming changes and so we are not required to write such dirty buffer. + * Check whether the buffer can be used and pin it if so. Do this using a + * CAS loop, to avoid having to lock the buffer header. */ - buf_state = LockBufHdr(bufHdr); - - if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && - BUF_STATE_GET_USAGECOUNT(buf_state) == 0) + old_buf_state = pg_atomic_read_u32(&bufHdr->state); + for (;;) { + buf_state = old_buf_state; + + /* + * We can make these checks without taking the buffer content lock so + * long as we mark pages dirty in access methods *before* logging + * changes with XLogInsert(): if someone marks the buffer dirty just + * after our check we don't worry because our checkpoint.redo points + * before log record for upcoming changes and so we are not required + * to write such dirty buffer. + */ + if (BUF_STATE_GET_REFCOUNT(buf_state) != 0 || + BUF_STATE_GET_USAGECOUNT(buf_state) != 0) + { + /* Don't write recently-used buffers */ + return result; + } + result |= BUF_REUSABLE; - } - else if (skip_recently_used) - { - /* Caller told us not to write recently-used buffers */ - UnlockBufHdr(bufHdr); - return result; - } - if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) - { - /* It's clean, so nothing to do */ - UnlockBufHdr(bufHdr); - return result; + if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) + { + /* It's clean, so nothing to do */ + return result; + } + + if (unlikely(buf_state & BM_LOCKED)) + { + old_buf_state = WaitBufHdrUnlocked(bufHdr); + continue; + } + + /* Make sure we can handle the pin */ + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + + /* pin the buffer if the CAS succeeds */ + buf_state += BUF_REFCOUNT_ONE; + + if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state, + buf_state)) + { + TrackNewBufferPin(BufferDescriptorGetBuffer(bufHdr)); + break; + } } /* - * Pin it, share-lock it, write it. (FlushBuffer will do nothing if the - * buffer is clean by the time we've locked it.) + * Share lock and write it out (FlushBuffer will do nothing if the buffer + * is clean by the time we've locked it.) */ - PinBuffer_Locked(bufHdr); - FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL); tag = bufHdr->tag; @@ -3971,8 +4146,8 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) UnpinBuffer(bufHdr); /* - * SyncOneBuffer() is only called by checkpointer and bgwriter, so - * IOContext will always be IOCONTEXT_NORMAL. + * SyncOneBuffer() is only called by bgwriter, so IOContext will always be + * IOCONTEXT_NORMAL. */ ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag); @@ -4283,54 +4458,430 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context) { - XLogRecPtr recptr; - ErrorContextCallback errcallback; - instr_time io_start; - Block bufBlock; - char *bufToWrite; + XLogRecPtr lsn; + + if (PrepareFlushBuffer(buf, &lsn)) + DoFlushBuffer(buf, reln, io_object, io_context, lsn); +} + +/* + * Returns true if the buffer needs WAL flushed before it can be written out. + * Caller must not already hold the buffer header spinlock. 
If the buffer is + * unlogged, *lsn shouldn't be used by the caller and is set to + * InvalidXLogRecPtr. + */ +static bool +BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn) +{ uint32 buf_state; + buf_state = LockBufHdr(bufdesc); + *lsn = BufferGetLSN(bufdesc); + UnlockBufHdr(bufdesc); + /* - * Try to start an I/O operation. If StartBufferIO returns false, then - * someone else flushed the buffer before we could, so we need not do - * anything. + * See buffer flushing code for more details on why we condition this on + * the relation being logged. */ - if (!StartBufferIO(buf, false, false)) + if (!(buf_state & BM_PERMANENT)) + { + *lsn = InvalidXLogRecPtr; + return false; + } + + return XLogNeedsFlush(*lsn); +} + + + +/* + * Given a starting buffer descriptor from a strategy ring that supports eager + * flushing, find additional buffers from the ring that can be combined into a + * single write batch with the starting buffer. + * + * max_batch_size is the maximum number of blocks that can be combined into a + * single write in general. This function, based on the block number of start, + * will determine the maximum IO size for this particular write given how much + * of the file remains. max_batch_size is provided by the caller so it doesn't + * have to be recalculated for each write. + * + * batch is an output parameter that this function will fill with the needed + * information to issue this IO. + * + * This function will pin and content lock all of the buffers that it + * assembles for the IO batch. The caller is responsible for issuing the IO. + */ +static void +FindFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end, + BufferDesc *batch_start, + uint32 max_batch_size, + BufWriteBatch *batch, + int *sweep_cursor) +{ + BlockNumber limit; + + Assert(batch_start); + batch->bufdescs[0] = batch_start; + + LockBufHdr(batch_start); + batch->max_lsn = BufferGetLSN(batch_start); + UnlockBufHdr(batch_start); + + batch->start = batch->bufdescs[0]->tag.blockNum; + Assert(BlockNumberIsValid(batch->start)); + batch->n = 1; + batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag); + batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag); + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); + + limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start); + limit = Min(max_batch_size, limit); + limit = Min(GetAdditionalPinLimit(), limit); + + /* + * It's possible we're not allowed any more pins or there aren't more + * blocks in the target relation. In this case, just return. Our batch + * will have only one buffer. + */ + if (limit <= 0) return; - /* Setup error traceback support for ereport() */ - errcallback.callback = shared_buffer_write_error_callback; - errcallback.arg = buf; - errcallback.previous = error_context_stack; + /* Now assemble a run of blocks to write out. */ + for (; batch->n < limit; batch->n++) + { + Buffer bufnum; + + if ((bufnum = + StrategyNextBuffer(strategy, sweep_cursor)) == sweep_end) + break; + + /* + * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining + * buffers in the ring will be invalid. 
+ */ + if (!BufferIsValid(bufnum)) + break; + + /* Stop when we encounter a buffer that will break the run */ + if ((batch->bufdescs[batch->n] = + PrepareOrRejectEagerFlushBuffer(bufnum, + batch->start + batch->n, + &batch->rlocator, + true, + &batch->max_lsn)) == NULL) + break; + } +} + +/* + * Returns the buffer descriptor of the buffer containing the next block we + * should eagerly flush or NULL when there are no further buffers to consider + * writing out. This will be the start of a new batch of buffers to write out. + */ +static BufferDesc * +NextStrategyBufToFlush(BufferAccessStrategy strategy, + Buffer sweep_end, + XLogRecPtr *lsn, int *sweep_cursor) +{ + Buffer bufnum; + BufferDesc *bufdesc; + + while ((bufnum = + StrategyNextBuffer(strategy, sweep_cursor)) != sweep_end) + { + /* + * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining + * buffers in the ring will be invalid. + */ + if (!BufferIsValid(bufnum)) + break; + + if ((bufdesc = PrepareOrRejectEagerFlushBuffer(bufnum, + InvalidBlockNumber, + NULL, + true, + lsn)) != NULL) + return bufdesc; + } + + return NULL; +} + +/* + * Prepare and write out a dirty victim buffer. + * + * Buffer must be pinned, the content lock must be held, and the buffer header + * spinlock must not be held. The content lock is released and the buffer is + * returned pinned but not locked. + * + * bufdesc may be modified. + */ +static void +CleanVictimBuffer(BufferAccessStrategy strategy, + BufferDesc *bufdesc, + bool from_ring, IOContext io_context) +{ + XLogRecPtr max_lsn = InvalidXLogRecPtr; + + /* Set up this victim buffer to be flushed */ + if (!PrepareFlushBuffer(bufdesc, &max_lsn)) + { + LWLockRelease(BufferDescriptorGetContentLock(bufdesc)); + return; + } + + if (from_ring && StrategySupportsEagerFlush(strategy)) + { + Buffer sweep_end = BufferDescriptorGetBuffer(bufdesc); + int cursor = StrategyGetCurrentIndex(strategy); + uint32 max_batch_size = StrategyMaxWriteBatchSize(strategy); + + /* Pin our victim again so it stays ours even after batch released */ + ReservePrivateRefCountEntry(); + ResourceOwnerEnlarge(CurrentResourceOwner); + IncrBufferRefCount(BufferDescriptorGetBuffer(bufdesc)); + + /* Clean victim buffer and find more to flush opportunistically */ + do + { + BufWriteBatch batch; + + FindFlushAdjacents(strategy, sweep_end, bufdesc, max_batch_size, + &batch, &cursor); + FlushBufferBatch(&batch, io_context); + /* Content locks released inside CompleteWriteBatchIO */ + CompleteWriteBatchIO(&batch, io_context, &BackendWritebackContext); + } while ((bufdesc = NextStrategyBufToFlush(strategy, sweep_end, + &max_lsn, &cursor)) != NULL); + } + else + { + DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn); + LWLockRelease(BufferDescriptorGetContentLock(bufdesc)); + ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, + &bufdesc->tag); + } +} + +/* + * Prepare bufdesc for eager flushing. + * + * Given bufnum, return the buffer descriptor of the buffer to eagerly flush, + * pinned and locked, or NULL if this buffer does not contain a block that + * should be flushed. + * + * require is the BlockNumber required by the caller. Some callers may require + * a specific BlockNumber to be in bufnum because they are assembling a + * contiguous run of blocks. + * + * If the caller needs the block to be from a specific relation, rlocator will + * be provided. 
+ */ +static BufferDesc * +PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require, + RelFileLocator *rlocator, bool skip_pinned, + XLogRecPtr *max_lsn) +{ + BufferDesc *bufdesc; + uint32 old_buf_state; + uint32 buf_state; + XLogRecPtr lsn; + BlockNumber blknum; + LWLock *content_lock; + + if (!BufferIsValid(bufnum)) + return NULL; + + Assert(!BufferIsLocal(bufnum)); + + bufdesc = GetBufferDescriptor(bufnum - 1); + + /* Block may need to be in a specific relation */ + if (rlocator && + !RelFileLocatorEquals(BufTagGetRelFileLocator(&bufdesc->tag), + *rlocator)) + return NULL; + + /* + * Ensure that theres a free refcount entry and resource owner slot for + * the pin before pinning the buffer. While this may leake a refcount and + * slot if we return without a buffer, we should use that slot the next + * time we try and reserve a spot. + */ + ResourceOwnerEnlarge(CurrentResourceOwner); + ReservePrivateRefCountEntry(); + + /* + * Check whether the buffer can be used and pin it if so. Do this using a + * CAS loop, to avoid having to lock the buffer header. We have to lock + * the buffer header later if we succeed in pinning the buffer here, but + * avoiding locking the buffer header if the buffer is in use is worth it. + */ + old_buf_state = pg_atomic_read_u32(&bufdesc->state); + + for (;;) + { + buf_state = old_buf_state; + + if (!(buf_state & BM_DIRTY) || !(buf_state & BM_VALID)) + return NULL; + + /* We don't eagerly flush buffers used by others */ + if (skip_pinned && + (BUF_STATE_GET_REFCOUNT(buf_state) > 0 || + BUF_STATE_GET_USAGECOUNT(buf_state) > 1)) + return NULL; + + if (unlikely(buf_state & BM_LOCKED)) + { + old_buf_state = WaitBufHdrUnlocked(bufdesc); + continue; + } + + /* pin the buffer if the CAS succeeds */ + buf_state += BUF_REFCOUNT_ONE; + + if (pg_atomic_compare_exchange_u32(&bufdesc->state, &old_buf_state, + buf_state)) + { + TrackNewBufferPin(BufferDescriptorGetBuffer(bufdesc)); + break; + } + } + + CheckBufferIsPinnedOnce(bufnum); + + blknum = BufferGetBlockNumber(bufnum); + Assert(BlockNumberIsValid(blknum)); + + /* We only include contiguous blocks in the run */ + if (BlockNumberIsValid(require) && blknum != require) + goto except_unpin_buffer; + + /* Don't eagerly flush buffers requiring WAL flush */ + if (BufferNeedsWALFlush(bufdesc, &lsn)) + goto except_unpin_buffer; + + content_lock = BufferDescriptorGetContentLock(bufdesc); + if (!LWLockConditionalAcquire(content_lock, LW_SHARED)) + goto except_unpin_buffer; + + /* + * Now that we have the content lock, we need to recheck if we need to + * flush WAL. + */ + if (BufferNeedsWALFlush(bufdesc, &lsn)) + goto except_unpin_buffer; + + /* Try to start an I/O operation */ + if (!StartBufferIO(bufdesc, false, true)) + goto except_unlock_content; + + if (lsn > *max_lsn) + *max_lsn = lsn; + + buf_state = LockBufHdr(bufdesc); + UnlockBufHdrExt(bufdesc, buf_state, 0, BM_JUST_DIRTIED, 0); + + return bufdesc; + +except_unlock_content: + LWLockRelease(content_lock); + +except_unpin_buffer: + UnpinBuffer(bufdesc); + return NULL; +} + +/* + * Given a prepared batch of buffers write them out as a vector. 
+ */ +void +FlushBufferBatch(BufWriteBatch *batch, + IOContext io_context) +{ + BlockNumber blknums[MAX_IO_COMBINE_LIMIT]; + Block blocks[MAX_IO_COMBINE_LIMIT]; + instr_time io_start; + ErrorContextCallback errcallback = + { + .callback = shared_buffer_write_error_callback, + .previous = error_context_stack, + }; + error_context_stack = &errcallback; - /* Find smgr relation for buffer */ - if (reln == NULL) - reln = smgropen(BufTagGetRelFileLocator(&buf->tag), INVALID_PROC_NUMBER); + if (XLogRecPtrIsValid(batch->max_lsn)) + XLogFlush(batch->max_lsn); - TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&buf->tag), - buf->tag.blockNum, - reln->smgr_rlocator.locator.spcOid, - reln->smgr_rlocator.locator.dbOid, - reln->smgr_rlocator.locator.relNumber); + if (batch->reln == NULL) + batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER); - buf_state = LockBufHdr(buf); +#ifdef USE_ASSERT_CHECKING + for (uint32 i = 0; i < batch->n; i++) + { + XLogRecPtr lsn; + + Assert(!BufferNeedsWALFlush(batch->bufdescs[i], &lsn)); + } +#endif + + TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_START(batch->forkno, + batch->reln->smgr_rlocator.locator.spcOid, + batch->reln->smgr_rlocator.locator.dbOid, + batch->reln->smgr_rlocator.locator.relNumber, + batch->reln->smgr_rlocator.backend, + batch->n); /* - * Run PageGetLSN while holding header lock, since we don't have the - * buffer locked exclusively in all cases. + * XXX: All blocks should be copied and then checksummed but doing so + * takes a lot of extra memory and a future patch will eliminate this + * requirement. */ - recptr = BufferGetLSN(buf); + for (BlockNumber i = 0; i < batch->n; i++) + { + blknums[i] = batch->start + i; + blocks[i] = BufHdrGetBlock(batch->bufdescs[i]); + } - /* To check if block content changes while flushing. - vadim 01/17/97 */ - UnlockBufHdrExt(buf, buf_state, - 0, BM_JUST_DIRTIED, - 0); + PageSetBatchChecksumInplace((Page *) blocks, blknums, batch->n); + + io_start = pgstat_prepare_io_time(track_io_timing); + + smgrwritev(batch->reln, batch->forkno, + batch->start, (const void **) blocks, batch->n, false); + + pgstat_count_io_op_time(IOOBJECT_RELATION, io_context, IOOP_WRITE, + io_start, batch->n, BLCKSZ); + + error_context_stack = errcallback.previous; +} + +/* + * Prepare the buffer with bufdesc for writing. Returns true if the buffer + * acutally needs writing and false otherwise. lsn returns the buffer's LSN if + * the table is logged. + */ +static bool +PrepareFlushBuffer(BufferDesc *bufdesc, XLogRecPtr *lsn) +{ + uint32 buf_state; + + /* + * Try to start an I/O operation. If StartBufferIO returns false, then + * someone else flushed the buffer before we could, so we need not do + * anything. + */ + if (!StartBufferIO(bufdesc, false, false)) + return false; + + *lsn = InvalidXLogRecPtr; + buf_state = LockBufHdr(bufdesc); /* - * Force XLOG flush up to buffer's LSN. This implements the basic WAL - * rule that log updates must hit disk before any of the data-file changes - * they describe do. + * Record the buffer's LSN. We will force XLOG flush up to buffer's LSN. + * This implements the basic WAL rule that log updates must hit disk + * before any of the data-file changes they describe do. * * However, this rule does not apply to unlogged relations, which will be * lost after a crash anyway. 
Most unlogged relation pages do not bear @@ -4343,9 +4894,54 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * happen, attempting to flush WAL through that location would fail, with * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. + * + * We must hold the buffer header lock when examining the page LSN since + * don't have buffer exclusively locked in all cases. */ if (buf_state & BM_PERMANENT) - XLogFlush(recptr); + *lsn = BufferGetLSN(bufdesc); + + /* To check if block content changes while flushing. - vadim 01/17/97 */ + UnlockBufHdrExt(bufdesc, buf_state, + 0, BM_JUST_DIRTIED, + 0); + return true; +} + +/* + * Actually do the write I/O to clean a buffer. buf and reln may be modified. + */ +static void +DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, + IOContext io_context, XLogRecPtr buffer_lsn) +{ + ErrorContextCallback errcallback; + instr_time io_start; + Block bufBlock; + char *bufToWrite; + + /* Setup error traceback support for ereport() */ + errcallback.callback = shared_buffer_write_error_callback; + errcallback.arg = buf; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* Find smgr relation for buffer */ + if (reln == NULL) + reln = smgropen(BufTagGetRelFileLocator(&buf->tag), INVALID_PROC_NUMBER); + + TRACE_POSTGRESQL_BUFFER_FLUSH_START(BufTagGetForkNum(&buf->tag), + buf->tag.blockNum, + reln->smgr_rlocator.locator.spcOid, + reln->smgr_rlocator.locator.dbOid, + reln->smgr_rlocator.locator.relNumber); + + /* Force XLOG flush up to buffer's LSN */ + if (XLogRecPtrIsValid(buffer_lsn)) + { + Assert(pg_atomic_read_u32(&buf->state) & BM_PERMANENT); + XLogFlush(buffer_lsn); + } /* * Now it's safe to write the buffer to disk. Note that no one else should @@ -4425,6 +5021,48 @@ FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, LWLockRelease(BufferDescriptorGetContentLock(buf)); } +/* + * Given a previously initialized batch with buffers that have already been + * flushed, terminate the IO on each buffer and then unlock and unpin them. + * This assumes all the buffers were locked and pinned. wb_context will be + * modified. + */ +void +CompleteWriteBatchIO(BufWriteBatch *batch, IOContext io_context, + WritebackContext *wb_context) +{ + ErrorContextCallback errcallback = + { + .callback = shared_buffer_write_error_callback, + .previous = error_context_stack, + }; + + error_context_stack = &errcallback; + pgBufferUsage.shared_blks_written += batch->n; + + for (uint32 i = 0; i < batch->n; i++) + { + Buffer buffer = BufferDescriptorGetBuffer(batch->bufdescs[i]); + + errcallback.arg = batch->bufdescs[i]; + + /* Mark the buffer as clean and end the BM_IO_IN_PROGRESS state. */ + TerminateBufferIO(batch->bufdescs[i], true, 0, true, false); + LWLockRelease(BufferDescriptorGetContentLock(batch->bufdescs[i])); + ReleaseBuffer(buffer); + ScheduleBufferTagForWriteback(wb_context, io_context, + &batch->bufdescs[i]->tag); + } + + TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_DONE(batch->forkno, + batch->reln->smgr_rlocator.locator.spcOid, + batch->reln->smgr_rlocator.locator.dbOid, + batch->reln->smgr_rlocator.locator.relNumber, + batch->reln->smgr_rlocator.backend, + batch->n, batch->start); + error_context_stack = errcallback.previous; +} + /* * RelationGetNumberOfBlocksInFork * Determines the current number of pages in the specified relation fork. 
@@ -5953,6 +6591,22 @@ IsBufferCleanupOK(Buffer buffer) return false; } +/* + * The maximum number of blocks that can be written out in a single batch by + * the checkpointer. + */ +static uint32 +CheckpointerMaxBatchSize(void) +{ + uint32 result; + uint32 pin_limit = GetPinLimit(); + + result = Min(pin_limit, io_combine_limit); + result = Min(result, MAX_IO_COMBINE_LIMIT); + result = Max(result, 1); + return result; +} + /* * Functions for buffer I/O handling @@ -6356,6 +7010,13 @@ ckpt_buforder_comparator(const CkptSortItem *a, const CkptSortItem *b) return -1; else if (a->tsId > b->tsId) return 1; + + /* compare database */ + if (a->dbId < b->dbId) + return -1; + else if (a->dbId > b->dbId) + return 1; + /* compare relation */ if (a->relNumber < b->relNumber) return -1; diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 28d952b35344..189274fc0c0f 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -15,6 +15,7 @@ */ #include "postgres.h" +#include "access/xlog.h" #include "pgstat.h" #include "port/atomics.h" #include "storage/buf_internals.h" @@ -155,6 +156,31 @@ ClockSweepTick(void) return victim; } +/* + * Some BufferAccessStrategies support eager flushing -- which is flushing + * buffers in the ring before they are needed. This can lead to better I/O + * patterns than lazily flushing buffers immediately before reusing them. + */ +bool +StrategySupportsEagerFlush(BufferAccessStrategy strategy) +{ + Assert(strategy); + + switch (strategy->btype) + { + case BAS_BULKWRITE: + return true; + case BAS_VACUUM: + case BAS_NORMAL: + case BAS_BULKREAD: + return false; + default: + elog(ERROR, "unrecognized buffer access strategy: %d", + (int) strategy->btype); + return false; + } +} + /* * StrategyGetBuffer * @@ -306,6 +332,29 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r } } +/* + * Given a position in the ring, cursor, increment the position, and return + * the buffer at this position. + */ +Buffer +StrategyNextBuffer(BufferAccessStrategy strategy, int *cursor) +{ + if (++(*cursor) >= strategy->nbuffers) + *cursor = 0; + + return strategy->buffers[*cursor]; +} + +/* + * Return the current slot in the strategy ring. + */ +int +StrategyGetCurrentIndex(BufferAccessStrategy strategy) +{ + return strategy->current; +} + + /* * StrategySyncStart -- tell BgBufferSync where to start syncing * @@ -727,6 +776,29 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state) return NULL; } + +/* + * Determine the largest IO we can assemble from the given strategy ring given + * strategy-specific as well as global constraints on the number of pinned + * buffers and max IO size. + */ +uint32 +StrategyMaxWriteBatchSize(BufferAccessStrategy strategy) +{ + uint32 max_write_batch_size = Min(io_combine_limit, MAX_IO_COMBINE_LIMIT); + int strategy_pin_limit = GetAccessStrategyPinLimit(strategy); + uint32 max_possible_buffer_limit = GetPinLimit(); + + /* Identify the minimum of the above */ + max_write_batch_size = Min(strategy_pin_limit, max_write_batch_size); + max_write_batch_size = Min(max_possible_buffer_limit, max_write_batch_size); + + /* Must allow at least 1 IO for forward progress */ + max_write_batch_size = Max(1, max_write_batch_size); + + return max_write_batch_size; +} + /* * AddBufferToRing -- add a buffer to the buffer ring * @@ -780,12 +852,20 @@ IOContextForStrategy(BufferAccessStrategy strategy) * be written out and doing so would require flushing WAL too. 
This gives us * a chance to choose a different victim. * + * The buffer must be pinned and content locked and the buffer header spinlock + * must not be held. We must hold the content lock to examine the LSN. + * * Returns true if buffer manager should ask for a new victim, and false * if this buffer should be written and re-used. */ bool StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_ring) { + XLogRecPtr lsn; + + if (!strategy) + return false; + /* We only do this in bulkread mode */ if (strategy->btype != BAS_BULKREAD) return false; @@ -795,11 +875,17 @@ StrategyRejectBuffer(BufferAccessStrategy strategy, BufferDesc *buf, bool from_r strategy->buffers[strategy->current] != BufferDescriptorGetBuffer(buf)) return false; + LockBufHdr(buf); + lsn = BufferGetLSN(buf); + UnlockBufHdr(buf); + + if (!XLogNeedsFlush(lsn)) + return false; + /* - * Remove the dirty buffer from the ring; necessary to prevent infinite + * Remove the dirty buffer from the ring; necessary to prevent an infinite * loop if all ring members are dirty. */ strategy->buffers[strategy->current] = InvalidBuffer; - return true; } diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index 05376431ef29..629c2c54960e 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -1546,3 +1546,23 @@ PageSetChecksumInplace(Page page, BlockNumber blkno) ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno); } + +/* + * A helper to set multiple block's checksums + */ +void +PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos, uint32 length) +{ + /* If we don't need a checksum, just return */ + if (!DataChecksumsEnabled()) + return; + + for (uint32 i = 0; i < length; i++) + { + Page page = pages[i]; + + if (PageIsNew(page)) + continue; + ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blknos[i]); + } +} diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index e9e413477ba4..d6970731ba91 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -61,12 +61,14 @@ provider postgresql { probe buffer__flush__done(ForkNumber, BlockNumber, Oid, Oid, Oid); probe buffer__extend__start(ForkNumber, Oid, Oid, Oid, int, unsigned int); probe buffer__extend__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber); + probe buffer__batch__flush__start(ForkNumber, Oid, Oid, Oid, int, unsigned int); + probe buffer__batch__flush__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber); probe buffer__checkpoint__start(int); probe buffer__checkpoint__sync__start(); probe buffer__checkpoint__done(); probe buffer__sync__start(int, int); - probe buffer__sync__written(int); + probe buffer__batch__sync__written(BlockNumber); probe buffer__sync__done(int, int, int); probe deadlock__found(); diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 5400c56a965f..feb370175f04 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -449,6 +449,7 @@ extern uint32 WaitBufHdrUnlocked(BufferDesc *buf); typedef struct CkptSortItem { Oid tsId; + Oid dbId; RelFileNumber relNumber; ForkNumber forkNum; BlockNumber blockNum; @@ -483,14 +484,48 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer) ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc); } +/* + * Used to write out multiple blocks at a time in a combined IO. 
bufdescs + * contains buffer descriptors for buffers containing adjacent blocks of the + * same fork of the same relation. + */ +typedef struct BufWriteBatch +{ + RelFileLocator rlocator; + ForkNumber forkno; + SMgrRelation reln; + + /* + * The BlockNumber of the first block in the run of contiguous blocks to + * be written out as a single IO. + */ + BlockNumber start; + + /* + * While assembling the buffers, we keep track of the maximum LSN so that + * we can flush WAL through this LSN before flushing the buffers. + */ + XLogRecPtr max_lsn; + + /* The number of valid buffers in bufdescs */ + uint32 n; + BufferDesc *bufdescs[MAX_IO_COMBINE_LIMIT]; +} BufWriteBatch; + /* * Internal buffer management routines */ + +/* Note: these two macros only work on shared buffers, not local ones! */ +#define BufHdrGetBlock(bufHdr) ((Block) (BufferBlocks + ((Size) (bufHdr)->buf_id) * BLCKSZ)) +#define BufferGetLSN(bufHdr) (PageGetLSN(BufHdrGetBlock(bufHdr))) + /* bufmgr.c */ extern void WritebackContextInit(WritebackContext *context, int *max_pending); extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context); extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context, BufferTag *tag); +extern void FlushBufferBatch(BufWriteBatch *batch, IOContext io_context); extern void TrackNewBufferPin(Buffer buf); @@ -501,6 +536,13 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag /* freelist.c */ +extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy); +extern uint32 StrategyMaxWriteBatchSize(BufferAccessStrategy strategy); +extern Buffer StrategyNextBuffer(BufferAccessStrategy strategy, + int *cursor); +extern int StrategyGetCurrentIndex(BufferAccessStrategy strategy); +extern void CompleteWriteBatchIO(BufWriteBatch *batch, IOContext io_context, + WritebackContext *wb_context); extern IOContext IOContextForStrategy(BufferAccessStrategy strategy); extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_ring); diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index abc2cf2a020b..29a400a71eb9 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -506,5 +506,7 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum, const void *newtup, Size newsize); extern char *PageSetChecksumCopy(Page page, BlockNumber blkno); extern void PageSetChecksumInplace(Page page, BlockNumber blkno); +extern void PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos, + uint32 length); #endif /* BUFPAGE_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9dd65b102544..a13a103e29f2 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -351,6 +351,7 @@ BufferManagerRelation BufferStrategyControl BufferTag BufferUsage +BufWriteBatch BuildAccumulator BuiltinScript BulkInsertState
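
Illustrative usage sketch (not part of the patch): the snippet below shows how the BufWriteBatch interfaces added above are intended to be driven by a caller. The function name issue_write_batch and its parameters are hypothetical; the caller is assumed to have already pinned, share-locked, and called StartBufferIO() on a contiguous run of dirty buffers from one relation fork and folded their LSNs into max_lsn, which is what FindFlushAdjacents() and PrepareOrRejectEagerFlushBuffer() take care of in the patch.

static void
issue_write_batch(BufferDesc **prepared, uint32 nprepared, XLogRecPtr max_lsn,
				  IOContext io_context, WritebackContext *wb_context)
{
	BufWriteBatch batch;

	Assert(nprepared > 0 && nprepared <= MAX_IO_COMBINE_LIMIT);

	/* All buffers must form one contiguous run in a single relation fork */
	batch.rlocator = BufTagGetRelFileLocator(&prepared[0]->tag);
	batch.forkno = BufTagGetForkNum(&prepared[0]->tag);
	batch.start = prepared[0]->tag.blockNum;
	batch.reln = smgropen(batch.rlocator, INVALID_PROC_NUMBER);
	batch.max_lsn = max_lsn;
	batch.n = 0;

	for (uint32 i = 0; i < nprepared; i++)
	{
		Assert(prepared[i]->tag.blockNum == batch.start + i);
		batch.bufdescs[batch.n++] = prepared[i];
	}

	/* Flush WAL through max_lsn, checksum the pages, and smgrwritev() them */
	FlushBufferBatch(&batch, io_context);

	/* Terminate each buffer's IO, unlock, unpin, and schedule writeback */
	CompleteWriteBatchIO(&batch, io_context, wb_context);
}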