@@ -88,7 +88,7 @@ static HTAB *xid2status;
8888static HTAB * gtid2xid ;
8989static DtmNodeState * local ;
9090static uint64 totalSleepInterrupts ;
91- static int DtmVacuumDelay = 10 ; /* sec */
91+ static int DtmVacuumDelay = 15 ; /* sec */
9292static bool finishing_prepared ;
9393
9494
@@ -99,6 +99,7 @@ static void DtmAdjustOldestXid(void);
9999static void DtmInitGlobalXmin (TransactionId xid );
100100static bool DtmDetectGlobalDeadLock (PGPROC * proc );
101101static void DtmAddSubtransactions (DtmTransStatus * ts , TransactionId * subxids , int nSubxids );
102+ static void DtmAdjustSubtransactions (DtmTransStatus * ts );
102103static char const * DtmGetName (void );
103104static size_t DtmGetTransactionStateSize (void );
104105static void DtmSerializeTransactionState (void * ctx );
@@ -283,6 +284,20 @@ DtmTransactionListInsertAfter(DtmTransStatus * after, DtmTransStatus * ts)
283284 }
284285}
285286
287+ static void
288+ DtmAdjustSubtransactions (DtmTransStatus * ts )
289+ {
290+ int i ;
291+ int nSubxids = ts -> nSubxids ;
292+ DtmTransStatus * sts = ts ;
293+
294+ for (i = 0 ; i < nSubxids ; i ++ ) {
295+ sts = sts -> next ;
296+ sts -> status = ts -> status ;
297+ Assert (sts -> cid == ts -> cid );
298+ }
299+ }
300+
286301/*
287302 * There can be different oldest XIDs at different cluster node.
288303 * Seince we do not have centralized aribiter, we have to rely in DtmVacuumDelay.
@@ -521,20 +536,23 @@ DtmLocalAccess(DtmCurrentTrans * x, GlobalTransactionId gtid, cid_t global_cid)
521536void
522537DtmLocalBeginPrepare (GlobalTransactionId gtid )
523538{
539+ // TransactionId xid = TwoPhaseGetTransactionId(gtid);
540+
524541 SpinLockAcquire (& local -> lock );
525542 {
526543 DtmTransStatus * ts ;
527544 DtmTransId * id ;
545+ bool found ;
528546
529547 id = (DtmTransId * ) hash_search (gtid2xid , gtid , HASH_FIND , NULL );
530548 Assert (id != NULL );
531549 Assert (TransactionIdIsValid (id -> xid ));
532- ts = (DtmTransStatus * ) hash_search (xid2status , & id -> xid , HASH_ENTER , NULL );
550+ ts = (DtmTransStatus * ) hash_search (xid2status , & id -> xid , HASH_ENTER , & found );
533551 ts -> status = TRANSACTION_STATUS_UNKNOWN ;
534552 ts -> cid = dtm_get_cid ();
535- ts -> nSubxids = id -> nSubxids ;
536- DtmTransactionListAppend ( ts ) ;
537- DtmAddSubtransactions (ts , id -> subxids , id -> nSubxids );
553+ if (! found )
554+ ts -> nSubxids = 0 ;
555+ DtmAdjustSubtransactions (ts );
538556 }
539557 SpinLockRelease (& local -> lock );
540558}
@@ -575,11 +593,7 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
575593 ts = (DtmTransStatus * ) hash_search (xid2status , & id -> xid , HASH_FIND , NULL );
576594 Assert (ts != NULL );
577595 ts -> cid = cid ;
578- for (i = 0 ; i < ts -> nSubxids ; i ++ )
579- {
580- ts = ts -> next ;
581- ts -> cid = cid ;
582- }
596+ DtmAdjustSubtransactions (ts );
583597 dtm_sync (cid );
584598
585599 DTM_TRACE ((stderr , "Prepare transaction %u(%s) with CSN %lu\n" , id -> xid , gtid , cid ));
@@ -625,39 +639,14 @@ DtmLocalFinish(bool is_commit)
625639
626640 ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , & found );
627641 ts -> status = is_commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED ;
628- if (found )
629- {
630642
631- if (is_commit ) // XXX: why only for commit?
632- {
633- int i ;
634- DtmTransStatus * sts = ts ;
635-
636- for (i = 0 ; i < ts -> nSubxids ; i ++ )
637- {
638- sts = sts -> next ;
639- Assert (sts -> cid == ts -> cid );
640- sts -> status = TRANSACTION_STATUS_COMMITTED ;
641- }
642- }
643- }
644- else
643+ if (!found )
645644 {
646- TransactionId * subxids ;
647-
648- Assert (!found );
649645 ts -> cid = dtm_get_cid ();
646+ ts -> nSubxids = 0 ;
650647 DtmTransactionListAppend (ts );
651- if (is_commit ) // XXX: why?
652- {
653- ts -> nSubxids = xactGetCommittedChildren (& subxids );
654- DtmAddSubtransactions (ts , subxids , ts -> nSubxids );
655- }
656- else
657- {
658- ts -> nSubxids = 0 ;
659- }
660648 }
649+ DtmAdjustSubtransactions (ts );
661650 }
662651 SpinLockRelease (& local -> lock );
663652
@@ -722,24 +711,35 @@ DtmLocalSavePreparedState(DtmCurrentTrans * x)
722711
723712 if (x -> gtid [0 ])
724713 {
714+ TransactionId * subxids ;
715+ TransactionId xid = GetCurrentTransactionId ();
716+ int nSubxids = xactGetCommittedChildren (& subxids );
717+
725718 SpinLockAcquire (& local -> lock );
726719 {
727720 DtmTransId * id = (DtmTransId * ) hash_search (gtid2xid , x -> gtid , HASH_FIND , NULL );
728721
729722 if (id != NULL )
730723 {
731- TransactionId * subxids ;
732- int nSubxids = xactGetCommittedChildren (& subxids );
733-
734724 id -> xid = GetCurrentTransactionId ();
735- if (nSubxids != 0 )
736- {
737- id -> subxids = (TransactionId * ) malloc (nSubxids * sizeof (TransactionId ));
738- id -> nSubxids = nSubxids ;
739- memcpy (id -> subxids , subxids , nSubxids * sizeof (TransactionId ));
740- }
725+
741726 }
742727 }
728+ // SpinLockRelease(&local->lock);
729+
730+
731+
732+ // SpinLockAcquire(&local->lock);
733+ {
734+ DtmTransStatus * ts ;
735+
736+ ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_ENTER , NULL );
737+ ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
738+ ts -> cid = dtm_get_cid ();
739+ ts -> nSubxids = nSubxids ;
740+ DtmTransactionListAppend (ts );
741+ DtmAddSubtransactions (ts , subxids , nSubxids );
742+ }
743743 SpinLockRelease (& local -> lock );
744744 }
745745}
0 commit comments