{
SQueueSync *sqsync = squeue->sq_sync;
int wait_result = 0;
+ int i = 0;
+ int consumer_running = 0;
+ char *pcursor = NULL;
+
+
+CHECK:
/* loop while there are active consumers */
for (;;)
/* producer will continue waiting */
ResetLatch(&sqsync->sqs_producer_latch);
}
-#ifdef SQUEUE_STAT
- else
- elog(LOG, "Done %s node %d, %ld writes and %ld reads, %ld buffer writes, %ld buffer reads, %ld tuples returned to buffer",
- squeue->sq_key, cstate->cs_node, cstate->stat_writes, cstate->stat_reads, cstate->stat_buff_writes, cstate->stat_buff_reads, cstate->stat_buff_returns);
-#endif
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
}
#ifdef SQUEUE_STAT
elog(DEBUG1, "Producer %s is done, there were %ld pauses", squeue->sq_key, squeue->stat_paused);
#endif
+ elog(LOG, "Producer %s is done", squeue->sq_key);
LWLockAcquire(SQueuesLock, LW_EXCLUSIVE);
+
+ /*
+ * In rear situation, after consumers just bind to the shared queue, the producer timeout and remove the shared queue.
+ * This will cause a SEGV in the consumer. So here recheck if there are some consumers binded to the queue, if so, we need to wait them to
+ * finish.
+ */
+ consumer_running = 0;
+ for (i = 0; i < squeue->sq_nconsumers; i++)
+ {
+ ConsState *cstate = &squeue->sq_consumers[i];
+
+ LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock, LW_EXCLUSIVE);
+
+ /* found a consumer running */
+ if (CONSUMER_ACTIVE == cstate->cs_status && cstate->cs_pid != 0)
+ {
+ consumer_running++;
+ }
+
+ LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
+ }
+
+ if (consumer_running)
+ {
+ elog(DEBUG1, "Producer %s have %d consumers still running, recheck now", squeue->sq_key, consumer_running);
+ LWLockRelease(SQueuesLock);
+ goto CHECK;
+ }
+
/* All is done, clean up */
DisownLatch(&sqsync->sqs_producer_latch);