diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index eb86402cae43..9cf27ead046a 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -24,8 +24,10 @@ * All notification messages are placed in the queue and later read out * by listening backends. * - * There is no central knowledge of which backend listens on which channel; - * every backend has its own list of interesting channels. + * We also maintain a dynamic shared hash table (dshash) that maps channel + * names to the set of backends listening on each channel. This table is + * created lazily on the first LISTEN command and grows dynamically as + * needed. * * Although there is only one queue, notifications are treated as being * database-local; this is done by including the sender's database OID @@ -68,16 +70,24 @@ * CommitTransaction() which will then do the actual transaction commit. * * After commit we are called another time (AtCommit_Notify()). Here we - * make any actual updates to the effective listen state (listenChannels). - * Then we signal any backends that may be interested in our messages - * (including our own backend, if listening). This is done by - * SignalBackends(), which scans the list of listening backends and sends a - * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't - * know which backend is listening on which channel so we must signal them - * all). We can exclude backends that are already up to date, though, and - * we can also exclude backends that are in other databases (unless they - * are way behind and should be kicked to make them advance their - * pointers). + * make any actual updates to the local listen state (listenChannelsHash) and + * shared channel hash table (channelHash). Then we signal any backends + * that may be interested in our messages (including our own backend, + * if listening). This is done by SignalBackends(), which consults the + * shared channel hash table to identify listeners for the channels that + * have pending notifications in the current database. Each selected + * backend is marked as having a wakeup pending to avoid duplicate signals, + * and a PROCSIG_NOTIFY_INTERRUPT signal is sent to it. + * + * When writing notifications, PreCommit_Notify() records the queue head + * position both before and after the write. Because all writers serialize + * on a cluster-wide heavyweight lock, no backend can insert entries between + * these two points. SignalBackends() uses this fact to directly advance any + * backend that is still positioned at the old head, or within the range + * written, avoiding unnecessary wakeups for idle listeners that have + * nothing to read. Backends that cannot be direct advanced are signaled + * if they are stuck behind the old queue head, or advancing to a position + * before the new queue head, since otherwise notifications could be delayed. * * Finally, after we are out of the transaction altogether and about to go * idle, we scan the queue for messages that need to be sent to our @@ -137,14 +147,17 @@ #include "commands/async.h" #include "common/hashfn.h" #include "funcapi.h" +#include "lib/dshash.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" +#include "storage/dsm_registry.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/procsignal.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/dsa.h" #include "utils/guc_hooks.h" #include "utils/memutils.h" #include "utils/ps_status.h" @@ -162,6 +175,43 @@ */ #define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128) +/* + * Channel hash table definitions + * + * This hash table maps (database OID, channel name) keys to arrays of + * ListenerEntry structs representing the backends listening on each channel. + */ + +#define INITIAL_LISTENERS_ARRAY_SIZE 4 + +typedef struct ChannelHashKey +{ + Oid dboid; + char channel[NAMEDATALEN]; +} ChannelHashKey; + +/* + * Individual listener entry in the channel's listeners array. + * + * We store both the ProcNumber and an active flag. During PreCommit, + * we insert entries with active=false to pre-allocate space and avoid + * OOM failures after transaction commit. Then in AtCommit, we just set + * the active flag to true, which has acceptably low risk of failure. + */ +typedef struct ListenerEntry +{ + ProcNumber procno; /* backend's ProcNumber */ + bool active; /* true if actually listening */ +} ListenerEntry; + +typedef struct ChannelEntry +{ + ChannelHashKey key; + dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */ + int numListeners; /* Number of listeners currently stored */ + int allocatedListeners; /* Allocated size of array */ +} ChannelEntry; + /* * Struct representing an entry in the global notify queue * @@ -224,11 +274,14 @@ typedef struct QueuePosition (x).page != (y).page ? (x) : \ (x).offset > (y).offset ? (x) : (y)) +/* returns true if x comes before y in queue order */ +#define QUEUE_POS_PRECEDES(x,y) \ + (asyncQueuePagePrecedes((x).page, (y).page) || \ + ((x).page == (y).page && (x).offset < (y).offset)) + /* * Parameter determining how often we try to advance the tail pointer: - * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is - * also the distance by which a backend in another database needs to be - * behind before we'll decide we need to wake it up to advance its pointer. + * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. * * Resist the temptation to make this really large. While that would save * work in some places, it would add cost in others. In particular, this @@ -246,6 +299,9 @@ typedef struct QueueBackendStatus Oid dboid; /* backend's database OID, or InvalidOid */ ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */ QueuePosition pos; /* backend has read queue up to here */ + bool wakeupPending; /* signal sent but not yet processed */ + bool isAdvancing; /* backend is advancing its position */ + QueuePosition advancingPos; /* target position backend is advancing to */ } QueueBackendStatus; /* @@ -260,9 +316,10 @@ typedef struct QueueBackendStatus * (since no other backend will inspect it). * * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the - * entries of other backends and also change the head pointer. When holding - * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends - * can change the tail pointers. + * entries of other backends and also change the head pointer. They can + * also advance other backends' queue positions, unless they are not + * in the process of doing that themselves. When holding both NotifyQueueLock and + * NotifyQueueTailLock in EXCLUSIVE mode, backends can change the tail pointers. * * SLRU buffer pool is divided in banks and bank wise SLRU lock is used as * the control lock for the pg_notify SLRU buffers. @@ -288,11 +345,16 @@ typedef struct AsyncQueueControl ProcNumber firstListener; /* id of first listener, or * INVALID_PROC_NUMBER */ TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ + dsa_handle channelHashDSA; + dshash_table_handle channelHashDSH; QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; } AsyncQueueControl; static AsyncQueueControl *asyncQueueControl; +static dsa_area *channelDSA = NULL; +static dshash_table *channelHash = NULL; + #define QUEUE_HEAD (asyncQueueControl->head) #define QUEUE_TAIL (asyncQueueControl->tail) #define QUEUE_STOP_PAGE (asyncQueueControl->stopPage) @@ -301,6 +363,9 @@ static AsyncQueueControl *asyncQueueControl; #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) #define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) +#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending) +#define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing) +#define QUEUE_BACKEND_ADVANCING_POS(i) (asyncQueueControl->backend[i].advancingPos) /* * The SLRU buffer area through which we access the notification queue @@ -313,16 +378,16 @@ static SlruCtlData NotifyCtlData; #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ /* - * listenChannels identifies the channels we are actually listening to - * (ie, have committed a LISTEN on). It is a simple list of channel names, + * listenChannelsHash identifies the channels we are actually listening to + * (ie, have committed a LISTEN on). It is a hash table of channel names, * allocated in TopMemoryContext. */ -static List *listenChannels = NIL; /* list of C strings */ +static HTAB *listenChannelsHash = NULL; /* * State for pending LISTEN/UNLISTEN actions consists of an ordered list of * all actions requested in the current transaction. As explained above, - * we don't actually change listenChannels until we reach transaction commit. + * we don't actually change listenChannelsHash until we reach transaction commit. * * The list is kept in CurTransactionContext. In subtransactions, each * subtransaction has its own list in its own CurTransactionContext, but @@ -391,6 +456,7 @@ typedef struct NotificationList int nestingLevel; /* current transaction nesting depth */ List *events; /* list of Notification structs */ HTAB *hashtab; /* hash of NotificationHash structs, or NULL */ + HTAB *channelHashtab; /* hash of unique channel names, or NULL */ struct NotificationList *upper; /* details for upper transaction levels */ } NotificationList; @@ -401,6 +467,11 @@ struct NotificationHash Notification *event; /* => the actual Notification struct */ }; +struct ChannelHash +{ + char channel[NAMEDATALEN]; +}; + static NotificationList *pendingNotifies = NULL; /* @@ -418,6 +489,20 @@ static bool unlistenExitRegistered = false; /* True if we're currently registered as a listener in asyncQueueControl */ static bool amRegisteredListener = false; +/* + * Queue head positions for direct advancement. + * These are captured during PreCommit_Notify while holding the heavyweight + * lock on database 0, ensuring no other backend can insert notifications + * between them. SignalBackends uses these to advance idle backends. + */ +static QueuePosition queueHeadBeforeWrite; +static QueuePosition queueHeadAfterWrite; + +/* + * List of channels with pending notifications in the current transaction. + */ +static List *pendingNotifyChannels = NIL; + /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */ static bool tryAdvanceTail = false; @@ -428,11 +513,10 @@ bool Trace_notify = false; int max_notify_queue_pages = 1048576; /* local function prototypes */ -static inline int64 asyncQueuePageDiff(int64 p, int64 q); static inline bool asyncQueuePagePrecedes(int64 p, int64 q); static void queue_listen(ListenActionKind action, const char *channel); static void Async_UnlistenOnExit(int code, Datum arg); -static void Exec_ListenPreCommit(void); +static void Exec_ListenPreCommit(const char *channel); static void Exec_ListenCommit(const char *channel); static void Exec_UnlistenCommit(const char *channel); static void Exec_UnlistenAllCommit(void); @@ -456,25 +540,117 @@ static void AddEventToPendingNotifies(Notification *n); static uint32 notification_hash(const void *key, Size keysize); static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); +static inline void ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel); +static dshash_hash channelHashFunc(const void *key, size_t size, void *arg); +static void initChannelHash(void); /* - * Compute the difference between two queue page numbers. + * Determines whether p precedes q. * Previously this function accounted for a wraparound. */ -static inline int64 -asyncQueuePageDiff(int64 p, int64 q) +static inline bool +asyncQueuePagePrecedes(int64 p, int64 q) { - return p - q; + return p < q; } /* - * Determines whether p precedes q. - * Previously this function accounted for a wraparound. + * channelHashFunc + * Hash function for channel keys. */ -static inline bool -asyncQueuePagePrecedes(int64 p, int64 q) +static dshash_hash +channelHashFunc(const void *key, size_t size, void *arg) { - return p < q; + const ChannelHashKey *k = (const ChannelHashKey *) key; + dshash_hash h; + + h = DatumGetUInt32(hash_uint32(k->dboid)); + h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel, + strnlen(k->channel, NAMEDATALEN))); + + return h; +} + +/* parameters for the channel hash table */ +static const dshash_parameters channelDSHParams = { + sizeof(ChannelHashKey), + sizeof(ChannelEntry), + dshash_memcmp, + channelHashFunc, + dshash_memcpy, + LWTRANCHE_NOTIFY_CHANNEL_HASH +}; + +/* + * initChannelHash + * Lazy initialization of the channel hash table. + */ +static void +initChannelHash(void) +{ + MemoryContext oldcontext; + + /* Quick exit if we already did this */ + if (asyncQueueControl->channelHashDSH != DSHASH_HANDLE_INVALID && + channelHash != NULL) + return; + + /* Otherwise, use a lock to ensure only one process creates the table */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + + /* Be sure any local memory allocated by DSA routines is persistent */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + + if (asyncQueueControl->channelHashDSH == DSHASH_HANDLE_INVALID) + { + /* Initialize dynamic shared hash table for channel hash */ + channelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH); + dsa_pin(channelDSA); + dsa_pin_mapping(channelDSA); + channelHash = dshash_create(channelDSA, &channelDSHParams, NULL); + + /* Store handles in shared memory for other backends to use */ + asyncQueueControl->channelHashDSA = dsa_get_handle(channelDSA); + asyncQueueControl->channelHashDSH = + dshash_get_hash_table_handle(channelHash); + } + else if (!channelHash) + { + /* Attach to existing dynamic shared hash table */ + channelDSA = dsa_attach(asyncQueueControl->channelHashDSA); + dsa_pin_mapping(channelDSA); + channelHash = dshash_attach(channelDSA, &channelDSHParams, + asyncQueueControl->channelHashDSH, + NULL); + } + + MemoryContextSwitchTo(oldcontext); + LWLockRelease(NotifyQueueLock); +} + +/* + * initListenChannelsHash + * Lazy initialization of the local listen channels hash table. + */ +static void +initListenChannelsHash(void) +{ + HASHCTL hash_ctl; + + /* Quick exit if we already did this */ + if (listenChannelsHash != NULL) + return; + + /* Initialize local hash table for this backend's listened channels */ + memset(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(struct ChannelHash); + + listenChannelsHash = + hash_create("Listen Channels", + 64, + &hash_ctl, + HASH_ELEM | HASH_STRINGS); } /* @@ -520,12 +696,17 @@ AsyncShmemInit(void) QUEUE_STOP_PAGE = 0; QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER; asyncQueueControl->lastQueueFillWarn = 0; + asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID; + asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID; for (int i = 0; i < MaxBackends; i++) { QUEUE_BACKEND_PID(i) = InvalidPid; QUEUE_BACKEND_DBOID(i) = InvalidOid; QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + SET_QUEUE_POS(QUEUE_BACKEND_ADVANCING_POS(i), 0, 0); + QUEUE_BACKEND_WAKEUP_PENDING(i) = false; + QUEUE_BACKEND_IS_ADVANCING(i) = false; } } @@ -656,6 +837,7 @@ Async_Notify(const char *channel, const char *payload) notifies->events = list_make1(n); /* We certainly don't need a hashtable yet */ notifies->hashtab = NULL; + notifies->channelHashtab = NULL; notifies->upper = pendingNotifies; pendingNotifies = notifies; } @@ -682,7 +864,7 @@ Async_Notify(const char *channel, const char *payload) * Common code for listen, unlisten, unlisten all commands. * * Adds the request to the list of pending actions. - * Actual update of the listenChannels list happens during transaction + * Actual update of the listenChannelsHash happens during transaction * commit. */ static void @@ -782,30 +964,49 @@ Async_UnlistenAll(void) * SQL function: return a set of the channel names this backend is actively * listening to. * - * Note: this coding relies on the fact that the listenChannels list cannot + * Note: this coding relies on the fact that the listenChannelsHash cannot * change within a transaction. */ Datum pg_listening_channels(PG_FUNCTION_ARGS) { FuncCallContext *funcctx; + HASH_SEQ_STATUS *status; /* stuff done only on the first call of the function */ if (SRF_IS_FIRSTCALL()) { + MemoryContext oldcontext; + /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); + + /* Initialize hash table iteration if we have any channels */ + if (listenChannelsHash != NULL) + { + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS)); + hash_seq_init(status, listenChannelsHash); + funcctx->user_fctx = status; + MemoryContextSwitchTo(oldcontext); + } + else + { + funcctx->user_fctx = NULL; + } } /* stuff done on every call of the function */ funcctx = SRF_PERCALL_SETUP(); + status = (HASH_SEQ_STATUS *) funcctx->user_fctx; - if (funcctx->call_cntr < list_length(listenChannels)) + if (status != NULL) { - char *channel = (char *) list_nth(listenChannels, - funcctx->call_cntr); + struct ChannelHash *entry; - SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); + entry = (struct ChannelHash *) hash_seq_search(status); + if (entry != NULL) + SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel)); } SRF_RETURN_DONE(funcctx); @@ -877,7 +1078,7 @@ PreCommit_Notify(void) switch (actrec->action) { case LISTEN_LISTEN: - Exec_ListenPreCommit(); + Exec_ListenPreCommit(actrec->channel); break; case LISTEN_UNLISTEN: /* there is no Exec_UnlistenPreCommit() */ @@ -893,6 +1094,36 @@ PreCommit_Notify(void) if (pendingNotifies) { ListCell *nextNotify; + bool firstIteration = true; + + /* + * Build list of unique channels for SignalBackends(). + * + * If we have a channelHashtab, use it to efficiently get the unique + * channels. Otherwise, fall back to the linear approach. + */ + pendingNotifyChannels = NIL; + if (pendingNotifies->channelHashtab != NULL) + { + HASH_SEQ_STATUS status; + struct ChannelHash *channelEntry; + + hash_seq_init(&status, pendingNotifies->channelHashtab); + while ((channelEntry = (struct ChannelHash *) hash_seq_search(&status)) != NULL) + pendingNotifyChannels = lappend(pendingNotifyChannels, channelEntry->channel); + } + else + { + /* Linear approach for small number of notifications */ + foreach_ptr(Notification, n, pendingNotifies->events) + { + char *channel = n->data; + + /* Add if not already in list */ + if (!list_member_ptr(pendingNotifyChannels, channel)) + pendingNotifyChannels = lappend(pendingNotifyChannels, channel); + } + } /* * Make sure that we have an XID assigned to the current transaction. @@ -902,6 +1133,13 @@ PreCommit_Notify(void) */ (void) GetCurrentTransactionId(); + /* + * We will be calling SignalBackends() at AtCommit_Notify time, so + * make sure its auxiliary data structures exist now, where an ERROR + * will still abort the transaction cleanly. + */ + initChannelHash(); + /* * Serialize writers by acquiring a special lock that we hold till * after commit. This ensures that queue entries appear in commit @@ -921,6 +1159,22 @@ PreCommit_Notify(void) LockSharedObject(DatabaseRelationId, InvalidOid, 0, AccessExclusiveLock); + /* + * For the direct advancement optimization in SignalBackends(), we + * need to ensure that no other backend can insert queue entries + * between queueHeadBeforeWrite and queueHeadAfterWrite. The + * heavyweight lock above provides this guarantee, since it serializes + * all writers. + * + * Note: if the heavyweight lock were ever removed for scalability + * reasons, we could achieve the same guarantee by holding + * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather + * than releasing and reacquiring it for each page as we do below. + */ + + /* Initialize queueHeadBeforeWrite to a safe default */ + SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0); + /* Now push the notifications into the queue */ nextNotify = list_head(pendingNotifies->events); while (nextNotify != NULL) @@ -938,12 +1192,20 @@ PreCommit_Notify(void) * point in time we can still roll the transaction back. */ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + if (firstIteration) + { + queueHeadBeforeWrite = QUEUE_HEAD; + firstIteration = false; + } + asyncQueueFillWarning(); if (asyncQueueIsFull()) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("too many notifications in the NOTIFY queue"))); nextNotify = asyncQueueAddEntries(nextNotify); + queueHeadAfterWrite = QUEUE_HEAD; + LWLockRelease(NotifyQueueLock); } @@ -956,7 +1218,7 @@ PreCommit_Notify(void) * * This is called at transaction commit, after committing to clog. * - * Update listenChannels and clear transaction-local state. + * Update listenChannelsHash and clear transaction-local state. * * If we issued any notifications in the transaction, send signals to * listening backends (possibly including ourselves) to process them. @@ -1001,7 +1263,8 @@ AtCommit_Notify(void) } /* If no longer listening to anything, get out of listener array */ - if (amRegisteredListener && listenChannels == NIL) + if (amRegisteredListener && + (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0)) asyncQueueUnregister(); /* @@ -1037,147 +1300,270 @@ AtCommit_Notify(void) * This function must make sure we are ready to catch any incoming messages. */ static void -Exec_ListenPreCommit(void) +Exec_ListenPreCommit(const char *channel) { - QueuePosition head; - QueuePosition max; - ProcNumber prevListener; + ChannelHashKey key; + ChannelEntry *entry; + bool found; + ListenerEntry *listeners; /* - * Nothing to do if we are already listening to something, nor if we - * already ran this routine in this transaction. + * If this is our first LISTEN in this transaction, register as a + * listener. */ - if (amRegisteredListener) + if (!amRegisteredListener) + { + QueuePosition head; + QueuePosition max; + ProcNumber prevListener; + + if (Trace_notify) + elog(DEBUG1, "Exec_ListenPreCommit(%s,%d)", channel, MyProcPid); + + /* + * Before registering, make sure we will unlisten before dying. (Note: + * this action does not get undone if we abort later.) + */ + if (!unlistenExitRegistered) + { + before_shmem_exit(Async_UnlistenOnExit, 0); + unlistenExitRegistered = true; + } + + /* + * This is our first LISTEN, so establish our pointer. + * + * We set our pointer to the global tail pointer and then move it + * forward over already-committed notifications. This ensures we + * cannot miss any not-yet-committed notifications. We might get a + * few more but that doesn't hurt. + * + * In some scenarios there might be a lot of committed notifications + * that have not yet been pruned away (because some backend is being + * lazy about reading them). To reduce our startup time, we can look + * at other backends and adopt the maximum "pos" pointer of any + * backend that's in our database; any notifications it's already + * advanced over are surely committed and need not be re-examined by + * us. (We must consider only backends connected to our DB, because + * others will not have bothered to check committed-ness of + * notifications in our DB.) + * + * We need exclusive lock here so we can look at other backends' + * entries and manipulate the list links. + */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + head = QUEUE_HEAD; + max = QUEUE_TAIL; + prevListener = INVALID_PROC_NUMBER; + for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) + { + if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); + /* Also find last listening backend before this one */ + if (i < MyProcNumber) + prevListener = i; + } + QUEUE_BACKEND_POS(MyProcNumber) = max; + QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; + QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; + /* Insert backend into list of listeners at correct position */ + if (prevListener != INVALID_PROC_NUMBER) + { + QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener); + QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber; + } + else + { + QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER; + QUEUE_FIRST_LISTENER = MyProcNumber; + } + LWLockRelease(NotifyQueueLock); + + /* Now we are listed in the global array, so remember we're listening */ + amRegisteredListener = true; + + /* + * Try to move our pointer forward as far as possible. This will skip + * over already-committed notifications, which we want to do because + * they might be quite stale. Note that we are not yet listening on + * anything, so we won't deliver such notifications to our frontend. + * Also, although our transaction might have executed NOTIFY, those + * message(s) aren't queued yet so we won't skip them here. + */ + if (!QUEUE_POS_EQUAL(max, head)) + asyncQueueReadAllNotifications(); + } + + /* Do nothing if we are already listening on this channel */ + if (IsListeningOn(channel)) return; - if (Trace_notify) - elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); + /* + * Add the channel to listenChannelsHash. This can OOM, but we're still + * in PreCommit so the transaction can abort safely. + */ + initListenChannelsHash(); + (void) hash_search(listenChannelsHash, channel, HASH_ENTER, NULL); /* - * Before registering, make sure we will unlisten before dying. (Note: - * this action does not get undone if we abort later.) + * Now update the shared channelHash. We insert an entry with + * active=false, which will be flipped to true in Exec_ListenCommit. */ - if (!unlistenExitRegistered) - { - before_shmem_exit(Async_UnlistenOnExit, 0); - unlistenExitRegistered = true; - } + initChannelHash(); + + ChannelHashPrepareKey(&key, MyDatabaseId, channel); /* - * This is our first LISTEN, so establish our pointer. - * - * We set our pointer to the global tail pointer and then move it forward - * over already-committed notifications. This ensures we cannot miss any - * not-yet-committed notifications. We might get a few more but that - * doesn't hurt. - * - * In some scenarios there might be a lot of committed notifications that - * have not yet been pruned away (because some backend is being lazy about - * reading them). To reduce our startup time, we can look at other - * backends and adopt the maximum "pos" pointer of any backend that's in - * our database; any notifications it's already advanced over are surely - * committed and need not be re-examined by us. (We must consider only - * backends connected to our DB, because others will not have bothered to - * check committed-ness of notifications in our DB.) - * - * We need exclusive lock here so we can look at other backends' entries - * and manipulate the list links. + * Find or create the channel entry. For new entries, we initialize + * listenersArray to InvalidDsaPointer as a marker. */ - LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); - head = QUEUE_HEAD; - max = QUEUE_TAIL; - prevListener = INVALID_PROC_NUMBER; - for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) - { - if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) - max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); - /* Also find last listening backend before this one */ - if (i < MyProcNumber) - prevListener = i; - } - QUEUE_BACKEND_POS(MyProcNumber) = max; - QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; - QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; - /* Insert backend into list of listeners at correct position */ - if (prevListener != INVALID_PROC_NUMBER) + entry = dshash_find_or_insert(channelHash, &key, &found); + + if (!found) + entry->listenersArray = InvalidDsaPointer; + + if (!DsaPointerIsValid(entry->listenersArray)) { - QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener); - QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber; + /* First listener for this channel */ + entry->listenersArray = dsa_allocate(channelDSA, + sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE); + entry->numListeners = 0; + entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE; } - else + + listeners = (ListenerEntry *) dsa_get_address(channelDSA, + entry->listenersArray); + + if (entry->numListeners >= entry->allocatedListeners) { - QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER; - QUEUE_FIRST_LISTENER = MyProcNumber; + int new_size = entry->allocatedListeners * 2; + dsa_pointer new_array = dsa_allocate(channelDSA, + sizeof(ListenerEntry) * new_size); + ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(channelDSA, + new_array); + + memcpy(new_listeners, listeners, + sizeof(ListenerEntry) * entry->numListeners); + + dsa_free(channelDSA, entry->listenersArray); + entry->listenersArray = new_array; + entry->allocatedListeners = new_size; + listeners = new_listeners; } - LWLockRelease(NotifyQueueLock); - /* Now we are listed in the global array, so remember we're listening */ - amRegisteredListener = true; + listeners[entry->numListeners].procno = MyProcNumber; + listeners[entry->numListeners].active = false; + entry->numListeners++; - /* - * Try to move our pointer forward as far as possible. This will skip - * over already-committed notifications, which we want to do because they - * might be quite stale. Note that we are not yet listening on anything, - * so we won't deliver such notifications to our frontend. Also, although - * our transaction might have executed NOTIFY, those message(s) aren't - * queued yet so we won't skip them here. - */ - if (!QUEUE_POS_EQUAL(max, head)) - asyncQueueReadAllNotifications(); + dshash_release_lock(channelHash, entry); } /* * Exec_ListenCommit --- subroutine for AtCommit_Notify * - * Add the channel to the list of channels we are listening on. + * Activate the channel entry that was pre-allocated in Exec_ListenPreCommit. + * This is called after commit to clog, so it's important to have very low + * probability of failure. By design, all we do here is set the active + * flag. */ static void Exec_ListenCommit(const char *channel) { - MemoryContext oldcontext; + ChannelHashKey key; + ChannelEntry *entry; + ListenerEntry *listeners; + int i; - /* Do nothing if we are already listening on this channel */ - if (IsListeningOn(channel)) - return; + if (Trace_notify) + elog(DEBUG1, "Exec_ListenCommit(%s,%d)", channel, MyProcPid); /* - * Add the new channel name to listenChannels. - * - * XXX It is theoretically possible to get an out-of-memory failure here, - * which would be bad because we already committed. For the moment it - * doesn't seem worth trying to guard against that, but maybe improve this - * later. + * The entry has been created in Exec_ListenPreCommit. If we get here, + * channelHash and the entry must exist. */ - oldcontext = MemoryContextSwitchTo(TopMemoryContext); - listenChannels = lappend(listenChannels, pstrdup(channel)); - MemoryContextSwitchTo(oldcontext); + Assert(channelHash != NULL); + ChannelHashPrepareKey(&key, MyDatabaseId, channel); + entry = dshash_find(channelHash, &key, true); + Assert(entry != NULL); + + listeners = (ListenerEntry *) dsa_get_address(channelDSA, + entry->listenersArray); + + for (i = 0; i < entry->numListeners; i++) + { + if (listeners[i].procno == MyProcNumber) + { + listeners[i].active = true; + dshash_release_lock(channelHash, entry); + return; + } + } + + /* If the entry is not found, it's a bug. */ + Assert(false); + dshash_release_lock(channelHash, entry); } /* * Exec_UnlistenCommit --- subroutine for AtCommit_Notify * - * Remove the specified channel name from listenChannels. + * Unlisten the specified channel for this backend. */ static void Exec_UnlistenCommit(const char *channel) { - ListCell *q; + ChannelHashKey key; + ChannelEntry *entry; + ListenerEntry *listeners; + int i; if (Trace_notify) elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); - foreach(q, listenChannels) - { - char *lchan = (char *) lfirst(q); + /* Remove from our local cache */ + if (listenChannelsHash != NULL) + (void) hash_search(listenChannelsHash, channel, HASH_REMOVE, NULL); + + /* Now remove from the shared channelHash */ + if (channelHash == NULL) + return; + + ChannelHashPrepareKey(&key, MyDatabaseId, channel); + + /* Look up the channel with exclusive lock so we can modify it */ + entry = dshash_find(channelHash, &key, true); + if (entry == NULL) + return; + + listeners = (ListenerEntry *) dsa_get_address(channelDSA, + entry->listenersArray); - if (strcmp(lchan, channel) == 0) + for (i = 0; i < entry->numListeners; i++) + { + if (listeners[i].procno == MyProcNumber) { - listenChannels = foreach_delete_current(listenChannels, q); - pfree(lchan); - break; + entry->numListeners--; + if (i < entry->numListeners) + memmove(&listeners[i], &listeners[i + 1], + sizeof(ListenerEntry) * (entry->numListeners - i)); + + if (entry->numListeners == 0) + { + /* Last listener for this channel */ + dsa_free(channelDSA, entry->listenersArray); + dshash_delete_entry(channelHash, entry); + } + else + { + dshash_release_lock(channelHash, entry); + } + + return; } } + dshash_release_lock(channelHash, entry); + /* * We do not complain about unlistening something not being listened; * should we? @@ -1192,34 +1578,68 @@ Exec_UnlistenCommit(const char *channel) static void Exec_UnlistenAllCommit(void) { + dshash_seq_status status; + ChannelEntry *entry; + if (Trace_notify) elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); - list_free_deep(listenChannels); - listenChannels = NIL; + /* Clear our local cache */ + if (listenChannelsHash != NULL) + { + hash_destroy(listenChannelsHash); + listenChannelsHash = NULL; + } + + /* Now remove from the shared channelHash */ + if (channelHash == NULL) + return; + + dshash_seq_init(&status, channelHash, true); + while ((entry = dshash_seq_next(&status)) != NULL) + { + if (entry->key.dboid == MyDatabaseId) + { + ListenerEntry *listeners; + int i; + + listeners = (ListenerEntry *) dsa_get_address(channelDSA, + entry->listenersArray); + + for (i = 0; i < entry->numListeners; i++) + { + if (listeners[i].procno == MyProcNumber) + { + entry->numListeners--; + if (i < entry->numListeners) + memmove(&listeners[i], &listeners[i + 1], + sizeof(ListenerEntry) * (entry->numListeners - i)); + + if (entry->numListeners == 0) + { + dsa_free(channelDSA, entry->listenersArray); + dshash_delete_current(&status); + } + break; + } + } + } + } + dshash_seq_term(&status); } /* * Test whether we are actively listening on the given channel name. * * Note: this function is executed for every notification found in the queue. - * Perhaps it is worth further optimization, eg convert the list to a sorted - * array so we can binary-search it. In practice the list is likely to be - * fairly short, though. */ static bool IsListeningOn(const char *channel) { - ListCell *p; - - foreach(p, listenChannels) - { - char *lchan = (char *) lfirst(p); + if (listenChannelsHash == NULL) + return false; - if (strcmp(lchan, channel) == 0) - return true; - } - return false; + return (hash_search(listenChannelsHash, channel, HASH_FIND, NULL) != NULL); } /* @@ -1229,7 +1649,7 @@ IsListeningOn(const char *channel) static void asyncQueueUnregister(void) { - Assert(listenChannels == NIL); /* else caller error */ + Assert(listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0); /* else caller error */ if (!amRegisteredListener) /* nothing to do */ return; @@ -1241,6 +1661,7 @@ asyncQueueUnregister(void) /* Mark our entry as invalid */ QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid; QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid; + QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; /* and remove it from the list */ if (QUEUE_FIRST_LISTENER == MyProcNumber) QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber); @@ -1565,12 +1986,21 @@ asyncQueueFillWarning(void) /* * Send signals to listening backends. * - * Normally we signal only backends in our own database, since only those - * backends could be interested in notifies we send. However, if there's - * notify traffic in our database but no traffic in another database that - * does have listener(s), those listeners will fall further and further - * behind. Waken them anyway if they're far enough behind, so that they'll - * advance their queue position pointers, allowing the global tail to advance. + * Normally we signal only backends in our own database, that are + * listening on the channels with pending notifies, since only those + * backends are interested in notifies we send. + * + * Backends that are not interested in our notifies, that are known + * to still be positioned at the old queue head, or anywhere in the + * queue region we just wrote, can be safely advanced directly to the + * new head, since that region is known to contain only our own + * notifications. This avoids unnecessary wakeups when there is + * nothing of interest to them. + * + * Backends that are not interested in our notifies, that are advancing + * to a target position before the new queue head, or that are not + * advancing and are stationary at a position before the old queue head + * needs to be signaled since notifications could otherwise be delayed. * * Since we know the ProcNumber and the Pid the signaling is quite cheap. * @@ -1583,6 +2013,9 @@ SignalBackends(void) int32 *pids; ProcNumber *procnos; int count; + ListCell *lc; + + Assert(channelHash != NULL || pendingNotifyChannels == NIL); /* * Identify backends that we need to signal. We don't want to send @@ -1597,36 +2030,97 @@ SignalBackends(void) count = 0; LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); - for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) + foreach(lc, pendingNotifyChannels) { - int32 pid = QUEUE_BACKEND_PID(i); - QueuePosition pos; + char *channel = (char *) lfirst(lc); + ChannelEntry *entry = NULL; + ListenerEntry *listeners; - Assert(pid != InvalidPid); - pos = QUEUE_BACKEND_POS(i); - if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + if (channelHash != NULL) { + ChannelHashKey key; + + ChannelHashPrepareKey(&key, MyDatabaseId, channel); + entry = dshash_find(channelHash, &key, false); + } + + if (entry == NULL) + continue; /* No listeners registered for this channel */ + + listeners = (ListenerEntry *) dsa_get_address(channelDSA, + entry->listenersArray); + + for (int j = 0; j < entry->numListeners; j++) + { + ProcNumber i; + int32 pid; + QueuePosition pos; + /* - * Always signal listeners in our own database, unless they're - * already caught up (unlikely, but possible). + * Only signal backends that have active=true. Backends with + * active=false have done LISTEN in PreCommit but not yet + * committed, so they're not really listening yet. */ + if (!listeners[j].active) + continue; + + i = listeners[j].procno; + + if (QUEUE_BACKEND_WAKEUP_PENDING(i)) + continue; + + pos = QUEUE_BACKEND_POS(i); + pid = QUEUE_BACKEND_PID(i); + + /* Skip if caught up */ if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) continue; + + Assert(pid != InvalidPid); + + QUEUE_BACKEND_WAKEUP_PENDING(i) = true; + pids[count] = pid; + procnos[count] = i; + count++; } - else + + dshash_release_lock(channelHash, entry); + } + + if (pendingNotifies != NULL) + { + for (ProcNumber i = QUEUE_FIRST_LISTENER; + i != INVALID_PROC_NUMBER; + i = QUEUE_NEXT_LISTENER(i)) { - /* - * Listeners in other databases should be signaled only if they - * are far behind. - */ - if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), - QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY) + QueuePosition pos; + int32 pid; + + if (QUEUE_BACKEND_WAKEUP_PENDING(i)) continue; + + pos = QUEUE_BACKEND_POS(i); + pid = QUEUE_BACKEND_PID(i); + + if (QUEUE_BACKEND_IS_ADVANCING(i) ? + QUEUE_POS_PRECEDES(QUEUE_BACKEND_ADVANCING_POS(i), queueHeadAfterWrite) : + QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite)) + { + Assert(pid != InvalidPid); + + QUEUE_BACKEND_WAKEUP_PENDING(i) = true; + pids[count] = pid; + procnos[count] = i; + count++; + } + else if (!QUEUE_BACKEND_IS_ADVANCING(i) && + QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite)) + { + Assert(!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite)); + + QUEUE_BACKEND_POS(i) = queueHeadAfterWrite; + } } - /* OK, need to signal this one */ - pids[count] = pid; - procnos[count] = i; - count++; } LWLockRelease(NotifyQueueLock); @@ -1673,12 +2167,97 @@ AtAbort_Notify(void) /* * If we LISTEN but then roll back the transaction after PreCommit_Notify, * we have registered as a listener but have not made any entry in - * listenChannels. In that case, deregister again. + * listenChannelsHash. In that case, deregister again. */ - if (amRegisteredListener && listenChannels == NIL) + if (amRegisteredListener && + (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0)) asyncQueueUnregister(); - /* And clean up */ + /* + * Remove any channels we added during Exec_ListenPreCommit. We need to + * clean up both the local listenChannelsHash and any inactive entries in + * the shared channelHash to avoid accumulating stale data. + */ + if (pendingActions != NULL) + { + ListCell *p; + + foreach(p, pendingActions->actions) + { + ListenAction *actrec = (ListenAction *) lfirst(p); + ChannelHashKey key; + ChannelEntry *entry; + ListenerEntry *listeners; + bool removeFromLocal; + bool found; + int i; + + if (actrec->action != LISTEN_LISTEN) + continue; + + /* + * For each LISTEN action, determine if we should clean up the + * local and/or shared hash entries. If we have an active=true + * entry in the shared hash, we were already listening from a + * previous transaction, so leave everything alone. Otherwise, + * clean up what this transaction added. + */ + removeFromLocal = true; + found = false; + + if (channelHash != NULL) + { + ChannelHashPrepareKey(&key, MyDatabaseId, actrec->channel); + entry = dshash_find(channelHash, &key, true); + + if (entry != NULL) + { + listeners = (ListenerEntry *) dsa_get_address(channelDSA, + entry->listenersArray); + + for (i = 0; i < entry->numListeners; i++) + { + if (listeners[i].procno == MyProcNumber) + { + found = true; + + if (listeners[i].active) + { + /* Already committed - leave both hashes alone */ + removeFromLocal = false; + } + else + { + /* Inactive - remove from shared hash */ + entry->numListeners--; + if (i < entry->numListeners) + memmove(&listeners[i], &listeners[i + 1], + sizeof(ListenerEntry) * (entry->numListeners - i)); + + if (entry->numListeners == 0) + { + dsa_free(channelDSA, entry->listenersArray); + dshash_delete_entry(channelHash, entry); + } + else + dshash_release_lock(channelHash, entry); + } + break; + } + } + + if (!found) + dshash_release_lock(channelHash, entry); + } + } + + /* Remove from local hash if appropriate */ + if (removeFromLocal && listenChannelsHash != NULL) + (void) hash_search(listenChannelsHash, actrec->channel, + HASH_REMOVE, NULL); + } + } + ClearPendingActionsAndNotifies(); } @@ -1854,20 +2433,29 @@ asyncQueueReadAllNotifications(void) QueuePosition head; Snapshot snapshot; - /* Fetch current state */ + /* + * Fetch current state, indicate to others that we have woken up, and that + * we now will be advancing our position. + */ LWLockAcquire(NotifyQueueLock, LW_SHARED); /* Assert checks that we have a valid state entry */ Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber)); - pos = QUEUE_BACKEND_POS(MyProcNumber); + QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false; head = QUEUE_HEAD; - LWLockRelease(NotifyQueueLock); + pos = QUEUE_BACKEND_POS(MyProcNumber); if (QUEUE_POS_EQUAL(pos, head)) { /* Nothing to do, we have read all notifications already. */ + LWLockRelease(NotifyQueueLock); return; } + QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true; + QUEUE_BACKEND_ADVANCING_POS(MyProcNumber) = head; + + LWLockRelease(NotifyQueueLock); + /*---------- * Get snapshot we'll use to decide which xacts are still in progress. * This is trickier than it might seem, because of race conditions. @@ -1954,6 +2542,8 @@ asyncQueueReadAllNotifications(void) /* Update shared state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); + + QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false; QUEUE_BACKEND_POS(MyProcNumber) = pos; LWLockRelease(NotifyQueueLock); @@ -2051,7 +2641,7 @@ asyncQueueProcessPageEntries(QueuePosition *current, * over it on the first LISTEN in a session, and not get stuck on * it indefinitely. */ - if (listenChannels == NIL) + if (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0) continue; if (TransactionIdDidCommit(qe->xid)) @@ -2306,7 +2896,7 @@ ProcessIncomingNotify(bool flush) notifyInterruptPending = false; /* Do nothing else if we aren't actively listening */ - if (listenChannels == NIL) + if (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0) return; if (Trace_notify) @@ -2410,13 +3000,15 @@ AddEventToPendingNotifies(Notification *n) { Assert(pendingNotifies->events != NIL); - /* Create the hash table if it's time to */ + /* Create the hash tables if it's time to */ if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES && pendingNotifies->hashtab == NULL) { HASHCTL hash_ctl; ListCell *l; + Assert(pendingNotifies->channelHashtab == NULL); + /* Create the hash table */ hash_ctl.keysize = sizeof(Notification *); hash_ctl.entrysize = sizeof(struct NotificationHash); @@ -2429,10 +3021,22 @@ AddEventToPendingNotifies(Notification *n) &hash_ctl, HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); + /* Create the channel hash table */ + memset(&hash_ctl, 0, sizeof(hash_ctl)); + hash_ctl.keysize = NAMEDATALEN; + hash_ctl.entrysize = sizeof(struct ChannelHash); + hash_ctl.hcxt = CurTransactionContext; + pendingNotifies->channelHashtab = + hash_create("Pending Notify Channels", + 64L, + &hash_ctl, + HASH_ELEM | HASH_STRINGS | HASH_CONTEXT); + /* Insert all the already-existing events */ foreach(l, pendingNotifies->events) { Notification *oldn = (Notification *) lfirst(l); + char *channel = oldn->data; bool found; (void) hash_search(pendingNotifies->hashtab, @@ -2440,22 +3044,42 @@ AddEventToPendingNotifies(Notification *n) HASH_ENTER, &found); Assert(!found); + + /* Insert channel into channelHashtab */ + (void) hash_search(pendingNotifies->channelHashtab, + channel, + HASH_ENTER, + &found); + /* found may be true if multiple events on same channel */ } } /* Add new event to the list, in order */ pendingNotifies->events = lappend(pendingNotifies->events, n); - /* Add event to the hash table if needed */ + /* Add event to the hash tables if needed */ if (pendingNotifies->hashtab != NULL) { bool found; + Assert(pendingNotifies->channelHashtab != NULL); + (void) hash_search(pendingNotifies->hashtab, &n, HASH_ENTER, &found); Assert(!found); + + /* Add channel to channelHashtab */ + { + char *channel = n->data; + + (void) hash_search(pendingNotifies->channelHashtab, + channel, + HASH_ENTER, + &found); + /* found may be true if we already have an event on this channel */ + } } } @@ -2493,7 +3117,7 @@ notification_match(const void *key1, const void *key2, Size keysize) return 1; /* not equal */ } -/* Clear the pendingActions and pendingNotifies lists. */ +/* Clear the pendingActions, pendingNotifies, and pendingNotifyChannels lists. */ static void ClearPendingActionsAndNotifies(void) { @@ -2505,6 +3129,7 @@ ClearPendingActionsAndNotifies(void) */ pendingActions = NULL; pendingNotifies = NULL; + pendingNotifyChannels = NIL; } /* @@ -2515,3 +3140,16 @@ check_notify_buffers(int *newval, void **extra, GucSource source) { return check_slru_buffers("notify_buffers", newval); } + + +/* + * ChannelHashPrepareKey + * Prepare a channel key for use as a hash key. + */ +static inline void +ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel) +{ + memset(key, 0, sizeof(ChannelHashKey)); + key->dboid = dboid; + strlcpy(key->channel, channel, NAMEDATALEN); +} diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index f39830dbb344..e57bc12b602f 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -370,6 +370,7 @@ SubtransBuffer "Waiting for I/O on a sub-transaction SLRU buffer." MultiXactOffsetBuffer "Waiting for I/O on a multixact offset SLRU buffer." MultiXactMemberBuffer "Waiting for I/O on a multixact member SLRU buffer." NotifyBuffer "Waiting for I/O on a NOTIFY message SLRU buffer." +NotifyChannelHash "Waiting to access the NOTIFY channel hash table." SerialBuffer "Waiting for I/O on a serializable transaction conflict SLRU buffer." WALInsert "Waiting to insert WAL data into a memory buffer." BufferContent "Waiting to access a data page in memory." diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index 5b0ce383408c..4236965e72a9 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -101,6 +101,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer) PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer) PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer) PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer) +PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash) PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer) PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert) PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent) diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 556e1805893a..443a6eb669fc 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -1,4 +1,4 @@ -Parsed test spec with 3 sessions +Parsed test spec with 7 sessions starting permutation: listenc notify1 notify2 notify3 notifyf step listenc: LISTEN c1; LISTEN c2; @@ -47,6 +47,105 @@ notifier: NOTIFY "c2" with payload "payload" from notifier notifier: NOTIFY "c1" with payload "payloads" from notifier notifier: NOTIFY "c2" with payload "payloads" from notifier +starting permutation: listenc notifys_simple +step listenc: LISTEN c1; LISTEN c2; +step notifys_simple: + BEGIN; + SAVEPOINT s1; + NOTIFY c1, 'simple1'; + NOTIFY c2, 'simple2'; + RELEASE SAVEPOINT s1; + COMMIT; + +notifier: NOTIFY "c1" with payload "simple1" from notifier +notifier: NOTIFY "c2" with payload "simple2" from notifier + +starting permutation: lsbegin lssavepoint lslisten lsrelease lscommit lsnotify +step lsbegin: BEGIN; +step lssavepoint: SAVEPOINT s1; +step lslisten: LISTEN c1; LISTEN c2; +step lsrelease: RELEASE SAVEPOINT s1; +step lscommit: COMMIT; +step lsnotify: NOTIFY c1, 'subxact_test'; +listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact + +starting permutation: lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify +step lsbegin: BEGIN; +step lslisten_outer: LISTEN c3; +step lssavepoint: SAVEPOINT s1; +step lslisten: LISTEN c1; LISTEN c2; +step lsrelease: RELEASE SAVEPOINT s1; +step lscommit: COMMIT; +step lsnotify: NOTIFY c1, 'subxact_test'; +listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact + +starting permutation: lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check +step lsbegin: BEGIN; +step lssavepoint: SAVEPOINT s1; +step lslisten: LISTEN c1; LISTEN c2; +step lsrollback: ROLLBACK TO SAVEPOINT s1; +step lscommit: COMMIT; +step lsnotify_check: NOTIFY c1, 'should_not_receive'; + +starting permutation: listenc notify_many_with_dup +step listenc: LISTEN c1; LISTEN c2; +step notify_many_with_dup: + BEGIN; + SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s; + SELECT pg_notify('c1', 'msg1'); + COMMIT; + +pg_notify +--------- + + + + + + + + + + + + + + + + + +(17 rows) + +pg_notify +--------- + +(1 row) + +notifier: NOTIFY "c1" with payload "msg1" from notifier +notifier: NOTIFY "c1" with payload "msg2" from notifier +notifier: NOTIFY "c1" with payload "msg3" from notifier +notifier: NOTIFY "c1" with payload "msg4" from notifier +notifier: NOTIFY "c1" with payload "msg5" from notifier +notifier: NOTIFY "c1" with payload "msg6" from notifier +notifier: NOTIFY "c1" with payload "msg7" from notifier +notifier: NOTIFY "c1" with payload "msg8" from notifier +notifier: NOTIFY "c1" with payload "msg9" from notifier +notifier: NOTIFY "c1" with payload "msg10" from notifier +notifier: NOTIFY "c1" with payload "msg11" from notifier +notifier: NOTIFY "c1" with payload "msg12" from notifier +notifier: NOTIFY "c1" with payload "msg13" from notifier +notifier: NOTIFY "c1" with payload "msg14" from notifier +notifier: NOTIFY "c1" with payload "msg15" from notifier +notifier: NOTIFY "c1" with payload "msg16" from notifier +notifier: NOTIFY "c1" with payload "msg17" from notifier + +starting permutation: listenc llisten l2listen l3listen lslisten +step listenc: LISTEN c1; LISTEN c2; +step llisten: LISTEN c1; LISTEN c2; +step l2listen: LISTEN c1; +step l3listen: LISTEN c1; +step lslisten: LISTEN c1; LISTEN c2; + starting permutation: llisten notify1 notify2 notify3 notifyf lcheck step llisten: LISTEN c1; LISTEN c2; step notify1: NOTIFY c1; @@ -95,6 +194,8 @@ listener: NOTIFY "c2" with payload "" from notifier starting permutation: l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop step l2listen: LISTEN c1; +listener2: NOTIFY "c1" with payload "" from notifier +listener2: NOTIFY "c1" with payload "" from notifier step l2begin: BEGIN; step notify1: NOTIFY c1; step lbegins: BEGIN ISOLATION LEVEL SERIALIZABLE; @@ -104,6 +205,17 @@ step l2commit: COMMIT; listener2: NOTIFY "c1" with payload "" from notifier step l2stop: UNLISTEN *; +starting permutation: lch_listen nch_notify lch_check +step lch_listen: LISTEN ch; +step nch_notify: NOTIFY ch, 'aa'; +step lch_check: SELECT 1 AS x; +x +- +1 +(1 row) + +listener_ch: NOTIFY "ch" with payload "aa" from notifier_ch + starting permutation: llisten lbegin usage bignotify usage step llisten: LISTEN c1; LISTEN c2; step lbegin: BEGIN; diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec index 0b8cfd910837..0a01e777b98e 100644 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@ -31,6 +31,20 @@ step notifys1 { ROLLBACK TO SAVEPOINT s2; COMMIT; } +step notifys_simple { + BEGIN; + SAVEPOINT s1; + NOTIFY c1, 'simple1'; + NOTIFY c2, 'simple2'; + RELEASE SAVEPOINT s1; + COMMIT; +} +step notify_many_with_dup { + BEGIN; + SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s; + SELECT pg_notify('c1', 'msg1'); + COMMIT; +} step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; } step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; } teardown { UNLISTEN *; } @@ -53,6 +67,38 @@ step l2begin { BEGIN; } step l2commit { COMMIT; } step l2stop { UNLISTEN *; } +# Third listener session for testing array growth. + +session listener3 +step l3listen { LISTEN c1; } +teardown { UNLISTEN *; } + +# Listener session for cross-session notification test with channel 'ch'. + +session listener_ch +step lch_listen { LISTEN ch; } +step lch_check { SELECT 1 AS x; } +teardown { UNLISTEN *; } + +# Notifier session for cross-session notification test with channel 'ch'. + +session notifier_ch +step nch_notify { NOTIFY ch, 'aa'; } + +# Session for testing LISTEN in subtransaction with separate steps. + +session listen_subxact +step lsbegin { BEGIN; } +step lslisten_outer { LISTEN c3; } +step lssavepoint { SAVEPOINT s1; } +step lslisten { LISTEN c1; LISTEN c2; } +step lsrelease { RELEASE SAVEPOINT s1; } +step lsrollback { ROLLBACK TO SAVEPOINT s1; } +step lscommit { COMMIT; } +step lsnotify { NOTIFY c1, 'subxact_test'; } +step lsnotify_check { NOTIFY c1, 'should_not_receive'; } +teardown { UNLISTEN *; } + # Trivial cases. permutation listenc notify1 notify2 notify3 notifyf @@ -60,6 +106,24 @@ permutation listenc notify1 notify2 notify3 notifyf # Check simple and less-simple deduplication. permutation listenc notifyd1 notifyd2 notifys1 +# Check simple NOTIFY reparenting when parent has no action. +permutation listenc notifys_simple + +# Check LISTEN reparenting in subtransaction. +permutation lsbegin lssavepoint lslisten lsrelease lscommit lsnotify + +# Check LISTEN merge path when both outer and inner transactions have actions. +permutation lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify + +# Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions). +permutation lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check + +# Check notification_match function (triggered by hash table duplicate detection). +permutation listenc notify_many_with_dup + +# Check ChannelHashAddListener array growth. +permutation listenc llisten l2listen l3listen lslisten + # Cross-backend notification delivery. We use a "select 1" to force the # listener session to check for notifies. In principle we could just wait # for delivery, but that would require extra support in isolationtester @@ -73,6 +137,10 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck # and notify queue is not empty permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop +# Check that notifications sent from a backend that has not done LISTEN +# are properly delivered to a listener in another backend. +permutation lch_listen nch_notify lch_check + # Verify that pg_notification_queue_usage correctly reports a non-zero result, # after submitting notifications while another connection is listening for # those notifications and waiting inside an active transaction. We have to diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9dd65b102544..c4c3748f623c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -413,6 +413,8 @@ CatalogIdMapEntry CatalogIndexState ChangeVarNodes_callback ChangeVarNodes_context +ChannelEntry +ChannelHashKey CheckPoint CheckPointStmt CheckpointStatsData @@ -1567,6 +1569,7 @@ ListDictionary ListParsedLex ListenAction ListenActionKind +ListenerEntry ListenStmt LoInfo LoadStmt