diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml
index bd5dcaf86a5c..294494877d9d 100644
--- a/doc/src/sgml/ref/vacuum.sgml
+++ b/doc/src/sgml/ref/vacuum.sgml
@@ -280,25 +280,41 @@ VACUUM [ ( option [, ...] ) ] [ PARALLEL
- Perform index vacuum and index cleanup phases of VACUUM
- in parallel using integer
- background workers (for the details of each vacuum phase, please
- refer to ). The number of workers used
- to perform the operation is equal to the number of indexes on the
- relation that support parallel vacuum which is limited by the number of
- workers specified with PARALLEL option if any which is
- further limited by .
- An index can participate in parallel vacuum if and only if the size of the
- index is more than .
- Please note that it is not guaranteed that the number of parallel workers
- specified in integer will be
- used during execution. It is possible for a vacuum to run with fewer
- workers than specified, or even with no workers at all. Only one worker
- can be used per index. So parallel workers are launched only when there
- are at least 2 indexes in the table. Workers for
- vacuum are launched before the start of each phase and exit at the end of
- the phase. These behaviors might change in a future release. This
- option can't be used with the FULL option.
+      Perform the heap scanning, index vacuum, and index cleanup phases of
+ VACUUM in parallel using
+ integer background workers
+ (for the details of each vacuum phase, please refer to
+ ).
+
+
+      For heap tables, the number of workers used to perform heap scanning is
+      determined based on the size of the table. A table can participate in
+      parallel heap scanning if and only if the size of the table is more than
+      . During heap scanning,
+      the table's blocks are divided into ranges and shared among the
+      cooperating processes. Each worker process completes the scanning of
+      its given range of blocks before requesting an additional range of blocks.
+
+
+ The number of workers used to perform parallel index vacuum and index
+ cleanup is equal to the number of indexes on the relation that support
+ parallel vacuum. An index can participate in parallel vacuum if and only
+ if the size of the index is more than .
+ Only one worker can be used per index. So parallel workers for index vacuum
+ and index cleanup are launched only when there are at least 2
+ indexes in the table.
+
+
+      Workers for vacuum are launched before the start of each phase and exit
+      at the end of the phase. The number of workers for each phase is limited
+      by the number of workers specified with the PARALLEL option,
+      if any, which is further limited by .
+      Please note that in any parallel vacuum phase, it is not guaranteed that
+      the number of parallel workers specified in integer
+      will be used during execution. It is possible for a vacuum to run with
+      fewer workers than specified, or even with no workers at all. These
+      behaviors might change in a future release. This option can't be used
+      with the FULL option.
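
The worker-count rules described above (size-based for the heap scanning phase, then clamped by the PARALLEL integer request and by max_parallel_maintenance_workers) can be sketched as below. This is an illustration only: the helper name pick_heap_scan_workers() and the planner-style geometric growth are assumptions made for clarity, not the patch's actual heap_parallel_vacuum_compute_workers() logic.

    #include <stdint.h>

    typedef uint32_t BlockNumber;

    /*
     * Illustrative sketch (not the patch's exact heuristic): derive a worker
     * count for the heap scanning phase from the table size in blocks, then
     * clamp it by an explicit PARALLEL request (0 means "unspecified") and by
     * max_parallel_maintenance_workers.
     */
    static int
    pick_heap_scan_workers(BlockNumber table_blocks,
                           BlockNumber min_parallel_table_scan_size,
                           int nrequested,
                           int max_parallel_maintenance_workers)
    {
        int      nworkers = 0;
        uint64_t threshold = min_parallel_table_scan_size;

        /* Tables no larger than the threshold are not scanned in parallel. */
        if (table_blocks <= min_parallel_table_scan_size)
            return 0;

        /* Add one worker each time the table crosses a threshold multiple. */
        while (threshold <= table_blocks &&
               nworkers < max_parallel_maintenance_workers)
        {
            nworkers++;
            threshold *= 3;     /* planner-style growth; an assumption */
        }

        /* An explicit PARALLEL N further limits the result. */
        if (nrequested > 0 && nworkers > nrequested)
            nworkers = nrequested;

        return nworkers;
    }

Index vacuum and index cleanup keep the existing rule of at most one worker per index larger than min_parallel_index_scan_size, subject to the same two clamps.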
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index bcbac844bb66..c2fac6cbe65b 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2668,7 +2668,13 @@ static const TableAmRoutine heapam_methods = {
.scan_bitmap_next_tuple = heapam_scan_bitmap_next_tuple,
.scan_sample_next_block = heapam_scan_sample_next_block,
- .scan_sample_next_tuple = heapam_scan_sample_next_tuple
+ .scan_sample_next_tuple = heapam_scan_sample_next_tuple,
+
+ .parallel_vacuum_compute_workers = heap_parallel_vacuum_compute_workers,
+ .parallel_vacuum_estimate = heap_parallel_vacuum_estimate,
+ .parallel_vacuum_initialize = heap_parallel_vacuum_initialize,
+ .parallel_vacuum_initialize_worker = heap_parallel_vacuum_initialize_worker,
+ .parallel_vacuum_collect_dead_items = heap_parallel_vacuum_collect_dead_items
};
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 981d9380a925..6b7b22816b94 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -99,6 +99,44 @@
* After pruning and freezing, pages that are newly all-visible and all-frozen
* are marked as such in the visibility map.
*
+ * Parallel Vacuum:
+ *
+ * Lazy vacuum on heap tables supports parallel processing for phase I and
+ * phase II. Before starting phase I, we initialize parallel vacuum state,
+ * ParallelVacuumState, and allocate the TID store in a DSA area if we can
+ * use parallel mode for any of these two phases.
+ *
+ * Each phase can require a different number of parallel vacuum workers,
+ * depending on factors such as the table size and the number of indexes.
+ * Parallel workers are launched at the beginning of each phase and exit at
+ * the end of each phase.
+ *
+ * While vacuum cutoffs are shared between leader and worker processes, each
+ * individual process uses its own GlobalVisState, potentially causing some
+ * workers to remove fewer tuples than optimal. During a parallel lazy heap
+ * scan, each worker tracks the oldest extant XID and MXID it observes. After
+ * the parallel scan, the leader computes the globally oldest extant XID and
+ * MXID while gathering the workers' scan data.
+ *
+ * The parallel lazy heap scan (i.e. parallel phase I) is controlled by
+ * ParallelLVScanDesc in conjunction with the read stream. The table is split
+ * into multiple chunks, which are then distributed among parallel workers.
+ * Due to the potential presence of pinned buffers loaded by the read stream's
+ * look-ahead mechanism, we cannot abruptly stop phase I even when the space
+ * of dead_items TIDs exceeds the limit. Instead, once this threshold is
+ * surpassed, we begin processing pages without attempting to retrieve
+ * additional blocks until the read stream is exhausted. While this approach
+ * may increase memory usage, it typically doesn't pose a significant problem,
+ * as processing a few tens to hundreds of buffers doesn't substantially
+ * increase the size of the dead_items TIDs. The workers' scan states,
+ * ParallelLVScanWorkerData, are stored in the DSM space, enabling parallel
+ * workers to resume phase I from their previous state.
+ *
+ * If the leader launches fewer workers than in the previous round when
+ * resuming the parallel lazy heap scan, some blocks within chunks may remain
+ * unscanned. To address this, the leader completes workers' unfinished scans
+ * at the end of the parallel lazy heap scan (see
+ * complete_unfinished_lazy_scan_heap()).
+ *
* Dead TID Storage:
*
* The major space usage for vacuuming is storage for the dead tuple IDs that
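
To make the chunk-based block distribution described in the comment above concrete, the claim-a-chunk-then-drain-it scheme reduces to one atomic fetch-and-add per chunk. The sketch below is standalone and simplified: it uses C11 atomics and plain structs in place of pg_atomic_uint64, ParallelLVScanDesc, and ParallelLVScanWorkerData, and it omits the variable-size first chunk tied to the eager-scan region.

    #include <stdatomic.h>
    #include <stdint.h>

    #define CHUNK_SIZE 1024                 /* cf. PARALLEL_LV_CHUNK_SIZE */

    typedef struct SharedScan
    {
        uint64_t nblocks;                   /* blocks in the table at scan start */
        atomic_uint_least64_t nallocated;   /* blocks handed out so far */
    } SharedScan;

    typedef struct WorkerScan
    {
        uint64_t next_block;                /* next block this worker will return */
        uint64_t chunk_remaining;           /* blocks left in the current chunk */
    } WorkerScan;

    /* Return the next block to scan, or shared->nblocks when the scan is done. */
    static uint64_t
    claim_next_block(SharedScan *shared, WorkerScan *w)
    {
        if (w->chunk_remaining == 0)
        {
            /* Claim a whole chunk of blocks with a single atomic increment. */
            w->next_block = atomic_fetch_add(&shared->nallocated, CHUNK_SIZE);
            w->chunk_remaining = CHUNK_SIZE;
        }

        if (w->next_block >= shared->nblocks)
            return shared->nblocks;         /* no blocks left for this worker */

        w->chunk_remaining--;
        return w->next_block++;
    }

Because each worker's scan position lives entirely in its per-worker state, the leader can later reattach to that state and finish any blocks left over in a partially consumed chunk, which is what complete_unfinished_lazy_scan_heap() does in the patch.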
@@ -146,6 +184,7 @@
#include "common/pg_prng.h"
#include "executor/instrument.h"
#include "miscadmin.h"
+#include "optimizer/paths.h" /* for min_parallel_table_scan_size */
#include "pgstat.h"
#include "portability/instr_time.h"
#include "postmaster/autovacuum.h"
@@ -153,6 +192,7 @@
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "storage/read_stream.h"
+#include "utils/injection_point.h"
#include "utils/lsyscache.h"
#include "utils/pg_rusage.h"
#include "utils/timestamp.h"
@@ -213,11 +253,22 @@
*/
#define PREFETCH_SIZE ((BlockNumber) 32)
+/*
+ * DSM keys for parallel lazy vacuum. Unlike other parallel execution code, we
+ * we don't need to worry about DSM keys conflicting with plan_node_id, but need to
+ * avoid conflicting with DSM keys used in vacuumparallel.c.
+ */
+#define PARALLEL_LV_KEY_SHARED 0xFFFF0001
+#define PARALLEL_LV_KEY_SCANDESC 0xFFFF0002
+#define PARALLEL_LV_KEY_SCANWORKER 0xFFFF0003
+#define PARALLEL_LV_KEY_SCANDATA 0xFFFF0004
+
/*
* Macro to check if we are in a parallel vacuum. If true, we are in the
* parallel mode and the DSM segment is initialized.
*/
#define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL)
+#define ParallelHeapVacuumIsActive(vacrel) ((vacrel)->plvstate != NULL)
/* Phases of vacuum during which we report error context. */
typedef enum
@@ -248,6 +299,12 @@ typedef enum
*/
#define EAGER_SCAN_REGION_SIZE 4096
+/*
+ * During parallel lazy scans, each worker (including the leader) retrieves
+ * a chunk consisting of PARALLEL_LV_CHUNK_SIZE blocks.
+ */
+#define PARALLEL_LV_CHUNK_SIZE 1024
+
/*
* heap_vac_scan_next_block() sets these flags to communicate information
* about the block it read to the caller.
@@ -255,6 +312,177 @@ typedef enum
#define VAC_BLK_WAS_EAGER_SCANNED (1 << 0)
#define VAC_BLK_ALL_VISIBLE_ACCORDING_TO_VM (1 << 1)
+/*
+ * Data and counters updated during lazy heap scan.
+ */
+typedef struct LVScanData
+{
+ BlockNumber scanned_pages; /* # pages examined (not skipped via VM) */
+
+ /*
+ * Count of all-visible blocks eagerly scanned (for logging only). This
+ * does not include skippable blocks scanned due to SKIP_PAGES_THRESHOLD.
+ */
+ BlockNumber eager_scanned_pages;
+
+ BlockNumber removed_pages; /* # pages removed by relation truncation */
+ BlockNumber new_frozen_tuple_pages; /* # pages with newly frozen tuples */
+
+ /* # pages newly set all-visible in the VM */
+ BlockNumber vm_new_visible_pages;
+
+ /*
+ * # pages newly set all-visible and all-frozen in the VM. This is a
+ * subset of vm_new_visible_pages. That is, vm_new_visible_pages includes
+ * all pages set all-visible, but vm_new_visible_frozen_pages includes
+ * only those which were also set all-frozen.
+ */
+ BlockNumber vm_new_visible_frozen_pages;
+
+ /* # all-visible pages newly set all-frozen in the VM */
+ BlockNumber vm_new_frozen_pages;
+
+ BlockNumber lpdead_item_pages; /* # pages with LP_DEAD items */
+ BlockNumber missed_dead_pages; /* # pages with missed dead tuples */
+ BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+
+ /* Counters that follow are only for scanned_pages */
+ int64 tuples_deleted; /* # deleted from table */
+ int64 tuples_frozen; /* # newly frozen */
+ int64 lpdead_items; /* # deleted from indexes */
+ int64 live_tuples; /* # live tuples remaining */
+ int64 recently_dead_tuples; /* # dead, but not yet removable */
+ int64 missed_dead_tuples; /* # removable, but not removed */
+
+ /* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid. */
+ TransactionId NewRelfrozenXid;
+ MultiXactId NewRelminMxid;
+ bool skippedallvis;
+} LVScanData;
+
+/*
+ * Struct for information that needs to be shared among parallel workers
+ * for parallel lazy vacuum. All fields are static, set by the leader
+ * process.
+ */
+typedef struct ParallelLVShared
+{
+ bool aggressive;
+ bool skipwithvm;
+
+ /* The current oldest extant XID/MXID shared by the leader process */
+ TransactionId NewRelfrozenXid;
+ MultiXactId NewRelminMxid;
+
+ /* VACUUM operation's cutoffs for freezing and pruning */
+ struct VacuumCutoffs cutoffs;
+
+ /*
+ * The first chunk size varies depending on the first eager scan region
+ * size. If eager scan is disabled, we use the default chunk size
+ * PARALLEL_LV_CHUNK_SIZE for the first chunk.
+ */
+ BlockNumber initial_chunk_size;
+
+ /*
+ * Similar to LVRelState.eager_scan_max_fails_per_region but this is a
+ * per-chunk failure counter.
+ */
+ BlockNumber eager_scan_max_fails_per_chunk;
+
+ /*
+ * Similar to LVRelState.eager_scan_remaining_successes but this is a
+ * success counter per parallel worker.
+ */
+ BlockNumber eager_scan_remaining_successes_per_worker;
+} ParallelLVShared;
+
+/*
+ * Shared scan description for parallel lazy scan.
+ */
+typedef struct ParallelLVScanDesc
+{
+ /* Number of blocks of the table at start of scan */
+ BlockNumber nblocks;
+
+	/* Number of blocks to allocate in each I/O chunk */
+ BlockNumber chunk_size;
+
+ /* Number of blocks allocated to workers so far */
+ pg_atomic_uint64 nallocated;
+} ParallelLVScanDesc;
+
+/*
+ * Per-worker data for the scan description, statistics counters, and
+ * miscellaneous data that need to be shared with the leader.
+ */
+typedef struct ParallelLVScanWorkerData
+{
+ bool inited;
+
+ /* Current number of blocks into the scan */
+ BlockNumber nallocated;
+
+ /* Number of blocks per chunk */
+ BlockNumber chunk_size;
+
+ /* Number of blocks left in this chunk */
+ uint32 chunk_remaining;
+
+ /* The last processed block number */
+ pg_atomic_uint32 last_blkno;
+
+ /* Eager scan state for resuming the scan */
+ BlockNumber remaining_fails_save;
+ BlockNumber remaining_successes_save;
+ BlockNumber next_region_start_save;
+} ParallelLVScanWorkerData;
+
+/*
+ * Struct to store parallel lazy vacuum working state.
+ */
+typedef struct ParallelLVState
+{
+ /* Shared static information */
+ ParallelLVShared *shared;
+
+ /* Parallel scan description shared among parallel workers */
+ ParallelLVScanDesc *scandesc;
+
+ /* Per-worker scan data */
+ ParallelLVScanWorkerData *scanwork;
+} ParallelLVState;
+
+/*
+ * Struct for the leader process in parallel lazy vacuum.
+ */
+typedef struct ParallelLVLeader
+{
+ /* Shared memory size for each shared object */
+ Size shared_len;
+ Size scandesc_len;
+ Size scanwork_len;
+ Size scandata_len;
+
+ /* The number of workers launched for parallel lazy heap scan */
+ int nworkers_launched;
+
+ /*
+	 * Will the leader participate in the parallel lazy heap scan?
+	 *
+	 * This is a parameter for testing and is always true unless explicitly
+	 * disabled by an injection point.
+ */
+ bool leaderparticipate;
+
+ /*
+ * These fields point to the arrays of all per-worker scan states stored
+ * in DSM.
+ */
+ ParallelLVScanWorkerData *scanwork_array;
+ LVScanData *scandata_array;
+} ParallelLVLeader;
+
typedef struct LVRelState
{
/* Target heap relation and its indexes */
@@ -281,10 +509,6 @@ typedef struct LVRelState
/* VACUUM operation's cutoffs for freezing and pruning */
struct VacuumCutoffs cutoffs;
GlobalVisState *vistest;
- /* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid */
- TransactionId NewRelfrozenXid;
- MultiXactId NewRelminMxid;
- bool skippedallvis;
/* Error reporting state */
char *dbname;
@@ -310,34 +534,9 @@ typedef struct LVRelState
VacDeadItemsInfo *dead_items_info;
BlockNumber rel_pages; /* total number of pages */
- BlockNumber scanned_pages; /* # pages examined (not skipped via VM) */
- /*
- * Count of all-visible blocks eagerly scanned (for logging only). This
- * does not include skippable blocks scanned due to SKIP_PAGES_THRESHOLD.
- */
- BlockNumber eager_scanned_pages;
-
- BlockNumber removed_pages; /* # pages removed by relation truncation */
- BlockNumber new_frozen_tuple_pages; /* # pages with newly frozen tuples */
-
- /* # pages newly set all-visible in the VM */
- BlockNumber vm_new_visible_pages;
-
- /*
- * # pages newly set all-visible and all-frozen in the VM. This is a
- * subset of vm_new_visible_pages. That is, vm_new_visible_pages includes
- * all pages set all-visible, but vm_new_visible_frozen_pages includes
- * only those which were also set all-frozen.
- */
- BlockNumber vm_new_visible_frozen_pages;
-
- /* # all-visible pages newly set all-frozen in the VM */
- BlockNumber vm_new_frozen_pages;
-
- BlockNumber lpdead_item_pages; /* # pages with LP_DEAD items */
- BlockNumber missed_dead_pages; /* # pages with missed dead tuples */
- BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
+ /* Data and counters updated during lazy heap scan */
+ LVScanData *scan_data;
/* Statistics output by us, for table */
double new_rel_tuples; /* new estimated total # of tuples */
@@ -347,13 +546,12 @@ typedef struct LVRelState
/* Instrumentation counters */
int num_index_scans;
- /* Counters that follow are only for scanned_pages */
- int64 tuples_deleted; /* # deleted from table */
- int64 tuples_frozen; /* # newly frozen */
- int64 lpdead_items; /* # deleted from indexes */
- int64 live_tuples; /* # live tuples remaining */
- int64 recently_dead_tuples; /* # dead, but not yet removable */
- int64 missed_dead_tuples; /* # removable, but not removed */
+
+ /* Last processed block number */
+ BlockNumber last_blkno;
+
+ /* Next block to check for FSM vacuum */
+ BlockNumber next_fsm_block_to_vacuum;
/* State maintained by heap_vac_scan_next_block() */
BlockNumber current_block; /* last block returned */
@@ -362,6 +560,16 @@ typedef struct LVRelState
bool next_unskippable_eager_scanned; /* if it was eagerly scanned */
Buffer next_unskippable_vmbuffer; /* buffer containing its VM bit */
+ /* Fields used for parallel lazy vacuum */
+
+ /* Parallel lazy vacuum working state */
+ ParallelLVState *plvstate;
+
+ /*
+ * The leader state for parallel lazy vacuum. NULL for parallel workers.
+ */
+ ParallelLVLeader *leader;
+
/* State related to managing eager scanning of all-visible pages */
/*
@@ -421,12 +629,19 @@ typedef struct LVSavedErrInfo
/* non-export function prototypes */
static void lazy_scan_heap(LVRelState *vacrel);
+static void do_lazy_scan_heap(LVRelState *vacrel, bool check_mem_usage);
static void heap_vacuum_eager_scan_setup(LVRelState *vacrel,
const VacuumParams params);
static BlockNumber heap_vac_scan_next_block(ReadStream *stream,
void *callback_private_data,
void *per_buffer_data);
-static void find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis);
+static void parallel_lazy_scan_init_scan_worker(ParallelLVScanWorkerData *scanwork,
+ BlockNumber initial_chunk_size);
+static BlockNumber parallel_lazy_scan_get_nextpage(LVRelState *vacrel, Relation rel,
+ ParallelLVScanDesc *scandesc,
+ ParallelLVScanWorkerData *scanwork);
+static bool find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis,
+ BlockNumber start_blk, BlockNumber end_blk);
static bool lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf,
BlockNumber blkno, Page page,
bool sharelock, Buffer vmbuffer);
@@ -437,6 +652,12 @@ static int lazy_scan_prune(LVRelState *vacrel, Buffer buf,
static bool lazy_scan_noprune(LVRelState *vacrel, Buffer buf,
BlockNumber blkno, Page page,
bool *has_lpdead_items);
+static void do_parallel_lazy_scan_heap(LVRelState *vacrel);
+static BlockNumber parallel_lazy_scan_compute_min_scan_block(LVRelState *vacrel);
+static void complete_unfinished_lazy_scan_heap(LVRelState *vacrel);
+static void parallel_lazy_scan_heap_begin(LVRelState *vacrel);
+static void parallel_lazy_scan_heap_end(LVRelState *vacrel);
+static void parallel_lazy_scan_gather_results(LVRelState *vacrel);
static void lazy_vacuum(LVRelState *vacrel);
static bool lazy_vacuum_all_indexes(LVRelState *vacrel);
static void lazy_vacuum_heap_rel(LVRelState *vacrel);
@@ -461,6 +682,7 @@ static BlockNumber count_nondeletable_pages(LVRelState *vacrel,
static void dead_items_alloc(LVRelState *vacrel, int nworkers);
static void dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
int num_offsets);
+static bool dead_items_check_memory_limit(LVRelState *vacrel);
static void dead_items_reset(LVRelState *vacrel);
static void dead_items_cleanup(LVRelState *vacrel);
static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
@@ -615,6 +837,7 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
BufferAccessStrategy bstrategy)
{
LVRelState *vacrel;
+ LVScanData *scan_data;
bool verbose,
instrument,
skipwithvm,
@@ -729,14 +952,24 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
}
/* Initialize page counters explicitly (be tidy) */
- vacrel->scanned_pages = 0;
- vacrel->eager_scanned_pages = 0;
- vacrel->removed_pages = 0;
- vacrel->new_frozen_tuple_pages = 0;
- vacrel->lpdead_item_pages = 0;
- vacrel->missed_dead_pages = 0;
- vacrel->nonempty_pages = 0;
- /* dead_items_alloc allocates vacrel->dead_items later on */
+ scan_data = palloc(sizeof(LVScanData));
+ scan_data->scanned_pages = 0;
+ scan_data->eager_scanned_pages = 0;
+ scan_data->removed_pages = 0;
+ scan_data->new_frozen_tuple_pages = 0;
+ scan_data->lpdead_item_pages = 0;
+ scan_data->missed_dead_pages = 0;
+ scan_data->nonempty_pages = 0;
+ scan_data->tuples_deleted = 0;
+ scan_data->tuples_frozen = 0;
+ scan_data->lpdead_items = 0;
+ scan_data->live_tuples = 0;
+ scan_data->recently_dead_tuples = 0;
+ scan_data->missed_dead_tuples = 0;
+ scan_data->vm_new_visible_pages = 0;
+ scan_data->vm_new_visible_frozen_pages = 0;
+ scan_data->vm_new_frozen_pages = 0;
+ vacrel->scan_data = scan_data;
/* Allocate/initialize output statistics state */
vacrel->new_rel_tuples = 0;
@@ -746,16 +979,9 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
/* Initialize remaining counters (be tidy) */
vacrel->num_index_scans = 0;
- vacrel->tuples_deleted = 0;
- vacrel->tuples_frozen = 0;
- vacrel->lpdead_items = 0;
- vacrel->live_tuples = 0;
- vacrel->recently_dead_tuples = 0;
- vacrel->missed_dead_tuples = 0;
+ vacrel->next_fsm_block_to_vacuum = 0;
- vacrel->vm_new_visible_pages = 0;
- vacrel->vm_new_visible_frozen_pages = 0;
- vacrel->vm_new_frozen_pages = 0;
+ /* dead_items_alloc allocates vacrel->dead_items later on */
/*
* Get cutoffs that determine which deleted tuples are considered DEAD,
@@ -778,15 +1004,15 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
vacrel->vistest = GlobalVisTestFor(rel);
/* Initialize state used to track oldest extant XID/MXID */
- vacrel->NewRelfrozenXid = vacrel->cutoffs.OldestXmin;
- vacrel->NewRelminMxid = vacrel->cutoffs.OldestMxact;
+ vacrel->scan_data->NewRelfrozenXid = vacrel->cutoffs.OldestXmin;
+ vacrel->scan_data->NewRelminMxid = vacrel->cutoffs.OldestMxact;
/*
* Initialize state related to tracking all-visible page skipping. This is
* very important to determine whether or not it is safe to advance the
* relfrozenxid/relminmxid.
*/
- vacrel->skippedallvis = false;
+ vacrel->scan_data->skippedallvis = false;
skipwithvm = true;
if (params.options & VACOPT_DISABLE_PAGE_SKIPPING)
{
@@ -874,15 +1100,15 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
* value >= FreezeLimit, and relminmxid to a value >= MultiXactCutoff.
* Non-aggressive VACUUMs may advance them by any amount, or not at all.
*/
- Assert(vacrel->NewRelfrozenXid == vacrel->cutoffs.OldestXmin ||
+ Assert(vacrel->scan_data->NewRelfrozenXid == vacrel->cutoffs.OldestXmin ||
TransactionIdPrecedesOrEquals(vacrel->aggressive ? vacrel->cutoffs.FreezeLimit :
vacrel->cutoffs.relfrozenxid,
- vacrel->NewRelfrozenXid));
- Assert(vacrel->NewRelminMxid == vacrel->cutoffs.OldestMxact ||
+ vacrel->scan_data->NewRelfrozenXid));
+ Assert(vacrel->scan_data->NewRelminMxid == vacrel->cutoffs.OldestMxact ||
MultiXactIdPrecedesOrEquals(vacrel->aggressive ? vacrel->cutoffs.MultiXactCutoff :
vacrel->cutoffs.relminmxid,
- vacrel->NewRelminMxid));
- if (vacrel->skippedallvis)
+ vacrel->scan_data->NewRelminMxid));
+ if (vacrel->scan_data->skippedallvis)
{
/*
* Must keep original relfrozenxid in a non-aggressive VACUUM that
@@ -890,8 +1116,8 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
* values will have missed unfrozen XIDs from the pages we skipped.
*/
Assert(!vacrel->aggressive);
- vacrel->NewRelfrozenXid = InvalidTransactionId;
- vacrel->NewRelminMxid = InvalidMultiXactId;
+ vacrel->scan_data->NewRelfrozenXid = InvalidTransactionId;
+ vacrel->scan_data->NewRelminMxid = InvalidMultiXactId;
}
/*
@@ -921,7 +1147,8 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
vac_update_relstats(rel, new_rel_pages, vacrel->new_live_tuples,
new_rel_allvisible, new_rel_allfrozen,
vacrel->nindexes > 0,
- vacrel->NewRelfrozenXid, vacrel->NewRelminMxid,
+ vacrel->scan_data->NewRelfrozenXid,
+ vacrel->scan_data->NewRelminMxid,
&frozenxid_updated, &minmulti_updated, false);
/*
@@ -937,8 +1164,8 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
pgstat_report_vacuum(RelationGetRelid(rel),
rel->rd_rel->relisshared,
Max(vacrel->new_live_tuples, 0),
- vacrel->recently_dead_tuples +
- vacrel->missed_dead_tuples,
+ vacrel->scan_data->recently_dead_tuples +
+ vacrel->scan_data->missed_dead_tuples,
starttime);
pgstat_progress_end_command();
@@ -1012,23 +1239,23 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
vacrel->relname,
vacrel->num_index_scans);
appendStringInfo(&buf, _("pages: %u removed, %u remain, %u scanned (%.2f%% of total), %u eagerly scanned\n"),
- vacrel->removed_pages,
+ vacrel->scan_data->removed_pages,
new_rel_pages,
- vacrel->scanned_pages,
+ vacrel->scan_data->scanned_pages,
orig_rel_pages == 0 ? 100.0 :
- 100.0 * vacrel->scanned_pages /
+ 100.0 * vacrel->scan_data->scanned_pages /
orig_rel_pages,
- vacrel->eager_scanned_pages);
+ vacrel->scan_data->eager_scanned_pages);
appendStringInfo(&buf,
_("tuples: %" PRId64 " removed, %" PRId64 " remain, %" PRId64 " are dead but not yet removable\n"),
- vacrel->tuples_deleted,
+ vacrel->scan_data->tuples_deleted,
(int64) vacrel->new_rel_tuples,
- vacrel->recently_dead_tuples);
- if (vacrel->missed_dead_tuples > 0)
+ vacrel->scan_data->recently_dead_tuples);
+ if (vacrel->scan_data->missed_dead_tuples > 0)
appendStringInfo(&buf,
_("tuples missed: %" PRId64 " dead from %u pages not removed due to cleanup lock contention\n"),
- vacrel->missed_dead_tuples,
- vacrel->missed_dead_pages);
+ vacrel->scan_data->missed_dead_tuples,
+ vacrel->scan_data->missed_dead_pages);
diff = (int32) (ReadNextTransactionId() -
vacrel->cutoffs.OldestXmin);
appendStringInfo(&buf,
@@ -1036,33 +1263,33 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
vacrel->cutoffs.OldestXmin, diff);
if (frozenxid_updated)
{
- diff = (int32) (vacrel->NewRelfrozenXid -
+ diff = (int32) (vacrel->scan_data->NewRelfrozenXid -
vacrel->cutoffs.relfrozenxid);
appendStringInfo(&buf,
_("new relfrozenxid: %u, which is %d XIDs ahead of previous value\n"),
- vacrel->NewRelfrozenXid, diff);
+ vacrel->scan_data->NewRelfrozenXid, diff);
}
if (minmulti_updated)
{
- diff = (int32) (vacrel->NewRelminMxid -
+ diff = (int32) (vacrel->scan_data->NewRelminMxid -
vacrel->cutoffs.relminmxid);
appendStringInfo(&buf,
_("new relminmxid: %u, which is %d MXIDs ahead of previous value\n"),
- vacrel->NewRelminMxid, diff);
+ vacrel->scan_data->NewRelminMxid, diff);
}
appendStringInfo(&buf, _("frozen: %u pages from table (%.2f%% of total) had %" PRId64 " tuples frozen\n"),
- vacrel->new_frozen_tuple_pages,
+ vacrel->scan_data->new_frozen_tuple_pages,
orig_rel_pages == 0 ? 100.0 :
- 100.0 * vacrel->new_frozen_tuple_pages /
+ 100.0 * vacrel->scan_data->new_frozen_tuple_pages /
orig_rel_pages,
- vacrel->tuples_frozen);
+ vacrel->scan_data->tuples_frozen);
appendStringInfo(&buf,
_("visibility map: %u pages set all-visible, %u pages set all-frozen (%u were all-visible)\n"),
- vacrel->vm_new_visible_pages,
- vacrel->vm_new_visible_frozen_pages +
- vacrel->vm_new_frozen_pages,
- vacrel->vm_new_frozen_pages);
+ vacrel->scan_data->vm_new_visible_pages,
+ vacrel->scan_data->vm_new_visible_frozen_pages +
+ vacrel->scan_data->vm_new_frozen_pages,
+ vacrel->scan_data->vm_new_frozen_pages);
if (vacrel->do_index_vacuuming)
{
if (vacrel->nindexes == 0 || vacrel->num_index_scans == 0)
@@ -1082,10 +1309,10 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
msgfmt = _("%u pages from table (%.2f%% of total) have %" PRId64 " dead item identifiers\n");
}
appendStringInfo(&buf, msgfmt,
- vacrel->lpdead_item_pages,
+ vacrel->scan_data->lpdead_item_pages,
orig_rel_pages == 0 ? 100.0 :
- 100.0 * vacrel->lpdead_item_pages / orig_rel_pages,
- vacrel->lpdead_items);
+ 100.0 * vacrel->scan_data->lpdead_item_pages / orig_rel_pages,
+ vacrel->scan_data->lpdead_items);
for (int i = 0; i < vacrel->nindexes; i++)
{
IndexBulkDeleteResult *istat = vacrel->indstats[i];
@@ -1198,13 +1425,7 @@ heap_vacuum_rel(Relation rel, const VacuumParams params,
static void
lazy_scan_heap(LVRelState *vacrel)
{
- ReadStream *stream;
- BlockNumber rel_pages = vacrel->rel_pages,
- blkno = 0,
- next_fsm_block_to_vacuum = 0;
- BlockNumber orig_eager_scan_success_limit =
- vacrel->eager_scan_remaining_successes; /* for logging */
- Buffer vmbuffer = InvalidBuffer;
+ BlockNumber rel_pages = vacrel->rel_pages;
const int initprog_index[] = {
PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_TOTAL_HEAP_BLKS,
@@ -1225,6 +1446,80 @@ lazy_scan_heap(LVRelState *vacrel)
vacrel->next_unskippable_eager_scanned = false;
vacrel->next_unskippable_vmbuffer = InvalidBuffer;
+ /* Do the actual work */
+ if (ParallelHeapVacuumIsActive(vacrel))
+ do_parallel_lazy_scan_heap(vacrel);
+ else
+ do_lazy_scan_heap(vacrel, true);
+
+ /*
+ * Report that everything is now scanned. We never skip scanning the last
+ * block in the relation, so we can pass rel_pages here.
+ */
+ pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED,
+ rel_pages);
+
+ /* now we can compute the new value for pg_class.reltuples */
+ vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
+ vacrel->scan_data->scanned_pages,
+ vacrel->scan_data->live_tuples);
+
+ /*
+ * Also compute the total number of surviving heap entries. In the
+ * (unlikely) scenario that new_live_tuples is -1, take it as zero.
+ */
+ vacrel->new_rel_tuples =
+ Max(vacrel->new_live_tuples, 0) + vacrel->scan_data->recently_dead_tuples +
+ vacrel->scan_data->missed_dead_tuples;
+
+ /*
+ * Do index vacuuming (call each index's ambulkdelete routine), then do
+ * related heap vacuuming
+ */
+ if (vacrel->dead_items_info->num_items > 0)
+ lazy_vacuum(vacrel);
+
+ /*
+ * Vacuum the remainder of the Free Space Map. We must do this whether or
+ * not there were indexes, and whether or not we bypassed index vacuuming.
+ * We can pass rel_pages here because we never skip scanning the last
+ * block of the relation.
+ */
+ if (rel_pages > vacrel->next_fsm_block_to_vacuum)
+ FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum, rel_pages);
+
+ /* report all blocks vacuumed */
+ pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages);
+
+ /* Do final index cleanup (call each index's amvacuumcleanup routine) */
+ if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
+ lazy_cleanup_all_indexes(vacrel);
+}
+
+/*
+ * Workhorse for lazy_scan_heap().
+ *
+ * If check_mem_usage is true, we check the memory usage during the heap scan:
+ * if the space of dead_items TIDs exceeds the limit, we stop the lazy heap
+ * scan, invoke a cycle of index vacuuming and heap vacuuming, and then resume
+ * the scan. If it's false, we continue the lazy heap scan until the read
+ * stream is exhausted.
+ */
+static void
+do_lazy_scan_heap(LVRelState *vacrel, bool check_mem_usage)
+{
+ ReadStream *stream;
+ BlockNumber blkno = InvalidBlockNumber;
+ BlockNumber orig_eager_scan_success_limit =
+ vacrel->eager_scan_remaining_successes; /* for logging */
+ Buffer vmbuffer = InvalidBuffer;
+
+ /*
+	 * check_mem_usage may be false only during parallel heap vacuum.
+ */
+ Assert(check_mem_usage || ParallelHeapVacuumIsActive(vacrel));
+
/*
* Set up the read stream for vacuum's first pass through the heap.
*
@@ -1260,21 +1555,21 @@ lazy_scan_heap(LVRelState *vacrel)
* that point. This check also provides failsafe coverage for the
* one-pass strategy, and the two-pass strategy with the index_cleanup
* param set to 'off'.
+ *
+ * The failsafe check is done only by the leader process.
*/
- if (vacrel->scanned_pages > 0 &&
- vacrel->scanned_pages % FAILSAFE_EVERY_PAGES == 0)
+ if (!IsParallelWorker() &&
+ vacrel->scan_data->scanned_pages > 0 &&
+ vacrel->scan_data->scanned_pages % FAILSAFE_EVERY_PAGES == 0)
lazy_check_wraparound_failsafe(vacrel);
/*
* Consider if we definitely have enough space to process TIDs on page
* already. If we are close to overrunning the available space for
* dead_items TIDs, pause and do a cycle of vacuuming before we tackle
- * this page. However, let's force at least one page-worth of tuples
- * to be stored as to ensure we do at least some work when the memory
- * configured is so low that we run out before storing anything.
+ * this page.
*/
- if (vacrel->dead_items_info->num_items > 0 &&
- TidStoreMemoryUsage(vacrel->dead_items) > vacrel->dead_items_info->max_bytes)
+ if (check_mem_usage && dead_items_check_memory_limit(vacrel))
{
/*
* Before beginning index vacuuming, we release any pin we may
@@ -1297,15 +1592,16 @@ lazy_scan_heap(LVRelState *vacrel)
* upper-level FSM pages. Note that blkno is the previously
* processed block.
*/
- FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum,
+ FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
blkno + 1);
- next_fsm_block_to_vacuum = blkno;
+ vacrel->next_fsm_block_to_vacuum = blkno;
/* Report that we are once again scanning the heap */
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_PHASE_SCAN_HEAP);
}
+ /* Read the next block to process */
buf = read_stream_next_buffer(stream, &per_buffer_data);
/* The relation is exhausted. */
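
dead_items_check_memory_limit() is only declared in this patch excerpt; judging from the inline test it replaces in the hunk above, it presumably boils down to something like the sketch below. It is written as it would appear inside vacuumlazy.c, relying on that file's existing declarations; the real helper may differ, for example by accounting for shared TID-store usage under parallel vacuum.

    /*
     * Sketch (assumption): have the dead_items TIDs grown past the configured
     * memory budget? Mirrors the inline check formerly in lazy_scan_heap().
     */
    static bool
    dead_items_check_memory_limit(LVRelState *vacrel)
    {
        return vacrel->dead_items_info->num_items > 0 &&
            TidStoreMemoryUsage(vacrel->dead_items) >
            vacrel->dead_items_info->max_bytes;
    }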
@@ -1315,11 +1611,11 @@ lazy_scan_heap(LVRelState *vacrel)
blk_info = *((uint8 *) per_buffer_data);
CheckBufferIsPinnedOnce(buf);
page = BufferGetPage(buf);
- blkno = BufferGetBlockNumber(buf);
+ blkno = vacrel->last_blkno = BufferGetBlockNumber(buf);
- vacrel->scanned_pages++;
+ vacrel->scan_data->scanned_pages++;
if (blk_info & VAC_BLK_WAS_EAGER_SCANNED)
- vacrel->eager_scanned_pages++;
+ vacrel->scan_data->eager_scanned_pages++;
/* Report as block scanned, update error traceback information */
pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno);
@@ -1480,13 +1776,34 @@ lazy_scan_heap(LVRelState *vacrel)
* visible on upper FSM pages. This is done after vacuuming if the
* table has indexes. There will only be newly-freed space if we
* held the cleanup lock and lazy_scan_prune() was called.
+ *
+ * During parallel lazy heap scanning, only the leader process
+ * vacuums the FSM. However, we cannot vacuum the FSM for blocks
+		 * up to 'blkno' because there may be unscanned blocks or blocks
+ * being processed by workers before this point. Instead, parallel
+ * workers advertise the block numbers they have just processed,
+ * and the leader vacuums the FSM up to the smallest block number
+ * among them. This approach ensures we vacuum the FSM for
+ * consecutive processed blocks.
*/
if (got_cleanup_lock && vacrel->nindexes == 0 && ndeleted > 0 &&
- blkno - next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
+ blkno - vacrel->next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES)
{
- FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum,
+ if (IsParallelWorker())
+ pg_atomic_write_u32(&(vacrel->plvstate->scanwork->last_blkno),
blkno);
- next_fsm_block_to_vacuum = blkno;
+ else
+ {
+ BlockNumber fsmvac_upto = blkno;
+
+ if (ParallelHeapVacuumIsActive(vacrel))
+ fsmvac_upto = parallel_lazy_scan_compute_min_scan_block(vacrel);
+
+ FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
+ fsmvac_upto);
+ }
+
+ vacrel->next_fsm_block_to_vacuum = blkno;
}
}
else
@@ -1497,50 +1814,7 @@ lazy_scan_heap(LVRelState *vacrel)
if (BufferIsValid(vmbuffer))
ReleaseBuffer(vmbuffer);
- /*
- * Report that everything is now scanned. We never skip scanning the last
- * block in the relation, so we can pass rel_pages here.
- */
- pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED,
- rel_pages);
-
- /* now we can compute the new value for pg_class.reltuples */
- vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages,
- vacrel->scanned_pages,
- vacrel->live_tuples);
-
- /*
- * Also compute the total number of surviving heap entries. In the
- * (unlikely) scenario that new_live_tuples is -1, take it as zero.
- */
- vacrel->new_rel_tuples =
- Max(vacrel->new_live_tuples, 0) + vacrel->recently_dead_tuples +
- vacrel->missed_dead_tuples;
-
read_stream_end(stream);
-
- /*
- * Do index vacuuming (call each index's ambulkdelete routine), then do
- * related heap vacuuming
- */
- if (vacrel->dead_items_info->num_items > 0)
- lazy_vacuum(vacrel);
-
- /*
- * Vacuum the remainder of the Free Space Map. We must do this whether or
- * not there were indexes, and whether or not we bypassed index vacuuming.
- * We can pass rel_pages here because we never skip scanning the last
- * block of the relation.
- */
- if (rel_pages > next_fsm_block_to_vacuum)
- FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, rel_pages);
-
- /* report all blocks vacuumed */
- pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages);
-
- /* Do final index cleanup (call each index's amvacuumcleanup routine) */
- if (vacrel->nindexes > 0 && vacrel->do_index_cleanup)
- lazy_cleanup_all_indexes(vacrel);
}
/*
@@ -1554,7 +1828,8 @@ lazy_scan_heap(LVRelState *vacrel)
* heap_vac_scan_next_block() uses the visibility map, vacuum options, and
* various thresholds to skip blocks which do not need to be processed and
* returns the next block to process or InvalidBlockNumber if there are no
- * remaining blocks.
+ * remaining blocks, or if the space of dead_items TIDs reaches the limit
+ * (only in parallel lazy vacuum cases).
*
* The visibility status of the next block to process and whether or not it
* was eager scanned is set in the per_buffer_data.
@@ -1562,7 +1837,7 @@ lazy_scan_heap(LVRelState *vacrel)
* callback_private_data contains a reference to the LVRelState, passed to the
* read stream API during stream setup. The LVRelState is an in/out parameter
* here (locally named `vacrel`). Vacuum options and information about the
- * relation are read from it. vacrel->skippedallvis is set if we skip a block
+ * relation are read from it. vacrel->scan_data->skippedallvis is set if we skip a block
* that's all-visible but not all-frozen (to ensure that we don't update
* relfrozenxid in that case). vacrel also holds information about the next
* unskippable block -- as bookkeeping for this function.
@@ -1576,8 +1851,42 @@ heap_vac_scan_next_block(ReadStream *stream,
LVRelState *vacrel = callback_private_data;
uint8 blk_info = 0;
- /* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */
- next_block = vacrel->current_block + 1;
+retry:
+ next_block = InvalidBlockNumber;
+
+ /* Get the next block to process */
+ if (ParallelHeapVacuumIsActive(vacrel))
+ {
+ /*
+		 * Stop returning the next block to the read stream if we are close to
+		 * overrunning the available space for dead_items TIDs, so that the
+		 * read stream only returns the pinned buffers already in its queue
+		 * until the stream is exhausted. See the comments atop this file for
+		 * details.
+ */
+ if (dead_items_check_memory_limit(vacrel))
+ {
+ if (BufferIsValid(vacrel->next_unskippable_vmbuffer))
+ {
+ ReleaseBuffer(vacrel->next_unskippable_vmbuffer);
+ vacrel->next_unskippable_vmbuffer = InvalidBuffer;
+ }
+
+ return InvalidBlockNumber;
+ }
+
+ next_block = parallel_lazy_scan_get_nextpage(vacrel,
+ vacrel->rel,
+ vacrel->plvstate->scandesc,
+ vacrel->plvstate->scanwork);
+ }
+ else
+ {
+ /* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */
+ next_block = vacrel->current_block + 1;
+ }
+
+ Assert(BlockNumberIsValid(next_block));
/* Have we reached the end of the relation? */
if (next_block >= vacrel->rel_pages)
@@ -1602,8 +1911,41 @@ heap_vac_scan_next_block(ReadStream *stream,
* visibility map.
*/
bool skipsallvis;
+ bool found;
+ BlockNumber end_block;
+ BlockNumber nblocks_skip;
+
+ if (ParallelHeapVacuumIsActive(vacrel))
+ {
+ /* We look for the next unskippable block within the chunk */
+ end_block = next_block + vacrel->plvstate->scanwork->chunk_remaining + 1;
+ }
+ else
+ end_block = vacrel->rel_pages;
+
+ found = find_next_unskippable_block(vacrel, &skipsallvis, next_block, end_block);
+
+ /*
+ * We must have found the next unskippable block within the specified
+ * range in non-parallel cases as the end_block is always the last
+ * block + 1 and we must scan the last block.
+ */
+ Assert(found || ParallelHeapVacuumIsActive(vacrel));
+
+ if (!found)
+ {
+ if (skipsallvis)
+ vacrel->scan_data->skippedallvis = true;
+
+ /*
+ * Skip all remaining blocks in the current chunk, and retry with
+ * the next chunk.
+ */
+ vacrel->plvstate->scanwork->chunk_remaining = 0;
+ goto retry;
+ }
- find_next_unskippable_block(vacrel, &skipsallvis);
+ Assert(vacrel->next_unskippable_block < end_block);
/*
* We now know the next block that we must process. It can be the
@@ -1620,11 +1962,21 @@ heap_vac_scan_next_block(ReadStream *stream,
* pages then skipping makes updating relfrozenxid unsafe, which is a
* real downside.
*/
- if (vacrel->next_unskippable_block - next_block >= SKIP_PAGES_THRESHOLD)
+ nblocks_skip = vacrel->next_unskippable_block - next_block;
+ if (nblocks_skip >= SKIP_PAGES_THRESHOLD)
{
- next_block = vacrel->next_unskippable_block;
if (skipsallvis)
- vacrel->skippedallvis = true;
+ vacrel->scan_data->skippedallvis = true;
+
+ /* Tell the parallel scans to skip blocks */
+ if (ParallelHeapVacuumIsActive(vacrel))
+ {
+ vacrel->plvstate->scanwork->chunk_remaining -= nblocks_skip;
+ vacrel->plvstate->scanwork->nallocated += nblocks_skip;
+ Assert(vacrel->plvstate->scanwork->chunk_remaining > 0);
+ }
+
+ next_block = vacrel->next_unskippable_block;
}
}
@@ -1644,25 +1996,101 @@ heap_vac_scan_next_block(ReadStream *stream,
else
{
/*
- * 3. We reached the next unskippable block. Process it. On next
- * iteration, we will be back in state 1.
+ * 3. We reached the next unskippable block. Process it. On next
+ * iteration, we will be back in state 1.
+ */
+ Assert(next_block == vacrel->next_unskippable_block);
+
+ vacrel->current_block = next_block;
+ if (vacrel->next_unskippable_allvis)
+ blk_info |= VAC_BLK_ALL_VISIBLE_ACCORDING_TO_VM;
+ if (vacrel->next_unskippable_eager_scanned)
+ blk_info |= VAC_BLK_WAS_EAGER_SCANNED;
+ *((uint8 *) per_buffer_data) = blk_info;
+ return vacrel->current_block;
+ }
+}
+
+
+ * Initialize scan state of the given ParallelLVScanWorkerData.
+ */
+static void
+parallel_lazy_scan_init_scan_worker(ParallelLVScanWorkerData *scanwork,
+ BlockNumber initial_chunk_size)
+{
+ Assert(BlockNumberIsValid(initial_chunk_size));
+
+ scanwork->inited = true;
+ scanwork->nallocated = 0;
+ scanwork->chunk_size = initial_chunk_size;
+ scanwork->chunk_remaining = 0;
+ pg_atomic_init_u32(&(scanwork->last_blkno), InvalidBlockNumber);
+}
+
+/*
+ * Return the next page to process for parallel lazy scan.
+ *
+ * If there is no block to scan for the worker, return the number of blocks in
+ * the relation.
+ */
+static BlockNumber
+parallel_lazy_scan_get_nextpage(LVRelState *vacrel, Relation rel,
+ ParallelLVScanDesc *scandesc,
+ ParallelLVScanWorkerData *scanwork)
+{
+ uint64 nallocated;
+
+ if (scanwork->chunk_remaining > 0)
+ {
+ /*
+		 * Return the next block in the current chunk and update the
+		 * remaining number of blocks.
+ */
+ nallocated = ++scanwork->nallocated;
+ scanwork->chunk_remaining--;
+ }
+ else
+ {
+ /* Get the new chunk */
+ nallocated = scanwork->nallocated =
+ pg_atomic_fetch_add_u64(&scandesc->nallocated, scanwork->chunk_size);
+
+ /*
+ * Set the remaining number of blocks in this chunk so that subsequent
+ * calls from this worker continue on with this chunk until it's done.
*/
- Assert(next_block == vacrel->next_unskippable_block);
+ scanwork->chunk_remaining = scanwork->chunk_size - 1;
+
+		/* Use the fixed chunk size for subsequent chunks */
+ scanwork->chunk_size = PARALLEL_LV_CHUNK_SIZE;
+
+ /*
+		 * Getting a new chunk also means starting a new eager scan region.
+ *
+ * Update next_eager_scan_region_start to the first block in the chunk
+ * so that we can reset the remaining_fails counter when checking the
+ * visibility of the first block in this chunk in
+ * find_next_unskippable_block().
+ */
+ vacrel->next_eager_scan_region_start = nallocated;
- vacrel->current_block = next_block;
- if (vacrel->next_unskippable_allvis)
- blk_info |= VAC_BLK_ALL_VISIBLE_ACCORDING_TO_VM;
- if (vacrel->next_unskippable_eager_scanned)
- blk_info |= VAC_BLK_WAS_EAGER_SCANNED;
- *((uint8 *) per_buffer_data) = blk_info;
- return vacrel->current_block;
}
+
+	/* Clear chunk_remaining if there are no more blocks to process */
+ if (nallocated >= scandesc->nblocks)
+ scanwork->chunk_remaining = 0;
+
+ return Min(nallocated, scandesc->nblocks);
}
/*
- * Find the next unskippable block in a vacuum scan using the visibility map.
- * The next unskippable block and its visibility information is updated in
- * vacrel.
+ * Find the next unskippable block in a vacuum scan using the visibility map,
+ * within the range from 'start' (inclusive) to 'end' (exclusive).
+ *
+ * If found, the next unskippable block and its visibility information are
+ * updated in vacrel. Otherwise, return false and reset that information in
+ * vacrel.
*
* Note: our opinion of which blocks can be skipped can go stale immediately.
* It's okay if caller "misses" a page whose all-visible or all-frozen marking
@@ -1672,22 +2100,32 @@ heap_vac_scan_next_block(ReadStream *stream,
* older XIDs/MXIDs. The *skippedallvis flag will be set here when the choice
* to skip such a range is actually made, making everything safe.)
*/
-static void
-find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis)
+static bool
+find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis,
+ BlockNumber start, BlockNumber end)
{
BlockNumber rel_pages = vacrel->rel_pages;
- BlockNumber next_unskippable_block = vacrel->next_unskippable_block + 1;
+ BlockNumber next_unskippable_block = start;
Buffer next_unskippable_vmbuffer = vacrel->next_unskippable_vmbuffer;
bool next_unskippable_eager_scanned = false;
bool next_unskippable_allvis;
+ bool found = true;
*skipsallvis = false;
for (;; next_unskippable_block++)
{
- uint8 mapbits = visibilitymap_get_status(vacrel->rel,
- next_unskippable_block,
- &next_unskippable_vmbuffer);
+ uint8 mapbits;
+
+		/* Reached the end of the range? */
+ if (next_unskippable_block >= end)
+ {
+ found = false;
+ break;
+ }
+
+ mapbits = visibilitymap_get_status(vacrel->rel, next_unskippable_block,
+ &next_unskippable_vmbuffer);
next_unskippable_allvis = (mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0;
@@ -1763,11 +2201,286 @@ find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis)
*skipsallvis = true;
}
- /* write the local variables back to vacrel */
- vacrel->next_unskippable_block = next_unskippable_block;
- vacrel->next_unskippable_allvis = next_unskippable_allvis;
- vacrel->next_unskippable_eager_scanned = next_unskippable_eager_scanned;
- vacrel->next_unskippable_vmbuffer = next_unskippable_vmbuffer;
+ if (found)
+ {
+ /* write the local variables back to vacrel */
+ vacrel->next_unskippable_block = next_unskippable_block;
+ vacrel->next_unskippable_allvis = next_unskippable_allvis;
+ vacrel->next_unskippable_eager_scanned = next_unskippable_eager_scanned;
+ vacrel->next_unskippable_vmbuffer = next_unskippable_vmbuffer;
+ }
+ else
+ {
+ if (BufferIsValid(next_unskippable_vmbuffer))
+ ReleaseBuffer(next_unskippable_vmbuffer);
+
+ /*
+		 * There is no unskippable block in the specified range. Reset the
+		 * related fields in vacrel.
+		 */
+		vacrel->next_unskippable_block = InvalidBlockNumber;
+		vacrel->next_unskippable_allvis = false;
+ vacrel->next_unskippable_eager_scanned = false;
+ vacrel->next_unskippable_vmbuffer = InvalidBuffer;
+ }
+
+ return found;
+}
+
+/*
+ * A parallel variant of do_lazy_scan_heap(). The leader process launches
+ * parallel workers to scan the heap in parallel.
+ */
+static void
+do_parallel_lazy_scan_heap(LVRelState *vacrel)
+{
+ ParallelLVScanWorkerData scanwork;
+
+ Assert(ParallelHeapVacuumIsActive(vacrel));
+ Assert(!IsParallelWorker());
+
+	/* Set up the parallel scan description for the leader to join as a worker */
+ parallel_lazy_scan_init_scan_worker(&scanwork,
+ vacrel->plvstate->shared->initial_chunk_size);
+ vacrel->plvstate->scanwork = &scanwork;
+
+ /* Adjust the eager scan's success counter as a worker */
+ vacrel->eager_scan_remaining_successes =
+ vacrel->plvstate->shared->eager_scan_remaining_successes_per_worker;
+
+ for (;;)
+ {
+ BlockNumber fsmvac_upto;
+
+ /* Launch parallel workers */
+ parallel_lazy_scan_heap_begin(vacrel);
+
+ /*
+ * Do lazy heap scan until the read stream is exhausted. We will stop
+ * retrieving new blocks for the read stream once the space of
+ * dead_items TIDs exceeds the limit.
+ */
+ if (vacrel->leader->leaderparticipate)
+ do_lazy_scan_heap(vacrel, false);
+
+ /* Wait for parallel workers to finish and gather scan results */
+ parallel_lazy_scan_heap_end(vacrel);
+
+ if (!dead_items_check_memory_limit(vacrel))
+ break;
+
+ /* Perform a round of index and heap vacuuming */
+ vacrel->consider_bypass_optimization = false;
+ lazy_vacuum(vacrel);
+
+ /* Compute the smallest processed block number */
+ fsmvac_upto = parallel_lazy_scan_compute_min_scan_block(vacrel);
+
+ /*
+ * Vacuum the Free Space Map to make newly-freed space visible on
+ * upper-level FSM pages.
+ */
+ if (fsmvac_upto > vacrel->next_fsm_block_to_vacuum)
+ {
+ FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
+ fsmvac_upto);
+ vacrel->next_fsm_block_to_vacuum = fsmvac_upto;
+ }
+
+ /* Report that we are once again scanning the heap */
+ pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+ PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+ }
+
+ /*
+ * The parallel heap scan finished, but it's possible that some workers
+	 * have allocated blocks but not processed them yet. This can happen, for
+	 * example, when workers exit because their space for dead_items TIDs is
+	 * full and the leader process launched fewer workers in the next cycle.
+ */
+ complete_unfinished_lazy_scan_heap(vacrel);
+}
+
+/*
+ * Return the smallest block number that the leader and workers have scanned.
+ */
+static BlockNumber
+parallel_lazy_scan_compute_min_scan_block(LVRelState *vacrel)
+{
+ BlockNumber min_blk;
+
+ Assert(ParallelHeapVacuumIsActive(vacrel));
+
+	/* Initialize with the leader's value */
+ min_blk = vacrel->last_blkno;
+
+ for (int i = 0; i < vacrel->leader->nworkers_launched; i++)
+ {
+ ParallelLVScanWorkerData *scanwork = &(vacrel->leader->scanwork_array[i]);
+ BlockNumber blkno;
+
+		/* Skip workers that have not initialized their scan state */
+ if (!scanwork->inited)
+ continue;
+
+ blkno = pg_atomic_read_u32(&(scanwork->last_blkno));
+
+ if (!BlockNumberIsValid(min_blk) || min_blk > blkno)
+ min_blk = blkno;
+ }
+
+ Assert(BlockNumberIsValid(min_blk));
+
+ return min_blk;
+}
+
+/*
+ * Complete parallel heap scans that have remaining blocks in their
+ * chunks.
+ */
+static void
+complete_unfinished_lazy_scan_heap(LVRelState *vacrel)
+{
+ int nworkers;
+
+ Assert(!IsParallelWorker());
+
+ nworkers = parallel_vacuum_get_nworkers_table(vacrel->pvs);
+
+ for (int i = 0; i < nworkers; i++)
+ {
+ ParallelLVScanWorkerData *scanwork = &(vacrel->leader->scanwork_array[i]);
+
+ if (!scanwork->inited)
+ continue;
+
+ if (scanwork->chunk_remaining == 0)
+ continue;
+
+ /* Attach the worker's scan state */
+ vacrel->plvstate->scanwork = scanwork;
+
+ vacrel->next_fsm_block_to_vacuum = pg_atomic_read_u32(&(scanwork->last_blkno));
+ vacrel->next_eager_scan_region_start = scanwork->next_region_start_save;
+ vacrel->eager_scan_remaining_fails = scanwork->remaining_fails_save;
+
+ /*
+ * Complete the unfinished scan. Note that we might perform multiple
+ * cycles of index and heap vacuuming while completing the scan.
+ */
+ do_lazy_scan_heap(vacrel, true);
+ }
+
+ /*
+ * We don't need to gather the scan results here because the leader's scan
+ * state got updated directly.
+ */
+}
+
+/*
+ * Helper routine to launch parallel workers for parallel lazy heap scan.
+ */
+static void
+parallel_lazy_scan_heap_begin(LVRelState *vacrel)
+{
+ Assert(ParallelHeapVacuumIsActive(vacrel));
+ Assert(!IsParallelWorker());
+
+	/* Launch parallel workers */
+ vacrel->leader->nworkers_launched = parallel_vacuum_collect_dead_items_begin(vacrel->pvs);
+
+ ereport(vacrel->verbose ? INFO : DEBUG2,
+ (errmsg(ngettext("launched %d parallel vacuum worker for collecting dead tuples (planned: %d)",
+ "launched %d parallel vacuum workers for collecting dead tuples (planned: %d)",
+ vacrel->leader->nworkers_launched),
+ vacrel->leader->nworkers_launched,
+ parallel_vacuum_get_nworkers_table(vacrel->pvs))));
+}
+
+/*
+ * Helper routine to finish the parallel lazy heap scan.
+ */
+static void
+parallel_lazy_scan_heap_end(LVRelState *vacrel)
+{
+ /* Wait for all parallel workers to finish */
+ parallel_vacuum_collect_dead_items_end(vacrel->pvs);
+
+ /* Gather the workers' scan results */
+ parallel_lazy_scan_gather_results(vacrel);
+}
+
+/*
+ * Accumulate each worker's scan results into the leader's.
+ */
+static void
+parallel_lazy_scan_gather_results(LVRelState *vacrel)
+{
+ Assert(ParallelHeapVacuumIsActive(vacrel));
+ Assert(!IsParallelWorker());
+
+ /* Gather the workers' scan results */
+ for (int i = 0; i < vacrel->leader->nworkers_launched; i++)
+ {
+ LVScanData *data = &(vacrel->leader->scandata_array[i]);
+ ParallelLVScanWorkerData *scanwork = &(vacrel->leader->scanwork_array[i]);
+
+ /* Accumulate the counters collected by workers */
+#define ACCUM_COUNT(item) vacrel->scan_data->item += data->item
+ ACCUM_COUNT(scanned_pages);
+ ACCUM_COUNT(removed_pages);
+ ACCUM_COUNT(new_frozen_tuple_pages);
+ ACCUM_COUNT(vm_new_visible_pages);
+ ACCUM_COUNT(vm_new_visible_frozen_pages);
+ ACCUM_COUNT(vm_new_frozen_pages);
+ ACCUM_COUNT(lpdead_item_pages);
+ ACCUM_COUNT(missed_dead_pages);
+ ACCUM_COUNT(tuples_deleted);
+ ACCUM_COUNT(tuples_frozen);
+ ACCUM_COUNT(lpdead_items);
+ ACCUM_COUNT(live_tuples);
+ ACCUM_COUNT(recently_dead_tuples);
+ ACCUM_COUNT(missed_dead_tuples);
+#undef ACCUM_COUNT
+
+ /*
+		 * Track the greatest non-empty page among the values the workers
+		 * collected, as it is used as the cutoff point for heap truncation.
+ */
+ if (vacrel->scan_data->nonempty_pages < data->nonempty_pages)
+ vacrel->scan_data->nonempty_pages = data->nonempty_pages;
+
+ /*
+ * All workers must have initialized both values with the values
+ * passed by the leader.
+ */
+ Assert(TransactionIdIsValid(data->NewRelfrozenXid));
+ Assert(MultiXactIdIsValid(data->NewRelminMxid));
+
+ /*
+ * During parallel lazy scanning, since different workers process
+		 * separate blocks, they may observe different extant XIDs and
+		 * MXIDs. Therefore, we compute the oldest XID and MXID from the
+		 * values observed by each worker (including the leader). These
+		 * computations are crucial for correctly advancing both relfrozenxid
+		 * and relminmxid values.
+ */
+
+ if (TransactionIdPrecedes(data->NewRelfrozenXid, vacrel->scan_data->NewRelfrozenXid))
+ vacrel->scan_data->NewRelfrozenXid = data->NewRelfrozenXid;
+
+ if (MultiXactIdPrecedesOrEquals(data->NewRelminMxid, vacrel->scan_data->NewRelminMxid))
+ vacrel->scan_data->NewRelminMxid = data->NewRelminMxid;
+
+		/* Has any of the workers skipped an all-visible page? */
+ vacrel->scan_data->skippedallvis |= data->skippedallvis;
+
+ /*
+ * Gather the remaining success count so that we can distribute the
+ * success counter again in the next parallel lazy scan.
+ */
+ vacrel->eager_scan_remaining_successes += scanwork->remaining_successes_save;
+ }
}
/*
@@ -1899,8 +2612,8 @@ lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf, BlockNumber blkno,
END_CRIT_SECTION();
/* Count the newly all-frozen pages for logging */
- vacrel->vm_new_visible_pages++;
- vacrel->vm_new_visible_frozen_pages++;
+ vacrel->scan_data->vm_new_visible_pages++;
+ vacrel->scan_data->vm_new_visible_frozen_pages++;
}
freespace = PageGetHeapFreeSpace(page);
@@ -1977,10 +2690,10 @@ lazy_scan_prune(LVRelState *vacrel,
heap_page_prune_and_freeze(rel, buf, vacrel->vistest, prune_options,
&vacrel->cutoffs, &presult, PRUNE_VACUUM_SCAN,
&vacrel->offnum,
- &vacrel->NewRelfrozenXid, &vacrel->NewRelminMxid);
+ &vacrel->scan_data->NewRelfrozenXid, &vacrel->scan_data->NewRelminMxid);
- Assert(MultiXactIdIsValid(vacrel->NewRelminMxid));
- Assert(TransactionIdIsValid(vacrel->NewRelfrozenXid));
+ Assert(MultiXactIdIsValid(vacrel->scan_data->NewRelminMxid));
+ Assert(TransactionIdIsValid(vacrel->scan_data->NewRelfrozenXid));
if (presult.nfrozen > 0)
{
@@ -1990,7 +2703,7 @@ lazy_scan_prune(LVRelState *vacrel,
* frozen tuples (don't confuse that with pages newly set all-frozen
* in VM).
*/
- vacrel->new_frozen_tuple_pages++;
+ vacrel->scan_data->new_frozen_tuple_pages++;
}
/*
@@ -2025,7 +2738,7 @@ lazy_scan_prune(LVRelState *vacrel,
*/
if (presult.lpdead_items > 0)
{
- vacrel->lpdead_item_pages++;
+ vacrel->scan_data->lpdead_item_pages++;
/*
* deadoffsets are collected incrementally in
@@ -2040,15 +2753,16 @@ lazy_scan_prune(LVRelState *vacrel,
}
/* Finally, add page-local counts to whole-VACUUM counts */
- vacrel->tuples_deleted += presult.ndeleted;
- vacrel->tuples_frozen += presult.nfrozen;
- vacrel->lpdead_items += presult.lpdead_items;
- vacrel->live_tuples += presult.live_tuples;
- vacrel->recently_dead_tuples += presult.recently_dead_tuples;
+ vacrel->scan_data->tuples_deleted += presult.ndeleted;
+ vacrel->scan_data->tuples_frozen += presult.nfrozen;
+ vacrel->scan_data->lpdead_items += presult.lpdead_items;
+ vacrel->scan_data->live_tuples += presult.live_tuples;
+ vacrel->scan_data->recently_dead_tuples += presult.recently_dead_tuples;
/* Can't truncate this page */
if (presult.hastup)
- vacrel->nonempty_pages = blkno + 1;
+ vacrel->scan_data->nonempty_pages =
+ Max(blkno + 1, vacrel->scan_data->nonempty_pages);
/* Did we find LP_DEAD items? */
*has_lpdead_items = (presult.lpdead_items > 0);
@@ -2097,17 +2811,17 @@ lazy_scan_prune(LVRelState *vacrel,
*/
if ((old_vmbits & VISIBILITYMAP_ALL_VISIBLE) == 0)
{
- vacrel->vm_new_visible_pages++;
+ vacrel->scan_data->vm_new_visible_pages++;
if (presult.all_frozen)
{
- vacrel->vm_new_visible_frozen_pages++;
+ vacrel->scan_data->vm_new_visible_frozen_pages++;
*vm_page_frozen = true;
}
}
else if ((old_vmbits & VISIBILITYMAP_ALL_FROZEN) == 0 &&
presult.all_frozen)
{
- vacrel->vm_new_frozen_pages++;
+ vacrel->scan_data->vm_new_frozen_pages++;
*vm_page_frozen = true;
}
}
@@ -2201,8 +2915,8 @@ lazy_scan_prune(LVRelState *vacrel,
*/
if ((old_vmbits & VISIBILITYMAP_ALL_VISIBLE) == 0)
{
- vacrel->vm_new_visible_pages++;
- vacrel->vm_new_visible_frozen_pages++;
+ vacrel->scan_data->vm_new_visible_pages++;
+ vacrel->scan_data->vm_new_visible_frozen_pages++;
*vm_page_frozen = true;
}
@@ -2212,7 +2926,7 @@ lazy_scan_prune(LVRelState *vacrel,
*/
else
{
- vacrel->vm_new_frozen_pages++;
+ vacrel->scan_data->vm_new_frozen_pages++;
*vm_page_frozen = true;
}
}
@@ -2255,8 +2969,8 @@ lazy_scan_noprune(LVRelState *vacrel,
missed_dead_tuples;
bool hastup;
HeapTupleHeader tupleheader;
- TransactionId NoFreezePageRelfrozenXid = vacrel->NewRelfrozenXid;
- MultiXactId NoFreezePageRelminMxid = vacrel->NewRelminMxid;
+ TransactionId NoFreezePageRelfrozenXid = vacrel->scan_data->NewRelfrozenXid;
+ MultiXactId NoFreezePageRelminMxid = vacrel->scan_data->NewRelminMxid;
OffsetNumber deadoffsets[MaxHeapTuplesPerPage];
Assert(BufferGetBlockNumber(buf) == blkno);
@@ -2383,8 +3097,8 @@ lazy_scan_noprune(LVRelState *vacrel,
* this particular page until the next VACUUM. Remember its details now.
* (lazy_scan_prune expects a clean slate, so we have to do this last.)
*/
- vacrel->NewRelfrozenXid = NoFreezePageRelfrozenXid;
- vacrel->NewRelminMxid = NoFreezePageRelminMxid;
+ vacrel->scan_data->NewRelfrozenXid = NoFreezePageRelfrozenXid;
+ vacrel->scan_data->NewRelminMxid = NoFreezePageRelminMxid;
/* Save any LP_DEAD items found on the page in dead_items */
if (vacrel->nindexes == 0)
@@ -2411,25 +3125,26 @@ lazy_scan_noprune(LVRelState *vacrel,
* indexes will be deleted during index vacuuming (and then marked
* LP_UNUSED in the heap)
*/
- vacrel->lpdead_item_pages++;
+ vacrel->scan_data->lpdead_item_pages++;
dead_items_add(vacrel, blkno, deadoffsets, lpdead_items);
- vacrel->lpdead_items += lpdead_items;
+ vacrel->scan_data->lpdead_items += lpdead_items;
}
/*
* Finally, add relevant page-local counts to whole-VACUUM counts
*/
- vacrel->live_tuples += live_tuples;
- vacrel->recently_dead_tuples += recently_dead_tuples;
- vacrel->missed_dead_tuples += missed_dead_tuples;
+ vacrel->scan_data->live_tuples += live_tuples;
+ vacrel->scan_data->recently_dead_tuples += recently_dead_tuples;
+ vacrel->scan_data->missed_dead_tuples += missed_dead_tuples;
if (missed_dead_tuples > 0)
- vacrel->missed_dead_pages++;
+ vacrel->scan_data->missed_dead_pages++;
/* Can't truncate this page */
if (hastup)
- vacrel->nonempty_pages = blkno + 1;
+ vacrel->scan_data->nonempty_pages =
+ Max(blkno + 1, vacrel->scan_data->nonempty_pages);
/* Did we find LP_DEAD items? */
*has_lpdead_items = (lpdead_items > 0);
@@ -2458,7 +3173,7 @@ lazy_vacuum(LVRelState *vacrel)
/* Should not end up here with no indexes */
Assert(vacrel->nindexes > 0);
- Assert(vacrel->lpdead_item_pages > 0);
+ Assert(vacrel->scan_data->lpdead_item_pages > 0);
if (!vacrel->do_index_vacuuming)
{
@@ -2492,7 +3207,7 @@ lazy_vacuum(LVRelState *vacrel)
BlockNumber threshold;
Assert(vacrel->num_index_scans == 0);
- Assert(vacrel->lpdead_items == vacrel->dead_items_info->num_items);
+ Assert(vacrel->scan_data->lpdead_items == vacrel->dead_items_info->num_items);
Assert(vacrel->do_index_vacuuming);
Assert(vacrel->do_index_cleanup);
@@ -2519,7 +3234,7 @@ lazy_vacuum(LVRelState *vacrel)
* cases then this may need to be reconsidered.
*/
threshold = (double) vacrel->rel_pages * BYPASS_THRESHOLD_PAGES;
- bypass = (vacrel->lpdead_item_pages < threshold &&
+ bypass = (vacrel->scan_data->lpdead_item_pages < threshold &&
TidStoreMemoryUsage(vacrel->dead_items) < 32 * 1024 * 1024);
}
@@ -2657,7 +3372,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
* place).
*/
Assert(vacrel->num_index_scans > 0 ||
- vacrel->dead_items_info->num_items == vacrel->lpdead_items);
+ vacrel->dead_items_info->num_items == vacrel->scan_data->lpdead_items);
Assert(allindexes || VacuumFailsafeActive);
/*
@@ -2819,8 +3534,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
* the second heap pass. No more, no less.
*/
Assert(vacrel->num_index_scans > 1 ||
- (vacrel->dead_items_info->num_items == vacrel->lpdead_items &&
- vacuumed_pages == vacrel->lpdead_item_pages));
+ (vacrel->dead_items_info->num_items == vacrel->scan_data->lpdead_items &&
+ vacuumed_pages == vacrel->scan_data->lpdead_item_pages));
ereport(DEBUG2,
(errmsg("table \"%s\": removed %" PRId64 " dead item identifiers in %u pages",
@@ -2930,9 +3645,9 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer,
flags);
/* Count the newly set VM page for logging */
- vacrel->vm_new_visible_pages++;
+ vacrel->scan_data->vm_new_visible_pages++;
if (all_frozen)
- vacrel->vm_new_visible_frozen_pages++;
+ vacrel->scan_data->vm_new_visible_frozen_pages++;
}
/* Revert to the previous phase information for error traceback */
@@ -3008,7 +3723,7 @@ static void
lazy_cleanup_all_indexes(LVRelState *vacrel)
{
double reltuples = vacrel->new_rel_tuples;
- bool estimated_count = vacrel->scanned_pages < vacrel->rel_pages;
+ bool estimated_count = vacrel->scan_data->scanned_pages < vacrel->rel_pages;
const int progress_start_index[] = {
PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_INDEXES_TOTAL
@@ -3189,7 +3904,7 @@ should_attempt_truncation(LVRelState *vacrel)
if (!vacrel->do_rel_truncate || VacuumFailsafeActive)
return false;
- possibly_freeable = vacrel->rel_pages - vacrel->nonempty_pages;
+ possibly_freeable = vacrel->rel_pages - vacrel->scan_data->nonempty_pages;
if (possibly_freeable > 0 &&
(possibly_freeable >= REL_TRUNCATE_MINIMUM ||
possibly_freeable >= vacrel->rel_pages / REL_TRUNCATE_FRACTION))
@@ -3215,7 +3930,7 @@ lazy_truncate_heap(LVRelState *vacrel)
/* Update error traceback information one last time */
update_vacuum_error_info(vacrel, NULL, VACUUM_ERRCB_PHASE_TRUNCATE,
- vacrel->nonempty_pages, InvalidOffsetNumber);
+ vacrel->scan_data->nonempty_pages, InvalidOffsetNumber);
/*
* Loop until no more truncating can be done.
@@ -3316,7 +4031,7 @@ lazy_truncate_heap(LVRelState *vacrel)
* without also touching reltuples, since the tuple count wasn't
* changed by the truncation.
*/
- vacrel->removed_pages += orig_rel_pages - new_rel_pages;
+ vacrel->scan_data->removed_pages += orig_rel_pages - new_rel_pages;
vacrel->rel_pages = new_rel_pages;
ereport(vacrel->verbose ? INFO : DEBUG2,
@@ -3324,7 +4039,7 @@ lazy_truncate_heap(LVRelState *vacrel)
vacrel->relname,
orig_rel_pages, new_rel_pages)));
orig_rel_pages = new_rel_pages;
- } while (new_rel_pages > vacrel->nonempty_pages && lock_waiter_detected);
+ } while (new_rel_pages > vacrel->scan_data->nonempty_pages && lock_waiter_detected);
}
/*
@@ -3352,7 +4067,7 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected)
StaticAssertStmt((PREFETCH_SIZE & (PREFETCH_SIZE - 1)) == 0,
"prefetch size must be power of 2");
prefetchedUntil = InvalidBlockNumber;
- while (blkno > vacrel->nonempty_pages)
+ while (blkno > vacrel->scan_data->nonempty_pages)
{
Buffer buf;
Page page;
@@ -3464,7 +4179,7 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected)
* pages still are; we need not bother to look at the last known-nonempty
* page.
*/
- return vacrel->nonempty_pages;
+ return vacrel->scan_data->nonempty_pages;
}
/*
@@ -3482,12 +4197,8 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
autovacuum_work_mem != -1 ?
autovacuum_work_mem : maintenance_work_mem;
- /*
- * Initialize state for a parallel vacuum. As of now, only one worker can
- * be used for an index, so we invoke parallelism only if there are at
- * least two indexes on a table.
- */
- if (nworkers >= 0 && vacrel->nindexes > 1 && vacrel->do_index_vacuuming)
+ /* Initialize state for a parallel vacuum */
+ if (nworkers >= 0)
{
/*
* Since parallel workers cannot access data in temporary tables, we
@@ -3505,11 +4216,17 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
vacrel->relname)));
}
else
+ {
+ /*
+ * We initialize the parallel vacuum state for either lazy heap
+ * scan, index vacuuming, or both.
+ */
vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
vacrel->nindexes, nworkers,
vac_work_mem,
vacrel->verbose ? INFO : DEBUG2,
- vacrel->bstrategy);
+ vacrel->bstrategy, (void *) vacrel);
+ }
/*
* If parallel mode started, dead_items and dead_items_info spaces are
@@ -3549,15 +4266,35 @@ dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets,
};
int64 prog_val[2];
+ if (ParallelHeapVacuumIsActive(vacrel))
+ TidStoreLockExclusive(vacrel->dead_items);
+
TidStoreSetBlockOffsets(vacrel->dead_items, blkno, offsets, num_offsets);
vacrel->dead_items_info->num_items += num_offsets;
+ if (ParallelHeapVacuumIsActive(vacrel))
+ TidStoreUnlock(vacrel->dead_items);
+
/* update the progress information */
prog_val[0] = vacrel->dead_items_info->num_items;
prog_val[1] = TidStoreMemoryUsage(vacrel->dead_items);
pgstat_progress_update_multi_param(2, prog_index, prog_val);
}
+/*
+ * Check the memory usage of the collected dead items and return true
+ * if we are close to overrunning the available space for dead_items TIDs.
+ * However, let's force at least one page's worth of tuples to be stored,
+ * to ensure we do at least some work when the configured memory is so low
+ * that we would otherwise run out before storing anything.
+ */
+static bool
+dead_items_check_memory_limit(LVRelState *vacrel)
+{
+ return vacrel->dead_items_info->num_items > 0 &&
+ TidStoreMemoryUsage(vacrel->dead_items) > vacrel->dead_items_info->max_bytes;
+}
+
/*
* Forget all collected dead items.
*/
@@ -3751,6 +4488,303 @@ update_relstats_all_indexes(LVRelState *vacrel)
}
}
+/*
+ * Compute the number of workers for parallel heap vacuum.
+ */
+int
+heap_parallel_vacuum_compute_workers(Relation rel, int nworkers_requested,
+ void *state)
+{
+ BlockNumber relpages = RelationGetNumberOfBlocks(rel);
+ int parallel_workers = 0;
+
+ /*
+ * Vacuuming a small relation shouldn't take long even without
+ * parallelism, so it's not worth launching workers. We use two times
+ * the chunk size as the size cutoff because the leader itself is
+ * assigned one chunk.
+ */
+ if (relpages < PARALLEL_LV_CHUNK_SIZE * 2 || relpages < min_parallel_table_scan_size)
+ return 0;
+
+ if (nworkers_requested == 0)
+ {
+ LVRelState *vacrel = (LVRelState *) state;
+ int heap_parallel_threshold;
+ int heap_pages;
+ BlockNumber allvisible;
+ BlockNumber allfrozen;
+
+ /*
+ * Estimate the number of blocks that we're going to scan during
+ * lazy_scan_heap().
+ */
+ visibilitymap_count(rel, &allvisible, &allfrozen);
+ heap_pages = relpages - (vacrel->aggressive ? allfrozen : allvisible);
+
+ Assert(heap_pages >= 0);
+
+ /*
+ * Select the number of workers based on the log of the number of
+ * pages to scan. Note that the upper limit of the
+ * min_parallel_table_scan_size GUC is chosen to prevent overflow
+ * here.
+ */
+ heap_parallel_threshold = PARALLEL_LV_CHUNK_SIZE;
+ while (heap_pages >= (BlockNumber) (heap_parallel_threshold * 3))
+ {
+ parallel_workers++;
+ heap_parallel_threshold *= 3;
+ if (heap_parallel_threshold > INT_MAX / 3)
+ break;
+ }
+ }
+ else
+ parallel_workers = nworkers_requested;
+
+ return parallel_workers;
+}
+
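As a quick illustration of the log-base-3 ramp in heap_parallel_vacuum_compute_workers(), the standalone sketch below replays the same loop for a few made-up page counts. PARALLEL_LV_CHUNK_SIZE is assumed to be 8192 blocks here purely for the example; its real definition is outside this hunk, so treat the number as a placeholder.

#include <stdio.h>

#define PARALLEL_LV_CHUNK_SIZE 8192	/* assumed value, for illustration only */

int
main(void)
{
	unsigned int pages[] = {10000, 30000, 100000, 1000000};

	for (int i = 0; i < 4; i++)
	{
		int			workers = 0;
		unsigned int threshold = PARALLEL_LV_CHUNK_SIZE;

		/* same ramp as heap_parallel_vacuum_compute_workers() */
		while (pages[i] >= threshold * 3)
		{
			workers++;
			threshold *= 3;
		}
		printf("%u pages to scan -> %d worker(s)\n", pages[i], workers);
	}
	return 0;
}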
+/*
+ * Estimate shared memory size required for parallel heap vacuum.
+ */
+void
+heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers,
+ void *state)
+{
+ LVRelState *vacrel = (LVRelState *) state;
+ Size size = 0;
+ bool leaderparticipate = true;
+
+ vacrel->leader = palloc(sizeof(ParallelLVLeader));
+
+ /* Estimate space for ParallelLVShared */
+ size = add_size(size, sizeof(ParallelLVShared));
+ vacrel->leader->shared_len = size;
+ shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->shared_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Estimate space for ParallelLVScanDesc */
+ vacrel->leader->scandesc_len = sizeof(ParallelLVScanDesc);
+ shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->scandesc_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Estimate space for an array of ParallelLVScanWorkerData */
+ vacrel->leader->scanwork_len = mul_size(sizeof(ParallelLVScanWorkerData),
+ nworkers);
+ shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->scanwork_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+ /* Estimate space for an array of LVScanData */
+ vacrel->leader->scandata_len = mul_size(sizeof(LVScanData), nworkers);
+ shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->scandata_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+#ifdef USE_INJECTION_POINTS
+ if (IS_INJECTION_POINT_ATTACHED("parallel-heap-vacuum-disable-leader-participation"))
+ leaderparticipate = false;
+#endif
+ vacrel->leader->leaderparticipate = leaderparticipate;
+}
+
+/*
+ * Set up shared memory for parallel heap vacuum.
+ */
+void
+heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, int nworkers,
+ void *state)
+{
+ LVRelState *vacrel = (LVRelState *) state;
+ ParallelLVShared *shared;
+ ParallelLVScanDesc *scandesc;
+ ParallelLVScanWorkerData *scanwork;
+ LVScanData *scandata;
+
+ vacrel->plvstate = palloc0(sizeof(ParallelLVState));
+
+ /* Initialize ParallelLVShared */
+
+ shared = shm_toc_allocate(pcxt->toc, vacrel->leader->shared_len);
+ MemSet(shared, 0, vacrel->leader->shared_len);
+ shared->aggressive = vacrel->aggressive;
+ shared->skipwithvm = vacrel->skipwithvm;
+ shared->cutoffs = vacrel->cutoffs;
+ shared->NewRelfrozenXid = vacrel->scan_data->NewRelfrozenXid;
+ shared->NewRelminMxid = vacrel->scan_data->NewRelminMxid;
+ shared->initial_chunk_size = BlockNumberIsValid(vacrel->next_eager_scan_region_start)
+ ? vacrel->next_eager_scan_region_start
+ : PARALLEL_LV_CHUNK_SIZE;
+
+ /* Calculate the per-chunk maximum failure count */
+ shared->eager_scan_max_fails_per_chunk =
+ (BlockNumber) (vacrel->eager_scan_max_fails_per_region *
+ ((float) PARALLEL_LV_CHUNK_SIZE / EAGER_SCAN_REGION_SIZE));
+
+ /* Split evenly among workers, counting the leader too if it participates */
+ shared->eager_scan_remaining_successes_per_worker =
+ vacrel->eager_scan_remaining_successes /
+ (vacrel->leader->leaderparticipate ? nworkers + 1 : nworkers);
+
+ shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SHARED, shared);
+ vacrel->plvstate->shared = shared;
+
+ /* Initialize ParallelLVScanDesc */
+ scandesc = shm_toc_allocate(pcxt->toc, vacrel->leader->scandesc_len);
+ scandesc->nblocks = RelationGetNumberOfBlocks(rel);
+ pg_atomic_init_u64(&scandesc->nallocated, 0);
+ shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SCANDESC, scandesc);
+ vacrel->plvstate->scandesc = scandesc;
+
+ /* Initialize the array of ParallelLVScanWorkerData */
+ scanwork = shm_toc_allocate(pcxt->toc, vacrel->leader->scanwork_len);
+ MemSet(scanwork, 0, vacrel->leader->scanwork_len);
+ shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SCANWORKER, scanwork);
+ vacrel->leader->scanwork_array = scanwork;
+
+ /* Initialize the array of LVScanData */
+ scandata = shm_toc_allocate(pcxt->toc, vacrel->leader->scandata_len);
+ shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SCANDATA, scandata);
+ vacrel->leader->scandata_array = scandata;
+}
+
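The two budget computations above rescale the eager-scan failure cap from regions to parallel chunks and split the leader's success budget evenly across the participants (including the leader when it participates). A minimal standalone sketch with made-up numbers, since the real constants and counters are defined elsewhere in the patch:

#include <stdio.h>

int
main(void)
{
	/* every number below is made up purely for illustration */
	int			chunk_size = 8192;		/* assumed PARALLEL_LV_CHUNK_SIZE */
	int			region_size = 4096;		/* assumed EAGER_SCAN_REGION_SIZE */
	int			max_fails_per_region = 128;
	int			remaining_successes = 50;
	int			nworkers = 3;
	int			leaderparticipate = 1;

	/* the per-region failure cap is rescaled to the parallel chunk size */
	int			fails_per_chunk = (int) (max_fails_per_region *
										 ((float) chunk_size / region_size));

	/* the success budget is split evenly across all participants */
	int			successes_each = remaining_successes /
		(leaderparticipate ? nworkers + 1 : nworkers);

	printf("per-chunk failure cap: %d, per-participant success budget: %d\n",
		   fails_per_chunk, successes_each);
	return 0;
}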
+/*
+ * Initialize lazy vacuum state with the information retrieved from
+ * shared memory.
+ */
+void
+heap_parallel_vacuum_initialize_worker(Relation rel, ParallelVacuumState *pvs,
+ ParallelWorkerContext *pwcxt,
+ void **state_out)
+{
+ LVRelState *vacrel;
+ ParallelLVState *plvstate;
+ ParallelLVShared *shared;
+ ParallelLVScanDesc *scandesc;
+ ParallelLVScanWorkerData *scanwork_array;
+ LVScanData *scandata_array;
+
+ /* Initialize ParallelLVState and prepare the related objects */
+
+ plvstate = palloc0(sizeof(ParallelLVState));
+
+ /* Prepare ParallelLVShared */
+ shared = (ParallelLVShared *) shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SHARED, false);
+ plvstate->shared = shared;
+
+ /* Prepare ParallelLVScanDesc */
+ scandesc = shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SCANDESC, false);
+ plvstate->scandesc = scandesc;
+
+ /* Prepare ParallelLVScanWorkerData */
+ scanwork_array = shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SCANWORKER, false);
+ plvstate->scanwork = &(scanwork_array[ParallelWorkerNumber]);
+
+ /* Initialize LVRelState and prepare fields required by lazy scan heap */
+ vacrel = palloc0(sizeof(LVRelState));
+ vacrel->rel = rel;
+ vacrel->indrels = parallel_vacuum_get_table_indexes(pvs,
+ &vacrel->nindexes);
+ vacrel->bstrategy = parallel_vacuum_get_bstrategy(pvs);
+ vacrel->pvs = pvs;
+ vacrel->aggressive = shared->aggressive;
+ vacrel->skipwithvm = shared->skipwithvm;
+ vacrel->vistest = GlobalVisTestFor(rel);
+ vacrel->cutoffs = shared->cutoffs;
+ vacrel->dead_items = parallel_vacuum_get_dead_items(pvs,
+ &vacrel->dead_items_info);
+ vacrel->rel_pages = RelationGetNumberOfBlocks(rel);
+
+ /*
+ * Set the per-region failure counter and per-worker success counter,
+ * which are not changed during parallel heap vacuum.
+ */
+ vacrel->eager_scan_max_fails_per_region =
+ plvstate->shared->eager_scan_max_fails_per_chunk;
+ vacrel->eager_scan_remaining_successes =
+ plvstate->shared->eager_scan_remaining_successes_per_worker;
+
+ /* Does this worker still have unscanned blocks in its chunk? */
+ if (plvstate->scanwork->chunk_remaining > 0)
+ {
+ /*
+ * Restore the previous eager scan state of the already-allocated chunk,
+ * in case the worker's previous scan was suspended because the space
+ * for dead_items TIDs filled up.
+ */
+ vacrel->next_eager_scan_region_start = plvstate->scanwork->next_region_start_save;
+ vacrel->eager_scan_remaining_fails = plvstate->scanwork->remaining_fails_save;
+ }
+ else
+ {
+ /*
+ * next_eager_scan_region_start will be set when the first chunk is
+ * assigned
+ */
+ vacrel->next_eager_scan_region_start = InvalidBlockNumber;
+ vacrel->eager_scan_remaining_fails = vacrel->eager_scan_max_fails_per_region;
+ }
+
+ vacrel->plvstate = plvstate;
+
+ /* Prepare LVScanData */
+ scandata_array = shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SCANDATA, false);
+ vacrel->scan_data = &(scandata_array[ParallelWorkerNumber]);
+ MemSet(vacrel->scan_data, 0, sizeof(LVScanData));
+ vacrel->scan_data->NewRelfrozenXid = shared->NewRelfrozenXid;
+ vacrel->scan_data->NewRelminMxid = shared->NewRelminMxid;
+ vacrel->scan_data->skippedallvis = false;
+
+ /*
+ * Initialize the scan state if not done yet. A chunk of blocks will be
+ * allocated when the worker requests its first block to scan.
+ */
+ if (!vacrel->plvstate->scanwork->inited)
+ parallel_lazy_scan_init_scan_worker(vacrel->plvstate->scanwork,
+ vacrel->plvstate->shared->initial_chunk_size);
+
+ *state_out = (void *) vacrel;
+}
+
+/*
+ * Parallel heap vacuum callback for collecting dead items (i.e., lazy heap scan).
+ */
+void
+heap_parallel_vacuum_collect_dead_items(Relation rel, ParallelVacuumState *pvs,
+ void *state)
+{
+ LVRelState *vacrel = (LVRelState *) state;
+ ErrorContextCallback errcallback;
+
+ Assert(ParallelHeapVacuumIsActive(vacrel));
+
+ /*
+ * Setup error traceback support for ereport() for parallel table vacuum
+ * workers
+ */
+ vacrel->dbname = get_database_name(MyDatabaseId);
+ vacrel->relnamespace = get_namespace_name(RelationGetNamespace(rel));
+ vacrel->relname = pstrdup(RelationGetRelationName(rel));
+ vacrel->indname = NULL;
+ vacrel->phase = VACUUM_ERRCB_PHASE_SCAN_HEAP;
+ errcallback.callback = vacuum_error_callback;
+ errcallback.arg = vacrel;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* Join the parallel heap vacuum */
+ do_lazy_scan_heap(vacrel, false);
+
+ /* Advertise the last processed block number */
+ pg_atomic_write_u32(&(vacrel->plvstate->scanwork->last_blkno), vacrel->last_blkno);
+
+ /* Save the eager scan state */
+ vacrel->plvstate->scanwork->remaining_fails_save = vacrel->eager_scan_remaining_fails;
+ vacrel->plvstate->scanwork->remaining_successes_save = vacrel->eager_scan_remaining_successes;
+ vacrel->plvstate->scanwork->next_region_start_save = vacrel->next_eager_scan_region_start;
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
/*
* Error context callback for errors occurring during vacuum. The error
* context messages for index phases should match the messages set in parallel
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index 0feea1d30ec3..7f0869ee4dc0 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -4,17 +4,18 @@
* Support routines for parallel vacuum execution.
*
* This file contains routines that are intended to support setting up, using,
- * and tearing down a ParallelVacuumState.
+ * and tearing down a ParallelVacuumState. ParallelVacuumState contains shared
+ * information as well as the memory space for storing dead items allocated in
+ * the DSA area.
*
- * In a parallel vacuum, we perform both index bulk deletion and index cleanup
- * with parallel worker processes. Individual indexes are processed by one
- * vacuum process. ParallelVacuumState contains shared information as well as
- * the memory space for storing dead items allocated in the DSA area. We
- * launch parallel worker processes at the start of parallel index
- * bulk-deletion and index cleanup and once all indexes are processed, the
- * parallel worker processes exit. Each time we process indexes in parallel,
- * the parallel context is re-initialized so that the same DSM can be used for
- * multiple passes of index bulk-deletion and index cleanup.
+ * In a parallel vacuum, we perform the table scan, index bulk-deletion, index
+ * cleanup, or any combination of them with parallel worker processes,
+ * depending on the number of parallel workers required for each phase; the
+ * table scanning and the index processing might therefore use different
+ * numbers of workers. We launch parallel worker processes at the start of a
+ * phase, and once all work in that phase is complete, the workers exit. Each
+ * time we process the table or indexes in parallel, the parallel context is
+ * re-initialized so that the same DSM can be used for multiple passes of each
+ * phase.
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -26,8 +27,10 @@
*/
#include "postgres.h"
+#include "access/parallel.h"
#include "access/amapi.h"
#include "access/table.h"
+#include "access/tableam.h"
#include "access/xact.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
@@ -36,6 +39,7 @@
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "tcop/tcopprot.h"
+#include "utils/injection_point.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
@@ -50,6 +54,13 @@
#define PARALLEL_VACUUM_KEY_WAL_USAGE 4
#define PARALLEL_VACUUM_KEY_INDEX_STATS 5
+/* The kinds of work phases in a parallel vacuum */
+typedef enum
+{
+ PV_WORK_PHASE_PROCESS_INDEXES, /* index vacuuming or cleanup */
+ PV_WORK_PHASE_COLLECT_DEAD_ITEMS, /* collect dead tuples */
+} PVWorkPhase;
+
/*
* Shared information among parallel workers. So this is allocated in the DSM
* segment.
@@ -65,6 +76,12 @@ typedef struct PVShared
int elevel;
int64 queryid;
+ /*
+ * Tell parallel workers what phase to perform: processing indexes or
+ * collecting dead tuples from the table.
+ */
+ PVWorkPhase work_phase;
+
/*
* Fields for both index vacuum and cleanup.
*
@@ -164,6 +181,9 @@ struct ParallelVacuumState
/* NULL for worker processes */
ParallelContext *pcxt;
+ /* Do we need to reinitialize parallel DSM? */
+ bool need_reinitialize_dsm;
+
/* Parent Heap Relation */
Relation heaprel;
@@ -178,7 +198,7 @@ struct ParallelVacuumState
* Shared index statistics among parallel vacuum workers. The array
* element is allocated for every index, even those indexes where parallel
* index vacuuming is unsafe or not worthwhile (e.g.,
- * will_parallel_vacuum[] is false). During parallel vacuum,
+ * idx_will_parallel_vacuum[] is false). During parallel vacuum,
* IndexBulkDeleteResult of each index is kept in DSM and is copied into
* local memory at the end of parallel vacuum.
*/
@@ -193,12 +213,18 @@ struct ParallelVacuumState
/* Points to WAL usage area in DSM */
WalUsage *wal_usage;
+ /*
+ * The number of workers for parallel table vacuuming. If 0, parallel
+ * table vacuum is disabled.
+ */
+ int nworkers_for_table;
+
/*
* False if the index is totally unsuitable target for all parallel
* processing. For example, the index could be <
* min_parallel_index_scan_size cutoff.
*/
- bool *will_parallel_vacuum;
+ bool *idx_will_parallel_vacuum;
/*
* The number of indexes that support parallel index bulk-deletion and
@@ -221,8 +247,10 @@ struct ParallelVacuumState
PVIndVacStatus status;
};
-static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
- bool *will_parallel_vacuum);
+static int parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes,
+ int nrequested, int *nworkers_for_table,
+ bool *idx_will_parallel_vacuum,
+ void *state);
static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
bool vacuum);
static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
@@ -231,18 +259,25 @@ static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation
PVIndStats *indstats);
static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
bool vacuum);
+static void parallel_vacuum_begin_work_phase(ParallelVacuumState *pvs, int nworkers,
+ PVWorkPhase work_phase);
+static void parallel_vacuum_end_worker_phase(ParallelVacuumState *pvs);
static void parallel_vacuum_error_callback(void *arg);
/*
* Try to enter parallel mode and create a parallel context. Then initialize
* shared memory state.
*
+ * nrequested_workers is the requested parallel degree. 0 means that the
+ * parallel degrees for table vacuum and index vacuum are each determined
+ * automatically. See the comments atop parallel_vacuum_compute_workers()
+ * for details.
+ *
* On success, return parallel vacuum state. Otherwise return NULL.
*/
ParallelVacuumState *
parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
int nrequested_workers, int vac_work_mem,
- int elevel, BufferAccessStrategy bstrategy)
+ int elevel, BufferAccessStrategy bstrategy, void *state)
{
ParallelVacuumState *pvs;
ParallelContext *pcxt;
@@ -251,38 +286,38 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
PVIndStats *indstats;
BufferUsage *buffer_usage;
WalUsage *wal_usage;
- bool *will_parallel_vacuum;
+ bool *idx_will_parallel_vacuum;
Size est_indstats_len;
Size est_shared_len;
int nindexes_mwm = 0;
int parallel_workers = 0;
+ int nworkers_for_table;
int querylen;
- /*
- * A parallel vacuum must be requested and there must be indexes on the
- * relation
- */
+ /* A parallel vacuum must be requested */
Assert(nrequested_workers >= 0);
- Assert(nindexes > 0);
/*
* Compute the number of parallel vacuum workers to launch
*/
- will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
- parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
+ idx_will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
+ parallel_workers = parallel_vacuum_compute_workers(rel, indrels, nindexes,
nrequested_workers,
- will_parallel_vacuum);
+ &nworkers_for_table,
+ idx_will_parallel_vacuum,
+ state);
+
if (parallel_workers <= 0)
{
/* Can't perform vacuum in parallel -- return NULL */
- pfree(will_parallel_vacuum);
+ pfree(idx_will_parallel_vacuum);
return NULL;
}
pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
pvs->indrels = indrels;
pvs->nindexes = nindexes;
- pvs->will_parallel_vacuum = will_parallel_vacuum;
+ pvs->idx_will_parallel_vacuum = idx_will_parallel_vacuum;
pvs->bstrategy = bstrategy;
pvs->heaprel = rel;
@@ -291,6 +326,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
parallel_workers);
Assert(pcxt->nworkers > 0);
pvs->pcxt = pcxt;
+ pvs->need_reinitialize_dsm = false;
+ pvs->nworkers_for_table = nworkers_for_table;
/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
@@ -327,6 +364,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
else
querylen = 0; /* keep compiler quiet */
+ /* Estimate AM-specific space for parallel table vacuum */
+ if (pvs->nworkers_for_table > 0)
+ table_parallel_vacuum_estimate(rel, pcxt, pvs->nworkers_for_table, state);
+
InitializeParallelDSM(pcxt);
/* Prepare index vacuum stats */
@@ -345,7 +386,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
- if (!will_parallel_vacuum[i])
+ if (!idx_will_parallel_vacuum[i])
continue;
if (indrel->rd_indam->amusemaintenanceworkmem)
@@ -419,6 +460,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
}
+ /* Initialize AM-specific DSM space for parallel table vacuum */
+ if (pvs->nworkers_for_table > 0)
+ table_parallel_vacuum_initialize(rel, pcxt, pvs->nworkers_for_table, state);
+
/* Success -- return parallel vacuum state */
return pvs;
}
@@ -456,10 +501,39 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
DestroyParallelContext(pvs->pcxt);
ExitParallelMode();
- pfree(pvs->will_parallel_vacuum);
+ pfree(pvs->idx_will_parallel_vacuum);
pfree(pvs);
}
+/*
+ * Return the number of parallel workers initialized for parallel table vacuum.
+ */
+int
+parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs)
+{
+ return pvs->nworkers_for_table;
+}
+
+/*
+ * Return the array of indexes on the table being vacuumed.
+ */
+Relation *
+parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes)
+{
+ *nindexes = pvs->nindexes;
+
+ return pvs->indrels;
+}
+
+/*
+ * Return the buffer strategy for parallel vacuum.
+ */
+BufferAccessStrategy
+parallel_vacuum_get_bstrategy(ParallelVacuumState *pvs)
+{
+ return pvs->bstrategy;
+}
+
/*
* Returns the dead items space and dead items information.
*/
@@ -533,26 +607,35 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup
}
/*
- * Compute the number of parallel worker processes to request. Both index
- * vacuum and index cleanup can be executed with parallel workers.
- * The index is eligible for parallel vacuum iff its size is greater than
- * min_parallel_index_scan_size as invoking workers for very small indexes
- * can hurt performance.
+ * Compute the number of parallel worker processes to request for table
+ * vacuum and index vacuum/cleanup. Return the maximum number of parallel
+ * workers for table vacuuming and index vacuuming.
+ *
+ * nrequested is the number of parallel workers that the user requested; it
+ * applies to both the table vacuum and the index vacuum worker counts.
+ * If nrequested is 0, we compute the parallel degree for each of them
+ * separately, as described below.
*
- * nrequested is the number of parallel workers that user requested. If
- * nrequested is 0, we compute the parallel degree based on nindexes, that is
- * the number of indexes that support parallel vacuum. This function also
- * sets will_parallel_vacuum to remember indexes that participate in parallel
- * vacuum.
+ * For parallel table vacuum, we ask the table AM's routine to compute the
+ * number of parallel worker processes. The result is stored in *nworkers_table_p.
+ *
+ * For parallel index vacuum, an index is eligible for parallel vacuum iff
+ * its size is greater than min_parallel_index_scan_size, as invoking workers
+ * for very small indexes can hurt performance. This function sets
+ * idx_will_parallel_vacuum to remember the indexes that participate in parallel vacuum.
*/
static int
-parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
- bool *will_parallel_vacuum)
+parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes,
+ int nrequested, int *nworkers_table_p,
+ bool *idx_will_parallel_vacuum, void *state)
{
int nindexes_parallel = 0;
int nindexes_parallel_bulkdel = 0;
int nindexes_parallel_cleanup = 0;
- int parallel_workers;
+ int nworkers_table = 0;
+ int nworkers_index = 0;
+
+ *nworkers_table_p = 0;
/*
* We don't allow performing parallel operation in standalone backend or
@@ -561,6 +644,14 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
return 0;
+ /* Compute the number of workers for parallel table scan */
+ if (rel->rd_tableam->parallel_vacuum_compute_workers != NULL)
+ nworkers_table = table_parallel_vacuum_compute_workers(rel, nrequested,
+ state);
+
+ /* Cap by max_parallel_maintenance_workers */
+ nworkers_table = Min(nworkers_table, max_parallel_maintenance_workers);
+
/*
* Compute the number of indexes that can participate in parallel vacuum.
*/
@@ -574,7 +665,7 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
continue;
- will_parallel_vacuum[i] = true;
+ idx_will_parallel_vacuum[i] = true;
if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
nindexes_parallel_bulkdel++;
@@ -589,18 +680,18 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
/* The leader process takes one index */
nindexes_parallel--;
- /* No index supports parallel vacuum */
- if (nindexes_parallel <= 0)
- return 0;
-
- /* Compute the parallel degree */
- parallel_workers = (nrequested > 0) ?
- Min(nrequested, nindexes_parallel) : nindexes_parallel;
+ if (nindexes_parallel > 0)
+ {
+ /* Take into account the requested number of workers */
+ nworkers_index = (nrequested > 0) ?
+ Min(nrequested, nindexes_parallel) : nindexes_parallel;
- /* Cap by max_parallel_maintenance_workers */
- parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+ /* Cap by max_parallel_maintenance_workers */
+ nworkers_index = Min(nworkers_index, max_parallel_maintenance_workers);
+ }
- return parallel_workers;
+ *nworkers_table_p = nworkers_table;
+ return Max(nworkers_table, nworkers_index);
}
/*
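A standalone sketch of how the per-phase worker counts computed above combine, using made-up inputs; the parallel context is sized for the larger of the two counts, while each phase later launches only its own count:

#include <stdio.h>

#define Min(a,b) ((a) < (b) ? (a) : (b))
#define Max(a,b) ((a) > (b) ? (a) : (b))

int
main(void)
{
	int			max_parallel_maintenance_workers = 4;	/* GUC value, assumed */
	int			nworkers_table = 1;		/* from the table AM callback, made up */
	int			nindexes_parallel = 3;	/* eligible indexes minus the leader's one, made up */
	int			nrequested = 0;			/* PARALLEL option omitted */
	int			nworkers_index;

	nworkers_table = Min(nworkers_table, max_parallel_maintenance_workers);

	nworkers_index = (nrequested > 0) ? Min(nrequested, nindexes_parallel) : nindexes_parallel;
	nworkers_index = Min(nworkers_index, max_parallel_maintenance_workers);

	/* the parallel context is sized for the larger phase; each phase launches its own count */
	printf("table workers = %d, index workers = %d, context size = %d\n",
		   nworkers_table, nworkers_index, Max(nworkers_table, nworkers_index));
	return 0;
}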
@@ -657,7 +748,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
indstats->status = new_status;
indstats->parallel_workers_can_process =
- (pvs->will_parallel_vacuum[i] &&
+ (pvs->idx_will_parallel_vacuum[i] &&
parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
num_index_scans,
vacuum));
@@ -669,40 +760,9 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
/* Setup the shared cost-based vacuum delay and launch workers */
if (nworkers > 0)
{
- /* Reinitialize parallel context to relaunch parallel workers */
- if (num_index_scans > 0)
- ReinitializeParallelDSM(pvs->pcxt);
-
- /*
- * Set up shared cost balance and the number of active workers for
- * vacuum delay. We need to do this before launching workers as
- * otherwise, they might not see the updated values for these
- * parameters.
- */
- pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
- pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
-
- /*
- * The number of workers can vary between bulkdelete and cleanup
- * phase.
- */
- ReinitializeParallelWorkers(pvs->pcxt, nworkers);
-
- LaunchParallelWorkers(pvs->pcxt);
-
- if (pvs->pcxt->nworkers_launched > 0)
- {
- /*
- * Reset the local cost values for leader backend as we have
- * already accumulated the remaining balance of heap.
- */
- VacuumCostBalance = 0;
- VacuumCostBalanceLocal = 0;
-
- /* Enable shared cost balance for leader backend */
- VacuumSharedCostBalance = &(pvs->shared->cost_balance);
- VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
- }
+ /* Start parallel vacuum workers for processing indexes */
+ parallel_vacuum_begin_work_phase(pvs, nworkers,
+ PV_WORK_PHASE_PROCESS_INDEXES);
if (vacuum)
ereport(pvs->shared->elevel,
@@ -732,13 +792,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
* to finish, or we might get incomplete data.)
*/
if (nworkers > 0)
- {
- /* Wait for all vacuum workers to finish */
- WaitForParallelWorkersToFinish(pvs->pcxt);
-
- for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
- InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
- }
+ parallel_vacuum_end_worker_phase(pvs);
/*
* Reset all index status back to initial (while checking that we have
@@ -755,15 +809,8 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
}
- /*
- * Carry the shared balance value to heap scan and disable shared costing
- */
- if (VacuumSharedCostBalance)
- {
- VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
- VacuumSharedCostBalance = NULL;
- VacuumActiveNWorkers = NULL;
- }
+ /* Parallel DSM will need to be reinitialized for the next execution */
+ pvs->need_reinitialize_dsm = true;
}
/*
@@ -979,6 +1026,91 @@ parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
return true;
}
+/*
+ * Begin the parallel scan to collect dead items. Return the number of
+ * launched parallel workers.
+ *
+ * The caller must call parallel_vacuum_collect_dead_items_end() to finish
+ * the parallel scan.
+ */
+int
+parallel_vacuum_collect_dead_items_begin(ParallelVacuumState *pvs)
+{
+ int nworkers = pvs->nworkers_for_table;
+#ifdef USE_INJECTION_POINTS
+ static int ntimes = 0;
+#endif
+
+ Assert(!IsParallelWorker());
+
+ if (nworkers == 0)
+ return 0;
+
+ /* Start parallel vacuum workers for collecting dead items */
+ Assert(nworkers <= pvs->pcxt->nworkers);
+
+#ifdef USE_INJECTION_POINTS
+ if (IS_INJECTION_POINT_ATTACHED("parallel-vacuum-ramp-down-workers"))
+ {
+ nworkers = pvs->nworkers_for_table - Min(ntimes, pvs->nworkers_for_table);
+ ntimes++;
+ }
+#endif
+
+ parallel_vacuum_begin_work_phase(pvs, nworkers,
+ PV_WORK_PHASE_COLLECT_DEAD_ITEMS);
+
+ /* Count the leader itself in the active worker count */
+ if (pvs->pcxt->nworkers_launched > 0)
+ pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+ return pvs->pcxt->nworkers_launched;
+}
+
+/*
+ * Wait for all parallel vacuum workers launched by
+ * parallel_vacuum_collect_dead_items_begin() to finish, and gather the
+ * workers' statistics.
+ */
+void
+parallel_vacuum_collect_dead_items_end(ParallelVacuumState *pvs)
+{
+ Assert(!IsParallelWorker());
+ Assert(pvs->shared->work_phase == PV_WORK_PHASE_COLLECT_DEAD_ITEMS);
+
+ if (pvs->nworkers_for_table == 0)
+ return;
+
+ /* Wait for parallel workers to finish */
+ parallel_vacuum_end_worker_phase(pvs);
+
+ /* Decrement the worker count for the leader itself */
+ if (VacuumActiveNWorkers)
+ pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
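A minimal leader-side sketch of how these two entry points are meant to bracket one pass of parallel dead-item collection. leader_scan_share() is a placeholder for the AM's own leader participation (for heap, the leader joins via do_lazy_scan_heap(), as in heap_parallel_vacuum_collect_dead_items() earlier in this patch), and the sketch assumes parallel table vacuum is active for this relation.

#include "postgres.h"

#include "commands/vacuum.h"

/* placeholder for the AM's own leader-side scan of its assigned blocks */
static void
leader_scan_share(void *am_state)
{
	(void) am_state;
}

/* one pass of parallel dead-item collection, driven by the leader */
static void
collect_dead_items_in_parallel(ParallelVacuumState *pvs, void *am_state)
{
	/* launch workers; they end up in parallel_vacuum_process_table() */
	int			nlaunched = parallel_vacuum_collect_dead_items_begin(pvs);

	/* the leader scans its own share of blocks while the workers run */
	leader_scan_share(am_state);

	/* wait for the workers and accumulate their buffer/WAL usage */
	parallel_vacuum_collect_dead_items_end(pvs);

	elog(DEBUG2, "parallel dead item collection used %d workers", nlaunched);
}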
+/*
+ * Parallel worker entry point for executing the parallel scan to
+ * collect dead tuples.
+ */
+static void
+parallel_vacuum_process_table(ParallelVacuumState *pvs, void *state)
+{
+ Assert(VacuumActiveNWorkers);
+ Assert(pvs->shared->work_phase == PV_WORK_PHASE_COLLECT_DEAD_ITEMS);
+
+ /* Increment the active worker count before starting the table vacuum */
+ pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+ /* Do the parallel scan to collect dead tuples */
+ table_parallel_vacuum_collect_dead_items(pvs->heaprel, pvs, state);
+
+ /*
+ * We have completed the table vacuum so decrement the active worker
+ * count.
+ */
+ pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
/*
* Perform work within a launched parallel process.
*
@@ -998,6 +1130,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
WalUsage *wal_usage;
int nindexes;
char *sharedquery;
+ void *state;
ErrorContextCallback errcallback;
/*
@@ -1030,7 +1163,6 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
* matched to the leader's one.
*/
vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
- Assert(nindexes > 0);
/*
* Apply the desired value of maintenance_work_mem within this process.
@@ -1076,6 +1208,17 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM,
shared->ring_nbuffers * (BLCKSZ / 1024));
+ /* Initialize AM-specific vacuum state for parallel table vacuuming */
+ if (shared->work_phase == PV_WORK_PHASE_COLLECT_DEAD_ITEMS)
+ {
+ ParallelWorkerContext pwcxt;
+
+ pwcxt.toc = toc;
+ pwcxt.seg = seg;
+ table_parallel_vacuum_initialize_worker(rel, &pvs, &pwcxt,
+ &state);
+ }
+
/* Setup error traceback support for ereport() */
errcallback.callback = parallel_vacuum_error_callback;
errcallback.arg = &pvs;
@@ -1085,8 +1228,19 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
/* Prepare to track buffer usage during parallel execution */
InstrStartParallelQuery();
- /* Process indexes to perform vacuum/cleanup */
- parallel_vacuum_process_safe_indexes(&pvs);
+ switch (pvs.shared->work_phase)
+ {
+ case PV_WORK_PHASE_COLLECT_DEAD_ITEMS:
+ /* Scan the table to collect dead items */
+ parallel_vacuum_process_table(&pvs, state);
+ break;
+ case PV_WORK_PHASE_PROCESS_INDEXES:
+ /* Process indexes to perform vacuum/cleanup */
+ parallel_vacuum_process_safe_indexes(&pvs);
+ break;
+ default:
+ elog(ERROR, "unrecognized parallel vacuum phase %d", pvs.shared->work_phase);
+ }
/* Report buffer/WAL usage during parallel execution */
buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
@@ -1109,6 +1263,77 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
FreeAccessStrategy(pvs.bstrategy);
}
+/*
+ * Launch parallel vacuum workers for the given phase. If at least one
+ * worker is launched, enable shared vacuum delay costing.
+ */
+static void
+parallel_vacuum_begin_work_phase(ParallelVacuumState *pvs, int nworkers,
+ PVWorkPhase work_phase)
+{
+ /* Set the work phase */
+ pvs->shared->work_phase = work_phase;
+
+ /* Reinitialize parallel context to relaunch parallel workers */
+ if (pvs->need_reinitialize_dsm)
+ ReinitializeParallelDSM(pvs->pcxt);
+
+ /*
+ * Set up shared cost balance and the number of active workers for vacuum
+ * delay. We need to do this before launching workers as otherwise, they
+ * might not see the updated values for these parameters.
+ */
+ pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
+ pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
+
+ /*
+ * The number of workers can vary between phases (table scan,
+ * bulk-deletion, and cleanup).
+ */
+ ReinitializeParallelWorkers(pvs->pcxt, nworkers);
+
+ LaunchParallelWorkers(pvs->pcxt);
+
+ /* Enable shared vacuum costing if we are able to launch any worker */
+ if (pvs->pcxt->nworkers_launched > 0)
+ {
+ /*
+ * Reset the local cost values for leader backend as we have already
+ * accumulated the remaining balance of heap.
+ */
+ VacuumCostBalance = 0;
+ VacuumCostBalanceLocal = 0;
+
+ /* Enable shared cost balance for leader backend */
+ VacuumSharedCostBalance = &(pvs->shared->cost_balance);
+ VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
+ }
+}
+
+/*
+ * Wait for parallel vacuum workers to finish, accumulate the statistics,
+ * and disable shared vacuum delay costing if enabled.
+ */
+static void
+parallel_vacuum_end_worker_phase(ParallelVacuumState *pvs)
+{
+ /* Wait for all vacuum workers to finish */
+ WaitForParallelWorkersToFinish(pvs->pcxt);
+
+ for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
+ InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
+
+ /* Carry the shared balance value and disable shared costing */
+ if (VacuumSharedCostBalance)
+ {
+ VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+ VacuumSharedCostBalance = NULL;
+ VacuumActiveNWorkers = NULL;
+ }
+
+ /* Parallel DSM will need to be reinitialized for the next execution */
+ pvs->need_reinitialize_dsm = true;
+}
+
/*
* Error context callback for errors occurring during parallel index vacuum.
* The error context messages should match the messages set in the lazy vacuum
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index a1de400b9a53..743e29f673c7 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -15,6 +15,7 @@
#define HEAPAM_H
#include "access/heapam_xlog.h"
+#include "access/parallel.h"
#include "access/relation.h" /* for backward compatibility */
#include "access/relscan.h"
#include "access/sdir.h"
@@ -391,8 +392,20 @@ extern void log_heap_prune_and_freeze(Relation relation, Buffer buffer,
OffsetNumber *unused, int nunused);
/* in heap/vacuumlazy.c */
+struct ParallelVacuumState;
extern void heap_vacuum_rel(Relation rel,
const VacuumParams params, BufferAccessStrategy bstrategy);
+extern int heap_parallel_vacuum_compute_workers(Relation rel, int nworkers_requested,
+ void *state);
+extern void heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers,
+ void *state);
+extern void heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt,
+ int nworkers, void *state);
+extern void heap_parallel_vacuum_initialize_worker(Relation rel, struct ParallelVacuumState *pvs,
+ ParallelWorkerContext *pwcxt,
+ void **state_out);
+extern void heap_parallel_vacuum_collect_dead_items(Relation rel, struct ParallelVacuumState *pvs,
+ void *state);
/* in heap/heapam_visibility.c */
extern bool HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 77eb41eb6dc9..9dfbb4fa6992 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -36,6 +36,9 @@ extern PGDLLIMPORT bool synchronize_seqscans;
struct BulkInsertStateData;
struct IndexInfo;
+struct ParallelContext;
+struct ParallelVacuumState;
+struct ParallelWorkerContext;
struct SampleScanState;
struct ValidateIndexState;
@@ -653,6 +656,79 @@ typedef struct TableAmRoutine
const VacuumParams params,
BufferAccessStrategy bstrategy);
+ /* ------------------------------------------------------------------------
+ * Callbacks for parallel table vacuum.
+ * ------------------------------------------------------------------------
+ */
+
+ /*
+ * Compute the number of parallel workers for parallel table vacuum. The
+ * parallel degree for parallel vacuum is further limited by
+ * max_parallel_maintenance_workers. The function must return 0 to disable
+ * parallel table vacuum.
+ *
+ * 'nworkers_requested' is the requested number of workers (>= 0), taken
+ * from the PARALLEL option. 0 means to choose the parallel degree based
+ * on table-AM-specific factors such as table size.
+ *
+ * Optional callback.
+ */
+ int (*parallel_vacuum_compute_workers) (Relation rel,
+ int nworkers_requested,
+ void *state);
+
+ /*
+ * Estimate the size of shared memory needed for a parallel table vacuum
+ * of this relation.
+ *
+ * Not called if parallel table vacuum is disabled.
+ *
+ * Optional callback.
+ */
+ void (*parallel_vacuum_estimate) (Relation rel,
+ struct ParallelContext *pcxt,
+ int nworkers,
+ void *state);
+
+ /*
+ * Initialize DSM space for parallel table vacuum.
+ *
+ * Not called if parallel table vacuum is disabled.
+ *
+ * Optional callback.
+ */
+ void (*parallel_vacuum_initialize) (Relation rel,
+ struct ParallelContext *pctx,
+ int nworkers,
+ void *state);
+
+ /*
+ * Initialize AM-specific vacuum state for worker processes.
+ *
+ * state_out is an output parameter so that arbitrary data can be
+ * passed to the subsequent callback, parallel_vacuum_collect_dead_items.
+ *
+ * Not called if parallel table vacuum is disabled.
+ *
+ * Optional callback.
+ */
+ void (*parallel_vacuum_initialize_worker) (Relation rel,
+ struct ParallelVacuumState *pvs,
+ struct ParallelWorkerContext *pwcxt,
+ void **state_out);
+
+ /*
+ * Execute a parallel scan to collect dead items.
+ *
+ * Not called if parallel table vacuum is disabled.
+ *
+ * Optional callback.
+ */
+ void (*parallel_vacuum_collect_dead_items) (Relation rel,
+ struct ParallelVacuumState *pvs,
+ void *state);
+
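For a table AM other than heap, these optional callbacks would be wired into its TableAmRoutine roughly as sketched below. The my_am_* names are hypothetical placeholders that do not exist in this patch; leaving parallel_vacuum_compute_workers unset (NULL) keeps parallel table vacuum disabled for that AM, since vacuumparallel.c checks for a NULL callback before calling it.

/* Hypothetical table AM wiring; the my_am_* callbacks are placeholders. */
static const TableAmRoutine my_am_methods = {
	/* ... the AM's other required callbacks ... */

	/*
	 * Omit .parallel_vacuum_compute_workers (leave it NULL) to keep
	 * parallel table vacuum disabled for this AM.
	 */
	.parallel_vacuum_compute_workers = my_am_parallel_vacuum_compute_workers,
	.parallel_vacuum_estimate = my_am_parallel_vacuum_estimate,
	.parallel_vacuum_initialize = my_am_parallel_vacuum_initialize,
	.parallel_vacuum_initialize_worker = my_am_parallel_vacuum_initialize_worker,
	.parallel_vacuum_collect_dead_items = my_am_parallel_vacuum_collect_dead_items,
};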
/*
* Prepare to analyze block `blockno` of `scan`. The scan has been started
* with table_beginscan_analyze(). See also
@@ -1679,6 +1755,68 @@ table_relation_vacuum(Relation rel, const VacuumParams params,
rel->rd_tableam->relation_vacuum(rel, params, bstrategy);
}
+/* ----------------------------------------------------------------------------
+ * Parallel vacuum related functions.
+ * ----------------------------------------------------------------------------
+ */
+
+/*
+ * Compute the number of parallel workers for a parallel vacuum scan of this
+ * relation.
+ */
+static inline int
+table_parallel_vacuum_compute_workers(Relation rel, int nworkers_requested,
+ void *state)
+{
+ return rel->rd_tableam->parallel_vacuum_compute_workers(rel,
+ nworkers_requested,
+ state);
+}
+
+/*
+ * Estimate the size of shared memory needed for a parallel vacuum scan of
+ * this relation.
+ */
+static inline void
+table_parallel_vacuum_estimate(Relation rel, struct ParallelContext *pcxt,
+ int nworkers, void *state)
+{
+ Assert(nworkers > 0);
+ rel->rd_tableam->parallel_vacuum_estimate(rel, pcxt, nworkers, state);
+}
+
+/*
+ * Initialize shared memory area for a parallel vacuum scan of this relation.
+ */
+static inline void
+table_parallel_vacuum_initialize(Relation rel, struct ParallelContext *pcxt,
+ int nworkers, void *state)
+{
+ Assert(nworkers > 0);
+ rel->rd_tableam->parallel_vacuum_initialize(rel, pcxt, nworkers, state);
+}
+
+/*
+ * Initialize AM-specific vacuum state for worker processes.
+ */
+static inline void
+table_parallel_vacuum_initialize_worker(Relation rel, struct ParallelVacuumState *pvs,
+ struct ParallelWorkerContext *pwcxt,
+ void **state_out)
+{
+ rel->rd_tableam->parallel_vacuum_initialize_worker(rel, pvs, pwcxt, state_out);
+}
+
+/*
+ * Execute a parallel vacuum scan to collect dead items.
+ */
+static inline void
+table_parallel_vacuum_collect_dead_items(Relation rel, struct ParallelVacuumState *pvs,
+ void *state)
+{
+ rel->rd_tableam->parallel_vacuum_collect_dead_items(rel, pvs, state);
+}
+
/*
* Prepare to analyze the next block in the read stream. The scan needs to
* have been started with table_beginscan_analyze(). Note that this routine
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 14eeccbd7185..876cef301b73 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -382,8 +382,12 @@ extern void VacuumUpdateCosts(void);
extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels,
int nindexes, int nrequested_workers,
int vac_work_mem, int elevel,
- BufferAccessStrategy bstrategy);
+ BufferAccessStrategy bstrategy,
+ void *state);
extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats);
+extern int parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs);
+extern Relation *parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes);
+extern BufferAccessStrategy parallel_vacuum_get_bstrategy(ParallelVacuumState *pvs);
extern TidStore *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs,
VacDeadItemsInfo **dead_items_info_p);
extern void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs);
@@ -394,6 +398,8 @@ extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs,
long num_table_tuples,
int num_index_scans,
bool estimated_count);
+extern int parallel_vacuum_collect_dead_items_begin(ParallelVacuumState *pvs);
+extern void parallel_vacuum_collect_dead_items_end(ParallelVacuumState *pvs);
extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
/* in commands/analyze.c */
diff --git a/src/test/modules/injection_points/t/002_parallel_vacuum.pl b/src/test/modules/injection_points/t/002_parallel_vacuum.pl
new file mode 100644
index 000000000000..f0ef33ed86bf
--- /dev/null
+++ b/src/test/modules/injection_points/t/002_parallel_vacuum.pl
@@ -0,0 +1,97 @@
+
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Tests for parallel heap vacuum.
+
+use strict;
+use warnings FATAL => 'all';
+use locale;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Skip the tests if injection points are not supported by this build.
+if ($ENV{enable_injection_points} ne 'yes')
+{
+ plan skip_all => 'Injection points not supported by this build';
+}
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init;
+$node->start;
+$node->safe_psql('postgres', qq[create extension injection_points;]);
+
+$node->safe_psql('postgres', qq[
+create table t (i int) with (autovacuum_enabled = off);
+create index on t (i);
+ ]);
+my $nrows = 1_000_000;
+my $first = int($nrows * rand());
+my $second = $nrows - $first;
+
+my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+# Begin the transaction that holds xmin.
+$psql->query_safe('begin; select pg_current_xact_id();');
+
+# consume some xids
+$node->safe_psql('postgres', qq[
+select pg_current_xact_id();
+select pg_current_xact_id();
+select pg_current_xact_id();
+select pg_current_xact_id();
+select pg_current_xact_id();
+ ]);
+
+# While inserting $nrows tuples into the table with an older XID,
+# we inject some tuples with a newer XID filling one page somewhere
+# in the table.
+
+# Insert the first part of rows.
+$psql->query_safe(qq[insert into t select generate_series(1, $first);]);
+
+# Insert some rows with a newer XID; they need to fill at least
+# one page to prevent that page from being frozen in the following
+# vacuum.
+my $xid = $node->safe_psql('postgres', qq[
+begin;
+insert into t select 0 from generate_series(1, 300);
+select pg_current_xact_id()::xid;
+commit;
+]);
+
+# Insert remaining rows and commit.
+$psql->query_safe(qq[insert into t select generate_series($first, $nrows);]);
+$psql->query_safe(qq[commit;]);
+
+# Delete some rows.
+$node->safe_psql('postgres', qq[delete from t where i between 1 and 20000;]);
+
+# Execute a parallel vacuum that freezes all rows except for the
+# tuples inserted with the newer XID, so relfrozenxid should be
+# updated up to that XID. Setting maintenance_work_mem to a low value
+# forces multiple rounds of heap scanning, and the number of parallel
+# workers ramps down thanks to the injection points.
+$node->safe_psql('postgres', qq[
+set vacuum_freeze_min_age to 5;
+set max_parallel_maintenance_workers TO 5;
+set maintenance_work_mem TO 256;
+select injection_points_set_local();
+select injection_points_attach('parallel-vacuum-ramp-down-workers', 'notice');
+select injection_points_attach('parallel-heap-vacuum-disable-leader-participation', 'notice');
+vacuum (parallel 5, verbose) t;
+ ]);
+
+is( $node->safe_psql('postgres', qq[select relfrozenxid from pg_class where relname = 't';]),
+ "$xid", "relfrozenxid is updated as expected");
+
+# Check if we have successfully frozen the table in the previous
+# vacuum by scanning all tuples.
+$node->safe_psql('postgres', qq[vacuum (freeze, parallel 0, verbose, disable_page_skipping) t;]);
+is( $node->safe_psql('postgres', qq[select $xid < relfrozenxid::text::int from pg_class where relname = 't';]),
+ "t", "all rows are frozen");
+
+$node->stop;
+done_testing();
+
diff --git a/src/test/regress/expected/vacuum_parallel.out b/src/test/regress/expected/vacuum_parallel.out
index ddf0ee544b7b..b793d8093c2c 100644
--- a/src/test/regress/expected/vacuum_parallel.out
+++ b/src/test/regress/expected/vacuum_parallel.out
@@ -1,5 +1,6 @@
SET max_parallel_maintenance_workers TO 4;
SET min_parallel_index_scan_size TO '128kB';
+SET min_parallel_table_scan_size TO '128kB';
-- Bug #17245: Make sure that we don't totally fail to VACUUM individual indexes that
-- happen to be below min_parallel_index_scan_size during parallel VACUUM:
CREATE TABLE parallel_vacuum_table (a int) WITH (autovacuum_enabled = off);
@@ -43,7 +44,13 @@ VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table;
-- Since vacuum_in_leader_small_index uses deduplication, we expect an
-- assertion failure with bug #17245 (in the absence of bugfix):
INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i;
+-- Insert more tuples to use parallel heap vacuum.
+INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 500_000) i;
+VACUUM (PARALLEL 2) parallel_vacuum_table;
+DELETE FROM parallel_vacuum_table WHERE a < 1000;
+VACUUM (PARALLEL 1) parallel_vacuum_table;
RESET max_parallel_maintenance_workers;
RESET min_parallel_index_scan_size;
+RESET min_parallel_table_scan_size;
-- Deliberately don't drop table, to get further coverage from tools like
-- pg_amcheck in some testing scenarios
diff --git a/src/test/regress/sql/vacuum_parallel.sql b/src/test/regress/sql/vacuum_parallel.sql
index 1d23f33e39cf..5381023642f4 100644
--- a/src/test/regress/sql/vacuum_parallel.sql
+++ b/src/test/regress/sql/vacuum_parallel.sql
@@ -1,5 +1,6 @@
SET max_parallel_maintenance_workers TO 4;
SET min_parallel_index_scan_size TO '128kB';
+SET min_parallel_table_scan_size TO '128kB';
-- Bug #17245: Make sure that we don't totally fail to VACUUM individual indexes that
-- happen to be below min_parallel_index_scan_size during parallel VACUUM:
@@ -39,8 +40,15 @@ VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table;
-- assertion failure with bug #17245 (in the absence of bugfix):
INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i;
+-- Insert more tuples to use parallel heap vacuum.
+INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 500_000) i;
+VACUUM (PARALLEL 2) parallel_vacuum_table;
+DELETE FROM parallel_vacuum_table WHERE a < 1000;
+VACUUM (PARALLEL 1) parallel_vacuum_table;
+
RESET max_parallel_maintenance_workers;
RESET min_parallel_index_scan_size;
+RESET min_parallel_table_scan_size;
-- Deliberately don't drop table, to get further coverage from tools like
-- pg_amcheck in some testing scenarios
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 3c80d49b67e0..e033c8cf73b3 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1527,6 +1527,7 @@ LSEG
LUID
LVRelState
LVSavedErrInfo
+LVScanData
LWLock
LWLockHandle
LWLockMode
@@ -1963,6 +1964,11 @@ PLpgSQL_type
PLpgSQL_type_type
PLpgSQL_var
PLpgSQL_variable
+ParallelLVLeader
+ParallelLVScanDesc
+ParallelLVScanWorkerData
+ParallelLVShared
+ParallelLVState
PLwdatum
PLword
PLyArrayToOb
@@ -2037,6 +2043,7 @@ PVIndStats
PVIndVacStatus
PVOID
PVShared
+PVWorkPhase
PX_Alias
PX_Cipher
PX_Combo