From 07557404c0e5b59abbb074e1671e5661acfc6e32 Mon Sep 17 00:00:00 2001 From: usernamedt Date: Wed, 23 Dec 2020 13:21:09 +0500 Subject: [PATCH 1/3] Add max window size limit --- src/common/zpq_stream.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c index 459566228ac..e9a56594b83 100644 --- a/src/common/zpq_stream.c +++ b/src/common/zpq_stream.c @@ -70,6 +70,12 @@ struct ZpqStream #include #define ZSTD_BUFFER_SIZE (8*1024) +/* + * Maximum allowed back-reference distance, expressed as power of 2. + * This setting controls max compressor/decompressor window size. + * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536 + */ +#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */ typedef struct ZstdStream { @@ -100,8 +106,10 @@ zstd_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char zs->tx_stream = ZSTD_createCStream(); ZSTD_initCStream(zs->tx_stream, level); + ZSTD_CCtx_setParameter(zs->tx_stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); zs->rx_stream = ZSTD_createDStream(); ZSTD_initDStream(zs->rx_stream); + ZSTD_DCtx_setParameter(zs->rx_stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); zs->tx.dst = zs->tx_buf; zs->tx.pos = 0; zs->tx.size = ZSTD_BUFFER_SIZE; From 00e4ddc58ec16a718fa9bf70cb6291ed87c1f878 Mon Sep 17 00:00:00 2001 From: usernamedt Date: Tue, 5 Jan 2021 18:17:24 +0500 Subject: [PATCH 2/3] Refactor zpq_stream.c -- Split compression and decompression algorithms Extract common socket read/write logic from algorithm implementations --- src/backend/libpq/pqcomm.c | 6 +- src/common/zpq_stream.c | 761 ++++++++++++++++-------------- src/include/common/zpq_stream.h | 64 ++- src/interfaces/libpq/fe-connect.c | 1 + src/interfaces/libpq/fe-misc.c | 8 +- 5 files changed, 484 insertions(+), 356 deletions(-) diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 562244247ac..b94852e5fc8 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -282,7 +282,7 @@ pq_configure(Port* port) if (index >= 0) /* Use compression */ { - PqStream = zpq_create(impl, compression_level, write_compressed, read_compressed, MyProcPort, NULL, 0); + PqStream = zpq_create(impl, compression_level, impl, write_compressed, read_compressed, MyProcPort, NULL, 0); if (!PqStream) { ereport(LOG, @@ -1072,7 +1072,7 @@ pq_recvbuf(bool nowait) { if (r == ZPQ_DECOMPRESS_ERROR) { - char const* msg = zpq_error(PqStream); + char const* msg = zpq_decompress_error(PqStream); if (msg == NULL) msg = "end of stream"; ereport(COMMERROR, @@ -2096,7 +2096,7 @@ PG_FUNCTION_INFO_V1(pg_compression_algorithm); Datum pg_compression_algorithm(PG_FUNCTION_ARGS) { - char const* algorithm_name = PqStream ? zpq_algorithm_name(PqStream) : NULL; + char const* algorithm_name = PqStream ? zpq_compress_algorithm_name(PqStream) : NULL; if (algorithm_name) PG_RETURN_TEXT_P(cstring_to_text(algorithm_name)); else diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c index e9a56594b83..fc18906e921 100644 --- a/src/common/zpq_stream.c +++ b/src/common/zpq_stream.c @@ -14,54 +14,106 @@ typedef struct char const* (*name)(void); /* - * Create compression stream with using rx/tx function for fetching/sending compressed data. + * Create new compression stream. * level: compression level - * tx_func: function for writing compressed data in underlying stream - * rx_func: function for receiving compressed data from underlying stream - * arg: context passed to the function - * rx_data: received data (compressed data already fetched from input stream) - * rx_data_size: size of data fetched from input stream */ - ZpqStream* (*create)(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size); + void* (*create_compressor)(int level); - /* - * Read up to "size" raw (decompressed) bytes. - * Returns number of decompressed bytes or error code. - * Error code is either ZPQ_DECOMPRESS_ERROR either error code returned by the rx function. - */ - ssize_t (*read)(ZpqStream *zs, void *buf, size_t size); + /* + * Create new decompression stream. + */ + void* (*create_decompressor)(); /* - * Write up to "size" raw (decompressed) bytes. - * Returns number of written raw bytes or error code returned by tx function. - * In the last case amount of written raw bytes is stored in *processed. + * Decompress up to "src_size" compressed bytes from *src and write up to + * "dst_size" raw (decompressed) bytes to *dst. + * Number of decompressed bytes written to *dst is stored in *dst_processed. + * Number of compressed bytes read from *src is stored in *src_processed. + * + * Return codes: + * ZPQ_OK if no errors were encountered during decompression attempt. + * This return code does not guarantee that *src_processed > 0 or *dst_processed > 0. + * + * ZPQ_DATA_PENDING means that there might be some data left within + * decompressor internal buffers. + * + * ZPQ_STREAM_END if encountered end of compressed data stream. + * + * ZPQ_DECOMPRESS_ERROR if encountered an error during decompression attempt. */ - ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed); + ssize_t (*decompress)(void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); /* - * Free stream created by create function. + * Compress up to "src_size" raw (non-compressed) bytes from *src and write up to + * "dst_size" compressed bytes to *dst. + * Number of compressed bytes written to *dst is stored in *dst_processed. + * Number of non-compressed bytes read from *src is stored in *src_processed. + * + * Return codes: + * ZPQ_OK if no errors were encountered during compression attempt. + * This return code does not guarantee that *src_processed > 0 or *dst_processed > 0. + * + * ZPQ_DATA_PENDING means that there might be some data left within + * compressor internal buffers. + * + * ZPQ_COMPRESS_ERROR if encountered an error during compression attempt. */ - void (*free)(ZpqStream *zs); + ssize_t (*compress)(void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); /* - * Get error message. + * Free compression stream created by create_compressor function. */ - char const* (*error)(ZpqStream *zs); + void (*free_compressor)(void *cs); - /* - * Returns amount of data in internal tx decompression buffer. - */ - size_t (*buffered_tx)(ZpqStream *zs); + /* + * Free decompression stream created by create_decompressor function. + */ + void (*free_decompressor)(void *ds); /* - * Returns amount of data in internal rx compression buffer. + * Get compressor error message. */ - size_t (*buffered_rx)(ZpqStream *zs); + char const* (*compress_error)(void *cs); + + /* + * Get decompressor error message. + */ + char const* (*decompress_error)(void *ds); } ZpqAlgorithm; + +#define ZPQ_BUFFER_SIZE 8192 /* We have to flush stream after each protocol command + * and command is mostly limited by record length, + * which in turn usually less than page size (except TOAST) + */ + struct ZpqStream { - ZpqAlgorithm const* algorithm; + ZpqAlgorithm const* c_algorithm; + void* c_stream; + + ZpqAlgorithm const* d_algorithm; + void* d_stream; + + char tx_buf[ZPQ_BUFFER_SIZE]; + size_t tx_pos; + size_t tx_size; + + char rx_buf[ZPQ_BUFFER_SIZE]; + size_t rx_pos; + size_t rx_size; + + zpq_tx_func tx_func; + zpq_rx_func rx_func; + void* arg; + + size_t tx_total; + size_t tx_total_raw; + size_t rx_total; + size_t rx_total_raw; + + bool rx_not_flushed; + bool tx_not_flushed; }; #if HAVE_LIBZSTD @@ -69,7 +121,6 @@ struct ZpqStream #include #include -#define ZSTD_BUFFER_SIZE (8*1024) /* * Maximum allowed back-reference distance, expressed as power of 2. * This setting controls max compressor/decompressor window size. @@ -77,185 +128,142 @@ struct ZpqStream */ #define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */ -typedef struct ZstdStream -{ - ZpqStream common; - ZSTD_CStream* tx_stream; - ZSTD_DStream* rx_stream; - ZSTD_outBuffer tx; - ZSTD_inBuffer rx; - size_t tx_not_flushed; /* Amount of data in internal zstd buffer */ - size_t tx_buffered; /* Data which is consumed by ztd_write but not yet sent */ - size_t rx_buffered; /* Data which is needed for ztd_read */ - zpq_tx_func tx_func; - zpq_rx_func rx_func; - void* arg; - char const* rx_error; /* Decompress error message */ - size_t tx_total; - size_t tx_total_raw; - size_t rx_total; - size_t rx_total_raw; - char tx_buf[ZSTD_BUFFER_SIZE]; - char rx_buf[ZSTD_BUFFER_SIZE]; -} ZstdStream; - -static ZpqStream* -zstd_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size) -{ - ZstdStream* zs = (ZstdStream*)malloc(sizeof(ZstdStream)); - - zs->tx_stream = ZSTD_createCStream(); - ZSTD_initCStream(zs->tx_stream, level); - ZSTD_CCtx_setParameter(zs->tx_stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); - zs->rx_stream = ZSTD_createDStream(); - ZSTD_initDStream(zs->rx_stream); - ZSTD_DCtx_setParameter(zs->rx_stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); - zs->tx.dst = zs->tx_buf; - zs->tx.pos = 0; - zs->tx.size = ZSTD_BUFFER_SIZE; - zs->rx.src = zs->rx_buf; - zs->rx.pos = 0; - zs->rx.size = 0; - zs->rx_func = rx_func; - zs->tx_func = tx_func; - zs->tx_buffered = 0; - zs->rx_buffered = 0; - zs->tx_not_flushed = 0; - zs->rx_error = NULL; - zs->arg = arg; - zs->tx_total = zs->tx_total_raw = 0; - zs->rx_total = zs->rx_total_raw = 0; - zs->rx.size = rx_data_size; - Assert(rx_data_size < ZSTD_BUFFER_SIZE); - memcpy(zs->rx_buf, rx_data, rx_data_size); - - return (ZpqStream*)zs; +typedef struct ZPQ_ZSTD_CStream +{ + ZSTD_CStream* stream; + char const* error; /* error message */ +} ZPQ_ZSTD_CStream; + +typedef struct ZPQ_ZSTD_DStream +{ + ZSTD_DStream* stream; + char const* error; /* error message */ +} ZPQ_ZSTD_DStream; + +static void* +zstd_create_compressor(int level) +{ + ZPQ_ZSTD_CStream* c_stream = (ZPQ_ZSTD_CStream*)malloc(sizeof(ZPQ_ZSTD_CStream)); + + c_stream->stream = ZSTD_createCStream(); + ZSTD_initCStream(c_stream->stream, level); + ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); + c_stream->error = NULL; + return c_stream; +} + +static void* +zstd_create_decompressor() +{ + ZPQ_ZSTD_DStream* d_stream = (ZPQ_ZSTD_DStream*)malloc(sizeof(ZPQ_ZSTD_DStream)); + + d_stream->stream = ZSTD_createDStream(); + ZSTD_initDStream(d_stream->stream); + ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); + d_stream->error = NULL; + return d_stream; } static ssize_t -zstd_read(ZpqStream *zstream, void *buf, size_t size) +zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZstdStream* zs = (ZstdStream*)zstream; - ssize_t rc; + ZPQ_ZSTD_DStream* ds = (ZPQ_ZSTD_DStream*)d_stream; + ZSTD_inBuffer in; + in.src = src; + in.pos = 0; + in.size = src_size; ZSTD_outBuffer out; - out.dst = buf; + out.dst = dst; out.pos = 0; - out.size = size; - - while (1) - { - if (zs->rx.pos != zs->rx.size || zs->rx_buffered == 0) - { - rc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx); - if (ZSTD_isError(rc)) - { - zs->rx_error = ZSTD_getErrorName(rc); - return ZPQ_DECOMPRESS_ERROR; - } - /* Return result if we fill requested amount of bytes or read operation was performed */ - if (out.pos != 0) - { - zs->rx_total_raw += out.pos; - zs->rx_buffered = 0; - return out.pos; - } - zs->rx_buffered = rc; - if (zs->rx.pos == zs->rx.size) - { - zs->rx.pos = zs->rx.size = 0; /* Reset rx buffer */ - } - } - rc = zs->rx_func(zs->arg, (char*)zs->rx.src + zs->rx.size, ZSTD_BUFFER_SIZE - zs->rx.size); - if (rc > 0) /* read fetches some data */ - { - zs->rx.size += rc; - zs->rx_total += rc; - } - else /* read failed */ - { - zs->rx_total_raw += out.pos; - return rc; - } - } + out.size = dst_size; + + size_t rc = ZSTD_decompressStream(ds->stream, &out, &in); + *src_processed = in.pos; + *dst_processed = out.pos; + if (ZSTD_isError(rc)) + { + ds->error = ZSTD_getErrorName(rc); + return ZPQ_DECOMPRESS_ERROR; + } + + if (out.pos == out.size) { + /* if `output.pos == output.size`, there might be some data left within internal buffers */ + return ZPQ_DATA_PENDING; + } + return ZPQ_OK; } static ssize_t -zstd_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed) +zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZstdStream* zs = (ZstdStream*)zstream; - ssize_t rc; - ZSTD_inBuffer in_buf; - in_buf.src = buf; - in_buf.pos = 0; - in_buf.size = size; - - do - { - if (zs->tx.pos == 0) /* Compress buffer is empty */ - { - zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */ - - if (in_buf.pos < size) /* Has something to compress in input buffer */ - ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf); - - if (in_buf.pos == size) /* All data is compressed: flushed internal zstd buffer */ - { - zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx); - } - } - rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos); - if (rc > 0) - { - zs->tx.pos -= rc; - zs->tx.dst = (char*)zs->tx.dst + rc; - zs->tx_total += rc; - } - else - { - *processed = in_buf.pos; - zs->tx_buffered = zs->tx.pos; - zs->tx_total_raw += in_buf.pos; - return rc; - } - /* repeat sending while there is some data in input or internal zstd buffer */ - } while (in_buf.pos < size || zs->tx_not_flushed); - - zs->tx_total_raw += in_buf.pos; - zs->tx_buffered = zs->tx.pos; - return in_buf.pos; + ZPQ_ZSTD_CStream* cs = (ZPQ_ZSTD_CStream*)c_stream; + ZSTD_inBuffer in; + in.src = src; + in.pos = 0; + in.size = src_size; + ZSTD_outBuffer out; + out.dst = dst; + out.pos = 0; + out.size = dst_size; + + if (in.pos < src_size) /* Has something to compress in input buffer */ + { + size_t rc = ZSTD_compressStream(cs->stream, &out, &in); + *dst_processed = out.pos; + *src_processed = in.pos; + if (ZSTD_isError(rc)) + { + cs->error = ZSTD_getErrorName(rc); + return ZPQ_COMPRESS_ERROR; + } + } + + if (in.pos == src_size) /* All data is compressed: flush internal zstd buffer */ + { + size_t tx_not_flushed = ZSTD_flushStream(cs->stream, &out); + *dst_processed = out.pos; + if (tx_not_flushed > 0) { + return ZPQ_DATA_PENDING; + } + } + + return ZPQ_OK; } static void -zstd_free(ZpqStream *zstream) +zstd_free_compressor(void *c_stream) { - ZstdStream* zs = (ZstdStream*)zstream; - if (zs != NULL) + ZPQ_ZSTD_CStream* cs = (ZPQ_ZSTD_CStream*)c_stream; + if (cs != NULL) { - ZSTD_freeCStream(zs->tx_stream); - ZSTD_freeDStream(zs->rx_stream); - free(zs); + ZSTD_freeCStream(cs->stream); + free(cs); } } -static char const* -zstd_error(ZpqStream *zstream) +static void +zstd_free_decompressor(void *d_stream) { - ZstdStream* zs = (ZstdStream*)zstream; - return zs->rx_error; + ZPQ_ZSTD_DStream* ds = (ZPQ_ZSTD_DStream*)d_stream; + if (ds != NULL) + { + ZSTD_freeDStream(ds->stream); + free(ds); + } } -static size_t -zstd_buffered_tx(ZpqStream *zstream) +static char const* +zstd_compress_error(void *c_stream) { - ZstdStream* zs = (ZstdStream*)zstream; - return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0; + ZPQ_ZSTD_CStream* cs = (ZPQ_ZSTD_CStream*)c_stream; + return cs->error; } -static size_t -zstd_buffered_rx(ZpqStream *zstream) +static char const* +zstd_decompress_error(void *d_stream) { - ZstdStream* zs = (ZstdStream*)zstream; - return zs != NULL ? zs->rx.size - zs->rx.pos : 0; + ZPQ_ZSTD_DStream* ds = (ZPQ_ZSTD_DStream*)d_stream; + return ds->error; } static char const* @@ -271,182 +279,112 @@ zstd_name(void) #include #include -#define ZLIB_BUFFER_SIZE 8192 /* We have to flush stream after each protocol command - * and command is mostly limited by record length, - * which in turn usually less than page size (except TOAST) - */ -typedef struct ZlibStream -{ - ZpqStream common; - - z_stream tx; - z_stream rx; - - zpq_tx_func tx_func; - zpq_rx_func rx_func; - void* arg; - unsigned tx_deflate_pending; - size_t tx_buffered; - - Bytef tx_buf[ZLIB_BUFFER_SIZE]; - Bytef rx_buf[ZLIB_BUFFER_SIZE]; -} ZlibStream; - -static ZpqStream* -zlib_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size) +static void* +zlib_create_compressor(int level) { int rc; - ZlibStream* zs = (ZlibStream*)malloc(sizeof(ZlibStream)); - memset(&zs->tx, 0, sizeof(zs->tx)); - zs->tx.next_out = zs->tx_buf; - zs->tx.avail_out = ZLIB_BUFFER_SIZE; - zs->tx_buffered = 0; - rc = deflateInit(&zs->tx, level); - if (rc != Z_OK) - { - free(zs); - return NULL; - } - Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZLIB_BUFFER_SIZE); - - memset(&zs->rx, 0, sizeof(zs->tx)); - zs->rx.next_in = zs->rx_buf; - zs->rx.avail_in = ZLIB_BUFFER_SIZE; - zs->tx_deflate_pending = 0; - rc = inflateInit(&zs->rx); + z_stream* c_stream = (z_stream*)malloc(sizeof(z_stream)); + memset(c_stream, 0, sizeof(*c_stream)); + rc = deflateInit(c_stream, level); if (rc != Z_OK) { - free(zs); + free(c_stream); return NULL; } - Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == ZLIB_BUFFER_SIZE); - - zs->rx.avail_in = rx_data_size; - Assert(rx_data_size < ZLIB_BUFFER_SIZE); - memcpy(zs->rx_buf, rx_data, rx_data_size); - - zs->rx_func = rx_func; - zs->tx_func = tx_func; - zs->arg = arg; + return c_stream; +} - return (ZpqStream*)zs; +static void* +zlib_create_decompressor() +{ + int rc; + z_stream* d_stream = (z_stream*)malloc(sizeof(z_stream)); + memset(d_stream, 0, sizeof(*d_stream)); + rc = inflateInit(d_stream); + if (rc != Z_OK) + { + free(d_stream); + return NULL; + } + return d_stream; } static ssize_t -zlib_read(ZpqStream *zstream, void *buf, size_t size) +zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZlibStream* zs = (ZlibStream*)zstream; + z_stream* ds = (z_stream*)d_stream; int rc; - zs->rx.next_out = (Bytef *)buf; - zs->rx.avail_out = size; - - while (1) - { - if (zs->rx.avail_in != 0) /* If there is some data in receiver buffer, then decompress it */ - { - rc = inflate(&zs->rx, Z_SYNC_FLUSH); - if (rc != Z_OK && rc != Z_BUF_ERROR) - { - return ZPQ_DECOMPRESS_ERROR; - } - if (zs->rx.avail_out != size) - { - return size - zs->rx.avail_out; - } - if (zs->rx.avail_in == 0) - { - zs->rx.next_in = zs->rx_buf; - } - } - else - { - zs->rx.next_in = zs->rx_buf; - } - rc = zs->rx_func(zs->arg, zs->rx.next_in + zs->rx.avail_in, zs->rx_buf + ZLIB_BUFFER_SIZE - zs->rx.next_in - zs->rx.avail_in); - if (rc > 0) - { - zs->rx.avail_in += rc; - } - else - { - return rc; - } - } + ds->next_in = (Bytef *)src; + ds->avail_in = src_size; + ds->next_out = (Bytef *)dst; + ds->avail_out = dst_size; + + rc = inflate(ds, Z_SYNC_FLUSH); + *src_processed = src_size - ds->avail_in; + *dst_processed = dst_size - ds->avail_out; + + if (rc == Z_STREAM_END) { + return ZPQ_STREAM_END; + } + if (rc != Z_OK && rc != Z_BUF_ERROR) + { + return ZPQ_DECOMPRESS_ERROR; + } + + return ZPQ_OK; } static ssize_t -zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed) +zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZlibStream* zs = (ZlibStream*)zstream; + z_stream* cs = (z_stream*)c_stream; int rc; - zs->tx.next_in = (Bytef *)buf; - zs->tx.avail_in = size; - do - { - if (zs->tx.avail_out == ZLIB_BUFFER_SIZE) /* Compress buffer is empty */ - { - zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */ - - if (zs->tx.avail_in != 0 || (zs->tx_deflate_pending > 0)) /* Has something in input or deflate buffer */ - { - rc = deflate(&zs->tx, Z_SYNC_FLUSH); - Assert(rc == Z_OK); - deflatePending(&zs->tx, &zs->tx_deflate_pending, Z_NULL); /* check if any data left in deflate buffer */ - zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */ - } - } - rc = zs->tx_func(zs->arg, zs->tx.next_out, ZLIB_BUFFER_SIZE - zs->tx.avail_out); - if (rc > 0) - { - zs->tx.next_out += rc; - zs->tx.avail_out += rc; - } - else - { - *processed = size - zs->tx.avail_in; - zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out; - return rc; - } - /* repeat sending while there is some data in input or deflate buffer */ - } while (zs->tx.avail_in != 0 || zs->tx_deflate_pending > 0); - - zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out; - - return size - zs->tx.avail_in; + cs->next_out = (Bytef *)dst; + cs->avail_out = dst_size; + cs->next_in = (Bytef *)src; + cs->avail_in = src_size; + + rc = deflate(cs, Z_SYNC_FLUSH); + Assert(rc == Z_OK); + *dst_processed = dst_size - cs->avail_out; + *src_processed = src_size - cs->avail_in; + + unsigned deflate_pending = 0; + deflatePending(cs, &deflate_pending, Z_NULL); /* check if any data left in deflate buffer */ + if (deflate_pending > 0) { + return ZPQ_DATA_PENDING; + } + return ZPQ_OK; } static void -zlib_free(ZpqStream *zstream) +zlib_free_compressor(void *c_stream) { - ZlibStream* zs = (ZlibStream*)zstream; - if (zs != NULL) + z_stream* cs = (z_stream*)c_stream; + if (cs != NULL) { - inflateEnd(&zs->rx); - deflateEnd(&zs->tx); - free(zs); + deflateEnd(cs); + free(cs); } } -static char const* -zlib_error(ZpqStream *zstream) -{ - ZlibStream* zs = (ZlibStream*)zstream; - return zs->rx.msg; -} - -static size_t -zlib_buffered_tx(ZpqStream *zstream) +static void +zlib_free_decompressor(void *d_stream) { - ZlibStream* zs = (ZlibStream*)zstream; - return zs != NULL ? zs->tx_buffered + zs->tx_deflate_pending : 0; + z_stream* ds = (z_stream*)d_stream; + if (ds != NULL) + { + inflateEnd(ds); + free(ds); + } } -static size_t -zlib_buffered_rx(ZpqStream *zstream) +static char const* +zlib_error(void *stream) { - ZlibStream* zs = (ZlibStream*)zstream; - return zs != NULL ? zs->rx.avail_in : 0; + z_stream* zs = (z_stream*)stream; + return zs->msg; } static char const* @@ -469,10 +407,10 @@ no_compression_name(void) static ZpqAlgorithm const zpq_algorithms[] = { #if HAVE_LIBZSTD - {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered_tx, zstd_buffered_rx}, + {zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error}, #endif #if HAVE_LIBZ - {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered_tx, zlib_buffered_rx}, + {zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error}, #endif {no_compression_name} }; @@ -481,50 +419,175 @@ static ZpqAlgorithm const zpq_algorithms[] = * Index of used compression algorithm in zpq_algorithms array. */ ZpqStream* -zpq_create(int algorithm_impl, int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size) +zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size) { - ZpqStream* stream = zpq_algorithms[algorithm_impl].create(level, tx_func, rx_func, arg, rx_data, rx_data_size); - if (stream) - stream->algorithm = &zpq_algorithms[algorithm_impl]; - return stream; + ZpqStream* zs = (ZpqStream*)malloc(sizeof(ZpqStream)); + zs->tx_pos = 0; + zs->tx_size = 0; + zs->rx_pos = 0; + zs->rx_size = 0; + zs->tx_func = tx_func; + zs->rx_func = rx_func; + zs->arg = arg; + zs->tx_total = 0; + zs->tx_total_raw = 0; + zs->rx_total = 0; + zs->rx_total_raw = 0; + zs->tx_not_flushed = false; + zs->rx_not_flushed = false; + + zs->rx_size = rx_data_size; + Assert(rx_data_size < ZPQ_BUFFER_SIZE); + memcpy(zs->rx_buf, rx_data, rx_data_size); + + zs->c_algorithm = &zpq_algorithms[c_alg_impl]; + zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level); + if (zs->c_stream == NULL) { + free(zs); + return NULL; + } + zs->d_algorithm = &zpq_algorithms[d_alg_impl]; + zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor(); + if (zs->d_stream == NULL) { + free(zs); + return NULL; + } + + return zs; } ssize_t zpq_read(ZpqStream *zs, void *buf, size_t size) { - return zs->algorithm->read(zs, buf, size); + size_t buf_pos = 0; + + while (buf_pos == 0) { /* Read until some data fetched */ + if (zs->rx_pos == zs->rx_size) { + zs->rx_pos = zs->rx_size = 0; /* Reset rx buffer */ + } + + if (zs->rx_pos == zs->rx_size && !zs->rx_not_flushed) { + ssize_t rc = zs->rx_func(zs->arg, (char*)zs->rx_buf + zs->rx_size, ZPQ_BUFFER_SIZE - zs->rx_size); + if (rc > 0) /* read fetches some data */ + { + zs->rx_size += rc; + zs->rx_total += rc; + } + else /* read failed */ + { + return rc; + } + } + + Assert(zs->rx_pos <= zs->rx_size); + size_t rx_processed = 0; + size_t buf_processed = 0; + ssize_t rc = zs->d_algorithm->decompress(zs->d_stream, + (char*)zs->rx_buf + zs->rx_pos, zs->rx_size - zs->rx_pos, &rx_processed, + buf, size, &buf_processed); + zs->rx_pos += rx_processed; + zs->rx_total_raw += rx_processed; + buf_pos += buf_processed; + zs->rx_not_flushed = false; + if (rc == ZPQ_STREAM_END) { + break; + } + if (rc == ZPQ_DATA_PENDING) { + zs->rx_not_flushed = true; + continue; + } + if (rc != ZPQ_OK) { + return ZPQ_DECOMPRESS_ERROR; + } + } + return buf_pos; } ssize_t zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t* processed) { - return zs->algorithm->write(zs, buf, size, processed); + size_t buf_pos = 0; + do + { + if (zs->tx_pos == zs->tx_size) /* Have nothing to send */ + { + zs->tx_pos = zs->tx_size = 0; /* Reset pointer to the beginning of buffer */ + + size_t tx_processed = 0; + size_t buf_processed = 0; + ssize_t rc = zs->c_algorithm->compress(zs->c_stream, + (char*)buf + buf_pos, size - buf_pos, &buf_processed, + (char*)zs->tx_buf + zs->tx_size, ZPQ_BUFFER_SIZE - zs->tx_size, &tx_processed); + + zs->tx_size += tx_processed; + buf_pos += buf_processed; + zs->tx_total_raw += buf_processed; + zs->tx_not_flushed = false; + + if (rc == ZPQ_DATA_PENDING) { + zs->tx_not_flushed = true; + continue; + } + if (rc != ZPQ_OK) { + *processed = buf_pos; + return ZPQ_COMPRESS_ERROR; + } + } + while(zs->tx_pos < zs->tx_size) { + ssize_t rc = zs->tx_func(zs->arg, (char*)zs->tx_buf + zs->tx_pos, zs->tx_size - zs->tx_pos); + if (rc > 0) + { + zs->tx_pos += rc; + zs->tx_total += rc; + } + else + { + *processed = buf_pos; + return rc; + } + } + /* repeat sending while there is some data in input or internal compression buffer */ + } while (buf_pos < size || zs->tx_not_flushed); + + return buf_pos; } void zpq_free(ZpqStream *zs) { - if (zs) - zs->algorithm->free(zs); + if (zs) { + if (zs->c_stream) { + zs->c_algorithm->free_compressor(zs->c_stream); + } + if (zs->d_stream) { + zs->d_algorithm->free_decompressor(zs->d_stream); + } + free(zs); + } } char const* -zpq_error(ZpqStream *zs) +zpq_compress_error(ZpqStream *zs) { - return zs->algorithm->error(zs); + return zs->c_algorithm->compress_error(zs->c_stream); } +char const* +zpq_decompress_error(ZpqStream *zs) +{ + return zs->d_algorithm->decompress_error(zs->d_stream); +} size_t zpq_buffered_rx(ZpqStream *zs) { - return zs ? zs->algorithm->buffered_rx(zs) : 0; + return zs ? zs->rx_not_flushed || (zs->rx_size - zs->rx_pos) : 0; } size_t zpq_buffered_tx(ZpqStream *zs) { - return zs ? zs->algorithm->buffered_tx(zs) : 0; + return zs ? zs->tx_not_flushed || (zs->tx_size - zs->tx_pos) : 0; } /* @@ -545,7 +608,13 @@ zpq_get_supported_algorithms(void) } char const* -zpq_algorithm_name(ZpqStream *zs) +zpq_compress_algorithm_name(ZpqStream *zs) { - return zs ? zs->algorithm->name() : NULL; + return zs ? zs->c_algorithm->name() : NULL; } + +char const* +zpq_decompress_algorithm_name(ZpqStream *zs) +{ + return zs ? zs->d_algorithm->name() : NULL; +} \ No newline at end of file diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h index 27aef0aab9b..ad090750ca6 100644 --- a/src/include/common/zpq_stream.h +++ b/src/include/common/zpq_stream.h @@ -8,8 +8,12 @@ #include +#define ZPQ_OK (0) #define ZPQ_IO_ERROR (-1) #define ZPQ_DECOMPRESS_ERROR (-2) +#define ZPQ_COMPRESS_ERROR (-3) +#define ZPQ_STREAM_END (-4) +#define ZPQ_DATA_PENDING (-5) #define ZPQ_DEFAULT_COMPRESSION_LEVEL (1) @@ -19,14 +23,68 @@ typedef struct ZpqStream ZpqStream; typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size); typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size); -ZpqStream* zpq_create(int impl, int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg, char* rx_data, size_t rx_data_size); +/* + * Create compression stream with using rx/tx function for fetching/sending compressed data. + * c_alg_impl: index of chosen compression algorithm + * c_level: compression c_level + * d_alg_impl: index of chosen decompression algorithm + * tx_func: function for writing compressed data in underlying stream + * rx_func: function for receiving compressed data from underlying stream + * arg: context passed to the function + * rx_data: received data (compressed data already fetched from input stream) + * rx_data_size: size of data fetched from input stream + */ +ZpqStream* zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg, char* rx_data, size_t rx_data_size); + +/* + * Read up to "size" raw (decompressed) bytes. + * Returns number of decompressed bytes or error code. + * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function. + */ ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size); + +/* + * Write up to "size" raw (decompressed) bytes. + * Returns number of written raw bytes or error code. + * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function. + * In case of an error, amount of written raw bytes is stored in *processed. + */ ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed); -char const* zpq_error(ZpqStream* zs); + +/* + * Get decompressor error message. + */ +char const* zpq_decompress_error(ZpqStream* zs); + +/* + * Get compressor error message. + */ +char const* zpq_compress_error(ZpqStream* zs); + +/* + * Returns an estimated amount of data in internal rx decompression buffer. + */ size_t zpq_buffered_rx(ZpqStream* zs); + +/* + * Returns an estimated amount of data in internal tx compression buffer. + */ size_t zpq_buffered_tx(ZpqStream* zs); + +/* + * Free stream created by zpq_create function. + */ void zpq_free(ZpqStream* zs); -char const* zpq_algorithm_name(ZpqStream* zs); + +/* + * Get the name of chosen compression algorithm. + */ +char const* zpq_compress_algorithm_name(ZpqStream* zs); + +/* + * Get the name of chosen decompression algorithm. + */ +char const* zpq_decompress_algorithm_name(ZpqStream* zs); /* Returns zero terminated array with compression algorithms names diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 74864826e56..f426c00df8a 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -3271,6 +3271,7 @@ PQconnectPoll(PGconn *conn) Assert(!conn->zstream); conn->zstream = zpq_create(conn->compressors[index].impl, conn->compressors[index].level, + conn->compressors[index].impl, (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn, &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor); if (!conn->zstream) diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 4c69aec38e4..c9bae6531df 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -682,8 +682,8 @@ pqReadData(PGconn *conn) if (nread == ZPQ_DECOMPRESS_ERROR) { printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("decompress error: %s\n"), - zpq_error(conn->zstream)); + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zstream)); return -1; } @@ -785,8 +785,8 @@ pqReadData(PGconn *conn) if (nread == ZPQ_DECOMPRESS_ERROR) { printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("decompress error: %s\n"), - zpq_error(conn->zstream)); + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zstream)); return -1; } From 770f37cf450abc0dd6b0ce952969c7f45c214378 Mon Sep 17 00:00:00 2001 From: usernamedt Date: Mon, 11 Jan 2021 15:46:17 +0500 Subject: [PATCH 3/3] Run pgindent --- src/backend/libpq/pqcomm.c | 96 ++-- src/common/zpq_stream.c | 763 +++++++++++++++++-------------- src/include/common/zpq_stream.h | 26 +- src/include/libpq/libpq-be.h | 3 +- src/include/libpq/libpq.h | 2 +- src/include/pgstat.h | 8 +- src/interfaces/libpq/libpq-int.h | 15 +- 7 files changed, 496 insertions(+), 417 deletions(-) diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index b94852e5fc8..7e25757b14a 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -122,7 +122,7 @@ */ int Unix_socket_permissions; char *Unix_socket_group; -bool libpq_compression; +bool libpq_compression; /* Where the Unix socket files are (list of palloc'd strings) */ static List *sock_paths = NIL; @@ -146,7 +146,7 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ static int PqRecvLength; /* End of data available in PqRecvBuffer */ -static ZpqStream* PqStream; +static ZpqStream * PqStream; /* @@ -191,17 +191,21 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods; WaitEventSet *FeBeWaitSet; -static ssize_t write_compressed(void* arg, void const* data, size_t size) +static ssize_t +write_compressed(void *arg, void const *data, size_t size) { - ssize_t rc = secure_write((Port*)arg, (void*)data, size); + ssize_t rc = secure_write((Port *) arg, (void *) data, size); + if (rc > 0) pgstat_report_network_traffic(0, 0, 0, rc); return rc; } -static ssize_t read_compressed(void* arg, void* data, size_t size) +static ssize_t +read_compressed(void *arg, void *data, size_t size) { - ssize_t rc = secure_read((Port*)arg, data, size); + ssize_t rc = secure_read((Port *) arg, data, size); + if (rc > 0) pgstat_report_network_traffic(0, 0, rc, 0); return rc; @@ -214,8 +218,9 @@ static void SendCompressionACK(int algorithm) { StringInfoData buf; + pq_beginmessage(&buf, 'z'); - pq_sendbyte(&buf, (uint8)algorithm); + pq_sendbyte(&buf, (uint8) algorithm); pq_endmessage(&buf); pq_flush(); } @@ -228,20 +233,21 @@ SendCompressionACK(int algorithm) * -------------------------------- */ int -pq_configure(Port* port) +pq_configure(Port *port) { - char* client_compression_algorithms = port->compression_algorithms; + char *client_compression_algorithms = port->compression_algorithms; /* - * If client request compression, it sends list of supported compression algorithms separated by comma. + * If client request compression, it sends list of supported compression + * algorithms separated by comma. */ if (client_compression_algorithms && libpq_compression) { - int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; - int impl = -1; - char** server_compression_algorithms = zpq_get_supported_algorithms(); - int index = -1; - char* protocol_extension = strchr(client_compression_algorithms, ';'); + int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + int impl = -1; + char **server_compression_algorithms = zpq_get_supported_algorithms(); + int index = -1; + char *protocol_extension = strchr(client_compression_algorithms, ';'); /* No protocol extension are currently supported */ if (protocol_extension) @@ -249,18 +255,19 @@ pq_configure(Port* port) for (int i = 0; *client_compression_algorithms; i++) { - char* sep = strchr(client_compression_algorithms, ','); - char* level; + char *sep = strchr(client_compression_algorithms, ','); + char *level; + if (sep != NULL) *sep = '\0'; level = strchr(client_compression_algorithms, ':'); if (level != NULL) { - *level = '\0'; /* compression level is ignored now */ - if (sscanf(level+1, "%d", &compression_level) != 1) + *level = '\0'; /* compression level is ignored now */ + if (sscanf(level + 1, "%d", &compression_level) != 1) ereport(LOG, - (errmsg("Invalid compression level: %s", level+1))); + (errmsg("Invalid compression level: %s", level + 1))); } for (impl = 0; server_compression_algorithms[impl] != NULL; impl++) { @@ -272,15 +279,15 @@ pq_configure(Port* port) } if (sep != NULL) - client_compression_algorithms = sep+1; + client_compression_algorithms = sep + 1; else break; } - SendCompressionAck: +SendCompressionAck: free(server_compression_algorithms); SendCompressionACK(index); - if (index >= 0) /* Use compression */ + if (index >= 0) /* Use compression */ { PqStream = zpq_create(impl, compression_level, impl, write_compressed, read_compressed, MyProcPort, NULL, 0); if (!PqStream) @@ -1032,14 +1039,14 @@ socket_set_nonblocking(bool nonblocking) /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer * - * nowait parameter toggles non-blocking mode. + * nowait parameter toggles non-blocking mode. * returns number of read bytes, EOF if trouble * -------------------------------- */ static int pq_recvbuf(bool nowait) { - int r; + int r; if (PqRecvPointer > 0) { @@ -1061,7 +1068,10 @@ pq_recvbuf(bool nowait) /* Can fill buffer from PqRecvLength and upwards */ for (;;) { - /* If streaming compression is enabled then use correspondent compression read function. */ + /* + * If streaming compression is enabled then use correspondent + * compression read function. + */ r = PqStream ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength, PQ_RECV_BUFFER_SIZE - PqRecvLength) @@ -1072,7 +1082,8 @@ pq_recvbuf(bool nowait) { if (r == ZPQ_DECOMPRESS_ERROR) { - char const* msg = zpq_decompress_error(PqStream); + char const *msg = zpq_decompress_error(PqStream); + if (msg == NULL) msg = "end of stream"; ereport(COMMERROR, @@ -1122,7 +1133,8 @@ pq_getbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer++]; @@ -1141,7 +1153,8 @@ pq_peekbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer]; @@ -1187,7 +1200,8 @@ pq_getbytes(char *s, size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1221,7 +1235,8 @@ pq_discardbytes(size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1262,7 +1277,8 @@ pq_getstring(StringInfo s) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } @@ -1513,11 +1529,16 @@ internal_flush(void) char *bufend = PqSendBuffer + PqSendPointer; while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0) - /* has more data to flush or unsent data in internal compression buffer */ + + /* + * has more data to flush or unsent data in internal compression + * buffer + */ { - int r; - size_t processed = 0; - size_t available = bufend - bufptr; + int r; + size_t processed = 0; + size_t available = bufend - bufptr; + r = PqStream ? zpq_write(PqStream, bufptr, available, &processed) : secure_write(MyProcPort, bufptr, available); @@ -2096,7 +2117,8 @@ PG_FUNCTION_INFO_V1(pg_compression_algorithm); Datum pg_compression_algorithm(PG_FUNCTION_ARGS) { - char const* algorithm_name = PqStream ? zpq_compress_algorithm_name(PqStream) : NULL; + char const *algorithm_name = PqStream ? zpq_compress_algorithm_name(PqStream) : NULL; + if (algorithm_name) PG_RETURN_TEXT_P(cstring_to_text(algorithm_name)); else diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c index fc18906e921..dc2aa605bd6 100644 --- a/src/common/zpq_stream.c +++ b/src/common/zpq_stream.c @@ -11,109 +11,110 @@ typedef struct /* * Returns name of compression algorithm. */ - char const* (*name)(void); + char const *(*name) (void); /* - * Create new compression stream. - * level: compression level + * Create new compression stream. level: compression level */ - void* (*create_compressor)(int level); + void *(*create_compressor) (int level); - /* - * Create new decompression stream. - */ - void* (*create_decompressor)(); + /* + * Create new decompression stream. + */ + void *(*create_decompressor) (); /* * Decompress up to "src_size" compressed bytes from *src and write up to - * "dst_size" raw (decompressed) bytes to *dst. - * Number of decompressed bytes written to *dst is stored in *dst_processed. - * Number of compressed bytes read from *src is stored in *src_processed. + * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed + * bytes written to *dst is stored in *dst_processed. Number of compressed + * bytes read from *src is stored in *src_processed. * - * Return codes: - * ZPQ_OK if no errors were encountered during decompression attempt. - * This return code does not guarantee that *src_processed > 0 or *dst_processed > 0. + * Return codes: ZPQ_OK if no errors were encountered during decompression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. * * ZPQ_DATA_PENDING means that there might be some data left within * decompressor internal buffers. * * ZPQ_STREAM_END if encountered end of compressed data stream. * - * ZPQ_DECOMPRESS_ERROR if encountered an error during decompression attempt. + * ZPQ_DECOMPRESS_ERROR if encountered an error during decompression + * attempt. */ - ssize_t (*decompress)(void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + ssize_t (*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); /* - * Compress up to "src_size" raw (non-compressed) bytes from *src and write up to - * "dst_size" compressed bytes to *dst. - * Number of compressed bytes written to *dst is stored in *dst_processed. - * Number of non-compressed bytes read from *src is stored in *src_processed. + * Compress up to "src_size" raw (non-compressed) bytes from *src and + * write up to "dst_size" compressed bytes to *dst. Number of compressed + * bytes written to *dst is stored in *dst_processed. Number of + * non-compressed bytes read from *src is stored in *src_processed. * - * Return codes: - * ZPQ_OK if no errors were encountered during compression attempt. - * This return code does not guarantee that *src_processed > 0 or *dst_processed > 0. + * Return codes: ZPQ_OK if no errors were encountered during compression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. * * ZPQ_DATA_PENDING means that there might be some data left within * compressor internal buffers. * * ZPQ_COMPRESS_ERROR if encountered an error during compression attempt. */ - ssize_t (*compress)(void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); + ssize_t (*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); /* * Free compression stream created by create_compressor function. */ - void (*free_compressor)(void *cs); + void (*free_compressor) (void *cs); - /* - * Free decompression stream created by create_decompressor function. - */ - void (*free_decompressor)(void *ds); + /* + * Free decompression stream created by create_decompressor function. + */ + void (*free_decompressor) (void *ds); /* * Get compressor error message. */ - char const* (*compress_error)(void *cs); + char const *(*compress_error) (void *cs); - /* - * Get decompressor error message. - */ - char const* (*decompress_error)(void *ds); -} ZpqAlgorithm; + /* + * Get decompressor error message. + */ + char const *(*decompress_error) (void *ds); +} ZpqAlgorithm; -#define ZPQ_BUFFER_SIZE 8192 /* We have to flush stream after each protocol command - * and command is mostly limited by record length, - * which in turn usually less than page size (except TOAST) - */ +#define ZPQ_BUFFER_SIZE 8192 /* We have to flush stream after each + * protocol command and command is mostly + * limited by record length, which in turn + * usually less than page size (except + * TOAST) */ struct ZpqStream { - ZpqAlgorithm const* c_algorithm; - void* c_stream; + ZpqAlgorithm const *c_algorithm; + void *c_stream; - ZpqAlgorithm const* d_algorithm; - void* d_stream; + ZpqAlgorithm const *d_algorithm; + void *d_stream; - char tx_buf[ZPQ_BUFFER_SIZE]; - size_t tx_pos; - size_t tx_size; + char tx_buf[ZPQ_BUFFER_SIZE]; + size_t tx_pos; + size_t tx_size; - char rx_buf[ZPQ_BUFFER_SIZE]; - size_t rx_pos; - size_t rx_size; + char rx_buf[ZPQ_BUFFER_SIZE]; + size_t rx_pos; + size_t rx_size; - zpq_tx_func tx_func; - zpq_rx_func rx_func; - void* arg; + zpq_tx_func tx_func; + zpq_rx_func rx_func; + void *arg; - size_t tx_total; - size_t tx_total_raw; - size_t rx_total; - size_t rx_total_raw; + size_t tx_total; + size_t tx_total_raw; + size_t rx_total; + size_t rx_total_raw; - bool rx_not_flushed; - bool tx_not_flushed; + bool rx_not_flushed; + bool tx_not_flushed; }; #if HAVE_LIBZSTD @@ -130,111 +131,125 @@ struct ZpqStream typedef struct ZPQ_ZSTD_CStream { - ZSTD_CStream* stream; - char const* error; /* error message */ -} ZPQ_ZSTD_CStream; + ZSTD_CStream *stream; + char const *error; /* error message */ +} ZPQ_ZSTD_CStream; typedef struct ZPQ_ZSTD_DStream { - ZSTD_DStream* stream; - char const* error; /* error message */ -} ZPQ_ZSTD_DStream; + ZSTD_DStream *stream; + char const *error; /* error message */ +} ZPQ_ZSTD_DStream; -static void* +static void * zstd_create_compressor(int level) { - ZPQ_ZSTD_CStream* c_stream = (ZPQ_ZSTD_CStream*)malloc(sizeof(ZPQ_ZSTD_CStream)); + ZPQ_ZSTD_CStream *c_stream = (ZPQ_ZSTD_CStream *) malloc(sizeof(ZPQ_ZSTD_CStream)); - c_stream->stream = ZSTD_createCStream(); + c_stream->stream = ZSTD_createCStream(); ZSTD_initCStream(c_stream->stream, level); ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); - c_stream->error = NULL; + c_stream->error = NULL; return c_stream; } -static void* +static void * zstd_create_decompressor() { - ZPQ_ZSTD_DStream* d_stream = (ZPQ_ZSTD_DStream*)malloc(sizeof(ZPQ_ZSTD_DStream)); + ZPQ_ZSTD_DStream *d_stream = (ZPQ_ZSTD_DStream *) malloc(sizeof(ZPQ_ZSTD_DStream)); - d_stream->stream = ZSTD_createDStream(); - ZSTD_initDStream(d_stream->stream); - ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); - d_stream->error = NULL; - return d_stream; + d_stream->stream = ZSTD_createDStream(); + ZSTD_initDStream(d_stream->stream); + ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); + d_stream->error = NULL; + return d_stream; } static ssize_t zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZPQ_ZSTD_DStream* ds = (ZPQ_ZSTD_DStream*)d_stream; - ZSTD_inBuffer in; - in.src = src; - in.pos = 0; - in.size = src_size; + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + ZSTD_inBuffer in; + + in.src = src; + in.pos = 0; + in.size = src_size; ZSTD_outBuffer out; + out.dst = dst; out.pos = 0; out.size = dst_size; - size_t rc = ZSTD_decompressStream(ds->stream, &out, &in); - *src_processed = in.pos; - *dst_processed = out.pos; - if (ZSTD_isError(rc)) - { - ds->error = ZSTD_getErrorName(rc); - return ZPQ_DECOMPRESS_ERROR; - } - - if (out.pos == out.size) { - /* if `output.pos == output.size`, there might be some data left within internal buffers */ - return ZPQ_DATA_PENDING; - } - return ZPQ_OK; + size_t rc = ZSTD_decompressStream(ds->stream, &out, &in); + + *src_processed = in.pos; + *dst_processed = out.pos; + if (ZSTD_isError(rc)) + { + ds->error = ZSTD_getErrorName(rc); + return ZPQ_DECOMPRESS_ERROR; + } + + if (out.pos == out.size) + { + /* + * if `output.pos == output.size`, there might be some data left + * within internal buffers + */ + return ZPQ_DATA_PENDING; + } + return ZPQ_OK; } static ssize_t zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZPQ_ZSTD_CStream* cs = (ZPQ_ZSTD_CStream*)c_stream; - ZSTD_inBuffer in; - in.src = src; - in.pos = 0; - in.size = src_size; - ZSTD_outBuffer out; - out.dst = dst; - out.pos = 0; - out.size = dst_size; - - if (in.pos < src_size) /* Has something to compress in input buffer */ - { - size_t rc = ZSTD_compressStream(cs->stream, &out, &in); - *dst_processed = out.pos; - *src_processed = in.pos; - if (ZSTD_isError(rc)) - { - cs->error = ZSTD_getErrorName(rc); - return ZPQ_COMPRESS_ERROR; - } - } - - if (in.pos == src_size) /* All data is compressed: flush internal zstd buffer */ - { - size_t tx_not_flushed = ZSTD_flushStream(cs->stream, &out); - *dst_processed = out.pos; - if (tx_not_flushed > 0) { - return ZPQ_DATA_PENDING; - } - } - - return ZPQ_OK; + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZSTD_inBuffer in; + + in.src = src; + in.pos = 0; + in.size = src_size; + ZSTD_outBuffer out; + + out.dst = dst; + out.pos = 0; + out.size = dst_size; + + if (in.pos < src_size) /* Has something to compress in input buffer */ + { + size_t rc = ZSTD_compressStream(cs->stream, &out, &in); + + *dst_processed = out.pos; + *src_processed = in.pos; + if (ZSTD_isError(rc)) + { + cs->error = ZSTD_getErrorName(rc); + return ZPQ_COMPRESS_ERROR; + } + } + + if (in.pos == src_size) /* All data is compressed: flush internal zstd + * buffer */ + { + size_t tx_not_flushed = ZSTD_flushStream(cs->stream, &out); + + *dst_processed = out.pos; + if (tx_not_flushed > 0) + { + return ZPQ_DATA_PENDING; + } + } + + return ZPQ_OK; } static void zstd_free_compressor(void *c_stream) { - ZPQ_ZSTD_CStream* cs = (ZPQ_ZSTD_CStream*)c_stream; - if (cs != NULL) + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + + if (cs != NULL) { ZSTD_freeCStream(cs->stream); free(cs); @@ -244,29 +259,32 @@ zstd_free_compressor(void *c_stream) static void zstd_free_decompressor(void *d_stream) { - ZPQ_ZSTD_DStream* ds = (ZPQ_ZSTD_DStream*)d_stream; - if (ds != NULL) - { - ZSTD_freeDStream(ds->stream); - free(ds); - } + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + + if (ds != NULL) + { + ZSTD_freeDStream(ds->stream); + free(ds); + } } -static char const* +static char const * zstd_compress_error(void *c_stream) { - ZPQ_ZSTD_CStream* cs = (ZPQ_ZSTD_CStream*)c_stream; - return cs->error; + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + + return cs->error; } -static char const* +static char const * zstd_decompress_error(void *d_stream) { - ZPQ_ZSTD_DStream* ds = (ZPQ_ZSTD_DStream*)d_stream; - return ds->error; + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + + return ds->error; } -static char const* +static char const * zstd_name(void) { return "zstd"; @@ -280,11 +298,12 @@ zstd_name(void) #include -static void* +static void * zlib_create_compressor(int level) { - int rc; - z_stream* c_stream = (z_stream*)malloc(sizeof(z_stream)); + int rc; + z_stream *c_stream = (z_stream *) malloc(sizeof(z_stream)); + memset(c_stream, 0, sizeof(*c_stream)); rc = deflateInit(c_stream, level); if (rc != Z_OK) @@ -295,73 +314,81 @@ zlib_create_compressor(int level) return c_stream; } -static void* +static void * zlib_create_decompressor() { - int rc; - z_stream* d_stream = (z_stream*)malloc(sizeof(z_stream)); - memset(d_stream, 0, sizeof(*d_stream)); - rc = inflateInit(d_stream); - if (rc != Z_OK) - { - free(d_stream); - return NULL; - } - return d_stream; + int rc; + z_stream *d_stream = (z_stream *) malloc(sizeof(z_stream)); + + memset(d_stream, 0, sizeof(*d_stream)); + rc = inflateInit(d_stream); + if (rc != Z_OK) + { + free(d_stream); + return NULL; + } + return d_stream; } static ssize_t zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - z_stream* ds = (z_stream*)d_stream; - int rc; - ds->next_in = (Bytef *)src; - ds->avail_in = src_size; - ds->next_out = (Bytef *)dst; - ds->avail_out = dst_size; - - rc = inflate(ds, Z_SYNC_FLUSH); - *src_processed = src_size - ds->avail_in; - *dst_processed = dst_size - ds->avail_out; - - if (rc == Z_STREAM_END) { - return ZPQ_STREAM_END; - } - if (rc != Z_OK && rc != Z_BUF_ERROR) - { - return ZPQ_DECOMPRESS_ERROR; - } - - return ZPQ_OK; + z_stream *ds = (z_stream *) d_stream; + int rc; + + ds->next_in = (Bytef *) src; + ds->avail_in = src_size; + ds->next_out = (Bytef *) dst; + ds->avail_out = dst_size; + + rc = inflate(ds, Z_SYNC_FLUSH); + *src_processed = src_size - ds->avail_in; + *dst_processed = dst_size - ds->avail_out; + + if (rc == Z_STREAM_END) + { + return ZPQ_STREAM_END; + } + if (rc != Z_OK && rc != Z_BUF_ERROR) + { + return ZPQ_DECOMPRESS_ERROR; + } + + return ZPQ_OK; } static ssize_t zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - z_stream* cs = (z_stream*)c_stream; - int rc; - cs->next_out = (Bytef *)dst; - cs->avail_out = dst_size; - cs->next_in = (Bytef *)src; - cs->avail_in = src_size; - - rc = deflate(cs, Z_SYNC_FLUSH); - Assert(rc == Z_OK); - *dst_processed = dst_size - cs->avail_out; - *src_processed = src_size - cs->avail_in; - - unsigned deflate_pending = 0; - deflatePending(cs, &deflate_pending, Z_NULL); /* check if any data left in deflate buffer */ - if (deflate_pending > 0) { - return ZPQ_DATA_PENDING; - } - return ZPQ_OK; + z_stream *cs = (z_stream *) c_stream; + int rc; + + cs->next_out = (Bytef *) dst; + cs->avail_out = dst_size; + cs->next_in = (Bytef *) src; + cs->avail_in = src_size; + + rc = deflate(cs, Z_SYNC_FLUSH); + Assert(rc == Z_OK); + *dst_processed = dst_size - cs->avail_out; + *src_processed = src_size - cs->avail_in; + + unsigned deflate_pending = 0; + + deflatePending(cs, &deflate_pending, Z_NULL); /* check if any data left + * in deflate buffer */ + if (deflate_pending > 0) + { + return ZPQ_DATA_PENDING; + } + return ZPQ_OK; } static void zlib_free_compressor(void *c_stream) { - z_stream* cs = (z_stream*)c_stream; + z_stream *cs = (z_stream *) c_stream; + if (cs != NULL) { deflateEnd(cs); @@ -372,22 +399,24 @@ zlib_free_compressor(void *c_stream) static void zlib_free_decompressor(void *d_stream) { - z_stream* ds = (z_stream*)d_stream; - if (ds != NULL) - { - inflateEnd(ds); - free(ds); - } + z_stream *ds = (z_stream *) d_stream; + + if (ds != NULL) + { + inflateEnd(ds); + free(ds); + } } -static char const* +static char const * zlib_error(void *stream) { - z_stream* zs = (z_stream*)stream; + z_stream *zs = (z_stream *) stream; + return zs->msg; } -static char const* +static char const * zlib_name(void) { return "zlib"; @@ -395,7 +424,7 @@ zlib_name(void) #endif -static char const* +static char const * no_compression_name(void) { return NULL; @@ -418,174 +447,198 @@ static ZpqAlgorithm const zpq_algorithms[] = /* * Index of used compression algorithm in zpq_algorithms array. */ -ZpqStream* -zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size) -{ - ZpqStream* zs = (ZpqStream*)malloc(sizeof(ZpqStream)); - zs->tx_pos = 0; - zs->tx_size = 0; - zs->rx_pos = 0; - zs->rx_size = 0; - zs->tx_func = tx_func; - zs->rx_func = rx_func; - zs->arg = arg; - zs->tx_total = 0; - zs->tx_total_raw = 0; - zs->rx_total = 0; - zs->rx_total_raw = 0; - zs->tx_not_flushed = false; - zs->rx_not_flushed = false; - - zs->rx_size = rx_data_size; - Assert(rx_data_size < ZPQ_BUFFER_SIZE); - memcpy(zs->rx_buf, rx_data, rx_data_size); - - zs->c_algorithm = &zpq_algorithms[c_alg_impl]; - zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level); - if (zs->c_stream == NULL) { - free(zs); - return NULL; - } - zs->d_algorithm = &zpq_algorithms[d_alg_impl]; - zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor(); - if (zs->d_stream == NULL) { - free(zs); - return NULL; - } +ZpqStream * +zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size) +{ + ZpqStream *zs = (ZpqStream *) malloc(sizeof(ZpqStream)); + + zs->tx_pos = 0; + zs->tx_size = 0; + zs->rx_pos = 0; + zs->rx_size = 0; + zs->tx_func = tx_func; + zs->rx_func = rx_func; + zs->arg = arg; + zs->tx_total = 0; + zs->tx_total_raw = 0; + zs->rx_total = 0; + zs->rx_total_raw = 0; + zs->tx_not_flushed = false; + zs->rx_not_flushed = false; + + zs->rx_size = rx_data_size; + Assert(rx_data_size < ZPQ_BUFFER_SIZE); + memcpy(zs->rx_buf, rx_data, rx_data_size); + + zs->c_algorithm = &zpq_algorithms[c_alg_impl]; + zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level); + if (zs->c_stream == NULL) + { + free(zs); + return NULL; + } + zs->d_algorithm = &zpq_algorithms[d_alg_impl]; + zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor(); + if (zs->d_stream == NULL) + { + free(zs); + return NULL; + } return zs; } ssize_t -zpq_read(ZpqStream *zs, void *buf, size_t size) -{ - size_t buf_pos = 0; - - while (buf_pos == 0) { /* Read until some data fetched */ - if (zs->rx_pos == zs->rx_size) { - zs->rx_pos = zs->rx_size = 0; /* Reset rx buffer */ - } - - if (zs->rx_pos == zs->rx_size && !zs->rx_not_flushed) { - ssize_t rc = zs->rx_func(zs->arg, (char*)zs->rx_buf + zs->rx_size, ZPQ_BUFFER_SIZE - zs->rx_size); - if (rc > 0) /* read fetches some data */ - { - zs->rx_size += rc; - zs->rx_total += rc; - } - else /* read failed */ - { - return rc; - } - } - - Assert(zs->rx_pos <= zs->rx_size); - size_t rx_processed = 0; - size_t buf_processed = 0; - ssize_t rc = zs->d_algorithm->decompress(zs->d_stream, - (char*)zs->rx_buf + zs->rx_pos, zs->rx_size - zs->rx_pos, &rx_processed, - buf, size, &buf_processed); - zs->rx_pos += rx_processed; - zs->rx_total_raw += rx_processed; - buf_pos += buf_processed; - zs->rx_not_flushed = false; - if (rc == ZPQ_STREAM_END) { - break; - } - if (rc == ZPQ_DATA_PENDING) { - zs->rx_not_flushed = true; - continue; - } - if (rc != ZPQ_OK) { - return ZPQ_DECOMPRESS_ERROR; - } - } - return buf_pos; +zpq_read(ZpqStream * zs, void *buf, size_t size) +{ + size_t buf_pos = 0; + + while (buf_pos == 0) + { /* Read until some data fetched */ + if (zs->rx_pos == zs->rx_size) + { + zs->rx_pos = zs->rx_size = 0; /* Reset rx buffer */ + } + + if (zs->rx_pos == zs->rx_size && !zs->rx_not_flushed) + { + ssize_t rc = zs->rx_func(zs->arg, (char *) zs->rx_buf + zs->rx_size, ZPQ_BUFFER_SIZE - zs->rx_size); + + if (rc > 0) /* read fetches some data */ + { + zs->rx_size += rc; + zs->rx_total += rc; + } + else /* read failed */ + { + return rc; + } + } + + Assert(zs->rx_pos <= zs->rx_size); + size_t rx_processed = 0; + size_t buf_processed = 0; + ssize_t rc = zs->d_algorithm->decompress(zs->d_stream, + (char *) zs->rx_buf + zs->rx_pos, zs->rx_size - zs->rx_pos, &rx_processed, + buf, size, &buf_processed); + + zs->rx_pos += rx_processed; + zs->rx_total_raw += rx_processed; + buf_pos += buf_processed; + zs->rx_not_flushed = false; + if (rc == ZPQ_STREAM_END) + { + break; + } + if (rc == ZPQ_DATA_PENDING) + { + zs->rx_not_flushed = true; + continue; + } + if (rc != ZPQ_OK) + { + return ZPQ_DECOMPRESS_ERROR; + } + } + return buf_pos; } ssize_t -zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t* processed) -{ - size_t buf_pos = 0; - do - { - if (zs->tx_pos == zs->tx_size) /* Have nothing to send */ - { - zs->tx_pos = zs->tx_size = 0; /* Reset pointer to the beginning of buffer */ - - size_t tx_processed = 0; - size_t buf_processed = 0; - ssize_t rc = zs->c_algorithm->compress(zs->c_stream, - (char*)buf + buf_pos, size - buf_pos, &buf_processed, - (char*)zs->tx_buf + zs->tx_size, ZPQ_BUFFER_SIZE - zs->tx_size, &tx_processed); - - zs->tx_size += tx_processed; - buf_pos += buf_processed; - zs->tx_total_raw += buf_processed; - zs->tx_not_flushed = false; - - if (rc == ZPQ_DATA_PENDING) { - zs->tx_not_flushed = true; - continue; - } - if (rc != ZPQ_OK) { - *processed = buf_pos; - return ZPQ_COMPRESS_ERROR; - } - } - while(zs->tx_pos < zs->tx_size) { - ssize_t rc = zs->tx_func(zs->arg, (char*)zs->tx_buf + zs->tx_pos, zs->tx_size - zs->tx_pos); - if (rc > 0) - { - zs->tx_pos += rc; - zs->tx_total += rc; - } - else - { - *processed = buf_pos; - return rc; - } - } - /* repeat sending while there is some data in input or internal compression buffer */ - } while (buf_pos < size || zs->tx_not_flushed); - - return buf_pos; +zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed) +{ + size_t buf_pos = 0; + + do + { + if (zs->tx_pos == zs->tx_size) /* Have nothing to send */ + { + zs->tx_pos = zs->tx_size = 0; /* Reset pointer to the beginning + * of buffer */ + + size_t tx_processed = 0; + size_t buf_processed = 0; + ssize_t rc = zs->c_algorithm->compress(zs->c_stream, + (char *) buf + buf_pos, size - buf_pos, &buf_processed, + (char *) zs->tx_buf + zs->tx_size, ZPQ_BUFFER_SIZE - zs->tx_size, &tx_processed); + + zs->tx_size += tx_processed; + buf_pos += buf_processed; + zs->tx_total_raw += buf_processed; + zs->tx_not_flushed = false; + + if (rc == ZPQ_DATA_PENDING) + { + zs->tx_not_flushed = true; + continue; + } + if (rc != ZPQ_OK) + { + *processed = buf_pos; + return ZPQ_COMPRESS_ERROR; + } + } + while (zs->tx_pos < zs->tx_size) + { + ssize_t rc = zs->tx_func(zs->arg, (char *) zs->tx_buf + zs->tx_pos, zs->tx_size - zs->tx_pos); + + if (rc > 0) + { + zs->tx_pos += rc; + zs->tx_total += rc; + } + else + { + *processed = buf_pos; + return rc; + } + } + + /* + * repeat sending while there is some data in input or internal + * compression buffer + */ + } while (buf_pos < size || zs->tx_not_flushed); + + return buf_pos; } void -zpq_free(ZpqStream *zs) -{ - if (zs) { - if (zs->c_stream) { - zs->c_algorithm->free_compressor(zs->c_stream); - } - if (zs->d_stream) { - zs->d_algorithm->free_decompressor(zs->d_stream); - } - free(zs); +zpq_free(ZpqStream * zs) +{ + if (zs) + { + if (zs->c_stream) + { + zs->c_algorithm->free_compressor(zs->c_stream); + } + if (zs->d_stream) + { + zs->d_algorithm->free_decompressor(zs->d_stream); + } + free(zs); } } -char const* -zpq_compress_error(ZpqStream *zs) +char const * +zpq_compress_error(ZpqStream * zs) { return zs->c_algorithm->compress_error(zs->c_stream); } -char const* -zpq_decompress_error(ZpqStream *zs) +char const * +zpq_decompress_error(ZpqStream * zs) { - return zs->d_algorithm->decompress_error(zs->d_stream); + return zs->d_algorithm->decompress_error(zs->d_stream); } size_t -zpq_buffered_rx(ZpqStream *zs) +zpq_buffered_rx(ZpqStream * zs) { return zs ? zs->rx_not_flushed || (zs->rx_size - zs->rx_pos) : 0; } size_t -zpq_buffered_tx(ZpqStream *zs) +zpq_buffered_tx(ZpqStream * zs) { return zs ? zs->tx_not_flushed || (zs->tx_size - zs->tx_pos) : 0; } @@ -593,28 +646,28 @@ zpq_buffered_tx(ZpqStream *zs) /* * Get list of the supported algorithms. */ -char** +char ** zpq_get_supported_algorithms(void) { - size_t n_algorithms = sizeof(zpq_algorithms)/sizeof(*zpq_algorithms); - char** algorithm_names = malloc(n_algorithms*sizeof(char*)); + size_t n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms); + char **algorithm_names = malloc(n_algorithms * sizeof(char *)); for (size_t i = 0; i < n_algorithms; i++) { - algorithm_names[i] = (char*)zpq_algorithms[i].name(); + algorithm_names[i] = (char *) zpq_algorithms[i].name(); } return algorithm_names; } -char const* -zpq_compress_algorithm_name(ZpqStream *zs) +char const * +zpq_compress_algorithm_name(ZpqStream * zs) { return zs ? zs->c_algorithm->name() : NULL; } -char const* -zpq_decompress_algorithm_name(ZpqStream *zs) +char const * +zpq_decompress_algorithm_name(ZpqStream * zs) { - return zs ? zs->d_algorithm->name() : NULL; -} \ No newline at end of file + return zs ? zs->d_algorithm->name() : NULL; +} diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h index ad090750ca6..2d768439390 100644 --- a/src/include/common/zpq_stream.h +++ b/src/include/common/zpq_stream.h @@ -20,8 +20,8 @@ struct ZpqStream; typedef struct ZpqStream ZpqStream; -typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size); -typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size); +typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size); +typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size); /* * Create compression stream with using rx/tx function for fetching/sending compressed data. @@ -34,14 +34,14 @@ typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size); * rx_data: received data (compressed data already fetched from input stream) * rx_data_size: size of data fetched from input stream */ -ZpqStream* zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg, char* rx_data, size_t rx_data_size); +ZpqStream *zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size); /* * Read up to "size" raw (decompressed) bytes. * Returns number of decompressed bytes or error code. * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function. */ -ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size); +ssize_t zpq_read(ZpqStream * zs, void *buf, size_t size); /* * Write up to "size" raw (decompressed) bytes. @@ -49,46 +49,46 @@ ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size); * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function. * In case of an error, amount of written raw bytes is stored in *processed. */ -ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed); +ssize_t zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed); /* * Get decompressor error message. */ -char const* zpq_decompress_error(ZpqStream* zs); +char const *zpq_decompress_error(ZpqStream * zs); /* * Get compressor error message. */ -char const* zpq_compress_error(ZpqStream* zs); +char const *zpq_compress_error(ZpqStream * zs); /* * Returns an estimated amount of data in internal rx decompression buffer. */ -size_t zpq_buffered_rx(ZpqStream* zs); +size_t zpq_buffered_rx(ZpqStream * zs); /* * Returns an estimated amount of data in internal tx compression buffer. */ -size_t zpq_buffered_tx(ZpqStream* zs); +size_t zpq_buffered_tx(ZpqStream * zs); /* * Free stream created by zpq_create function. */ -void zpq_free(ZpqStream* zs); +void zpq_free(ZpqStream * zs); /* * Get the name of chosen compression algorithm. */ -char const* zpq_compress_algorithm_name(ZpqStream* zs); +char const *zpq_compress_algorithm_name(ZpqStream * zs); /* * Get the name of chosen decompression algorithm. */ -char const* zpq_decompress_algorithm_name(ZpqStream* zs); +char const *zpq_decompress_algorithm_name(ZpqStream * zs); /* Returns zero terminated array with compression algorithms names */ -char** zpq_get_supported_algorithms(void); +char **zpq_get_supported_algorithms(void); #endif diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 5289f9f001a..a24bbca1a65 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -170,7 +170,8 @@ typedef struct Port int keepalives_count; int tcp_user_timeout; - char* compression_algorithms; /* Compression algorithms supported by client */ + char *compression_algorithms; /* Compression algorithms supported by + * client */ /* * GSSAPI structures. diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 224ff3d47b8..de24a3f76a3 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -63,7 +63,7 @@ extern void StreamClose(pgsocket sock); extern void TouchSocketFiles(void); extern void RemoveSocketFiles(void); extern void pq_init(void); -extern int pq_configure(Port* port); +extern int pq_configure(Port *port); extern int pq_getbytes(char *s, size_t len); extern int pq_getstring(StringInfo s); extern void pq_startmsgread(void); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index b43efc04993..9b86ee085f8 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -1191,10 +1191,10 @@ typedef struct PgBackendStatus char *st_appname; /* client-server traffic information */ - uint64 st_rx_raw_bytes; - uint64 st_tx_raw_bytes; - uint64 st_rx_compressed_bytes; - uint64 st_tx_compressed_bytes; + uint64 st_rx_raw_bytes; + uint64 st_tx_raw_bytes; + uint64 st_rx_compressed_bytes; + uint64 st_tx_compressed_bytes; /* * Current command string; MUST be null-terminated. Note that this string diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 89128df7a59..932666f6717 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -324,9 +324,9 @@ typedef struct pg_conn_host */ typedef struct pg_conn_compressor { - int impl; /* compression implementation index */ - int level; /* compression level */ -} pg_conn_compressor; + int impl; /* compression implementation index */ + int level; /* compression level */ +} pg_conn_compressor; /* * PGconn stores all the state data associated with a single connection @@ -381,8 +381,11 @@ struct pg_conn char *ssl_min_protocol_version; /* minimum TLS protocol version */ char *ssl_max_protocol_version; /* maximum TLS protocol version */ - char *compression; /* stream compression (boolean value, "any" or list of compression algorithms separated by comma) */ - pg_conn_compressor* compressors; /* descriptors of compression algorithms chosen by client */ + char *compression; /* stream compression (boolean value, "any" or + * list of compression algorithms separated by + * comma) */ + pg_conn_compressor *compressors; /* descriptors of compression + * algorithms chosen by client */ /* Type of connection to make. Possible values: any, read-write. */ char *target_session_attrs; @@ -543,7 +546,7 @@ struct pg_conn PQExpBufferData workBuffer; /* expansible string */ /* Compression stream */ - ZpqStream* zstream; + ZpqStream *zstream; }; /* PGcancel stores all data necessary to cancel a connection. A copy of this