Skip to content

Commit 1adbafe

Browse files
committed
...
1 parent 81313c9 commit 1adbafe

File tree

2 files changed

+104
-63
lines changed

2 files changed

+104
-63
lines changed

src/main/java/io/github/coderodde/graph/pathfinding/delayed/impl/ThreadPoolBidirectionalBFSPathFinder.java

Lines changed: 97 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.github.coderodde.graph.pathfinding.delayed.AbstractDelayedGraphPathFinder;
44
import io.github.coderodde.graph.pathfinding.delayed.AbstractNodeExpander;
55
import io.github.coderodde.graph.pathfinding.delayed.ProgressLogger;
6+
import java.util.ArrayDeque;
67

78
import java.util.ArrayList;
89
import java.util.Collections;
@@ -17,7 +18,6 @@
1718
import java.util.concurrent.ConcurrentLinkedDeque;
1819
import java.util.concurrent.Semaphore;
1920
import java.util.concurrent.TimeUnit;
20-
import java.util.concurrent.atomic.AtomicInteger;
2121
import java.util.logging.Level;
2222
import java.util.logging.Logger;
2323

@@ -750,39 +750,39 @@ private static final class SearchState<N> {
750750
/**
751751
* The list of frontiers queues.
752752
*/
753-
private final List<Deque<N>> frontierQueues =
754-
Collections.synchronizedList(new ArrayList<>());
753+
private final List<Deque<N>> frontierQueues = new ArrayList<>();
755754

756755
/**
757756
* The list of frontier sets.
758757
*/
759-
private final List<Set<N>> frontierSets =
760-
Collections.synchronizedList(new ArrayList<>());
758+
private final List<Set<N>> frontierSets = new ArrayList<>();
761759

762760
/**
763761
* The integer denoting the depth of the current frontier queue in the
764762
* {@code frontierQueues}.
765763
*/
766-
private final AtomicInteger currentFrontierDepth = new AtomicInteger(0);
764+
private int currentFrontierDepth = 0;
767765

768766
/**
769767
* This map maps each discovered node to its best distance estimate.
770768
*/
771-
private final Map<N, Integer> distance =
772-
Collections.synchronizedMap(new HashMap<>());
769+
private final Map<N, Integer> distance = new HashMap<>();
773770

774771
/**
775772
* This map maps each discovered node to its predecessor on the current
776773
* shortest path.
777774
*/
778-
private final Map<N, N> parents =
779-
Collections.synchronizedMap(new HashMap<>());
775+
private final Map<N, N> parents = new HashMap<>();
780776

781777
/**
782778
* The set storing all visited nodes.
783779
*/
784-
private final Set<N> visited =
785-
Collections.synchronizedSet(new HashSet<>());
780+
private final Set<N> visited = new HashSet<>();
781+
782+
/**
783+
* A mutex for synchronizing the search data structures above.
784+
*/
785+
private final Semaphore stateMutex = new Semaphore(1, true);
786786

787787
/**
788788
* The set of all the threads working on this particular direction.
@@ -816,26 +816,28 @@ private static final class SearchState<N> {
816816
* node should be the target node.
817817
*/
818818
SearchState(final N initialNode) {
819-
this.currentFrontierDepth.set(0);
820819
this.distance.put(initialNode, 0);
821-
this.parents.put(initialNode, null);
822-
this.frontierQueues.add(new ConcurrentLinkedDeque<>());
823-
this.frontierSets.add(Collections.synchronizedSet(new HashSet<>()));
820+
this.parents .put(initialNode, null);
821+
822+
this.frontierQueues.add(new ArrayDeque<>());
823+
this.frontierSets .add(new HashSet<>());
824824

825825
this.frontierQueues.get(0).add(initialNode);
826826
this.frontierSets .get(0).add(initialNode);
827+
828+
this.frontierQueues.add(new ArrayDeque<>());
829+
this.frontierSets .add(new HashSet<>());
827830
}
828831

829832
void setOppositeSearchState(final SearchState<N> oppositeSearchState) {
830833
this.oppositeSearchState = oppositeSearchState;
831834
}
832835

833836
N getQueueHead() {
834-
final int currentFrontierQueueIndex = currentFrontierDepth.get();
835-
final Deque<N> queue =
836-
frontierQueues.get(currentFrontierQueueIndex);
837-
838-
return queue.peekFirst();
837+
lockStateMutex();
838+
final N head = frontierQueues.get(currentFrontierDepth).peekFirst();
839+
unlockStateMutex();
840+
return head;
839841
}
840842

841843
void lockThreadSetMutex() {
@@ -886,6 +888,14 @@ void wakeupAllSleepingThreads() {
886888
sleepingThreadSet.clear();
887889
unlockThreadSetMutex();
888890
}
891+
892+
void lockStateMutex() {
893+
stateMutex.acquireUninterruptibly();
894+
}
895+
896+
void unlockStateMutex() {
897+
stateMutex.release();
898+
}
889899
}
890900

891901
/**
@@ -910,7 +920,7 @@ private abstract static class SleepingThread extends Thread {
910920
protected final int threadSleepTrials;
911921

912922
/**
913-
* The boolean flag indicating whether this thread is a master thread.
923+
* The Boolean flag indicating whether this thread is a master thread.
914924
* If not, it is called a slave thread.
915925
*/
916926
protected final boolean isMasterThread;
@@ -1130,30 +1140,18 @@ private void processCurrentInMasterThread() {
11301140

11311141
private N getTouchNode() {
11321142
sharedSearchState.lock();
1143+
sharedSearchState.forwardSearchState .lockStateMutex();
1144+
sharedSearchState.backwardSearchState.lockStateMutex();
11331145

11341146
N touchNode = null;
11351147

11361148
final int currentQueueIndexForward =
11371149
sharedSearchState.forwardSearchState
1138-
.currentFrontierDepth
1139-
.get();
1150+
.currentFrontierDepth;
11401151

11411152
final int currentQueueIndexBackward =
11421153
sharedSearchState.backwardSearchState
1143-
.currentFrontierDepth
1144-
.get();
1145-
1146-
final Deque<N> currentForwardQueue =
1147-
sharedSearchState
1148-
.forwardSearchState
1149-
.frontierQueues
1150-
.get(currentQueueIndexForward);
1151-
1152-
final Deque<N> currentBackwardQueue =
1153-
sharedSearchState
1154-
.backwardSearchState
1155-
.frontierQueues
1156-
.get(currentQueueIndexBackward);
1154+
.currentFrontierDepth;
11571155

11581156
final Set<N> currentForwardSet =
11591157
sharedSearchState
@@ -1167,13 +1165,13 @@ private N getTouchNode() {
11671165
.frontierSets
11681166
.get(currentQueueIndexBackward);
11691167

1170-
final int currentQueueSizeForward = currentForwardQueue .size();
1171-
final int currentQueueSizeBackward = currentBackwardQueue.size();
1172-
11731168
touchNode = scanFrom(currentForwardSet,
11741169
currentBackwardSet);
11751170

1171+
sharedSearchState.backwardSearchState.unlockStateMutex();
1172+
sharedSearchState.forwardSearchState .unlockStateMutex();
11761173
sharedSearchState.unlock();
1174+
11771175
return touchNode;
11781176
}
11791177

@@ -1207,19 +1205,32 @@ private void processCurrentInSlaveThread() {
12071205
return;
12081206
}
12091207

1210-
final int currentFrontierDepth = searchState.currentFrontierDepth
1211-
.get();
1212-
final Deque<N> currentFrontier =
1208+
final int currentFrontierDepth = searchState.currentFrontierDepth;
1209+
1210+
Deque<N> currentFrontier =
12131211
searchState.frontierQueues.get(currentFrontierDepth);
12141212

12151213
if (currentFrontier.isEmpty()) {
12161214
final N touchNode = getTouchNode();
12171215

12181216
if (touchNode != null) {
1219-
sharedSearchState.requestGlobalStop();
1217+
sharedSearchState.lock();
1218+
sharedSearchState.forwardSearchState .lockStateMutex();
1219+
sharedSearchState.backwardSearchState.lockStateMutex();
12201220
sharedSearchState.loadShortestPath();
1221+
sharedSearchState.backwardSearchState.unlockStateMutex();
1222+
sharedSearchState.forwardSearchState .unlockStateMutex();
1223+
sharedSearchState.unlock();
1224+
sharedSearchState.requestGlobalStop();
1225+
return;
12211226
} else {
1222-
searchState.currentFrontierDepth.incrementAndGet();
1227+
1228+
searchState.currentFrontierDepth++;
1229+
1230+
currentFrontier =
1231+
searchState.frontierQueues
1232+
.get(currentFrontierDepth + 1);
1233+
12231234
searchState.frontierQueues.add(
12241235
new ConcurrentLinkedDeque<>());
12251236

@@ -1228,7 +1239,7 @@ private void processCurrentInSlaveThread() {
12281239
}
12291240
}
12301241

1231-
final N current = currentFrontier.removeFirst();
1242+
final N current = currentFrontier.peekFirst();
12321243

12331244
if (current == null) {
12341245
// Nothing to do, go to sleep.
@@ -1265,21 +1276,52 @@ private void unlock() {
12651276
* @param current the node of which to generate the successor nodes.
12661277
*/
12671278
private void expand() {
1268-
final int currentFrontierDepth =
1269-
this.searchState.currentFrontierDepth.get();
1279+
final int currentFrontierDepth =
1280+
this.searchState.currentFrontierDepth;
12701281

12711282
final Deque<N> currentFrontierQueue =
12721283
this.searchState.frontierQueues.get(currentFrontierDepth);
12731284

12741285
final Set<N> currentFrontierSet =
12751286
this.searchState.frontierSets.get(currentFrontierDepth);
12761287

1277-
final N current = currentFrontierQueue.removeFirst();
1288+
N current = currentFrontierQueue.peekFirst();
12781289

12791290
if (current == null) {
1280-
return;
1291+
if (searchState.frontierQueues
1292+
.get(currentFrontierDepth + 1)
1293+
.isEmpty()) {
1294+
// Here, no source/target path at all. Halt and return an
1295+
// empty path:
1296+
sharedSearchState.requestGlobalStop();
1297+
return;
1298+
}
1299+
1300+
// Here, there is more graph to explore, but we need to check
1301+
// for presence of a shortest path:
1302+
sharedSearchState.touchNode = getTouchNode();
1303+
1304+
if (sharedSearchState.touchNode != null) {
1305+
sharedSearchState.getShortestPath();
1306+
sharedSearchState.requestGlobalStop();
1307+
return;
1308+
}
1309+
1310+
// No path yet, but we have more nodes to explore. Advance to
1311+
// the next frontier level:
1312+
current = this.searchState
1313+
.frontierQueues
1314+
.get(currentFrontierDepth)
1315+
.getFirst();
1316+
1317+
this.searchState.currentFrontierDepth++;
1318+
this.searchState.frontierQueues.add(new ArrayDeque<>());
1319+
this.searchState.frontierSets .add(new HashSet<>());
12811320
}
12821321

1322+
// Discharge the head node in the current depth queue:
1323+
currentFrontierQueue.removeFirst();
1324+
12831325
final ExpansionThread<N> expansionThread =
12841326
new ExpansionThread<>(current, nodeExpander);
12851327

@@ -1311,17 +1353,16 @@ private void expand() {
13111353

13121354
visited.add(current);
13131355

1314-
// Once here, the expansion completed within expansionJoinDuration!
1315-
13161356
for (final N successor : expansionThread.getSuccessorList()) {
13171357
if (visited.contains(successor)) {
13181358
continue;
13191359
}
13201360

13211361
// TODO: remove this?
1322-
if (searchState.distance.getOrDefault(successor, -1)
1323-
<= currentFrontierDepth) {
1324-
1362+
if (distance.containsKey(successor) && distance.get(successor) <= currentFrontierDepth) {
1363+
// if (searchState.distance.getOrDefault(successor, currentFrontierDepth)
1364+
// == currentFrontierDepth) {
1365+
System.out.println("hello HELLO!");
13251366
continue;
13261367
}
13271368

src/test/java/io/github/coderodde/graph/pathfinding/delayed/impl/ThreadPoolBidirectionalBFSPathFinderTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void testCorrectnessOnSmallGraph() {
120120

121121

122122
// This test may take a several seconds.
123-
@Test
123+
//@Test
124124
public void testCorrectness() {
125125

126126
System.out.println("testCorrectness() begin:");
@@ -182,7 +182,7 @@ public void testCorrectness() {
182182
}
183183

184184
// This test may take a several seconds too complete.
185-
@Test
185+
//@Test
186186
public void returnsEmptyPathOnDisconnectedGraph() {
187187
final int nodes = disconnectedDelayedDirectedGraph.size();
188188
final int sourceNodeIndex = random.nextInt(nodes / 2);
@@ -220,7 +220,7 @@ public void returnsEmptyPathOnDisconnectedGraph() {
220220
System.out.println("returnsEmptyPathOnDisconnectedGraph() done.");
221221
}
222222

223-
@Test
223+
//@Test
224224
public void haltsOnFailingNodes() {
225225

226226
final DirectedGraphNode sourceNode =
@@ -242,7 +242,7 @@ public void haltsOnFailingNodes() {
242242
System.out.println("haltsOnFailingNodes() done.");
243243
}
244244

245-
@Test
245+
//@Test
246246
public void omitsFaultyLinks() {
247247
final DirectedGraphNode a = new DirectedGraphNode(1, true, 100);
248248
final DirectedGraphNode b1 = new DirectedGraphNode(2, true, 100);
@@ -282,7 +282,7 @@ public void omitsFaultyLinks() {
282282
System.out.println("omitsFaultyLinks() done.");
283283
}
284284

285-
@Test
285+
//@Test
286286
public void halt() {
287287
final DirectedGraphNode source = new DirectedGraphNode(1, true, 10_000);
288288
final DirectedGraphNode target = new DirectedGraphNode(2, true, 10_000);
@@ -357,7 +357,7 @@ public void run() {
357357
System.out.println("Second finder halted!");
358358
}
359359

360-
@Test
360+
//@Test
361361
public void fluentApiSearchBuilding() {
362362
DirectedGraphNode source = new DirectedGraphNode(1);
363363
DirectedGraphNode target = new DirectedGraphNode(2);
@@ -409,7 +409,7 @@ public void fluentApiSearchBuilding() {
409409
.search();
410410
}
411411

412-
@Test
412+
//@Test
413413
public void undirectedGraphTest() {
414414
System.out.println("undirectedGraphTest()");
415415

0 commit comments

Comments
 (0)