Skip to content

Commit db047cc

Browse files
committed
Still working on the parallel radix sort.
1 parent f1727cb commit db047cc

File tree

1 file changed

+160
-1
lines changed

1 file changed

+160
-1
lines changed

src/main/java/com/github/coderodde/util/ParallelRadixSort.java

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,145 @@ private static void parallelRadixSortImpl(
189189
return;
190190
}
191191

192+
int startIndex = sourceFromIndex;
193+
int subrangeLength = rangeLength / threads;
194+
195+
BucketSizeCounterThread[] bucketSizeCounterThreads =
196+
new BucketSizeCounterThread[threads];
197+
198+
// Spawn all but the rightmost bucket size counter thread. The rightmost
199+
// thread will be run in this thread as a mild optimization:
200+
for (int i = 0; i != bucketSizeCounterThreads.length - 1; i++) {
201+
BucketSizeCounterThread bucketSizeCounterThread =
202+
new BucketSizeCounterThread(
203+
source,
204+
startIndex,
205+
startIndex += subrangeLength,
206+
recursionDepth);
207+
208+
bucketSizeCounterThread.start();
209+
bucketSizeCounterThreads[i] = bucketSizeCounterThread;
210+
}
211+
212+
// Run the last bucket size counter thread in this thread:
213+
BucketSizeCounterThread lastBucketSizeCounterThread =
214+
new BucketSizeCounterThread(
215+
source,
216+
startIndex,
217+
sourceFromIndex + rangeLength,
218+
recursionDepth);
219+
220+
// Run the last bucket size thread in this thread:
221+
lastBucketSizeCounterThread.run();
222+
bucketSizeCounterThreads[threads - 1] = lastBucketSizeCounterThread;
223+
224+
// Join all the spawned bucket size counter threads:
225+
for (int i = 0; i != threads - 1; i++) {
226+
BucketSizeCounterThread bucketSizeCounterThread =
227+
bucketSizeCounterThreads[i];
228+
229+
try {
230+
bucketSizeCounterThread.join();
231+
} catch (InterruptedException ex) {
232+
throw new RuntimeException(
233+
"Could not join a bucket size counter thread.",
234+
ex);
235+
}
236+
}
237+
238+
// Build the global bucket size map:
239+
int[] globalBucketSizeMap = new int[BUCKETS];
240+
241+
for (int i = 0; i != threads; i++) {
242+
int[] localBucketSizeMap =
243+
bucketSizeCounterThreads[i].getLocalBucketSizeMap();
244+
245+
for (int j = 0; j != BUCKETS; j++) {
246+
globalBucketSizeMap[j] += localBucketSizeMap[j];
247+
}
248+
}
249+
250+
int numberOfNonemptyBuckets = 0;
251+
252+
for (int i = 0; i != BUCKETS; i++) {
253+
if (globalBucketSizeMap[i] != 0) {
254+
numberOfNonemptyBuckets++;
255+
}
256+
}
257+
258+
int spawnDegree = Math.min(numberOfNonemptyBuckets, threads);
259+
int[] startIndexMap = new int[BUCKETS];
260+
261+
for (int i = 1; i != BUCKETS; i++) {
262+
startIndexMap[i] = startIndexMap[i - 1]
263+
+ globalBucketSizeMap[i - 1];
264+
}
265+
266+
int[][] processedMaps = new int[spawnDegree][BUCKETS];
267+
268+
// Make the preprocessing map independent of each thread:
269+
for (int i = 1; i != spawnDegree; i++) {
270+
int[] partialBucketSizeMap =
271+
bucketSizeCounterThreads[i - 1].getLocalBucketSizeMap();
272+
273+
for (int j = 0; j != BUCKETS; j++) {
274+
processedMaps[i][j] = processedMaps[i - 1][j]
275+
+ partialBucketSizeMap[j];
276+
}
277+
}
278+
279+
int sourceStartIndex = sourceFromIndex;
280+
int targetStartIndex = targetFromIndex;
281+
282+
BucketInserterThread[] bucketInserterThreads =
283+
new BucketInserterThread[spawnDegree];
284+
285+
// Spawn all but the rightmost bucket inserter thread. The rightmost
286+
// thread will be run in this thread as a mild optimization:
287+
for (int i = 0; i != spawnDegree - 1; i++) {
288+
BucketInserterThread bucketInserterThread =
289+
new BucketInserterThread(
290+
source,
291+
target,
292+
sourceStartIndex += subrangeLength,
293+
targetStartIndex += subrangeLength,
294+
startIndexMap,
295+
processedMaps[i],
296+
subrangeLength,
297+
recursionDepth);
298+
299+
bucketInserterThread.start();
300+
bucketInserterThreads[i] = bucketInserterThread;
301+
}
192302

303+
BucketInserterThread lastBucketInserterThread =
304+
new BucketInserterThread(
305+
source,
306+
target,
307+
sourceStartIndex,
308+
targetStartIndex,
309+
startIndexMap,
310+
processedMaps[spawnDegree - 1],
311+
rangeLength - (spawnDegree - 1) * subrangeLength,
312+
recursionDepth);
313+
314+
// Run the last, rightmost bucket inserter thread in this thread:
315+
lastBucketInserterThread.run();
316+
bucketInserterThreads[threads - 1] = lastBucketInserterThread;
317+
318+
// Join all the spawned bucket inserter threads:
319+
for (int i = 0; i != threads - 1; i++) {
320+
BucketInserterThread bucketInserterThread =
321+
bucketInserterThreads[i];
322+
323+
try {
324+
bucketInserterThread.join();
325+
} catch (InterruptedException ex) {
326+
throw new RuntimeException(
327+
"Could not join a bucket inserter thread.",
328+
ex);
329+
}
330+
}
193331
}
194332

195333
private static void rangeCheck(
@@ -542,6 +680,27 @@ public void run() {
542680

543681
private static final class SorterThread extends Thread {
544682

545-
private final
683+
684+
}
685+
686+
private static final class BucketKeyList {
687+
private final int[] bucketKeys;
688+
private int size;
689+
690+
BucketKeyList(int capacity) {
691+
this.bucketKeys = new int[capacity];
692+
}
693+
694+
void addBucketKey(int bucketKey) {
695+
this.bucketKeys[size++] = bucketKey;
696+
}
697+
698+
int getBucketKey(int index) {
699+
return this.bucketKeys[index];
700+
}
701+
702+
int size() {
703+
return size;
704+
}
546705
}
547706
}

0 commit comments

Comments
 (0)