0

I have a small peculiar task at hand and I couldn't figure out how to best implement a solution.

I have three workstations that are connected to a NAS running Ubuntu 20.04 LTS, via InfiniBand with 40gbps of bandwidth. This NAS is equipped with a 2TB NVMe SSD as write cache, and 7 RAID0 units as the main storage.

These workstations will spit out raw data to this NAS for later use, each of these machines will spit out somewhere around 6TB of data files each day, sizing from 100 - 300 GB each file. In order to prevent the network gets too crowded, I have them output the data to the NVMe cache first, then I plan to distribute the data files from there - exactly one file to each of the RAID0 units concurrently to maximize disk IO. For example, file1 goes to array0, file2 goes to array1, file3 goes to array2, and so on.

Now I am writing a script on the NAS side (preferably as a systemd service, but I can do with nohup) to monitor the cache, and send the file to these RAID arrays.

Here's what I come up with, and it is very close to my goal thanks to this post.

import queue, threading, os, time
import shutil

bfr_drive = '/home/test_folder' # cache
ext = ".dat" # data file extension
array = 0 # simluated array as t0-t6
fileList = [] # list of files to be moved from cache to storage
destPath = '/home/test_folder/t'
fileQueue = queue.Queue()


class ThreadedCopy:
    totalFiles = 0
    copyCount = 0
    array = 0
    lock = threading.Lock()

    def __init__(self):
        for file_name in os.listdir(bfr_drive):
            if file_name.endswith(ext):
                fileList.append(os.path.join(bfr_drive, file_name))
                fileList.sort()

        self.totalFiles = len(fileList)

        print (str(self.totalFiles) + " files to copy.")
        self.threadWorkerCopy(fileList)


    def CopyWorker(self):
        global array
        while True:
            fileName = fileQueue.get()
            shutil.copy(fileName, destPath+str(array))
            array += 1
            if array > 6:
                array = 0
            fileQueue.task_done()

            with self.lock:
                self.copyCount += 1
                
                percent = (self.copyCount * 100) / self.totalFiles
                
                print (str(percent) + " percent copied.")

    def threadWorkerCopy(self, fileNameList):
        # global array
        for i in range(4):
            t = threading.Thread(target=self.CopyWorker)
            t.daemon = True
            t.start()
            # array += 1
            # if array > 6:
                # array = 0
            print ("current array is:" + str(array)) # output prints array0 for 4 times, did not iterate
            
          
        for fileName in fileNameList:
            fileQueue.put(fileName)
        fileQueue.join()

ThreadedCopy()

Now, the Python script can successfully distribute files, but only after what's after the number in for i in range(4). For example, if I set it to 4, then the workers will use the same path (array0) for the first 4 files, only then they will start iterating through the arrays to 1, 2, 3, etc.

Would someone please point out how can I distribute the files? I think I am heading in the right direction, however, I just can't wrap my head around why those workers are stuck with the same directory at the start.

EDIT: I had the earlier version of code with the path iteration is at the spawning process threadWorkerCopy. I now had it moved to the actual worker function which is CopyWorker. The problem still stands true.

6
  • Do I understand right that you want to copy the first file to t0, the 2nd file to t1, the 3rd file to t2 etc, the 7th file to t6 8th file to t0 and so on? Please edit your question to clarify this. Commented May 17, 2021 at 18:05
  • When I test your code, the output of print ("current array is:" + str(array)) iterates from 1 to 4, but all CopyWorker threads will then use the value 4 and not start iterating later. Commented May 17, 2021 at 18:10
  • @Bodo Yes. I would like to have the file1 to array0, file2 to array1, file3 to array2, etc etc. Commented May 17, 2021 at 18:23
  • 1
    Please edit your question to add this clarification. All information about your problem should be in the question, not in comments. Commented May 17, 2021 at 18:26
  • @Bodo That makes it even more strange. I thought what was happening was the iterated path isn't passed to the CopyWorker. But even the process of spawning the worker itself is an iteration loop. Just really, really strange. Commented May 17, 2021 at 18:27

1 Answer 1

1

The problem is that you don't generate new values of array in the worker threads but only when creating the threads in threadWorkerCopy.
The result will depend on the actual timing on your system. Every worker thread will use the value of array at the time when it reads the value. This may be concurrent to threadWorkerCopy incrementing the value or afterwards, so you may get files in different directories or all in the same directory.

To get a new number for every copying process, the number in array must be incremented in the worker threads. In this case you have to prevent concurrent access to array by two or more threads at the same time. You can implement this with another lock.

For testing I replaced the directory listing with a hard-coded list of example file names and replaced the copying with printing the values.

import queue, threading, os, time
import shutil

bfr_drive = '/home/test_folder' # cache
ext = ".dat" # data file extension
array = 0 # simluated array as t0-t6
fileList = [] # list of files to be moved from cache to storage
destPath = '/home/test_folder/t'
fileQueue = queue.Queue()


class ThreadedCopy:
    totalFiles = 0
    copyCount = 0
    array = 0
    lock = threading.Lock()
    lockArray = threading.Lock()

    def __init__(self):
        # directory listing replaced with hard-coded list for testing
        for file_name in [ 'foo.dat', 'bar.dat', 'baz.dat', 'a.dat', 'b.dat', 'c.dat', 'd.dat', 'e.dat', 'f.dat', 'g.dat' ] :
        #for file_name in os.listdir(bfr_drive):
            if file_name.endswith(ext):
                fileList.append(os.path.join(bfr_drive, file_name))
                fileList.sort()

        self.totalFiles = len(fileList)

        print (str(self.totalFiles) + " files to copy.")
        self.threadWorkerCopy(fileList)


    def CopyWorker(self):
        global array
        while True:
            fileName = fileQueue.get()

            with self.lockArray:
                myArray = array
                array += 1
                if array > 6:
                    array = 0

            # actual copying replaced with output for testing
            print('copying', fileName, destPath+str(myArray))
            #shutil.copy(fileName, destPath+str(myArray))

            with self.lock:
                self.copyCount += 1

                percent = (self.copyCount * 100) / self.totalFiles

                print (str(percent) + " percent copied.")

            # moved to end because otherwise main thread may terminate before the workers
            fileQueue.task_done()

    def threadWorkerCopy(self, fileNameList):
        for i in range(4):
            t = threading.Thread(target=self.CopyWorker)
            t.daemon = True
            t.start()

        for fileName in fileNameList:
            fileQueue.put(fileName)
        fileQueue.join()

ThreadedCopy()

This prints something like this (may change between different runs):

10 files to copy.
copying /home/test_folder\a.dat /home/test_folder/t0
10.0 percent copied.
copying /home/test_folder\baz.dat /home/test_folder/t3
20.0 percent copied.
copying /home/test_folder\b.dat /home/test_folder/t1
copying /home/test_folder\c.dat /home/test_folder/t4
copying /home/test_folder\bar.dat /home/test_folder/t2
copying /home/test_folder\d.dat /home/test_folder/t5
30.0 percent copied.
copying /home/test_folder\e.dat /home/test_folder/t6
40.0 percent copied.
copying /home/test_folder\f.dat /home/test_folder/t0
50.0 percent copied.
copying /home/test_folder\foo.dat /home/test_folder/t1
60.0 percent copied.
copying /home/test_folder\g.dat /home/test_folder/t2
70.0 percent copied.
80.0 percent copied.
90.0 percent copied.
100.0 percent copied.

Notes:

I moved the line fileQueue.task_done() to the end of CopyWorker. Otherwise I don't get all percentage output lines and sometimes an error message

Fatal Python error: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads

Maybe you should wait for the end of all worker threads before the end of the main thread.

I did not check if there are any further errors in the code.


Edit after the code in the question has been changed:

The modified code still contains the problem that the worker threads will still do some output after fileQueue.task_done() so that the main thread may end before the workers.

The modified code contains race conditions when the worker threads access array, so the behavior may be unexpected.

Sign up to request clarification or add additional context in comments.

1 Comment

Thank you very much! Your explanation is very thorough and easy to understand. I have never tried to code in a multithreading fashion, this is my first time trying. Can't do it without your help.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.