diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 09714ac36b8..f87c195191a 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -1487,7 +1487,7 @@ internal_flush(void) char *bufptr = PqSendBuffer + PqSendStart; char *bufend = PqSendBuffer + PqSendPointer; - while (bufptr < bufend || zpq_buffered(PqStream) != 0) + while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */ { int r; diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c index 36cf44e9d55..45423173ca2 100644 --- a/src/common/zpq_stream.c +++ b/src/common/zpq_stream.c @@ -50,7 +50,12 @@ typedef struct /* * Returns amount of data in internal compression buffer. */ - size_t (*buffered)(ZpqStream *zs); + size_t (*buffered_tx)(ZpqStream *zs); + + /* + * Returns amount of data in internal decompression buffer. + */ + size_t (*buffered_rx)(ZpqStream *zs); } ZpqAlgorithm; struct ZpqStream @@ -74,7 +79,8 @@ typedef struct ZstdStream ZSTD_outBuffer tx; ZSTD_inBuffer rx; size_t tx_not_flushed; /* Amount of data in internal zstd buffer */ - size_t tx_buffered; /* Data which is consumed by ztd_read but not yet sent */ + size_t tx_buffered; /* Data which is consumed by zstd_write but not yet sent */ + size_t rx_buffered; /* Data which is received by zstd_read but not yet decompressed */ zpq_tx_func tx_func; zpq_rx_func rx_func; void* arg; @@ -112,6 +118,7 @@ zstd_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, zs->rx_total = zs->rx_total_raw = 0; zs->rx.size = rx_data_size; + zs->rx_buffered = rx_data_size; Assert(rx_data_size < ZSTD_BUFFER_SIZE); memcpy(zs->rx_buf, rx_data, rx_data_size); @@ -136,6 +143,7 @@ zstd_read(ZpqStream *zstream, void *buf, size_t size) zs->rx_error = ZSTD_getErrorName(rc); return ZPQ_DECOMPRESS_ERROR; } + zs->rx_buffered = zs->rx.size - zs->rx.pos; /* Return result if we fill requested amount of bytes or read operation was performed */ if (out.pos != 0) { @@ -151,6 +159,7 @@ zstd_read(ZpqStream *zstream, void *buf, size_t size) { zs->rx.size += rc; zs->rx_total += rc; + zs->rx_buffered += rc; } else /* read failed */ { @@ -225,12 +234,19 @@ zstd_error(ZpqStream *zstream) } static size_t -zstd_buffered(ZpqStream *zstream) +zstd_buffered_tx(ZpqStream *zstream) { ZstdStream* zs = (ZstdStream*)zstream; return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0; } +static size_t +zstd_buffered_rx(ZpqStream *zstream) +{ + ZstdStream *zs = (ZstdStream *)zstream; + return zs != NULL ? zs->rx_buffered : 0; +} + static char zstd_name(void) { @@ -411,12 +427,19 @@ zlib_error(ZpqStream *zstream) } static size_t -zlib_buffered(ZpqStream *zstream) +zlib_buffered_tx(ZpqStream *zstream) { ZlibStream* zs = (ZlibStream*)zstream; return zs != NULL ? zs->tx_buffered : 0; } +static size_t +zlib_buffered_rx(ZpqStream *zstream) +{ + ZlibStream *zs = (ZlibStream *)zstream; + return zs != NULL ? zs->rx.avail_in : 0; +} + static char zlib_name(void) { @@ -431,10 +454,10 @@ zlib_name(void) static ZpqAlgorithm const zpq_algorithms[] = { #if HAVE_LIBZSTD - {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered}, + {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered_tx, zstd_buffered_rx}, #endif #if HAVE_LIBZ - {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered}, + {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered_tx, zlib_buffered_rx}, #endif {NULL} }; @@ -478,9 +501,15 @@ zpq_error(ZpqStream *zs) size_t -zpq_buffered(ZpqStream *zs) +zpq_buffered_tx(ZpqStream *zs) +{ + return zs ? zs->algorithm->buffered_tx(zs) : 0; +} + +size_t +zpq_buffered_rx(ZpqStream *zs) { - return zs ? zs->algorithm->buffered(zs) : 0; + return zs ? zs->algorithm->buffered_rx(zs) : 0; } /* diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h index c7f079311b1..b75e7ac407c 100644 --- a/src/include/common/zpq_stream.h +++ b/src/include/common/zpq_stream.h @@ -23,7 +23,8 @@ ZpqStream* zpq_create(int impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void* ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size); ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed); char const* zpq_error(ZpqStream* zs); -size_t zpq_buffered(ZpqStream* zs); +size_t zpq_buffered_tx(ZpqStream* zs); +size_t zpq_buffered_rx(ZpqStream* zs); void zpq_free(ZpqStream* zs); void zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS]); diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 56e8468150c..4f39f66321e 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -899,7 +899,7 @@ pqSendSome(PGconn *conn, int len) } /* while there's still data to send */ - while (len > 0 || zpq_buffered(conn->zstream)) + while (len > 0 || zpq_buffered_tx(conn->zstream)) { int sent; size_t processed = 0; @@ -972,7 +972,7 @@ pqSendSome(PGconn *conn, int len) remaining -= sent; } - if (len > 0 || sent < 0 || zpq_buffered(conn->zstream)) + if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream)) { /* * We didn't send it all, wait till we can send more.