diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 562244247ac..7e25757b14a 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -122,7 +122,7 @@ */ int Unix_socket_permissions; char *Unix_socket_group; -bool libpq_compression; +bool libpq_compression; /* Where the Unix socket files are (list of palloc'd strings) */ static List *sock_paths = NIL; @@ -146,7 +146,7 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ static int PqRecvLength; /* End of data available in PqRecvBuffer */ -static ZpqStream* PqStream; +static ZpqStream * PqStream; /* @@ -191,17 +191,21 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods; WaitEventSet *FeBeWaitSet; -static ssize_t write_compressed(void* arg, void const* data, size_t size) +static ssize_t +write_compressed(void *arg, void const *data, size_t size) { - ssize_t rc = secure_write((Port*)arg, (void*)data, size); + ssize_t rc = secure_write((Port *) arg, (void *) data, size); + if (rc > 0) pgstat_report_network_traffic(0, 0, 0, rc); return rc; } -static ssize_t read_compressed(void* arg, void* data, size_t size) +static ssize_t +read_compressed(void *arg, void *data, size_t size) { - ssize_t rc = secure_read((Port*)arg, data, size); + ssize_t rc = secure_read((Port *) arg, data, size); + if (rc > 0) pgstat_report_network_traffic(0, 0, rc, 0); return rc; @@ -214,8 +218,9 @@ static void SendCompressionACK(int algorithm) { StringInfoData buf; + pq_beginmessage(&buf, 'z'); - pq_sendbyte(&buf, (uint8)algorithm); + pq_sendbyte(&buf, (uint8) algorithm); pq_endmessage(&buf); pq_flush(); } @@ -228,20 +233,21 @@ SendCompressionACK(int algorithm) * -------------------------------- */ int -pq_configure(Port* port) +pq_configure(Port *port) { - char* client_compression_algorithms = port->compression_algorithms; + char *client_compression_algorithms = port->compression_algorithms; /* - * If client request compression, it sends list of supported compression algorithms separated by comma. + * If client request compression, it sends list of supported compression + * algorithms separated by comma. */ if (client_compression_algorithms && libpq_compression) { - int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; - int impl = -1; - char** server_compression_algorithms = zpq_get_supported_algorithms(); - int index = -1; - char* protocol_extension = strchr(client_compression_algorithms, ';'); + int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL; + int impl = -1; + char **server_compression_algorithms = zpq_get_supported_algorithms(); + int index = -1; + char *protocol_extension = strchr(client_compression_algorithms, ';'); /* No protocol extension are currently supported */ if (protocol_extension) @@ -249,18 +255,19 @@ pq_configure(Port* port) for (int i = 0; *client_compression_algorithms; i++) { - char* sep = strchr(client_compression_algorithms, ','); - char* level; + char *sep = strchr(client_compression_algorithms, ','); + char *level; + if (sep != NULL) *sep = '\0'; level = strchr(client_compression_algorithms, ':'); if (level != NULL) { - *level = '\0'; /* compression level is ignored now */ - if (sscanf(level+1, "%d", &compression_level) != 1) + *level = '\0'; /* compression level is ignored now */ + if (sscanf(level + 1, "%d", &compression_level) != 1) ereport(LOG, - (errmsg("Invalid compression level: %s", level+1))); + (errmsg("Invalid compression level: %s", level + 1))); } for (impl = 0; server_compression_algorithms[impl] != NULL; impl++) { @@ -272,17 +279,17 @@ pq_configure(Port* port) } if (sep != NULL) - client_compression_algorithms = sep+1; + client_compression_algorithms = sep + 1; else break; } - SendCompressionAck: +SendCompressionAck: free(server_compression_algorithms); SendCompressionACK(index); - if (index >= 0) /* Use compression */ + if (index >= 0) /* Use compression */ { - PqStream = zpq_create(impl, compression_level, write_compressed, read_compressed, MyProcPort, NULL, 0); + PqStream = zpq_create(impl, compression_level, impl, write_compressed, read_compressed, MyProcPort, NULL, 0); if (!PqStream) { ereport(LOG, @@ -1032,14 +1039,14 @@ socket_set_nonblocking(bool nonblocking) /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer * - * nowait parameter toggles non-blocking mode. + * nowait parameter toggles non-blocking mode. * returns number of read bytes, EOF if trouble * -------------------------------- */ static int pq_recvbuf(bool nowait) { - int r; + int r; if (PqRecvPointer > 0) { @@ -1061,7 +1068,10 @@ pq_recvbuf(bool nowait) /* Can fill buffer from PqRecvLength and upwards */ for (;;) { - /* If streaming compression is enabled then use correspondent compression read function. */ + /* + * If streaming compression is enabled then use correspondent + * compression read function. + */ r = PqStream ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength, PQ_RECV_BUFFER_SIZE - PqRecvLength) @@ -1072,7 +1082,8 @@ pq_recvbuf(bool nowait) { if (r == ZPQ_DECOMPRESS_ERROR) { - char const* msg = zpq_error(PqStream); + char const *msg = zpq_decompress_error(PqStream); + if (msg == NULL) msg = "end of stream"; ereport(COMMERROR, @@ -1122,7 +1133,8 @@ pq_getbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer++]; @@ -1141,7 +1153,8 @@ pq_peekbyte(void) while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } return (unsigned char) PqRecvBuffer[PqRecvPointer]; @@ -1187,7 +1200,8 @@ pq_getbytes(char *s, size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1221,7 +1235,8 @@ pq_discardbytes(size_t len) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } amount = PqRecvLength - PqRecvPointer; @@ -1262,7 +1277,8 @@ pq_getstring(StringInfo s) { while (PqRecvPointer >= PqRecvLength) { - if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv some */ + if (pq_recvbuf(false) == EOF) /* If nothing in buffer, then recv + * some */ return EOF; /* Failed to recv data */ } @@ -1513,11 +1529,16 @@ internal_flush(void) char *bufend = PqSendBuffer + PqSendPointer; while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0) - /* has more data to flush or unsent data in internal compression buffer */ + + /* + * has more data to flush or unsent data in internal compression + * buffer + */ { - int r; - size_t processed = 0; - size_t available = bufend - bufptr; + int r; + size_t processed = 0; + size_t available = bufend - bufptr; + r = PqStream ? zpq_write(PqStream, bufptr, available, &processed) : secure_write(MyProcPort, bufptr, available); @@ -2096,7 +2117,8 @@ PG_FUNCTION_INFO_V1(pg_compression_algorithm); Datum pg_compression_algorithm(PG_FUNCTION_ARGS) { - char const* algorithm_name = PqStream ? zpq_algorithm_name(PqStream) : NULL; + char const *algorithm_name = PqStream ? zpq_compress_algorithm_name(PqStream) : NULL; + if (algorithm_name) PG_RETURN_TEXT_P(cstring_to_text(algorithm_name)); else diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c index 459566228ac..dc2aa605bd6 100644 --- a/src/common/zpq_stream.c +++ b/src/common/zpq_stream.c @@ -11,57 +11,110 @@ typedef struct /* * Returns name of compression algorithm. */ - char const* (*name)(void); + char const *(*name) (void); /* - * Create compression stream with using rx/tx function for fetching/sending compressed data. - * level: compression level - * tx_func: function for writing compressed data in underlying stream - * rx_func: function for receiving compressed data from underlying stream - * arg: context passed to the function - * rx_data: received data (compressed data already fetched from input stream) - * rx_data_size: size of data fetched from input stream + * Create new compression stream. level: compression level */ - ZpqStream* (*create)(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size); + void *(*create_compressor) (int level); /* - * Read up to "size" raw (decompressed) bytes. - * Returns number of decompressed bytes or error code. - * Error code is either ZPQ_DECOMPRESS_ERROR either error code returned by the rx function. + * Create new decompression stream. */ - ssize_t (*read)(ZpqStream *zs, void *buf, size_t size); + void *(*create_decompressor) (); /* - * Write up to "size" raw (decompressed) bytes. - * Returns number of written raw bytes or error code returned by tx function. - * In the last case amount of written raw bytes is stored in *processed. + * Decompress up to "src_size" compressed bytes from *src and write up to + * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed + * bytes written to *dst is stored in *dst_processed. Number of compressed + * bytes read from *src is stored in *src_processed. + * + * Return codes: ZPQ_OK if no errors were encountered during decompression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. + * + * ZPQ_DATA_PENDING means that there might be some data left within + * decompressor internal buffers. + * + * ZPQ_STREAM_END if encountered end of compressed data stream. + * + * ZPQ_DECOMPRESS_ERROR if encountered an error during decompression + * attempt. */ - ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed); + ssize_t (*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); /* - * Free stream created by create function. + * Compress up to "src_size" raw (non-compressed) bytes from *src and + * write up to "dst_size" compressed bytes to *dst. Number of compressed + * bytes written to *dst is stored in *dst_processed. Number of + * non-compressed bytes read from *src is stored in *src_processed. + * + * Return codes: ZPQ_OK if no errors were encountered during compression + * attempt. This return code does not guarantee that *src_processed > 0 or + * *dst_processed > 0. + * + * ZPQ_DATA_PENDING means that there might be some data left within + * compressor internal buffers. + * + * ZPQ_COMPRESS_ERROR if encountered an error during compression attempt. */ - void (*free)(ZpqStream *zs); + ssize_t (*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed); /* - * Get error message. + * Free compression stream created by create_compressor function. */ - char const* (*error)(ZpqStream *zs); + void (*free_compressor) (void *cs); /* - * Returns amount of data in internal tx decompression buffer. + * Free decompression stream created by create_decompressor function. */ - size_t (*buffered_tx)(ZpqStream *zs); + void (*free_decompressor) (void *ds); /* - * Returns amount of data in internal rx compression buffer. + * Get compressor error message. */ - size_t (*buffered_rx)(ZpqStream *zs); -} ZpqAlgorithm; + char const *(*compress_error) (void *cs); + + /* + * Get decompressor error message. + */ + char const *(*decompress_error) (void *ds); +} ZpqAlgorithm; + + +#define ZPQ_BUFFER_SIZE 8192 /* We have to flush stream after each + * protocol command and command is mostly + * limited by record length, which in turn + * usually less than page size (except + * TOAST) */ struct ZpqStream { - ZpqAlgorithm const* algorithm; + ZpqAlgorithm const *c_algorithm; + void *c_stream; + + ZpqAlgorithm const *d_algorithm; + void *d_stream; + + char tx_buf[ZPQ_BUFFER_SIZE]; + size_t tx_pos; + size_t tx_size; + + char rx_buf[ZPQ_BUFFER_SIZE]; + size_t rx_pos; + size_t rx_size; + + zpq_tx_func tx_func; + zpq_rx_func rx_func; + void *arg; + + size_t tx_total; + size_t tx_total_raw; + size_t rx_total; + size_t rx_total_raw; + + bool rx_not_flushed; + bool tx_not_flushed; }; #if HAVE_LIBZSTD @@ -69,188 +122,169 @@ struct ZpqStream #include #include -#define ZSTD_BUFFER_SIZE (8*1024) - -typedef struct ZstdStream -{ - ZpqStream common; - ZSTD_CStream* tx_stream; - ZSTD_DStream* rx_stream; - ZSTD_outBuffer tx; - ZSTD_inBuffer rx; - size_t tx_not_flushed; /* Amount of data in internal zstd buffer */ - size_t tx_buffered; /* Data which is consumed by ztd_write but not yet sent */ - size_t rx_buffered; /* Data which is needed for ztd_read */ - zpq_tx_func tx_func; - zpq_rx_func rx_func; - void* arg; - char const* rx_error; /* Decompress error message */ - size_t tx_total; - size_t tx_total_raw; - size_t rx_total; - size_t rx_total_raw; - char tx_buf[ZSTD_BUFFER_SIZE]; - char rx_buf[ZSTD_BUFFER_SIZE]; -} ZstdStream; - -static ZpqStream* -zstd_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size) -{ - ZstdStream* zs = (ZstdStream*)malloc(sizeof(ZstdStream)); - - zs->tx_stream = ZSTD_createCStream(); - ZSTD_initCStream(zs->tx_stream, level); - zs->rx_stream = ZSTD_createDStream(); - ZSTD_initDStream(zs->rx_stream); - zs->tx.dst = zs->tx_buf; - zs->tx.pos = 0; - zs->tx.size = ZSTD_BUFFER_SIZE; - zs->rx.src = zs->rx_buf; - zs->rx.pos = 0; - zs->rx.size = 0; - zs->rx_func = rx_func; - zs->tx_func = tx_func; - zs->tx_buffered = 0; - zs->rx_buffered = 0; - zs->tx_not_flushed = 0; - zs->rx_error = NULL; - zs->arg = arg; - zs->tx_total = zs->tx_total_raw = 0; - zs->rx_total = zs->rx_total_raw = 0; - zs->rx.size = rx_data_size; - Assert(rx_data_size < ZSTD_BUFFER_SIZE); - memcpy(zs->rx_buf, rx_data, rx_data_size); +/* + * Maximum allowed back-reference distance, expressed as power of 2. + * This setting controls max compressor/decompressor window size. + * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536 + */ +#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */ + +typedef struct ZPQ_ZSTD_CStream +{ + ZSTD_CStream *stream; + char const *error; /* error message */ +} ZPQ_ZSTD_CStream; + +typedef struct ZPQ_ZSTD_DStream +{ + ZSTD_DStream *stream; + char const *error; /* error message */ +} ZPQ_ZSTD_DStream; + +static void * +zstd_create_compressor(int level) +{ + ZPQ_ZSTD_CStream *c_stream = (ZPQ_ZSTD_CStream *) malloc(sizeof(ZPQ_ZSTD_CStream)); + + c_stream->stream = ZSTD_createCStream(); + ZSTD_initCStream(c_stream->stream, level); + ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT); + c_stream->error = NULL; + return c_stream; +} + +static void * +zstd_create_decompressor() +{ + ZPQ_ZSTD_DStream *d_stream = (ZPQ_ZSTD_DStream *) malloc(sizeof(ZPQ_ZSTD_DStream)); - return (ZpqStream*)zs; + d_stream->stream = ZSTD_createDStream(); + ZSTD_initDStream(d_stream->stream); + ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT); + d_stream->error = NULL; + return d_stream; } static ssize_t -zstd_read(ZpqStream *zstream, void *buf, size_t size) +zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZstdStream* zs = (ZstdStream*)zstream; - ssize_t rc; + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + ZSTD_inBuffer in; + + in.src = src; + in.pos = 0; + in.size = src_size; ZSTD_outBuffer out; - out.dst = buf; + + out.dst = dst; out.pos = 0; - out.size = size; + out.size = dst_size; + + size_t rc = ZSTD_decompressStream(ds->stream, &out, &in); - while (1) + *src_processed = in.pos; + *dst_processed = out.pos; + if (ZSTD_isError(rc)) { - if (zs->rx.pos != zs->rx.size || zs->rx_buffered == 0) - { - rc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx); - if (ZSTD_isError(rc)) - { - zs->rx_error = ZSTD_getErrorName(rc); - return ZPQ_DECOMPRESS_ERROR; - } - /* Return result if we fill requested amount of bytes or read operation was performed */ - if (out.pos != 0) - { - zs->rx_total_raw += out.pos; - zs->rx_buffered = 0; - return out.pos; - } - zs->rx_buffered = rc; - if (zs->rx.pos == zs->rx.size) - { - zs->rx.pos = zs->rx.size = 0; /* Reset rx buffer */ - } - } - rc = zs->rx_func(zs->arg, (char*)zs->rx.src + zs->rx.size, ZSTD_BUFFER_SIZE - zs->rx.size); - if (rc > 0) /* read fetches some data */ - { - zs->rx.size += rc; - zs->rx_total += rc; - } - else /* read failed */ - { - zs->rx_total_raw += out.pos; - return rc; - } + ds->error = ZSTD_getErrorName(rc); + return ZPQ_DECOMPRESS_ERROR; } + + if (out.pos == out.size) + { + /* + * if `output.pos == output.size`, there might be some data left + * within internal buffers + */ + return ZPQ_DATA_PENDING; + } + return ZPQ_OK; } static ssize_t -zstd_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed) +zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZstdStream* zs = (ZstdStream*)zstream; - ssize_t rc; - ZSTD_inBuffer in_buf; - in_buf.src = buf; - in_buf.pos = 0; - in_buf.size = size; + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + ZSTD_inBuffer in; - do - { - if (zs->tx.pos == 0) /* Compress buffer is empty */ - { - zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */ + in.src = src; + in.pos = 0; + in.size = src_size; + ZSTD_outBuffer out; - if (in_buf.pos < size) /* Has something to compress in input buffer */ - ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf); + out.dst = dst; + out.pos = 0; + out.size = dst_size; - if (in_buf.pos == size) /* All data is compressed: flushed internal zstd buffer */ - { - zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx); - } - } - rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos); - if (rc > 0) + if (in.pos < src_size) /* Has something to compress in input buffer */ + { + size_t rc = ZSTD_compressStream(cs->stream, &out, &in); + + *dst_processed = out.pos; + *src_processed = in.pos; + if (ZSTD_isError(rc)) { - zs->tx.pos -= rc; - zs->tx.dst = (char*)zs->tx.dst + rc; - zs->tx_total += rc; + cs->error = ZSTD_getErrorName(rc); + return ZPQ_COMPRESS_ERROR; } - else + } + + if (in.pos == src_size) /* All data is compressed: flush internal zstd + * buffer */ + { + size_t tx_not_flushed = ZSTD_flushStream(cs->stream, &out); + + *dst_processed = out.pos; + if (tx_not_flushed > 0) { - *processed = in_buf.pos; - zs->tx_buffered = zs->tx.pos; - zs->tx_total_raw += in_buf.pos; - return rc; + return ZPQ_DATA_PENDING; } - /* repeat sending while there is some data in input or internal zstd buffer */ - } while (in_buf.pos < size || zs->tx_not_flushed); + } - zs->tx_total_raw += in_buf.pos; - zs->tx_buffered = zs->tx.pos; - return in_buf.pos; + return ZPQ_OK; } static void -zstd_free(ZpqStream *zstream) +zstd_free_compressor(void *c_stream) { - ZstdStream* zs = (ZstdStream*)zstream; - if (zs != NULL) + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + + if (cs != NULL) { - ZSTD_freeCStream(zs->tx_stream); - ZSTD_freeDStream(zs->rx_stream); - free(zs); + ZSTD_freeCStream(cs->stream); + free(cs); } } -static char const* -zstd_error(ZpqStream *zstream) +static void +zstd_free_decompressor(void *d_stream) { - ZstdStream* zs = (ZstdStream*)zstream; - return zs->rx_error; + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + + if (ds != NULL) + { + ZSTD_freeDStream(ds->stream); + free(ds); + } } -static size_t -zstd_buffered_tx(ZpqStream *zstream) +static char const * +zstd_compress_error(void *c_stream) { - ZstdStream* zs = (ZstdStream*)zstream; - return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0; + ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream; + + return cs->error; } -static size_t -zstd_buffered_rx(ZpqStream *zstream) +static char const * +zstd_decompress_error(void *d_stream) { - ZstdStream* zs = (ZstdStream*)zstream; - return zs != NULL ? zs->rx.size - zs->rx.pos : 0; + ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream; + + return ds->error; } -static char const* +static char const * zstd_name(void) { return "zstd"; @@ -263,185 +297,126 @@ zstd_name(void) #include #include -#define ZLIB_BUFFER_SIZE 8192 /* We have to flush stream after each protocol command - * and command is mostly limited by record length, - * which in turn usually less than page size (except TOAST) - */ -typedef struct ZlibStream +static void * +zlib_create_compressor(int level) { - ZpqStream common; - - z_stream tx; - z_stream rx; + int rc; + z_stream *c_stream = (z_stream *) malloc(sizeof(z_stream)); - zpq_tx_func tx_func; - zpq_rx_func rx_func; - void* arg; - unsigned tx_deflate_pending; - size_t tx_buffered; - - Bytef tx_buf[ZLIB_BUFFER_SIZE]; - Bytef rx_buf[ZLIB_BUFFER_SIZE]; -} ZlibStream; - -static ZpqStream* -zlib_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size) -{ - int rc; - ZlibStream* zs = (ZlibStream*)malloc(sizeof(ZlibStream)); - memset(&zs->tx, 0, sizeof(zs->tx)); - zs->tx.next_out = zs->tx_buf; - zs->tx.avail_out = ZLIB_BUFFER_SIZE; - zs->tx_buffered = 0; - rc = deflateInit(&zs->tx, level); + memset(c_stream, 0, sizeof(*c_stream)); + rc = deflateInit(c_stream, level); if (rc != Z_OK) { - free(zs); + free(c_stream); return NULL; } - Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZLIB_BUFFER_SIZE); + return c_stream; +} + +static void * +zlib_create_decompressor() +{ + int rc; + z_stream *d_stream = (z_stream *) malloc(sizeof(z_stream)); - memset(&zs->rx, 0, sizeof(zs->tx)); - zs->rx.next_in = zs->rx_buf; - zs->rx.avail_in = ZLIB_BUFFER_SIZE; - zs->tx_deflate_pending = 0; - rc = inflateInit(&zs->rx); + memset(d_stream, 0, sizeof(*d_stream)); + rc = inflateInit(d_stream); if (rc != Z_OK) { - free(zs); + free(d_stream); return NULL; } - Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == ZLIB_BUFFER_SIZE); - - zs->rx.avail_in = rx_data_size; - Assert(rx_data_size < ZLIB_BUFFER_SIZE); - memcpy(zs->rx_buf, rx_data, rx_data_size); - - zs->rx_func = rx_func; - zs->tx_func = tx_func; - zs->arg = arg; - - return (ZpqStream*)zs; + return d_stream; } static ssize_t -zlib_read(ZpqStream *zstream, void *buf, size_t size) +zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZlibStream* zs = (ZlibStream*)zstream; - int rc; - zs->rx.next_out = (Bytef *)buf; - zs->rx.avail_out = size; + z_stream *ds = (z_stream *) d_stream; + int rc; - while (1) + ds->next_in = (Bytef *) src; + ds->avail_in = src_size; + ds->next_out = (Bytef *) dst; + ds->avail_out = dst_size; + + rc = inflate(ds, Z_SYNC_FLUSH); + *src_processed = src_size - ds->avail_in; + *dst_processed = dst_size - ds->avail_out; + + if (rc == Z_STREAM_END) { - if (zs->rx.avail_in != 0) /* If there is some data in receiver buffer, then decompress it */ - { - rc = inflate(&zs->rx, Z_SYNC_FLUSH); - if (rc != Z_OK && rc != Z_BUF_ERROR) - { - return ZPQ_DECOMPRESS_ERROR; - } - if (zs->rx.avail_out != size) - { - return size - zs->rx.avail_out; - } - if (zs->rx.avail_in == 0) - { - zs->rx.next_in = zs->rx_buf; - } - } - else - { - zs->rx.next_in = zs->rx_buf; - } - rc = zs->rx_func(zs->arg, zs->rx.next_in + zs->rx.avail_in, zs->rx_buf + ZLIB_BUFFER_SIZE - zs->rx.next_in - zs->rx.avail_in); - if (rc > 0) - { - zs->rx.avail_in += rc; - } - else - { - return rc; - } + return ZPQ_STREAM_END; + } + if (rc != Z_OK && rc != Z_BUF_ERROR) + { + return ZPQ_DECOMPRESS_ERROR; } + + return ZPQ_OK; } static ssize_t -zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed) +zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed) { - ZlibStream* zs = (ZlibStream*)zstream; - int rc; - zs->tx.next_in = (Bytef *)buf; - zs->tx.avail_in = size; - do - { - if (zs->tx.avail_out == ZLIB_BUFFER_SIZE) /* Compress buffer is empty */ - { - zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */ + z_stream *cs = (z_stream *) c_stream; + int rc; - if (zs->tx.avail_in != 0 || (zs->tx_deflate_pending > 0)) /* Has something in input or deflate buffer */ - { - rc = deflate(&zs->tx, Z_SYNC_FLUSH); - Assert(rc == Z_OK); - deflatePending(&zs->tx, &zs->tx_deflate_pending, Z_NULL); /* check if any data left in deflate buffer */ - zs->tx.next_out = zs->tx_buf; /* Reset pointer to the beginning of buffer */ - } - } - rc = zs->tx_func(zs->arg, zs->tx.next_out, ZLIB_BUFFER_SIZE - zs->tx.avail_out); - if (rc > 0) - { - zs->tx.next_out += rc; - zs->tx.avail_out += rc; - } - else - { - *processed = size - zs->tx.avail_in; - zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out; - return rc; - } - /* repeat sending while there is some data in input or deflate buffer */ - } while (zs->tx.avail_in != 0 || zs->tx_deflate_pending > 0); + cs->next_out = (Bytef *) dst; + cs->avail_out = dst_size; + cs->next_in = (Bytef *) src; + cs->avail_in = src_size; + + rc = deflate(cs, Z_SYNC_FLUSH); + Assert(rc == Z_OK); + *dst_processed = dst_size - cs->avail_out; + *src_processed = src_size - cs->avail_in; - zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out; + unsigned deflate_pending = 0; - return size - zs->tx.avail_in; + deflatePending(cs, &deflate_pending, Z_NULL); /* check if any data left + * in deflate buffer */ + if (deflate_pending > 0) + { + return ZPQ_DATA_PENDING; + } + return ZPQ_OK; } static void -zlib_free(ZpqStream *zstream) +zlib_free_compressor(void *c_stream) { - ZlibStream* zs = (ZlibStream*)zstream; - if (zs != NULL) + z_stream *cs = (z_stream *) c_stream; + + if (cs != NULL) { - inflateEnd(&zs->rx); - deflateEnd(&zs->tx); - free(zs); + deflateEnd(cs); + free(cs); } } -static char const* -zlib_error(ZpqStream *zstream) +static void +zlib_free_decompressor(void *d_stream) { - ZlibStream* zs = (ZlibStream*)zstream; - return zs->rx.msg; -} + z_stream *ds = (z_stream *) d_stream; -static size_t -zlib_buffered_tx(ZpqStream *zstream) -{ - ZlibStream* zs = (ZlibStream*)zstream; - return zs != NULL ? zs->tx_buffered + zs->tx_deflate_pending : 0; + if (ds != NULL) + { + inflateEnd(ds); + free(ds); + } } -static size_t -zlib_buffered_rx(ZpqStream *zstream) +static char const * +zlib_error(void *stream) { - ZlibStream* zs = (ZlibStream*)zstream; - return zs != NULL ? zs->rx.avail_in : 0; + z_stream *zs = (z_stream *) stream; + + return zs->msg; } -static char const* +static char const * zlib_name(void) { return "zlib"; @@ -449,7 +424,7 @@ zlib_name(void) #endif -static char const* +static char const * no_compression_name(void) { return NULL; @@ -461,10 +436,10 @@ no_compression_name(void) static ZpqAlgorithm const zpq_algorithms[] = { #if HAVE_LIBZSTD - {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered_tx, zstd_buffered_rx}, + {zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error}, #endif #if HAVE_LIBZ - {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered_tx, zlib_buffered_rx}, + {zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error}, #endif {no_compression_name} }; @@ -472,72 +447,227 @@ static ZpqAlgorithm const zpq_algorithms[] = /* * Index of used compression algorithm in zpq_algorithms array. */ -ZpqStream* -zpq_create(int algorithm_impl, int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size) +ZpqStream * +zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size) { - ZpqStream* stream = zpq_algorithms[algorithm_impl].create(level, tx_func, rx_func, arg, rx_data, rx_data_size); - if (stream) - stream->algorithm = &zpq_algorithms[algorithm_impl]; - return stream; + ZpqStream *zs = (ZpqStream *) malloc(sizeof(ZpqStream)); + + zs->tx_pos = 0; + zs->tx_size = 0; + zs->rx_pos = 0; + zs->rx_size = 0; + zs->tx_func = tx_func; + zs->rx_func = rx_func; + zs->arg = arg; + zs->tx_total = 0; + zs->tx_total_raw = 0; + zs->rx_total = 0; + zs->rx_total_raw = 0; + zs->tx_not_flushed = false; + zs->rx_not_flushed = false; + + zs->rx_size = rx_data_size; + Assert(rx_data_size < ZPQ_BUFFER_SIZE); + memcpy(zs->rx_buf, rx_data, rx_data_size); + + zs->c_algorithm = &zpq_algorithms[c_alg_impl]; + zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level); + if (zs->c_stream == NULL) + { + free(zs); + return NULL; + } + zs->d_algorithm = &zpq_algorithms[d_alg_impl]; + zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor(); + if (zs->d_stream == NULL) + { + free(zs); + return NULL; + } + + return zs; } ssize_t -zpq_read(ZpqStream *zs, void *buf, size_t size) +zpq_read(ZpqStream * zs, void *buf, size_t size) { - return zs->algorithm->read(zs, buf, size); + size_t buf_pos = 0; + + while (buf_pos == 0) + { /* Read until some data fetched */ + if (zs->rx_pos == zs->rx_size) + { + zs->rx_pos = zs->rx_size = 0; /* Reset rx buffer */ + } + + if (zs->rx_pos == zs->rx_size && !zs->rx_not_flushed) + { + ssize_t rc = zs->rx_func(zs->arg, (char *) zs->rx_buf + zs->rx_size, ZPQ_BUFFER_SIZE - zs->rx_size); + + if (rc > 0) /* read fetches some data */ + { + zs->rx_size += rc; + zs->rx_total += rc; + } + else /* read failed */ + { + return rc; + } + } + + Assert(zs->rx_pos <= zs->rx_size); + size_t rx_processed = 0; + size_t buf_processed = 0; + ssize_t rc = zs->d_algorithm->decompress(zs->d_stream, + (char *) zs->rx_buf + zs->rx_pos, zs->rx_size - zs->rx_pos, &rx_processed, + buf, size, &buf_processed); + + zs->rx_pos += rx_processed; + zs->rx_total_raw += rx_processed; + buf_pos += buf_processed; + zs->rx_not_flushed = false; + if (rc == ZPQ_STREAM_END) + { + break; + } + if (rc == ZPQ_DATA_PENDING) + { + zs->rx_not_flushed = true; + continue; + } + if (rc != ZPQ_OK) + { + return ZPQ_DECOMPRESS_ERROR; + } + } + return buf_pos; } ssize_t -zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t* processed) +zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed) { - return zs->algorithm->write(zs, buf, size, processed); + size_t buf_pos = 0; + + do + { + if (zs->tx_pos == zs->tx_size) /* Have nothing to send */ + { + zs->tx_pos = zs->tx_size = 0; /* Reset pointer to the beginning + * of buffer */ + + size_t tx_processed = 0; + size_t buf_processed = 0; + ssize_t rc = zs->c_algorithm->compress(zs->c_stream, + (char *) buf + buf_pos, size - buf_pos, &buf_processed, + (char *) zs->tx_buf + zs->tx_size, ZPQ_BUFFER_SIZE - zs->tx_size, &tx_processed); + + zs->tx_size += tx_processed; + buf_pos += buf_processed; + zs->tx_total_raw += buf_processed; + zs->tx_not_flushed = false; + + if (rc == ZPQ_DATA_PENDING) + { + zs->tx_not_flushed = true; + continue; + } + if (rc != ZPQ_OK) + { + *processed = buf_pos; + return ZPQ_COMPRESS_ERROR; + } + } + while (zs->tx_pos < zs->tx_size) + { + ssize_t rc = zs->tx_func(zs->arg, (char *) zs->tx_buf + zs->tx_pos, zs->tx_size - zs->tx_pos); + + if (rc > 0) + { + zs->tx_pos += rc; + zs->tx_total += rc; + } + else + { + *processed = buf_pos; + return rc; + } + } + + /* + * repeat sending while there is some data in input or internal + * compression buffer + */ + } while (buf_pos < size || zs->tx_not_flushed); + + return buf_pos; } void -zpq_free(ZpqStream *zs) +zpq_free(ZpqStream * zs) { if (zs) - zs->algorithm->free(zs); + { + if (zs->c_stream) + { + zs->c_algorithm->free_compressor(zs->c_stream); + } + if (zs->d_stream) + { + zs->d_algorithm->free_decompressor(zs->d_stream); + } + free(zs); + } } -char const* -zpq_error(ZpqStream *zs) +char const * +zpq_compress_error(ZpqStream * zs) { - return zs->algorithm->error(zs); + return zs->c_algorithm->compress_error(zs->c_stream); } +char const * +zpq_decompress_error(ZpqStream * zs) +{ + return zs->d_algorithm->decompress_error(zs->d_stream); +} size_t -zpq_buffered_rx(ZpqStream *zs) +zpq_buffered_rx(ZpqStream * zs) { - return zs ? zs->algorithm->buffered_rx(zs) : 0; + return zs ? zs->rx_not_flushed || (zs->rx_size - zs->rx_pos) : 0; } size_t -zpq_buffered_tx(ZpqStream *zs) +zpq_buffered_tx(ZpqStream * zs) { - return zs ? zs->algorithm->buffered_tx(zs) : 0; + return zs ? zs->tx_not_flushed || (zs->tx_size - zs->tx_pos) : 0; } /* * Get list of the supported algorithms. */ -char** +char ** zpq_get_supported_algorithms(void) { - size_t n_algorithms = sizeof(zpq_algorithms)/sizeof(*zpq_algorithms); - char** algorithm_names = malloc(n_algorithms*sizeof(char*)); + size_t n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms); + char **algorithm_names = malloc(n_algorithms * sizeof(char *)); for (size_t i = 0; i < n_algorithms; i++) { - algorithm_names[i] = (char*)zpq_algorithms[i].name(); + algorithm_names[i] = (char *) zpq_algorithms[i].name(); } return algorithm_names; } -char const* -zpq_algorithm_name(ZpqStream *zs) +char const * +zpq_compress_algorithm_name(ZpqStream * zs) +{ + return zs ? zs->c_algorithm->name() : NULL; +} + +char const * +zpq_decompress_algorithm_name(ZpqStream * zs) { - return zs ? zs->algorithm->name() : NULL; + return zs ? zs->d_algorithm->name() : NULL; } diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h index 27aef0aab9b..2d768439390 100644 --- a/src/include/common/zpq_stream.h +++ b/src/include/common/zpq_stream.h @@ -8,29 +8,87 @@ #include +#define ZPQ_OK (0) #define ZPQ_IO_ERROR (-1) #define ZPQ_DECOMPRESS_ERROR (-2) +#define ZPQ_COMPRESS_ERROR (-3) +#define ZPQ_STREAM_END (-4) +#define ZPQ_DATA_PENDING (-5) #define ZPQ_DEFAULT_COMPRESSION_LEVEL (1) struct ZpqStream; typedef struct ZpqStream ZpqStream; -typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size); -typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size); +typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size); +typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size); -ZpqStream* zpq_create(int impl, int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg, char* rx_data, size_t rx_data_size); -ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size); -ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed); -char const* zpq_error(ZpqStream* zs); -size_t zpq_buffered_rx(ZpqStream* zs); -size_t zpq_buffered_tx(ZpqStream* zs); -void zpq_free(ZpqStream* zs); -char const* zpq_algorithm_name(ZpqStream* zs); +/* + * Create compression stream with using rx/tx function for fetching/sending compressed data. + * c_alg_impl: index of chosen compression algorithm + * c_level: compression c_level + * d_alg_impl: index of chosen decompression algorithm + * tx_func: function for writing compressed data in underlying stream + * rx_func: function for receiving compressed data from underlying stream + * arg: context passed to the function + * rx_data: received data (compressed data already fetched from input stream) + * rx_data_size: size of data fetched from input stream + */ +ZpqStream *zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size); + +/* + * Read up to "size" raw (decompressed) bytes. + * Returns number of decompressed bytes or error code. + * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function. + */ +ssize_t zpq_read(ZpqStream * zs, void *buf, size_t size); + +/* + * Write up to "size" raw (decompressed) bytes. + * Returns number of written raw bytes or error code. + * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function. + * In case of an error, amount of written raw bytes is stored in *processed. + */ +ssize_t zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed); + +/* + * Get decompressor error message. + */ +char const *zpq_decompress_error(ZpqStream * zs); + +/* + * Get compressor error message. + */ +char const *zpq_compress_error(ZpqStream * zs); + +/* + * Returns an estimated amount of data in internal rx decompression buffer. + */ +size_t zpq_buffered_rx(ZpqStream * zs); + +/* + * Returns an estimated amount of data in internal tx compression buffer. + */ +size_t zpq_buffered_tx(ZpqStream * zs); + +/* + * Free stream created by zpq_create function. + */ +void zpq_free(ZpqStream * zs); + +/* + * Get the name of chosen compression algorithm. + */ +char const *zpq_compress_algorithm_name(ZpqStream * zs); + +/* + * Get the name of chosen decompression algorithm. + */ +char const *zpq_decompress_algorithm_name(ZpqStream * zs); /* Returns zero terminated array with compression algorithms names */ -char** zpq_get_supported_algorithms(void); +char **zpq_get_supported_algorithms(void); #endif diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 5289f9f001a..a24bbca1a65 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -170,7 +170,8 @@ typedef struct Port int keepalives_count; int tcp_user_timeout; - char* compression_algorithms; /* Compression algorithms supported by client */ + char *compression_algorithms; /* Compression algorithms supported by + * client */ /* * GSSAPI structures. diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 224ff3d47b8..de24a3f76a3 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -63,7 +63,7 @@ extern void StreamClose(pgsocket sock); extern void TouchSocketFiles(void); extern void RemoveSocketFiles(void); extern void pq_init(void); -extern int pq_configure(Port* port); +extern int pq_configure(Port *port); extern int pq_getbytes(char *s, size_t len); extern int pq_getstring(StringInfo s); extern void pq_startmsgread(void); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index b43efc04993..9b86ee085f8 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -1191,10 +1191,10 @@ typedef struct PgBackendStatus char *st_appname; /* client-server traffic information */ - uint64 st_rx_raw_bytes; - uint64 st_tx_raw_bytes; - uint64 st_rx_compressed_bytes; - uint64 st_tx_compressed_bytes; + uint64 st_rx_raw_bytes; + uint64 st_tx_raw_bytes; + uint64 st_rx_compressed_bytes; + uint64 st_tx_compressed_bytes; /* * Current command string; MUST be null-terminated. Note that this string diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 74864826e56..f426c00df8a 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -3271,6 +3271,7 @@ PQconnectPoll(PGconn *conn) Assert(!conn->zstream); conn->zstream = zpq_create(conn->compressors[index].impl, conn->compressors[index].level, + conn->compressors[index].impl, (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn, &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor); if (!conn->zstream) diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index 4c69aec38e4..c9bae6531df 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -682,8 +682,8 @@ pqReadData(PGconn *conn) if (nread == ZPQ_DECOMPRESS_ERROR) { printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("decompress error: %s\n"), - zpq_error(conn->zstream)); + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zstream)); return -1; } @@ -785,8 +785,8 @@ pqReadData(PGconn *conn) if (nread == ZPQ_DECOMPRESS_ERROR) { printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("decompress error: %s\n"), - zpq_error(conn->zstream)); + libpq_gettext("decompress error: %s\n"), + zpq_decompress_error(conn->zstream)); return -1; } diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 89128df7a59..932666f6717 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -324,9 +324,9 @@ typedef struct pg_conn_host */ typedef struct pg_conn_compressor { - int impl; /* compression implementation index */ - int level; /* compression level */ -} pg_conn_compressor; + int impl; /* compression implementation index */ + int level; /* compression level */ +} pg_conn_compressor; /* * PGconn stores all the state data associated with a single connection @@ -381,8 +381,11 @@ struct pg_conn char *ssl_min_protocol_version; /* minimum TLS protocol version */ char *ssl_max_protocol_version; /* maximum TLS protocol version */ - char *compression; /* stream compression (boolean value, "any" or list of compression algorithms separated by comma) */ - pg_conn_compressor* compressors; /* descriptors of compression algorithms chosen by client */ + char *compression; /* stream compression (boolean value, "any" or + * list of compression algorithms separated by + * comma) */ + pg_conn_compressor *compressors; /* descriptors of compression + * algorithms chosen by client */ /* Type of connection to make. Possible values: any, read-write. */ char *target_session_attrs; @@ -543,7 +546,7 @@ struct pg_conn PQExpBufferData workBuffer; /* expansible string */ /* Compression stream */ - ZpqStream* zstream; + ZpqStream *zstream; }; /* PGcancel stores all data necessary to cancel a connection. A copy of this