Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/backend/libpq/pqcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1487,7 +1487,7 @@ internal_flush(void)
char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;

while (bufptr < bufend || zpq_buffered(PqStream) != 0)
while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0)
/* has more data to flush or unsent data in internal compression buffer */
{
int r;
Expand Down
45 changes: 37 additions & 8 deletions src/common/zpq_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ typedef struct
/*
* Returns amount of data in internal compression buffer.
*/
size_t (*buffered)(ZpqStream *zs);
size_t (*buffered_tx)(ZpqStream *zs);

/*
* Returns amount of data in internal decompression buffer.
*/
size_t (*buffered_rx)(ZpqStream *zs);
} ZpqAlgorithm;

struct ZpqStream
Expand All @@ -74,7 +79,8 @@ typedef struct ZstdStream
ZSTD_outBuffer tx;
ZSTD_inBuffer rx;
size_t tx_not_flushed; /* Amount of data in internal zstd buffer */
size_t tx_buffered; /* Data which is consumed by ztd_read but not yet sent */
size_t tx_buffered; /* Data which is consumed by zstd_write but not yet sent */
size_t rx_buffered; /* Data which is received by zstd_read but not yet decompressed */
zpq_tx_func tx_func;
zpq_rx_func rx_func;
void* arg;
Expand Down Expand Up @@ -112,6 +118,7 @@ zstd_create(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data,
zs->rx_total = zs->rx_total_raw = 0;

zs->rx.size = rx_data_size;
zs->rx_buffered = rx_data_size;
Assert(rx_data_size < ZSTD_BUFFER_SIZE);
memcpy(zs->rx_buf, rx_data, rx_data_size);

Expand All @@ -136,6 +143,7 @@ zstd_read(ZpqStream *zstream, void *buf, size_t size)
zs->rx_error = ZSTD_getErrorName(rc);
return ZPQ_DECOMPRESS_ERROR;
}
zs->rx_buffered = zs->rx.size - zs->rx.pos;
/* Return result if we fill requested amount of bytes or read operation was performed */
if (out.pos != 0)
{
Expand All @@ -151,6 +159,7 @@ zstd_read(ZpqStream *zstream, void *buf, size_t size)
{
zs->rx.size += rc;
zs->rx_total += rc;
zs->rx_buffered += rc;
}
else /* read failed */
{
Expand Down Expand Up @@ -225,12 +234,19 @@ zstd_error(ZpqStream *zstream)
}

static size_t
zstd_buffered(ZpqStream *zstream)
zstd_buffered_tx(ZpqStream *zstream)
{
ZstdStream* zs = (ZstdStream*)zstream;
return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0;
}

static size_t
zstd_buffered_rx(ZpqStream *zstream)
{
ZstdStream *zs = (ZstdStream *)zstream;
return zs != NULL ? zs->rx_buffered : 0;
}

static char
zstd_name(void)
{
Expand Down Expand Up @@ -411,12 +427,19 @@ zlib_error(ZpqStream *zstream)
}

static size_t
zlib_buffered(ZpqStream *zstream)
zlib_buffered_tx(ZpqStream *zstream)
{
ZlibStream* zs = (ZlibStream*)zstream;
return zs != NULL ? zs->tx_buffered : 0;
}

static size_t
zlib_buffered_rx(ZpqStream *zstream)
{
ZlibStream *zs = (ZlibStream *)zstream;
return zs != NULL ? zs->rx.avail_in : 0;
}

static char
zlib_name(void)
{
Expand All @@ -431,10 +454,10 @@ zlib_name(void)
static ZpqAlgorithm const zpq_algorithms[] =
{
#if HAVE_LIBZSTD
{zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered},
{zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered_tx, zstd_buffered_rx},
#endif
#if HAVE_LIBZ
{zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered},
{zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered_tx, zlib_buffered_rx},
#endif
{NULL}
};
Expand Down Expand Up @@ -478,9 +501,15 @@ zpq_error(ZpqStream *zs)


size_t
zpq_buffered(ZpqStream *zs)
zpq_buffered_tx(ZpqStream *zs)
{
return zs ? zs->algorithm->buffered_tx(zs) : 0;
}

size_t
zpq_buffered_rx(ZpqStream *zs)
{
return zs ? zs->algorithm->buffered(zs) : 0;
return zs ? zs->algorithm->buffered_rx(zs) : 0;
}

/*
Expand Down
3 changes: 2 additions & 1 deletion src/include/common/zpq_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ ZpqStream* zpq_create(int impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void*
ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size);
ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed);
char const* zpq_error(ZpqStream* zs);
size_t zpq_buffered(ZpqStream* zs);
size_t zpq_buffered_tx(ZpqStream* zs);
size_t zpq_buffered_rx(ZpqStream* zs);
void zpq_free(ZpqStream* zs);

void zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS]);
Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/libpq/fe-misc.c
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ pqSendSome(PGconn *conn, int len)
}

/* while there's still data to send */
while (len > 0 || zpq_buffered(conn->zstream))
while (len > 0 || zpq_buffered_tx(conn->zstream))
{
int sent;
size_t processed = 0;
Expand Down Expand Up @@ -972,7 +972,7 @@ pqSendSome(PGconn *conn, int len)
remaining -= sent;
}

if (len > 0 || sent < 0 || zpq_buffered(conn->zstream))
if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream))
{
/*
* We didn't send it all, wait till we can send more.
Expand Down