/*-------------------------------------------------------------------------
 *
 * execAsync.c
 *	  Support routines for asynchronous execution.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * This file contains routines that are intended to support asynchronous
 * execution; that is, suspending an executor node until some external
 * event occurs, or until one of its child nodes produces a tuple.
 * This allows the executor to avoid blocking on a single external event,
 * such as a file descriptor waiting on I/O, or a parallel worker which
 * must complete work elsewhere in the plan tree, when there might at the
 * same time be useful computation that could be accomplished in some
 * other part of the plan tree.
 *
 * IDENTIFICATION
 *	  src/backend/executor/execAsync.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "executor/execAsync.h"
#include "executor/executor.h"
#include "storage/latch.h"

/* Maximum number of occurred events fetched by one WaitEventSetWait call. */
#define EVENT_BUFFER_SIZE		16

static void ExecAsyncConfigureWait(PlanState *planstate, bool reinit);

/*
 * Block until the given node has a result ready.
 *
 * Each pass (re)builds the EState's WaitEventSet if necessary, lets every
 * waiting node configure its events, waits for at least one event, and then
 * dispatches to the nodes whose events fired.  Whenever a dispatched node
 * ends up with result_ready set, its parent is dispatched in turn.
 */
void
ExecAsyncWaitForNode(PlanState *planstate)
{
	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
	PlanState  *callbacks[EVENT_BUFFER_SIZE];
	int			ncallbacks = 0;
	EState	   *estate = planstate->state;

	while (!planstate->result_ready)
	{
		bool		reinit = (estate->es_wait_event_set == NULL);
		int			n;
		int			noccurred;

		if (reinit)
		{
			/*
			 * Allow for a few extra events without reinitializing.  It
			 * doesn't seem worth the complexity of doing anything very
			 * aggressive here, because plans that depend on massive numbers
			 * of external FDs are likely to run afoul of kernel limits anyway.
			 */
			estate->es_max_async_events = estate->es_total_async_events + 16;
			estate->es_wait_event_set =
				CreateWaitEventSet(estate->es_query_cxt,
								   estate->es_max_async_events);
		}

		/* Give each waiting node a chance to add or modify events. */
		for (n = 0; n < estate->es_num_waiting_nodes; ++n)
			ExecAsyncConfigureWait(estate->es_waiting_nodes[n], reinit);

		/* Wait for at least one event to occur. */
		noccurred = WaitEventSetWait(estate->es_wait_event_set, -1,
									 occurred_event, EVENT_BUFFER_SIZE);
		Assert(noccurred > 0);

		/*
		 * Loop over the occurred events and make a list of nodes that need
		 * a callback.  The waiting nodes should have registered their wait
		 * events with user_data pointing back to the node.
		 *
		 * The callback list is always empty at this point (the dispatch loop
		 * below drains it), so at most EVENT_BUFFER_SIZE entries are stored.
		 */
		ncallbacks = 0;
		for (n = 0; n < noccurred; ++n)
		{
			WaitEvent  *w = &occurred_event[n];
			PlanState  *ps = w->user_data;

			callbacks[ncallbacks++] = ps;
		}

		/*
		 * Initially, this loop will call the node-type-specific function for
		 * each node for which an event occurred.  If any of those nodes
		 * produce a result, its parent enters the set of nodes that are
		 * pending for a callback.  In this way, when a result becomes
		 * available in a leaf of the plan tree, it can bubble upwards towards
		 * the root as far as necessary.
		 *
		 * NOTE(review): propagation does not stop at 'planstate' itself; if
		 * it becomes ready here, its parent is dispatched too -- confirm that
		 * this is intended.
		 */
		while (ncallbacks > 0)
		{
			int			i,
						j;

			/* Loop over all callbacks. */
			for (i = 0; i < ncallbacks; ++i)
			{
				/* Skip if NULL. */
				if (callbacks[i] == NULL)
					continue;

				/*
				 * Remove any duplicates.  O(n^2) overall may not seem good,
				 * but it should hopefully be OK as long as EVENT_BUFFER_SIZE
				 * is not too large.
				 */
				for (j = i + 1; j < ncallbacks; ++j)
					if (callbacks[i] == callbacks[j])
						callbacks[j] = NULL;

				/* Dispatch to node-type-specific code. */
				ExecDispatchNode(callbacks[i]);

				/*
				 * If there's now a tuple ready, we must dispatch to the
				 * parent node; otherwise, there's nothing more to do.  A
				 * node without a parent leaves a NULL slot, which is
				 * squeezed out below.
				 */
				if (callbacks[i]->result_ready)
					callbacks[i] = callbacks[i]->parent;
				else
					callbacks[i] = NULL;
			}

			/* Squeeze out NULLs. */
			for (i = 0, j = 0; j < ncallbacks; ++j)
				if (callbacks[j] != NULL)
					callbacks[i++] = callbacks[j];
			ncallbacks = i;
		}
	}
}

/*
 * An executor node should call this function to signal that it needs to wait
 * on one or more events that can be registered on a WaitEventSet.  nevents
 * should be the maximum number of events that it will wish to register.
 * reinit should be true if the node can't reuse the WaitEventSet it most
 * recently initialized, for example because it needs to drop a wait event
 * from the set.
 */
void
ExecAsyncNeedsWait(PlanState *planstate, int nevents, bool reinit)
{
	EState	   *estate = planstate->state;

	Assert(nevents > 0);		/* otherwise, use ExecAsyncDoesNotNeedWait */

	/*
	 * If this node is not already present in the array of waiting nodes,
	 * then add it.  If that array hasn't been allocated or is full, this may
	 * require (re)allocating it.
	 */
	if (planstate->n_async_events == 0)
	{
		/* Grow only when there is no free slot left. */
		if (estate->es_num_waiting_nodes >= estate->es_max_waiting_nodes)
		{
			int			newmax;

			if (estate->es_max_waiting_nodes == 0)
			{
				newmax = 16;
				/* Size is in bytes, so scale by the element size. */
				estate->es_waiting_nodes =
					MemoryContextAlloc(estate->es_query_cxt,
									   newmax * sizeof(PlanState *));
			}
			else
			{
				newmax = estate->es_max_waiting_nodes * 2;
				estate->es_waiting_nodes =
					repalloc(estate->es_waiting_nodes,
							 newmax * sizeof(PlanState *));
			}
			estate->es_max_waiting_nodes = newmax;
		}
		estate->es_waiting_nodes[estate->es_num_waiting_nodes++] = planstate;
	}

	/* Adjust per-node and per-estate totals. */
	estate->es_total_async_events -= planstate->n_async_events;
	planstate->n_async_events = nevents;
	estate->es_total_async_events += planstate->n_async_events;

	/*
	 * If a WaitEventSet has already been created, we need to discard it and
	 * start again if the user passed reinit = true, or if the total number of
	 * required events exceeds the supported number.
	 */
	if (estate->es_wait_event_set != NULL &&
		(reinit ||
		 estate->es_total_async_events > estate->es_max_async_events))
	{
		FreeWaitEventSet(estate->es_wait_event_set);
		estate->es_wait_event_set = NULL;
	}
}

/*
 * If an executor node no longer needs to wait, it should call this function
 * to report that fact.  Calling it for a node that is not currently waiting
 * (n_async_events <= 0) is a no-op.
 */
void
ExecAsyncDoesNotNeedWait(PlanState *planstate)
{
	int			n;
	EState	   *estate = planstate->state;

	if (planstate->n_async_events <= 0)
		return;

	/*
	 * Locate the node in the list of waiting nodes.  (Is a linear search
	 * going to be a problem here?  I think probably not.)
	 */
	for (n = 0; n < estate->es_num_waiting_nodes; ++n)
	{
		if (estate->es_waiting_nodes[n] == planstate)
			break;
	}

	/*
	 * We should always find ourselves in the array.  This must be checked
	 * before the count is decremented; otherwise removing the last element
	 * would look like "not found".
	 */
	Assert(n < estate->es_num_waiting_nodes);

	/* Remove the node by moving the last entry into its slot. */
	if (n < estate->es_num_waiting_nodes)
	{
		int			last = estate->es_num_waiting_nodes - 1;

		estate->es_waiting_nodes[n] = estate->es_waiting_nodes[last];
		estate->es_num_waiting_nodes = last;
	}

	/* We no longer need any asynchronous events. */
	estate->es_total_async_events -= planstate->n_async_events;
	planstate->n_async_events = 0;

	/*
	 * The next wait will need to rebuild the WaitEventSet, because whatever
	 * events we registered are gone now.  It's probably OK that this code
	 * assumes we actually did register some events at one point, because we
	 * needed to wait at some point and we don't any more.
	 */
	if (estate->es_wait_event_set != NULL)
	{
		FreeWaitEventSet(estate->es_wait_event_set);
		estate->es_wait_event_set = NULL;
	}
}

/*
 * Give per-nodetype function a chance to register wait events.
 *
 * No node types are handled yet, so every call currently raises an error.
 */
static void
ExecAsyncConfigureWait(PlanState *planstate, bool reinit)
{
	switch (nodeTag(planstate))
	{
			/* XXX: Add calls to per-nodetype handlers here. */
		default:
			elog(ERROR, "unexpected node type: %d",
				 (int) nodeTag(planstate));
	}
}