diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index f373cead95f5..1931163a2b1c 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1538,6 +1538,46 @@ ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait) return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); } +/* + * Check if the buffer is already undergoing read AIO. If it is, assign the + * IO's wait reference to operation->io_wref, thereby allowing the caller to + * wait for that IO. + */ +static inline bool +ReadBuffersIOAlreadyInProgress(ReadBuffersOperation *operation, Buffer buffer) +{ + BufferDesc *desc; + uint32 buf_state; + PgAioWaitRef iow; + + pgaio_wref_clear(&iow); + + if (BufferIsLocal(buffer)) + { + desc = GetLocalBufferDescriptor(-buffer - 1); + buf_state = pg_atomic_read_u32(&desc->state); + if ((buf_state & BM_IO_IN_PROGRESS) && !(buf_state & BM_VALID)) + iow = desc->io_wref; + } + else + { + desc = GetBufferDescriptor(buffer - 1); + buf_state = LockBufHdr(desc); + + if ((buf_state & BM_IO_IN_PROGRESS) && !(buf_state & BM_VALID)) + iow = desc->io_wref; + UnlockBufHdr(desc, buf_state); + } + + if (pgaio_wref_valid(&iow)) + { + operation->io_wref = iow; + return true; + } + + return false; +} + /* * Helper for AsyncReadBuffers that tries to get the buffer ready for IO. */ @@ -1670,7 +1710,7 @@ WaitReadBuffers(ReadBuffersOperation *operation) * * we first check if we already know the IO is complete. */ - if (aio_ret->result.status == PGAIO_RS_UNKNOWN && + if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) && !pgaio_wref_check_done(&operation->io_wref)) { instr_time io_start = pgstat_prepare_io_time(track_io_timing); @@ -1689,11 +1729,66 @@ WaitReadBuffers(ReadBuffersOperation *operation) Assert(pgaio_wref_check_done(&operation->io_wref)); } - /* - * We now are sure the IO completed. Check the results. This - * includes reporting on errors if there were any. - */ - ProcessReadBuffersResult(operation); + if (unlikely(operation->foreign_io)) + { + Buffer buffer = operation->buffers[operation->nblocks_done]; + BufferDesc *desc; + uint32 buf_state; + + if (BufferIsLocal(buffer)) + { + desc = GetLocalBufferDescriptor(-buffer - 1); + buf_state = pg_atomic_read_u32(&desc->state); + } + else + { + desc = GetBufferDescriptor(buffer - 1); + buf_state = LockBufHdr(desc); + UnlockBufHdr(desc, buf_state); + } + + if (buf_state & BM_VALID) + { + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + /* + * Report and track this as a 'hit' for this backend, even + * though it must have started out as a miss in + * PinBufferForBlock(). The other backend (or ourselves, + * as part of a read started earlier) will track this as a + * 'read'. + */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(operation->forknum, + operation->blocknum + operation->nblocks_done, + operation->smgr->smgr_rlocator.locator.spcOid, + operation->smgr->smgr_rlocator.locator.dbOid, + operation->smgr->smgr_rlocator.locator.relNumber, + operation->smgr->smgr_rlocator.backend, + true); + + if (BufferIsLocal(buffer)) + pgBufferUsage.local_blks_hit += 1; + else + pgBufferUsage.shared_blks_hit += 1; + + if (operation->rel) + pgstat_count_buffer_hit(operation->rel); + + pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); + + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageHit; + } + } + else + { + /* + * We now are sure the IO completed. Check the results. This + * includes reporting on errors if there were any. + */ + ProcessReadBuffersResult(operation); + } } /* @@ -1779,6 +1874,43 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) io_object = IOOBJECT_RELATION; } + /* + * If AIO is in progress, be it in this backend or another backend, we + * just associate the wait reference with the operation and wait in + * WaitReadBuffers(). This turns out to be important for performance in + * two workloads: + * + * 1) A read stream that has to read the same block multiple times within + * the readahead distance. This can happen e.g. for the table accesses of + * an index scan. + * + * 2) Concurrent scans by multiple backends on the same relation. + * + * If we were to synchronously wait for the in-progress IO, we'd not be + * able to keep enough I/O in flight. + * + * If we do find there is ongoing I/O for the buffer, we set up a 1-block + * ReadBuffersOperation that WaitReadBuffers then can wait on. + * + * It's possible that another backend starts IO on the buffer between this + * check and the ReadBuffersCanStartIO(nowait = false) below. In that case + * we will synchronously wait for the IO below, but the window for that is + * small enough that it won't happen often enough to have a significant + * performance impact. + */ + if (ReadBuffersIOAlreadyInProgress(operation, buffers[nblocks_done])) + { + *nblocks_progress = 1; + operation->foreign_io = true; + + CheckReadBuffersOperation(operation, false); + + + return true; + } + + operation->foreign_io = false; + /* * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR * flag. The reason for that is that, hopefully, zero_damaged_pages isn't @@ -1836,9 +1968,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) /* * Check if we can start IO on the first to-be-read buffer. * - * If an I/O is already in progress in another backend, we want to wait - * for the outcome: either done, or something went wrong and we will - * retry. + * If a synchronous I/O is in progress in another backend (it can't be + * this backend), we want to wait for the outcome: either done, or + * something went wrong and we will retry. */ if (!ReadBuffersCanStartIO(buffers[nblocks_done], false)) { diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 9f6785910e01..21d4f22c2d8a 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -147,6 +147,7 @@ struct ReadBuffersOperation int flags; int16 nblocks; int16 nblocks_done; + bool foreign_io; PgAioWaitRef io_wref; PgAioReturn io_return; }; diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build index 73d2fd68eaa1..d571d9da00d3 100644 --- a/src/test/modules/test_aio/meson.build +++ b/src/test/modules/test_aio/meson.build @@ -32,6 +32,8 @@ tests += { 'tests': [ 't/001_aio.pl', 't/002_io_workers.pl', + 't/003_initdb.pl', + 't/004_read_stream.pl', ], }, } diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl index 3f0453619e89..e7f3358f2d19 100644 --- a/src/test/modules/test_aio/t/001_aio.pl +++ b/src/test/modules/test_aio/t/001_aio.pl @@ -7,126 +7,55 @@ use PostgreSQL::Test::Utils; use Test::More; +use FindBin; +use lib $FindBin::RealBin; -### -# Test io_method=worker -### -my $node_worker = create_node('worker'); -$node_worker->start(); - -test_generic('worker', $node_worker); -SKIP: -{ - skip 'Injection points not supported by this build', 1 - unless $ENV{enable_injection_points} eq 'yes'; - test_inject_worker('worker', $node_worker); -} +use TestAio; -$node_worker->stop(); +my %nodes; ### -# Test io_method=io_uring +# Create and configure one instance for each io_method ### -if (have_io_uring()) +foreach my $method (TestAio::supported_io_methods()) { - my $node_uring = create_node('io_uring'); - $node_uring->start(); - test_generic('io_uring', $node_uring); - $node_uring->stop(); -} - - -### -# Test io_method=sync -### - -my $node_sync = create_node('sync'); + my $node = PostgreSQL::Test::Cluster->new($method); -# just to have one test not use the default auto-tuning + $nodes{$method} = $node; + $node->init(); + $node->append_conf('postgresql.conf', "io_method=$method"); + TestAio::configure($node); +} -$node_sync->append_conf( +# Just to have one test not use the default auto-tuning +$nodes{'sync'}->append_conf( 'postgresql.conf', qq( -io_max_concurrency=4 + io_max_concurrency=4 )); -$node_sync->start(); -test_generic('sync', $node_sync); -$node_sync->stop(); - -done_testing(); - ### -# Test Helpers +# Execute the tests for each io_method ### -sub create_node +foreach my $method (TestAio::supported_io_methods()) { - local $Test::Builder::Level = $Test::Builder::Level + 1; - - my $io_method = shift; + my $node = $nodes{$method}; - my $node = PostgreSQL::Test::Cluster->new($io_method); - - # Want to test initdb for each IO method, otherwise we could just reuse - # the cluster. - # - # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the - # options specified by ->extra, if somebody puts -c io_method=xyz in - # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we - # detect it. - local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS}; - if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS} - && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/) - { - $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method"; - } - - $node->init(extra => [ '-c', "io_method=$io_method" ]); - - $node->append_conf( - 'postgresql.conf', qq( -shared_preload_libraries=test_aio -log_min_messages = 'DEBUG3' -log_statement=all -log_error_verbosity=default -restart_after_crash=false -temp_buffers=100 -)); + $node->start(); + test_io_method($method, $node); + $node->stop(); +} - # Even though we used -c io_method=... above, if TEMP_CONFIG sets - # io_method, it'd override the setting persisted at initdb time. While - # using (and later verifying) the setting from initdb provides some - # verification of having used the io_method during initdb, it's probably - # not worth the complication of only appending if the variable is set in - # in TEMP_CONFIG. - $node->append_conf( - 'postgresql.conf', qq( -io_method=$io_method -)); +done_testing(); - ok(1, "$io_method: initdb"); - return $node; -} +### +# Test Helpers +### -sub have_io_uring -{ - # To detect if io_uring is supported, we look at the error message for - # assigning an invalid value to an enum GUC, which lists all the valid - # options. We need to use -C to deal with running as administrator on - # windows, the superuser check is omitted if -C is used. - my ($stdout, $stderr) = - run_command [qw(postgres -C invalid -c io_method=invalid)]; - die "can't determine supported io_method values" - unless $stderr =~ m/Available values: ([^\.]+)\./; - my $methods = $1; - note "supported io_method values are: $methods"; - - return ($methods =~ m/io_uring/) ? 1 : 0; -} sub psql_like { @@ -1490,8 +1419,8 @@ sub test_ignore_checksum } -# Run all tests that are supported for all io_methods -sub test_generic +# Run all tests that for the specified node / io_method +sub test_io_method { my $io_method = shift; my $node = shift; @@ -1526,10 +1455,23 @@ sub test_generic test_ignore_checksum($io_method, $node); test_checksum_createdb($io_method, $node); + # generic injection tests SKIP: { skip 'Injection points not supported by this build', 1 unless $ENV{enable_injection_points} eq 'yes'; test_inject($io_method, $node); } + + # worker specific injection tests + if ($io_method eq 'worker') + { + SKIP: + { + skip 'Injection points not supported by this build', 1 + unless $ENV{enable_injection_points} eq 'yes'; + + test_inject_worker($io_method, $node); + } + } } diff --git a/src/test/modules/test_aio/t/003_initdb.pl b/src/test/modules/test_aio/t/003_initdb.pl new file mode 100644 index 000000000000..c03ae58d00a2 --- /dev/null +++ b/src/test/modules/test_aio/t/003_initdb.pl @@ -0,0 +1,71 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group +# +# Test initdb for each IO method. This is done separately from 001_aio.pl, as +# it isn't fast. This way the more commonly failing / hacked-on 001_aio.pl can +# be iterated on more quickly. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use TestAio; + + +foreach my $method (TestAio::supported_io_methods()) +{ + test_create_node($method); +} + +done_testing(); + + +sub test_create_node +{ + local $Test::Builder::Level = $Test::Builder::Level + 1; + + my $io_method = shift; + + my $node = PostgreSQL::Test::Cluster->new($io_method); + + # Want to test initdb for each IO method, otherwise we could just reuse + # the cluster. + # + # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the + # options specified by ->extra, if somebody puts -c io_method=xyz in + # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we + # detect it. + local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS}; + if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS} + && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/) + { + $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method"; + } + + $node->init(extra => [ '-c', "io_method=$io_method" ]); + + TestAio::configure($node); + + # Even though we used -c io_method=... above, if TEMP_CONFIG sets + # io_method, it'd override the setting persisted at initdb time. While + # using (and later verifying) the setting from initdb provides some + # verification of having used the io_method during initdb, it's probably + # not worth the complication of only appending if the variable is set in + # in TEMP_CONFIG. + $node->append_conf( + 'postgresql.conf', qq( +io_method=$io_method +)); + + ok(1, "$io_method: initdb"); + + $node->start(); + $node->stop(); + ok(1, "$io_method: start & stop"); + + return $node; +} diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl new file mode 100644 index 000000000000..89cfabbb1d31 --- /dev/null +++ b/src/test/modules/test_aio/t/004_read_stream.pl @@ -0,0 +1,282 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use TestAio; + + +my $node = PostgreSQL::Test::Cluster->new('test'); +$node->init(); + +$node->append_conf( + 'postgresql.conf', qq( +shared_preload_libraries=test_aio +log_min_messages = 'DEBUG3' +log_statement=all +log_error_verbosity=default +restart_after_crash=false +temp_buffers=100 +max_connections=8 +io_method=worker +)); + +$node->start(); +test_setup($node); +$node->stop(); + + +foreach my $method (TestAio::supported_io_methods()) +{ + $node->adjust_conf('postgresql.conf', 'io_method', 'worker'); + $node->start(); + test_io_method($method, $node); + $node->stop(); +} + +done_testing(); + + +sub test_setup +{ + my $node = shift; + + $node->safe_psql( + 'postgres', qq( +CREATE EXTENSION test_aio; + +CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10); +INSERT INTO largeish(k) SELECT generate_series(1, 10000); +)); + ok(1, "setup"); +} + + +sub test_repeated_blocks +{ + my $io_method = shift; + my $node = shift; + + my $psql = $node->background_psql('postgres', on_error_stop => 0); + + # Preventing larger reads makes testing easier + $psql->query_safe( + qq/ +SET io_combine_limit = 1; +/); + + # test miss of the same block twice in a row + $psql->query_safe( + qq/ +SELECT evict_rel('largeish'); +/); + $psql->query_safe( + qq/ +SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]); +/); + ok(1, "$io_method: stream missing the same block repeatedly"); + + $psql->query_safe( + qq/ +SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]); +/); + ok(1, "$io_method: stream hitting the same block repeatedly"); + + # test hit of the same block twice in a row + $psql->query_safe( + qq/ +SELECT evict_rel('largeish'); +/); + $psql->query_safe( + qq/ +SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]); +/); + ok(1, "$io_method: stream accessing same block"); + + $psql->quit(); +} + + +sub test_inject_foreign +{ + my $io_method = shift; + my $node = shift; + + my $psql_a = $node->background_psql('postgres', on_error_stop => 0); + my $psql_b = $node->background_psql('postgres', on_error_stop => 0); + + my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/); + + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads succeeding. + ### + $psql_a->query_safe( + qq/ +SELECT evict_rel('largeish'); +/); + + $psql_b->query_safe( + qq/ +SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish')); +/); + + $psql_b->{stdin} .= qq/ +SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1); +/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', qq/ +SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait'; +/, + 'completion_wait'); + + $psql_a->{stdin} .= qq/ +SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]); +/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + ok(1, + qq/$io_method: read stream encounters succeeding IO by another backend/ + ); + + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads failing. + ### + $psql_a->query_safe( + qq/ +SELECT evict_rel('largeish'); +/); + + $psql_b->query_safe( + qq/ +SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish')); +/); + + $psql_b->query_safe( + qq/ +SELECT inj_io_short_read_attach(-errno_from_string('EIO'), pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish')); +/); + + $psql_b->{stdin} .= qq/ +SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1); +/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', qq/ +SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait'; +/, + 'completion_wait'); + + $psql_a->{stdin} .= qq/ +SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]); +/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + $psql_b->{run}->pump_nb(); + like( + $psql_b->{stderr}, + qr/.*ERROR.*could not read blocks 5..5.*$/, + "$io_method: injected error occurred"); + $psql_b->{stderr} = ''; + $psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/); + + + ok(1, + qq/$io_method: read stream encounters failing IO by another backend/); + + + ### + # Test read stream encountering two buffers that are undergoing the same + # IO, started by another backend + ### + $psql_a->query_safe( + qq/ +SELECT evict_rel('largeish'); +/); + + $psql_b->query_safe( + qq/ +SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish')); +/); + + $psql_b->{stdin} .= qq/ +SELECT read_rel_block_ll('largeish', blockno=>2, nblocks=>3); +/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', qq/ +SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait'; +/, + 'completion_wait'); + + $psql_a->{stdin} .= qq/ +SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 4]); +/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,4\}/); + + ok(1, qq/$io_method: read stream encounters two buffer read in one IO/); + + + $psql_a->quit(); + $psql_b->quit(); +} + + +sub test_io_method +{ + my $io_method = shift; + my $node = shift; + + test_repeated_blocks($io_method, $node); + + SKIP: + { + skip 'Injection points not supported by this build', 1 + unless $ENV{enable_injection_points} eq 'yes'; + test_inject_foreign($io_method, $node); + } +} diff --git a/src/test/modules/test_aio/t/TestAio.pm b/src/test/modules/test_aio/t/TestAio.pm new file mode 100644 index 000000000000..5bc80a9b130d --- /dev/null +++ b/src/test/modules/test_aio/t/TestAio.pm @@ -0,0 +1,90 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +=pod + +=head1 NAME + +TestAio - helpers for writing AIO related tests + +=cut + +package TestAio; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + + +=pod + +=head1 METHODS + +=over + +=item TestAio::supported_io_methods() + +Return an array of all the supported values for the io_method GUC + +=cut + +sub supported_io_methods() +{ + my @io_methods = ('worker'); + + push(@io_methods, "io_uring") if have_io_uring(); + + # Return sync last, as it will least commonly fail + push(@io_methods, "sync"); + + return @io_methods; +} + + +=item TestAio::configure() + +Prepare a cluster for AIO test + +=cut + +sub configure +{ + my $node = shift; + + $node->append_conf( + 'postgresql.conf', qq( +shared_preload_libraries=test_aio +log_min_messages = 'DEBUG3' +log_statement=all +log_error_verbosity=default +restart_after_crash=false +temp_buffers=100 +)); + +} + + +=pod + +=item TestAio::have_io_uring() + +Return if io_uring is supported + +=cut + +sub have_io_uring +{ + # To detect if io_uring is supported, we look at the error message for + # assigning an invalid value to an enum GUC, which lists all the valid + # options. We need to use -C to deal with running as administrator on + # windows, the superuser check is omitted if -C is used. + my ($stdout, $stderr) = + run_command [qw(postgres -C invalid -c io_method=invalid)]; + die "can't determine supported io_method values" + unless $stderr =~ m/Available values: ([^\.]+)\./; + my $methods = $1; + note "supported io_method values are: $methods"; + + return ($methods =~ m/io_uring/) ? 1 : 0; +} + +1; diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index e495481c41e4..da7cc03829af 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll( RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +CREATE FUNCTION evict_rel(rel regclass) +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; @@ -41,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4) RETURNS pg_catalog.int4 STRICT AS 'MODULE_PATHNAME' LANGUAGE C; -CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool) +CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool, assign_io bool DEFAULT false) RETURNS pg_catalog.bool STRICT AS 'MODULE_PATHNAME' LANGUAGE C; @@ -50,6 +54,14 @@ RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +/* + * Read stream related functions + */ +CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4) +RETURNS SETOF record STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + + /* * Handle related functions @@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C; /* * Injection point related functions */ -CREATE FUNCTION inj_io_short_read_attach(result int) -RETURNS pg_catalog.void STRICT +CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0) +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_completion_continue() +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0) +RETURNS pg_catalog.void AS 'MODULE_PATHNAME' LANGUAGE C; CREATE FUNCTION inj_io_short_read_detach() diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index d7eadeab256e..6d53ad29ce5a 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -20,16 +20,23 @@ #include "access/relation.h" #include "fmgr.h" +#include "funcapi.h" #include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/checksum.h" +#include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/procnumber.h" +#include "storage/read_stream.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/rel.h" +#include "utils/wait_event.h" PG_MODULE_MAGIC; @@ -37,13 +44,30 @@ PG_MODULE_MAGIC; typedef struct InjIoErrorState { + ConditionVariable cv; + bool enabled_short_read; bool enabled_reopen; + bool enabled_completion_wait; + Oid completion_wait_relfilenode; + pid_t completion_wait_pid; + uint32 completion_wait_event; + bool short_read_result_set; + Oid short_read_relfilenode; + pid_t short_read_pid; int short_read_result; } InjIoErrorState; +typedef struct BlocksReadStreamData +{ + int nblocks; + int curblock; + uint32 *blocks; +} BlocksReadStreamData; + + static InjIoErrorState *inj_io_error_state; /* Shared memory init callbacks */ @@ -85,10 +109,13 @@ test_aio_shmem_startup(void) inj_io_error_state->enabled_short_read = false; inj_io_error_state->enabled_reopen = false; + ConditionVariableInit(&inj_io_error_state->cv); + inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait"); + #ifdef USE_INJECTION_POINTS InjectionPointAttach("aio-process-completion-before-shared", "test_aio", - "inj_io_short_read", + "inj_io_completion_hook", NULL, 0); InjectionPointLoad("aio-process-completion-before-shared"); @@ -384,7 +411,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS) if (nblocks <= 0 || nblocks > PG_IOV_MAX) elog(ERROR, "nblocks is out of range"); - rel = relation_open(relid, AccessExclusiveLock); + rel = relation_open(relid, AccessShareLock); for (int i = 0; i < nblocks; i++) { @@ -458,6 +485,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +PG_FUNCTION_INFO_V1(evict_rel); +Datum +evict_rel(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Relation rel; + int32 buffers_evicted, + buffers_flushed, + buffers_skipped; + + rel = relation_open(relid, AccessExclusiveLock); + + EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed, + &buffers_skipped); + + relation_close(rel, AccessExclusiveLock); + + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(invalidate_rel_block); Datum invalidate_rel_block(PG_FUNCTION_ARGS) @@ -610,6 +658,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +static BlockNumber +read_stream_for_blocks_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + BlocksReadStreamData *stream_data = callback_private_data; + + if (stream_data->curblock >= stream_data->nblocks) + return InvalidBlockNumber; + return stream_data->blocks[stream_data->curblock++]; +} + +PG_FUNCTION_INFO_V1(read_stream_for_blocks); +Datum +read_stream_for_blocks(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Relation rel; + BlocksReadStreamData stream_data; + ReadStream *stream; + + InitMaterializedSRF(fcinfo, 0); + + /* + * We expect the input to be an N-element int4 array; verify that. We + * don't need to use deconstruct_array() since the array data is just + * going to look like a C array of N int4 values. + */ + if (ARR_NDIM(blocksarray) != 1 || + ARR_HASNULL(blocksarray) || + ARR_ELEMTYPE(blocksarray) != INT4OID) + elog(ERROR, "expected 1 dimensional int4 array"); + + stream_data.curblock = 0; + stream_data.nblocks = ARR_DIMS(blocksarray)[0]; + stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray); + + rel = relation_open(relid, AccessShareLock); + + stream = read_stream_begin_relation(READ_STREAM_FULL, + NULL, + rel, + MAIN_FORKNUM, + read_stream_for_blocks_cb, + &stream_data, + 0); + + for (int i = 0; i < stream_data.nblocks; i++) + { + Buffer buf = read_stream_next_buffer(stream, NULL); + Datum values[3] = {0}; + bool nulls[3] = {0}; + + if (!BufferIsValid(buf)) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i); + + values[0] = Int32GetDatum(i); + values[1] = UInt32GetDatum(stream_data.blocks[i]); + values[2] = UInt32GetDatum(buf); + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + + ReleaseBuffer(buf); + } + + if (read_stream_next_buffer(stream, NULL) != InvalidBuffer) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid", + stream_data.nblocks + 1); + + read_stream_end(stream); + + relation_close(rel, NoLock); + + return (Datum) 0; +} + + PG_FUNCTION_INFO_V1(handle_get); Datum handle_get(PG_FUNCTION_ARGS) @@ -680,15 +808,98 @@ batch_end(PG_FUNCTION_ARGS) } #ifdef USE_INJECTION_POINTS -extern PGDLLEXPORT void inj_io_short_read(const char *name, - const void *private_data, - void *arg); +extern PGDLLEXPORT void inj_io_completion_hook(const char *name, + const void *private_data, + void *arg); extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data, void *arg); -void -inj_io_short_read(const char *name, const void *private_data, void *arg) +static bool +inj_io_short_read_matches(PgAioHandle *ioh) +{ + PGPROC *owner_proc; + int32 owner_pid; + PgAioTargetData *td; + + if (!inj_io_error_state->enabled_short_read) + return false; + + if (!inj_io_error_state->short_read_result_set) + return false; + + owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + owner_pid = owner_proc->pid; + + if (inj_io_error_state->short_read_pid != 0 && + inj_io_error_state->short_read_pid != owner_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->short_read_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode) + return false; + + /* + * Only shorten reads that are actually longer than the target size, + * otherwise we can trigger over-reads. + */ + if (inj_io_error_state->short_read_result >= ioh->result) + return false; + + return true; +} + +static bool +inj_io_completion_wait_matches(PgAioHandle *ioh) +{ + PGPROC *owner_proc; + int32 owner_pid; + PgAioTargetData *td; + + if (!inj_io_error_state->enabled_completion_wait) + return false; + + owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + owner_pid = owner_proc->pid; + + if (inj_io_error_state->completion_wait_pid != owner_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->completion_wait_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode) + return false; + + return true; +} + +static void +inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg) +{ + PgAioHandle *ioh = (PgAioHandle *) arg; + + if (!inj_io_completion_wait_matches(ioh)) + return; + + ConditionVariablePrepareToSleep(&inj_io_error_state->cv); + + while (true) + { + if (!inj_io_completion_wait_matches(ioh)) + break; + + ConditionVariableSleep(&inj_io_error_state->cv, + inj_io_error_state->completion_wait_event); + } + + ConditionVariableCancelSleep(); +} + +static void +inj_io_short_read_hook(const char *name, const void *private_data, void *arg) { PgAioHandle *ioh = (PgAioHandle *) arg; @@ -697,58 +908,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg) inj_io_error_state->enabled_reopen), errhidestmt(true), errhidecontext(true)); - if (inj_io_error_state->enabled_short_read) + if (inj_io_short_read_matches(ioh)) { + struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; + int32 old_result = ioh->result; + int32 new_result = inj_io_error_state->short_read_result; + int32 processed = 0; + + ereport(LOG, + errmsg("short read inject point, changing result from %d to %d", + old_result, new_result), + errhidestmt(true), errhidecontext(true)); + /* - * Only shorten reads that are actually longer than the target size, - * otherwise we can trigger over-reads. + * The underlying IO actually completed OK, and thus the "invalid" + * portion of the IOV actually contains valid data. That can hide a + * lot of problems, e.g. if we were to wrongly mark a buffer, that + * wasn't read according to the shortened-read, IO as valid, the + * contents would look valid and we might miss a bug. + * + * To avoid that, iterate through the IOV and zero out the "failed" + * portion of the IO. */ - if (inj_io_error_state->short_read_result_set - && ioh->op == PGAIO_OP_READV - && inj_io_error_state->short_read_result <= ioh->result) + for (int i = 0; i < ioh->op_data.read.iov_length; i++) { - struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; - int32 old_result = ioh->result; - int32 new_result = inj_io_error_state->short_read_result; - int32 processed = 0; - - ereport(LOG, - errmsg("short read inject point, changing result from %d to %d", - old_result, new_result), - errhidestmt(true), errhidecontext(true)); - - /* - * The underlying IO actually completed OK, and thus the "invalid" - * portion of the IOV actually contains valid data. That can hide - * a lot of problems, e.g. if we were to wrongly mark a buffer, - * that wasn't read according to the shortened-read, IO as valid, - * the contents would look valid and we might miss a bug. - * - * To avoid that, iterate through the IOV and zero out the - * "failed" portion of the IO. - */ - for (int i = 0; i < ioh->op_data.read.iov_length; i++) + if (processed + iov[i].iov_len <= new_result) + processed += iov[i].iov_len; + else if (processed <= new_result) { - if (processed + iov[i].iov_len <= new_result) - processed += iov[i].iov_len; - else if (processed <= new_result) - { - uint32 ok_part = new_result - processed; - - memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); - processed += iov[i].iov_len; - } - else - { - memset((char *) iov[i].iov_base, 0, iov[i].iov_len); - } - } + uint32 ok_part = new_result - processed; - ioh->result = new_result; + memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); + processed += iov[i].iov_len; + } + else + { + memset((char *) iov[i].iov_base, 0, iov[i].iov_len); + } } + + ioh->result = new_result; } } +void +inj_io_completion_hook(const char *name, const void *private_data, void *arg) +{ + inj_io_completion_wait_hook(name, private_data, arg); + inj_io_short_read_hook(name, private_data, arg); +} + void inj_io_reopen(const char *name, const void *private_data, void *arg) { @@ -762,6 +971,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg) } #endif +PG_FUNCTION_INFO_V1(inj_io_completion_wait); +Datum +inj_io_completion_wait(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = true; + inj_io_error_state->completion_wait_pid = + PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0); + inj_io_error_state->completion_wait_relfilenode = + PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(inj_io_completion_continue); +Datum +inj_io_completion_continue(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = false; + inj_io_error_state->completion_wait_pid = 0; + inj_io_error_state->completion_wait_relfilenode = InvalidOid; + ConditionVariableBroadcast(&inj_io_error_state->cv); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(inj_io_short_read_attach); Datum inj_io_short_read_attach(PG_FUNCTION_ARGS) @@ -771,6 +1013,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS) inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0); if (inj_io_error_state->short_read_result_set) inj_io_error_state->short_read_result = PG_GETARG_INT32(0); + inj_io_error_state->short_read_pid = + PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1); + inj_io_error_state->short_read_relfilenode = + PG_ARGISNULL(2) ? 0 : PG_GETARG_OID(2); #else elog(ERROR, "injection points not supported"); #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e3c3523b5b2b..345d6dada7d9 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -294,6 +294,7 @@ BlockSampler BlockSamplerData BlockedProcData BlockedProcsData +BlocksReadStreamData BlocktableEntry BloomBuildState BloomFilter