Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/backend/libpq/pqcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,7 @@ socket_flush_if_writable(void)
int res;

/* Quick exit if nothing to do */
if (PqSendPointer == PqSendStart)
if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0))
return 0;

/* No-op if reentrant call */
Expand All @@ -1587,7 +1587,7 @@ socket_flush_if_writable(void)
static bool
socket_is_send_pending(void)
{
return (PqSendStart < PqSendPointer);
return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0));
}

/* --------------------------------
Expand Down
14 changes: 9 additions & 5 deletions src/common/zpq_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ zstd_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
zs->tx_total_raw += in_buf.pos;
return rc;
}
} while (zs->tx.pos == 0 && (in_buf.pos < size || zs->tx_not_flushed)); /* repeat sending data until first partial write */
/* repeat sending while there is some data in input or internal zstd buffer */
} while (in_buf.pos < size || zs->tx_not_flushed);

zs->tx_total_raw += in_buf.pos;
zs->tx_buffered = zs->tx.pos;
Expand Down Expand Up @@ -282,7 +283,7 @@ typedef struct ZlibStream
zpq_tx_func tx_func;
zpq_rx_func rx_func;
void* arg;

unsigned tx_deflate_pending;
size_t tx_buffered;

Bytef tx_buf[ZLIB_BUFFER_SIZE];
Expand All @@ -309,6 +310,7 @@ zlib_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data,
memset(&zs->rx, 0, sizeof(zs->tx));
zs->rx.next_in = zs->rx_buf;
zs->rx.avail_in = ZLIB_BUFFER_SIZE;
zs->tx_deflate_pending = 0;
rc = inflateInit(&zs->rx);
if (rc != Z_OK)
{
Expand Down Expand Up @@ -383,10 +385,11 @@ zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
{
zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */

if (zs->tx.avail_in != 0) /* Has something in input buffer */
if (zs->tx.avail_in != 0 || (zs->tx_deflate_pending > 0)) /* Has something in input or deflate buffer */
{
rc = deflate(&zs->tx, Z_SYNC_FLUSH);
Assert(rc == Z_OK);
deflatePending(&zs->tx, &zs->tx_deflate_pending, Z_NULL); /* check if any data left in deflate buffer */
zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */
}
}
Expand All @@ -402,7 +405,8 @@ zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
return rc;
}
} while (zs->tx.avail_out == ZLIB_BUFFER_SIZE && zs->tx.avail_in != 0); /* repeat sending data until first partial write */
/* repeat sending while there is some data in input or deflate buffer */
} while (zs->tx.avail_in != 0 || zs->tx_deflate_pending > 0);

zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;

Expand Down Expand Up @@ -432,7 +436,7 @@ static size_t
zlib_buffered_tx(ZpqStream *zstream)
{
ZlibStream* zs = (ZlibStream*)zstream;
return zs != NULL ? zs->tx_buffered : 0;
return zs != NULL ? zs->tx_buffered + zs->tx_deflate_pending : 0;
}

static size_t
Expand Down
6 changes: 6 additions & 0 deletions src/interfaces/libpq/fe-connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -2157,6 +2157,12 @@ connectDBComplete(PGconn *conn)
return 1; /* success! */

case PGRES_POLLING_READING:
/* if there is some buffered RX data in ZpqStream
* then don't proceed to pqWaitTimed */
if (zpq_buffered_rx(conn->zstream)) {
break;
}

ret = pqWaitTimed(1, 0, conn, finish_time);
if (ret == -1)
{
Expand Down