Commit a patch submitted by jasonysli@tencent.com to fix a race condition in SharedQueueUnBind

author     Pavan Deolasee <pavan.deolasee@gmail.com>
           Wed, 19 Aug 2015 03:10:34 +0000 (08:40 +0530)
committer  Pavan Deolasee <pavan.deolasee@gmail.com>
           Wed, 19 Aug 2015 03:10:34 +0000 (08:40 +0530)

We found the crash was caused by a race condition on SQueuesLock between
SharedQueueUnBind and SharedQueueBind. In rare situations, for example when
cluster memory is low and processes run much slower, the producer may time
out and wait on SQueuesLock while some consumers enter SharedQueueBind and
successfully attach to the shared queue. Once SharedQueueBind releases
SQueuesLock, SharedQueueUnBind removes the shared queue from SharedQueues
and sets sq_sync to NULL. When such a consumer then calls SharedQueueRead,
it dereferences the NULL sq_sync and crashes with SIGSEGV.

The fix: once SharedQueueUnBind has acquired SQueuesLock, it rechecks
whether any consumers are still running on the shared queue; if so, it
waits until no consumers remain or the timeout expires. The patch also
changes the SharedQueues search failure elog in SharedQueueBind to ERROR,
avoiding an unnecessary cluster reinitialization.
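
To make the pattern concrete, below is a minimal, self-contained sketch of
the recheck-under-lock idea, with pthreads standing in for PostgreSQL's
LWLocks and latches. All names in it are hypothetical illustrations, not the
actual squeue.c code; the demo also does not model the bind-after-removal
path, which the real patch handles by turning the SharedQueues lookup
failure in SharedQueueBind into an ERROR.

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t squeues_lock   = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  consumers_gone = PTHREAD_COND_INITIALIZER;
static int active_consumers = 0;

/* Consumer side: bind to the queue, do some work, then unbind
 * (a stand-in for SharedQueueBind / SharedQueueRead). */
static void *consumer(void *arg)
{
    (void) arg;

    pthread_mutex_lock(&squeues_lock);
    active_consumers++;                     /* bind */
    pthread_mutex_unlock(&squeues_lock);

    /* ... read from the shared queue ... */

    pthread_mutex_lock(&squeues_lock);
    active_consumers--;                     /* unbind */
    pthread_cond_signal(&consumers_gone);
    pthread_mutex_unlock(&squeues_lock);
    return NULL;
}

/* Producer side: the analogue of the patched SharedQueueUnBind. */
static void unbind_queue(void)
{
    pthread_mutex_lock(&squeues_lock);

    /* The essential fix: recheck under the lock.  A consumer may have
     * bound after the producer's earlier wait finished but before the
     * lock was acquired; tearing the queue down now would leave that
     * consumer reading freed state (the original SEGV). */
    while (active_consumers > 0)
        pthread_cond_wait(&consumers_gone, &squeues_lock);

    /* No consumers remain: safe to remove the queue and NULL sq_sync. */
    printf("queue removed with no consumers attached\n");
    pthread_mutex_unlock(&squeues_lock);
}

int main(void)
{
    pthread_t t;

    pthread_create(&t, NULL, consumer, NULL);
    unbind_queue();
    pthread_join(t, NULL);
    return 0;
}

The committed patch expresses the same idea with LWLocks and a goto back to
its wait loop: after acquiring SQueuesLock it counts consumers whose state
is CONSUMER_ACTIVE with a nonzero cs_pid and, if any remain, releases the
lock and waits again.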

src/backend/pgxc/squeue/squeue.c

index 41f82a79669c249299fa03f10e076c46b284c757..74cdccaaf32fed7ff197130e860677e53761adf7 100644 (file)
@@ -1204,6 +1204,12 @@ SharedQueueUnBind(SharedQueue squeue)
 {
        SQueueSync *sqsync = squeue->sq_sync;
        int                     wait_result = 0;
+       int         i = 0;
+       int         consumer_running = 0;
+       char       *pcursor = NULL;
+
+
+CHECK:
 
        /* loop while there are active consumers */
        for (;;)
@@ -1227,11 +1233,6 @@ SharedQueueUnBind(SharedQueue squeue)
                                /* producer will continue waiting */
                                ResetLatch(&sqsync->sqs_producer_latch);
                        }
-#ifdef SQUEUE_STAT
-                       else
-                               elog(LOG, "Done %s node %d, %ld writes and %ld reads, %ld buffer writes, %ld buffer reads, %ld tuples returned to buffer",
-                                        squeue->sq_key, cstate->cs_node, cstate->stat_writes, cstate->stat_reads, cstate->stat_buff_writes, cstate->stat_buff_reads, cstate->stat_buff_returns);
-#endif
 
                        LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
                }
@@ -1249,8 +1250,38 @@ SharedQueueUnBind(SharedQueue squeue)
 #ifdef SQUEUE_STAT
        elog(DEBUG1, "Producer %s is done, there were %ld pauses", squeue->sq_key, squeue->stat_paused);
 #endif
+       elog(LOG, "Producer %s is done", squeue->sq_key);
 
        LWLockAcquire(SQueuesLock, LW_EXCLUSIVE);
+
+       /*
+        * In a rare situation, consumers may bind to the shared queue just as the
+        * producer times out and removes it, causing a SEGV in the consumer. So
+        * recheck whether any consumers are still bound and, if so, wait for them.
+        */
+       consumer_running = 0;
+       for (i = 0; i < squeue->sq_nconsumers; i++)
+       {
+               ConsState *cstate = &squeue->sq_consumers[i];
+
+               LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock, LW_EXCLUSIVE);
+
+               /* found a consumer running */
+               if (cstate->cs_status == CONSUMER_ACTIVE && cstate->cs_pid != 0)
+               {
+                       consumer_running++;
+               }
+
+               LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
+       }
+
+       if (consumer_running)
+       {
+               elog(DEBUG1, "Producer %s has %d consumers still running, rechecking now", squeue->sq_key, consumer_running);
+               LWLockRelease(SQueuesLock);
+               goto CHECK;
+       }
+
        /* All is done, clean up */
        DisownLatch(&sqsync->sqs_producer_latch);