/*-------------------------------------------------------------------------- * * parallel_dummy.c * Test harness code for parallel mode code. * * Copyright (C) 2013-2014, PostgreSQL Global Development Group * * IDENTIFICATION * contrib/parallel_dummy/parallel_dummy.c * * ------------------------------------------------------------------------- */ #include "postgres.h" #include "access/heapam.h" #include "access/parallel.h" #include "access/xact.h" #include "fmgr.h" #include "miscadmin.h" #include "storage/bufmgr.h" #include "storage/spin.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/snapmgr.h" #include "utils/tqual.h" PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(parallel_sleep); PG_FUNCTION_INFO_V1(parallel_count); #define PARALLEL_DUMMY_KEY 1 typedef struct { int32 sleep_time; } ParallelSleepInfo; typedef struct { int32 relid; slock_t mutex; BlockNumber lastblock; BlockNumber currentblock; BlockNumber prefetchblock; int64 ntuples; } ParallelCountInfo; void _PG_init(void); void sleep_worker_main(dsm_segment *seg, shm_toc *toc); void count_worker_main(dsm_segment *seg, shm_toc *toc); static void count_helper(Relation rel, ParallelCountInfo *info); int prefetch_distance; int prefetch_increment; void _PG_init() { DefineCustomIntVariable("parallel_dummy.prefetch_distance", "Sets the prefetch distance in blocks.", NULL, &prefetch_distance, 0, 0, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL); DefineCustomIntVariable("parallel_dummy.prefetch_increment", "Sets the prefetch increment in blocks.", NULL, &prefetch_increment, 8, 1, INT_MAX, PGC_USERSET, 0, NULL, NULL, NULL); } Datum parallel_sleep(PG_FUNCTION_ARGS) { int32 sleep_time = PG_GETARG_INT32(0); int32 nworkers = PG_GETARG_INT32(1); bool already_in_parallel_mode = IsInParallelMode(); ParallelContext *pcxt; ParallelSleepInfo *info; if (nworkers < 0) ereport(ERROR, (errmsg("number of parallel workers must be non-negative"))); if (!already_in_parallel_mode) EnterParallelMode(); pcxt = CreateParallelContextForExtension("parallel_dummy", "sleep_worker_main", nworkers); shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelSleepInfo)); shm_toc_estimate_keys(&pcxt->estimator, 1); InitializeParallelDSM(pcxt); info = shm_toc_allocate(pcxt->toc, sizeof(ParallelSleepInfo)); info->sleep_time = sleep_time; shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info); LaunchParallelWorkers(pcxt); /* here's where we do the "real work" ... */ DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) sleep_time)); WaitForParallelWorkersToFinish(pcxt); DestroyParallelContext(pcxt); if (!already_in_parallel_mode) ExitParallelMode(); PG_RETURN_VOID(); } Datum parallel_count(PG_FUNCTION_ARGS) { Oid relid = PG_GETARG_OID(0); int32 nworkers = PG_GETARG_INT32(1); bool already_in_parallel_mode = IsInParallelMode(); ParallelContext *pcxt; ParallelCountInfo *info; Relation rel; int64 result; if (nworkers < 0) ereport(ERROR, (errmsg("number of parallel workers must be non-negative"))); rel = relation_open(relid, AccessShareLock); if (!already_in_parallel_mode) EnterParallelMode(); pcxt = CreateParallelContextForExtension("parallel_dummy", "count_worker_main", nworkers); shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelCountInfo)); shm_toc_estimate_keys(&pcxt->estimator, 1); InitializeParallelDSM(pcxt); info = shm_toc_allocate(pcxt->toc, sizeof(ParallelCountInfo)); info->relid = relid; SpinLockInit(&info->mutex); info->lastblock = RelationGetNumberOfBlocks(rel); info->currentblock = 0; info->prefetchblock = 0; info->ntuples = 0; shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info); LaunchParallelWorkers(pcxt); /* here's where we do the "real work" ... */ count_helper(rel, info); WaitForParallelWorkersToFinish(pcxt); result = info->ntuples; DestroyParallelContext(pcxt); relation_close(rel, AccessShareLock); if (!already_in_parallel_mode) ExitParallelMode(); PG_RETURN_INT64(result); } void sleep_worker_main(dsm_segment *seg, shm_toc *toc) { ParallelSleepInfo *info; info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY); Assert(info != NULL); /* here's where we do the "real work" ... */ DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) info->sleep_time)); } void count_worker_main(dsm_segment *seg, shm_toc *toc) { ParallelCountInfo *info; Relation rel; info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY); Assert(info != NULL); rel = relation_open(info->relid, AccessShareLock); count_helper(rel, info); relation_close(rel, AccessShareLock); } static void count_helper(Relation rel, ParallelCountInfo *info) { int64 ntuples = 0; int64 mytuples = 0; Oid relid = info->relid; Snapshot snapshot = GetActiveSnapshot(); for (;;) { BlockNumber blkno; Buffer buffer; Page page; int lines; OffsetNumber lineoff; ItemId lpp; bool all_visible; bool done = false; #ifdef USE_PREFETCH BlockNumber prefetch_blkno = InvalidBlockNumber; uint32 prefetch_count = 0; #endif CHECK_FOR_INTERRUPTS(); SpinLockAcquire(&info->mutex); if (info->currentblock >= info->lastblock) done = true; else { #ifdef USE_PREFETCH BlockNumber max_prefetch; max_prefetch = info->lastblock - info->prefetchblock; if (max_prefetch > 0 && info->prefetchblock - info->currentblock < prefetch_distance) { prefetch_blkno = info->prefetchblock; prefetch_count = Min(prefetch_increment, max_prefetch); info->prefetchblock += prefetch_count; } #endif blkno = info->currentblock++; } info->ntuples += ntuples; SpinLockRelease(&info->mutex); mytuples += ntuples; if (done) break; #ifdef USE_PREFETCH while (prefetch_count > 0) { PrefetchBuffer(rel, MAIN_FORKNUM, prefetch_blkno); ++prefetch_blkno; --prefetch_count; } #endif buffer = ReadBuffer(rel, blkno); LockBuffer(buffer, BUFFER_LOCK_SHARE); page = BufferGetPage(buffer); lines = PageGetMaxOffsetNumber(page); ntuples = 0; all_visible = PageIsAllVisible(page) && !snapshot->takenDuringRecovery; for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(page, lineoff); lineoff <= lines; lineoff++, lpp++) { HeapTupleData loctup; if (!ItemIdIsNormal(lpp)) continue; if (all_visible) { ++ntuples; continue; } loctup.t_tableOid = relid; loctup.t_data = (HeapTupleHeader) PageGetItem(page, lpp); loctup.t_len = ItemIdGetLength(lpp); if (HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer)) ++ntuples; } UnlockReleaseBuffer(buffer); } elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples", MyProcPid, mytuples); }