@@ -116,6 +116,7 @@ struct ReadStream
116116 int16 pinned_buffers ;
117117 int16 distance ;
118118 bool advice_enabled ;
119+ bool temporary ;
119120
120121 /*
121122 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -225,7 +226,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
225226 stream -> buffered_blocknum = blocknum ;
226227}
227228
228- static void
229+ /*
230+ * Start as much of the current pending read as we can. If we have to split it
231+ * because of the per-backend buffer limit, or the buffer manager decides to
232+ * split it, then the pending read is adjusted to hold the remaining portion.
233+ *
234+ * We can always start a read of at least size one if we have no progress yet.
235+ * Otherwise it's possible that we can't start a read at all because of a lack
236+ * of buffers, and then false is returned. Buffer shortages also reduce the
237+ * distance to a level that prevents look-ahead until buffers are released.
238+ */
239+ static bool
229240read_stream_start_pending_read (ReadStream * stream , bool suppress_advice )
230241{
231242 bool need_wait ;
@@ -234,12 +245,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
234245 int16 io_index ;
235246 int16 overflow ;
236247 int16 buffer_index ;
248+ int16 buffer_limit ;
237249
238250 /* This should only be called with a pending read. */
239251 Assert (stream -> pending_read_nblocks > 0 );
240252 Assert (stream -> pending_read_nblocks <= stream -> io_combine_limit );
241253
242- /* We had better not exceed the pin limit by starting this read. */
254+ /* We had better not exceed the per-stream buffer limit with this read. */
243255 Assert (stream -> pinned_buffers + stream -> pending_read_nblocks <=
244256 stream -> max_pinned_buffers );
245257
@@ -260,10 +272,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
260272 else
261273 flags = 0 ;
262274
263- /* We say how many blocks we want to read, but may be smaller on return. */
275+ /* Compute the remaining portion of the per-backend buffer limit. */
276+ if (stream -> temporary )
277+ buffer_limit = Min (GetAdditionalLocalPinLimit (), PG_INT16_MAX );
278+ else
279+ buffer_limit = Min (GetAdditionalPinLimit (), PG_INT16_MAX );
280+ if (buffer_limit == 0 && stream -> pinned_buffers == 0 )
281+ buffer_limit = 1 ; /* guarantee progress */
282+
283+ /* Does the per-backend buffer limit affect this read? */
284+ nblocks = stream -> pending_read_nblocks ;
285+ if (buffer_limit < nblocks )
286+ {
287+ int16 new_distance ;
288+
289+ /* Shrink distance: no more look-ahead until buffers are released. */
290+ new_distance = stream -> pinned_buffers + buffer_limit ;
291+ if (stream -> distance > new_distance )
292+ stream -> distance = new_distance ;
293+
294+ /* If we've already made progress, just give up and wait for buffers. */
295+ if (stream -> pinned_buffers > 0 )
296+ return false;
297+
298+ /* A short read is required to make progress. */
299+ nblocks = buffer_limit ;
300+ }
301+
302+ /*
303+ * We say how many blocks we want to read, but it may be smaller on return
304+ * if the buffer manager decides it needs a short read at its level.
305+ */
264306 buffer_index = stream -> next_buffer_index ;
265307 io_index = stream -> next_io_index ;
266- nblocks = stream -> pending_read_nblocks ;
267308 need_wait = StartReadBuffers (& stream -> ios [io_index ].op ,
268309 & stream -> buffers [buffer_index ],
269310 stream -> pending_read_blocknum ,
@@ -313,6 +354,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
313354 /* Adjust the pending read to cover the remaining portion, if any. */
314355 stream -> pending_read_blocknum += nblocks ;
315356 stream -> pending_read_nblocks -= nblocks ;
357+
358+ return true;
316359}
317360
318361static void
@@ -361,14 +404,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
361404 /* We have to start the pending read before we can build another. */
362405 while (stream -> pending_read_nblocks > 0 )
363406 {
364- read_stream_start_pending_read (stream , suppress_advice );
365- suppress_advice = false;
366- if (stream -> ios_in_progress == stream -> max_ios )
407+ if (!read_stream_start_pending_read (stream , suppress_advice ) ||
408+ stream -> ios_in_progress == stream -> max_ios )
367409 {
368- /* And we've hit the limit. Rewind, and stop here . */
410+ /* And we've hit a buffer or I/O limit. Rewind and wait . */
369411 read_stream_unget_block (stream , blocknum );
370412 return ;
371413 }
414+
415+ suppress_advice = false;
372416 }
373417
374418 /* This is the start of a new pending read. */
@@ -382,15 +426,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
382426 * io_combine_limit size once more buffers have been consumed. However,
383427 * if we've already reached io_combine_limit, or we've reached the
384428 * distance limit and there isn't anything pinned yet, or the callback has
385- * signaled end-of-stream, we start the read immediately.
429+ * signaled end-of-stream, we start the read immediately. Note that the
430+ * pending read could even exceed the distance goal, if the latter was
431+ * reduced on buffer limit exhaustion.
386432 */
387433 if (stream -> pending_read_nblocks > 0 &&
388434 (stream -> pending_read_nblocks == stream -> io_combine_limit ||
389- (stream -> pending_read_nblocks = = stream -> distance &&
435+ (stream -> pending_read_nblocks > = stream -> distance &&
390436 stream -> pinned_buffers == 0 ) ||
391437 stream -> distance == 0 ) &&
392438 stream -> ios_in_progress < stream -> max_ios )
393439 read_stream_start_pending_read (stream , suppress_advice );
440+
441+ /*
442+ * There should always be something pinned when we leave this function,
443+ * whether started by this call or not, unless we've hit the end of the
444+ * stream. In the worst case we can always make progress one buffer at a
445+ * time.
446+ */
447+ Assert (stream -> pinned_buffers > 0 || stream -> distance == 0 );
394448}
395449
396450/*
@@ -420,6 +474,7 @@ read_stream_begin_impl(int flags,
420474 int max_ios ;
421475 int strategy_pin_limit ;
422476 uint32 max_pinned_buffers ;
477+ uint32 max_possible_buffer_limit ;
423478 Oid tablespace_id ;
424479
425480 /*
@@ -475,12 +530,23 @@ read_stream_begin_impl(int flags,
475530 strategy_pin_limit = GetAccessStrategyPinLimit (strategy );
476531 max_pinned_buffers = Min (strategy_pin_limit , max_pinned_buffers );
477532
478- /* Don't allow this backend to pin more than its share of buffers. */
533+ /*
534+ * Also limit our queue to the maximum number of pins we could possibly
535+ * ever be allowed to acquire according to the buffer manager. We may not
536+ * really be able to use them all due to other pins held by this backend,
537+ * but we'll check that later in read_stream_start_pending_read().
538+ */
479539 if (SmgrIsTemp (smgr ))
480- LimitAdditionalLocalPins ( & max_pinned_buffers );
540+ max_possible_buffer_limit = GetSoftLocalPinLimit ( );
481541 else
482- LimitAdditionalPins (& max_pinned_buffers );
483- Assert (max_pinned_buffers > 0 );
542+ max_possible_buffer_limit = GetSoftPinLimit ();
543+ max_pinned_buffers = Min (max_pinned_buffers , max_possible_buffer_limit );
544+
545+ /*
546+ * The soft limit might be zero on a system configured with more
547+ * connections than buffers. We need at least one to make progress.
548+ */
549+ max_pinned_buffers = Max (1 , max_pinned_buffers );
484550
485551 /*
486552 * We need one extra entry for buffers and per-buffer data, because users
@@ -546,6 +612,7 @@ read_stream_begin_impl(int flags,
546612 stream -> callback = callback ;
547613 stream -> callback_private_data = callback_private_data ;
548614 stream -> buffered_blocknum = InvalidBlockNumber ;
615+ stream -> temporary = SmgrIsTemp (smgr );
549616
550617 /*
551618 * Skip the initial ramp-up phase if the caller says we're going to be
@@ -674,6 +741,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
674741 * arbitrary I/O entry (they're all free). We don't have to
675742 * adjust pinned_buffers because we're transferring one to caller
676743 * but pinning one more.
744+ *
745+ * In the fast path we don't need to check the pin limit. We're
746+ * always allowed at least one pin so that progress can be made,
747+ * and that's all we need here. Although two pins are momentarily
748+ * held at the same time, the model used here is that the stream
749+ * holds only one, and the other now belongs to the caller.
677750 */
678751 if (likely (!StartReadBuffer (& stream -> ios [0 ].op ,
679752 & stream -> buffers [oldest_buffer_index ],
@@ -874,6 +947,9 @@ read_stream_reset(ReadStream *stream)
874947 stream -> buffered_blocknum = InvalidBlockNumber ;
875948 stream -> fast_path = false;
876949
950+ /* There is no point in reading whatever was pending. */
951+ stream -> pending_read_nblocks = 0 ;
952+
877953 /* Unpin anything that wasn't consumed. */
878954 while ((buffer = read_stream_next_buffer (stream , NULL )) != InvalidBuffer )
879955 ReleaseBuffer (buffer );
0 commit comments