summaryrefslogtreecommitdiff
path: root/src/backend/executor/execAsync.c
blob: 20601fa04ee3d8f3e9f5acc53d51307c4d86eeb4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
/*-------------------------------------------------------------------------
 *
 * execAsync.c
 *	  Support routines for asynchronous execution.
 *
 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * This file contains routines that are intended to support asynchronous
 * execution; that is, suspending an executor node until some external
 * event occurs, or until one of its child nodes produces a tuple.
 * This allows the executor to avoid blocking on a single external event,
 * such as a file descriptor waiting on I/O, or a parallel worker which
 * must complete work elsewhere in the plan tree, when there might at the
 * same time be useful computation that could be accomplished in some
 * other part of the plan tree.
 *
 * IDENTIFICATION
 *	  src/backend/executor/execAsync.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "executor/execAsync.h"
#include "executor/executor.h"
#include "storage/latch.h"

/*
 * Capacity of the on-stack arrays used by ExecAsyncWaitForNode: the buffer
 * of occurred WaitEvents and the list of nodes pending a callback.  It is
 * also passed to WaitEventSetWait as the maximum number of events to report
 * per call.
 */
#define	EVENT_BUFFER_SIZE		16

/* Forward declaration; the definition is at the bottom of this file. */
static void ExecAsyncConfigureWait(PlanState *planstate, bool reinit);

/*
 * ExecAsyncWaitForNode
 *
 * Loop until planstate->result_ready becomes true.  Each iteration
 * (re)creates the EState's WaitEventSet if there is none, lets every node in
 * estate->es_waiting_nodes configure its events, waits for events, and then
 * dispatches to the nodes whose events occurred.  Whenever a dispatched node
 * reports result_ready, its parent is dispatched on the next pass, so a
 * result can propagate up the plan tree.
 */
void
ExecAsyncWaitForNode(PlanState *planstate)
{
	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
	PlanState  *callbacks[EVENT_BUFFER_SIZE];
	int			ncallbacks = 0;
	EState *estate = planstate->state;

	while (!planstate->result_ready)
	{
		/* A NULL event set means it was never built or has been discarded. */
		bool	reinit = (estate->es_wait_event_set == NULL);
		int		n;
		int		noccurred;

		if (reinit)
		{
			/*
			 * Allow for a few extra events without reinitializing.  It
			 * doesn't seem worth the complexity of doing anything very
			 * aggressive here, because plans that depend on massive numbers
			 * of external FDs are likely to run afoul of kernel limits anyway.
			 */
			estate->es_max_async_events = estate->es_total_async_events + 16;
			estate->es_wait_event_set =
				CreateWaitEventSet(estate->es_query_cxt,
								   estate->es_max_async_events);
		}

		/* Give each waiting node a chance to add or modify events. */
		for (n = 0; n < estate->es_num_waiting_nodes; ++n)
			ExecAsyncConfigureWait(estate->es_waiting_nodes[n], reinit);

		/*
		 * Wait for at least one event to occur.  The timeout argument of -1
		 * is presumably "no timeout" (confirm against storage/latch.h).
		 */
		noccurred = WaitEventSetWait(estate->es_wait_event_set, -1,
									 occurred_event, EVENT_BUFFER_SIZE);
		Assert(noccurred > 0);

		/*
		 * Loop over the occurred events and make a list of nodes that need
		 * a callback.  The waiting nodes should have registered their wait
		 * events with user_data pointing back to the node.
		 *
		 * ncallbacks is always zero here: it starts at zero and the dispatch
		 * loop below only exits once it has drained the list.  So this adds
		 * exactly noccurred entries, which fits in callbacks[] provided
		 * WaitEventSetWait reports no more than the EVENT_BUFFER_SIZE events
		 * requested.
		 */
		for (n = 0; n < noccurred; ++n)
		{
			WaitEvent  *w = &occurred_event[n];
			PlanState  *ps = w->user_data;

			callbacks[ncallbacks++] = ps;
		}

		/*
		 * Initially, this loop will call the node-type-specific function for
		 * each node for which an event occurred.  If any of those nodes
		 * produce a result, its parent enters the set of nodes that are
		 * pending for a callback.  In this way, when a result becomes
		 * available in a leaf of the plan tree, it can bubble upwards towards
		 * the root as far as necessary.
		 */
		while (ncallbacks > 0)
		{
			int		i,
					j;

			/* Loop over all callbacks. */
			for (i = 0; i < ncallbacks; ++i)
			{
				/* Skip if NULL. */
				if (callbacks[i] == NULL)
					continue;

				/*
				 * Remove any duplicates.  Scanning the rest of the array for
				 * every entry is O(n^2) overall, which may not seem good, but
				 * it should hopefully be OK as long as EVENT_BUFFER_SIZE is
				 * not too large.
				 */
				for (j = i + 1; j < ncallbacks; ++j)
					if (callbacks[i] == callbacks[j])
						callbacks[j] = NULL;

				/* Dispatch to node-type-specific code. */
				ExecDispatchNode(callbacks[i]);

				/*
				 * If there's now a tuple ready, we must dispatch to the
				 * parent node; otherwise, there's nothing more to do.
				 * The slot is reused in place: it holds either the parent
				 * (possibly NULL) or NULL, and NULLs are squeezed out below.
				 */
				if (callbacks[i]->result_ready)
					callbacks[i] = callbacks[i]->parent;
				else
					callbacks[i] = NULL;
			}

			/* Squeeze out NULLs. */
			for (i = 0, j = 0; j < ncallbacks; ++j)
				if (callbacks[j] != NULL)
					callbacks[i++] = callbacks[j];
			ncallbacks = i;
		}
	}
}

/*
 * An executor node should call this function to signal that it needs to wait
 * on one or more events that can be registered on a WaitEventSet.  nevents
 * should be the maximum number of events that it will wish to register.
 * reinit should be true if the node can't reuse the WaitEventSet it most
 * recently initialized, for example because it needs to drop a wait event
 * from the set.
 *
 * A node whose n_async_events is zero is treated as not yet present in
 * estate->es_waiting_nodes and is appended to that array.
 */
void
ExecAsyncNeedsWait(PlanState *planstate, int nevents, bool reinit)
{
	EState *estate = planstate->state;

	Assert(nevents > 0); 	/* otherwise, use ExecAsyncDoesNotNeedWait */

	/*
	 * If this node is not already present in the array of waiting nodes,
	 * then add it.  If that array hasn't been allocated or is full, this may
	 * require (re)allocating it.
	 */
	if (planstate->n_async_events == 0)
	{
		/*
		 * Grow only when there is no free slot left.  This also covers the
		 * not-yet-allocated case, where both counters are zero.
		 */
		if (estate->es_num_waiting_nodes >= estate->es_max_waiting_nodes)
		{
			int		newmax;

			if (estate->es_max_waiting_nodes == 0)
			{
				newmax = 16;
				/* The allocation size is in bytes, not in array elements. */
				estate->es_waiting_nodes =
					MemoryContextAlloc(estate->es_query_cxt,
									   newmax * sizeof(PlanState *));
			}
			else
			{
				newmax = estate->es_max_waiting_nodes * 2;
				estate->es_waiting_nodes =
					repalloc(estate->es_waiting_nodes,
							 newmax * sizeof(PlanState *));
			}
			estate->es_max_waiting_nodes = newmax;
		}
		estate->es_waiting_nodes[estate->es_num_waiting_nodes++] = planstate;
	}

	/*
	 * Adjust per-node and per-estate totals: replace this node's previous
	 * contribution to the estate-wide total with the new event count.
	 */
	estate->es_total_async_events -= planstate->n_async_events;
	planstate->n_async_events = nevents;
	estate->es_total_async_events += planstate->n_async_events;

	/*
	 * If a WaitEventSet has already been created, we need to discard it and
	 * start again if the user passed reinit = true, or if the total number of
	 * required events exceeds the supported number.
	 */
	if (estate->es_wait_event_set != NULL && (reinit ||
		estate->es_total_async_events > estate->es_max_async_events))
	{
		FreeWaitEventSet(estate->es_wait_event_set);
		estate->es_wait_event_set = NULL;
	}
}

/*
 * If an executor node no longer needs to wait, it should call this function
 * to report that fact.
 *
 * This is a no-op for a node whose n_async_events is not positive.
 * Otherwise the node is removed from estate->es_waiting_nodes, its events
 * are subtracted from the estate-wide total, and any existing WaitEventSet
 * is freed so that the next wait rebuilds it.
 */
void
ExecAsyncDoesNotNeedWait(PlanState *planstate)
{
	int		n;
	int		nwaiting;
	EState *estate = planstate->state;

	if (planstate->n_async_events <= 0)
		return;

	/*
	 * Remove the node from the list of waiting nodes by moving the last
	 * entry into its slot; the order of the array is not preserved.  (Is a
	 * linear search going to be a problem here?  I think probably not.)
	 *
	 * The count is captured before the removal so that the sanity check
	 * below compares against the array length that was actually searched.
	 */
	nwaiting = estate->es_num_waiting_nodes;
	for (n = 0; n < nwaiting; ++n)
	{
		if (estate->es_waiting_nodes[n] == planstate)
		{
			estate->es_waiting_nodes[n] =
				estate->es_waiting_nodes[nwaiting - 1];
			estate->es_num_waiting_nodes = nwaiting - 1;
			break;
		}
	}

	/* We should always find ourselves in the array. */
	Assert(n < nwaiting);

	/* We no longer need any asynchronous events. */
	estate->es_total_async_events -= planstate->n_async_events;
	planstate->n_async_events = 0;

	/*
	 * The next wait will need to rebuild the WaitEventSet, because whatever
	 * events we registered are gone now.  It's probably OK that this code
	 * assumes we actually did register some events at one point, because we
	 * needed to wait at some point and we don't any more.
	 */
	if (estate->es_wait_event_set != NULL)
	{
		FreeWaitEventSet(estate->es_wait_event_set);
		estate->es_wait_event_set = NULL;
	}
}

/*
 * Hand control to the node-type-specific routine so that the given node can
 * register its wait events.  No node type has a handler yet, so every call
 * currently ends in the error below.
 */
static void
ExecAsyncConfigureWait(PlanState *planstate, bool reinit)
{
	int		tag = (int) nodeTag(planstate);

	switch (tag)
	{
		/* XXX: Add calls to per-nodetype handlers here. */
		default:
			elog(ERROR, "unexpected node type: %d", tag);
			break;
	}
}