summaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/connection.c
diff options
context:
space:
mode:
authorRobert Haas2016-09-23 21:34:03 +0000
committerRobert Haas2016-09-23 21:36:25 +0000
commit51ca0665e81a92b72b398b5ac1182c1231129ef4 (patch)
tree191be101a559ea50b1d95a8eacc54f1888aa7473 /contrib/postgres_fdw/connection.c
parentf07555cd8ebca78dec856c73a9518dfbbbdc631f (diff)
beginnings of true async attempt, but doesn't workasync2
ERROR: another command is already in progress CONTEXT: Remote SQL command: DECLARE c2 CURSOR FOR SELECT a, b FROM t.foo2 STATEMENT: select * from foo; and if we got past that we can't configure a wait and there are various other problems
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
-rw-r--r--contrib/postgres_fdw/connection.c69
1 files changed, 69 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 8ca1c1c898..0a29c63105 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -50,6 +50,13 @@ typedef struct ConnCacheEntry
bool have_error; /* have any subxacts aborted in this xact? */
} ConnCacheEntry;
+struct PgFdwAsyncQuery
+{
+ PGconn *conn;
+ char *query;
+ PGresult *res;
+};
+
/*
* Connection cache (initialized on first use)
*/
@@ -521,6 +528,68 @@ pgfdw_get_result(PGconn *conn, const char *query)
}
/*
+ * Begin an asynchronous query using a PGconn.
+ */
+PgFdwAsyncQuery *
+pgfdw_begin_query(PGconn *conn, const char *query)
+{
+ PgFdwAsyncQuery *aq;
+ MemoryContext oldcontext;
+
+ /*
+ * XXX. This should be added to a list someplace or to the connection cache
+ * so that we don't leak it if an error happens. But I haven't written
+ * code to do that yet, so right now we just leak.
+ *
+ * XXX. Also, we need to track the fact that the connection is busy so
+ * that we don't end up with multiple scans trying to use the same
+ * connection at the same time.
+ */
+ oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
+ aq = palloc(sizeof(PgFdwAsyncQuery));
+ aq->conn = conn;
+ aq->query = pstrdup(query);
+ aq->res = NULL;
+ MemoryContextSwitchTo(oldcontext);
+
+ /* See comments in pgfdw_exec_query. */
+ if (!PQsendQuery(conn, query))
+ pgfdw_report_error(ERROR, NULL, conn, false, query);
+
+ /* Return handle to caller. */
+ return aq;
+}
+
+/*
+ * See whether an asynchronous query using a PGconn has finished. Returns
+ * true if yes and false if no. In the former case, *result gets the final
+ * result of the query.
+ */
+bool
+pgfdw_finish_query(PgFdwAsyncQuery *aq, PGresult **result)
+{
+ for (;;)
+ {
+ PGresult *res;
+
+ if (!PQconsumeInput(aq->conn))
+ pgfdw_report_error(ERROR, NULL, aq->conn, false, aq->query);
+ if (PQisBusy(aq->conn))
+ return false;
+ res = PQgetResult(aq->conn);
+ if (res == NULL)
+ {
+ res = aq->res;
+ pfree(aq->query);
+ pfree(aq);
+ *result = res;
+ return true;
+ }
+ aq->res = res;
+ }
+}
+
+/*
* Report an error we got from the remote server.
*
* elevel: error level to use (typically ERROR, but might be less)