diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index a9ead3c41aa3..54079b7d83df 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -37,12 +37,17 @@ SELECT pg_stat_force_next_flush();
(1 row)
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
- slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t | t | t | t | t
- regression_slot_stats2 | t | t | t | t | t
- regression_slot_stats3 | t | t | t | t | t
+-- total_wal_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped. Usually we
+-- expect filtered_bytes to be 0 since the entire transaction executed by this
+-- test is replicated. But there may be some background transactions, changes
+-- from which are filtered out by the output plugin, so we check for >= 0 here.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes > 0 AS sent_bytes, filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+ slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count
+------------------------+------------+-------------+----------------+-----------------+-----------+------------+----------------+--------------------
+ regression_slot_stats1 | t | t | t | t | 1 | t | t | t
+ regression_slot_stats2 | t | t | t | t | 1 | t | t | t
+ regression_slot_stats3 | t | t | t | t | 1 | t | t | t
(3 rows)
RESET logical_decoding_work_mem;
@@ -53,12 +58,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
(1 row)
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
- slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t | t | f | f | t
- regression_slot_stats2 | t | t | t | t | t
- regression_slot_stats3 | t | t | t | t | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes > 0 AS sent_bytes, filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+ slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count
+------------------------+------------+-------------+----------------+-----------------+-----------+------------+----------------+--------------------
+ regression_slot_stats1 | t | t | f | f | 0 | f | t | t
+ regression_slot_stats2 | t | t | t | t | 1 | t | t | t
+ regression_slot_stats3 | t | t | t | t | 1 | t | t | t
(3 rows)
-- reset stats for all slots
@@ -68,27 +73,27 @@ SELECT pg_stat_reset_replication_slot(NULL);
(1 row)
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
- slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t | t | f | f | t
- regression_slot_stats2 | t | t | f | f | t
- regression_slot_stats3 | t | t | f | f | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes, filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+ slot_name | spill_txns | spill_count | total_wal_txns | total_wal_bytes | sent_txns | sent_bytes | filtered_bytes | mem_exceeded_count
+------------------------+------------+-------------+----------------+-----------------+-----------+------------+----------------+--------------------
+ regression_slot_stats1 | t | t | f | f | 0 | 0 | 0 | t
+ regression_slot_stats2 | t | t | f | f | 0 | 0 | 0 | t
+ regression_slot_stats3 | t | t | f | f | 0 | 0 | 0 | t
(3 rows)
-- verify accessing/resetting stats for non-existent slot does something reasonable
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
- slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+--------------------+-------------
- do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
+ slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_wal_txns | total_wal_bytes | filtered_bytes | sent_txns | sent_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+----------------+-----------------+----------------+-----------+------------+---------------------+--------------------+-------------
+ do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
(1 row)
SELECT pg_stat_reset_replication_slot('do-not-exist');
ERROR: replication slot "do-not-exist" does not exist
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
- slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+--------------------+-------------
- do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
+ slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_wal_txns | total_wal_bytes | filtered_bytes | sent_txns | sent_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+----------------+-----------------+----------------+-----------+------------+---------------------+--------------------+-------------
+ do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
(1 row)
-- spilling the xact
@@ -121,20 +126,20 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count,
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
BEGIN;
-SELECT slot_name FROM pg_stat_replication_slots;
- slot_name
-------------------------
- regression_slot_stats1
- regression_slot_stats2
- regression_slot_stats3
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+ slot_name | plugin
+------------------------+---------------
+ regression_slot_stats1 | test_decoding
+ regression_slot_stats2 | test_decoding
+ regression_slot_stats3 | test_decoding
(3 rows)
-SELECT slot_name FROM pg_stat_replication_slots;
- slot_name
-------------------------
- regression_slot_stats1
- regression_slot_stats2
- regression_slot_stats3
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+ slot_name | plugin
+------------------------+---------------
+ regression_slot_stats1 | test_decoding
+ regression_slot_stats2 | test_decoding
+ regression_slot_stats3 | test_decoding
(3 rows)
COMMIT;
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 6661dbcb85c3..17e7c0e8f889 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -15,16 +15,22 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL,
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+
+-- total_wal_txns may vary based on the background activity but sent_txns should
+-- always be 1 since the background transactions are always skipped. Usually we
+-- expect filtered_bytes to be 0 since the entire transaction executed by this
+-- test is replicated. But there may be some background transactions, changes
+-- from which are filtered out by the output plugin, so we check for >= 0 here.
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes > 0 AS sent_bytes, filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
RESET logical_decoding_work_mem;
-- reset stats for one slot, others should be unaffected
SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes > 0 AS sent_bytes, filtered_bytes >= 0 AS filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-- reset stats for all slots
SELECT pg_stat_reset_replication_slot(NULL);
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_wal_txns > 0 AS total_wal_txns, total_wal_bytes > 0 AS total_wal_bytes, sent_txns, sent_bytes, filtered_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-- verify accessing/resetting stats for non-existent slot does something reasonable
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
@@ -46,8 +52,8 @@ SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count,
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
BEGIN;
-SELECT slot_name FROM pg_stat_replication_slots;
-SELECT slot_name FROM pg_stat_replication_slots;
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
+SELECT slot_name, plugin FROM pg_stat_replication_slots;
COMMIT;
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 0de62edb7d84..89d3ff0a2398 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -23,10 +23,16 @@ sub test_slot_stats
my ($node, $expected, $msg) = @_;
+ # If there are background transactions which are filtered out by the output
+ # plugin, filtered_bytes may be greater than 0. But it's not guaranteed that
+ # such transactions would be present.
my $result = $node->safe_psql(
'postgres', qq[
- SELECT slot_name, total_txns > 0 AS total_txn,
- total_bytes > 0 AS total_bytes
+ SELECT slot_name, total_wal_txns > 0 AS total_txn,
+ total_wal_bytes > 0 AS total_bytes,
+ sent_txns > 0 AS sent_txn,
+ sent_bytes > 0 AS sent_bytes,
+ filtered_bytes >= 0 AS filtered_bytes
FROM pg_stat_replication_slots
ORDER BY slot_name]);
is($result, $expected, $msg);
@@ -65,7 +71,7 @@ sub test_slot_stats
'postgres', qq[
SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots
WHERE slot_name ~ 'regression_slot'
- AND total_txns > 0 AND total_bytes > 0;
+ AND total_wal_txns > 0 AND total_wal_bytes > 0;
]) or die "Timed out while waiting for statistics to be updated";
# Test to drop one of the replication slot and verify replication statistics data is
@@ -80,9 +86,9 @@ sub test_slot_stats
# restart.
test_slot_stats(
$node,
- qq(regression_slot1|t|t
-regression_slot2|t|t
-regression_slot3|t|t),
+ qq(regression_slot1|t|t|t|t|t
+regression_slot2|t|t|t|t|t
+regression_slot3|t|t|t|t|t),
'check replication statistics are updated');
# Test to remove one of the replication slots and adjust
@@ -104,8 +110,8 @@ sub test_slot_stats
# restart.
test_slot_stats(
$node,
- qq(regression_slot1|t|t
-regression_slot2|t|t),
+ qq(regression_slot1|t|t|t|t|t
+regression_slot2|t|t|t|t|t),
'check replication statistics after removing the slot file');
# cleanup
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 47094f86f5fe..69ad95998046 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -60,12 +60,12 @@ static void pg_output_begin(LogicalDecodingContext *ctx,
TestDecodingData *data,
ReorderBufferTXN *txn,
bool last_write);
-static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
+static bool pg_decode_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
-static void pg_decode_change(LogicalDecodingContext *ctx,
+static bool pg_decode_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation relation,
ReorderBufferChange *change);
-static void pg_decode_truncate(LogicalDecodingContext *ctx,
+static bool pg_decode_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change);
@@ -80,7 +80,7 @@ static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
const char *gid);
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
-static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+static bool pg_decode_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
@@ -98,16 +98,16 @@ static void pg_output_stream_start(LogicalDecodingContext *ctx,
bool last_write);
static void pg_decode_stream_stop(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
-static void pg_decode_stream_abort(LogicalDecodingContext *ctx,
+static bool pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
-static void pg_decode_stream_prepare(LogicalDecodingContext *ctx,
+static bool pg_decode_stream_prepare(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
-static void pg_decode_stream_commit(LogicalDecodingContext *ctx,
+static bool pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
-static void pg_decode_stream_change(LogicalDecodingContext *ctx,
+static bool pg_decode_stream_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
@@ -115,7 +115,7 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
-static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
+static bool pg_decode_stream_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change);
@@ -318,7 +318,7 @@ pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBuff
}
/* COMMIT callback */
-static void
+static bool
pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
@@ -330,7 +330,7 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
txn->output_plugin_private = NULL;
if (data->skip_empty_xacts && !xact_wrote_changes)
- return;
+ return false;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
@@ -343,6 +343,7 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
timestamptz_to_str(txn->commit_time));
OutputPluginWrite(ctx, true);
+ return true;
}
/* BEGIN PREPARE callback */
@@ -367,7 +368,7 @@ pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
}
/* PREPARE callback */
-static void
+static bool
pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
@@ -379,7 +380,7 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* where the first operation is received for this transaction.
*/
if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
- return;
+ return false;
OutputPluginPrepareWrite(ctx, true);
@@ -394,6 +395,7 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
timestamptz_to_str(txn->prepare_time));
OutputPluginWrite(ctx, true);
+ return true;
}
/* COMMIT PREPARED callback */
@@ -599,7 +601,7 @@ tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_
/*
* callback for individual changed tuples
*/
-static void
+static bool
pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
@@ -684,9 +686,10 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextReset(data->context);
OutputPluginWrite(ctx, true);
+ return true;
}
-static void
+static bool
pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
@@ -739,6 +742,7 @@ pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextReset(data->context);
OutputPluginWrite(ctx, true);
+ return true;
}
static void
@@ -818,7 +822,7 @@ pg_decode_stream_stop(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
-static void
+static bool
pg_decode_stream_abort(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn)
@@ -842,7 +846,7 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
}
if (data->skip_empty_xacts && !xact_wrote_changes)
- return;
+ return false;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
@@ -850,9 +854,10 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
else
appendStringInfoString(ctx->out, "aborting streamed (sub)transaction");
OutputPluginWrite(ctx, true);
+ return true;
}
-static void
+static bool
pg_decode_stream_prepare(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
@@ -861,7 +866,7 @@ pg_decode_stream_prepare(LogicalDecodingContext *ctx,
TestDecodingTxnData *txndata = txn->output_plugin_private;
if (data->skip_empty_xacts && !txndata->xact_wrote_changes)
- return;
+ return false;
OutputPluginPrepareWrite(ctx, true);
@@ -877,9 +882,10 @@ pg_decode_stream_prepare(LogicalDecodingContext *ctx,
timestamptz_to_str(txn->prepare_time));
OutputPluginWrite(ctx, true);
+ return true;
}
-static void
+static bool
pg_decode_stream_commit(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
@@ -892,7 +898,7 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
txn->output_plugin_private = NULL;
if (data->skip_empty_xacts && !xact_wrote_changes)
- return;
+ return false;
OutputPluginPrepareWrite(ctx, true);
@@ -906,6 +912,7 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
timestamptz_to_str(txn->commit_time));
OutputPluginWrite(ctx, true);
+ return true;
}
/*
@@ -913,7 +920,7 @@ pg_decode_stream_commit(LogicalDecodingContext *ctx,
* at a later point in time. We don't want users to see the changes until the
* transaction is committed.
*/
-static void
+static bool
pg_decode_stream_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
@@ -935,6 +942,7 @@ pg_decode_stream_change(LogicalDecodingContext *ctx,
else
appendStringInfoString(ctx->out, "streaming change for transaction");
OutputPluginWrite(ctx, true);
+ return true;
}
/*
@@ -981,7 +989,7 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
* In streaming mode, we don't display the detailed information of Truncate.
* See pg_decode_stream_change.
*/
-static void
+static bool
pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change)
@@ -1001,4 +1009,5 @@ pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
else
appendStringInfoString(ctx->out, "streaming truncate for transaction");
OutputPluginWrite(ctx, true);
+ return true;
}
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index d5a5e22fe2c2..886c7d940df0 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -981,10 +981,15 @@ typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
rows will have been called before this, if there have been any modified
rows.
-typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+ If the callback outputs the transaction, it is expected to return true;
+ otherwise false. The return value is used to update the
+ sent_txns counter reported in
+ pg_stat_replication_slots view.
@@ -1005,7 +1010,7 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
this very same transaction. In that case, the logical decoding of this
aborted transaction is stopped gracefully.
-typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
@@ -1015,8 +1020,12 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
and commit_cb callbacks, but additionally the
relation descriptor relation points to the
relation the row belongs to and a struct
- change describing the row modification are passed
- in.
+ change describing the row modification are passed in.
+ If the output plugin decoded and output the change, it is expected
+ to return true; otherwise false. This return value is used to update the
+ filtered_bytes counter reported in
+
+ pg_stat_replication_slots view.
@@ -1036,18 +1045,18 @@ typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
The optional truncate_cb callback is called for a
TRUNCATE command.
-typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
- The parameters are analogous to the change_cb
- callback. However, because TRUNCATE actions on
- tables connected by foreign keys need to be executed together, this
- callback receives an array of relations instead of just a single one.
- See the description of the statement for
- details.
+ The parameters and the expected return value are analogous to the
+ change_cb callback. However, because
+ TRUNCATE actions on tables connected by foreign keys
+ need to be executed together, this callback receives an array of relations
+ instead of just a single one. See the description of the statement for details.
@@ -1180,8 +1189,18 @@ typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
rows will have been called before this, if there have been any modified
rows. The gid field, which is part of the
txn parameter, can be used in this callback.
+ If the callback outputs the prepared transaction, it is expected to return
+ true; otherwise false. The return value is used to update the
+ sent_txns counter reported in
+
+ pg_stat_replication_slots view. Please
+ note that the return value of this callback suffices to determine
+ whether a prepared transaction was output or not; callbacks
+ commit_prepared_cb and
+ rollback_prepared_cb do not need to return this
+ status again.
-typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
@@ -1255,9 +1274,14 @@ typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
Stream Abort Callback
The required stream_abort_cb callback is called to
- abort a previously streamed transaction.
+ abort a previously streamed transaction. If the output plugin has output
+ the streamed transaction, the callback is expected to return true;
+ otherwise false. The return value is used to update the
+ sent_txns counter reported in
+
+ pg_stat_replication_slots view.
-typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
@@ -1270,9 +1294,19 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
The stream_prepare_cb callback is called to prepare
a previously streamed transaction as part of a two-phase commit. This
callback is required when the output plugin supports both the streaming
- of large in-progress transactions and two-phase commits.
+ of large in-progress transactions and two-phase commits. If the output
+ plugin has output the streamed transaction, the callback is expected to
+ return true; otherwise false. The return value is used to update the
+ sent_txns counter reported in
+
+ pg_stat_replication_slots view. Please
+ note that only the return value of this callback suffices to determine
+ whether a prepared transaction was output or not; callbacks
+ commit_prepared_cb and
+ rollback_prepared_cb do not need to return this
+ status again.
-typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
@@ -1283,9 +1317,14 @@ typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx
Stream Commit Callback
The required stream_commit_cb callback is called to
- commit a previously streamed transaction.
+ commit a previously streamed transaction. If the output plugin
+ has output the streamed transaction, the callback is expected to return
+ true; otherwise false. The return value is used to update the
+ sent_txns counter reported in
+
+ pg_stat_replication_slots view.
-typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
@@ -1298,10 +1337,15 @@ typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
The required stream_change_cb callback is called
when sending a change in a block of streamed changes (demarcated by
stream_start_cb and stream_stop_cb calls).
+ If the output plugin decoded and output the change, it is expected to
+ return true. Otherwise it is expected to return false. This return value
+ is used to update the filtered_bytes counter
+ reported in
+ pg_stat_replication_slots view.
The actual changes are not displayed as the transaction can abort at a later
point in time and we don't decode changes for aborted transactions.
-typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
@@ -1338,18 +1382,18 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
(demarcated by stream_start_cb and
stream_stop_cb calls).
-typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
- The parameters are analogous to the stream_change_cb
- callback. However, because TRUNCATE actions on
- tables connected by foreign keys need to be executed together, this
- callback receives an array of relations instead of just a single one.
- See the description of the statement for
- details.
+ The parameters and the return value are analogous to the
+ stream_change_cb callback. However, because
+ TRUNCATE actions on tables connected by foreign keys
+ need to be executed together, this callback receives an array of relations
+ instead of just a single one. See the description of the statement for details.
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 817fd9f4ca7a..d3710e762e41 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1547,6 +1547,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
+
+
+ plugin text
+
+
+ The base name of the shared object containing the output plugin this
+ logical slot is using. This column is same as the one in
+ pg_replication_slots.
+
+
+
spill_txns bigint
@@ -1635,19 +1646,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage
- total_txns bigint
+ total_wal_txns bigint
- Number of decoded transactions sent to the decoding output plugin for
- this slot. This counts top-level transactions only, and is not incremented
- for subtransactions. Note that this includes the transactions that are
- streamed and/or spilled.
+ Number of decoded transactions from WAL sent to the decoding output
+ plugin for this slot. This counts top-level transactions only, and is
+ not incremented for subtransactions. Note that this includes the
+ transactions that are streamed and/or spilled.
- total_bytesbigint
+ total_wal_bytesbigint
Amount of transaction data decoded for sending transactions to the
@@ -1657,6 +1668,42 @@ description | Waiting for a newly initialized WAL file to reach durable storage
+
+
+ filtered_bytes bigint
+
+
+ Amount of changes, from total_wal_bytes, filtered
+ out by the output plugin and not sent downstream. Please note that it
+ does not include the changes filtered before a change is sent to
+ the output plugin, e.g. the changes filtered by origin.
+
+
+
+
+
+ sent_txns bigint
+
+
+ Number of decoded transactions sent downstream for this slot. This
+ counts top-level transactions only, and is not incremented for
+ subtransactions. These transactions are subset of transactions sent to
+ the decoding plugin. Hence this count is expected to be less than or
+ equal to total_wal_txns.
+
+
+
+
+
+ sent_bytesbigint
+
+
+ Amount of transaction changes, in the output format, sent downstream for
+ this slot by the output plugin.
+
+
+
+
slotsync_skip_countbigint
@@ -1693,6 +1740,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
+
+
+ The accuracy of columns filtered_bytes, and
+ sent_txns depends upon the accuracy of return values
+ from respective callbacks associated with those counts as mentioned in . A descripancy in those
+ counts may be result of incorrect implementation of those callbacks in the
+ output plugin given by column plugin.
+
+
+
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 0a0f95f6bb9f..2b30361d32ac 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1068,6 +1068,7 @@ CREATE VIEW pg_replication_slots AS
CREATE VIEW pg_stat_replication_slots AS
SELECT
s.slot_name,
+ r.plugin,
s.spill_txns,
s.spill_count,
s.spill_bytes,
@@ -1075,8 +1076,11 @@ CREATE VIEW pg_stat_replication_slots AS
s.stream_count,
s.stream_bytes,
s.mem_exceeded_count,
- s.total_txns,
- s.total_bytes,
+ s.total_wal_txns,
+ s.total_wal_bytes,
+ s.filtered_bytes,
+ s.sent_txns,
+ s.sent_bytes,
s.slotsync_skip_count,
s.slotsync_last_skip,
s.stats_reset
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1b11ed63dc69..c65399a9c3d4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -888,7 +888,8 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->end_xact = true;
/* do the actual work: call callback */
- ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
+ if (ctx->callbacks.commit_cb(ctx, txn, commit_lsn))
+ cache->sentTxns++;
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -984,7 +985,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
"prepare_cb")));
/* do the actual work: call callback */
- ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+ if (ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn))
+ cache->sentTxns++;
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -1115,7 +1117,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->end_xact = false;
- ctx->callbacks.change_cb(ctx, txn, relation, change);
+ if (!ctx->callbacks.change_cb(ctx, txn, relation, change))
+ cache->filteredBytes += ReorderBufferChangeSize(change);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -1157,7 +1160,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->end_xact = false;
- ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
+ if (!ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change))
+ cache->filteredBytes += ReorderBufferChangeSize(change);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -1396,7 +1400,8 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
errmsg("logical streaming requires a %s callback",
"stream_abort_cb")));
- ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
+ if (ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn))
+ cache->sentTxns++;
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -1441,7 +1446,8 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
errmsg("logical streaming at prepare time requires a %s callback",
"stream_prepare_cb")));
- ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
+ if (ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn))
+ cache->sentTxns++;
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -1482,7 +1488,8 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
errmsg("logical streaming requires a %s callback",
"stream_commit_cb")));
- ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
+ if (ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn))
+ cache->sentTxns++;
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -1531,7 +1538,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
errmsg("logical streaming requires a %s callback",
"stream_change_cb")));
- ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
+ if (!ctx->callbacks.stream_change_cb(ctx, txn, relation, change))
+ cache->filteredBytes += ReorderBufferChangeSize(change);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -1619,7 +1627,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->end_xact = false;
- ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
+ if (!ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change))
+ cache->filteredBytes += ReorderBufferChangeSize(change);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -1959,7 +1968,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->memExceededCount <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
rb,
rb->spillTxns,
rb->spillCount,
@@ -1969,7 +1978,10 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->streamBytes,
rb->memExceededCount,
rb->totalTxns,
- rb->totalBytes);
+ rb->totalBytes,
+ rb->sentTxns,
+ rb->sentBytes,
+ rb->filteredBytes);
repSlotStat.spill_txns = rb->spillTxns;
repSlotStat.spill_count = rb->spillCount;
@@ -1978,8 +1990,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
repSlotStat.stream_count = rb->streamCount;
repSlotStat.stream_bytes = rb->streamBytes;
repSlotStat.mem_exceeded_count = rb->memExceededCount;
- repSlotStat.total_txns = rb->totalTxns;
- repSlotStat.total_bytes = rb->totalBytes;
+ repSlotStat.total_wal_txns = rb->totalTxns;
+ repSlotStat.total_wal_bytes = rb->totalBytes;
+ repSlotStat.sent_txns = rb->sentTxns;
+ repSlotStat.sent_bytes = rb->sentBytes;
+ repSlotStat.filtered_bytes = rb->filteredBytes;
pgstat_report_replslot(ctx->slot, &repSlotStat);
@@ -1992,6 +2007,9 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->memExceededCount = 0;
rb->totalTxns = 0;
rb->totalBytes = 0;
+ rb->sentTxns = 0;
+ rb->sentBytes = 0;
+ rb->filteredBytes = 0;
}
/*
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index cf77ee28dfe7..0acbda94941e 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -65,6 +65,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
Datum values[3];
bool nulls[3];
DecodingOutputState *p;
+ int64 sentBytes = 0;
/* SQL Datums can only be of a limited length... */
if (ctx->out->len > MaxAllocSize - VARHDRSZ)
@@ -74,7 +75,9 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
memset(nulls, 0, sizeof(nulls));
values[0] = LSNGetDatum(lsn);
+ sentBytes += sizeof(XLogRecPtr);
values[1] = TransactionIdGetDatum(xid);
+ sentBytes += sizeof(TransactionId);
/*
* Assert ctx->out is in database encoding when we're writing textual
@@ -87,8 +90,13 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
/* ick, but cstring_to_text_with_len works for bytea perfectly fine */
values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
+ sentBytes += ctx->out->len;
tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
+
+ /* Update the amount of data sent downstream. */
+ ctx->reorder->sentBytes += sentBytes;
+
p->returned_rows++;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index f18c6fb52b57..e4e656882352 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -310,7 +310,6 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
* memory accounting
* ---------------------------------------
*/
-static Size ReorderBufferChangeSize(ReorderBufferChange *change);
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
ReorderBufferChange *change,
ReorderBufferTXN *txn,
@@ -393,6 +392,9 @@ ReorderBufferAllocate(void)
buffer->memExceededCount = 0;
buffer->totalTxns = 0;
buffer->totalBytes = 0;
+ buffer->sentTxns = 0;
+ buffer->sentBytes = 0;
+ buffer->filteredBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
@@ -4455,7 +4457,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/*
* Size of a change in memory.
*/
-static Size
+Size
ReorderBufferChangeSize(ReorderBufferChange *change)
{
Size sz = sizeof(ReorderBufferChange);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 787998abb8a2..8377c2ea464d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -46,12 +46,12 @@ static void pgoutput_startup(LogicalDecodingContext *ctx,
static void pgoutput_shutdown(LogicalDecodingContext *ctx);
static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
-static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
+static bool pgoutput_commit_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
-static void pgoutput_change(LogicalDecodingContext *ctx,
+static bool pgoutput_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation relation,
ReorderBufferChange *change);
-static void pgoutput_truncate(LogicalDecodingContext *ctx,
+static bool pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, int nrelations, Relation relations[],
ReorderBufferChange *change);
static void pgoutput_message(LogicalDecodingContext *ctx,
@@ -62,7 +62,7 @@ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
-static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
+static bool pgoutput_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
@@ -74,13 +74,13 @@ static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
-static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+static bool pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
-static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+static bool pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
-static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+static bool pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static bool publications_valid;
@@ -624,7 +624,7 @@ pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
/*
* COMMIT callback
*/
-static void
+static bool
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
@@ -645,12 +645,13 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (!sent_begin_txn)
{
elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
- return;
+ return false;
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
+ return true;
}
/*
@@ -673,7 +674,7 @@ pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
/*
* PREPARE callback
*/
-static void
+static bool
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
@@ -682,6 +683,7 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
+ return true;
}
/*
@@ -1476,7 +1478,7 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
*
* This is called both in streaming and non-streaming modes.
*/
-static void
+static bool
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
@@ -1490,9 +1492,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
ReorderBufferChangeType action = change->action;
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
+ bool result;
if (!is_publishable_relation(relation))
- return;
+ return false;
/*
* Remember the xid for the change in streaming mode. We need to send xid
@@ -1510,15 +1513,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
case REORDER_BUFFER_CHANGE_INSERT:
if (!relentry->pubactions.pubinsert)
- return;
+ return false;
break;
case REORDER_BUFFER_CHANGE_UPDATE:
if (!relentry->pubactions.pubupdate)
- return;
+ return false;
break;
case REORDER_BUFFER_CHANGE_DELETE:
if (!relentry->pubactions.pubdelete)
- return;
+ return false;
/*
* This is only possible if deletes are allowed even when replica
@@ -1528,7 +1531,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (!change->data.tp.oldtuple)
{
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
- return;
+ return false;
}
break;
default:
@@ -1583,7 +1586,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* of the row filter for old and new tuple.
*/
if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+ {
+ result = false;
goto cleanup;
+ }
+
+ /*
+ * Even if we filter some columns while sending the message we are not
+ * filtering the change as a whole. Hence we will return true.
+ */
+ result = true;
/*
* Send BEGIN if we haven't yet.
@@ -1646,9 +1658,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
+ return result;
}
-static void
+static bool
pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
@@ -1660,6 +1673,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelids;
Oid *relids;
TransactionId xid = InvalidTransactionId;
+ bool result = false;
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
if (data->in_streaming)
@@ -1710,10 +1724,18 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
change->data.truncate.cascade,
change->data.truncate.restart_seqs);
OutputPluginWrite(ctx, true);
+
+ /*
+ * Even if we filtered out some relations, we still send a TRUNCATE
+ * message for the remaining relations. Since the change, as a whole,
+ * is not filtered out we return true.
+ */
+ result = true;
}
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
+ return result;
}
static void
@@ -1885,7 +1907,7 @@ pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
* Notify downstream to discard the streamed transaction (along with all
* its subtransactions, if it's a toplevel transaction).
*/
-static void
+static bool
pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn)
@@ -1912,13 +1934,14 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(toptxn->xid, false);
+ return true;
}
/*
* Notify downstream to apply the streamed transaction (along with all
* its subtransactions).
*/
-static void
+static bool
pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
@@ -1939,6 +1962,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(txn->xid, true);
+ return true;
}
/*
@@ -1946,7 +1970,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
*
* Notify the downstream to prepare the transaction.
*/
-static void
+static bool
pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
@@ -1957,6 +1981,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
+ return true;
}
/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 449632ad1aa3..8ff11f7e5c89 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1587,6 +1587,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/* output previously gathered data in a CopyData packet */
pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
+ /* Update the amount of data sent downstream. */
+ ctx->reorder->sentBytes += ctx->out->len + 1; /* +1 for the 'd' */
+
CHECK_FOR_INTERRUPTS();
/* Try to flush pending output to the client */
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index d757e00eb54d..541c39bd0ccd 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -95,8 +95,11 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
REPLSLOT_ACC(stream_count);
REPLSLOT_ACC(stream_bytes);
REPLSLOT_ACC(mem_exceeded_count);
- REPLSLOT_ACC(total_txns);
- REPLSLOT_ACC(total_bytes);
+ REPLSLOT_ACC(total_wal_txns);
+ REPLSLOT_ACC(total_wal_bytes);
+ REPLSLOT_ACC(sent_txns);
+ REPLSLOT_ACC(sent_bytes);
+ REPLSLOT_ACC(filtered_bytes);
#undef REPLSLOT_ACC
pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index ef6fffe60b97..3752a89553c8 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2129,7 +2129,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum
pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 13
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 16
text *slotname_text = PG_GETARG_TEXT_P(0);
NameData slotname;
TupleDesc tupdesc;
@@ -2156,15 +2156,21 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "mem_exceeded_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_txns",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_wal_txns",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_wal_bytes",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 11, "slotsync_skip_count",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "filtered_bytes",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_last_skip",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 12, "sent_txns",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 13, "sent_bytes",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 14, "slotsync_skip_count",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 15, "slotsync_last_skip",
TIMESTAMPTZOID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 16, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
@@ -2188,19 +2194,22 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
values[5] = Int64GetDatum(slotent->stream_count);
values[6] = Int64GetDatum(slotent->stream_bytes);
values[7] = Int64GetDatum(slotent->mem_exceeded_count);
- values[8] = Int64GetDatum(slotent->total_txns);
- values[9] = Int64GetDatum(slotent->total_bytes);
- values[10] = Int64GetDatum(slotent->slotsync_skip_count);
+ values[8] = Int64GetDatum(slotent->total_wal_txns);
+ values[9] = Int64GetDatum(slotent->total_wal_bytes);
+ values[10] = Int64GetDatum(slotent->filtered_bytes);
+ values[11] = Int64GetDatum(slotent->sent_txns);
+ values[12] = Int64GetDatum(slotent->sent_bytes);
+ values[13] = Int64GetDatum(slotent->slotsync_skip_count);
if (slotent->slotsync_last_skip == 0)
- nulls[11] = true;
+ nulls[14] = true;
else
- values[11] = TimestampTzGetDatum(slotent->slotsync_last_skip);
+ values[14] = TimestampTzGetDatum(slotent->slotsync_last_skip);
if (slotent->stat_reset_timestamp == 0)
- nulls[12] = true;
+ nulls[15] = true;
else
- values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+ values[15] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fd9448ec7b98..8cd14c88bdb1 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5691,9 +5691,9 @@
{ oid => '6169', descr => 'statistics: information about replication slot',
proname => 'pg_stat_get_replication_slot', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'text',
- proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,slotsync_skip_count,slotsync_last_skip,stats_reset}',
+ proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_wal_txns,total_wal_bytes,filtered_bytes,sent_txns,sent_bytes,slotsync_skip_count,slotsync_last_skip,stats_reset}',
prosrc => 'pg_stat_get_replication_slot' },
{ oid => '6230', descr => 'statistics: check if a stats object exists',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index f23dd5870da7..cc4e0a561b0e 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -398,8 +398,11 @@ typedef struct PgStat_StatReplSlotEntry
PgStat_Counter stream_count;
PgStat_Counter stream_bytes;
PgStat_Counter mem_exceeded_count;
- PgStat_Counter total_txns;
- PgStat_Counter total_bytes;
+ PgStat_Counter total_wal_txns;
+ PgStat_Counter total_wal_bytes;
+ PgStat_Counter sent_txns;
+ PgStat_Counter sent_bytes;
+ PgStat_Counter filtered_bytes;
PgStat_Counter slotsync_skip_count;
TimestampTz slotsync_last_skip;
TimestampTz stat_reset_timestamp;
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 8d4d5b71887d..8c27e8266e7c 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -56,17 +56,19 @@ typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
/*
- * Callback for every individual change in a successful transaction.
+ * Callback for every individual change in a successful transaction. Should
+ * return true if the change is output, false otherwise.
*/
-typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
/*
- * Callback for every TRUNCATE in a successful transaction.
+ * Callback for every TRUNCATE in a successful transaction. Should return true if
+ * the change is output, false otherwise.
*/
-typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
@@ -74,8 +76,9 @@ typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
/*
* Called for every (explicit or implicit) COMMIT of a successful transaction.
+ * Should return true if the transaction is output, false otherwise.
*/
-typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
@@ -118,10 +121,10 @@ typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
/*
- * Called for PREPARE record unless it was filtered by filter_prepare()
- * callback.
+ * Called for PREPARE record unless it was filtered by filter_prepare() callback.
+ * Should return true if the transaction is output, false otherwise.
*/
-typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
@@ -159,32 +162,35 @@ typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx,
/*
* Called to discard changes streamed to remote node from in-progress
- * transaction.
+ * transaction. Should return true if the transaction is output, false
+ * otherwise.
*/
-typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
/*
* Called to prepare changes streamed to remote node from in-progress
- * transaction. This is called as part of a two-phase commit.
+ * transaction. This is called as part of a two-phase commit. Should return true
+ * if the transaction is output, false otherwise.
*/
-typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn);
/*
- * Called to apply changes streamed to remote node from in-progress
- * transaction.
+ * Called to apply changes streamed to remote node from in-progress transaction.
+ * Should return true if the transaction is output, false otherwise.
*/
-typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
/*
* Callback for streaming individual changes from in-progress transactions.
+ * Should return true if the change is output, false otherwise.
*/
-typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change);
@@ -202,9 +208,10 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx
const char *message);
/*
- * Callback for streaming truncates from in-progress transactions.
+ * Callback for streaming truncates from in-progress transactions. Should return
+ * true if the change is output, false otherwise.
*/
-typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
+typedef bool (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3cbe106a3c78..bd4c17da7aca 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -699,6 +699,11 @@ struct ReorderBuffer
*/
int64 totalTxns; /* total number of transactions sent */
int64 totalBytes; /* total amount of data decoded */
+ int64 sentTxns; /* number of transactions decoded and sent
+ * downstream */
+ int64 sentBytes; /* amount of data decoded and sent downstream */
+ int64 filteredBytes; /* amount of data filtered out by output
+ * plugin */
};
@@ -718,6 +723,7 @@ extern void ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids);
extern void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn, ReorderBufferChange *change,
bool toast_insert);
+extern Size ReorderBufferChangeSize(ReorderBufferChange *change);
extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
Snapshot snap, XLogRecPtr lsn,
bool transactional, const char *prefix,
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 96b70b84d5e1..c8ada58379b6 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -214,10 +214,10 @@
# Stats exist for stats test slot 1
is( $node_primary->safe_psql(
'postgres',
- qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+ qq(SELECT total_wal_bytes > 0, sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
),
- qq(t|t),
- qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
+ qq(t|t|t),
+ qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
);
# Do reset of stats for stats test slot 1
@@ -235,10 +235,10 @@
is( $node_primary->safe_psql(
'postgres',
- qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+ qq(SELECT stats_reset > '$reset1'::timestamptz, total_wal_bytes = 0, sent_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
),
- qq(t|t),
- qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.)
+ qq(t|t|t),
+ qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_wal_bytes and sent_bytes were set to 0.)
);
# Check that test slot 2 has NULL in reset timestamp
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index ebe2fae17898..5f4df30d65ae 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -577,7 +577,7 @@ sub wait_until_vacuum_can_remove
qq[INSERT INTO decoding_test(x,y) SELECT 100,'100';]);
$node_standby->poll_query_until('testdb',
- qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
+ qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
) or die "replication slot stats of vacuum_full_activeslot not updated";
# This should trigger the conflict
@@ -605,7 +605,7 @@ sub wait_until_vacuum_can_remove
# Ensure that replication slot stats are not removed after invalidation.
is( $node_standby->safe_psql(
'testdb',
- qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
+ qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot']
),
't',
'replication slot stats not removed after invalidation');
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 4286c266e17f..9801c66fba81 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2160,6 +2160,7 @@ pg_stat_replication| SELECT s.pid,
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_replication_slots| SELECT s.slot_name,
+ r.plugin,
s.spill_txns,
s.spill_count,
s.spill_bytes,
@@ -2167,13 +2168,16 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.stream_count,
s.stream_bytes,
s.mem_exceeded_count,
- s.total_txns,
- s.total_bytes,
+ s.total_wal_txns,
+ s.total_wal_bytes,
+ s.filtered_bytes,
+ s.sent_txns,
+ s.sent_bytes,
s.slotsync_skip_count,
s.slotsync_last_skip,
s.stats_reset
FROM pg_replication_slots r,
- LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, slotsync_skip_count, slotsync_last_skip, stats_reset)
+ LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_wal_txns, total_wal_bytes, filtered_bytes, sent_txns, sent_bytes, slotsync_skip_count, slotsync_last_skip, stats_reset)
WHERE (r.datoid IS NOT NULL);
pg_stat_slru| SELECT name,
blks_zeroed,
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 430c1246d14c..68501aa6ad5b 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -124,6 +124,9 @@
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
is($result, qq(1002), 'check initial data was copied to subscriber');
+my $initial_filtered_bytes = $node_publisher->safe_psql('postgres',
+ "SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_ins SELECT generate_series(1,50)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
@@ -157,6 +160,14 @@
$node_publisher->wait_for_catchup('tap_sub');
+# Verify that filtered_bytes increased due to filtered update and delete
+# operations on tab_ins. We cannot test the exact value since it may include
+# changes from other concurrent transactions.
+my $final_filtered_bytes = $node_publisher->safe_psql('postgres',
+ "SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+ 'filtered_bytes increased after DML filtering');
+
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins");
is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
index 3d16c2a800de..011c931dbd3a 100644
--- a/src/test/subscription/t/010_truncate.pl
+++ b/src/test/subscription/t/010_truncate.pl
@@ -69,6 +69,9 @@
# Wait for initial sync of all subscriptions
$node_subscriber->wait_for_subscription_sync;
+my $initial_filtered_bytes = $node_publisher->safe_psql('postgres',
+ "SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'");
+
# insert data to truncate
$node_subscriber->safe_psql('postgres',
@@ -98,6 +101,16 @@
$result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')");
is($result, qq(101), 'truncate restarted identities');
+# All the DMLs above happen on tables that are subscribed to by sub1 and not
+# sub2. filtered_bytes should get incremented for replication slot
+# corresponding to the subscription sub2. We can not test the exact value of
+# filtered_bytes because the counter is affected by background activity.
+my $final_filtered_bytes = $node_publisher->safe_psql('postgres',
+ "SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+ 'filtered_bytes increased after publication level filtering');
+$initial_filtered_bytes = $final_filtered_bytes;
+
# test publication that does not replicate truncate
$node_subscriber->safe_psql('postgres',
@@ -107,6 +120,13 @@
$node_publisher->wait_for_catchup('sub2');
+# Truncate changes are filtered out at publication level itself. Make sure that
+# the filtered_bytes is incremented.
+$final_filtered_bytes = $node_publisher->safe_psql('postgres',
+ "SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+ 'filtered_bytes increased after truncate filtering');
+
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab2");
is($result, qq(3|1|3), 'truncate not replicated');
diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl
index e2c836700535..b772676d6bce 100644
--- a/src/test/subscription/t/028_row_filter.pl
+++ b/src/test/subscription/t/028_row_filter.pl
@@ -579,6 +579,9 @@
# commands are for testing normal logical replication behavior.
#
# test row filter (INSERT, UPDATE, DELETE)
+my $initial_filtered_bytes = $node_publisher->safe_psql('postgres',
+ "SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')");
$node_publisher->safe_psql('postgres',
@@ -612,6 +615,14 @@
$node_publisher->wait_for_catchup($appname);
+# The changes which do not pass the row filter will be filtered. Make sure that
+# the filtered_bytes reflects that. We can not test the exact value of
+# filtered_bytes since it is affected by background activity.
+my $final_filtered_bytes = $node_publisher->safe_psql('postgres',
+ "SELECT filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'");
+cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes,
+ 'filtered_bytes increased after row filtering');
+
# Check expected replicated rows for tab_rowfilter_2
# tap_pub_1 filter is: (c % 2 = 0)
# tap_pub_2 filter is: (c % 3 = 0)