diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index eb86402cae43..9cf27ead046a 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,10 @@
* All notification messages are placed in the queue and later read out
* by listening backends.
*
- * There is no central knowledge of which backend listens on which channel;
- * every backend has its own list of interesting channels.
+ * We also maintain a dynamic shared hash table (dshash) that maps channel
+ * names to the set of backends listening on each channel. This table is
+ * created lazily on the first LISTEN command and grows dynamically as
+ * needed.
*
* Although there is only one queue, notifications are treated as being
* database-local; this is done by including the sender's database OID
@@ -68,16 +70,24 @@
* CommitTransaction() which will then do the actual transaction commit.
*
* After commit we are called another time (AtCommit_Notify()). Here we
- * make any actual updates to the effective listen state (listenChannels).
- * Then we signal any backends that may be interested in our messages
- * (including our own backend, if listening). This is done by
- * SignalBackends(), which scans the list of listening backends and sends a
- * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- * know which backend is listening on which channel so we must signal them
- * all). We can exclude backends that are already up to date, though, and
- * we can also exclude backends that are in other databases (unless they
- * are way behind and should be kicked to make them advance their
- * pointers).
+ * make any actual updates to the local listen state (listenChannelsHash) and
+ * shared channel hash table (channelHash). Then we signal any backends
+ * that may be interested in our messages (including our own backend,
+ * if listening). This is done by SignalBackends(), which consults the
+ * shared channel hash table to identify listeners for the channels that
+ * have pending notifications in the current database. Each selected
+ * backend is marked as having a wakeup pending to avoid duplicate signals,
+ * and a PROCSIG_NOTIFY_INTERRUPT signal is sent to it.
+ *
+ * When writing notifications, PreCommit_Notify() records the queue head
+ * position both before and after the write. Because all writers serialize
+ * on a cluster-wide heavyweight lock, no backend can insert entries between
+ * these two points. SignalBackends() uses this fact to directly advance any
+ * backend that is still positioned at the old head, or within the range
+ * written, avoiding unnecessary wakeups for idle listeners that have
+ * nothing to read. Backends that cannot be direct advanced are signaled
+ * if they are stuck behind the old queue head, or advancing to a position
+ * before the new queue head, since otherwise notifications could be delayed.
*
* Finally, after we are out of the transaction altogether and about to go
* idle, we scan the queue for messages that need to be sent to our
@@ -137,14 +147,17 @@
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
+#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
+#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
@@ -162,6 +175,43 @@
*/
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
+/*
+ * Channel hash table definitions
+ *
+ * This hash table maps (database OID, channel name) keys to arrays of
+ * ListenerEntry structs representing the backends listening on each channel.
+ */
+
+#define INITIAL_LISTENERS_ARRAY_SIZE 4
+
+typedef struct ChannelHashKey
+{
+ Oid dboid;
+ char channel[NAMEDATALEN];
+} ChannelHashKey;
+
+/*
+ * Individual listener entry in the channel's listeners array.
+ *
+ * We store both the ProcNumber and an active flag. During PreCommit,
+ * we insert entries with active=false to pre-allocate space and avoid
+ * OOM failures after transaction commit. Then in AtCommit, we just set
+ * the active flag to true, which has acceptably low risk of failure.
+ */
+typedef struct ListenerEntry
+{
+ ProcNumber procno; /* backend's ProcNumber */
+ bool active; /* true if actually listening */
+} ListenerEntry;
+
+typedef struct ChannelEntry
+{
+ ChannelHashKey key;
+ dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
+ int numListeners; /* Number of listeners currently stored */
+ int allocatedListeners; /* Allocated size of array */
+} ChannelEntry;
+
/*
* Struct representing an entry in the global notify queue
*
@@ -224,11 +274,14 @@ typedef struct QueuePosition
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+ (asyncQueuePagePrecedes((x).page, (y).page) || \
+ ((x).page == (y).page && (x).offset < (y).offset))
+
/*
* Parameter determining how often we try to advance the tail pointer:
- * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
- * also the distance by which a backend in another database needs to be
- * behind before we'll decide we need to wake it up to advance its pointer.
+ * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.
*
* Resist the temptation to make this really large. While that would save
* work in some places, it would add cost in others. In particular, this
@@ -246,6 +299,9 @@ typedef struct QueueBackendStatus
Oid dboid; /* backend's database OID, or InvalidOid */
ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */
QueuePosition pos; /* backend has read queue up to here */
+ bool wakeupPending; /* signal sent but not yet processed */
+ bool isAdvancing; /* backend is advancing its position */
+ QueuePosition advancingPos; /* target position backend is advancing to */
} QueueBackendStatus;
/*
@@ -260,9 +316,10 @@ typedef struct QueueBackendStatus
* (since no other backend will inspect it).
*
* When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
- * entries of other backends and also change the head pointer. When holding
- * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
- * can change the tail pointers.
+ * entries of other backends and also change the head pointer. They can
+ * also advance other backends' queue positions, unless they are not
+ * in the process of doing that themselves. When holding both NotifyQueueLock and
+ * NotifyQueueTailLock in EXCLUSIVE mode, backends can change the tail pointers.
*
* SLRU buffer pool is divided in banks and bank wise SLRU lock is used as
* the control lock for the pg_notify SLRU buffers.
@@ -288,11 +345,16 @@ typedef struct AsyncQueueControl
ProcNumber firstListener; /* id of first listener, or
* INVALID_PROC_NUMBER */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
+ dsa_handle channelHashDSA;
+ dshash_table_handle channelHashDSH;
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;
static AsyncQueueControl *asyncQueueControl;
+static dsa_area *channelDSA = NULL;
+static dshash_table *channelHash = NULL;
+
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
@@ -301,6 +363,9 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
+#define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing)
+#define QUEUE_BACKEND_ADVANCING_POS(i) (asyncQueueControl->backend[i].advancingPos)
/*
* The SLRU buffer area through which we access the notification queue
@@ -313,16 +378,16 @@ static SlruCtlData NotifyCtlData;
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
/*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on). It is a simple list of channel names,
+ * listenChannelsHash identifies the channels we are actually listening to
+ * (ie, have committed a LISTEN on). It is a hash table of channel names,
* allocated in TopMemoryContext.
*/
-static List *listenChannels = NIL; /* list of C strings */
+static HTAB *listenChannelsHash = NULL;
/*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
* all actions requested in the current transaction. As explained above,
- * we don't actually change listenChannels until we reach transaction commit.
+ * we don't actually change listenChannelsHash until we reach transaction commit.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
@@ -391,6 +456,7 @@ typedef struct NotificationList
int nestingLevel; /* current transaction nesting depth */
List *events; /* list of Notification structs */
HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
+ HTAB *channelHashtab; /* hash of unique channel names, or NULL */
struct NotificationList *upper; /* details for upper transaction levels */
} NotificationList;
@@ -401,6 +467,11 @@ struct NotificationHash
Notification *event; /* => the actual Notification struct */
};
+struct ChannelHash
+{
+ char channel[NAMEDATALEN];
+};
+
static NotificationList *pendingNotifies = NULL;
/*
@@ -418,6 +489,20 @@ static bool unlistenExitRegistered = false;
/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;
+/*
+ * Queue head positions for direct advancement.
+ * These are captured during PreCommit_Notify while holding the heavyweight
+ * lock on database 0, ensuring no other backend can insert notifications
+ * between them. SignalBackends uses these to advance idle backends.
+ */
+static QueuePosition queueHeadBeforeWrite;
+static QueuePosition queueHeadAfterWrite;
+
+/*
+ * List of channels with pending notifications in the current transaction.
+ */
+static List *pendingNotifyChannels = NIL;
+
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;
@@ -428,11 +513,10 @@ bool Trace_notify = false;
int max_notify_queue_pages = 1048576;
/* local function prototypes */
-static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
-static void Exec_ListenPreCommit(void);
+static void Exec_ListenPreCommit(const char *channel);
static void Exec_ListenCommit(const char *channel);
static void Exec_UnlistenCommit(const char *channel);
static void Exec_UnlistenAllCommit(void);
@@ -456,25 +540,117 @@ static void AddEventToPendingNotifies(Notification *n);
static uint32 notification_hash(const void *key, Size keysize);
static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
+static inline void ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel);
+static dshash_hash channelHashFunc(const void *key, size_t size, void *arg);
+static void initChannelHash(void);
/*
- * Compute the difference between two queue page numbers.
+ * Determines whether p precedes q.
* Previously this function accounted for a wraparound.
*/
-static inline int64
-asyncQueuePageDiff(int64 p, int64 q)
+static inline bool
+asyncQueuePagePrecedes(int64 p, int64 q)
{
- return p - q;
+ return p < q;
}
/*
- * Determines whether p precedes q.
- * Previously this function accounted for a wraparound.
+ * channelHashFunc
+ * Hash function for channel keys.
*/
-static inline bool
-asyncQueuePagePrecedes(int64 p, int64 q)
+static dshash_hash
+channelHashFunc(const void *key, size_t size, void *arg)
{
- return p < q;
+ const ChannelHashKey *k = (const ChannelHashKey *) key;
+ dshash_hash h;
+
+ h = DatumGetUInt32(hash_uint32(k->dboid));
+ h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+ strnlen(k->channel, NAMEDATALEN)));
+
+ return h;
+}
+
+/* parameters for the channel hash table */
+static const dshash_parameters channelDSHParams = {
+ sizeof(ChannelHashKey),
+ sizeof(ChannelEntry),
+ dshash_memcmp,
+ channelHashFunc,
+ dshash_memcpy,
+ LWTRANCHE_NOTIFY_CHANNEL_HASH
+};
+
+/*
+ * initChannelHash
+ * Lazy initialization of the channel hash table.
+ */
+static void
+initChannelHash(void)
+{
+ MemoryContext oldcontext;
+
+ /* Quick exit if we already did this */
+ if (asyncQueueControl->channelHashDSH != DSHASH_HANDLE_INVALID &&
+ channelHash != NULL)
+ return;
+
+ /* Otherwise, use a lock to ensure only one process creates the table */
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+ /* Be sure any local memory allocated by DSA routines is persistent */
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+ if (asyncQueueControl->channelHashDSH == DSHASH_HANDLE_INVALID)
+ {
+ /* Initialize dynamic shared hash table for channel hash */
+ channelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
+ dsa_pin(channelDSA);
+ dsa_pin_mapping(channelDSA);
+ channelHash = dshash_create(channelDSA, &channelDSHParams, NULL);
+
+ /* Store handles in shared memory for other backends to use */
+ asyncQueueControl->channelHashDSA = dsa_get_handle(channelDSA);
+ asyncQueueControl->channelHashDSH =
+ dshash_get_hash_table_handle(channelHash);
+ }
+ else if (!channelHash)
+ {
+ /* Attach to existing dynamic shared hash table */
+ channelDSA = dsa_attach(asyncQueueControl->channelHashDSA);
+ dsa_pin_mapping(channelDSA);
+ channelHash = dshash_attach(channelDSA, &channelDSHParams,
+ asyncQueueControl->channelHashDSH,
+ NULL);
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+ LWLockRelease(NotifyQueueLock);
+}
+
+/*
+ * initListenChannelsHash
+ * Lazy initialization of the local listen channels hash table.
+ */
+static void
+initListenChannelsHash(void)
+{
+ HASHCTL hash_ctl;
+
+ /* Quick exit if we already did this */
+ if (listenChannelsHash != NULL)
+ return;
+
+ /* Initialize local hash table for this backend's listened channels */
+ memset(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = NAMEDATALEN;
+ hash_ctl.entrysize = sizeof(struct ChannelHash);
+
+ listenChannelsHash =
+ hash_create("Listen Channels",
+ 64,
+ &hash_ctl,
+ HASH_ELEM | HASH_STRINGS);
}
/*
@@ -520,12 +696,17 @@ AsyncShmemInit(void)
QUEUE_STOP_PAGE = 0;
QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
asyncQueueControl->lastQueueFillWarn = 0;
+ asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
+ asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
for (int i = 0; i < MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
QUEUE_BACKEND_DBOID(i) = InvalidOid;
QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ SET_QUEUE_POS(QUEUE_BACKEND_ADVANCING_POS(i), 0, 0);
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
+ QUEUE_BACKEND_IS_ADVANCING(i) = false;
}
}
@@ -656,6 +837,7 @@ Async_Notify(const char *channel, const char *payload)
notifies->events = list_make1(n);
/* We certainly don't need a hashtable yet */
notifies->hashtab = NULL;
+ notifies->channelHashtab = NULL;
notifies->upper = pendingNotifies;
pendingNotifies = notifies;
}
@@ -682,7 +864,7 @@ Async_Notify(const char *channel, const char *payload)
* Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
- * Actual update of the listenChannels list happens during transaction
+ * Actual update of the listenChannelsHash happens during transaction
* commit.
*/
static void
@@ -782,30 +964,49 @@ Async_UnlistenAll(void)
* SQL function: return a set of the channel names this backend is actively
* listening to.
*
- * Note: this coding relies on the fact that the listenChannels list cannot
+ * Note: this coding relies on the fact that the listenChannelsHash cannot
* change within a transaction.
*/
Datum
pg_listening_channels(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
+ HASH_SEQ_STATUS *status;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
+ MemoryContext oldcontext;
+
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
+
+ /* Initialize hash table iteration if we have any channels */
+ if (listenChannelsHash != NULL)
+ {
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
+ hash_seq_init(status, listenChannelsHash);
+ funcctx->user_fctx = status;
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ funcctx->user_fctx = NULL;
+ }
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
+ status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
- if (funcctx->call_cntr < list_length(listenChannels))
+ if (status != NULL)
{
- char *channel = (char *) list_nth(listenChannels,
- funcctx->call_cntr);
+ struct ChannelHash *entry;
- SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+ entry = (struct ChannelHash *) hash_seq_search(status);
+ if (entry != NULL)
+ SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
}
SRF_RETURN_DONE(funcctx);
@@ -877,7 +1078,7 @@ PreCommit_Notify(void)
switch (actrec->action)
{
case LISTEN_LISTEN:
- Exec_ListenPreCommit();
+ Exec_ListenPreCommit(actrec->channel);
break;
case LISTEN_UNLISTEN:
/* there is no Exec_UnlistenPreCommit() */
@@ -893,6 +1094,36 @@ PreCommit_Notify(void)
if (pendingNotifies)
{
ListCell *nextNotify;
+ bool firstIteration = true;
+
+ /*
+ * Build list of unique channels for SignalBackends().
+ *
+ * If we have a channelHashtab, use it to efficiently get the unique
+ * channels. Otherwise, fall back to the linear approach.
+ */
+ pendingNotifyChannels = NIL;
+ if (pendingNotifies->channelHashtab != NULL)
+ {
+ HASH_SEQ_STATUS status;
+ struct ChannelHash *channelEntry;
+
+ hash_seq_init(&status, pendingNotifies->channelHashtab);
+ while ((channelEntry = (struct ChannelHash *) hash_seq_search(&status)) != NULL)
+ pendingNotifyChannels = lappend(pendingNotifyChannels, channelEntry->channel);
+ }
+ else
+ {
+ /* Linear approach for small number of notifications */
+ foreach_ptr(Notification, n, pendingNotifies->events)
+ {
+ char *channel = n->data;
+
+ /* Add if not already in list */
+ if (!list_member_ptr(pendingNotifyChannels, channel))
+ pendingNotifyChannels = lappend(pendingNotifyChannels, channel);
+ }
+ }
/*
* Make sure that we have an XID assigned to the current transaction.
@@ -902,6 +1133,13 @@ PreCommit_Notify(void)
*/
(void) GetCurrentTransactionId();
+ /*
+ * We will be calling SignalBackends() at AtCommit_Notify time, so
+ * make sure its auxiliary data structures exist now, where an ERROR
+ * will still abort the transaction cleanly.
+ */
+ initChannelHash();
+
/*
* Serialize writers by acquiring a special lock that we hold till
* after commit. This ensures that queue entries appear in commit
@@ -921,6 +1159,22 @@ PreCommit_Notify(void)
LockSharedObject(DatabaseRelationId, InvalidOid, 0,
AccessExclusiveLock);
+ /*
+ * For the direct advancement optimization in SignalBackends(), we
+ * need to ensure that no other backend can insert queue entries
+ * between queueHeadBeforeWrite and queueHeadAfterWrite. The
+ * heavyweight lock above provides this guarantee, since it serializes
+ * all writers.
+ *
+ * Note: if the heavyweight lock were ever removed for scalability
+ * reasons, we could achieve the same guarantee by holding
+ * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
+ * than releasing and reacquiring it for each page as we do below.
+ */
+
+ /* Initialize queueHeadBeforeWrite to a safe default */
+ SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
+
/* Now push the notifications into the queue */
nextNotify = list_head(pendingNotifies->events);
while (nextNotify != NULL)
@@ -938,12 +1192,20 @@ PreCommit_Notify(void)
* point in time we can still roll the transaction back.
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ if (firstIteration)
+ {
+ queueHeadBeforeWrite = QUEUE_HEAD;
+ firstIteration = false;
+ }
+
asyncQueueFillWarning();
if (asyncQueueIsFull())
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
+ queueHeadAfterWrite = QUEUE_HEAD;
+
LWLockRelease(NotifyQueueLock);
}
@@ -956,7 +1218,7 @@ PreCommit_Notify(void)
*
* This is called at transaction commit, after committing to clog.
*
- * Update listenChannels and clear transaction-local state.
+ * Update listenChannelsHash and clear transaction-local state.
*
* If we issued any notifications in the transaction, send signals to
* listening backends (possibly including ourselves) to process them.
@@ -1001,7 +1263,8 @@ AtCommit_Notify(void)
}
/* If no longer listening to anything, get out of listener array */
- if (amRegisteredListener && listenChannels == NIL)
+ if (amRegisteredListener &&
+ (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0))
asyncQueueUnregister();
/*
@@ -1037,147 +1300,270 @@ AtCommit_Notify(void)
* This function must make sure we are ready to catch any incoming messages.
*/
static void
-Exec_ListenPreCommit(void)
+Exec_ListenPreCommit(const char *channel)
{
- QueuePosition head;
- QueuePosition max;
- ProcNumber prevListener;
+ ChannelHashKey key;
+ ChannelEntry *entry;
+ bool found;
+ ListenerEntry *listeners;
/*
- * Nothing to do if we are already listening to something, nor if we
- * already ran this routine in this transaction.
+ * If this is our first LISTEN in this transaction, register as a
+ * listener.
*/
- if (amRegisteredListener)
+ if (!amRegisteredListener)
+ {
+ QueuePosition head;
+ QueuePosition max;
+ ProcNumber prevListener;
+
+ if (Trace_notify)
+ elog(DEBUG1, "Exec_ListenPreCommit(%s,%d)", channel, MyProcPid);
+
+ /*
+ * Before registering, make sure we will unlisten before dying. (Note:
+ * this action does not get undone if we abort later.)
+ */
+ if (!unlistenExitRegistered)
+ {
+ before_shmem_exit(Async_UnlistenOnExit, 0);
+ unlistenExitRegistered = true;
+ }
+
+ /*
+ * This is our first LISTEN, so establish our pointer.
+ *
+ * We set our pointer to the global tail pointer and then move it
+ * forward over already-committed notifications. This ensures we
+ * cannot miss any not-yet-committed notifications. We might get a
+ * few more but that doesn't hurt.
+ *
+ * In some scenarios there might be a lot of committed notifications
+ * that have not yet been pruned away (because some backend is being
+ * lazy about reading them). To reduce our startup time, we can look
+ * at other backends and adopt the maximum "pos" pointer of any
+ * backend that's in our database; any notifications it's already
+ * advanced over are surely committed and need not be re-examined by
+ * us. (We must consider only backends connected to our DB, because
+ * others will not have bothered to check committed-ness of
+ * notifications in our DB.)
+ *
+ * We need exclusive lock here so we can look at other backends'
+ * entries and manipulate the list links.
+ */
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ head = QUEUE_HEAD;
+ max = QUEUE_TAIL;
+ prevListener = INVALID_PROC_NUMBER;
+ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+ {
+ if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+ max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
+ /* Also find last listening backend before this one */
+ if (i < MyProcNumber)
+ prevListener = i;
+ }
+ QUEUE_BACKEND_POS(MyProcNumber) = max;
+ QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
+ QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+ /* Insert backend into list of listeners at correct position */
+ if (prevListener != INVALID_PROC_NUMBER)
+ {
+ QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
+ QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
+ }
+ else
+ {
+ QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
+ QUEUE_FIRST_LISTENER = MyProcNumber;
+ }
+ LWLockRelease(NotifyQueueLock);
+
+ /* Now we are listed in the global array, so remember we're listening */
+ amRegisteredListener = true;
+
+ /*
+ * Try to move our pointer forward as far as possible. This will skip
+ * over already-committed notifications, which we want to do because
+ * they might be quite stale. Note that we are not yet listening on
+ * anything, so we won't deliver such notifications to our frontend.
+ * Also, although our transaction might have executed NOTIFY, those
+ * message(s) aren't queued yet so we won't skip them here.
+ */
+ if (!QUEUE_POS_EQUAL(max, head))
+ asyncQueueReadAllNotifications();
+ }
+
+ /* Do nothing if we are already listening on this channel */
+ if (IsListeningOn(channel))
return;
- if (Trace_notify)
- elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
+ /*
+ * Add the channel to listenChannelsHash. This can OOM, but we're still
+ * in PreCommit so the transaction can abort safely.
+ */
+ initListenChannelsHash();
+ (void) hash_search(listenChannelsHash, channel, HASH_ENTER, NULL);
/*
- * Before registering, make sure we will unlisten before dying. (Note:
- * this action does not get undone if we abort later.)
+ * Now update the shared channelHash. We insert an entry with
+ * active=false, which will be flipped to true in Exec_ListenCommit.
*/
- if (!unlistenExitRegistered)
- {
- before_shmem_exit(Async_UnlistenOnExit, 0);
- unlistenExitRegistered = true;
- }
+ initChannelHash();
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
/*
- * This is our first LISTEN, so establish our pointer.
- *
- * We set our pointer to the global tail pointer and then move it forward
- * over already-committed notifications. This ensures we cannot miss any
- * not-yet-committed notifications. We might get a few more but that
- * doesn't hurt.
- *
- * In some scenarios there might be a lot of committed notifications that
- * have not yet been pruned away (because some backend is being lazy about
- * reading them). To reduce our startup time, we can look at other
- * backends and adopt the maximum "pos" pointer of any backend that's in
- * our database; any notifications it's already advanced over are surely
- * committed and need not be re-examined by us. (We must consider only
- * backends connected to our DB, because others will not have bothered to
- * check committed-ness of notifications in our DB.)
- *
- * We need exclusive lock here so we can look at other backends' entries
- * and manipulate the list links.
+ * Find or create the channel entry. For new entries, we initialize
+ * listenersArray to InvalidDsaPointer as a marker.
*/
- LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
- head = QUEUE_HEAD;
- max = QUEUE_TAIL;
- prevListener = INVALID_PROC_NUMBER;
- for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
- {
- if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
- max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
- /* Also find last listening backend before this one */
- if (i < MyProcNumber)
- prevListener = i;
- }
- QUEUE_BACKEND_POS(MyProcNumber) = max;
- QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
- QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
- /* Insert backend into list of listeners at correct position */
- if (prevListener != INVALID_PROC_NUMBER)
+ entry = dshash_find_or_insert(channelHash, &key, &found);
+
+ if (!found)
+ entry->listenersArray = InvalidDsaPointer;
+
+ if (!DsaPointerIsValid(entry->listenersArray))
{
- QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener);
- QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber;
+ /* First listener for this channel */
+ entry->listenersArray = dsa_allocate(channelDSA,
+ sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE);
+ entry->numListeners = 0;
+ entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
}
- else
+
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ if (entry->numListeners >= entry->allocatedListeners)
{
- QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
- QUEUE_FIRST_LISTENER = MyProcNumber;
+ int new_size = entry->allocatedListeners * 2;
+ dsa_pointer new_array = dsa_allocate(channelDSA,
+ sizeof(ListenerEntry) * new_size);
+ ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ new_array);
+
+ memcpy(new_listeners, listeners,
+ sizeof(ListenerEntry) * entry->numListeners);
+
+ dsa_free(channelDSA, entry->listenersArray);
+ entry->listenersArray = new_array;
+ entry->allocatedListeners = new_size;
+ listeners = new_listeners;
}
- LWLockRelease(NotifyQueueLock);
- /* Now we are listed in the global array, so remember we're listening */
- amRegisteredListener = true;
+ listeners[entry->numListeners].procno = MyProcNumber;
+ listeners[entry->numListeners].active = false;
+ entry->numListeners++;
- /*
- * Try to move our pointer forward as far as possible. This will skip
- * over already-committed notifications, which we want to do because they
- * might be quite stale. Note that we are not yet listening on anything,
- * so we won't deliver such notifications to our frontend. Also, although
- * our transaction might have executed NOTIFY, those message(s) aren't
- * queued yet so we won't skip them here.
- */
- if (!QUEUE_POS_EQUAL(max, head))
- asyncQueueReadAllNotifications();
+ dshash_release_lock(channelHash, entry);
}
/*
* Exec_ListenCommit --- subroutine for AtCommit_Notify
*
- * Add the channel to the list of channels we are listening on.
+ * Activate the channel entry that was pre-allocated in Exec_ListenPreCommit.
+ * This is called after commit to clog, so it's important to have very low
+ * probability of failure. By design, all we do here is set the active
+ * flag.
*/
static void
Exec_ListenCommit(const char *channel)
{
- MemoryContext oldcontext;
+ ChannelHashKey key;
+ ChannelEntry *entry;
+ ListenerEntry *listeners;
+ int i;
- /* Do nothing if we are already listening on this channel */
- if (IsListeningOn(channel))
- return;
+ if (Trace_notify)
+ elog(DEBUG1, "Exec_ListenCommit(%s,%d)", channel, MyProcPid);
/*
- * Add the new channel name to listenChannels.
- *
- * XXX It is theoretically possible to get an out-of-memory failure here,
- * which would be bad because we already committed. For the moment it
- * doesn't seem worth trying to guard against that, but maybe improve this
- * later.
+ * The entry has been created in Exec_ListenPreCommit. If we get here,
+ * channelHash and the entry must exist.
*/
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- listenChannels = lappend(listenChannels, pstrdup(channel));
- MemoryContextSwitchTo(oldcontext);
+ Assert(channelHash != NULL);
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+ entry = dshash_find(channelHash, &key, true);
+ Assert(entry != NULL);
+
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procno == MyProcNumber)
+ {
+ listeners[i].active = true;
+ dshash_release_lock(channelHash, entry);
+ return;
+ }
+ }
+
+ /* If the entry is not found, it's a bug. */
+ Assert(false);
+ dshash_release_lock(channelHash, entry);
}
/*
* Exec_UnlistenCommit --- subroutine for AtCommit_Notify
*
- * Remove the specified channel name from listenChannels.
+ * Unlisten the specified channel for this backend.
*/
static void
Exec_UnlistenCommit(const char *channel)
{
- ListCell *q;
+ ChannelHashKey key;
+ ChannelEntry *entry;
+ ListenerEntry *listeners;
+ int i;
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
- foreach(q, listenChannels)
- {
- char *lchan = (char *) lfirst(q);
+ /* Remove from our local cache */
+ if (listenChannelsHash != NULL)
+ (void) hash_search(listenChannelsHash, channel, HASH_REMOVE, NULL);
+
+ /* Now remove from the shared channelHash */
+ if (channelHash == NULL)
+ return;
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+ /* Look up the channel with exclusive lock so we can modify it */
+ entry = dshash_find(channelHash, &key, true);
+ if (entry == NULL)
+ return;
+
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ entry->listenersArray);
- if (strcmp(lchan, channel) == 0)
+ for (i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procno == MyProcNumber)
{
- listenChannels = foreach_delete_current(listenChannels, q);
- pfree(lchan);
- break;
+ entry->numListeners--;
+ if (i < entry->numListeners)
+ memmove(&listeners[i], &listeners[i + 1],
+ sizeof(ListenerEntry) * (entry->numListeners - i));
+
+ if (entry->numListeners == 0)
+ {
+ /* Last listener for this channel */
+ dsa_free(channelDSA, entry->listenersArray);
+ dshash_delete_entry(channelHash, entry);
+ }
+ else
+ {
+ dshash_release_lock(channelHash, entry);
+ }
+
+ return;
}
}
+ dshash_release_lock(channelHash, entry);
+
/*
* We do not complain about unlistening something not being listened;
* should we?
@@ -1192,34 +1578,68 @@ Exec_UnlistenCommit(const char *channel)
static void
Exec_UnlistenAllCommit(void)
{
+ dshash_seq_status status;
+ ChannelEntry *entry;
+
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
- list_free_deep(listenChannels);
- listenChannels = NIL;
+ /* Clear our local cache */
+ if (listenChannelsHash != NULL)
+ {
+ hash_destroy(listenChannelsHash);
+ listenChannelsHash = NULL;
+ }
+
+ /* Now remove from the shared channelHash */
+ if (channelHash == NULL)
+ return;
+
+ dshash_seq_init(&status, channelHash, true);
+ while ((entry = dshash_seq_next(&status)) != NULL)
+ {
+ if (entry->key.dboid == MyDatabaseId)
+ {
+ ListenerEntry *listeners;
+ int i;
+
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procno == MyProcNumber)
+ {
+ entry->numListeners--;
+ if (i < entry->numListeners)
+ memmove(&listeners[i], &listeners[i + 1],
+ sizeof(ListenerEntry) * (entry->numListeners - i));
+
+ if (entry->numListeners == 0)
+ {
+ dsa_free(channelDSA, entry->listenersArray);
+ dshash_delete_current(&status);
+ }
+ break;
+ }
+ }
+ }
+ }
+ dshash_seq_term(&status);
}
/*
* Test whether we are actively listening on the given channel name.
*
* Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it. In practice the list is likely to be
- * fairly short, though.
*/
static bool
IsListeningOn(const char *channel)
{
- ListCell *p;
-
- foreach(p, listenChannels)
- {
- char *lchan = (char *) lfirst(p);
+ if (listenChannelsHash == NULL)
+ return false;
- if (strcmp(lchan, channel) == 0)
- return true;
- }
- return false;
+ return (hash_search(listenChannelsHash, channel, HASH_FIND, NULL) != NULL);
}
/*
@@ -1229,7 +1649,7 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister(void)
{
- Assert(listenChannels == NIL); /* else caller error */
+ Assert(listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
return;
@@ -1241,6 +1661,7 @@ asyncQueueUnregister(void)
/* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
+ QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
/* and remove it from the list */
if (QUEUE_FIRST_LISTENER == MyProcNumber)
QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
@@ -1565,12 +1986,21 @@ asyncQueueFillWarning(void)
/*
* Send signals to listening backends.
*
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send. However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind. Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * Normally we signal only backends in our own database, that are
+ * listening on the channels with pending notifies, since only those
+ * backends are interested in notifies we send.
+ *
+ * Backends that are not interested in our notifies, that are known
+ * to still be positioned at the old queue head, or anywhere in the
+ * queue region we just wrote, can be safely advanced directly to the
+ * new head, since that region is known to contain only our own
+ * notifications. This avoids unnecessary wakeups when there is
+ * nothing of interest to them.
+ *
+ * Backends that are not interested in our notifies, that are advancing
+ * to a target position before the new queue head, or that are not
+ * advancing and are stationary at a position before the old queue head
+ * needs to be signaled since notifications could otherwise be delayed.
*
* Since we know the ProcNumber and the Pid the signaling is quite cheap.
*
@@ -1583,6 +2013,9 @@ SignalBackends(void)
int32 *pids;
ProcNumber *procnos;
int count;
+ ListCell *lc;
+
+ Assert(channelHash != NULL || pendingNotifyChannels == NIL);
/*
* Identify backends that we need to signal. We don't want to send
@@ -1597,36 +2030,97 @@ SignalBackends(void)
count = 0;
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
- for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+ foreach(lc, pendingNotifyChannels)
{
- int32 pid = QUEUE_BACKEND_PID(i);
- QueuePosition pos;
+ char *channel = (char *) lfirst(lc);
+ ChannelEntry *entry = NULL;
+ ListenerEntry *listeners;
- Assert(pid != InvalidPid);
- pos = QUEUE_BACKEND_POS(i);
- if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+ if (channelHash != NULL)
{
+ ChannelHashKey key;
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+ entry = dshash_find(channelHash, &key, false);
+ }
+
+ if (entry == NULL)
+ continue; /* No listeners registered for this channel */
+
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (int j = 0; j < entry->numListeners; j++)
+ {
+ ProcNumber i;
+ int32 pid;
+ QueuePosition pos;
+
/*
- * Always signal listeners in our own database, unless they're
- * already caught up (unlikely, but possible).
+ * Only signal backends that have active=true. Backends with
+ * active=false have done LISTEN in PreCommit but not yet
+ * committed, so they're not really listening yet.
*/
+ if (!listeners[j].active)
+ continue;
+
+ i = listeners[j].procno;
+
+ if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+ continue;
+
+ pos = QUEUE_BACKEND_POS(i);
+ pid = QUEUE_BACKEND_PID(i);
+
+ /* Skip if caught up */
if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
continue;
+
+ Assert(pid != InvalidPid);
+
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+ pids[count] = pid;
+ procnos[count] = i;
+ count++;
}
- else
+
+ dshash_release_lock(channelHash, entry);
+ }
+
+ if (pendingNotifies != NULL)
+ {
+ for (ProcNumber i = QUEUE_FIRST_LISTENER;
+ i != INVALID_PROC_NUMBER;
+ i = QUEUE_NEXT_LISTENER(i))
{
- /*
- * Listeners in other databases should be signaled only if they
- * are far behind.
- */
- if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
- QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
+ QueuePosition pos;
+ int32 pid;
+
+ if (QUEUE_BACKEND_WAKEUP_PENDING(i))
continue;
+
+ pos = QUEUE_BACKEND_POS(i);
+ pid = QUEUE_BACKEND_PID(i);
+
+ if (QUEUE_BACKEND_IS_ADVANCING(i) ?
+ QUEUE_POS_PRECEDES(QUEUE_BACKEND_ADVANCING_POS(i), queueHeadAfterWrite) :
+ QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
+ {
+ Assert(pid != InvalidPid);
+
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+ pids[count] = pid;
+ procnos[count] = i;
+ count++;
+ }
+ else if (!QUEUE_BACKEND_IS_ADVANCING(i) &&
+ QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
+ {
+ Assert(!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite));
+
+ QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+ }
}
- /* OK, need to signal this one */
- pids[count] = pid;
- procnos[count] = i;
- count++;
}
LWLockRelease(NotifyQueueLock);
@@ -1673,12 +2167,97 @@ AtAbort_Notify(void)
/*
* If we LISTEN but then roll back the transaction after PreCommit_Notify,
* we have registered as a listener but have not made any entry in
- * listenChannels. In that case, deregister again.
+ * listenChannelsHash. In that case, deregister again.
*/
- if (amRegisteredListener && listenChannels == NIL)
+ if (amRegisteredListener &&
+ (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0))
asyncQueueUnregister();
- /* And clean up */
+ /*
+ * Remove any channels we added during Exec_ListenPreCommit. We need to
+ * clean up both the local listenChannelsHash and any inactive entries in
+ * the shared channelHash to avoid accumulating stale data.
+ */
+ if (pendingActions != NULL)
+ {
+ ListCell *p;
+
+ foreach(p, pendingActions->actions)
+ {
+ ListenAction *actrec = (ListenAction *) lfirst(p);
+ ChannelHashKey key;
+ ChannelEntry *entry;
+ ListenerEntry *listeners;
+ bool removeFromLocal;
+ bool found;
+ int i;
+
+ if (actrec->action != LISTEN_LISTEN)
+ continue;
+
+ /*
+ * For each LISTEN action, determine if we should clean up the
+ * local and/or shared hash entries. If we have an active=true
+ * entry in the shared hash, we were already listening from a
+ * previous transaction, so leave everything alone. Otherwise,
+ * clean up what this transaction added.
+ */
+ removeFromLocal = true;
+ found = false;
+
+ if (channelHash != NULL)
+ {
+ ChannelHashPrepareKey(&key, MyDatabaseId, actrec->channel);
+ entry = dshash_find(channelHash, &key, true);
+
+ if (entry != NULL)
+ {
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procno == MyProcNumber)
+ {
+ found = true;
+
+ if (listeners[i].active)
+ {
+ /* Already committed - leave both hashes alone */
+ removeFromLocal = false;
+ }
+ else
+ {
+ /* Inactive - remove from shared hash */
+ entry->numListeners--;
+ if (i < entry->numListeners)
+ memmove(&listeners[i], &listeners[i + 1],
+ sizeof(ListenerEntry) * (entry->numListeners - i));
+
+ if (entry->numListeners == 0)
+ {
+ dsa_free(channelDSA, entry->listenersArray);
+ dshash_delete_entry(channelHash, entry);
+ }
+ else
+ dshash_release_lock(channelHash, entry);
+ }
+ break;
+ }
+ }
+
+ if (!found)
+ dshash_release_lock(channelHash, entry);
+ }
+ }
+
+ /* Remove from local hash if appropriate */
+ if (removeFromLocal && listenChannelsHash != NULL)
+ (void) hash_search(listenChannelsHash, actrec->channel,
+ HASH_REMOVE, NULL);
+ }
+ }
+
ClearPendingActionsAndNotifies();
}
@@ -1854,20 +2433,29 @@ asyncQueueReadAllNotifications(void)
QueuePosition head;
Snapshot snapshot;
- /* Fetch current state */
+ /*
+ * Fetch current state, indicate to others that we have woken up, and that
+ * we now will be advancing our position.
+ */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
- pos = QUEUE_BACKEND_POS(MyProcNumber);
+ QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
head = QUEUE_HEAD;
- LWLockRelease(NotifyQueueLock);
+ pos = QUEUE_BACKEND_POS(MyProcNumber);
if (QUEUE_POS_EQUAL(pos, head))
{
/* Nothing to do, we have read all notifications already. */
+ LWLockRelease(NotifyQueueLock);
return;
}
+ QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true;
+ QUEUE_BACKEND_ADVANCING_POS(MyProcNumber) = head;
+
+ LWLockRelease(NotifyQueueLock);
+
/*----------
* Get snapshot we'll use to decide which xacts are still in progress.
* This is trickier than it might seem, because of race conditions.
@@ -1954,6 +2542,8 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
+
+ QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
QUEUE_BACKEND_POS(MyProcNumber) = pos;
LWLockRelease(NotifyQueueLock);
@@ -2051,7 +2641,7 @@ asyncQueueProcessPageEntries(QueuePosition *current,
* over it on the first LISTEN in a session, and not get stuck on
* it indefinitely.
*/
- if (listenChannels == NIL)
+ if (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0)
continue;
if (TransactionIdDidCommit(qe->xid))
@@ -2306,7 +2896,7 @@ ProcessIncomingNotify(bool flush)
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
- if (listenChannels == NIL)
+ if (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0)
return;
if (Trace_notify)
@@ -2410,13 +3000,15 @@ AddEventToPendingNotifies(Notification *n)
{
Assert(pendingNotifies->events != NIL);
- /* Create the hash table if it's time to */
+ /* Create the hash tables if it's time to */
if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
pendingNotifies->hashtab == NULL)
{
HASHCTL hash_ctl;
ListCell *l;
+ Assert(pendingNotifies->channelHashtab == NULL);
+
/* Create the hash table */
hash_ctl.keysize = sizeof(Notification *);
hash_ctl.entrysize = sizeof(struct NotificationHash);
@@ -2429,10 +3021,22 @@ AddEventToPendingNotifies(Notification *n)
&hash_ctl,
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
+ /* Create the channel hash table */
+ memset(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = NAMEDATALEN;
+ hash_ctl.entrysize = sizeof(struct ChannelHash);
+ hash_ctl.hcxt = CurTransactionContext;
+ pendingNotifies->channelHashtab =
+ hash_create("Pending Notify Channels",
+ 64L,
+ &hash_ctl,
+ HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+
/* Insert all the already-existing events */
foreach(l, pendingNotifies->events)
{
Notification *oldn = (Notification *) lfirst(l);
+ char *channel = oldn->data;
bool found;
(void) hash_search(pendingNotifies->hashtab,
@@ -2440,22 +3044,42 @@ AddEventToPendingNotifies(Notification *n)
HASH_ENTER,
&found);
Assert(!found);
+
+ /* Insert channel into channelHashtab */
+ (void) hash_search(pendingNotifies->channelHashtab,
+ channel,
+ HASH_ENTER,
+ &found);
+ /* found may be true if multiple events on same channel */
}
}
/* Add new event to the list, in order */
pendingNotifies->events = lappend(pendingNotifies->events, n);
- /* Add event to the hash table if needed */
+ /* Add event to the hash tables if needed */
if (pendingNotifies->hashtab != NULL)
{
bool found;
+ Assert(pendingNotifies->channelHashtab != NULL);
+
(void) hash_search(pendingNotifies->hashtab,
&n,
HASH_ENTER,
&found);
Assert(!found);
+
+ /* Add channel to channelHashtab */
+ {
+ char *channel = n->data;
+
+ (void) hash_search(pendingNotifies->channelHashtab,
+ channel,
+ HASH_ENTER,
+ &found);
+ /* found may be true if we already have an event on this channel */
+ }
}
}
@@ -2493,7 +3117,7 @@ notification_match(const void *key1, const void *key2, Size keysize)
return 1; /* not equal */
}
-/* Clear the pendingActions and pendingNotifies lists. */
+/* Clear the pendingActions, pendingNotifies, and pendingNotifyChannels lists. */
static void
ClearPendingActionsAndNotifies(void)
{
@@ -2505,6 +3129,7 @@ ClearPendingActionsAndNotifies(void)
*/
pendingActions = NULL;
pendingNotifies = NULL;
+ pendingNotifyChannels = NIL;
}
/*
@@ -2515,3 +3140,16 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("notify_buffers", newval);
}
+
+
+/*
+ * ChannelHashPrepareKey
+ * Prepare a channel key for use as a hash key.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel)
+{
+ memset(key, 0, sizeof(ChannelHashKey));
+ key->dboid = dboid;
+ strlcpy(key->channel, channel, NAMEDATALEN);
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index f39830dbb344..e57bc12b602f 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -370,6 +370,7 @@ SubtransBuffer "Waiting for I/O on a sub-transaction SLRU buffer."
MultiXactOffsetBuffer "Waiting for I/O on a multixact offset SLRU buffer."
MultiXactMemberBuffer "Waiting for I/O on a multixact member SLRU buffer."
NotifyBuffer "Waiting for I/O on a NOTIFY message SLRU buffer."
+NotifyChannelHash "Waiting to access the NOTIFY channel hash table."
SerialBuffer "Waiting for I/O on a serializable transaction conflict SLRU buffer."
WALInsert "Waiting to insert WAL data into a memory buffer."
BufferContent "Waiting to access a data page in memory."
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 5b0ce383408c..4236965e72a9 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -101,6 +101,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer)
PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer)
PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
+PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash)
PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893a..443a6eb669fc 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 7 sessions
starting permutation: listenc notify1 notify2 notify3 notifyf
step listenc: LISTEN c1; LISTEN c2;
@@ -47,6 +47,105 @@ notifier: NOTIFY "c2" with payload "payload" from notifier
notifier: NOTIFY "c1" with payload "payloads" from notifier
notifier: NOTIFY "c2" with payload "payloads" from notifier
+starting permutation: listenc notifys_simple
+step listenc: LISTEN c1; LISTEN c2;
+step notifys_simple:
+ BEGIN;
+ SAVEPOINT s1;
+ NOTIFY c1, 'simple1';
+ NOTIFY c2, 'simple2';
+ RELEASE SAVEPOINT s1;
+ COMMIT;
+
+notifier: NOTIFY "c1" with payload "simple1" from notifier
+notifier: NOTIFY "c2" with payload "simple2" from notifier
+
+starting permutation: lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
+step lsbegin: BEGIN;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrelease: RELEASE SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify: NOTIFY c1, 'subxact_test';
+listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
+
+starting permutation: lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
+step lsbegin: BEGIN;
+step lslisten_outer: LISTEN c3;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrelease: RELEASE SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify: NOTIFY c1, 'subxact_test';
+listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
+
+starting permutation: lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
+step lsbegin: BEGIN;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrollback: ROLLBACK TO SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify_check: NOTIFY c1, 'should_not_receive';
+
+starting permutation: listenc notify_many_with_dup
+step listenc: LISTEN c1; LISTEN c2;
+step notify_many_with_dup:
+ BEGIN;
+ SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
+ SELECT pg_notify('c1', 'msg1');
+ COMMIT;
+
+pg_notify
+---------
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+(17 rows)
+
+pg_notify
+---------
+
+(1 row)
+
+notifier: NOTIFY "c1" with payload "msg1" from notifier
+notifier: NOTIFY "c1" with payload "msg2" from notifier
+notifier: NOTIFY "c1" with payload "msg3" from notifier
+notifier: NOTIFY "c1" with payload "msg4" from notifier
+notifier: NOTIFY "c1" with payload "msg5" from notifier
+notifier: NOTIFY "c1" with payload "msg6" from notifier
+notifier: NOTIFY "c1" with payload "msg7" from notifier
+notifier: NOTIFY "c1" with payload "msg8" from notifier
+notifier: NOTIFY "c1" with payload "msg9" from notifier
+notifier: NOTIFY "c1" with payload "msg10" from notifier
+notifier: NOTIFY "c1" with payload "msg11" from notifier
+notifier: NOTIFY "c1" with payload "msg12" from notifier
+notifier: NOTIFY "c1" with payload "msg13" from notifier
+notifier: NOTIFY "c1" with payload "msg14" from notifier
+notifier: NOTIFY "c1" with payload "msg15" from notifier
+notifier: NOTIFY "c1" with payload "msg16" from notifier
+notifier: NOTIFY "c1" with payload "msg17" from notifier
+
+starting permutation: listenc llisten l2listen l3listen lslisten
+step listenc: LISTEN c1; LISTEN c2;
+step llisten: LISTEN c1; LISTEN c2;
+step l2listen: LISTEN c1;
+step l3listen: LISTEN c1;
+step lslisten: LISTEN c1; LISTEN c2;
+
starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
step llisten: LISTEN c1; LISTEN c2;
step notify1: NOTIFY c1;
@@ -95,6 +194,8 @@ listener: NOTIFY "c2" with payload "" from notifier
starting permutation: l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
step l2listen: LISTEN c1;
+listener2: NOTIFY "c1" with payload "" from notifier
+listener2: NOTIFY "c1" with payload "" from notifier
step l2begin: BEGIN;
step notify1: NOTIFY c1;
step lbegins: BEGIN ISOLATION LEVEL SERIALIZABLE;
@@ -104,6 +205,17 @@ step l2commit: COMMIT;
listener2: NOTIFY "c1" with payload "" from notifier
step l2stop: UNLISTEN *;
+starting permutation: lch_listen nch_notify lch_check
+step lch_listen: LISTEN ch;
+step nch_notify: NOTIFY ch, 'aa';
+step lch_check: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+listener_ch: NOTIFY "ch" with payload "aa" from notifier_ch
+
starting permutation: llisten lbegin usage bignotify usage
step llisten: LISTEN c1; LISTEN c2;
step lbegin: BEGIN;
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd910837..0a01e777b98e 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -31,6 +31,20 @@ step notifys1 {
ROLLBACK TO SAVEPOINT s2;
COMMIT;
}
+step notifys_simple {
+ BEGIN;
+ SAVEPOINT s1;
+ NOTIFY c1, 'simple1';
+ NOTIFY c2, 'simple2';
+ RELEASE SAVEPOINT s1;
+ COMMIT;
+}
+step notify_many_with_dup {
+ BEGIN;
+ SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
+ SELECT pg_notify('c1', 'msg1');
+ COMMIT;
+}
step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
teardown { UNLISTEN *; }
@@ -53,6 +67,38 @@ step l2begin { BEGIN; }
step l2commit { COMMIT; }
step l2stop { UNLISTEN *; }
+# Third listener session for testing array growth.
+
+session listener3
+step l3listen { LISTEN c1; }
+teardown { UNLISTEN *; }
+
+# Listener session for cross-session notification test with channel 'ch'.
+
+session listener_ch
+step lch_listen { LISTEN ch; }
+step lch_check { SELECT 1 AS x; }
+teardown { UNLISTEN *; }
+
+# Notifier session for cross-session notification test with channel 'ch'.
+
+session notifier_ch
+step nch_notify { NOTIFY ch, 'aa'; }
+
+# Session for testing LISTEN in subtransaction with separate steps.
+
+session listen_subxact
+step lsbegin { BEGIN; }
+step lslisten_outer { LISTEN c3; }
+step lssavepoint { SAVEPOINT s1; }
+step lslisten { LISTEN c1; LISTEN c2; }
+step lsrelease { RELEASE SAVEPOINT s1; }
+step lsrollback { ROLLBACK TO SAVEPOINT s1; }
+step lscommit { COMMIT; }
+step lsnotify { NOTIFY c1, 'subxact_test'; }
+step lsnotify_check { NOTIFY c1, 'should_not_receive'; }
+teardown { UNLISTEN *; }
+
# Trivial cases.
permutation listenc notify1 notify2 notify3 notifyf
@@ -60,6 +106,24 @@ permutation listenc notify1 notify2 notify3 notifyf
# Check simple and less-simple deduplication.
permutation listenc notifyd1 notifyd2 notifys1
+# Check simple NOTIFY reparenting when parent has no action.
+permutation listenc notifys_simple
+
+# Check LISTEN reparenting in subtransaction.
+permutation lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
+
+# Check LISTEN merge path when both outer and inner transactions have actions.
+permutation lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
+
+# Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions).
+permutation lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
+
+# Check notification_match function (triggered by hash table duplicate detection).
+permutation listenc notify_many_with_dup
+
+# Check ChannelHashAddListener array growth.
+permutation listenc llisten l2listen l3listen lslisten
+
# Cross-backend notification delivery. We use a "select 1" to force the
# listener session to check for notifies. In principle we could just wait
# for delivery, but that would require extra support in isolationtester
@@ -73,6 +137,10 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck
# and notify queue is not empty
permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
+# Check that notifications sent from a backend that has not done LISTEN
+# are properly delivered to a listener in another backend.
+permutation lch_listen nch_notify lch_check
+
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
# after submitting notifications while another connection is listening for
# those notifications and waiting inside an active transaction. We have to
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9dd65b102544..c4c3748f623c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -413,6 +413,8 @@ CatalogIdMapEntry
CatalogIndexState
ChangeVarNodes_callback
ChangeVarNodes_context
+ChannelEntry
+ChannelHashKey
CheckPoint
CheckPointStmt
CheckpointStatsData
@@ -1567,6 +1569,7 @@ ListDictionary
ListParsedLex
ListenAction
ListenActionKind
+ListenerEntry
ListenStmt
LoInfo
LoadStmt