diff options
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
| -rw-r--r-- | contrib/postgres_fdw/connection.c | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 8ca1c1c898..0a29c63105 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -50,6 +50,13 @@ typedef struct ConnCacheEntry bool have_error; /* have any subxacts aborted in this xact? */ } ConnCacheEntry; +struct PgFdwAsyncQuery +{ + PGconn *conn; + char *query; + PGresult *res; +}; + /* * Connection cache (initialized on first use) */ @@ -521,6 +528,68 @@ pgfdw_get_result(PGconn *conn, const char *query) } /* + * Begin an asynchronous query using a PGconn. + */ +PgFdwAsyncQuery * +pgfdw_begin_query(PGconn *conn, const char *query) +{ + PgFdwAsyncQuery *aq; + MemoryContext oldcontext; + + /* + * XXX. This should be added to a list someplace or to the connection cache + * so that we don't leak it if an error happens. But I haven't written + * code to do that yet, so right now we just leak. + * + * XXX. Also, we need to track the fact that the connection is busy so + * that we don't end up with multiple scans trying to use the same + * connection at the same time. + */ + oldcontext = MemoryContextSwitchTo(CacheMemoryContext); + aq = palloc(sizeof(PgFdwAsyncQuery)); + aq->conn = conn; + aq->query = pstrdup(query); + aq->res = NULL; + MemoryContextSwitchTo(oldcontext); + + /* See comments in pgfdw_exec_query. */ + if (!PQsendQuery(conn, query)) + pgfdw_report_error(ERROR, NULL, conn, false, query); + + /* Return handle to caller. */ + return aq; +} + +/* + * See whether an asynchronous query using a PGconn has finished. Returns + * true if yes and false if no. In the former case, *result gets the final + * result of the query. + */ +bool +pgfdw_finish_query(PgFdwAsyncQuery *aq, PGresult **result) +{ + for (;;) + { + PGresult *res; + + if (!PQconsumeInput(aq->conn)) + pgfdw_report_error(ERROR, NULL, aq->conn, false, aq->query); + if (PQisBusy(aq->conn)) + return false; + res = PQgetResult(aq->conn); + if (res == NULL) + { + res = aq->res; + pfree(aq->query); + pfree(aq); + *result = res; + return true; + } + aq->res = res; + } +} + +/* * Report an error we got from the remote server. * * elevel: error level to use (typically ERROR, but might be less) |
