@@ -95,8 +95,10 @@ struct ReadStream
9595 int16 ios_in_progress ;
9696 int16 queue_size ;
9797 int16 max_pinned_buffers ;
98+ int16 forwarded_buffers ;
9899 int16 pinned_buffers ;
99100 int16 distance ;
101+ int16 initialized_buffers ;
100102 bool advice_enabled ;
101103 bool temporary ;
102104
@@ -223,8 +225,10 @@ static bool
223225read_stream_start_pending_read (ReadStream * stream )
224226{
225227 bool need_wait ;
228+ int requested_nblocks ;
226229 int nblocks ;
227230 int flags = 0 ;
231+ int forwarded ;
228232 int16 io_index ;
229233 int16 overflow ;
230234 int16 buffer_index ;
@@ -275,11 +279,19 @@ read_stream_start_pending_read(ReadStream *stream)
275279 }
276280 }
277281
278- /* Compute the remaining portion of the per-backend buffer limit. */
282+ /*
283+ * Compute the remaining portion of the per-backend buffer limit. If we
284+ * already have some forwarded buffers, we can certainly use those. They
285+ * are already pinned, and are mapped to the starting blocks of the pending
286+ * read, they just don't have any I/O started yet and are not counted in
287+ * stream->pinned_buffers.
288+ */
279289 if (stream -> temporary )
280290 buffer_limit = Min (GetAdditionalLocalPinLimit (), PG_INT16_MAX );
281291 else
282292 buffer_limit = Min (GetAdditionalPinLimit (), PG_INT16_MAX );
293+ Assert (stream -> forwarded_buffers <= stream -> pending_read_nblocks );
294+ buffer_limit += stream -> forwarded_buffers ;
283295 if (buffer_limit == 0 && stream -> pinned_buffers == 0 )
284296 buffer_limit = 1 ; /* guarantee progress */
285297
@@ -306,8 +318,31 @@ read_stream_start_pending_read(ReadStream *stream)
306318 * We say how many blocks we want to read, but it may be smaller on return
307319 * if the buffer manager decides it needs a short read at its level.
308320 */
321+ requested_nblocks = Min (buffer_limit , stream -> pending_read_nblocks );
322+ nblocks = requested_nblocks ;
309323 buffer_index = stream -> next_buffer_index ;
310324 io_index = stream -> next_io_index ;
325+
326+ /*
327+ * The first time around the queue we initialize it as we go, including
328+ * the overflow zone, because otherwise the entries would appear as
329+ * forwarded buffers. This avoids initializing the whole queue up front
330+ * in cases where it is large but we don't ever use it due to the
331+ * all-cached fast path or small scans.
332+ */
333+ while (stream -> initialized_buffers < buffer_index + nblocks )
334+ stream -> buffers [stream -> initialized_buffers ++ ] = InvalidBuffer ;
335+
336+ /*
337+ * Start the I/O. Any buffers that are not InvalidBuffer will be
338+ * interpreted as already pinned, forwarded by an earlier call to
339+ * StartReadBuffers(), and must map to the expected blocks. The nblocks
340+ * value may be smaller on return indicating the size of the I/O that
341+ * could be started. Buffers beyond the output nblocks number may also
342+ * have been pinned without starting I/O due to various edge cases. In
343+ * that case we'll just leave them in the queue ahead of us, "forwarded"
344+ * to the next call, avoiding the need to unpin/repin.
345+ */
311346 need_wait = StartReadBuffers (& stream -> ios [io_index ].op ,
312347 & stream -> buffers [buffer_index ],
313348 stream -> pending_read_blocknum ,
@@ -336,16 +371,35 @@ read_stream_start_pending_read(ReadStream *stream)
336371 stream -> seq_blocknum = stream -> pending_read_blocknum + nblocks ;
337372 }
338373
374+ /*
375+ * How many pins were acquired but forwarded to the next call? These need
376+ * to be passed to the next StartReadBuffers() call, or released if the
377+ * stream ends early. We need the number for accounting purposes, since
378+ * they are not counted in stream->pinned_buffers but we already hold
379+ * them.
380+ */
381+ forwarded = 0 ;
382+ while (nblocks + forwarded < requested_nblocks &&
383+ stream -> buffers [buffer_index + nblocks + forwarded ] != InvalidBuffer )
384+ forwarded ++ ;
385+ stream -> forwarded_buffers = forwarded ;
386+
339387 /*
340388 * We gave a contiguous range of buffer space to StartReadBuffers(), but
341- * we want it to wrap around at queue_size. Slide overflowing buffers to
342- * the front of the array.
389+ * we want it to wrap around at queue_size. Copy overflowing buffers to
390+ * the front of the array where they'll be consumed, but also leave a copy
391+ * in the overflow zone which the I/O operation has a pointer to (it needs
392+ * a contiguous array). Both copies will be cleared when the buffers are
393+ * handed to the consumer.
343394 */
344- overflow = (buffer_index + nblocks ) - stream -> queue_size ;
395+ overflow = (buffer_index + nblocks + forwarded ) - stream -> queue_size ;
345396 if (overflow > 0 )
346- memmove (& stream -> buffers [0 ],
347- & stream -> buffers [stream -> queue_size ],
348- sizeof (stream -> buffers [0 ]) * overflow );
397+ {
398+ Assert (overflow < stream -> queue_size ); /* can't overlap */
399+ memcpy (& stream -> buffers [0 ],
400+ & stream -> buffers [stream -> queue_size ],
401+ sizeof (stream -> buffers [0 ]) * overflow );
402+ }
349403
350404 /* Compute location of start of next read, without using % operator. */
351405 buffer_index += nblocks ;
@@ -730,10 +784,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
730784
731785 /* Fast path assumptions. */
732786 Assert (stream -> ios_in_progress == 0 );
787+ Assert (stream -> forwarded_buffers == 0 );
733788 Assert (stream -> pinned_buffers == 1 );
734789 Assert (stream -> distance == 1 );
735790 Assert (stream -> pending_read_nblocks == 0 );
736791 Assert (stream -> per_buffer_data_size == 0 );
792+ Assert (stream -> initialized_buffers > stream -> oldest_buffer_index );
737793
738794 /* We're going to return the buffer we pinned last time. */
739795 oldest_buffer_index = stream -> oldest_buffer_index ;
@@ -782,6 +838,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
782838 stream -> distance = 0 ;
783839 stream -> oldest_buffer_index = stream -> next_buffer_index ;
784840 stream -> pinned_buffers = 0 ;
841+ stream -> buffers [oldest_buffer_index ] = InvalidBuffer ;
785842 }
786843
787844 stream -> fast_path = false;
@@ -858,10 +915,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
858915 stream -> seq_until_processed = InvalidBlockNumber ;
859916 }
860917
861- #ifdef CLOBBER_FREED_MEMORY
862- /* Clobber old buffer for debugging purposes. */
918+ /*
919+ * We must zap this queue entry, or else it would appear as a forwarded
920+ * buffer. If it's potentially in the overflow zone (ie it wrapped around
921+ * the queue), also zap that copy.
922+ */
863923 stream -> buffers [oldest_buffer_index ] = InvalidBuffer ;
864- #endif
924+ if (oldest_buffer_index < io_combine_limit - 1 )
925+ stream -> buffers [stream -> queue_size + oldest_buffer_index ] =
926+ InvalidBuffer ;
865927
866928#if defined(CLOBBER_FREED_MEMORY ) || defined(USE_VALGRIND )
867929
@@ -906,6 +968,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
906968#ifndef READ_STREAM_DISABLE_FAST_PATH
907969 /* See if we can take the fast path for all-cached scans next time. */
908970 if (stream -> ios_in_progress == 0 &&
971+ stream -> forwarded_buffers == 0 &&
909972 stream -> pinned_buffers == 1 &&
910973 stream -> distance == 1 &&
911974 stream -> pending_read_nblocks == 0 &&
@@ -941,6 +1004,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
9411004void
9421005read_stream_reset (ReadStream * stream )
9431006{
1007+ int16 index ;
9441008 Buffer buffer ;
9451009
9461010 /* Stop looking ahead. */
@@ -957,6 +1021,24 @@ read_stream_reset(ReadStream *stream)
9571021 while ((buffer = read_stream_next_buffer (stream , NULL )) != InvalidBuffer )
9581022 ReleaseBuffer (buffer );
9591023
1024+ /* Unpin any unused forwarded buffers. */
1025+ index = stream -> next_buffer_index ;
1026+ while (index < stream -> initialized_buffers &&
1027+ (buffer = stream -> buffers [index ]) != InvalidBuffer )
1028+ {
1029+ Assert (stream -> forwarded_buffers > 0 );
1030+ stream -> forwarded_buffers -- ;
1031+ ReleaseBuffer (buffer );
1032+
1033+ stream -> buffers [index ] = InvalidBuffer ;
1034+ if (index < io_combine_limit - 1 )
1035+ stream -> buffers [stream -> queue_size + index ] = InvalidBuffer ;
1036+
1037+ if (++ index == stream -> queue_size )
1038+ index = 0 ;
1039+ }
1040+
1041+ Assert (stream -> forwarded_buffers == 0 );
9601042 Assert (stream -> pinned_buffers == 0 );
9611043 Assert (stream -> ios_in_progress == 0 );
9621044
0 commit comments