From 2a0190956b501946cd62685f8df02befe2e864a0 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Thu, 16 Jan 2025 15:35:03 -0800 Subject: [PATCH 1/5] Introduces table AM APIs for parallel table vacuuming. This commit introduces the following new table AM APIs for parallel table vacuuming: - parallel_vacuum_compute_workers - parallel_vacuum_estimate - parallel_vacuum_initialize - parallel_vacuum_initialize_worker - parallel_vacuum_collect_dead_items All callbacks are optional. parallel_vacuum_compute_workers needs to return 0 to disable parallel table vacuuming. There is no code using these new APIs for now. Upcoming parallel vacuum patches utilize these APIs. Reviewed-by: Amit Kapila Reviewed-by: Hayato Kuroda Reviewed-by: Peter Smith Reviewed-by: Tomas Vondra Reviewed-by: Dilip Kumar Reviewed-by: Melanie Plageman Discussion: https://postgr.es/m/CAD21AoAEfCNv-GgaDheDJ+s-p_Lv1H24AiJeNoPGCmZNSwL1YA@mail.gmail.com --- src/include/access/tableam.h | 138 +++++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 77eb41eb6dc9..9dfbb4fa6992 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -36,6 +36,9 @@ extern PGDLLIMPORT bool synchronize_seqscans; struct BulkInsertStateData; struct IndexInfo; +struct ParallelContext; +struct ParallelVacuumState; +struct ParallelWorkerContext; struct SampleScanState; struct ValidateIndexState; @@ -653,6 +656,79 @@ typedef struct TableAmRoutine const VacuumParams params, BufferAccessStrategy bstrategy); + /* ------------------------------------------------------------------------ + * Callbacks for parallel table vacuum. + * ------------------------------------------------------------------------ + */ + + /* + * Compute the number of parallel workers for parallel table vacuum. The + * parallel degree for parallel vacuum is further limited by + * max_parallel_maintenance_workers. The function must return 0 to disable + * parallel table vacuum. + * + * 'nworkers_requested' is a >=0 number and the requested number of + * workers. This comes from the PARALLEL option. 0 means to choose the + * parallel degree based on the table AM specific factors such as table + * size. + * + * Optional callback. + */ + int (*parallel_vacuum_compute_workers) (Relation rel, + int nworkers_requested, + void *state); + + /* + * Estimate the size of shared memory needed for a parallel table vacuum + * of this relation. + * + * Not called if parallel table vacuum is disabled. + * + * Optional callback. + */ + void (*parallel_vacuum_estimate) (Relation rel, + struct ParallelContext *pcxt, + int nworkers, + void *state); + + /* + * Initialize DSM space for parallel table vacuum. + * + * Not called if parallel table vacuum is disabled. + * + * Optional callback. + */ + void (*parallel_vacuum_initialize) (Relation rel, + struct ParallelContext *pctx, + int nworkers, + void *state); + + /* + * Initialize AM-specific vacuum state for worker processes. + * + * The state_out is the output parameter so that arbitrary data can be + * passed to the subsequent callback, parallel_vacuum_remove_dead_items. + * + * Not called if parallel table vacuum is disabled. + * + * Optional callback. + */ + void (*parallel_vacuum_initialize_worker) (Relation rel, + struct ParallelVacuumState *pvs, + struct ParallelWorkerContext *pwcxt, + void **state_out); + + /* + * Execute a parallel scan to collect dead items. + * + * Not called if parallel table vacuum is disabled. + * + * Optional callback. 
+ */ + void (*parallel_vacuum_collect_dead_items) (Relation rel, + struct ParallelVacuumState *pvs, + void *state); + /* * Prepare to analyze block `blockno` of `scan`. The scan has been started * with table_beginscan_analyze(). See also @@ -1679,6 +1755,68 @@ table_relation_vacuum(Relation rel, const VacuumParams params, rel->rd_tableam->relation_vacuum(rel, params, bstrategy); } +/* ---------------------------------------------------------------------------- + * Parallel vacuum related functions. + * ---------------------------------------------------------------------------- + */ + +/* + * Compute the number of parallel workers for a parallel vacuum scan of this + * relation. + */ +static inline int +table_parallel_vacuum_compute_workers(Relation rel, int nworkers_requested, + void *state) +{ + return rel->rd_tableam->parallel_vacuum_compute_workers(rel, + nworkers_requested, + state); +} + +/* + * Estimate the size of shared memory needed for a parallel vacuum scan of this + * of this relation. + */ +static inline void +table_parallel_vacuum_estimate(Relation rel, struct ParallelContext *pcxt, + int nworkers, void *state) +{ + Assert(nworkers > 0); + rel->rd_tableam->parallel_vacuum_estimate(rel, pcxt, nworkers, state); +} + +/* + * Initialize shared memory area for a parallel vacuum scan of this relation. + */ +static inline void +table_parallel_vacuum_initialize(Relation rel, struct ParallelContext *pcxt, + int nworkers, void *state) +{ + Assert(nworkers > 0); + rel->rd_tableam->parallel_vacuum_initialize(rel, pcxt, nworkers, state); +} + +/* + * Initialize AM-specific vacuum state for worker processes. + */ +static inline void +table_parallel_vacuum_initialize_worker(Relation rel, struct ParallelVacuumState *pvs, + struct ParallelWorkerContext *pwcxt, + void **state_out) +{ + rel->rd_tableam->parallel_vacuum_initialize_worker(rel, pvs, pwcxt, state_out); +} + +/* + * Execute a parallel vacuum scan to collect dead items. + */ +static inline void +table_parallel_vacuum_collect_dead_items(Relation rel, struct ParallelVacuumState *pvs, + void *state) +{ + rel->rd_tableam->parallel_vacuum_collect_dead_items(rel, pvs, state); +} + /* * Prepare to analyze the next block in the read stream. The scan needs to * have been started with table_beginscan_analyze(). Note that this routine From 4ca0fb710f96f1b8cade71aef565f2b28808b855 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 18 Feb 2025 17:45:36 -0800 Subject: [PATCH 2/5] vacuumparallel.c: Support parallel vacuuming for tables to collect dead items. Previously, parallel vacuum was available only for index vacuuming and index cleanup, ParallelVacuumState was initialized only when the table has at least two indexes that are eligible for parallel index vacuuming and cleanup. This commit extends vacuumparallel.c to support parallel table vacuuming. parallel_vacuum_init() now initializes ParallelVacuumState to perform parallel heap scan to collect dead items, or paralel index vacuuming/cleanup, or both. During the initialization, it asks the table AM for the number of parallel workers required for parallel table vacuuming. If >0, it enables parallel table vacuuming and calls further table AM APIs such as parallel_vacuum_estimate. For parallel table vacuuming, this commit introduces parallel_vacuum_collect_dead_items_begin() function, which can be used to collect dead items in the table (for example, the first pass over heap table in lazy vacuum for heap tables). 
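As an illustration only (not code added by this commit), a leader process
that has decided to use parallel table vacuuming (i.e., the table AM's
parallel_vacuum_compute_workers callback returned a value greater than
zero) could drive the new functions roughly as follows. The helper name
collect_dead_items_in_parallel is hypothetical; the actual heap code is
introduced by a later patch in this series:

    static void
    collect_dead_items_in_parallel(Relation rel, ParallelVacuumState *pvs,
                                   void *state)
    {
        /*
         * Launch parallel workers; each worker runs the table AM's
         * parallel_vacuum_collect_dead_items callback.
         */
        (void) parallel_vacuum_collect_dead_items_begin(pvs);

        /* The leader joins the scan through the table AM wrapper. */
        table_parallel_vacuum_collect_dead_items(rel, pvs, state);

        /* Wait for all launched workers and gather their statistics. */
        parallel_vacuum_collect_dead_items_end(pvs);
    }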
Heap table AM disables the parallel heap vacuuming for now, but an upcoming patch uses it. Reviewed-by: Amit Kapila Reviewed-by: Hayato Kuroda Reviewed-by: Peter Smith Reviewed-by: Tomas Vondra Reviewed-by: Dilip Kumar Reviewed-by: Melanie Plageman Discussion: https://postgr.es/m/CAD21AoAEfCNv-GgaDheDJ+s-p_Lv1H24AiJeNoPGCmZNSwL1YA@mail.gmail.com --- src/backend/access/heap/vacuumlazy.c | 2 +- src/backend/commands/vacuumparallel.c | 393 +++++++++++++++++++------- src/include/commands/vacuum.h | 5 +- src/tools/pgindent/typedefs.list | 1 + 4 files changed, 293 insertions(+), 108 deletions(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 981d9380a925..0fce0f13ea13 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -3509,7 +3509,7 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) vacrel->nindexes, nworkers, vac_work_mem, vacrel->verbose ? INFO : DEBUG2, - vacrel->bstrategy); + vacrel->bstrategy, (void *) vacrel); /* * If parallel mode started, dead_items and dead_items_info spaces are diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 0feea1d30ec3..3726fb41028d 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -4,17 +4,18 @@ * Support routines for parallel vacuum execution. * * This file contains routines that are intended to support setting up, using, - * and tearing down a ParallelVacuumState. + * and tearing down a ParallelVacuumState. ParallelVacuumState contains shared + * information as well as the memory space for storing dead items allocated in + * the DSA area. We launch * - * In a parallel vacuum, we perform both index bulk deletion and index cleanup - * with parallel worker processes. Individual indexes are processed by one - * vacuum process. ParallelVacuumState contains shared information as well as - * the memory space for storing dead items allocated in the DSA area. We - * launch parallel worker processes at the start of parallel index - * bulk-deletion and index cleanup and once all indexes are processed, the - * parallel worker processes exit. Each time we process indexes in parallel, - * the parallel context is re-initialized so that the same DSM can be used for - * multiple passes of index bulk-deletion and index cleanup. + * In a parallel vacuum, we perform table scan, index bulk-deletion, index + * cleanup, or all of them with parallel worker processes depending on the + * number of parallel workers required for each phase. So different numbers of + * workers might be required for the table scanning and index processing. + * We launch parallel worker processes at the start of a phase, and once we + * complete all work in the phase, parallel workers exit. Each time we process + * table or indexes in parallel, the parallel context is re-initialized so that + * the same DSM can be used for multiple passes of each phase. 
* * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -26,8 +27,10 @@ */ #include "postgres.h" +#include "access/parallel.h" #include "access/amapi.h" #include "access/table.h" +#include "access/tableam.h" #include "access/xact.h" #include "commands/progress.h" #include "commands/vacuum.h" @@ -50,6 +53,13 @@ #define PARALLEL_VACUUM_KEY_WAL_USAGE 4 #define PARALLEL_VACUUM_KEY_INDEX_STATS 5 +/* The kind of parallel vacuum phases */ +typedef enum +{ + PV_WORK_PHASE_PROCESS_INDEXES, /* index vacuuming or cleanup */ + PV_WORK_PHASE_COLLECT_DEAD_ITEMS, /* collect dead tuples */ +} PVWorkPhase; + /* * Shared information among parallel workers. So this is allocated in the DSM * segment. @@ -65,6 +75,12 @@ typedef struct PVShared int elevel; int64 queryid; + /* + * Tell parallel workers what phase to perform: processing indexes or + * collecting dead tuples from the table. + */ + PVWorkPhase work_phase; + /* * Fields for both index vacuum and cleanup. * @@ -164,6 +180,9 @@ struct ParallelVacuumState /* NULL for worker processes */ ParallelContext *pcxt; + /* Do we need to reinitialize parallel DSM? */ + bool need_reinitialize_dsm; + /* Parent Heap Relation */ Relation heaprel; @@ -178,7 +197,7 @@ struct ParallelVacuumState * Shared index statistics among parallel vacuum workers. The array * element is allocated for every index, even those indexes where parallel * index vacuuming is unsafe or not worthwhile (e.g., - * will_parallel_vacuum[] is false). During parallel vacuum, + * idx_will_parallel_vacuum[] is false). During parallel vacuum, * IndexBulkDeleteResult of each index is kept in DSM and is copied into * local memory at the end of parallel vacuum. */ @@ -193,12 +212,18 @@ struct ParallelVacuumState /* Points to WAL usage area in DSM */ WalUsage *wal_usage; + /* + * The number of workers for parallel table vacuuming. If 0, the parallel + * table vacuum is disabled. + */ + int nworkers_for_table; + /* * False if the index is totally unsuitable target for all parallel * processing. For example, the index could be < * min_parallel_index_scan_size cutoff. */ - bool *will_parallel_vacuum; + bool *idx_will_parallel_vacuum; /* * The number of indexes that support parallel index bulk-deletion and @@ -221,8 +246,10 @@ struct ParallelVacuumState PVIndVacStatus status; }; -static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, - bool *will_parallel_vacuum); +static int parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes, + int nrequested, int *nworkers_for_table, + bool *idx_will_parallel_vacuum, + void *state); static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum); static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs); @@ -231,18 +258,25 @@ static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation PVIndStats *indstats); static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum); +static void parallel_vacuum_begin_work_phase(ParallelVacuumState *pvs, int nworkers, + PVWorkPhase work_phase); +static void parallel_vacuum_end_worker_phase(ParallelVacuumState *pvs); static void parallel_vacuum_error_callback(void *arg); /* * Try to enter parallel mode and create a parallel context. Then initialize * shared memory state. * + * nrequested_workers is the requested parallel degree. 
0 means that the parallel + * degrees for table and indexes vacuum are decided differently. See the comments + * of parallel_vacuum_compute_workers() for details. + * * On success, return parallel vacuum state. Otherwise return NULL. */ ParallelVacuumState * parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, - int elevel, BufferAccessStrategy bstrategy) + int elevel, BufferAccessStrategy bstrategy, void *state) { ParallelVacuumState *pvs; ParallelContext *pcxt; @@ -251,38 +285,38 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PVIndStats *indstats; BufferUsage *buffer_usage; WalUsage *wal_usage; - bool *will_parallel_vacuum; + bool *idx_will_parallel_vacuum; Size est_indstats_len; Size est_shared_len; int nindexes_mwm = 0; int parallel_workers = 0; + int nworkers_for_table; int querylen; - /* - * A parallel vacuum must be requested and there must be indexes on the - * relation - */ + /* A parallel vacuum must be requested */ Assert(nrequested_workers >= 0); - Assert(nindexes > 0); /* * Compute the number of parallel vacuum workers to launch */ - will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); - parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes, + idx_will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); + parallel_workers = parallel_vacuum_compute_workers(rel, indrels, nindexes, nrequested_workers, - will_parallel_vacuum); + &nworkers_for_table, + idx_will_parallel_vacuum, + state); + if (parallel_workers <= 0) { /* Can't perform vacuum in parallel -- return NULL */ - pfree(will_parallel_vacuum); + pfree(idx_will_parallel_vacuum); return NULL; } pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState)); pvs->indrels = indrels; pvs->nindexes = nindexes; - pvs->will_parallel_vacuum = will_parallel_vacuum; + pvs->idx_will_parallel_vacuum = idx_will_parallel_vacuum; pvs->bstrategy = bstrategy; pvs->heaprel = rel; @@ -291,6 +325,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, parallel_workers); Assert(pcxt->nworkers > 0); pvs->pcxt = pcxt; + pvs->need_reinitialize_dsm = false; + pvs->nworkers_for_table = nworkers_for_table; /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */ est_indstats_len = mul_size(sizeof(PVIndStats), nindexes); @@ -327,6 +363,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, else querylen = 0; /* keep compiler quiet */ + /* Estimate AM-specific space for parallel table vacuum */ + if (pvs->nworkers_for_table > 0) + table_parallel_vacuum_estimate(rel, pcxt, pvs->nworkers_for_table, state); + InitializeParallelDSM(pcxt); /* Prepare index vacuum stats */ @@ -345,7 +385,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)); Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE); - if (!will_parallel_vacuum[i]) + if (!idx_will_parallel_vacuum[i]) continue; if (indrel->rd_indam->amusemaintenanceworkmem) @@ -419,6 +459,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); } + /* Initialize AM-specific DSM space for parallel table vacuum */ + if (pvs->nworkers_for_table > 0) + table_parallel_vacuum_initialize(rel, pcxt, pvs->nworkers_for_table, state); + /* Success -- return parallel vacuum state */ return pvs; } @@ -456,7 +500,7 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) 
DestroyParallelContext(pvs->pcxt); ExitParallelMode(); - pfree(pvs->will_parallel_vacuum); + pfree(pvs->idx_will_parallel_vacuum); pfree(pvs); } @@ -533,26 +577,35 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup } /* - * Compute the number of parallel worker processes to request. Both index - * vacuum and index cleanup can be executed with parallel workers. - * The index is eligible for parallel vacuum iff its size is greater than - * min_parallel_index_scan_size as invoking workers for very small indexes - * can hurt performance. + * Compute the number of parallel worker processes to request for table + * vacuum and index vacuum/cleanup. Return the maximum number of parallel + * workers for table vacuuming and index vacuuming. + * + * nrequested is the number of parallel workers that user requested, which + * applies to both the number of workers for table vacuum and index vacuum. + * If nrequested is 0, we compute the parallel degree for them differently + * as described below. * - * nrequested is the number of parallel workers that user requested. If - * nrequested is 0, we compute the parallel degree based on nindexes, that is - * the number of indexes that support parallel vacuum. This function also - * sets will_parallel_vacuum to remember indexes that participate in parallel - * vacuum. + * For parallel table vacuum, we ask AM-specific routine to compute the + * number of parallel worker processes. The result is set to nworkers_table_p. + * + * For parallel index vacuum, the index is eligible for parallel vacuum iff + * its size is greater than min_parallel_index_scan_size as invoking workers + * for very small indexes can hurt performance. This function sets + * idx_will_parallel_vacuum to remember indexes that participate in parallel vacuum. */ static int -parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, - bool *will_parallel_vacuum) +parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes, + int nrequested, int *nworkers_table_p, + bool *idx_will_parallel_vacuum, void *state) { int nindexes_parallel = 0; int nindexes_parallel_bulkdel = 0; int nindexes_parallel_cleanup = 0; - int parallel_workers; + int nworkers_table = 0; + int nworkers_index = 0; + + *nworkers_table_p = 0; /* * We don't allow performing parallel operation in standalone backend or @@ -561,6 +614,14 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) return 0; + /* Compute the number of workers for parallel table scan */ + if (rel->rd_tableam->parallel_vacuum_compute_workers != NULL) + nworkers_table = table_parallel_vacuum_compute_workers(rel, nrequested, + state); + + /* Cap by max_parallel_maintenance_workers */ + nworkers_table = Min(nworkers_table, max_parallel_maintenance_workers); + /* * Compute the number of indexes that can participate in parallel vacuum. 
*/ @@ -574,7 +635,7 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) continue; - will_parallel_vacuum[i] = true; + idx_will_parallel_vacuum[i] = true; if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) nindexes_parallel_bulkdel++; @@ -589,18 +650,18 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, /* The leader process takes one index */ nindexes_parallel--; - /* No index supports parallel vacuum */ - if (nindexes_parallel <= 0) - return 0; - - /* Compute the parallel degree */ - parallel_workers = (nrequested > 0) ? - Min(nrequested, nindexes_parallel) : nindexes_parallel; + if (nindexes_parallel > 0) + { + /* Take into account the requested number of workers */ + nworkers_index = (nrequested > 0) ? + Min(nrequested, nindexes_parallel) : nindexes_parallel; - /* Cap by max_parallel_maintenance_workers */ - parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); + /* Cap by max_parallel_maintenance_workers */ + nworkers_index = Min(nworkers_index, max_parallel_maintenance_workers); + } - return parallel_workers; + *nworkers_table_p = nworkers_table; + return Max(nworkers_table, nworkers_index); } /* @@ -657,7 +718,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL); indstats->status = new_status; indstats->parallel_workers_can_process = - (pvs->will_parallel_vacuum[i] && + (pvs->idx_will_parallel_vacuum[i] && parallel_vacuum_index_is_parallel_safe(pvs->indrels[i], num_index_scans, vacuum)); @@ -669,40 +730,9 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan /* Setup the shared cost-based vacuum delay and launch workers */ if (nworkers > 0) { - /* Reinitialize parallel context to relaunch parallel workers */ - if (num_index_scans > 0) - ReinitializeParallelDSM(pvs->pcxt); - - /* - * Set up shared cost balance and the number of active workers for - * vacuum delay. We need to do this before launching workers as - * otherwise, they might not see the updated values for these - * parameters. - */ - pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance); - pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0); - - /* - * The number of workers can vary between bulkdelete and cleanup - * phase. - */ - ReinitializeParallelWorkers(pvs->pcxt, nworkers); - - LaunchParallelWorkers(pvs->pcxt); - - if (pvs->pcxt->nworkers_launched > 0) - { - /* - * Reset the local cost values for leader backend as we have - * already accumulated the remaining balance of heap. - */ - VacuumCostBalance = 0; - VacuumCostBalanceLocal = 0; - - /* Enable shared cost balance for leader backend */ - VacuumSharedCostBalance = &(pvs->shared->cost_balance); - VacuumActiveNWorkers = &(pvs->shared->active_nworkers); - } + /* Start parallel vacuum workers for processing indexes */ + parallel_vacuum_begin_work_phase(pvs, nworkers, + PV_WORK_PHASE_PROCESS_INDEXES); if (vacuum) ereport(pvs->shared->elevel, @@ -732,13 +762,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan * to finish, or we might get incomplete data.) 
*/ if (nworkers > 0) - { - /* Wait for all vacuum workers to finish */ - WaitForParallelWorkersToFinish(pvs->pcxt); - - for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); - } + parallel_vacuum_end_worker_phase(pvs); /* * Reset all index status back to initial (while checking that we have @@ -755,15 +779,8 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan indstats->status = PARALLEL_INDVAC_STATUS_INITIAL; } - /* - * Carry the shared balance value to heap scan and disable shared costing - */ - if (VacuumSharedCostBalance) - { - VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance); - VacuumSharedCostBalance = NULL; - VacuumActiveNWorkers = NULL; - } + /* Parallel DSM will need to be reinitialized for the next execution */ + pvs->need_reinitialize_dsm = true; } /* @@ -979,6 +996,77 @@ parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, return true; } +/* + * Begin the parallel scan to collect dead items. Return the number of + * launched parallel workers. + * + * The caller must call parallel_vacuum_collect_dead_items_end() to finish + * the parallel scan. + */ +int +parallel_vacuum_collect_dead_items_begin(ParallelVacuumState *pvs) +{ + Assert(!IsParallelWorker()); + + if (pvs->nworkers_for_table == 0) + return 0; + + /* Start parallel vacuum workers for collecting dead items */ + Assert(pvs->nworkers_for_table <= pvs->pcxt->nworkers); + parallel_vacuum_begin_work_phase(pvs, pvs->nworkers_for_table, + PV_WORK_PHASE_COLLECT_DEAD_ITEMS); + + /* Include the worker count for the leader itself */ + if (pvs->pcxt->nworkers_launched > 0) + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + + return pvs->pcxt->nworkers_launched; +} + +/* + * Wait for all workers for parallel vacuum workers launched by + * parallel_vacuum_collect_dead_items_begin(), and gather workers' statistics. + */ +void +parallel_vacuum_collect_dead_items_end(ParallelVacuumState *pvs) +{ + Assert(!IsParallelWorker()); + Assert(pvs->shared->work_phase == PV_WORK_PHASE_COLLECT_DEAD_ITEMS); + + if (pvs->nworkers_for_table == 0) + return; + + /* Wait for parallel workers to finish */ + parallel_vacuum_end_worker_phase(pvs); + + /* Decrement the worker count for the leader itself */ + if (VacuumActiveNWorkers) + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); +} + +/* + * The function is for parallel workers to execute the parallel scan to + * collect dead tuples. + */ +static void +parallel_vacuum_process_table(ParallelVacuumState *pvs, void *state) +{ + Assert(VacuumActiveNWorkers); + Assert(pvs->shared->work_phase == PV_WORK_PHASE_COLLECT_DEAD_ITEMS); + + /* Increment the active worker before starting the table vacuum */ + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + + /* Do the parallel scan to collect dead tuples */ + table_parallel_vacuum_collect_dead_items(pvs->heaprel, pvs, state); + + /* + * We have completed the table vacuum so decrement the active worker + * count. + */ + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); +} + /* * Perform work within a launched parallel process. * @@ -998,6 +1086,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) WalUsage *wal_usage; int nindexes; char *sharedquery; + void *state; ErrorContextCallback errcallback; /* @@ -1030,7 +1119,6 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) * matched to the leader's one. 
*/ vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels); - Assert(nindexes > 0); /* * Apply the desired value of maintenance_work_mem within this process. @@ -1076,6 +1164,17 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM, shared->ring_nbuffers * (BLCKSZ / 1024)); + /* Initialize AM-specific vacuum state for parallel table vacuuming */ + if (shared->work_phase == PV_WORK_PHASE_COLLECT_DEAD_ITEMS) + { + ParallelWorkerContext pwcxt; + + pwcxt.toc = toc; + pwcxt.seg = seg; + table_parallel_vacuum_initialize_worker(rel, &pvs, &pwcxt, + &state); + } + /* Setup error traceback support for ereport() */ errcallback.callback = parallel_vacuum_error_callback; errcallback.arg = &pvs; @@ -1085,8 +1184,19 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) /* Prepare to track buffer usage during parallel execution */ InstrStartParallelQuery(); - /* Process indexes to perform vacuum/cleanup */ - parallel_vacuum_process_safe_indexes(&pvs); + switch (pvs.shared->work_phase) + { + case PV_WORK_PHASE_COLLECT_DEAD_ITEMS: + /* Scan the table to collect dead items */ + parallel_vacuum_process_table(&pvs, state); + break; + case PV_WORK_PHASE_PROCESS_INDEXES: + /* Process indexes to perform vacuum/cleanup */ + parallel_vacuum_process_safe_indexes(&pvs); + break; + default: + elog(ERROR, "unrecognized parallel vacuum phase %d", pvs.shared->work_phase); + } /* Report buffer/WAL usage during parallel execution */ buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); @@ -1109,6 +1219,77 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) FreeAccessStrategy(pvs.bstrategy); } +/* + * Launch parallel vacuum workers for the given phase. If at least one + * worker launched, enable the shared vacuum delay costing. + */ +static void +parallel_vacuum_begin_work_phase(ParallelVacuumState *pvs, int nworkers, + PVWorkPhase work_phase) +{ + /* Set the work phase */ + pvs->shared->work_phase = work_phase; + + /* Reinitialize parallel context to relaunch parallel workers */ + if (pvs->need_reinitialize_dsm) + ReinitializeParallelDSM(pvs->pcxt); + + /* + * Set up shared cost balance and the number of active workers for vacuum + * delay. We need to do this before launching workers as otherwise, they + * might not see the updated values for these parameters. + */ + pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance); + pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0); + + /* + * The number of workers can vary between bulkdelete and cleanup phase. + */ + ReinitializeParallelWorkers(pvs->pcxt, nworkers); + + LaunchParallelWorkers(pvs->pcxt); + + /* Enable shared vacuum costing if we are able to launch any worker */ + if (pvs->pcxt->nworkers_launched > 0) + { + /* + * Reset the local cost values for leader backend as we have already + * accumulated the remaining balance of heap. + */ + VacuumCostBalance = 0; + VacuumCostBalanceLocal = 0; + + /* Enable shared cost balance for leader backend */ + VacuumSharedCostBalance = &(pvs->shared->cost_balance); + VacuumActiveNWorkers = &(pvs->shared->active_nworkers); + } +} + +/* + * Wait for parallel vacuum workers to finish, accumulate the statistics, + * and disable shared vacuum delay costing if enabled. 
+ */ +static void +parallel_vacuum_end_worker_phase(ParallelVacuumState *pvs) +{ + /* Wait for all vacuum workers to finish */ + WaitForParallelWorkersToFinish(pvs->pcxt); + + for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) + InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); + + /* Carry the shared balance value and disable shared costing */ + if (VacuumSharedCostBalance) + { + VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance); + VacuumSharedCostBalance = NULL; + VacuumActiveNWorkers = NULL; + } + + /* Parallel DSM will need to be reinitialized for the next execution */ + pvs->need_reinitialize_dsm = true; +} + /* * Error context callback for errors occurring during parallel index vacuum. * The error context messages should match the messages set in the lazy vacuum diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 14eeccbd7185..1369377ea98b 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -382,7 +382,8 @@ extern void VacuumUpdateCosts(void); extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, int elevel, - BufferAccessStrategy bstrategy); + BufferAccessStrategy bstrategy, + void *state); extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats); extern TidStore *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p); @@ -394,6 +395,8 @@ extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count); +extern int parallel_vacuum_collect_dead_items_begin(ParallelVacuumState *pvs); +extern void parallel_vacuum_collect_dead_items_end(ParallelVacuumState *pvs); extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc); /* in commands/analyze.c */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 3c80d49b67e0..6e50f5ec4dd7 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2037,6 +2037,7 @@ PVIndStats PVIndVacStatus PVOID PVShared +PVWorkPhase PX_Alias PX_Cipher PX_Combo From b4ec56774792612b5481abd767290e4c83243ce2 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Wed, 26 Feb 2025 11:31:55 -0800 Subject: [PATCH 3/5] Move lazy heap scan related variables to new struct LVScanData. This is a pure refactoring for upcoming parallel heap scan, which requires storing relation statistics and relation data such as extant oldest XID/MXID collected during lazy heap scan to a shared memory area. Reviewed-by: Amit Kapila Reviewed-by: Hayato Kuroda Reviewed-by: Peter Smith Reviewed-by: Tomas Vondra Reviewed-by: Dilip Kumar Reviewed-by: Melanie Plageman Discussion: https://postgr.es/m/CAD21AoAEfCNv-GgaDheDJ+s-p_Lv1H24AiJeNoPGCmZNSwL1YA@mail.gmail.com --- src/backend/access/heap/vacuumlazy.c | 308 ++++++++++++++------------- src/tools/pgindent/typedefs.list | 1 + 2 files changed, 163 insertions(+), 146 deletions(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 0fce0f13ea13..2282c02ffa44 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -255,6 +255,54 @@ typedef enum #define VAC_BLK_WAS_EAGER_SCANNED (1 << 0) #define VAC_BLK_ALL_VISIBLE_ACCORDING_TO_VM (1 << 1) +/* + * Data and counters updated during lazy heap scan. 
+ */ +typedef struct LVScanData +{ + BlockNumber scanned_pages; /* # pages examined (not skipped via VM) */ + + /* + * Count of all-visible blocks eagerly scanned (for logging only). This + * does not include skippable blocks scanned due to SKIP_PAGES_THRESHOLD. + */ + BlockNumber eager_scanned_pages; + + BlockNumber removed_pages; /* # pages removed by relation truncation */ + BlockNumber new_frozen_tuple_pages; /* # pages with newly frozen tuples */ + + /* # pages newly set all-visible in the VM */ + BlockNumber vm_new_visible_pages; + + /* + * # pages newly set all-visible and all-frozen in the VM. This is a + * subset of vm_new_visible_pages. That is, vm_new_visible_pages includes + * all pages set all-visible, but vm_new_visible_frozen_pages includes + * only those which were also set all-frozen. + */ + BlockNumber vm_new_visible_frozen_pages; + + /* # all-visible pages newly set all-frozen in the VM */ + BlockNumber vm_new_frozen_pages; + + BlockNumber lpdead_item_pages; /* # pages with LP_DEAD items */ + BlockNumber missed_dead_pages; /* # pages with missed dead tuples */ + BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */ + + /* Counters that follow are only for scanned_pages */ + int64 tuples_deleted; /* # deleted from table */ + int64 tuples_frozen; /* # newly frozen */ + int64 lpdead_items; /* # deleted from indexes */ + int64 live_tuples; /* # live tuples remaining */ + int64 recently_dead_tuples; /* # dead, but not yet removable */ + int64 missed_dead_tuples; /* # removable, but not removed */ + + /* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid. */ + TransactionId NewRelfrozenXid; + MultiXactId NewRelminMxid; + bool skippedallvis; +} LVScanData; + typedef struct LVRelState { /* Target heap relation and its indexes */ @@ -281,10 +329,6 @@ typedef struct LVRelState /* VACUUM operation's cutoffs for freezing and pruning */ struct VacuumCutoffs cutoffs; GlobalVisState *vistest; - /* Tracks oldest extant XID/MXID for setting relfrozenxid/relminmxid */ - TransactionId NewRelfrozenXid; - MultiXactId NewRelminMxid; - bool skippedallvis; /* Error reporting state */ char *dbname; @@ -310,34 +354,9 @@ typedef struct LVRelState VacDeadItemsInfo *dead_items_info; BlockNumber rel_pages; /* total number of pages */ - BlockNumber scanned_pages; /* # pages examined (not skipped via VM) */ - /* - * Count of all-visible blocks eagerly scanned (for logging only). This - * does not include skippable blocks scanned due to SKIP_PAGES_THRESHOLD. - */ - BlockNumber eager_scanned_pages; - - BlockNumber removed_pages; /* # pages removed by relation truncation */ - BlockNumber new_frozen_tuple_pages; /* # pages with newly frozen tuples */ - - /* # pages newly set all-visible in the VM */ - BlockNumber vm_new_visible_pages; - - /* - * # pages newly set all-visible and all-frozen in the VM. This is a - * subset of vm_new_visible_pages. That is, vm_new_visible_pages includes - * all pages set all-visible, but vm_new_visible_frozen_pages includes - * only those which were also set all-frozen. 
- */ - BlockNumber vm_new_visible_frozen_pages; - - /* # all-visible pages newly set all-frozen in the VM */ - BlockNumber vm_new_frozen_pages; - - BlockNumber lpdead_item_pages; /* # pages with LP_DEAD items */ - BlockNumber missed_dead_pages; /* # pages with missed dead tuples */ - BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */ + /* Data and counters updated during lazy heap scan */ + LVScanData *scan_data; /* Statistics output by us, for table */ double new_rel_tuples; /* new estimated total # of tuples */ @@ -347,13 +366,6 @@ typedef struct LVRelState /* Instrumentation counters */ int num_index_scans; - /* Counters that follow are only for scanned_pages */ - int64 tuples_deleted; /* # deleted from table */ - int64 tuples_frozen; /* # newly frozen */ - int64 lpdead_items; /* # deleted from indexes */ - int64 live_tuples; /* # live tuples remaining */ - int64 recently_dead_tuples; /* # dead, but not yet removable */ - int64 missed_dead_tuples; /* # removable, but not removed */ /* State maintained by heap_vac_scan_next_block() */ BlockNumber current_block; /* last block returned */ @@ -615,6 +627,7 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, BufferAccessStrategy bstrategy) { LVRelState *vacrel; + LVScanData *scan_data; bool verbose, instrument, skipwithvm, @@ -729,14 +742,24 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, } /* Initialize page counters explicitly (be tidy) */ - vacrel->scanned_pages = 0; - vacrel->eager_scanned_pages = 0; - vacrel->removed_pages = 0; - vacrel->new_frozen_tuple_pages = 0; - vacrel->lpdead_item_pages = 0; - vacrel->missed_dead_pages = 0; - vacrel->nonempty_pages = 0; - /* dead_items_alloc allocates vacrel->dead_items later on */ + scan_data = palloc(sizeof(LVScanData)); + scan_data->scanned_pages = 0; + scan_data->eager_scanned_pages = 0; + scan_data->removed_pages = 0; + scan_data->new_frozen_tuple_pages = 0; + scan_data->lpdead_item_pages = 0; + scan_data->missed_dead_pages = 0; + scan_data->nonempty_pages = 0; + scan_data->tuples_deleted = 0; + scan_data->tuples_frozen = 0; + scan_data->lpdead_items = 0; + scan_data->live_tuples = 0; + scan_data->recently_dead_tuples = 0; + scan_data->missed_dead_tuples = 0; + scan_data->vm_new_visible_pages = 0; + scan_data->vm_new_visible_frozen_pages = 0; + scan_data->vm_new_frozen_pages = 0; + vacrel->scan_data = scan_data; /* Allocate/initialize output statistics state */ vacrel->new_rel_tuples = 0; @@ -746,16 +769,8 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, /* Initialize remaining counters (be tidy) */ vacrel->num_index_scans = 0; - vacrel->tuples_deleted = 0; - vacrel->tuples_frozen = 0; - vacrel->lpdead_items = 0; - vacrel->live_tuples = 0; - vacrel->recently_dead_tuples = 0; - vacrel->missed_dead_tuples = 0; - vacrel->vm_new_visible_pages = 0; - vacrel->vm_new_visible_frozen_pages = 0; - vacrel->vm_new_frozen_pages = 0; + /* dead_items_alloc allocates vacrel->dead_items later on */ /* * Get cutoffs that determine which deleted tuples are considered DEAD, @@ -778,15 +793,15 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, vacrel->vistest = GlobalVisTestFor(rel); /* Initialize state used to track oldest extant XID/MXID */ - vacrel->NewRelfrozenXid = vacrel->cutoffs.OldestXmin; - vacrel->NewRelminMxid = vacrel->cutoffs.OldestMxact; + vacrel->scan_data->NewRelfrozenXid = vacrel->cutoffs.OldestXmin; + vacrel->scan_data->NewRelminMxid = vacrel->cutoffs.OldestMxact; /* * Initialize state related to tracking all-visible page skipping. 
This is * very important to determine whether or not it is safe to advance the * relfrozenxid/relminmxid. */ - vacrel->skippedallvis = false; + vacrel->scan_data->skippedallvis = false; skipwithvm = true; if (params.options & VACOPT_DISABLE_PAGE_SKIPPING) { @@ -874,15 +889,15 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, * value >= FreezeLimit, and relminmxid to a value >= MultiXactCutoff. * Non-aggressive VACUUMs may advance them by any amount, or not at all. */ - Assert(vacrel->NewRelfrozenXid == vacrel->cutoffs.OldestXmin || + Assert(vacrel->scan_data->NewRelfrozenXid == vacrel->cutoffs.OldestXmin || TransactionIdPrecedesOrEquals(vacrel->aggressive ? vacrel->cutoffs.FreezeLimit : vacrel->cutoffs.relfrozenxid, - vacrel->NewRelfrozenXid)); - Assert(vacrel->NewRelminMxid == vacrel->cutoffs.OldestMxact || + vacrel->scan_data->NewRelfrozenXid)); + Assert(vacrel->scan_data->NewRelminMxid == vacrel->cutoffs.OldestMxact || MultiXactIdPrecedesOrEquals(vacrel->aggressive ? vacrel->cutoffs.MultiXactCutoff : vacrel->cutoffs.relminmxid, - vacrel->NewRelminMxid)); - if (vacrel->skippedallvis) + vacrel->scan_data->NewRelminMxid)); + if (vacrel->scan_data->skippedallvis) { /* * Must keep original relfrozenxid in a non-aggressive VACUUM that @@ -890,8 +905,8 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, * values will have missed unfrozen XIDs from the pages we skipped. */ Assert(!vacrel->aggressive); - vacrel->NewRelfrozenXid = InvalidTransactionId; - vacrel->NewRelminMxid = InvalidMultiXactId; + vacrel->scan_data->NewRelfrozenXid = InvalidTransactionId; + vacrel->scan_data->NewRelminMxid = InvalidMultiXactId; } /* @@ -921,7 +936,8 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, vac_update_relstats(rel, new_rel_pages, vacrel->new_live_tuples, new_rel_allvisible, new_rel_allfrozen, vacrel->nindexes > 0, - vacrel->NewRelfrozenXid, vacrel->NewRelminMxid, + vacrel->scan_data->NewRelfrozenXid, + vacrel->scan_data->NewRelminMxid, &frozenxid_updated, &minmulti_updated, false); /* @@ -937,8 +953,8 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, pgstat_report_vacuum(RelationGetRelid(rel), rel->rd_rel->relisshared, Max(vacrel->new_live_tuples, 0), - vacrel->recently_dead_tuples + - vacrel->missed_dead_tuples, + vacrel->scan_data->recently_dead_tuples + + vacrel->scan_data->missed_dead_tuples, starttime); pgstat_progress_end_command(); @@ -1012,23 +1028,23 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, vacrel->relname, vacrel->num_index_scans); appendStringInfo(&buf, _("pages: %u removed, %u remain, %u scanned (%.2f%% of total), %u eagerly scanned\n"), - vacrel->removed_pages, + vacrel->scan_data->removed_pages, new_rel_pages, - vacrel->scanned_pages, + vacrel->scan_data->scanned_pages, orig_rel_pages == 0 ? 
100.0 : - 100.0 * vacrel->scanned_pages / + 100.0 * vacrel->scan_data->scanned_pages / orig_rel_pages, - vacrel->eager_scanned_pages); + vacrel->scan_data->eager_scanned_pages); appendStringInfo(&buf, _("tuples: %" PRId64 " removed, %" PRId64 " remain, %" PRId64 " are dead but not yet removable\n"), - vacrel->tuples_deleted, + vacrel->scan_data->tuples_deleted, (int64) vacrel->new_rel_tuples, - vacrel->recently_dead_tuples); - if (vacrel->missed_dead_tuples > 0) + vacrel->scan_data->recently_dead_tuples); + if (vacrel->scan_data->missed_dead_tuples > 0) appendStringInfo(&buf, _("tuples missed: %" PRId64 " dead from %u pages not removed due to cleanup lock contention\n"), - vacrel->missed_dead_tuples, - vacrel->missed_dead_pages); + vacrel->scan_data->missed_dead_tuples, + vacrel->scan_data->missed_dead_pages); diff = (int32) (ReadNextTransactionId() - vacrel->cutoffs.OldestXmin); appendStringInfo(&buf, @@ -1036,33 +1052,33 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, vacrel->cutoffs.OldestXmin, diff); if (frozenxid_updated) { - diff = (int32) (vacrel->NewRelfrozenXid - + diff = (int32) (vacrel->scan_data->NewRelfrozenXid - vacrel->cutoffs.relfrozenxid); appendStringInfo(&buf, _("new relfrozenxid: %u, which is %d XIDs ahead of previous value\n"), - vacrel->NewRelfrozenXid, diff); + vacrel->scan_data->NewRelfrozenXid, diff); } if (minmulti_updated) { - diff = (int32) (vacrel->NewRelminMxid - + diff = (int32) (vacrel->scan_data->NewRelminMxid - vacrel->cutoffs.relminmxid); appendStringInfo(&buf, _("new relminmxid: %u, which is %d MXIDs ahead of previous value\n"), - vacrel->NewRelminMxid, diff); + vacrel->scan_data->NewRelminMxid, diff); } appendStringInfo(&buf, _("frozen: %u pages from table (%.2f%% of total) had %" PRId64 " tuples frozen\n"), - vacrel->new_frozen_tuple_pages, + vacrel->scan_data->new_frozen_tuple_pages, orig_rel_pages == 0 ? 100.0 : - 100.0 * vacrel->new_frozen_tuple_pages / + 100.0 * vacrel->scan_data->new_frozen_tuple_pages / orig_rel_pages, - vacrel->tuples_frozen); + vacrel->scan_data->tuples_frozen); appendStringInfo(&buf, _("visibility map: %u pages set all-visible, %u pages set all-frozen (%u were all-visible)\n"), - vacrel->vm_new_visible_pages, - vacrel->vm_new_visible_frozen_pages + - vacrel->vm_new_frozen_pages, - vacrel->vm_new_frozen_pages); + vacrel->scan_data->vm_new_visible_pages, + vacrel->scan_data->vm_new_visible_frozen_pages + + vacrel->scan_data->vm_new_frozen_pages, + vacrel->scan_data->vm_new_frozen_pages); if (vacrel->do_index_vacuuming) { if (vacrel->nindexes == 0 || vacrel->num_index_scans == 0) @@ -1082,10 +1098,10 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, msgfmt = _("%u pages from table (%.2f%% of total) have %" PRId64 " dead item identifiers\n"); } appendStringInfo(&buf, msgfmt, - vacrel->lpdead_item_pages, + vacrel->scan_data->lpdead_item_pages, orig_rel_pages == 0 ? 100.0 : - 100.0 * vacrel->lpdead_item_pages / orig_rel_pages, - vacrel->lpdead_items); + 100.0 * vacrel->scan_data->lpdead_item_pages / orig_rel_pages, + vacrel->scan_data->lpdead_items); for (int i = 0; i < vacrel->nindexes; i++) { IndexBulkDeleteResult *istat = vacrel->indstats[i]; @@ -1261,8 +1277,8 @@ lazy_scan_heap(LVRelState *vacrel) * one-pass strategy, and the two-pass strategy with the index_cleanup * param set to 'off'. 
*/ - if (vacrel->scanned_pages > 0 && - vacrel->scanned_pages % FAILSAFE_EVERY_PAGES == 0) + if (vacrel->scan_data->scanned_pages > 0 && + vacrel->scan_data->scanned_pages % FAILSAFE_EVERY_PAGES == 0) lazy_check_wraparound_failsafe(vacrel); /* @@ -1317,9 +1333,9 @@ lazy_scan_heap(LVRelState *vacrel) page = BufferGetPage(buf); blkno = BufferGetBlockNumber(buf); - vacrel->scanned_pages++; + vacrel->scan_data->scanned_pages++; if (blk_info & VAC_BLK_WAS_EAGER_SCANNED) - vacrel->eager_scanned_pages++; + vacrel->scan_data->eager_scanned_pages++; /* Report as block scanned, update error traceback information */ pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, blkno); @@ -1506,16 +1522,16 @@ lazy_scan_heap(LVRelState *vacrel) /* now we can compute the new value for pg_class.reltuples */ vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages, - vacrel->scanned_pages, - vacrel->live_tuples); + vacrel->scan_data->scanned_pages, + vacrel->scan_data->live_tuples); /* * Also compute the total number of surviving heap entries. In the * (unlikely) scenario that new_live_tuples is -1, take it as zero. */ vacrel->new_rel_tuples = - Max(vacrel->new_live_tuples, 0) + vacrel->recently_dead_tuples + - vacrel->missed_dead_tuples; + Max(vacrel->new_live_tuples, 0) + vacrel->scan_data->recently_dead_tuples + + vacrel->scan_data->missed_dead_tuples; read_stream_end(stream); @@ -1562,7 +1578,7 @@ lazy_scan_heap(LVRelState *vacrel) * callback_private_data contains a reference to the LVRelState, passed to the * read stream API during stream setup. The LVRelState is an in/out parameter * here (locally named `vacrel`). Vacuum options and information about the - * relation are read from it. vacrel->skippedallvis is set if we skip a block + * relation are read from it. vacrel->scan_data->skippedallvis is set if we skip a block * that's all-visible but not all-frozen (to ensure that we don't update * relfrozenxid in that case). vacrel also holds information about the next * unskippable block -- as bookkeeping for this function. @@ -1624,7 +1640,7 @@ heap_vac_scan_next_block(ReadStream *stream, { next_block = vacrel->next_unskippable_block; if (skipsallvis) - vacrel->skippedallvis = true; + vacrel->scan_data->skippedallvis = true; } } @@ -1899,8 +1915,8 @@ lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf, BlockNumber blkno, END_CRIT_SECTION(); /* Count the newly all-frozen pages for logging */ - vacrel->vm_new_visible_pages++; - vacrel->vm_new_visible_frozen_pages++; + vacrel->scan_data->vm_new_visible_pages++; + vacrel->scan_data->vm_new_visible_frozen_pages++; } freespace = PageGetHeapFreeSpace(page); @@ -1977,10 +1993,10 @@ lazy_scan_prune(LVRelState *vacrel, heap_page_prune_and_freeze(rel, buf, vacrel->vistest, prune_options, &vacrel->cutoffs, &presult, PRUNE_VACUUM_SCAN, &vacrel->offnum, - &vacrel->NewRelfrozenXid, &vacrel->NewRelminMxid); + &vacrel->scan_data->NewRelfrozenXid, &vacrel->scan_data->NewRelminMxid); - Assert(MultiXactIdIsValid(vacrel->NewRelminMxid)); - Assert(TransactionIdIsValid(vacrel->NewRelfrozenXid)); + Assert(MultiXactIdIsValid(vacrel->scan_data->NewRelminMxid)); + Assert(TransactionIdIsValid(vacrel->scan_data->NewRelfrozenXid)); if (presult.nfrozen > 0) { @@ -1990,7 +2006,7 @@ lazy_scan_prune(LVRelState *vacrel, * frozen tuples (don't confuse that with pages newly set all-frozen * in VM). 
*/ - vacrel->new_frozen_tuple_pages++; + vacrel->scan_data->new_frozen_tuple_pages++; } /* @@ -2025,7 +2041,7 @@ lazy_scan_prune(LVRelState *vacrel, */ if (presult.lpdead_items > 0) { - vacrel->lpdead_item_pages++; + vacrel->scan_data->lpdead_item_pages++; /* * deadoffsets are collected incrementally in @@ -2040,15 +2056,15 @@ lazy_scan_prune(LVRelState *vacrel, } /* Finally, add page-local counts to whole-VACUUM counts */ - vacrel->tuples_deleted += presult.ndeleted; - vacrel->tuples_frozen += presult.nfrozen; - vacrel->lpdead_items += presult.lpdead_items; - vacrel->live_tuples += presult.live_tuples; - vacrel->recently_dead_tuples += presult.recently_dead_tuples; + vacrel->scan_data->tuples_deleted += presult.ndeleted; + vacrel->scan_data->tuples_frozen += presult.nfrozen; + vacrel->scan_data->lpdead_items += presult.lpdead_items; + vacrel->scan_data->live_tuples += presult.live_tuples; + vacrel->scan_data->recently_dead_tuples += presult.recently_dead_tuples; /* Can't truncate this page */ if (presult.hastup) - vacrel->nonempty_pages = blkno + 1; + vacrel->scan_data->nonempty_pages = blkno + 1; /* Did we find LP_DEAD items? */ *has_lpdead_items = (presult.lpdead_items > 0); @@ -2097,17 +2113,17 @@ lazy_scan_prune(LVRelState *vacrel, */ if ((old_vmbits & VISIBILITYMAP_ALL_VISIBLE) == 0) { - vacrel->vm_new_visible_pages++; + vacrel->scan_data->vm_new_visible_pages++; if (presult.all_frozen) { - vacrel->vm_new_visible_frozen_pages++; + vacrel->scan_data->vm_new_visible_frozen_pages++; *vm_page_frozen = true; } } else if ((old_vmbits & VISIBILITYMAP_ALL_FROZEN) == 0 && presult.all_frozen) { - vacrel->vm_new_frozen_pages++; + vacrel->scan_data->vm_new_frozen_pages++; *vm_page_frozen = true; } } @@ -2201,8 +2217,8 @@ lazy_scan_prune(LVRelState *vacrel, */ if ((old_vmbits & VISIBILITYMAP_ALL_VISIBLE) == 0) { - vacrel->vm_new_visible_pages++; - vacrel->vm_new_visible_frozen_pages++; + vacrel->scan_data->vm_new_visible_pages++; + vacrel->scan_data->vm_new_visible_frozen_pages++; *vm_page_frozen = true; } @@ -2212,7 +2228,7 @@ lazy_scan_prune(LVRelState *vacrel, */ else { - vacrel->vm_new_frozen_pages++; + vacrel->scan_data->vm_new_frozen_pages++; *vm_page_frozen = true; } } @@ -2255,8 +2271,8 @@ lazy_scan_noprune(LVRelState *vacrel, missed_dead_tuples; bool hastup; HeapTupleHeader tupleheader; - TransactionId NoFreezePageRelfrozenXid = vacrel->NewRelfrozenXid; - MultiXactId NoFreezePageRelminMxid = vacrel->NewRelminMxid; + TransactionId NoFreezePageRelfrozenXid = vacrel->scan_data->NewRelfrozenXid; + MultiXactId NoFreezePageRelminMxid = vacrel->scan_data->NewRelminMxid; OffsetNumber deadoffsets[MaxHeapTuplesPerPage]; Assert(BufferGetBlockNumber(buf) == blkno); @@ -2383,8 +2399,8 @@ lazy_scan_noprune(LVRelState *vacrel, * this particular page until the next VACUUM. Remember its details now. * (lazy_scan_prune expects a clean slate, so we have to do this last.) 
*/ - vacrel->NewRelfrozenXid = NoFreezePageRelfrozenXid; - vacrel->NewRelminMxid = NoFreezePageRelminMxid; + vacrel->scan_data->NewRelfrozenXid = NoFreezePageRelfrozenXid; + vacrel->scan_data->NewRelminMxid = NoFreezePageRelminMxid; /* Save any LP_DEAD items found on the page in dead_items */ if (vacrel->nindexes == 0) @@ -2411,25 +2427,25 @@ lazy_scan_noprune(LVRelState *vacrel, * indexes will be deleted during index vacuuming (and then marked * LP_UNUSED in the heap) */ - vacrel->lpdead_item_pages++; + vacrel->scan_data->lpdead_item_pages++; dead_items_add(vacrel, blkno, deadoffsets, lpdead_items); - vacrel->lpdead_items += lpdead_items; + vacrel->scan_data->lpdead_items += lpdead_items; } /* * Finally, add relevant page-local counts to whole-VACUUM counts */ - vacrel->live_tuples += live_tuples; - vacrel->recently_dead_tuples += recently_dead_tuples; - vacrel->missed_dead_tuples += missed_dead_tuples; + vacrel->scan_data->live_tuples += live_tuples; + vacrel->scan_data->recently_dead_tuples += recently_dead_tuples; + vacrel->scan_data->missed_dead_tuples += missed_dead_tuples; if (missed_dead_tuples > 0) - vacrel->missed_dead_pages++; + vacrel->scan_data->missed_dead_pages++; /* Can't truncate this page */ if (hastup) - vacrel->nonempty_pages = blkno + 1; + vacrel->scan_data->nonempty_pages = blkno + 1; /* Did we find LP_DEAD items? */ *has_lpdead_items = (lpdead_items > 0); @@ -2458,7 +2474,7 @@ lazy_vacuum(LVRelState *vacrel) /* Should not end up here with no indexes */ Assert(vacrel->nindexes > 0); - Assert(vacrel->lpdead_item_pages > 0); + Assert(vacrel->scan_data->lpdead_item_pages > 0); if (!vacrel->do_index_vacuuming) { @@ -2492,7 +2508,7 @@ lazy_vacuum(LVRelState *vacrel) BlockNumber threshold; Assert(vacrel->num_index_scans == 0); - Assert(vacrel->lpdead_items == vacrel->dead_items_info->num_items); + Assert(vacrel->scan_data->lpdead_items == vacrel->dead_items_info->num_items); Assert(vacrel->do_index_vacuuming); Assert(vacrel->do_index_cleanup); @@ -2519,7 +2535,7 @@ lazy_vacuum(LVRelState *vacrel) * cases then this may need to be reconsidered. */ threshold = (double) vacrel->rel_pages * BYPASS_THRESHOLD_PAGES; - bypass = (vacrel->lpdead_item_pages < threshold && + bypass = (vacrel->scan_data->lpdead_item_pages < threshold && TidStoreMemoryUsage(vacrel->dead_items) < 32 * 1024 * 1024); } @@ -2657,7 +2673,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) * place). */ Assert(vacrel->num_index_scans > 0 || - vacrel->dead_items_info->num_items == vacrel->lpdead_items); + vacrel->dead_items_info->num_items == vacrel->scan_data->lpdead_items); Assert(allindexes || VacuumFailsafeActive); /* @@ -2819,8 +2835,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel) * the second heap pass. No more, no less. 
*/ Assert(vacrel->num_index_scans > 1 || - (vacrel->dead_items_info->num_items == vacrel->lpdead_items && - vacuumed_pages == vacrel->lpdead_item_pages)); + (vacrel->dead_items_info->num_items == vacrel->scan_data->lpdead_items && + vacuumed_pages == vacrel->scan_data->lpdead_item_pages)); ereport(DEBUG2, (errmsg("table \"%s\": removed %" PRId64 " dead item identifiers in %u pages", @@ -2930,9 +2946,9 @@ lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno, Buffer buffer, flags); /* Count the newly set VM page for logging */ - vacrel->vm_new_visible_pages++; + vacrel->scan_data->vm_new_visible_pages++; if (all_frozen) - vacrel->vm_new_visible_frozen_pages++; + vacrel->scan_data->vm_new_visible_frozen_pages++; } /* Revert to the previous phase information for error traceback */ @@ -3008,7 +3024,7 @@ static void lazy_cleanup_all_indexes(LVRelState *vacrel) { double reltuples = vacrel->new_rel_tuples; - bool estimated_count = vacrel->scanned_pages < vacrel->rel_pages; + bool estimated_count = vacrel->scan_data->scanned_pages < vacrel->rel_pages; const int progress_start_index[] = { PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_INDEXES_TOTAL @@ -3189,7 +3205,7 @@ should_attempt_truncation(LVRelState *vacrel) if (!vacrel->do_rel_truncate || VacuumFailsafeActive) return false; - possibly_freeable = vacrel->rel_pages - vacrel->nonempty_pages; + possibly_freeable = vacrel->rel_pages - vacrel->scan_data->nonempty_pages; if (possibly_freeable > 0 && (possibly_freeable >= REL_TRUNCATE_MINIMUM || possibly_freeable >= vacrel->rel_pages / REL_TRUNCATE_FRACTION)) @@ -3215,7 +3231,7 @@ lazy_truncate_heap(LVRelState *vacrel) /* Update error traceback information one last time */ update_vacuum_error_info(vacrel, NULL, VACUUM_ERRCB_PHASE_TRUNCATE, - vacrel->nonempty_pages, InvalidOffsetNumber); + vacrel->scan_data->nonempty_pages, InvalidOffsetNumber); /* * Loop until no more truncating can be done. @@ -3316,7 +3332,7 @@ lazy_truncate_heap(LVRelState *vacrel) * without also touching reltuples, since the tuple count wasn't * changed by the truncation. */ - vacrel->removed_pages += orig_rel_pages - new_rel_pages; + vacrel->scan_data->removed_pages += orig_rel_pages - new_rel_pages; vacrel->rel_pages = new_rel_pages; ereport(vacrel->verbose ? INFO : DEBUG2, @@ -3324,7 +3340,7 @@ lazy_truncate_heap(LVRelState *vacrel) vacrel->relname, orig_rel_pages, new_rel_pages))); orig_rel_pages = new_rel_pages; - } while (new_rel_pages > vacrel->nonempty_pages && lock_waiter_detected); + } while (new_rel_pages > vacrel->scan_data->nonempty_pages && lock_waiter_detected); } /* @@ -3352,7 +3368,7 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected) StaticAssertStmt((PREFETCH_SIZE & (PREFETCH_SIZE - 1)) == 0, "prefetch size must be power of 2"); prefetchedUntil = InvalidBlockNumber; - while (blkno > vacrel->nonempty_pages) + while (blkno > vacrel->scan_data->nonempty_pages) { Buffer buf; Page page; @@ -3464,7 +3480,7 @@ count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected) * pages still are; we need not bother to look at the last known-nonempty * page. 
*/ - return vacrel->nonempty_pages; + return vacrel->scan_data->nonempty_pages; } /* diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 6e50f5ec4dd7..ab8a33d1d0a7 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1527,6 +1527,7 @@ LSEG LUID LVRelState LVSavedErrInfo +LVScanData LWLock LWLockHandle LWLockMode From 964ce652b0e8241e16fd173dd34d56a11e44a555 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 15 Sep 2025 22:51:39 +0200 Subject: [PATCH 4/5] Support parallelism for collecting dead items during lazy vacuum. This feature allows the vacuum to leverage multiple CPUs in order to collect dead items (i.e. the first pass over heap table) with parallel workers. The parallel degree for parallel heap vacuuming is determined based on the number of blocks to vacuum unless PARALLEL option of VACUUM command is specified, and further limited by max_parallel_maintenance_workers. For the parallel heap scan to collect dead items, we utilize a parallel block table scan, controlled by ParallelBlockTableScanDesc, in conjunction with the read stream. The workers' parallel scan descriptions are stored in the DSM space, enabling different parallel workers to resume the heap scan (phase 1) after a cycle of heap vacuuming and index vacuuming (phase 2 and 3) from their previous state. However, due to the potential presence of pinned buffers loaded by the read stream's look-ahead mechanism, we cannot abruptly stop phase 1 even when the space of dead_items TIDs exceeds the limit. Therefore, once the space of dead_items TIDs exceeds the limit, we begin processing pages without attempting to retrieve additional blocks by look-ahead mechanism until the read stream is exhausted, even if the the memory limit is surpassed. While this approach may increase the memory usage, it typically doesn't pose a significant problem, as processing a few 10s-100s buffers doesn't substantially increase the size of dead_items TIDs. Reviewed-by: Amit Kapila Reviewed-by: Hayato Kuroda Reviewed-by: Peter Smith Reviewed-by: Tomas Vondra Reviewed-by: Dilip Kumar Reviewed-by: Melanie Plageman Reviewed-by: Andres Freund Discussion: https://postgr.es/m/CAD21AoAEfCNv-GgaDheDJ+s-p_Lv1H24AiJeNoPGCmZNSwL1YA@mail.gmail.com --- doc/src/sgml/ref/vacuum.sgml | 54 +- src/backend/access/heap/heapam_handler.c | 8 +- src/backend/access/heap/vacuumlazy.c | 1182 +++++++++++++++-- src/backend/commands/vacuumparallel.c | 29 + src/include/access/heapam.h | 13 + src/include/commands/vacuum.h | 3 + src/test/regress/expected/vacuum_parallel.out | 7 + src/test/regress/sql/vacuum_parallel.sql | 8 + src/tools/pgindent/typedefs.list | 5 + 9 files changed, 1198 insertions(+), 111 deletions(-) diff --git a/doc/src/sgml/ref/vacuum.sgml b/doc/src/sgml/ref/vacuum.sgml index bd5dcaf86a5c..294494877d9d 100644 --- a/doc/src/sgml/ref/vacuum.sgml +++ b/doc/src/sgml/ref/vacuum.sgml @@ -280,25 +280,41 @@ VACUUM [ ( option [, ...] ) ] [ PARALLEL - Perform index vacuum and index cleanup phases of VACUUM - in parallel using integer - background workers (for the details of each vacuum phase, please - refer to ). The number of workers used - to perform the operation is equal to the number of indexes on the - relation that support parallel vacuum which is limited by the number of - workers specified with PARALLEL option if any which is - further limited by . - An index can participate in parallel vacuum if and only if the size of the - index is more than . 
- Please note that it is not guaranteed that the number of parallel workers
- specified in integer will be
- used during execution. It is possible for a vacuum to run with fewer
- workers than specified, or even with no workers at all. Only one worker
- can be used per index. So parallel workers are launched only when there
- are at least 2 indexes in the table. Workers for
- vacuum are launched before the start of each phase and exit at the end of
- the phase. These behaviors might change in a future release. This
- option can't be used with the FULL option.
+ Perform the heap scanning, index vacuum, and index cleanup phases of
+ VACUUM in parallel using
+ integer background workers
+ (for the details of each vacuum phase, please refer to
+ ).
+
+
+ For heap tables, the number of workers used to perform the heap scanning
+ is determined based on the size of the table. A table can participate in
+ parallel heap scanning if and only if the size of the table is more than
+ . During heap scanning,
+ the heap table's blocks will be divided into ranges and shared among the
+ cooperating processes. Each worker process will complete the scanning of
+ its given range of blocks before requesting an additional range of blocks.
+
+
+ The number of workers used to perform parallel index vacuum and index
+ cleanup is equal to the number of indexes on the relation that support
+ parallel vacuum. An index can participate in parallel vacuum if and only
+ if the size of the index is more than .
+ Only one worker can be used per index. So parallel workers for index vacuum
+ and index cleanup are launched only when there are at least 2
+ indexes in the table.
+
+
+ Workers for vacuum are launched before the start of each phase and exit
+ at the end of the phase. The number of workers for each phase is limited by
+ the number of workers specified with the PARALLEL option, if
+ any, which is further limited by .
+ Please note that in any parallel vacuum phase, it is not guaranteed that the
+ number of parallel workers specified in integer
+ will be used during execution. It is possible for a vacuum to run with fewer
+ workers than specified, or even with no workers at all. These behaviors might
+ change in a future release. This option can't be used with the FULL
+ option.
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index bcbac844bb66..c2fac6cbe65b 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2668,7 +2668,13 @@ static const TableAmRoutine heapam_methods = {
 	.scan_bitmap_next_tuple = heapam_scan_bitmap_next_tuple,
 	.scan_sample_next_block = heapam_scan_sample_next_block,
-	.scan_sample_next_tuple = heapam_scan_sample_next_tuple
+	.scan_sample_next_tuple = heapam_scan_sample_next_tuple,
+
+	.parallel_vacuum_compute_workers = heap_parallel_vacuum_compute_workers,
+	.parallel_vacuum_estimate = heap_parallel_vacuum_estimate,
+	.parallel_vacuum_initialize = heap_parallel_vacuum_initialize,
+	.parallel_vacuum_initialize_worker = heap_parallel_vacuum_initialize_worker,
+	.parallel_vacuum_collect_dead_items = heap_parallel_vacuum_collect_dead_items
 };
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 2282c02ffa44..e6ca9c60e8ae 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -99,6 +99,44 @@
 * After pruning and freezing, pages that are newly all-visible and all-frozen
 * are marked as such in the visibility map.
 *
+ * Parallel Vacuum:
+ *
+ * Lazy vacuum on heap tables supports parallel processing for phase I and
+ * phase II. Before starting phase I, we initialize parallel vacuum state,
+ * ParallelVacuumState, and allocate the TID store in a DSA area if we can
+ * use parallel mode for either of these two phases.
+ *
+ * We may require a different number of parallel vacuum workers for each phase
+ * depending on various factors such as table size and number of indexes.
+ * Parallel workers are launched at the beginning of each phase and exit at
+ * the end of each phase.
+ *
+ * While vacuum cutoffs are shared between leader and worker processes, each
+ * individual process uses its own GlobalVisState, potentially causing some
+ * workers to remove fewer tuples than optimal. During parallel lazy heap scans,
+ * each worker tracks the oldest existing XID and MXID. The leader computes the
+ * globally oldest existing XID and MXID after the parallel scan, while
+ * gathering table data too.
+ *
+ * The parallel lazy heap scan (i.e. parallel phase I) is controlled by
+ * ParallelLVScanDesc in conjunction with the read stream. The table is split
+ * into multiple chunks, which are then distributed among parallel workers.
+ * Due to the potential presence of pinned buffers loaded by the read stream's
+ * look-ahead mechanism, we cannot abruptly stop phase I even when the space
+ * of dead_items TIDs exceeds the limit. Instead, once this threshold is
+ * surpassed, we begin processing pages without attempting to retrieve additional
+ * blocks until the read stream is exhausted. While this approach may increase
+ * the memory usage, it typically doesn't pose a significant problem, as
+ * processing a few 10s-100s buffers doesn't substantially increase the size
+ * of dead_items TIDs. The workers' parallel scan descriptions,
+ * ParallelLVScanWorkerData, are stored in the DSM space, enabling different
+ * parallel workers to resume phase I from their previous state.
+ *
+ * If the leader launches fewer workers than the previous time to resume the
+ * parallel lazy heap scan, some blocks within chunks may remain un-scanned.
+ * To address this, the leader completes workers' unfinished scans at the end
+ * of the parallel lazy heap scan (see complete_unfinished_lazy_scan_heap()).
+ *
 * Dead TID Storage:
 *
 * The major space usage for vacuuming is storage for the dead tuple IDs that
@@ -146,6 +184,7 @@
 #include "common/pg_prng.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
+#include "optimizer/paths.h" /* for min_parallel_table_scan_size */
 #include "pgstat.h"
 #include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
@@ -213,11 +252,22 @@
 */
#define PREFETCH_SIZE ((BlockNumber) 32)

+/*
+ * DSM keys for parallel lazy vacuum. Unlike other parallel execution code, we
+ * don't need to worry about DSM keys conflicting with plan_node_id, but need to
+ * avoid conflicting with DSM keys used in vacuumparallel.c.
+ */
+#define PARALLEL_LV_KEY_SHARED 0xFFFF0001
+#define PARALLEL_LV_KEY_SCANDESC 0xFFFF0002
+#define PARALLEL_LV_KEY_SCANWORKER 0xFFFF0003
+#define PARALLEL_LV_KEY_SCANDATA 0xFFFF0004
+
/*
 * Macro to check if we are in a parallel vacuum. If true, we are in the
 * parallel mode and the DSM segment is initialized.
 */
#define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL)
+#define ParallelHeapVacuumIsActive(vacrel) ((vacrel)->plvstate != NULL)

/* Phases of vacuum during which we report error context.
*/ typedef enum @@ -248,6 +298,12 @@ typedef enum */ #define EAGER_SCAN_REGION_SIZE 4096 +/* + * During parallel lazy scans, each worker (including the leader) retrieves + * a chunk consisting of PARALLEL_LV_CHUNK_SIZE blocks. + */ +#define PARALLEL_LV_CHUNK_SIZE 1024 + /* * heap_vac_scan_next_block() sets these flags to communicate information * about the block it read to the caller. @@ -303,6 +359,121 @@ typedef struct LVScanData bool skippedallvis; } LVScanData; +/* + * Struct for information that needs to be shared among parallel workers + * for parallel lazy vacuum. All fields are static, set by the leader + * process. + */ +typedef struct ParallelLVShared +{ + bool aggressive; + bool skipwithvm; + + /* The current oldest extant XID/MXID shared by the leader process */ + TransactionId NewRelfrozenXid; + MultiXactId NewRelminMxid; + + /* VACUUM operation's cutoffs for freezing and pruning */ + struct VacuumCutoffs cutoffs; + + /* + * The first chunk size varies depending on the first eager scan region + * size. If eager scan is disabled, we use the default chunk size + * PARALLEL_LV_CHUNK_SIZE for the first chunk. + */ + BlockNumber initial_chunk_size; + + /* + * Similar to LVRelState.eager_scan_max_fails_per_region but this is a + * per-chunk failure counter. + */ + BlockNumber eager_scan_max_fails_per_chunk; + + /* + * Similar to LVRelState.eager_scan_remaining_successes but this is a + * success counter per parallel worker. + */ + BlockNumber eager_scan_remaining_successes_per_worker; +} ParallelLVShared; + +/* + * Shared scan description for parallel lazy scan. + */ +typedef struct ParallelLVScanDesc +{ + /* Number of blocks of the table at start of scan */ + BlockNumber nblocks; + + /* Number of blocks in to allocate in each I/O chunk */ + BlockNumber chunk_size; + + /* Number of blocks allocated to workers so far */ + pg_atomic_uint64 nallocated; +} ParallelLVScanDesc; + +/* + * Per-worker data for scan description, statistics counters, and + * miscellaneous data need to be shared with the leader. + */ +typedef struct ParallelLVScanWorkerData +{ + bool inited; + + /* Current number of blocks into the scan */ + BlockNumber nallocated; + + /* Number of blocks per chunk */ + BlockNumber chunk_size; + + /* Number of blocks left in this chunk */ + uint32 chunk_remaining; + + /* The last processed block number */ + pg_atomic_uint32 last_blkno; + + /* Eager scan state for resuming the scan */ + BlockNumber remaining_fails_save; + BlockNumber remaining_successes_save; + BlockNumber next_region_start_save; +} ParallelLVScanWorkerData; + +/* + * Struct to store parallel lazy vacuum working state. + */ +typedef struct ParallelLVState +{ + /* Shared static information */ + ParallelLVShared *shared; + + /* Parallel scan description shared among parallel workers */ + ParallelLVScanDesc *scandesc; + + /* Per-worker scan data */ + ParallelLVScanWorkerData *scanwork; +} ParallelLVState; + +/* + * Struct for the leader process in parallel lazy vacuum. + */ +typedef struct ParallelLVLeader +{ + /* Shared memory size for each shared object */ + Size shared_len; + Size scandesc_len; + Size scanwork_len; + Size scandata_len; + + /* The number of workers launched for parallel lazy heap scan */ + int nworkers_launched; + + /* + * These fields point to the arrays of all per-worker scan states stored + * in DSM. 
+ */ + ParallelLVScanWorkerData *scanwork_array; + LVScanData *scandata_array; +} ParallelLVLeader; + typedef struct LVRelState { /* Target heap relation and its indexes */ @@ -367,6 +538,12 @@ typedef struct LVRelState /* Instrumentation counters */ int num_index_scans; + /* Last processed block number */ + BlockNumber last_blkno; + + /* Next block to check for FSM vacuum */ + BlockNumber next_fsm_block_to_vacuum; + /* State maintained by heap_vac_scan_next_block() */ BlockNumber current_block; /* last block returned */ BlockNumber next_unskippable_block; /* next unskippable block */ @@ -374,6 +551,16 @@ typedef struct LVRelState bool next_unskippable_eager_scanned; /* if it was eagerly scanned */ Buffer next_unskippable_vmbuffer; /* buffer containing its VM bit */ + /* Fields used for parallel lazy vacuum */ + + /* Parallel lazy vacuum working state */ + ParallelLVState *plvstate; + + /* + * The leader state for parallel lazy vacuum. NULL for parallel workers. + */ + ParallelLVLeader *leader; + /* State related to managing eager scanning of all-visible pages */ /* @@ -433,12 +620,19 @@ typedef struct LVSavedErrInfo /* non-export function prototypes */ static void lazy_scan_heap(LVRelState *vacrel); +static void do_lazy_scan_heap(LVRelState *vacrel, bool check_mem_usage); static void heap_vacuum_eager_scan_setup(LVRelState *vacrel, const VacuumParams params); static BlockNumber heap_vac_scan_next_block(ReadStream *stream, void *callback_private_data, void *per_buffer_data); -static void find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis); +static void parallel_lazy_scan_init_scan_worker(ParallelLVScanWorkerData *scanwork, + BlockNumber initial_chunk_size); +static BlockNumber parallel_lazy_scan_get_nextpage(LVRelState *vacrel, Relation rel, + ParallelLVScanDesc *scandesc, + ParallelLVScanWorkerData *scanwork); +static bool find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis, + BlockNumber start_blk, BlockNumber end_blk); static bool lazy_scan_new_or_empty(LVRelState *vacrel, Buffer buf, BlockNumber blkno, Page page, bool sharelock, Buffer vmbuffer); @@ -449,6 +643,12 @@ static int lazy_scan_prune(LVRelState *vacrel, Buffer buf, static bool lazy_scan_noprune(LVRelState *vacrel, Buffer buf, BlockNumber blkno, Page page, bool *has_lpdead_items); +static void do_parallel_lazy_scan_heap(LVRelState *vacrel); +static BlockNumber parallel_lazy_scan_compute_min_scan_block(LVRelState *vacrel); +static void complete_unfinished_lazy_scan_heap(LVRelState *vacrel); +static void parallel_lazy_scan_heap_begin(LVRelState *vacrel); +static void parallel_lazy_scan_heap_end(LVRelState *vacrel); +static void parallel_lazy_scan_gather_results(LVRelState *vacrel); static void lazy_vacuum(LVRelState *vacrel); static bool lazy_vacuum_all_indexes(LVRelState *vacrel); static void lazy_vacuum_heap_rel(LVRelState *vacrel); @@ -473,6 +673,7 @@ static BlockNumber count_nondeletable_pages(LVRelState *vacrel, static void dead_items_alloc(LVRelState *vacrel, int nworkers); static void dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets, int num_offsets); +static bool dead_items_check_memory_limit(LVRelState *vacrel); static void dead_items_reset(LVRelState *vacrel); static void dead_items_cleanup(LVRelState *vacrel); static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf, @@ -769,6 +970,7 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, /* Initialize remaining counters (be tidy) */ vacrel->num_index_scans = 0; + vacrel->next_fsm_block_to_vacuum = 
0; /* dead_items_alloc allocates vacrel->dead_items later on */ @@ -1214,13 +1416,7 @@ heap_vacuum_rel(Relation rel, const VacuumParams params, static void lazy_scan_heap(LVRelState *vacrel) { - ReadStream *stream; - BlockNumber rel_pages = vacrel->rel_pages, - blkno = 0, - next_fsm_block_to_vacuum = 0; - BlockNumber orig_eager_scan_success_limit = - vacrel->eager_scan_remaining_successes; /* for logging */ - Buffer vmbuffer = InvalidBuffer; + BlockNumber rel_pages = vacrel->rel_pages; const int initprog_index[] = { PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_TOTAL_HEAP_BLKS, @@ -1241,6 +1437,80 @@ lazy_scan_heap(LVRelState *vacrel) vacrel->next_unskippable_eager_scanned = false; vacrel->next_unskippable_vmbuffer = InvalidBuffer; + /* Do the actual work */ + if (ParallelHeapVacuumIsActive(vacrel)) + do_parallel_lazy_scan_heap(vacrel); + else + do_lazy_scan_heap(vacrel, true); + + /* + * Report that everything is now scanned. We never skip scanning the last + * block in the relation, so we can pass rel_pages here. + */ + pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, + rel_pages); + + /* now we can compute the new value for pg_class.reltuples */ + vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages, + vacrel->scan_data->scanned_pages, + vacrel->scan_data->live_tuples); + + /* + * Also compute the total number of surviving heap entries. In the + * (unlikely) scenario that new_live_tuples is -1, take it as zero. + */ + vacrel->new_rel_tuples = + Max(vacrel->new_live_tuples, 0) + vacrel->scan_data->recently_dead_tuples + + vacrel->scan_data->missed_dead_tuples; + + /* + * Do index vacuuming (call each index's ambulkdelete routine), then do + * related heap vacuuming + */ + if (vacrel->dead_items_info->num_items > 0) + lazy_vacuum(vacrel); + + /* + * Vacuum the remainder of the Free Space Map. We must do this whether or + * not there were indexes, and whether or not we bypassed index vacuuming. + * We can pass rel_pages here because we never skip scanning the last + * block of the relation. + */ + if (rel_pages > vacrel->next_fsm_block_to_vacuum) + FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum, rel_pages); + + /* report all blocks vacuumed */ + pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages); + + /* Do final index cleanup (call each index's amvacuumcleanup routine) */ + if (vacrel->nindexes > 0 && vacrel->do_index_cleanup) + lazy_cleanup_all_indexes(vacrel); +} + +/* + * Workhorse for lazy_scan_heap(). + * + * If check_mem_usage is true, we check the memory usage during the heap scan. + * If the space of dead_items TIDs exceeds the limit, we stop the lazy heap scan + * and invoke a cycle of index vacuuming and heap vacuuming, and then resume the + * scan. If it's false, we continue doing lazy heap scan until the read stream + * is exhausted. + */ +static void +do_lazy_scan_heap(LVRelState *vacrel, bool check_mem_usage) +{ + ReadStream *stream; + BlockNumber blkno = InvalidBlockNumber; + BlockNumber orig_eager_scan_success_limit = + vacrel->eager_scan_remaining_successes; /* for logging */ + Buffer vmbuffer = InvalidBuffer; + + /* + * We should not set check_mem_usage to false unless during parallel heap + * vacuum. + */ + Assert(check_mem_usage || ParallelHeapVacuumIsActive(vacrel)); + /* * Set up the read stream for vacuum's first pass through the heap. * @@ -1276,8 +1546,11 @@ lazy_scan_heap(LVRelState *vacrel) * that point. 
This check also provides failsafe coverage for the * one-pass strategy, and the two-pass strategy with the index_cleanup * param set to 'off'. + * + * The failsafe check is done only by the leader process. */ - if (vacrel->scan_data->scanned_pages > 0 && + if (!IsParallelWorker() && + vacrel->scan_data->scanned_pages > 0 && vacrel->scan_data->scanned_pages % FAILSAFE_EVERY_PAGES == 0) lazy_check_wraparound_failsafe(vacrel); @@ -1285,12 +1558,9 @@ lazy_scan_heap(LVRelState *vacrel) * Consider if we definitely have enough space to process TIDs on page * already. If we are close to overrunning the available space for * dead_items TIDs, pause and do a cycle of vacuuming before we tackle - * this page. However, let's force at least one page-worth of tuples - * to be stored as to ensure we do at least some work when the memory - * configured is so low that we run out before storing anything. + * this page. */ - if (vacrel->dead_items_info->num_items > 0 && - TidStoreMemoryUsage(vacrel->dead_items) > vacrel->dead_items_info->max_bytes) + if (check_mem_usage && dead_items_check_memory_limit(vacrel)) { /* * Before beginning index vacuuming, we release any pin we may @@ -1313,15 +1583,16 @@ lazy_scan_heap(LVRelState *vacrel) * upper-level FSM pages. Note that blkno is the previously * processed block. */ - FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, + FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum, blkno + 1); - next_fsm_block_to_vacuum = blkno; + vacrel->next_fsm_block_to_vacuum = blkno; /* Report that we are once again scanning the heap */ pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, PROGRESS_VACUUM_PHASE_SCAN_HEAP); } + /* Read the next block to process */ buf = read_stream_next_buffer(stream, &per_buffer_data); /* The relation is exhausted. */ @@ -1331,7 +1602,7 @@ lazy_scan_heap(LVRelState *vacrel) blk_info = *((uint8 *) per_buffer_data); CheckBufferIsPinnedOnce(buf); page = BufferGetPage(buf); - blkno = BufferGetBlockNumber(buf); + blkno = vacrel->last_blkno = BufferGetBlockNumber(buf); vacrel->scan_data->scanned_pages++; if (blk_info & VAC_BLK_WAS_EAGER_SCANNED) @@ -1496,13 +1767,34 @@ lazy_scan_heap(LVRelState *vacrel) * visible on upper FSM pages. This is done after vacuuming if the * table has indexes. There will only be newly-freed space if we * held the cleanup lock and lazy_scan_prune() was called. + * + * During parallel lazy heap scanning, only the leader process + * vacuums the FSM. However, we cannot vacuum the FSM for blocks + * up to 'blk' because there may be un-scanned blocks or blocks + * being processed by workers before this point. Instead, parallel + * workers advertise the block numbers they have just processed, + * and the leader vacuums the FSM up to the smallest block number + * among them. This approach ensures we vacuum the FSM for + * consecutive processed blocks. 
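+ *
+ * For example, if the leader has processed up to block 900 while two
+ * workers advertise blocks 450 and 700 as their last processed blocks,
+ * the FSM is vacuumed only up to block 450.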
*/ if (got_cleanup_lock && vacrel->nindexes == 0 && ndeleted > 0 && - blkno - next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES) + blkno - vacrel->next_fsm_block_to_vacuum >= VACUUM_FSM_EVERY_PAGES) { - FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, + if (IsParallelWorker()) + pg_atomic_write_u32(&(vacrel->plvstate->scanwork->last_blkno), blkno); - next_fsm_block_to_vacuum = blkno; + else + { + BlockNumber fsmvac_upto = blkno; + + if (ParallelHeapVacuumIsActive(vacrel)) + fsmvac_upto = parallel_lazy_scan_compute_min_scan_block(vacrel); + + FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum, + fsmvac_upto); + } + + vacrel->next_fsm_block_to_vacuum = blkno; } } else @@ -1513,50 +1805,7 @@ lazy_scan_heap(LVRelState *vacrel) if (BufferIsValid(vmbuffer)) ReleaseBuffer(vmbuffer); - /* - * Report that everything is now scanned. We never skip scanning the last - * block in the relation, so we can pass rel_pages here. - */ - pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_SCANNED, - rel_pages); - - /* now we can compute the new value for pg_class.reltuples */ - vacrel->new_live_tuples = vac_estimate_reltuples(vacrel->rel, rel_pages, - vacrel->scan_data->scanned_pages, - vacrel->scan_data->live_tuples); - - /* - * Also compute the total number of surviving heap entries. In the - * (unlikely) scenario that new_live_tuples is -1, take it as zero. - */ - vacrel->new_rel_tuples = - Max(vacrel->new_live_tuples, 0) + vacrel->scan_data->recently_dead_tuples + - vacrel->scan_data->missed_dead_tuples; - read_stream_end(stream); - - /* - * Do index vacuuming (call each index's ambulkdelete routine), then do - * related heap vacuuming - */ - if (vacrel->dead_items_info->num_items > 0) - lazy_vacuum(vacrel); - - /* - * Vacuum the remainder of the Free Space Map. We must do this whether or - * not there were indexes, and whether or not we bypassed index vacuuming. - * We can pass rel_pages here because we never skip scanning the last - * block of the relation. - */ - if (rel_pages > next_fsm_block_to_vacuum) - FreeSpaceMapVacuumRange(vacrel->rel, next_fsm_block_to_vacuum, rel_pages); - - /* report all blocks vacuumed */ - pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, rel_pages); - - /* Do final index cleanup (call each index's amvacuumcleanup routine) */ - if (vacrel->nindexes > 0 && vacrel->do_index_cleanup) - lazy_cleanup_all_indexes(vacrel); } /* @@ -1570,7 +1819,8 @@ lazy_scan_heap(LVRelState *vacrel) * heap_vac_scan_next_block() uses the visibility map, vacuum options, and * various thresholds to skip blocks which do not need to be processed and * returns the next block to process or InvalidBlockNumber if there are no - * remaining blocks. + * remaining blocks or the space of dead_items TIDs reaches the limit (only + * in parallel lazy vacuum cases). * * The visibility status of the next block to process and whether or not it * was eager scanned is set in the per_buffer_data. 
@@ -1592,8 +1842,42 @@ heap_vac_scan_next_block(ReadStream *stream, LVRelState *vacrel = callback_private_data; uint8 blk_info = 0; - /* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */ - next_block = vacrel->current_block + 1; +retry: + next_block = InvalidBlockNumber; + + /* Get the next block to process */ + if (ParallelHeapVacuumIsActive(vacrel)) + { + /* + * Stop returning the next block to the read stream if we are close to + * overrunning the available space for dead_items TIDs so that the + * read stream returns pinned buffers in its buffers queue until the + * stream is exhausted. See the comments atop this file for details. + */ + if (dead_items_check_memory_limit(vacrel)) + { + if (BufferIsValid(vacrel->next_unskippable_vmbuffer)) + { + ReleaseBuffer(vacrel->next_unskippable_vmbuffer); + vacrel->next_unskippable_vmbuffer = InvalidBuffer; + } + + return InvalidBlockNumber; + + } + + next_block = parallel_lazy_scan_get_nextpage(vacrel, + vacrel->rel, + vacrel->plvstate->scandesc, + vacrel->plvstate->scanwork); + } + else + { + /* relies on InvalidBlockNumber + 1 overflowing to 0 on first call */ + next_block = vacrel->current_block + 1; + } + + Assert(BlockNumberIsValid(next_block)); /* Have we reached the end of the relation? */ if (next_block >= vacrel->rel_pages) @@ -1618,8 +1902,41 @@ heap_vac_scan_next_block(ReadStream *stream, * visibility map. */ bool skipsallvis; + bool found; + BlockNumber end_block; + BlockNumber nblocks_skip; - find_next_unskippable_block(vacrel, &skipsallvis); + if (ParallelHeapVacuumIsActive(vacrel)) + { + /* We look for the next unskippable block within the chunk */ + end_block = next_block + vacrel->plvstate->scanwork->chunk_remaining + 1; + } + else + end_block = vacrel->rel_pages; + + found = find_next_unskippable_block(vacrel, &skipsallvis, next_block, end_block); + + /* + * We must have found the next unskippable block within the specified + * range in non-parallel cases as the end_block is always the last + * block + 1 and we must scan the last block. + */ + Assert(found || ParallelHeapVacuumIsActive(vacrel)); + + if (!found) + { + if (skipsallvis) + vacrel->scan_data->skippedallvis = true; + + /* + * Skip all remaining blocks in the current chunk, and retry with + * the next chunk. + */ + vacrel->plvstate->scanwork->chunk_remaining = 0; + goto retry; + } + + Assert(vacrel->next_unskippable_block < end_block); /* * We now know the next block that we must process. It can be the @@ -1636,11 +1953,21 @@ heap_vac_scan_next_block(ReadStream *stream, * pages then skipping makes updating relfrozenxid unsafe, which is a * real downside. */ - if (vacrel->next_unskippable_block - next_block >= SKIP_PAGES_THRESHOLD) + nblocks_skip = vacrel->next_unskippable_block - next_block; + if (nblocks_skip >= SKIP_PAGES_THRESHOLD) { - next_block = vacrel->next_unskippable_block; if (skipsallvis) vacrel->scan_data->skippedallvis = true; + + /* Tell the parallel scans to skip blocks */ + if (ParallelHeapVacuumIsActive(vacrel)) + { + vacrel->plvstate->scanwork->chunk_remaining -= nblocks_skip; + vacrel->plvstate->scanwork->nallocated += nblocks_skip; + Assert(vacrel->plvstate->scanwork->chunk_remaining > 0); + } + + next_block = vacrel->next_unskippable_block; } } @@ -1675,10 +2002,86 @@ heap_vac_scan_next_block(ReadStream *stream, } } + /* - * Find the next unskippable block in a vacuum scan using the visibility map. - * The next unskippable block and its visibility information is updated in - * vacrel. 
+ * Initialize scan state of the given ParallelLVScanWorkerData. + */ +static void +parallel_lazy_scan_init_scan_worker(ParallelLVScanWorkerData *scanwork, + BlockNumber initial_chunk_size) +{ + Assert(BlockNumberIsValid(initial_chunk_size)); + + scanwork->inited = true; + scanwork->nallocated = 0; + scanwork->chunk_size = initial_chunk_size; + scanwork->chunk_remaining = 0; + pg_atomic_init_u32(&(scanwork->last_blkno), InvalidBlockNumber); +} + +/* + * Return the next page to process for parallel lazy scan. + * + * If there is no block to scan for the worker, return the number of blocks in + * the relation. + */ +static BlockNumber +parallel_lazy_scan_get_nextpage(LVRelState *vacrel, Relation rel, + ParallelLVScanDesc *scandesc, + ParallelLVScanWorkerData *scanwork) +{ + uint64 nallocated; + + if (scanwork->chunk_remaining > 0) + { + /* + * Give them the next block in the range and update the remaining + * number of blocks. + */ + nallocated = ++scanwork->nallocated; + scanwork->chunk_remaining--; + } + else + { + /* Get the new chunk */ + nallocated = scanwork->nallocated = + pg_atomic_fetch_add_u64(&scandesc->nallocated, scanwork->chunk_size); + + /* + * Set the remaining number of blocks in this chunk so that subsequent + * calls from this worker continue on with this chunk until it's done. + */ + scanwork->chunk_remaining = scanwork->chunk_size - 1; + + /* We use the fixed size chunk for subsequent scans */ + scanwork->chunk_size = PARALLEL_LV_CHUNK_SIZE; + + /* + * Getting the new chunk also means to start the new eager scan + * region. + * + * Update next_eager_scan_region_start to the first block in the chunk + * so that we can reset the remaining_fails counter when checking the + * visibility of the first block in this chunk in + * find_next_unskippable_block(). + */ + vacrel->next_eager_scan_region_start = nallocated; + + } + + /* Clear the chunk_remaining if there is no more blocks to process */ + if (nallocated >= scandesc->nblocks) + scanwork->chunk_remaining = 0; + + return Min(nallocated, scandesc->nblocks); +} + +/* + * Find the next unskippable block in a vacuum scan using the visibility map, + * in a range of 'start' (inclusive) and 'end' (exclusive). + * + * If found, the next unskippable block and its visibility information is updated + * in vacrel. Otherwise, return false and reset the information in vacrel. * * Note: our opinion of which blocks can be skipped can go stale immediately. * It's okay if caller "misses" a page whose all-visible or all-frozen marking @@ -1688,22 +2091,32 @@ heap_vac_scan_next_block(ReadStream *stream, * older XIDs/MXIDs. The *skippedallvis flag will be set here when the choice * to skip such a range is actually made, making everything safe.) */ -static void -find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis) +static bool +find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis, + BlockNumber start, BlockNumber end) { BlockNumber rel_pages = vacrel->rel_pages; - BlockNumber next_unskippable_block = vacrel->next_unskippable_block + 1; + BlockNumber next_unskippable_block = start; Buffer next_unskippable_vmbuffer = vacrel->next_unskippable_vmbuffer; bool next_unskippable_eager_scanned = false; bool next_unskippable_allvis; + bool found = true; *skipsallvis = false; for (;; next_unskippable_block++) { - uint8 mapbits = visibilitymap_get_status(vacrel->rel, - next_unskippable_block, - &next_unskippable_vmbuffer); + uint8 mapbits; + + /* Reach the end of range? 
*/
+		if (next_unskippable_block >= end)
+		{
+			found = false;
+			break;
+		}
+
+		mapbits = visibilitymap_get_status(vacrel->rel, next_unskippable_block,
+										   &next_unskippable_vmbuffer);

 		next_unskippable_allvis = (mapbits & VISIBILITYMAP_ALL_VISIBLE) != 0;
@@ -1779,11 +2192,285 @@ find_next_unskippable_block(LVRelState *vacrel, bool *skipsallvis)
 			*skipsallvis = true;
 	}

-	/* write the local variables back to vacrel */
-	vacrel->next_unskippable_block = next_unskippable_block;
-	vacrel->next_unskippable_allvis = next_unskippable_allvis;
-	vacrel->next_unskippable_eager_scanned = next_unskippable_eager_scanned;
-	vacrel->next_unskippable_vmbuffer = next_unskippable_vmbuffer;
+	if (found)
+	{
+		/* write the local variables back to vacrel */
+		vacrel->next_unskippable_block = next_unskippable_block;
+		vacrel->next_unskippable_allvis = next_unskippable_allvis;
+		vacrel->next_unskippable_eager_scanned = next_unskippable_eager_scanned;
+		vacrel->next_unskippable_vmbuffer = next_unskippable_vmbuffer;
+	}
+	else
+	{
+		if (BufferIsValid(next_unskippable_vmbuffer))
+			ReleaseBuffer(next_unskippable_vmbuffer);
+
+		/*
+		 * There is no unskippable block in the specified range. Reset the
+		 * related fields in vacrel.
+		 */
+		vacrel->next_unskippable_block = InvalidBlockNumber;
+		vacrel->next_unskippable_allvis = false;
+		vacrel->next_unskippable_eager_scanned = false;
+		vacrel->next_unskippable_vmbuffer = InvalidBuffer;
+	}
+
+	return found;
+}
+
+/*
+ * A parallel variant of do_lazy_scan_heap(). The leader process launches
+ * parallel workers to scan the heap in parallel.
+ */
+static void
+do_parallel_lazy_scan_heap(LVRelState *vacrel)
+{
+	ParallelLVScanWorkerData scanwork;
+
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+	Assert(!IsParallelWorker());
+
+	/* Setup the parallel scan description for the leader to join as a worker */
+	parallel_lazy_scan_init_scan_worker(&scanwork,
+										vacrel->plvstate->shared->initial_chunk_size);
+	vacrel->plvstate->scanwork = &scanwork;
+
+	/* Adjust the eager scan's success counter as a worker */
+	vacrel->eager_scan_remaining_successes =
+		vacrel->plvstate->shared->eager_scan_remaining_successes_per_worker;
+
+	for (;;)
+	{
+		BlockNumber fsmvac_upto;
+
+		/* Launch parallel workers */
+		parallel_lazy_scan_heap_begin(vacrel);
+
+		/*
+		 * Do lazy heap scan until the read stream is exhausted. We will stop
+		 * retrieving new blocks for the read stream once the space of
+		 * dead_items TIDs exceeds the limit.
+		 */
+		do_lazy_scan_heap(vacrel, false);
+
+		/* Wait for parallel workers to finish and gather scan results */
+		parallel_lazy_scan_heap_end(vacrel);
+
+		if (!dead_items_check_memory_limit(vacrel))
+			break;
+
+		/* Perform a round of index and heap vacuuming */
+		vacrel->consider_bypass_optimization = false;
+		lazy_vacuum(vacrel);
+
+		/* Compute the smallest processed block number */
+		fsmvac_upto = parallel_lazy_scan_compute_min_scan_block(vacrel);
+
+		/*
+		 * Vacuum the Free Space Map to make newly-freed space visible on
+		 * upper-level FSM pages.
+		 */
+		if (fsmvac_upto > vacrel->next_fsm_block_to_vacuum)
+		{
+			FreeSpaceMapVacuumRange(vacrel->rel, vacrel->next_fsm_block_to_vacuum,
+									fsmvac_upto);
+			vacrel->next_fsm_block_to_vacuum = fsmvac_upto;
+		}
+
+		/* Report that we are once again scanning the heap */
+		pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
+									 PROGRESS_VACUUM_PHASE_SCAN_HEAP);
+	}
+
+	/*
+	 * The parallel heap scan finished, but it's possible that some workers
+	 * have allocated blocks but not processed them yet.
+	 * This can happen, for example, when workers exit because their
+	 * dead_items TID space is full and the leader process launches fewer
+	 * workers in the next cycle.
+	 */
+	complete_unfinished_lazy_scan_heap(vacrel);
+}
+
+/*
+ * Return the smallest block number that the leader and workers have scanned.
+ */
+static BlockNumber
+parallel_lazy_scan_compute_min_scan_block(LVRelState *vacrel)
+{
+	BlockNumber min_blk;
+
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+
+	/* Initialize with the leader's value */
+	min_blk = vacrel->last_blkno;
+
+	for (int i = 0; i < vacrel->leader->nworkers_launched; i++)
+	{
+		ParallelLVScanWorkerData *scanwork = &(vacrel->leader->scanwork_array[i]);
+		BlockNumber blkno;
+
+		/* Skip workers that have not initialized their scan state */
+		if (!scanwork->inited)
+			continue;
+
+		blkno = pg_atomic_read_u32(&(scanwork->last_blkno));
+
+		if (!BlockNumberIsValid(min_blk) || min_blk > blkno)
+			min_blk = blkno;
+	}
+
+	Assert(BlockNumberIsValid(min_blk));
+
+	return min_blk;
+}
+
+/*
+ * Complete parallel heap scans that have remaining blocks in their
+ * chunks.
+ */
+static void
+complete_unfinished_lazy_scan_heap(LVRelState *vacrel)
+{
+	int			nworkers;
+
+	Assert(!IsParallelWorker());
+
+	nworkers = parallel_vacuum_get_nworkers_table(vacrel->pvs);
+
+	for (int i = 0; i < nworkers; i++)
+	{
+		ParallelLVScanWorkerData *scanwork = &(vacrel->leader->scanwork_array[i]);
+
+		if (!scanwork->inited)
+			continue;
+
+		if (scanwork->chunk_remaining == 0)
+			continue;
+
+		/* Attach the worker's scan state */
+		vacrel->plvstate->scanwork = scanwork;
+
+		vacrel->next_fsm_block_to_vacuum = pg_atomic_read_u32(&(scanwork->last_blkno));
+		vacrel->next_eager_scan_region_start = scanwork->next_region_start_save;
+		vacrel->eager_scan_remaining_fails = scanwork->remaining_fails_save;
+
+		/*
+		 * Complete the unfinished scan. Note that we might perform multiple
+		 * cycles of index and heap vacuuming while completing the scan.
+		 */
+		do_lazy_scan_heap(vacrel, true);
+	}
+
+	/*
+	 * We don't need to gather the scan results here because the leader's scan
+	 * state got updated directly.
+	 */
+}
+
+/*
+ * Helper routine to launch parallel workers for parallel lazy heap scan.
+ */
+static void
+parallel_lazy_scan_heap_begin(LVRelState *vacrel)
+{
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+	Assert(!IsParallelWorker());
+
+	/* Launch parallel workers */
+	vacrel->leader->nworkers_launched = parallel_vacuum_collect_dead_items_begin(vacrel->pvs);
+
+	ereport(vacrel->verbose ? INFO : DEBUG2,
+			(errmsg(ngettext("launched %d parallel vacuum worker for collecting dead tuples (planned: %d)",
+							 "launched %d parallel vacuum workers for collecting dead tuples (planned: %d)",
+							 vacrel->leader->nworkers_launched),
+					vacrel->leader->nworkers_launched,
+					parallel_vacuum_get_nworkers_table(vacrel->pvs))));
+}
+
+/*
+ * Helper routine to finish the parallel lazy heap scan.
+ */
+static void
+parallel_lazy_scan_heap_end(LVRelState *vacrel)
+{
+	/* Wait for all parallel workers to finish */
+	parallel_vacuum_collect_dead_items_end(vacrel->pvs);
+
+	/* Gather the workers' scan results */
+	parallel_lazy_scan_gather_results(vacrel);
+}
+
+/*
+ * Accumulate each worker's scan results into the leader's.
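+ * Counters are summed across participants, nonempty_pages is taken as the
+ * maximum, NewRelfrozenXid and NewRelminMxid are set to the oldest values
+ * observed by any participant, and skippedallvis is OR'ed together.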
+ */
+static void
+parallel_lazy_scan_gather_results(LVRelState *vacrel)
+{
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+	Assert(!IsParallelWorker());
+
+	/* Gather the workers' scan results */
+	for (int i = 0; i < vacrel->leader->nworkers_launched; i++)
+	{
+		LVScanData *data = &(vacrel->leader->scandata_array[i]);
+		ParallelLVScanWorkerData *scanwork = &(vacrel->leader->scanwork_array[i]);
+
+		/* Accumulate the counters collected by workers */
+#define ACCUM_COUNT(item) vacrel->scan_data->item += data->item
+		ACCUM_COUNT(scanned_pages);
+		ACCUM_COUNT(removed_pages);
+		ACCUM_COUNT(new_frozen_tuple_pages);
+		ACCUM_COUNT(vm_new_visible_pages);
+		ACCUM_COUNT(vm_new_visible_frozen_pages);
+		ACCUM_COUNT(vm_new_frozen_pages);
+		ACCUM_COUNT(lpdead_item_pages);
+		ACCUM_COUNT(missed_dead_pages);
+		ACCUM_COUNT(tuples_deleted);
+		ACCUM_COUNT(tuples_frozen);
+		ACCUM_COUNT(lpdead_items);
+		ACCUM_COUNT(live_tuples);
+		ACCUM_COUNT(recently_dead_tuples);
+		ACCUM_COUNT(missed_dead_tuples);
+#undef ACCUM_COUNT
+
+		/*
+		 * Track the greatest non-empty page among values the workers
+		 * collected, as it's used as the cut-off point for heap truncation.
+		 */
+		if (vacrel->scan_data->nonempty_pages < data->nonempty_pages)
+			vacrel->scan_data->nonempty_pages = data->nonempty_pages;
+
+		/*
+		 * All workers must have initialized both values with the values
+		 * passed by the leader.
+		 */
+		Assert(TransactionIdIsValid(data->NewRelfrozenXid));
+		Assert(MultiXactIdIsValid(data->NewRelminMxid));
+
+		/*
+		 * During parallel lazy scanning, since different workers process
+		 * separate blocks, they may observe different existing XIDs and
+		 * MXIDs. Therefore, we compute the oldest XID and MXID from the
+		 * values observed by each worker (including the leader). These
+		 * computations are crucial for correctly advancing both relfrozenxid
+		 * and relminmxid values.
+		 */
+
+		if (TransactionIdPrecedes(data->NewRelfrozenXid, vacrel->scan_data->NewRelfrozenXid))
+			vacrel->scan_data->NewRelfrozenXid = data->NewRelfrozenXid;
+
+		if (MultiXactIdPrecedesOrEquals(data->NewRelminMxid, vacrel->scan_data->NewRelminMxid))
+			vacrel->scan_data->NewRelminMxid = data->NewRelminMxid;
+
+		/* Has any of the workers skipped an all-visible page? */
+		vacrel->scan_data->skippedallvis |= data->skippedallvis;
+
+		/*
+		 * Gather the remaining success count so that we can distribute the
+		 * success counter again in the next parallel lazy scan.
+		 */
+		vacrel->eager_scan_remaining_successes += scanwork->remaining_successes_save;
+	}
}

/*
@@ -2064,7 +2751,8 @@ lazy_scan_prune(LVRelState *vacrel,

 	/* Can't truncate this page */
 	if (presult.hastup)
-		vacrel->scan_data->nonempty_pages = blkno + 1;
+		vacrel->scan_data->nonempty_pages =
+			Max(blkno + 1, vacrel->scan_data->nonempty_pages);

 	/* Did we find LP_DEAD items? */
 	*has_lpdead_items = (presult.lpdead_items > 0);
@@ -2445,7 +3133,8 @@ lazy_scan_noprune(LVRelState *vacrel,

 	/* Can't truncate this page */
 	if (hastup)
-		vacrel->scan_data->nonempty_pages = blkno + 1;
+		vacrel->scan_data->nonempty_pages =
+			Max(blkno + 1, vacrel->scan_data->nonempty_pages);

 	/* Did we find LP_DEAD items? */
 	*has_lpdead_items = (lpdead_items > 0);
@@ -3498,12 +4187,8 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 		autovacuum_work_mem != -1 ?
 		autovacuum_work_mem : maintenance_work_mem;

-	/*
-	 * Initialize state for a parallel vacuum. As of now, only one worker can
-	 * be used for an index, so we invoke parallelism only if there are at
-	 * least two indexes on a table.
- */ - if (nworkers >= 0 && vacrel->nindexes > 1 && vacrel->do_index_vacuuming) + /* Initialize state for a parallel vacuum */ + if (nworkers >= 0) { /* * Since parallel workers cannot access data in temporary tables, we @@ -3521,11 +4206,17 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) vacrel->relname))); } else + { + /* + * We initialize the parallel vacuum state for either lazy heap + * scan, index vacuuming, or both. + */ vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels, vacrel->nindexes, nworkers, vac_work_mem, vacrel->verbose ? INFO : DEBUG2, vacrel->bstrategy, (void *) vacrel); + } /* * If parallel mode started, dead_items and dead_items_info spaces are @@ -3565,15 +4256,35 @@ dead_items_add(LVRelState *vacrel, BlockNumber blkno, OffsetNumber *offsets, }; int64 prog_val[2]; + if (ParallelHeapVacuumIsActive(vacrel)) + TidStoreLockExclusive(vacrel->dead_items); + TidStoreSetBlockOffsets(vacrel->dead_items, blkno, offsets, num_offsets); vacrel->dead_items_info->num_items += num_offsets; + if (ParallelHeapVacuumIsActive(vacrel)) + TidStoreUnlock(vacrel->dead_items); + /* update the progress information */ prog_val[0] = vacrel->dead_items_info->num_items; prog_val[1] = TidStoreMemoryUsage(vacrel->dead_items); pgstat_progress_update_multi_param(2, prog_index, prog_val); } +/* + * Check the memory usage of the collected dead items and return true + * if we are close to overrunning the available space for dead_items TIDs. + * However, let's force at least one page-worth of tuples to be stored as + * to ensure we do at least some work when the memory configured is so low + * that we run out before storing anything. + */ +static bool +dead_items_check_memory_limit(LVRelState *vacrel) +{ + return vacrel->dead_items_info->num_items > 0 && + TidStoreMemoryUsage(vacrel->dead_items) > vacrel->dead_items_info->max_bytes; +} + /* * Forget all collected dead items. */ @@ -3767,6 +4478,295 @@ update_relstats_all_indexes(LVRelState *vacrel) } } +/* + * Compute the number of workers for parallel heap vacuum. + */ +int +heap_parallel_vacuum_compute_workers(Relation rel, int nworkers_requested, + void *state) +{ + BlockNumber relpages = RelationGetNumberOfBlocks(rel); + int parallel_workers = 0; + + /* + * Parallel heap vacuuming a small relation shouldn't take long. We use + * two times the chunk size as the size cutoff because the leader is + * assigned to one chunk. + */ + if (relpages < PARALLEL_LV_CHUNK_SIZE * 2 || relpages < min_parallel_table_scan_size) + return 0; + + if (nworkers_requested == 0) + { + LVRelState *vacrel = (LVRelState *) state; + int heap_parallel_threshold; + int heap_pages; + BlockNumber allvisible; + BlockNumber allfrozen; + + /* + * Estimate the number of blocks that we're going to scan during + * lazy_scan_heap(). + */ + visibilitymap_count(rel, &allvisible, &allfrozen); + heap_pages = relpages - (vacrel->aggressive ? allfrozen : allvisible); + + Assert(heap_pages >= 0); + + /* + * Select the number of workers based on the log of the number of + * pages to scan. Note that the upper limit of the + * min_parallel_table_scan_size GUC is chosen to prevent overflow + * here. 
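+		 *
+		 * For example, with PARALLEL_LV_CHUNK_SIZE of 1024, at least 3072
+		 * scannable pages select one worker, 9216 pages select two, 27648
+		 * pages select three, and so on.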
+ */ + heap_parallel_threshold = PARALLEL_LV_CHUNK_SIZE; + while (heap_pages >= (BlockNumber) (heap_parallel_threshold * 3)) + { + parallel_workers++; + heap_parallel_threshold *= 3; + if (heap_parallel_threshold > INT_MAX / 3) + break; + } + } + else + parallel_workers = nworkers_requested; + + return parallel_workers; +} + +/* + * Estimate shared memory size required for parallel heap vacuum. + */ +void +heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers, + void *state) +{ + LVRelState *vacrel = (LVRelState *) state; + Size size = 0; + + vacrel->leader = palloc(sizeof(ParallelLVLeader)); + + /* Estimate space for ParallelLVShared */ + size = add_size(size, sizeof(ParallelLVShared)); + vacrel->leader->shared_len = size; + shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->shared_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for ParallelLVScanDesc */ + vacrel->leader->scandesc_len = sizeof(ParallelLVScanDesc); + shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->scandesc_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for an array of ParallelLVScanWorkerData */ + vacrel->leader->scanwork_len = mul_size(sizeof(ParallelLVScanWorkerData), + nworkers); + shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->scanwork_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for an array of LVScanData */ + vacrel->leader->scandata_len = mul_size(sizeof(LVScanData), nworkers); + shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->scandata_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* + * Set up shared memory for parallel heap vacuum. + */ +void +heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, int nworkers, + void *state) +{ + LVRelState *vacrel = (LVRelState *) state; + ParallelLVShared *shared; + ParallelLVScanDesc *scandesc; + ParallelLVScanWorkerData *scanwork; + LVScanData *scandata; + + vacrel->plvstate = palloc0(sizeof(ParallelLVState)); + + /* Initialize ParallelLVShared */ + + shared = shm_toc_allocate(pcxt->toc, vacrel->leader->shared_len); + MemSet(shared, 0, vacrel->leader->shared_len); + shared->aggressive = vacrel->aggressive; + shared->skipwithvm = vacrel->skipwithvm; + shared->cutoffs = vacrel->cutoffs; + shared->NewRelfrozenXid = vacrel->scan_data->NewRelfrozenXid; + shared->NewRelminMxid = vacrel->scan_data->NewRelminMxid; + shared->initial_chunk_size = BlockNumberIsValid(vacrel->next_eager_scan_region_start) + ? 
vacrel->next_eager_scan_region_start + : PARALLEL_LV_CHUNK_SIZE; + + /* Calculate the per-chunk maximum failure count */ + shared->eager_scan_max_fails_per_chunk = + (BlockNumber) (vacrel->eager_scan_max_fails_per_region * + ((float) PARALLEL_LV_CHUNK_SIZE / EAGER_SCAN_REGION_SIZE)); + + /* including the leader too */ + shared->eager_scan_remaining_successes_per_worker = + vacrel->eager_scan_remaining_successes / (nworkers + 1); + + shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SHARED, shared); + vacrel->plvstate->shared = shared; + + /* Initialize ParallelLVScanDesc */ + scandesc = shm_toc_allocate(pcxt->toc, vacrel->leader->scandesc_len); + scandesc->nblocks = RelationGetNumberOfBlocks(rel); + pg_atomic_init_u64(&scandesc->nallocated, 0); + shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SCANDESC, scandesc); + vacrel->plvstate->scandesc = scandesc; + + /* Initialize the array of ParallelLVScanWorkerData */ + scanwork = shm_toc_allocate(pcxt->toc, vacrel->leader->scanwork_len); + MemSet(scanwork, 0, vacrel->leader->scanwork_len); + shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SCANWORKER, scanwork); + vacrel->leader->scanwork_array = scanwork; + + /* Initialize the array of LVScanData */ + scandata = shm_toc_allocate(pcxt->toc, vacrel->leader->scandata_len); + shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SCANDATA, scandata); + vacrel->leader->scandata_array = scandata; +} + +/* + * Initialize lazy vacuum state with the information retrieved from + * shared memory. + */ +void +heap_parallel_vacuum_initialize_worker(Relation rel, ParallelVacuumState *pvs, + ParallelWorkerContext *pwcxt, + void **state_out) +{ + LVRelState *vacrel; + ParallelLVState *plvstate; + ParallelLVShared *shared; + ParallelLVScanDesc *scandesc; + ParallelLVScanWorkerData *scanwork_array; + LVScanData *scandata_array; + + /* Initialize ParallelLVState and prepare the related objects */ + + plvstate = palloc0(sizeof(ParallelLVState)); + + /* Prepare ParallelLVShared */ + shared = (ParallelLVShared *) shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SHARED, false); + plvstate->shared = shared; + + /* Prepare ParallelLVScanDesc */ + scandesc = shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SCANDESC, false); + plvstate->scandesc = scandesc; + + /* Prepare ParallelLVScanWorkerData */ + scanwork_array = shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SCANWORKER, false); + plvstate->scanwork = &(scanwork_array[ParallelWorkerNumber]); + + /* Initialize LVRelState and prepare fields required by lazy scan heap */ + vacrel = palloc0(sizeof(LVRelState)); + vacrel->rel = rel; + vacrel->indrels = parallel_vacuum_get_table_indexes(pvs, + &vacrel->nindexes); + vacrel->bstrategy = parallel_vacuum_get_bstrategy(pvs); + vacrel->pvs = pvs; + vacrel->aggressive = shared->aggressive; + vacrel->skipwithvm = shared->skipwithvm; + vacrel->vistest = GlobalVisTestFor(rel); + vacrel->cutoffs = shared->cutoffs; + vacrel->dead_items = parallel_vacuum_get_dead_items(pvs, + &vacrel->dead_items_info); + vacrel->rel_pages = RelationGetNumberOfBlocks(rel); + + /* + * Set the per-region failure counter and per-worker success counter, + * which are not changed during parallel heap vacuum. + */ + vacrel->eager_scan_max_fails_per_region = + plvstate->shared->eager_scan_max_fails_per_chunk; + vacrel->eager_scan_remaining_successes = + plvstate->shared->eager_scan_remaining_successes_per_worker; + + /* Does this worker have un-scanned blocks in a chunk? 
*/
+	if (plvstate->scanwork->chunk_remaining > 0)
+	{
+		/*
+		 * We restore the previous eager scan state of the already allocated
+		 * chunk, if the worker's previous scan was suspended because the
+		 * dead_items TID space was full.
+		 */
+		vacrel->next_eager_scan_region_start = plvstate->scanwork->next_region_start_save;
+		vacrel->eager_scan_remaining_fails = plvstate->scanwork->remaining_fails_save;
+	}
+	else
+	{
+		/*
+		 * next_eager_scan_region_start will be set when the first chunk is
+		 * assigned
+		 */
+		vacrel->next_eager_scan_region_start = InvalidBlockNumber;
+		vacrel->eager_scan_remaining_fails = vacrel->eager_scan_max_fails_per_region;
+	}
+
+	vacrel->plvstate = plvstate;
+
+	/* Prepare LVScanData */
+	scandata_array = shm_toc_lookup(pwcxt->toc, PARALLEL_LV_KEY_SCANDATA, false);
+	vacrel->scan_data = &(scandata_array[ParallelWorkerNumber]);
+	MemSet(vacrel->scan_data, 0, sizeof(LVScanData));
+	vacrel->scan_data->NewRelfrozenXid = shared->NewRelfrozenXid;
+	vacrel->scan_data->NewRelminMxid = shared->NewRelminMxid;
+	vacrel->scan_data->skippedallvis = false;
+
+	/*
+	 * Initialize the scan state if it has not been initialized yet. The
+	 * chunk of blocks will be allocated when the first scan block is
+	 * requested.
+	 */
+	if (!vacrel->plvstate->scanwork->inited)
+		parallel_lazy_scan_init_scan_worker(vacrel->plvstate->scanwork,
+											vacrel->plvstate->shared->initial_chunk_size);
+
+	*state_out = (void *) vacrel;
+}
+
+/*
+ * Parallel heap vacuum callback for collecting dead items (i.e., lazy heap scan).
+ */
+void
+heap_parallel_vacuum_collect_dead_items(Relation rel, ParallelVacuumState *pvs,
+										void *state)
+{
+	LVRelState *vacrel = (LVRelState *) state;
+	ErrorContextCallback errcallback;
+
+	Assert(ParallelHeapVacuumIsActive(vacrel));
+
+	/*
+	 * Setup error traceback support for ereport() for parallel table vacuum
+	 * workers
+	 */
+	vacrel->dbname = get_database_name(MyDatabaseId);
+	vacrel->relnamespace = get_namespace_name(RelationGetNamespace(rel));
+	vacrel->relname = pstrdup(RelationGetRelationName(rel));
+	vacrel->indname = NULL;
+	vacrel->phase = VACUUM_ERRCB_PHASE_SCAN_HEAP;
+	errcallback.callback = vacuum_error_callback;
+	errcallback.arg = vacrel;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* Join the parallel heap vacuum */
+	do_lazy_scan_heap(vacrel, false);
+
+	/* Advertise the last processed block number */
+	pg_atomic_write_u32(&(vacrel->plvstate->scanwork->last_blkno), vacrel->last_blkno);
+
+	/* Save the eager scan state */
+	vacrel->plvstate->scanwork->remaining_fails_save = vacrel->eager_scan_remaining_fails;
+	vacrel->plvstate->scanwork->remaining_successes_save = vacrel->eager_scan_remaining_successes;
+	vacrel->plvstate->scanwork->next_region_start_save = vacrel->next_eager_scan_region_start;
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 /*
 * Error context callback for errors occurring during vacuum. The error
 * context messages for index phases should match the messages set in parallel
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index 3726fb41028d..49e43b951324 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -504,6 +504,35 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
 	pfree(pvs);
 }

+/*
+ * Return the number of parallel workers initialized for parallel table vacuum.
+ */
+int
+parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs)
+{
+	return pvs->nworkers_for_table;
+}
+
+/*
+ * Return the array of indexes associated with the given table to be vacuumed.
+ */
+Relation *
+parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes)
+{
+	*nindexes = pvs->nindexes;
+
+	return pvs->indrels;
+}
+
+/*
+ * Return the buffer strategy for parallel vacuum.
+ */
+BufferAccessStrategy
+parallel_vacuum_get_bstrategy(ParallelVacuumState *pvs)
+{
+	return pvs->bstrategy;
+}
+
 /*
  * Returns the dead items space and dead items information.
  */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index a1de400b9a53..743e29f673c7 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -15,6 +15,7 @@
 #define HEAPAM_H
 
 #include "access/heapam_xlog.h"
+#include "access/parallel.h"
 #include "access/relation.h"	/* for backward compatibility */
 #include "access/relscan.h"
 #include "access/sdir.h"
@@ -391,8 +392,20 @@ extern void log_heap_prune_and_freeze(Relation relation, Buffer buffer,
 									  OffsetNumber *unused, int nunused);
 
 /* in heap/vacuumlazy.c */
+struct ParallelVacuumState;
 extern void heap_vacuum_rel(Relation rel, const VacuumParams params,
 							BufferAccessStrategy bstrategy);
+extern int	heap_parallel_vacuum_compute_workers(Relation rel, int nworkers_requested,
+												 void *state);
+extern void heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers,
+										  void *state);
+extern void heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt,
+											int nworkers, void *state);
+extern void heap_parallel_vacuum_initialize_worker(Relation rel, struct ParallelVacuumState *pvs,
+												   ParallelWorkerContext *pwcxt,
+												   void **state_out);
+extern void heap_parallel_vacuum_collect_dead_items(Relation rel, struct ParallelVacuumState *pvs,
+													void *state);
 
 /* in heap/heapam_visibility.c */
 extern bool HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 1369377ea98b..876cef301b73 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -385,6 +385,9 @@ extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels
 												 BufferAccessStrategy bstrategy,
 												 void *state);
 extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats);
+extern int	parallel_vacuum_get_nworkers_table(ParallelVacuumState *pvs);
+extern Relation *parallel_vacuum_get_table_indexes(ParallelVacuumState *pvs, int *nindexes);
+extern BufferAccessStrategy parallel_vacuum_get_bstrategy(ParallelVacuumState *pvs);
 extern TidStore *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs,
 												VacDeadItemsInfo **dead_items_info_p);
 extern void parallel_vacuum_reset_dead_items(ParallelVacuumState *pvs);
diff --git a/src/test/regress/expected/vacuum_parallel.out b/src/test/regress/expected/vacuum_parallel.out
index ddf0ee544b7b..b793d8093c2c 100644
--- a/src/test/regress/expected/vacuum_parallel.out
+++ b/src/test/regress/expected/vacuum_parallel.out
@@ -1,5 +1,6 @@
 SET max_parallel_maintenance_workers TO 4;
 SET min_parallel_index_scan_size TO '128kB';
+SET min_parallel_table_scan_size TO '128kB';
 -- Bug #17245: Make sure that we don't totally fail to VACUUM individual indexes that
 -- happen to be below min_parallel_index_scan_size during parallel VACUUM:
 CREATE TABLE parallel_vacuum_table (a int) WITH (autovacuum_enabled = off);
@@ -43,7 +44,13 @@ VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table;
 -- Since vacuum_in_leader_small_index uses deduplication, we expect an
 -- assertion failure with bug #17245 (in the absence of bugfix):
 INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i;
+-- Insert more tuples to use parallel heap vacuum.
+INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 500_000) i;
+VACUUM (PARALLEL 2) parallel_vacuum_table;
+DELETE FROM parallel_vacuum_table WHERE a < 1000;
+VACUUM (PARALLEL 1) parallel_vacuum_table;
 RESET max_parallel_maintenance_workers;
 RESET min_parallel_index_scan_size;
+RESET min_parallel_table_scan_size;
 -- Deliberately don't drop table, to get further coverage from tools like
 -- pg_amcheck in some testing scenarios
diff --git a/src/test/regress/sql/vacuum_parallel.sql b/src/test/regress/sql/vacuum_parallel.sql
index 1d23f33e39cf..5381023642f4 100644
--- a/src/test/regress/sql/vacuum_parallel.sql
+++ b/src/test/regress/sql/vacuum_parallel.sql
@@ -1,5 +1,6 @@
 SET max_parallel_maintenance_workers TO 4;
 SET min_parallel_index_scan_size TO '128kB';
+SET min_parallel_table_scan_size TO '128kB';
 
 -- Bug #17245: Make sure that we don't totally fail to VACUUM individual indexes that
 -- happen to be below min_parallel_index_scan_size during parallel VACUUM:
@@ -39,8 +40,15 @@ VACUUM (PARALLEL 4, INDEX_CLEANUP ON) parallel_vacuum_table;
 -- assertion failure with bug #17245 (in the absence of bugfix):
 INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 10000) i;
 
+-- Insert more tuples to use parallel heap vacuum.
+INSERT INTO parallel_vacuum_table SELECT i FROM generate_series(1, 500_000) i;
+VACUUM (PARALLEL 2) parallel_vacuum_table;
+DELETE FROM parallel_vacuum_table WHERE a < 1000;
+VACUUM (PARALLEL 1) parallel_vacuum_table;
+
 RESET max_parallel_maintenance_workers;
 RESET min_parallel_index_scan_size;
+RESET min_parallel_table_scan_size;
 
 -- Deliberately don't drop table, to get further coverage from tools like
 -- pg_amcheck in some testing scenarios
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index ab8a33d1d0a7..e033c8cf73b3 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1964,6 +1964,11 @@ PLpgSQL_type
 PLpgSQL_type_type
 PLpgSQL_var
 PLpgSQL_variable
+ParallelLVLeader
+ParallelLVScanDesc
+ParallelLVScanWorkerData
+ParallelLVShared
+ParallelLVState
 PLwdatum
 PLword
 PLyArrayToOb

From c6be54e69d8403e67c44e1ac3109decf0a8c3d23 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada
Date: Fri, 13 Jun 2025 10:58:18 -0700
Subject: [PATCH 5/5] Add more parallel vacuum tests.

---
 src/backend/access/heap/vacuumlazy.c          | 22 ++++-
 src/backend/commands/vacuumparallel.c         | 21 +++-
 .../injection_points/t/002_parallel_vacuum.pl | 97 +++++++++++++++++++
 3 files changed, 135 insertions(+), 5 deletions(-)
 create mode 100644 src/test/modules/injection_points/t/002_parallel_vacuum.pl

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index e6ca9c60e8ae..6b7b22816b94 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -192,6 +192,7 @@
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
 #include "storage/read_stream.h"
+#include "utils/injection_point.h"
 #include "utils/lsyscache.h"
 #include "utils/pg_rusage.h"
 #include "utils/timestamp.h"
@@ -466,6 +467,14 @@ typedef struct ParallelLVLeader
 	/* The number of workers launched for parallel lazy heap scan */
 	int			nworkers_launched;
 
+	/*
+	 * Will the leader participate in parallel lazy heap scan?
+	 *
+	 * This is a parameter for testing and is always true unless it is
+	 * explicitly disabled by the injection point.
+	 */
+	bool		leaderparticipate;
+
 	/*
 	 * These fields point to the arrays of all per-worker scan states stored
 	 * in DSM.
@@ -2251,7 +2260,8 @@ do_parallel_lazy_scan_heap(LVRelState *vacrel)
 	 * retrieving new blocks for the read stream once the space of
 	 * dead_items TIDs exceeds the limit.
 	 */
-	do_lazy_scan_heap(vacrel, false);
+	if (vacrel->leader->leaderparticipate)
+		do_lazy_scan_heap(vacrel, false);
 
 	/* Wait for parallel workers to finish and gather scan results */
 	parallel_lazy_scan_heap_end(vacrel);
@@ -4543,6 +4553,7 @@ heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers,
 {
 	LVRelState *vacrel = (LVRelState *) state;
 	Size		size = 0;
+	bool		leaderparticipate = true;
 
 	vacrel->leader = palloc(sizeof(ParallelLVLeader));
 
@@ -4567,6 +4578,12 @@ heap_parallel_vacuum_estimate(Relation rel, ParallelContext *pcxt, int nworkers,
 	vacrel->leader->scandata_len = mul_size(sizeof(LVScanData), nworkers);
 	shm_toc_estimate_chunk(&pcxt->estimator, vacrel->leader->scandata_len);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+#ifdef USE_INJECTION_POINTS
+	if (IS_INJECTION_POINT_ATTACHED("parallel-heap-vacuum-disable-leader-participation"))
+		leaderparticipate = false;
+#endif
+
+	vacrel->leader->leaderparticipate = leaderparticipate;
 }
 
 /*
@@ -4604,7 +4621,8 @@ heap_parallel_vacuum_initialize(Relation rel, ParallelContext *pcxt, int nworker
 
 	/* including the leader too */
 	shared->eager_scan_remaining_successes_per_worker =
-		vacrel->eager_scan_remaining_successes / (nworkers + 1);
+		vacrel->eager_scan_remaining_successes /
+		(vacrel->leader->leaderparticipate ? nworkers + 1 : nworkers);
 	shm_toc_insert(pcxt->toc, PARALLEL_LV_KEY_SHARED, shared);
 	vacrel->plvstate->shared = shared;
 
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index 49e43b951324..7f0869ee4dc0 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -39,6 +39,7 @@
 #include "pgstat.h"
 #include "storage/bufmgr.h"
 #include "tcop/tcopprot.h"
+#include "utils/injection_point.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 
@@ -1035,14 +1036,28 @@ parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
 int
 parallel_vacuum_collect_dead_items_begin(ParallelVacuumState *pvs)
 {
+	int			nworkers = pvs->nworkers_for_table;
+#ifdef USE_INJECTION_POINTS
+	static int	ntimes = 0;
+#endif
+
 	Assert(!IsParallelWorker());
 
-	if (pvs->nworkers_for_table == 0)
+	if (nworkers == 0)
 		return 0;
 
 	/* Start parallel vacuum workers for collecting dead items */
-	Assert(pvs->nworkers_for_table <= pvs->pcxt->nworkers);
-	parallel_vacuum_begin_work_phase(pvs, pvs->nworkers_for_table,
+	Assert(nworkers <= pvs->pcxt->nworkers);
+
+#ifdef USE_INJECTION_POINTS
+	if (IS_INJECTION_POINT_ATTACHED("parallel-vacuum-ramp-down-workers"))
+	{
+		nworkers = pvs->nworkers_for_table - Min(ntimes, pvs->nworkers_for_table);
+		ntimes++;
+	}
+#endif
+
+	parallel_vacuum_begin_work_phase(pvs, nworkers,
 									 PV_WORK_PHASE_COLLECT_DEAD_ITEMS);
 
 	/* Include the worker count for the leader itself */
diff --git a/src/test/modules/injection_points/t/002_parallel_vacuum.pl b/src/test/modules/injection_points/t/002_parallel_vacuum.pl
new file mode 100644
index 000000000000..f0ef33ed86bf
--- /dev/null
+++ b/src/test/modules/injection_points/t/002_parallel_vacuum.pl
@@ -0,0 +1,97 @@
+
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Tests for parallel heap vacuum.
+
+use strict;
+use warnings FATAL => 'all';
+use locale;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Skip the tests if the build does not support injection points.
+if ($ENV{enable_injection_points} ne 'yes')
+{
+	plan skip_all => 'Injection points not supported by this build';
+}
+
+my $node = PostgreSQL::Test::Cluster->new('master');
+$node->init;
+$node->start;
+$node->safe_psql('postgres', qq[create extension injection_points;]);
+
+$node->safe_psql('postgres', qq[
+create table t (i int) with (autovacuum_enabled = off);
+create index on t (i);
+ ]);
+my $nrows = 1_000_000;
+my $first = int($nrows * rand());
+my $second = $nrows - $first;
+
+my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+# Begin the transaction that holds xmin.
+$psql->query_safe('begin; select pg_current_xact_id();');
+
+# Consume some XIDs.
+$node->safe_psql('postgres', qq[
+select pg_current_xact_id();
+select pg_current_xact_id();
+select pg_current_xact_id();
+select pg_current_xact_id();
+select pg_current_xact_id();
+ ]);
+
+# While inserting $nrows tuples into the table with an older XID,
+# we inject some tuples with a newer XID, filling one page somewhere
+# in the table.
+
+# Insert the first part of the rows.
+$psql->query_safe(qq[insert into t select generate_series(1, $first);]);
+
+# Insert some rows with a newer XID; these need to fill at least
+# one page to prevent that page from being frozen by the following
+# vacuum.
+my $xid = $node->safe_psql('postgres', qq[
+begin;
+insert into t select 0 from generate_series(1, 300);
+select pg_current_xact_id()::xid;
+commit;
+]);
+
+# Insert the remaining rows and commit.
+$psql->query_safe(qq[insert into t select generate_series($first + 1, $nrows);]);
+$psql->query_safe(qq[commit;]);
+
+# Delete some rows.
+$node->safe_psql('postgres', qq[delete from t where i between 1 and 20000;]);
+
+# Execute a parallel vacuum that freezes all rows except for the
+# tuples inserted with the newer XID; it should advance relfrozenxid
+# up to that XID.  Setting maintenance_work_mem to a low value forces
+# multiple rounds of heap scanning, and the number of parallel workers
+# ramps down thanks to the injection points.
+$node->safe_psql('postgres', qq[
+set vacuum_freeze_min_age to 5;
+set max_parallel_maintenance_workers TO 5;
+set maintenance_work_mem TO 256;
+select injection_points_set_local();
+select injection_points_attach('parallel-vacuum-ramp-down-workers', 'notice');
+select injection_points_attach('parallel-heap-vacuum-disable-leader-participation', 'notice');
+vacuum (parallel 5, verbose) t;
+ ]);
+
+is( $node->safe_psql('postgres', qq[select relfrozenxid from pg_class where relname = 't';]),
+	"$xid", "relfrozenxid is updated as expected");
+
+# Check that the previous vacuum successfully froze the table by
+# scanning all tuples.
+$node->safe_psql('postgres', qq[vacuum (freeze, parallel 0, verbose, disable_page_skipping) t;]);
+is( $node->safe_psql('postgres', qq[select $xid < relfrozenxid::text::int from pg_class where relname = 't';]),
+	"t", "all rows are frozen");
+
+$node->stop;
+done_testing();
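
As an illustration (not part of the patches above), a table AM that wants parallel table vacuum support would presumably wire the new optional callbacks into its TableAmRoutine roughly as sketched below, reusing the heap_parallel_vacuum_* functions declared in the heapam.h hunk earlier. The routine variable name is hypothetical and every required callback unrelated to vacuum is omitted for brevity, so this is only a sketch, not how heapam_handler.c necessarily does it.

/*
 * Hypothetical sketch only: registering the optional parallel table vacuum
 * callbacks in a TableAmRoutine.  The heap_parallel_vacuum_* functions are
 * the ones declared in heapam.h above; the variable name and the omission
 * of all required callbacks are illustrative.
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/tableam.h"

static const TableAmRoutine example_am_methods = {
	.type = T_TableAmRoutine,

	/* ... the AM's required callbacks would go here ... */

	/* optional parallel table vacuum support */
	.parallel_vacuum_compute_workers = heap_parallel_vacuum_compute_workers,
	.parallel_vacuum_estimate = heap_parallel_vacuum_estimate,
	.parallel_vacuum_initialize = heap_parallel_vacuum_initialize,
	.parallel_vacuum_initialize_worker = heap_parallel_vacuum_initialize_worker,
	.parallel_vacuum_collect_dead_items = heap_parallel_vacuum_collect_dead_items,
};

Inside the collect-dead-items callback, a worker would presumably use the accessors added to vacuum.h above (parallel_vacuum_get_nworkers_table, parallel_vacuum_get_table_indexes, parallel_vacuum_get_bstrategy) together with parallel_vacuum_get_dead_items to reach the shared vacuum state it needs.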