diff options
Diffstat (limited to 'src/backend/executor/execAsync.c')
| -rw-r--r-- | src/backend/executor/execAsync.c | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c new file mode 100644 index 0000000000..20601fa04e --- /dev/null +++ b/src/backend/executor/execAsync.c @@ -0,0 +1,256 @@ +/*------------------------------------------------------------------------- + * + * execAsync.c + * Support routines for asynchronous execution. + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * This file contains routines that are intended to asynchronous + * execution; that is, suspending an executor node until some external + * event occurs, or until one of its child nodes produces a tuple. + * This allows the executor to avoid blocking on a single external event, + * such as a file descriptor waiting on I/O, or a parallel worker which + * must complete work elsewhere in the plan tree, when there might at the + * same time be useful computation that could be accomplished in some + * other part of the plan tree. + * + * IDENTIFICATION + * src/backend/executor/execParallel.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/execAsync.h" +#include "executor/executor.h" +#include "storage/latch.h" + +#define EVENT_BUFFER_SIZE 16 + +static void ExecAsyncConfigureWait(PlanState *planstate, bool reinit); + +void +ExecAsyncWaitForNode(PlanState *planstate) +{ + WaitEvent occurred_event[EVENT_BUFFER_SIZE]; + PlanState *callbacks[EVENT_BUFFER_SIZE]; + int ncallbacks = 0; + EState *estate = planstate->state; + + while (!planstate->result_ready) + { + bool reinit = (estate->es_wait_event_set == NULL); + int n; + int noccurred; + + if (reinit) + { + /* + * Allow for a few extra events without reinitializing. It + * doesn't seem worth the complexity of doing anything very + * aggressive here, because plans that depend on massive numbers + * of external FDs are likely to run afoul of kernel limits anyway. + */ + estate->es_max_async_events = estate->es_total_async_events + 16; + estate->es_wait_event_set = + CreateWaitEventSet(estate->es_query_cxt, + estate->es_max_async_events); + } + + /* Give each waiting node a chance to add or modify events. */ + for (n = 0; n < estate->es_num_waiting_nodes; ++n) + ExecAsyncConfigureWait(estate->es_waiting_nodes[n], reinit); + + /* Wait for at least one event to occur. */ + noccurred = WaitEventSetWait(estate->es_wait_event_set, -1, + occurred_event, EVENT_BUFFER_SIZE); + Assert(noccurred > 0); + + /* + * Loop over the occurred events and make a list of nodes that need + * a callback. The waiting nodes should have registered their wait + * events with user_data pointing back to the node. + */ + for (n = 0; n < noccurred; ++n) + { + WaitEvent *w = &occurred_event[n]; + PlanState *ps = w->user_data; + + callbacks[ncallbacks++] = ps; + } + + /* + * Initially, this loop will call the node-type-specific function for + * each node for which an event occurred. If any of those nodes + * produce a result, its parent enters the set of nodes that are + * pending for a callback. In this way, when a result becomes + * available in a leaf of the plan tree, it can bubble upwards towards + * the root as far as necessary. + */ + while (ncallbacks > 0) + { + int i, + j; + + /* Loop over all callbacks. */ + for (i = 0; i < ncallbacks; ++i) + { + /* Skip if NULL. */ + if (callbacks[i] == NULL) + continue; + + /* + * Remove any duplicates. O(n) may not seem good, but it + * should hopefully be OK as long as EVENT_BUFFER_SIZE is + * not too large. + */ + for (j = i + 1; j < ncallbacks; ++j) + if (callbacks[i] == callbacks[j]) + callbacks[j] = NULL; + + /* Dispatch to node-type-specific code. */ + ExecDispatchNode(callbacks[i]); + + /* + * If there's now a tuple ready, we must dispatch to the + * parent node; otherwise, there's nothing more to do. + */ + if (callbacks[i]->result_ready) + callbacks[i] = callbacks[i]->parent; + else + callbacks[i] = NULL; + } + + /* Squeeze out NULLs. */ + for (i = 0, j = 0; j < ncallbacks; ++j) + if (callbacks[j] != NULL) + callbacks[i++] = callbacks[j]; + ncallbacks = i; + } + } +} + +/* + * An executor node should call this function to signal that it needs to wait + * on one more or events that can be registered on a WaitEventSet. nevents + * should be the maximum number of events that it will wish to register. + * reinit should be true if the node can't reuse the WaitEventSet it most + * recently initialized, for example because it needs to drop a wait event + * from the set. + */ +void +ExecAsyncNeedsWait(PlanState *planstate, int nevents, bool reinit) +{ + EState *estate = planstate->state; + + Assert(nevents > 0); /* otherwise, use ExecAsyncDoesNotNeedWait */ + + /* + * If this node is not already present in the array of waiting nodes, + * then add it. If that array hasn't been allocated or is full, this may + * require (re)allocating it. + */ + if (planstate->n_async_events == 0) + { + if (estate->es_max_waiting_nodes >= estate->es_num_waiting_nodes) + { + int newmax; + + if (estate->es_max_waiting_nodes == 0) + { + newmax = 16; + estate->es_waiting_nodes = + MemoryContextAlloc(estate->es_query_cxt, newmax); + } + else + { + newmax = estate->es_max_waiting_nodes * 2; + estate->es_waiting_nodes = + repalloc(estate->es_waiting_nodes, + newmax * sizeof(PlanState *)); + } + estate->es_max_waiting_nodes = newmax; + } + estate->es_waiting_nodes[estate->es_num_waiting_nodes++] = planstate; + } + + /* Adjust per-node and per-estate totals. */ + estate->es_total_async_events -= planstate->n_async_events; + planstate->n_async_events = nevents; + estate->es_total_async_events += planstate->n_async_events; + + /* + * If a WaitEventSet has already been created, we need to discard it and + * start again if the user passed reinit = true, or if the total number of + * required events exceeds the supported number. + */ + if (estate->es_wait_event_set != NULL && (reinit || + estate->es_total_async_events > estate->es_max_async_events)) + { + FreeWaitEventSet(estate->es_wait_event_set); + estate->es_wait_event_set = NULL; + } +} + +/* + * If an executor node no longer needs to wait, it should call this function + * to report that fact. + */ +void +ExecAsyncDoesNotNeedWait(PlanState *planstate) +{ + int n; + EState *estate = planstate->state; + + if (planstate->n_async_events <= 0) + return; + + /* + * Remove the node from the list of waiting nodes. (Is a linear search + * going to be a problem here? I think probably not.) + */ + for (n = 0; n < estate->es_num_waiting_nodes; ++n) + { + if (estate->es_waiting_nodes[n] == planstate) + { + estate->es_waiting_nodes[n] = + estate->es_waiting_nodes[--estate->es_num_waiting_nodes]; + break; + } + } + + /* We should always find ourselves in the array. */ + Assert(n < estate->es_num_waiting_nodes); + + /* We no longer need any asynchronous events. */ + estate->es_total_async_events -= planstate->n_async_events; + planstate->n_async_events = 0; + + /* + * The next wait will need to rebuild the WaitEventSet, because whatever + * events we registered are gone now. It's probably OK that this code + * assumes we actually did register some events at one point, because we + * needed to wait at some point and we don't any more. + */ + if (estate->es_wait_event_set != NULL) + { + FreeWaitEventSet(estate->es_wait_event_set); + estate->es_wait_event_set = NULL; + } +} + +/* + * Give per-nodetype function a chance to register wait events. + */ +static void +ExecAsyncConfigureWait(PlanState *planstate, bool reinit) +{ + switch (nodeTag(planstate)) + { + /* XXX: Add calls to per-nodetype handlers here. */ + default: + elog(ERROR, "unexpected node type: %d", nodeTag(planstate)); + } +} |
