From 4b3609cdcb573c9894daaf16c43e286d3c351bde Mon Sep 17 00:00:00 2001 From: Anthonin Bonnefoy Date: Fri, 17 Jan 2025 15:12:48 +0100 Subject: [PATCH 1/2] Add PgProto test module to send message on a raw socket Add a new PostgreSQL::Test::PgProto class that provides helper functions to send PostgreSQL protocol message to a socket. This initial version provides functions to send a startup packet, send a simple query, read the content of a response and consume responsed until the socket is closed. This is useful to test edge cases like sessions stuck in a specific socket state. --- src/test/perl/Makefile | 2 + src/test/perl/PostgreSQL/Test/PgProto.pm | 276 ++++++++++++++++++ src/test/perl/meson.build | 1 + .../postmaster/t/001_connection_limits.pl | 10 +- src/test/postmaster/t/002_start_stop.pl | 10 +- 5 files changed, 287 insertions(+), 12 deletions(-) create mode 100644 src/test/perl/PostgreSQL/Test/PgProto.pm diff --git a/src/test/perl/Makefile b/src/test/perl/Makefile index d82fb67540ed..d8e9fb6b00d2 100644 --- a/src/test/perl/Makefile +++ b/src/test/perl/Makefile @@ -25,6 +25,7 @@ install: all installdirs $(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/Kerberos.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Kerberos.pm' $(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/Cluster.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Cluster.pm' $(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/BackgroundPsql.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/BackgroundPsql.pm' + $(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/PgProto.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/PgProto.pm' $(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/AdjustUpgrade.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/AdjustUpgrade.pm' $(INSTALL_DATA) $(srcdir)/PostgreSQL/Version.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Version.pm' @@ -35,6 +36,7 @@ uninstall: rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Kerberos.pm' rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Cluster.pm' rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/BackgroundPsql.pm' + rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/PgProto.pm' rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/AdjustUpgrade.pm' rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Version.pm' diff --git a/src/test/perl/PostgreSQL/Test/PgProto.pm b/src/test/perl/PostgreSQL/Test/PgProto.pm new file mode 100644 index 000000000000..8a0659c1c5ee --- /dev/null +++ b/src/test/perl/PostgreSQL/Test/PgProto.pm @@ -0,0 +1,276 @@ + +# Copyright (c) 2025, PostgreSQL Global Development Group + +=pod + +=head1 NAME + +PostgreSQL::Test::PgProto - class for manipulating PG protocol over a raw socket + +=head1 SYNOPSIS + + use PostgreSQL::Test::PgProto; + + my $node = PostgreSQL::Test::PgProto->new('mynode'); + + # Create a data directory with initdb + $node->init(); + + # Start the PostgreSQL server + $node->start(); + + # Get raw socket to the node + my $sock = $node->raw_connect(); + + # Create pgproto from the raw socket + my $pgproto = PostgreSQL::Test::PgProto->new($sock); + + # Send startup packet + my %parameters = ( user => "postgres", database => "postgres", application_name => "app" ); + $pgproto->send_startup_message(\%parameters); + + # Read startup sequence until Ready For Query is reached + $pgproto->read_until_message('Z'); + + # Send a simple query + $pgproto->send_simple_query('SELECT 1'); + +=head1 DESCRIPTION + +PostgreSQL::Test::PgProto contains functionality for sending and reading +PostgreSQL protocol messages over a raw socket. + +=cut + +package PostgreSQL::Test::PgProto; +use Test::More; + +use strict; +use warnings FATAL => 'all'; + +=pod + +=head1 METHODS + +=over + +=item PostgreSQL::Test::PgProto->new($sock) + +Builds a new object of class C. + +=cut + +sub new +{ + my $class = shift; + my ($sock) = @_; + + my $pgproto = { + 'sock' => $sock + }; + bless $pgproto, $class; + return $pgproto; +} + +=pod + +=item $pgproto->send_startup_message + +Send a startup message with the provided parameters. Database parameters need to +be passed as a reference to a hash. The server's response is not consumed. + +=cut + +sub send_startup_message +{ + my $self = shift; + my $parameters = shift; + + # Startup packet contains: + # Packet length: 4 bytes + # Major proto version: 2 bytes + # Minor proto version: 2 bytes + # Multiple Parameters: + # Parameter Name (null terminated string) + # Parameter Value (null terminated string) + # Ending null character + my $pack_template = "Nnn(Z*Z*)" . keys(%{$parameters}) . 'x'; + # Packet length, proto and final null character + my $total_length = 9; + + for(keys %{$parameters}){ + my $key_length = length($_) + 1; + my $value_length = length($parameters->{$_}) + 1; + $total_length += $key_length + $value_length; + } + + my $startup_packet = pack($pack_template, $total_length, 3, 0, %{$parameters}); + $self->{sock}->send($startup_packet); +} + +=pod + +=item $pgproto->send_ssl_request + +Send an ssl request packet to the server. The server's response is not consumed. + +=cut + +sub send_ssl_request +{ + my $self = shift; + + # SSLRequest packet contains: + # Packet length: 4 bytes + # NEGOTIATE_SSL_CODE (1234, 5679): 4 bytes + my $ssl_request_packet = pack("Nnn", 8, 1234, 5679); + + $self->{sock}->send($ssl_request_packet); +} + +=pod + +=item $pgproto->send_simple_query + +Send a simple query message to the server. The response is not consumed. + +=cut + +sub send_simple_query +{ + my ($self, $query) = @_; + + # Query message contains: + # Message type 'Q' (1 byte) + # Message length not including message type (4 bytes) + # Null terminated string + my $query_packet = pack("CNZ*", ord('Q'), length($query) + 5, $query); + note "Sending following simple query through raw_tcp: $query"; + $self->{sock}->send($query_packet); +} + +=pod + +=item $pgproto->read_session_pid + +Returns the pid of the session. The session needs to be in a ready for query +state. All results will be consumed and will leave the session in a ready for +query state. + +=cut + +sub read_session_pid +{ + my ($self) = @_; + + $self->send_simple_query("select pg_backend_pid()"); + my $data_row = $self->read_until_message('D'); + # We should have only one field and one column with the + # pid representing the rest of the payload outside of the + # data row header + my ($field_count, $column_length, $pid) = unpack("nNA*", $data_row); + note "raw_tcp has pid $pid"; + # Consume until Ready for query is reached + $self->read_until_message('Z'); + return $pid; +} + +=pod + +=item $pgproto->wait_until_closed + +Block and read all responses until the socket is terminated by the server. + +=cut + +sub wait_until_closed +{ + my ($self) = @_; + my $received = ""; + + while (1) { + $self->{sock}->recv($received, 64*1024); + if ($received eq "") { + # Closed socket was detected + return; + } + } +} + +=pod + +=item $pgproto->read_type + +Read a 1 byte type from the socket. + +=cut + +sub read_type +{ + my ($self) = @_; + + my $type = ""; + $self->{sock}->recv($type, 1); + return $type; +} + +=pod + +=item $pgproto->read_length + +Read a 4 bytes length from the socket. + +=cut + +sub read_length +{ + my ($self) = @_; + + my $length_reply = ""; + $self->{sock}->recv($length_reply, 4); + my ($length) = unpack("N", $length_reply); + return $length; +} + +=pod + +=item $pgproto->read_until_message + +Read all messages from the server until C is found. The message's +payload will be returned. + +=cut + +sub read_until_message +{ + my ($self, $message_type) = @_; + + note "Reading until message of type $message_type is found"; + while (1) + { + my $type = $self->read_type(); + if ($type eq "") { + diag("Reached end of the socket before type $message_type was found"); + return; + } + my $length = $self->read_length(); + note "Reading message of type $type and length $length"; + + # Need to remove message's length from the payload's length + $length -= 4; + if ($length < 0) { + diag("read_until_message Unexpected payload length $length"); + return; + } + my $payload = ""; + $self->{sock}->recv($payload, $length); + + if ($type eq $message_type) { + # We've found the desired message type + note "Found expected message type $message_type"; + return $payload; + } + } +} + +1; diff --git a/src/test/perl/meson.build b/src/test/perl/meson.build index 58e30f15f9d2..04639a11d953 100644 --- a/src/test/perl/meson.build +++ b/src/test/perl/meson.build @@ -13,5 +13,6 @@ install_data( 'PostgreSQL/Test/Kerberos.pm', 'PostgreSQL/Test/Cluster.pm', 'PostgreSQL/Test/BackgroundPsql.pm', + 'PostgreSQL/Test/PgProto.pm', 'PostgreSQL/Test/AdjustUpgrade.pm', install_dir: dir_pgxs / 'src/test/perl/PostgreSQL/Test') diff --git a/src/test/postmaster/t/001_connection_limits.pl b/src/test/postmaster/t/001_connection_limits.pl index 8cfa6e0ced53..b6df0e8a3efa 100644 --- a/src/test/postmaster/t/001_connection_limits.pl +++ b/src/test/postmaster/t/001_connection_limits.pl @@ -7,6 +7,7 @@ use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::PgProto; use PostgreSQL::Test::Utils; use Test::More; @@ -84,6 +85,7 @@ sub background_psql_as_user for (my $i = 0; $i <= 20; $i++) { my $sock = $node->raw_connect(); + my $pgproto = PostgreSQL::Test::PgProto->new($sock); # On a busy system, the server might reject connections if # postmaster cannot accept() them fast enough. The exact limit @@ -93,14 +95,10 @@ sub background_psql_as_user # when it does so, we know that the backend has been launched # and we should be able to open another connection. - # SSLRequest packet consists of packet length followed by - # NEGOTIATE_SSL_CODE. - my $negotiate_ssl_code = pack("Nnn", 8, 1234, 5679); - my $sent = $sock->send($negotiate_ssl_code); + $pgproto->send_ssl_request(); # Read reply. We expect the server to reject it with 'N' - my $reply = ""; - $sock->recv($reply, 1); + my $reply = $pgproto->read_type(); is($reply, "N", "dead-end connection $i"); push(@raw_connections, $sock); diff --git a/src/test/postmaster/t/002_start_stop.pl b/src/test/postmaster/t/002_start_stop.pl index 036b296f72b8..499af04b7938 100644 --- a/src/test/postmaster/t/002_start_stop.pl +++ b/src/test/postmaster/t/002_start_stop.pl @@ -6,6 +6,7 @@ use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::PgProto; use PostgreSQL::Test::Utils; use Test::More; @@ -49,6 +50,7 @@ for (my $i = 0; $i <= 20; $i++) { my $sock = $node->raw_connect(); + my $pgproto = PostgreSQL::Test::PgProto->new($sock); # On a busy system, the server might reject connections if # postmaster cannot accept() them fast enough. The exact limit @@ -58,14 +60,10 @@ # when it does so, we know that the backend has been launched # and we should be able to open another connection. - # SSLRequest packet consists of packet length followed by - # NEGOTIATE_SSL_CODE. - my $negotiate_ssl_code = pack("Nnn", 8, 1234, 5679); - my $sent = $sock->send($negotiate_ssl_code); + $pgproto->send_ssl_request(); # Read reply. We expect the server to reject it with 'N' - my $reply = ""; - $sock->recv($reply, 1); + my $reply = $pgproto->read_type(); is($reply, "N", "dead-end connection $i"); push(@raw_connections, $sock); From b1dc0eb9125aa262da1b2a4a5c351e078d659474 Mon Sep 17 00:00:00 2001 From: Anthonin Bonnefoy Date: Fri, 17 Jan 2025 15:20:52 +0100 Subject: [PATCH 2/2] Accept recovery conflict interrupt on blocked writing Previously, all interrupts except process dying were ignored while a process was blocked writing to a socket. If the connection to the client was broken (no clean FIN nor RST), a process sending results to the client could be stuck for 924s until the TCP retransmission timeout is reached. During this time, it was possible for the process to conflict with recovery: For example, the process returning results can have a conflicting buffer pin. To avoid blocking recovery for an extended period of time, this patch changes client write interrupts by handling recovery conflict interrupts instead of ignoring them. Since the interrupt happens while we're likely to have partially written results on the socket, there's no easy way to keep protocol sync so the session needs to be terminated. In addition, a blocked write could happen while interrupts are blocked. For example, we're sending an error message to the client and eventually saturate the socket buffer. In this case, we can't process the interrupt in ProcessClientWriteInterrupt. Instead, we stop retrying writes and trigger a write error, allowing to close the connection and termiate the session. --- src/backend/libpq/be-secure.c | 29 +++- src/backend/tcop/postgres.c | 163 +++++++++++++++--- src/include/tcop/tcopprot.h | 2 +- src/test/recovery/t/031_recovery_conflict.pl | 166 ++++++++++++++++--- 4 files changed, 302 insertions(+), 58 deletions(-) diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index 3b4f80146be8..3b6089464a18 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -306,6 +306,7 @@ secure_write(Port *port, void *ptr, size_t len) { ssize_t n; int waitfor; + bool retryable = true; /* Deal with any already-pending interrupt condition. */ ProcessClientWriteInterrupt(false); @@ -353,14 +354,28 @@ secure_write(Port *port, void *ptr, size_t len) if (event.events & WL_LATCH_SET) { ResetLatch(MyLatch); - ProcessClientWriteInterrupt(true); - - /* - * We'll retry the write. Most likely it will return immediately - * because there's still no buffer space available, and we'll wait - * for the socket to become ready again. - */ + retryable = ProcessClientWriteInterrupt(true); + if (!retryable) + { + /* + * The blocking write is interfering with recovery but + * ProcessClientWriteInterrupt can't process interrupts. This + * can happen when trying to send error messages to the client + * and saturating the buffer. To dislodge ourself, we give up + * retrying with a socket error. This will close the + * connection with a "connection to client lost" error during + * the next CHECK_FOR_INTERRUPTS. + */ + errno = ENOBUFS; + return -1; + } } + + /* + * We'll retry the write. Most likely it will return immediately + * because there's still no buffer space available, and we'll wait for + * the socket to become ready again. + */ goto retry; } diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 5655348a2e29..24dfb638b5d2 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -184,6 +184,9 @@ static void drop_unnamed_stmt(void); static void log_disconnections(int code, Datum arg); static void enable_statement_timeout(void); static void disable_statement_timeout(void); +static void ProcessRecoveryConflictInterrupts(void); +static bool CheckBlockedWriteConflictInterrupts(ProcSignalReason *triggered_reason); +static int errdetail_recovery_conflict(ProcSignalReason reason); /* ---------------------------------------------------------------- @@ -540,49 +543,97 @@ ProcessClientReadInterrupt(bool blocked) * 'blocked' is true if no data could be written and we plan to retry, * false if about to write or done writing. * + * Return true if write is retryable. + * * Must preserve errno! */ -void +bool ProcessClientWriteInterrupt(bool blocked) { int save_errno = errno; + bool retryable = true; + bool can_process_interrupts = InterruptHoldoffCount == 0 && CritSectionCount == 0; + bool conflict_with_recovery = false; + ProcSignalReason reason; - if (ProcDiePending) + if (blocked && RecoveryConflictPending) + { + conflict_with_recovery = CheckBlockedWriteConflictInterrupts(&reason); + } + + if (!blocked && ProcDiePending) { /* - * We're dying. If it's not possible to write, then we should handle - * that immediately, else a stuck client could indefinitely delay our - * response to the signal. If we haven't tried to write yet, make - * sure the process latch is set, so that if the write would block - * then we'll come back here and die. If we're done writing, also - * make sure the process latch is set, as we might've undesirably - * cleared it while writing. + * We're dying and we haven't tried to write yet, make sure the + * process latch is set, so that if the write would block then we'll + * come back here and die. If we're done writing, also make sure the + * process latch is set, as we might've undesirably cleared it while + * writing. */ - if (blocked) + SetLatch(MyLatch); + } + else if (blocked && !can_process_interrupts && conflict_with_recovery) + { + /* + * We have a blocking write conflicting with recovery but interrupts + * can't be processed. This can happen when sending errors to the + * client and saturating the socket buffer. This will make the + * blocking write fail, triggering a "could not send data to client" + * error and closing the socket. Since the socket error will supersede + * the recovery conflict, we need to log the recovery conflict now. + */ + pgstat_report_recovery_conflict(reason); + ereport(COMMERROR, + (errcode(reason == PROCSIG_RECOVERY_CONFLICT_DATABASE ? + ERRCODE_DATABASE_DROPPED : + ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("Interrupting blocking write due to conflict with recovery"), + errdetail_recovery_conflict(reason))); + + retryable = false; + } + else if (blocked && can_process_interrupts && (ProcDiePending || conflict_with_recovery)) + { + /* + * Don't mess with whereToSendOutput if ProcessInterrupts wouldn't + * service ProcDiePending. + * + * We don't want to send the client the error message, as a) that + * would possibly block again, and b) it would likely lead to loss of + * protocol sync because we may have already sent a partial protocol + * message. + */ + if (whereToSendOutput == DestRemote) + whereToSendOutput = DestNone; + if (ProcDiePending) { /* - * Don't mess with whereToSendOutput if ProcessInterrupts wouldn't - * service ProcDiePending. + * We're dying and it's not possible to write so we should handle + * that immediately, else a stuck client could indefinitely delay + * our response to the signal. */ - if (InterruptHoldoffCount == 0 && CritSectionCount == 0) - { - /* - * We don't want to send the client the error message, as a) - * that would possibly block again, and b) it would likely - * lead to loss of protocol sync because we may have already - * sent a partial protocol message. - */ - if (whereToSendOutput == DestRemote) - whereToSendOutput = DestNone; - - CHECK_FOR_INTERRUPTS(); - } + CHECK_FOR_INTERRUPTS(); } else - SetLatch(MyLatch); + { + /* + * As we're in a blocked write, we can't keep protocol sync. + * Upgrade + */ + ExitOnAnyError = true; + ProcessRecoveryConflictInterrupts(); + } + + /* + * Both ProcDiePending and ProcessRecoveryConflictInterrupts should + * exit as interrupts can be processed and we checked that we did + * conflict with recovery + */ + pg_unreachable(); } errno = save_errno; + return retryable; } /* @@ -3227,6 +3278,66 @@ ProcessRecoveryConflictInterrupt(ProcSignalReason reason) } } +/* + * Check if a blocked write conflict with a specific conflict reason. + */ +static bool +CheckBlockedWriteConflictInterrupt(ProcSignalReason reason) +{ + switch (reason) + { + case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: + /* Blocked writes can't be waiting for a lock */ + return false; + case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: + /* Do we block the startup process? */ + return HoldingBufferPinThatDelaysRecovery(); + case PROCSIG_RECOVERY_CONFLICT_LOCK: + case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: + case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: + /* We can only block if we have an existing transaction */ + return IsTransactionOrTransactionBlock(); + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + + /* + * Aborted subtransaction and top level aborted transaction can be + * ignored + */ + return IsSubTransaction() || !IsAbortedTransactionBlockState(); + case PROCSIG_RECOVERY_CONFLICT_DATABASE: + /* This will always conflict */ + return true; + default: + return true; + } +} + +/* + * Check if the blocked write conflict with any conflict reason. + * + * If log_conflict is true, the recovery conflict will be logged and reported in the stats. + */ +static bool +CheckBlockedWriteConflictInterrupts(ProcSignalReason *triggered_reason) +{ + Assert(RecoveryConflictPending); + + for (ProcSignalReason reason = PROCSIG_RECOVERY_CONFLICT_FIRST; + reason <= PROCSIG_RECOVERY_CONFLICT_LAST; + reason++) + { + if (RecoveryConflictPendingReasons[reason]) + { + if (CheckBlockedWriteConflictInterrupt(reason)) + { + *triggered_reason = reason; + return true; + } + } + } + return false; +} + /* * Check each possible recovery conflict reason. */ diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index a62367f7793d..63b0442903d1 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -74,7 +74,7 @@ extern void StatementCancelHandler(SIGNAL_ARGS); extern void FloatExceptionHandler(SIGNAL_ARGS) pg_attribute_noreturn(); extern void HandleRecoveryConflictInterrupt(ProcSignalReason reason); extern void ProcessClientReadInterrupt(bool blocked); -extern void ProcessClientWriteInterrupt(bool blocked); +extern bool ProcessClientWriteInterrupt(bool blocked); extern void process_postgres_switches(int argc, char *argv[], GucContext ctx, const char **dbname); diff --git a/src/test/recovery/t/031_recovery_conflict.pl b/src/test/recovery/t/031_recovery_conflict.pl index 028b0b5f0e15..0f272f149b9d 100644 --- a/src/test/recovery/t/031_recovery_conflict.pl +++ b/src/test/recovery/t/031_recovery_conflict.pl @@ -7,8 +7,10 @@ use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::PgProto; use PostgreSQL::Test::Utils; use Test::More; +use Time::HiRes qw(usleep); # Set up nodes @@ -74,25 +76,10 @@ ## RECOVERY CONFLICT 1: Buffer pin conflict my $sect = "buffer pin conflict"; +my $cursor1 = "test_recovery_conflict_cursor"; $expected_conflicts++; -# Aborted INSERT on primary that will be cleaned up by vacuum. Has to be old -# enough so that there's not a snapshot conflict before the buffer pin -# conflict. - -$node_primary->safe_psql( - $test_db, - qq[ - BEGIN; - INSERT INTO $table1 VALUES (1,0); - ROLLBACK; - -- ensure flush, rollback doesn't do so - BEGIN; LOCK $table1; COMMIT; - ]); - -$node_primary->wait_for_replay_catchup($node_standby); - -my $cursor1 = "test_recovery_conflict_cursor"; +setup_bufferpin_conflict(); # DECLARE and use a cursor on standby, causing buffer with the only block of # the relation to be pinned on the standby @@ -120,8 +107,94 @@ check_conflict_log("User was holding shared buffer pin for too long"); $psql_standby->reconnect_and_clear(); -check_conflict_stat("bufferpin"); +check_conflict_stat("bufferpin", 1); + +SKIP: +{ + skip "those tests require working raw_connect()" + unless $node_standby->raw_connect_works(); + + ## RECOVERY CONFLICT 1 bis: Buffer pin conflict with conflicting query in ClientWrite + $sect = "buffer pin conflict (ClientWrite)"; + $expected_conflicts++; + + setup_bufferpin_conflict(); + + # The simplest way to get the user to use for the startup packet + # is to grab it from an existing psql session + my $user = $psql_standby->query_safe("SELECT current_user"); + + # Start the conflicting session + my $sock = $node_standby->raw_connect(); + my $pgproto = PostgreSQL::Test::PgProto->new($sock); + my %parameters = ( user => $user, database => $test_db, application_name => $ENV{PGAPPNAME} ); + + $pgproto->send_startup_message(\%parameters); + # Read until Ready For Query message + $pgproto->read_until_message('Z'); + my $pid = $pgproto->read_session_pid(); + + # We want the session to pin table1's block while staying in a ClientWrite + # state. To achieve that, we ask the server enough rows to saturate the + # buffer with the client not read those results. + $pgproto->send_simple_query(qq[ + BEGIN; + DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1; + FETCH FORWARD FROM $cursor1; + SELECT generate_series(1, 100000); + ]); + + # Check that our session is in ClientWrite + wait_for_wait_event($pid, 'ClientWrite'); + + # VACUUM FREEZE on the primary + $node_primary->safe_psql($test_db, qq[VACUUM FREEZE $table1;]); + + check_conflict_log("User was holding shared buffer pin for too long"); + + # The conflicting session should be terminated, consume everything until the socket is closed. + $pgproto->wait_until_closed(); + + check_conflict_stat("bufferpin", 2); + + ## RECOVERY CONFLICT 1 ter: Buffer pin conflict with conflicting query reports + ## an error and saturate socket buffer + $sect = "buffer pin conflict (Error report)"; + $expected_conflicts++; + setup_bufferpin_conflict(); + + # Start the conflicting session + $sock = $node_standby->raw_connect(); + $pgproto = PostgreSQL::Test::PgProto->new($sock); + + $pgproto->send_startup_message(\%parameters); + # Read until Ready For Query message + $pgproto->read_until_message('Z'); + $pid = $pgproto->read_session_pid(); + + # We want the session to pin table1's block while staying in a ClientWrite + # state and reporting an error to the client. + $pgproto->send_simple_query(qq[ + BEGIN; + DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1; + FETCH FORWARD FROM $cursor1; + DO \$\$BEGIN RAISE 'endless scream: %', repeat('a', 2000000); END;\$\$; + ]); + + # Check that our session is in ClientWrite + wait_for_wait_event($pid, 'ClientWrite'); + + # VACUUM FREEZE on the primary + $node_primary->safe_psql($test_db, qq[VACUUM FREEZE $table1;]); + + check_conflict_log("User was holding shared buffer pin for too long"); + + # The conflicting session should be terminated, consume everything until the socket is closed. + $pgproto->wait_until_closed(); + + check_conflict_stat("bufferpin", 3); +} ## RECOVERY CONFLICT 2: Snapshot conflict $sect = "snapshot conflict"; @@ -153,7 +226,7 @@ check_conflict_log( "User query might have needed to see row versions that must be removed"); $psql_standby->reconnect_and_clear(); -check_conflict_stat("snapshot"); +check_conflict_stat("snapshot", 1); ## RECOVERY CONFLICT 3: Lock conflict @@ -176,7 +249,7 @@ check_conflict_log("User was holding a relation lock for too long"); $psql_standby->reconnect_and_clear(); -check_conflict_stat("lock"); +check_conflict_stat("lock", 1); ## RECOVERY CONFLICT 4: Tablespace conflict @@ -206,7 +279,7 @@ check_conflict_log( "User was or might have been using tablespace that must be dropped"); $psql_standby->reconnect_and_clear(); -check_conflict_stat("tablespace"); +check_conflict_stat("tablespace", 1); ## RECOVERY CONFLICT 5: Deadlock @@ -271,7 +344,7 @@ check_conflict_log("User transaction caused buffer deadlock with recovery."); $psql_standby->reconnect_and_clear(); -check_conflict_stat("deadlock"); +check_conflict_stat("deadlock", 1); # clean up for next tests $node_primary->safe_psql($test_db, qq[ROLLBACK PREPARED 'lock';]); @@ -310,6 +383,22 @@ done_testing(); +sub setup_bufferpin_conflict +{ + # Aborted INSERT on primary that will be cleaned up by vacuum. Has to be old + # enough so that there's not a snapshot conflict before the buffer pin + # conflict. + $node_primary->safe_psql($test_db, + qq[BEGIN; + INSERT INTO $table1 VALUES (1,0); + ROLLBACK; + -- ensure flush, rollback doesn't do so + BEGIN; LOCK $table1; COMMIT;] + ); + + $node_primary->wait_for_replay_catchup($node_standby); +} + sub check_conflict_log { my $message = shift; @@ -318,16 +407,45 @@ sub check_conflict_log $log_location = $node_standby->wait_for_log(qr/$message/, $log_location); cmp_ok($log_location, '>', $old_log_location, - "$sect: logfile contains terminated connection due to recovery conflict" + "$sect: logfile contains '$message'" ); } sub check_conflict_stat { my $conflict_type = shift; + my $expected_count = shift; my $count = $node_standby->safe_psql($test_db, qq[SELECT confl_$conflict_type FROM pg_stat_database_conflicts WHERE datname='$test_db';] ); - is($count, 1, "$sect: stats show conflict on standby"); + is($count, $expected_count, "$sect: stats show $count conflicts on standby (expected $expected_count)"); +} + +sub wait_for_wait_event +{ + my $pid = shift; + my $expected_wait_event = shift; + + my $max_attempts = 10 * $PostgreSQL::Test::Utils::timeout_default; + my $attempts = 0; + my $wait_event = ""; + + while ($attempts < $max_attempts) + { + $wait_event = $node_standby->safe_psql($test_db, + qq[SELECT wait_event FROM pg_stat_activity WHERE pid=$pid;] + ); + + if ($wait_event eq $expected_wait_event) { + last; + } + + # Wait 0.1 second before retrying. + usleep(100_000); + + $attempts++; + } + + is($wait_event, $expected_wait_event, "$sect: session with pid $pid has wait_event $wait_event"); }