From e48923618a5337198657cea9383fe859f0cbf86b Mon Sep 17 00:00:00 2001 From: usernamedt Date: Thu, 19 Nov 2020 12:27:31 +0500 Subject: [PATCH 1/3] Add more buffered_tx and buffered_rx checks --- src/backend/libpq/pqcomm.c | 4 ++-- src/interfaces/libpq/fe-connect.c | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 73d4230aefd..36022a739b2 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -1564,7 +1564,7 @@ socket_flush_if_writable(void) int res; /* Quick exit if nothing to do */ - if (PqSendPointer == PqSendStart) + if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0)) return 0; /* No-op if reentrant call */ @@ -1587,7 +1587,7 @@ socket_flush_if_writable(void) static bool socket_is_send_pending(void) { - return (PqSendStart < PqSendPointer); + return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0)); } /* -------------------------------- diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 5b047677af9..3442087988e 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2157,6 +2157,12 @@ connectDBComplete(PGconn *conn) return 1; /* success! */ case PGRES_POLLING_READING: + /* if there is some buffered RX data in ZpqStream + * then don't proceed to pqWaitTimed */ + if (zpq_buffered_rx(conn->zstream)) { + break; + } + ret = pqWaitTimed(1, 0, conn, finish_time); if (ret == -1) { From 3d7b27d693f6a741aabfaec90db4183748a9a8ab Mon Sep 17 00:00:00 2001 From: usernamedt Date: Thu, 19 Nov 2020 12:30:17 +0500 Subject: [PATCH 2/3] Fix zstd_write -- With zs->tx.pos == 0 check turned on, there could be a situation when the socket could not fully consume the tx buffer data, resulting in a partial write, so this check would break the loop too soon. 
--- src/common/zpq_stream.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c index 076b7855f77..af2579ffd8e 100644 --- a/src/common/zpq_stream.c +++ b/src/common/zpq_stream.c @@ -209,7 +209,8 @@ zstd_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed) zs->tx_total_raw += in_buf.pos; return rc; } - } while (zs->tx.pos == 0 && (in_buf.pos < size || zs->tx_not_flushed)); /* repeat sending data until first partial write */ + /* repeat sending while there is some data in input or internal zstd buffer */ + } while (in_buf.pos < size || zs->tx_not_flushed); zs->tx_total_raw += in_buf.pos; zs->tx_buffered = zs->tx.pos; From c8050ff911195a69c9692ae703a4b74644ed91e5 Mon Sep 17 00:00:00 2001 From: usernamedt Date: Thu, 19 Nov 2020 13:00:15 +0500 Subject: [PATCH 3/3] Fix replication hanging due to data left in ZLIB buffer --- src/common/zpq_stream.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c index af2579ffd8e..d1bf01ab79c 100644 --- a/src/common/zpq_stream.c +++ b/src/common/zpq_stream.c @@ -283,7 +283,7 @@ typedef struct ZlibStream zpq_tx_func tx_func; zpq_rx_func rx_func; void* arg; - + unsigned tx_deflate_pending; size_t tx_buffered; Bytef tx_buf[ZLIB_BUFFER_SIZE]; @@ -310,6 +310,7 @@ zlib_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, memset(&zs->rx, 0, sizeof(zs->tx)); zs->rx.next_in = zs->rx_buf; zs->rx.avail_in = ZLIB_BUFFER_SIZE; + zs->tx_deflate_pending = 0; rc = inflateInit(&zs->rx); if (rc != Z_OK) { @@ -384,10 +385,11 @@ zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed) { zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */ - if (zs->tx.avail_in != 0) /* Has something in input buffer */ + if (zs->tx.avail_in != 0 || (zs->tx_deflate_pending > 0)) /* Has something in input or deflate buffer */ { rc = 
deflate(&zs->tx, Z_SYNC_FLUSH); Assert(rc == Z_OK); + deflatePending(&zs->tx, &zs->tx_deflate_pending, Z_NULL); /* check if any data left in deflate buffer */ zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */ } } @@ -403,7 +405,8 @@ zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed) zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out; return rc; } - } while (zs->tx.avail_out == ZLIB_BUFFER_SIZE && zs->tx.avail_in != 0); /* repeat sending data until first partial write */ + /* repeat sending while there is some data in input or deflate buffer */ + } while (zs->tx.avail_in != 0 || zs->tx_deflate_pending > 0); zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out; @@ -433,7 +436,7 @@ static size_t zlib_buffered_tx(ZpqStream *zstream) { ZlibStream* zs = (ZlibStream*)zstream; - return zs != NULL ? zs->tx_buffered : 0; + return zs != NULL ? zs->tx_buffered + zs->tx_deflate_pending : 0; } static size_t