diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index 3b4f80146be8..3b6089464a18 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -306,6 +306,7 @@ secure_write(Port *port, void *ptr, size_t len) { ssize_t n; int waitfor; + bool retryable = true; /* Deal with any already-pending interrupt condition. */ ProcessClientWriteInterrupt(false); @@ -353,14 +354,28 @@ secure_write(Port *port, void *ptr, size_t len) if (event.events & WL_LATCH_SET) { ResetLatch(MyLatch); - ProcessClientWriteInterrupt(true); - - /* - * We'll retry the write. Most likely it will return immediately - * because there's still no buffer space available, and we'll wait - * for the socket to become ready again. - */ + retryable = ProcessClientWriteInterrupt(true); + if (!retryable) + { + /* + * The blocking write is interfering with recovery but + * ProcessClientWriteInterrupt can't process interrupts. This + * can happen when trying to send error messages to the client + * and saturating the buffer. To dislodge ourself, we give up + * retrying with a socket error. This will close the + * connection with a "connection to client lost" error during + * the next CHECK_FOR_INTERRUPTS. + */ + errno = ENOBUFS; + return -1; + } } + + /* + * We'll retry the write. Most likely it will return immediately + * because there's still no buffer space available, and we'll wait for + * the socket to become ready again. 
+ */ goto retry; } diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 5655348a2e29..24dfb638b5d2 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -184,6 +184,9 @@ static void drop_unnamed_stmt(void); static void log_disconnections(int code, Datum arg); static void enable_statement_timeout(void); static void disable_statement_timeout(void); +static void ProcessRecoveryConflictInterrupts(void); +static bool CheckBlockedWriteConflictInterrupts(ProcSignalReason *triggered_reason); +static int errdetail_recovery_conflict(ProcSignalReason reason); /* ---------------------------------------------------------------- @@ -540,49 +543,97 @@ ProcessClientReadInterrupt(bool blocked) * 'blocked' is true if no data could be written and we plan to retry, * false if about to write or done writing. * + * Return true if write is retryable. + * * Must preserve errno! */ -void +bool ProcessClientWriteInterrupt(bool blocked) { int save_errno = errno; + bool retryable = true; + bool can_process_interrupts = InterruptHoldoffCount == 0 && CritSectionCount == 0; + bool conflict_with_recovery = false; + ProcSignalReason reason; - if (ProcDiePending) + if (blocked && RecoveryConflictPending) + { + conflict_with_recovery = CheckBlockedWriteConflictInterrupts(&reason); + } + + if (!blocked && ProcDiePending) { /* - * We're dying. If it's not possible to write, then we should handle - * that immediately, else a stuck client could indefinitely delay our - * response to the signal. If we haven't tried to write yet, make - * sure the process latch is set, so that if the write would block - * then we'll come back here and die. If we're done writing, also - * make sure the process latch is set, as we might've undesirably - * cleared it while writing. + * We're dying and we haven't tried to write yet, make sure the + * process latch is set, so that if the write would block then we'll + * come back here and die. 
If we're done writing, also make sure the + * process latch is set, as we might've undesirably cleared it while + * writing. */ - if (blocked) + SetLatch(MyLatch); + } + else if (blocked && !can_process_interrupts && conflict_with_recovery) + { + /* + * We have a blocking write conflicting with recovery but interrupts + * can't be processed. This can happen when sending errors to the + * client and saturating the socket buffer. This will make the + * blocking write fail, triggering a "could not send data to client" + * error and closing the socket. Since the socket error will supersede + * the recovery conflict, we need to log the recovery conflict now. + */ + pgstat_report_recovery_conflict(reason); + ereport(COMMERROR, + (errcode(reason == PROCSIG_RECOVERY_CONFLICT_DATABASE ? + ERRCODE_DATABASE_DROPPED : + ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("Interrupting blocking write due to conflict with recovery"), + errdetail_recovery_conflict(reason))); + + retryable = false; + } + else if (blocked && can_process_interrupts && (ProcDiePending || conflict_with_recovery)) + { + /* + * Don't mess with whereToSendOutput if ProcessInterrupts wouldn't + * service ProcDiePending. + * + * We don't want to send the client the error message, as a) that + * would possibly block again, and b) it would likely lead to loss of + * protocol sync because we may have already sent a partial protocol + * message. + */ + if (whereToSendOutput == DestRemote) + whereToSendOutput = DestNone; + if (ProcDiePending) { /* - * Don't mess with whereToSendOutput if ProcessInterrupts wouldn't - * service ProcDiePending. + * We're dying and it's not possible to write so we should handle + * that immediately, else a stuck client could indefinitely delay + * our response to the signal. 
+ */ - if (InterruptHoldoffCount == 0 && CritSectionCount == 0) - { - /* - * We don't want to send the client the error message, as a) - * that would possibly block again, and b) it would likely - * lead to loss of protocol sync because we may have already - * sent a partial protocol message. - */ - if (whereToSendOutput == DestRemote) - whereToSendOutput = DestNone; - - CHECK_FOR_INTERRUPTS(); - } + CHECK_FOR_INTERRUPTS(); } else - SetLatch(MyLatch); + { + /* + * As we're in a blocked write, we can't keep protocol sync. + * Upgrade any error raised below to a session exit. + */ + ExitOnAnyError = true; + ProcessRecoveryConflictInterrupts(); + } + + /* + * Both the ProcDiePending and the recovery conflict paths should + * exit, as interrupts can be processed and we checked that we did + * conflict with recovery. + */ + pg_unreachable(); } errno = save_errno; + return retryable; } /* @@ -3227,6 +3278,66 @@ ProcessRecoveryConflictInterrupt(ProcSignalReason reason) } } +/* + * Check if a blocked write conflicts with a specific conflict reason. + */ +static bool +CheckBlockedWriteConflictInterrupt(ProcSignalReason reason) +{ + switch (reason) + { + case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK: + /* Blocked writes can't be waiting for a lock */ + return false; + case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN: + /* Do we block the startup process? */ + return HoldingBufferPinThatDelaysRecovery(); + case PROCSIG_RECOVERY_CONFLICT_LOCK: + case PROCSIG_RECOVERY_CONFLICT_TABLESPACE: + case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT: + /* We can only block if we have an existing transaction */ + return IsTransactionOrTransactionBlock(); + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT: + + /* + * Aborted subtransaction and top level aborted transaction can be + * ignored + */ + return IsSubTransaction() || !IsAbortedTransactionBlockState(); + case PROCSIG_RECOVERY_CONFLICT_DATABASE: + /* This will always conflict */ + return true; + default: + return true; + } +} + +/* + * Check if the blocked write conflicts with any pending conflict reason.
+ * + * On a conflict, *triggered_reason is set to the first pending reason that conflicts; it is left untouched otherwise. + */ +static bool +CheckBlockedWriteConflictInterrupts(ProcSignalReason *triggered_reason) +{ + Assert(RecoveryConflictPending); + + for (ProcSignalReason reason = PROCSIG_RECOVERY_CONFLICT_FIRST; + reason <= PROCSIG_RECOVERY_CONFLICT_LAST; + reason++) + { + if (RecoveryConflictPendingReasons[reason]) + { + if (CheckBlockedWriteConflictInterrupt(reason)) + { + *triggered_reason = reason; + return true; + } + } + } + return false; +} + /* * Check each possible recovery conflict reason. */ diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index a62367f7793d..63b0442903d1 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -74,7 +74,7 @@ extern void StatementCancelHandler(SIGNAL_ARGS); extern void FloatExceptionHandler(SIGNAL_ARGS) pg_attribute_noreturn(); extern void HandleRecoveryConflictInterrupt(ProcSignalReason reason); extern void ProcessClientReadInterrupt(bool blocked); -extern void ProcessClientWriteInterrupt(bool blocked); +extern bool ProcessClientWriteInterrupt(bool blocked); extern void process_postgres_switches(int argc, char *argv[], GucContext ctx, const char **dbname); diff --git a/src/test/perl/Makefile b/src/test/perl/Makefile index d82fb67540ed..d8e9fb6b00d2 100644 --- a/src/test/perl/Makefile +++ b/src/test/perl/Makefile @@ -25,6 +25,7 @@ install: all installdirs $(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/Kerberos.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Kerberos.pm' $(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/Cluster.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Cluster.pm' $(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/BackgroundPsql.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/BackgroundPsql.pm' + $(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/PgProto.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/PgProto.pm' $(INSTALL_DATA) $(srcdir)/PostgreSQL/Test/AdjustUpgrade.pm
'$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/AdjustUpgrade.pm' $(INSTALL_DATA) $(srcdir)/PostgreSQL/Version.pm '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Version.pm' @@ -35,6 +36,7 @@ uninstall: rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Kerberos.pm' rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/Cluster.pm' rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/BackgroundPsql.pm' + rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/PgProto.pm' rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Test/AdjustUpgrade.pm' rm -f '$(DESTDIR)$(pgxsdir)/$(subdir)/PostgreSQL/Version.pm' diff --git a/src/test/perl/PostgreSQL/Test/PgProto.pm b/src/test/perl/PostgreSQL/Test/PgProto.pm new file mode 100644 index 000000000000..8a0659c1c5ee --- /dev/null +++ b/src/test/perl/PostgreSQL/Test/PgProto.pm @@ -0,0 +1,276 @@ + +# Copyright (c) 2025, PostgreSQL Global Development Group + +=pod + +=head1 NAME + +PostgreSQL::Test::PgProto - class for manipulating PG protocol over a raw socket + +=head1 SYNOPSIS + + use PostgreSQL::Test::PgProto; + + my $node = PostgreSQL::Test::Cluster->new('mynode'); + + # Create a data directory with initdb + $node->init(); + + # Start the PostgreSQL server + $node->start(); + + # Get raw socket to the node + my $sock = $node->raw_connect(); + + # Create pgproto from the raw socket + my $pgproto = PostgreSQL::Test::PgProto->new($sock); + + # Send startup packet + my %parameters = ( user => "postgres", database => "postgres", application_name => "app" ); + $pgproto->send_startup_message(\%parameters); + + # Read startup sequence until Ready For Query is reached + $pgproto->read_until_message('Z'); + + # Send a simple query + $pgproto->send_simple_query('SELECT 1'); + +=head1 DESCRIPTION + +PostgreSQL::Test::PgProto contains functionality for sending and reading +PostgreSQL protocol messages over a raw socket.
+ +=cut + +package PostgreSQL::Test::PgProto; +use Test::More; + +use strict; +use warnings FATAL => 'all'; + +=pod + +=head1 METHODS + +=over + +=item PostgreSQL::Test::PgProto->new($sock) + +Builds a new object of class C<PostgreSQL::Test::PgProto>. + +=cut + +sub new +{ + my $class = shift; + my ($sock) = @_; + + my $pgproto = { + 'sock' => $sock + }; + bless $pgproto, $class; + return $pgproto; +} + +=pod + +=item $pgproto->send_startup_message + +Send a startup message with the provided parameters. Database parameters need to +be passed as a reference to a hash. The server's response is not consumed. + +=cut + +sub send_startup_message +{ + my $self = shift; + my $parameters = shift; + + # Startup packet contains: + # Packet length: 4 bytes + # Major proto version: 2 bytes + # Minor proto version: 2 bytes + # Multiple Parameters: + # Parameter Name (null terminated string) + # Parameter Value (null terminated string) + # Ending null character + my $pack_template = "Nnn(Z*Z*)" . keys(%{$parameters}) . 'x'; + # Packet length, proto and final null character + my $total_length = 9; + + for(keys %{$parameters}){ + my $key_length = length($_) + 1; + my $value_length = length($parameters->{$_}) + 1; + $total_length += $key_length + $value_length; + } + + my $startup_packet = pack($pack_template, $total_length, 3, 0, %{$parameters}); + $self->{sock}->send($startup_packet); +} + +=pod + +=item $pgproto->send_ssl_request + +Send an SSL request packet to the server. The server's response is not consumed. + +=cut + +sub send_ssl_request +{ + my $self = shift; + + # SSLRequest packet contains: + # Packet length: 4 bytes + # NEGOTIATE_SSL_CODE (1234, 5679): 4 bytes + my $ssl_request_packet = pack("Nnn", 8, 1234, 5679); + + $self->{sock}->send($ssl_request_packet); +} + +=pod + +=item $pgproto->send_simple_query + +Send a simple query message to the server. The response is not consumed.
+ +=cut + +sub send_simple_query +{ + my ($self, $query) = @_; + + # Query message contains: + # Message type 'Q' (1 byte) + # Message length not including message type (4 bytes) + # Null terminated string + my $query_packet = pack("CNZ*", ord('Q'), length($query) + 5, $query); + note "Sending following simple query through raw_tcp: $query"; + $self->{sock}->send($query_packet); +} + +=pod + +=item $pgproto->read_session_pid + +Returns the pid of the session. The session needs to be in a ready for query +state. All results will be consumed and will leave the session in a ready for +query state. + +=cut + +sub read_session_pid +{ + my ($self) = @_; + + $self->send_simple_query("select pg_backend_pid()"); + my $data_row = $self->read_until_message('D'); + # We should have only one field and one column with the + # pid representing the rest of the payload outside of the + # data row header + my ($field_count, $column_length, $pid) = unpack("nNA*", $data_row); + note "raw_tcp has pid $pid"; + # Consume until Ready for query is reached + $self->read_until_message('Z'); + return $pid; +} + +=pod + +=item $pgproto->wait_until_closed + +Block and read all responses until the socket is terminated by the server. + +=cut + +sub wait_until_closed +{ + my ($self) = @_; + my $received = ""; + + while (1) { + $self->{sock}->recv($received, 64*1024); + if ($received eq "") { + # Closed socket was detected + return; + } + } +} + +=pod + +=item $pgproto->read_type + +Read a 1-byte type from the socket. + +=cut + +sub read_type +{ + my ($self) = @_; + + my $type = ""; + $self->{sock}->recv($type, 1); + return $type; +} + +=pod + +=item $pgproto->read_length + +Read a 4-byte length from the socket. + +=cut + +sub read_length +{ + my ($self) = @_; + + my $length_reply = ""; + $self->{sock}->recv($length_reply, 4); + my ($length) = unpack("N", $length_reply); + return $length; +} + +=pod + +=item $pgproto->read_until_message + +Read all messages from the server until a message of type C<$message_type> is found.
The message's +payload will be returned. + +=cut + +sub read_until_message +{ + my ($self, $message_type) = @_; + + note "Reading until message of type $message_type is found"; + while (1) + { + my $type = $self->read_type(); + if ($type eq "") { + diag("Reached end of the socket before type $message_type was found"); + return; + } + my $length = $self->read_length(); + note "Reading message of type $type and length $length"; + + # Need to remove message's length from the payload's length + $length -= 4; + if ($length < 0) { + diag("read_until_message Unexpected payload length $length"); + return; + } + my $payload = ""; + $self->{sock}->recv($payload, $length); + + if ($type eq $message_type) { + # We've found the desired message type + note "Found expected message type $message_type"; + return $payload; + } + } +} + +1; diff --git a/src/test/perl/meson.build b/src/test/perl/meson.build index 58e30f15f9d2..04639a11d953 100644 --- a/src/test/perl/meson.build +++ b/src/test/perl/meson.build @@ -13,5 +13,6 @@ install_data( 'PostgreSQL/Test/Kerberos.pm', 'PostgreSQL/Test/Cluster.pm', 'PostgreSQL/Test/BackgroundPsql.pm', + 'PostgreSQL/Test/PgProto.pm', 'PostgreSQL/Test/AdjustUpgrade.pm', install_dir: dir_pgxs / 'src/test/perl/PostgreSQL/Test') diff --git a/src/test/postmaster/t/001_connection_limits.pl b/src/test/postmaster/t/001_connection_limits.pl index 8cfa6e0ced53..b6df0e8a3efa 100644 --- a/src/test/postmaster/t/001_connection_limits.pl +++ b/src/test/postmaster/t/001_connection_limits.pl @@ -7,6 +7,7 @@ use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::PgProto; use PostgreSQL::Test::Utils; use Test::More; @@ -84,6 +85,7 @@ sub background_psql_as_user for (my $i = 0; $i <= 20; $i++) { my $sock = $node->raw_connect(); + my $pgproto = PostgreSQL::Test::PgProto->new($sock); # On a busy system, the server might reject connections if # postmaster cannot accept() them fast enough. 
The exact limit @@ -93,14 +95,10 @@ sub background_psql_as_user # when it does so, we know that the backend has been launched # and we should be able to open another connection. - # SSLRequest packet consists of packet length followed by - # NEGOTIATE_SSL_CODE. - my $negotiate_ssl_code = pack("Nnn", 8, 1234, 5679); - my $sent = $sock->send($negotiate_ssl_code); + $pgproto->send_ssl_request(); # Read reply. We expect the server to reject it with 'N' - my $reply = ""; - $sock->recv($reply, 1); + my $reply = $pgproto->read_type(); is($reply, "N", "dead-end connection $i"); push(@raw_connections, $sock); diff --git a/src/test/postmaster/t/002_start_stop.pl b/src/test/postmaster/t/002_start_stop.pl index 036b296f72b8..499af04b7938 100644 --- a/src/test/postmaster/t/002_start_stop.pl +++ b/src/test/postmaster/t/002_start_stop.pl @@ -6,6 +6,7 @@ use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::PgProto; use PostgreSQL::Test::Utils; use Test::More; @@ -49,6 +50,7 @@ for (my $i = 0; $i <= 20; $i++) { my $sock = $node->raw_connect(); + my $pgproto = PostgreSQL::Test::PgProto->new($sock); # On a busy system, the server might reject connections if # postmaster cannot accept() them fast enough. The exact limit @@ -58,14 +60,10 @@ # when it does so, we know that the backend has been launched # and we should be able to open another connection. - # SSLRequest packet consists of packet length followed by - # NEGOTIATE_SSL_CODE. - my $negotiate_ssl_code = pack("Nnn", 8, 1234, 5679); - my $sent = $sock->send($negotiate_ssl_code); + $pgproto->send_ssl_request(); # Read reply. 
We expect the server to reject it with 'N' - my $reply = ""; - $sock->recv($reply, 1); + my $reply = $pgproto->read_type(); is($reply, "N", "dead-end connection $i"); push(@raw_connections, $sock); diff --git a/src/test/recovery/t/031_recovery_conflict.pl b/src/test/recovery/t/031_recovery_conflict.pl index 028b0b5f0e15..0f272f149b9d 100644 --- a/src/test/recovery/t/031_recovery_conflict.pl +++ b/src/test/recovery/t/031_recovery_conflict.pl @@ -7,8 +7,10 @@ use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::PgProto; use PostgreSQL::Test::Utils; use Test::More; +use Time::HiRes qw(usleep); # Set up nodes @@ -74,25 +76,10 @@ ## RECOVERY CONFLICT 1: Buffer pin conflict my $sect = "buffer pin conflict"; +my $cursor1 = "test_recovery_conflict_cursor"; $expected_conflicts++; -# Aborted INSERT on primary that will be cleaned up by vacuum. Has to be old -# enough so that there's not a snapshot conflict before the buffer pin -# conflict. - -$node_primary->safe_psql( - $test_db, - qq[ - BEGIN; - INSERT INTO $table1 VALUES (1,0); - ROLLBACK; - -- ensure flush, rollback doesn't do so - BEGIN; LOCK $table1; COMMIT; - ]); - -$node_primary->wait_for_replay_catchup($node_standby); - -my $cursor1 = "test_recovery_conflict_cursor"; +setup_bufferpin_conflict(); # DECLARE and use a cursor on standby, causing buffer with the only block of # the relation to be pinned on the standby @@ -120,8 +107,94 @@ check_conflict_log("User was holding shared buffer pin for too long"); $psql_standby->reconnect_and_clear(); -check_conflict_stat("bufferpin"); +check_conflict_stat("bufferpin", 1); + +SKIP: +{ + skip "those tests require working raw_connect()" + unless $node_standby->raw_connect_works(); + + ## RECOVERY CONFLICT 1 bis: Buffer pin conflict with conflicting query in ClientWrite + $sect = "buffer pin conflict (ClientWrite)"; + $expected_conflicts++; + + setup_bufferpin_conflict(); + + # The simplest way to get the user to use for the startup 
packet + # is to grab it from an existing psql session + my $user = $psql_standby->query_safe("SELECT current_user"); + + # Start the conflicting session + my $sock = $node_standby->raw_connect(); + my $pgproto = PostgreSQL::Test::PgProto->new($sock); + my %parameters = ( user => $user, database => $test_db, application_name => $ENV{PGAPPNAME} ); + + $pgproto->send_startup_message(\%parameters); + # Read until Ready For Query message + $pgproto->read_until_message('Z'); + my $pid = $pgproto->read_session_pid(); + + # We want the session to pin table1's block while staying in a ClientWrite + # state. To achieve that, we ask the server enough rows to saturate the + # buffer with the client not read those results. + $pgproto->send_simple_query(qq[ + BEGIN; + DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1; + FETCH FORWARD FROM $cursor1; + SELECT generate_series(1, 100000); + ]); + + # Check that our session is in ClientWrite + wait_for_wait_event($pid, 'ClientWrite'); + + # VACUUM FREEZE on the primary + $node_primary->safe_psql($test_db, qq[VACUUM FREEZE $table1;]); + + check_conflict_log("User was holding shared buffer pin for too long"); + + # The conflicting session should be terminated, consume everything until the socket is closed. + $pgproto->wait_until_closed(); + + check_conflict_stat("bufferpin", 2); + + ## RECOVERY CONFLICT 1 ter: Buffer pin conflict with conflicting query reports + ## an error and saturate socket buffer + $sect = "buffer pin conflict (Error report)"; + $expected_conflicts++; + setup_bufferpin_conflict(); + + # Start the conflicting session + $sock = $node_standby->raw_connect(); + $pgproto = PostgreSQL::Test::PgProto->new($sock); + + $pgproto->send_startup_message(\%parameters); + # Read until Ready For Query message + $pgproto->read_until_message('Z'); + $pid = $pgproto->read_session_pid(); + + # We want the session to pin table1's block while staying in a ClientWrite + # state and reporting an error to the client. 
+ $pgproto->send_simple_query(qq[ + BEGIN; + DECLARE $cursor1 CURSOR FOR SELECT b FROM $table1; + FETCH FORWARD FROM $cursor1; + DO \$\$BEGIN RAISE 'endless scream: %', repeat('a', 2000000); END;\$\$; + ]); + + # Check that our session is in ClientWrite + wait_for_wait_event($pid, 'ClientWrite'); + + # VACUUM FREEZE on the primary + $node_primary->safe_psql($test_db, qq[VACUUM FREEZE $table1;]); + + check_conflict_log("User was holding shared buffer pin for too long"); + + # The conflicting session should be terminated, consume everything until the socket is closed. + $pgproto->wait_until_closed(); + + check_conflict_stat("bufferpin", 3); +} ## RECOVERY CONFLICT 2: Snapshot conflict $sect = "snapshot conflict"; @@ -153,7 +226,7 @@ check_conflict_log( "User query might have needed to see row versions that must be removed"); $psql_standby->reconnect_and_clear(); -check_conflict_stat("snapshot"); +check_conflict_stat("snapshot", 1); ## RECOVERY CONFLICT 3: Lock conflict @@ -176,7 +249,7 @@ check_conflict_log("User was holding a relation lock for too long"); $psql_standby->reconnect_and_clear(); -check_conflict_stat("lock"); +check_conflict_stat("lock", 1); ## RECOVERY CONFLICT 4: Tablespace conflict @@ -206,7 +279,7 @@ check_conflict_log( "User was or might have been using tablespace that must be dropped"); $psql_standby->reconnect_and_clear(); -check_conflict_stat("tablespace"); +check_conflict_stat("tablespace", 1); ## RECOVERY CONFLICT 5: Deadlock @@ -271,7 +344,7 @@ check_conflict_log("User transaction caused buffer deadlock with recovery."); $psql_standby->reconnect_and_clear(); -check_conflict_stat("deadlock"); +check_conflict_stat("deadlock", 1); # clean up for next tests $node_primary->safe_psql($test_db, qq[ROLLBACK PREPARED 'lock';]); @@ -310,6 +383,22 @@ done_testing(); +sub setup_bufferpin_conflict +{ + # Aborted INSERT on primary that will be cleaned up by vacuum. 
Has to be old + # enough so that there's not a snapshot conflict before the buffer pin + # conflict. + $node_primary->safe_psql($test_db, + qq[BEGIN; + INSERT INTO $table1 VALUES (1,0); + ROLLBACK; + -- ensure flush, rollback doesn't do so + BEGIN; LOCK $table1; COMMIT;] + ); + + $node_primary->wait_for_replay_catchup($node_standby); +} + sub check_conflict_log { my $message = shift; @@ -318,16 +407,45 @@ sub check_conflict_log $log_location = $node_standby->wait_for_log(qr/$message/, $log_location); cmp_ok($log_location, '>', $old_log_location, - "$sect: logfile contains terminated connection due to recovery conflict" + "$sect: logfile contains '$message'" ); } sub check_conflict_stat { my $conflict_type = shift; + my $expected_count = shift; my $count = $node_standby->safe_psql($test_db, qq[SELECT confl_$conflict_type FROM pg_stat_database_conflicts WHERE datname='$test_db';] ); - is($count, 1, "$sect: stats show conflict on standby"); + is($count, $expected_count, "$sect: stats show $count conflicts on standby (expected $expected_count)"); +} + +sub wait_for_wait_event +{ + my $pid = shift; + my $expected_wait_event = shift; + + my $max_attempts = 10 * $PostgreSQL::Test::Utils::timeout_default; + my $attempts = 0; + my $wait_event = ""; + + while ($attempts < $max_attempts) + { + $wait_event = $node_standby->safe_psql($test_db, + qq[SELECT wait_event FROM pg_stat_activity WHERE pid=$pid;] + ); + + if ($wait_event eq $expected_wait_event) { + last; + } + + # Wait 0.1 second before retrying. + usleep(100_000); + + $attempts++; + } + + is($wait_event, $expected_wait_event, "$sect: session with pid $pid has wait_event $wait_event"); }