
Commit 978053e

melanieplageman authored and Commitfest Bot committed
Write combining for BAS_BULKWRITE
Implement write combining for users of the bulkwrite buffer access strategy (e.g. COPY FROM). When the buffer access strategy needs to clean a buffer for reuse, it already opportunistically flushes some other buffers. Now, combine any contiguous blocks from the same relation into larger writes and issue them with smgrwritev().

The performance benefit for COPY FROM is mostly noticeable for multiple concurrent COPY FROMs because a single COPY FROM is either CPU bound or bound by WAL writes.

The infrastructure for flushing larger batches of IOs will be reused by checkpointer and other processes doing writes of dirty data.

XXX: Because this sets in-place checksums for batches, it is not committable until additional infrastructure goes in place.

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/flat/CAAKRu_bcWRvRwZUop_d9vzF9nHAiT%2B-uPzkJ%3DS3ShZ1GqeAYOw%40mail.gmail.com
1 parent 12729ee commit 978053e

File tree

7 files changed: +285 -11 lines changed

src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/freelist.c
src/backend/storage/page/bufpage.c
src/backend/utils/probes.d
src/include/storage/buf_internals.h
src/include/storage/bufpage.h
src/tools/pgindent/typedefs.list
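In outline, the new code path in CleanVictimBuffer() groups the victim buffer and its strategy-ring neighbors into runs of adjacent blocks, then issues one vectored write per run. The following is a minimal standalone sketch of just that grouping rule; DirtyBuf and MAX_BATCH are invented stand-ins for illustration, whereas the real code (in the diff below) walks pinned, content-locked BufferDesc entries and is bounded by io_combine_limit and the pin limits.

    #include <stdio.h>
    #include <stdint.h>

    #define MAX_BATCH 16        /* stand-in for the io_combine_limit cap */

    /* Toy stand-in for a dirty buffer in the strategy ring */
    typedef struct
    {
        int         rel;        /* which relation the block belongs to */
        uint32_t    block;      /* block number within that relation */
    } DirtyBuf;

    int
    main(void)
    {
        /* Ring order: blocks 10..12 of rel 1 are adjacent, then the run breaks */
        DirtyBuf    ring[] = {{1, 10}, {1, 11}, {1, 12}, {1, 20}, {2, 21}};
        int         n = sizeof(ring) / sizeof(ring[0]);

        for (int i = 0; i < n;)
        {
            int         start = i++;

            /* Extend the run while blocks stay adjacent in the same relation */
            while (i < n && i - start < MAX_BATCH &&
                   ring[i].rel == ring[start].rel &&
                   ring[i].block == ring[start].block + (uint32_t) (i - start))
                i++;

            /* One combined write per run; the patch uses smgrwritev() here */
            printf("write rel %d, blocks %u..%u (%d blocks)\n",
                   ring[start].rel, (unsigned) ring[start].block,
                   (unsigned) ring[i - 1].block, i - start);
        }
        return 0;
    }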

src/backend/storage/buffer/bufmgr.c

Lines changed: 205 additions & 11 deletions

@@ -537,7 +537,11 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 static BufferDesc *NextStrategyBufToFlush(BufferAccessStrategy strategy,
                                           Buffer sweep_end,
                                           XLogRecPtr *lsn, int *sweep_cursor);
-
+static void FindFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end,
+                               BufferDesc *batch_start,
+                               uint32 max_batch_size,
+                               BufWriteBatch *batch,
+                               int *sweep_cursor);
 static bool BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn);
 static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
                                                    RelFileLocator *rlocator, bool skip_pinned,

@@ -4318,10 +4322,90 @@ BufferNeedsWALFlush(BufferDesc *bufdesc, XLogRecPtr *lsn)
 }
 
 
+
+/*
+ * Given a starting buffer descriptor from a strategy ring that supports eager
+ * flushing, find additional buffers from the ring that can be combined into a
+ * single write batch with the starting buffer.
+ *
+ * max_batch_size is the maximum number of blocks that can be combined into a
+ * single write in general. This function, based on the block number of start,
+ * will determine the maximum IO size for this particular write given how much
+ * of the file remains. max_batch_size is provided by the caller so it doesn't
+ * have to be recalculated for each write.
+ *
+ * batch is an output parameter that this function will fill with the needed
+ * information to issue this IO.
+ *
+ * This function will pin and content lock all of the buffers that it
+ * assembles for the IO batch. The caller is responsible for issuing the IO.
+ */
+static void
+FindFlushAdjacents(BufferAccessStrategy strategy, Buffer sweep_end,
+                   BufferDesc *batch_start,
+                   uint32 max_batch_size,
+                   BufWriteBatch *batch,
+                   int *sweep_cursor)
+{
+    BlockNumber limit;
+
+    Assert(batch_start);
+    batch->bufdescs[0] = batch_start;
+
+    LockBufHdr(batch_start);
+    batch->max_lsn = BufferGetLSN(batch_start);
+    UnlockBufHdr(batch_start);
+
+    batch->start = batch->bufdescs[0]->tag.blockNum;
+    Assert(BlockNumberIsValid(batch->start));
+    batch->n = 1;
+    batch->forkno = BufTagGetForkNum(&batch->bufdescs[0]->tag);
+    batch->rlocator = BufTagGetRelFileLocator(&batch->bufdescs[0]->tag);
+    batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER);
+
+    limit = smgrmaxcombine(batch->reln, batch->forkno, batch->start);
+    limit = Min(max_batch_size, limit);
+    limit = Min(GetAdditionalPinLimit(), limit);
+
+    /*
+     * It's possible we're not allowed any more pins or there aren't more
+     * blocks in the target relation. In this case, just return. Our batch
+     * will have only one buffer.
+     */
+    if (limit <= 0)
+        return;
+
+    /* Now assemble a run of blocks to write out. */
+    for (; batch->n < limit; batch->n++)
+    {
+        Buffer      bufnum;
+
+        if ((bufnum =
+             StrategyNextBuffer(strategy, sweep_cursor)) == sweep_end)
+            break;
+
+        /*
+         * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining
+         * buffers in the ring will be invalid.
+         */
+        if (!BufferIsValid(bufnum))
+            break;
+
+        /* Stop when we encounter a buffer that will break the run */
+        if ((batch->bufdescs[batch->n] =
+             PrepareOrRejectEagerFlushBuffer(bufnum,
+                                             batch->start + batch->n,
+                                             &batch->rlocator,
+                                             true,
+                                             &batch->max_lsn)) == NULL)
+            break;
+    }
+}
+
 /*
  * Returns the buffer descriptor of the buffer containing the next block we
  * should eagerly flush or NULL when there are no further buffers to consider
- * writing out.
+ * writing out. This will be the start of a new batch of buffers to write out.
  */
 static BufferDesc *
 NextStrategyBufToFlush(BufferAccessStrategy strategy,

@@ -4367,7 +4451,6 @@ CleanVictimBuffer(BufferAccessStrategy strategy,
                   bool from_ring, IOContext io_context)
 {
     XLogRecPtr  max_lsn = InvalidXLogRecPtr;
-    bool        first_buffer = true;
 
     /* Set up this victim buffer to be flushed */
     if (!PrepareFlushBuffer(bufdesc, &max_lsn))

@@ -4380,18 +4463,23 @@ CleanVictimBuffer(BufferAccessStrategy strategy,
     {
         Buffer      sweep_end = BufferDescriptorGetBuffer(bufdesc);
         int         cursor = StrategyGetCurrentIndex(strategy);
+        uint32      max_batch_size = StrategyMaxWriteBatchSize(strategy);
+
+        /* Pin our victim again so it stays ours even after batch released */
+        ReservePrivateRefCountEntry();
+        ResourceOwnerEnlarge(CurrentResourceOwner);
+        IncrBufferRefCount(BufferDescriptorGetBuffer(bufdesc));
 
         /* Clean victim buffer and find more to flush opportunistically */
         do
         {
-            DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
-            LWLockRelease(BufferDescriptorGetContentLock(bufdesc));
-            ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-                                          &bufdesc->tag);
-            /* We leave the first buffer pinned for the caller */
-            if (!first_buffer)
-                UnpinBuffer(bufdesc);
-            first_buffer = false;
+            BufWriteBatch batch;
+
+            FindFlushAdjacents(strategy, sweep_end, bufdesc, max_batch_size,
+                               &batch, &cursor);
+            FlushBufferBatch(&batch, io_context);
+            /* Content locks released inside CompleteWriteBatchIO */
+            CompleteWriteBatchIO(&batch, io_context, &BackendWritebackContext);
         } while ((bufdesc = NextStrategyBufToFlush(strategy, sweep_end,
                                                    &max_lsn, &cursor)) != NULL);
     }

@@ -4534,6 +4622,70 @@ PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
     return NULL;
 }
 
+/*
+ * Given a prepared batch of buffers write them out as a vector.
+ */
+void
+FlushBufferBatch(BufWriteBatch *batch,
+                 IOContext io_context)
+{
+    BlockNumber blknums[MAX_IO_COMBINE_LIMIT];
+    Block       blocks[MAX_IO_COMBINE_LIMIT];
+    instr_time  io_start;
+    ErrorContextCallback errcallback =
+    {
+        .callback = shared_buffer_write_error_callback,
+        .previous = error_context_stack,
+    };
+
+    error_context_stack = &errcallback;
+
+    if (XLogRecPtrIsValid(batch->max_lsn))
+        XLogFlush(batch->max_lsn);
+
+    if (batch->reln == NULL)
+        batch->reln = smgropen(batch->rlocator, INVALID_PROC_NUMBER);
+
+#ifdef USE_ASSERT_CHECKING
+    for (uint32 i = 0; i < batch->n; i++)
+    {
+        XLogRecPtr  lsn;
+
+        Assert(!BufferNeedsWALFlush(batch->bufdescs[i], &lsn));
+    }
+#endif
+
+    TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_START(batch->forkno,
+                                              batch->reln->smgr_rlocator.locator.spcOid,
+                                              batch->reln->smgr_rlocator.locator.dbOid,
+                                              batch->reln->smgr_rlocator.locator.relNumber,
+                                              batch->reln->smgr_rlocator.backend,
+                                              batch->n);
+
+    /*
+     * XXX: All blocks should be copied and then checksummed but doing so
+     * takes a lot of extra memory and a future patch will eliminate this
+     * requirement.
+     */
+    for (BlockNumber i = 0; i < batch->n; i++)
+    {
+        blknums[i] = batch->start + i;
+        blocks[i] = BufHdrGetBlock(batch->bufdescs[i]);
+    }
+
+    PageSetBatchChecksumInplace((Page *) blocks, blknums, batch->n);
+
+    io_start = pgstat_prepare_io_time(track_io_timing);
+
+    smgrwritev(batch->reln, batch->forkno,
+               batch->start, (const void **) blocks, batch->n, false);
+
+    pgstat_count_io_op_time(IOOBJECT_RELATION, io_context, IOOP_WRITE,
+                            io_start, batch->n, BLCKSZ);
+
+    error_context_stack = errcallback.previous;
+}
+
 /*
  * Prepare the buffer with bufdesc for writing. Returns true if the buffer
  * acutally needs writing and false otherwise. lsn returns the buffer's LSN if

@@ -4698,6 +4850,48 @@ FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
     LWLockRelease(BufferDescriptorGetContentLock(buf));
 }
 
+/*
+ * Given a previously initialized batch with buffers that have already been
+ * flushed, terminate the IO on each buffer and then unlock and unpin them.
+ * This assumes all the buffers were locked and pinned. wb_context will be
+ * modified.
+ */
+void
+CompleteWriteBatchIO(BufWriteBatch *batch, IOContext io_context,
+                     WritebackContext *wb_context)
+{
+    ErrorContextCallback errcallback =
+    {
+        .callback = shared_buffer_write_error_callback,
+        .previous = error_context_stack,
+    };
+
+    error_context_stack = &errcallback;
+    pgBufferUsage.shared_blks_written += batch->n;
+
+    for (uint32 i = 0; i < batch->n; i++)
+    {
+        Buffer      buffer = BufferDescriptorGetBuffer(batch->bufdescs[i]);
+
+        errcallback.arg = batch->bufdescs[i];
+
+        /* Mark the buffer as clean and end the BM_IO_IN_PROGRESS state. */
+        TerminateBufferIO(batch->bufdescs[i], true, 0, true, false);
+        LWLockRelease(BufferDescriptorGetContentLock(batch->bufdescs[i]));
+        ReleaseBuffer(buffer);
+        ScheduleBufferTagForWriteback(wb_context, io_context,
+                                      &batch->bufdescs[i]->tag);
+    }
+
+    TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_DONE(batch->forkno,
+                                             batch->reln->smgr_rlocator.locator.spcOid,
+                                             batch->reln->smgr_rlocator.locator.dbOid,
+                                             batch->reln->smgr_rlocator.locator.relNumber,
+                                             batch->reln->smgr_rlocator.backend,
+                                             batch->n, batch->start);
+    error_context_stack = errcallback.previous;
+}
+
 /*
  * RelationGetNumberOfBlocksInFork
  *      Determines the current number of pages in the specified relation fork.
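For a concrete sense of the limit computation at the top of FindFlushAdjacents(): a batch can never exceed the caller-supplied max_batch_size or the remaining pin allowance, and smgrmaxcombine() keeps a single write from crossing a relation segment boundary. A hedged arithmetic sketch, assuming the common md.c configuration of 8kB blocks in 1GB (131072-block) segments; all values are illustrative:

    #include <stdio.h>

    #define Min(a, b) ((a) < (b) ? (a) : (b))

    int
    main(void)
    {
        unsigned    segsize = 131072;   /* blocks per 1GB segment at 8kB BLCKSZ */
        unsigned    start = 131070;     /* two blocks shy of a segment boundary */
        unsigned    max_combine = segsize - (start % segsize);  /* what smgrmaxcombine() would allow: 2 */
        unsigned    max_batch_size = 16;    /* assumed StrategyMaxWriteBatchSize() result */
        unsigned    pin_limit = 64;     /* assumed GetAdditionalPinLimit() result */
        unsigned    limit;

        /* Same clamping order as the patch: combine limit, batch size, pins */
        limit = Min(max_combine, max_batch_size);
        limit = Min(pin_limit, limit);

        /* A batch starting at block 131070 is capped at 2 blocks */
        printf("limit = %u\n", limit);
        return 0;
    }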

src/backend/storage/buffer/freelist.c

Lines changed: 23 additions & 0 deletions

@@ -776,6 +776,29 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
     return NULL;
 }
 
+
+/*
+ * Determine the largest IO we can assemble from the given strategy ring given
+ * strategy-specific as well as global constraints on the number of pinned
+ * buffers and max IO size.
+ */
+uint32
+StrategyMaxWriteBatchSize(BufferAccessStrategy strategy)
+{
+    uint32      max_write_batch_size = Min(io_combine_limit, MAX_IO_COMBINE_LIMIT);
+    int         strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
+    uint32      max_possible_buffer_limit = GetPinLimit();
+
+    /* Identify the minimum of the above */
+    max_write_batch_size = Min(strategy_pin_limit, max_write_batch_size);
+    max_write_batch_size = Min(max_possible_buffer_limit, max_write_batch_size);
+
+    /* Must allow at least 1 IO for forward progress */
+    max_write_batch_size = Max(1, max_write_batch_size);
+
+    return max_write_batch_size;
+}
+
 /*
  * AddBufferToRing -- add a buffer to the buffer ring
  *
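For scale: with the default io_combine_limit of 16 and the 16MB BAS_BULKWRITE ring (2048 buffers at the default 8kB BLCKSZ), the clamp above should resolve to 16 blocks, i.e. 128kB writes; the strategy and global pin limits would only bind for very small rings or a heavily pinned buffer pool.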

src/backend/storage/page/bufpage.c

Lines changed: 20 additions & 0 deletions

@@ -1546,3 +1546,23 @@ PageSetChecksumInplace(Page page, BlockNumber blkno)
 
     ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno);
 }
+
+/*
+ * A helper to set multiple block's checksums
+ */
+void
+PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos, uint32 length)
+{
+    /* If we don't need a checksum, just return */
+    if (!DataChecksumsEnabled())
+        return;
+
+    for (uint32 i = 0; i < length; i++)
+    {
+        Page        page = pages[i];
+
+        if (PageIsNew(page))
+            continue;
+        ((PageHeader) page)->pd_checksum = pg_checksum_page(page, blknos[i]);
+    }
+}
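This helper is what the XXX in the commit message is about: pages in shared buffers are normally copied before checksumming (see PageSetChecksumCopy) because hint bits can be set by backends holding only a shared content lock while the write is in flight, so a checksum computed in place over a shared buffer may not match the bytes actually written. The "additional infrastructure" the message waits on appears to be whatever removes that copy requirement for batches.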

src/backend/utils/probes.d

Lines changed: 2 additions & 0 deletions

@@ -61,6 +61,8 @@ provider postgresql {
     probe buffer__flush__done(ForkNumber, BlockNumber, Oid, Oid, Oid);
     probe buffer__extend__start(ForkNumber, Oid, Oid, Oid, int, unsigned int);
     probe buffer__extend__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber);
+    probe buffer__batch__flush__start(ForkNumber, Oid, Oid, Oid, int, unsigned int);
+    probe buffer__batch__flush__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber);
 
     probe buffer__checkpoint__start(int);
     probe buffer__checkpoint__sync__start();
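The two new probes accompany the existing buffer__flush pair: judging by the TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_* call sites in bufmgr.c, the arguments are the fork number, the tablespace, database, and relation OIDs, the backend, and the number of blocks in the batch, with the done probe also reporting the batch's starting block number.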

src/include/storage/buf_internals.h

Lines changed: 32 additions & 0 deletions

@@ -483,6 +483,34 @@ ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
     ResourceOwnerForget(owner, Int32GetDatum(buffer), &buffer_io_resowner_desc);
 }
 
+/*
+ * Used to write out multiple blocks at a time in a combined IO. bufdescs
+ * contains buffer descriptors for buffers containing adjacent blocks of the
+ * same fork of the same relation.
+ */
+typedef struct BufWriteBatch
+{
+    RelFileLocator rlocator;
+    ForkNumber  forkno;
+    SMgrRelation reln;
+
+    /*
+     * The BlockNumber of the first block in the run of contiguous blocks to
+     * be written out as a single IO.
+     */
+    BlockNumber start;
+
+    /*
+     * While assembling the buffers, we keep track of the maximum LSN so that
+     * we can flush WAL through this LSN before flushing the buffers.
+     */
+    XLogRecPtr  max_lsn;
+
+    /* The number of valid buffers in bufdescs */
+    uint32      n;
+    BufferDesc *bufdescs[MAX_IO_COMBINE_LIMIT];
+} BufWriteBatch;
+
 /*
  * Internal buffer management routines
  */

@@ -496,6 +524,7 @@ extern void WritebackContextInit(WritebackContext *context, int *max_pending);
 extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context);
 extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
                                           IOContext io_context, BufferTag *tag);
+extern void FlushBufferBatch(BufWriteBatch *batch, IOContext io_context);
 
 extern void TrackNewBufferPin(Buffer buf);
 

@@ -507,9 +536,12 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag
 
 /* freelist.c */
 extern bool StrategySupportsEagerFlush(BufferAccessStrategy strategy);
+extern uint32 StrategyMaxWriteBatchSize(BufferAccessStrategy strategy);
 extern Buffer StrategyNextBuffer(BufferAccessStrategy strategy,
                                  int *cursor);
 extern int  StrategyGetCurrentIndex(BufferAccessStrategy strategy);
+extern void CompleteWriteBatchIO(BufWriteBatch *batch, IOContext io_context,
+                                 WritebackContext *wb_context);
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
 extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
                                      uint32 *buf_state, bool *from_ring);
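Note that bufdescs is a fixed array sized by MAX_IO_COMBINE_LIMIT, so a BufWriteBatch can live on the stack (as it does in CleanVictimBuffer) with no per-batch allocation; n records how many leading entries are valid, and max_lsn lets FlushBufferBatch flush WAL once per batch rather than once per buffer.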

src/include/storage/bufpage.h

Lines changed: 2 additions & 0 deletions

@@ -506,5 +506,7 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
                                     const void *newtup, Size newsize);
 extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
 extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
+extern void PageSetBatchChecksumInplace(Page *pages, const BlockNumber *blknos,
+                                        uint32 length);
 
 #endif                          /* BUFPAGE_H */

src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions

@@ -351,6 +351,7 @@ BufferManagerRelation
 BufferStrategyControl
 BufferTag
 BufferUsage
+BufWriteBatch
 BuildAccumulator
 BuiltinScript
 BulkInsertState
