Parallel Computing in Python

Gregor von Laszewski (laszewski@gmail.com)

In this module, we will review the available Python modules that can be used for parallel computing. Parallel computing can be in the form of either multi-threading or multi-processing. In multi-threading approach, the threads run in the same shared memory heap whereas in case of multi-processing, the memory heaps of processes are separate and independent, therefore the communication between the processes are a little bit more complex.

Multi-threading in Python

Threading in Python is perfect for I/O operations where the process is expected to be idle regularly, e.g. web scraping. This is a very useful feature because several applications and scripts might spend the majority of their runtime waiting for network or data I/O. In several cases, e.g. web scraping, the resources, i.e. downloading from different websites, are most of the time-independent. Therefore the processor can download in parallel and join the result at the end.

Thread vs Threading

There are two built-in modules in Python that are related to threading, namely thread and threading. The former module is deprecated for some time in Python 2, and in Python 3 it is renamed to _thread for the sake of backward incompatibilities. The _thread module provides low-level threading API for multi-threading in Python, whereas the module threading builds a high-level threading interface on top of it.

The Thread() is the main method of the threading module, the two important arguments of which are target, for specifying the callable object, and args to pass the arguments for the target callable. We illustrate these in the following example:

import threading

def hello_thread(thread_num):
    print ("Hello from Thread ", thread_num)

if __name__ == '__main__':
    for thread_num in range(5):
        t = threading.Thread(target=hello_thread,arg=(thread_num,))
        t.start()

This is the output of the previous example:

In [1]: %run threading.py
Hello from Thread  0
Hello from Thread  1
Hello from Thread  2
Hello from Thread  3
Hello from Thread  4

In case you are not familiar with the if __name__ == '__main__:' statement, what it does is making sure that the code nested under this condition will be run only if you run your module as a program and it will not run in case your module is imported into another file.

Locks

As mentioned prior, the memory space is shared between the threads. This is at the same time beneficial and problematic: it is beneficial in a sense that the communication between the threads becomes easy, however, you might experience a strange outcome if you let several threads change the same variable without caution, e.g. thread 2 changes variable x while thread 1 is working with it. This is when lock comes into play. Using lock, you can allow only one thread to work with a variable. In other words, only a single thread can hold the lock. If the other threads need to work with that variable, they have to wait until the other thread is done and the variable is “unlocked.”

We illustrate this with a simple example:

import threading

global counter
counter = 0

def incrementer1():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
            print("Greeter 1 incremented the counter by 1")
        print ("Counter is %d"%counter)

def incrementer2():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
            print("Greeter 2 incremented the counter by 1")
        print ("Counter is now %d"%counter)


if __name__ == '__main__':
    t1 = threading.Thread(target = incrementer1)
    t2 = threading.Thread(target = incrementer2)

    t1.start()
    t2.start()

Suppose we want to print multiples of 3 between 1 and 12, i.e. 3, 6, 9 and 12. For the sake of argument, we try to do this using 2 threads and a nested for loop. Then we create a global variable called counter and we initialize it with 0. Then whenever each of the incrementer1 or incrementer2 functions are called, the counter is incremented by 3 twice (counter is incremented by 6 in each function call). If you run the previous code, you should be really lucky if you get the following as part of your output:

Counter is now 3
Counter is now 6
Counter is now 9
Counter is now 12

The reason is the conflict that happens between threads while incrementing the counter in the nested for loop. As you probably noticed, the first level for loop is equivalent to adding 3 to the counter and the conflict that might happen is not effective on that level but the nested for loop. Accordingly, the output of the previous code is different in every run. This is an example output:

$ python3 lock_example.py
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 4
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 8
Greeter 1 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 10
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 12

We can fix this issue using a lock: whenever one of the function is going to increment the value by 3, it will acquire() the lock and when it is done the function will release() the lock. This mechanism is illustrated in the following code:

import threading

increment_by_3_lock = threading.Lock()

global counter
counter = 0

def incrementer1():
    global counter
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter += 1
            print("Greeter 1 incremented the counter by 1")
        print ("Counter is %d"%counter)
        increment_by_3_lock.release()

def incrementer2():
    global counter
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter += 1
            print("Greeter 2 incremented the counter by 1")
        print ("Counter is %d"%counter)
        increment_by_3_lock.release()

if __name__ == '__main__':
    t1 = threading.Thread(target = incrementer1)
    t2 = threading.Thread(target = incrementer2)

    t1.start()
    t2.start()

No matter how many times you run this code, the output would always be in the correct order:

$ python3 lock_example.py
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 3
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 6
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 9
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 12

Using the Threading module increases both the overhead associated with thread management as well as the complexity of the program and that is why in many situations, employing multiprocessing module might be a better approach.

Multi-processing in Python

We already mentioned that multi-threading might not be sufficient in many applications and we might need to use multiprocessing sometimes, or better to say most of the time. That is why we are dedicating this subsection to this particular module. This module provides you with an API for spawning processes the way you spawn threads using threading module. Moreover, some functionalities are not even available in threading module, e.g. the Pool class which allows you to run a batch of jobs using a pool of worker processes.

Process

Similar to threading module which was employing thread (aka _thread) under the hood, multiprocessing employs the Process class. Consider the following example:

from multiprocessing import Process
import os

def greeter (name):
    proc_idx = os.getpid()
    print ("Process {0}: Hello {1}!".format(proc_idx,name))

if __name__ == '__main__':
    name_list = ['Harry', 'George', 'Dirk', 'David']
    process_list = []
    for name_idx, name in enumerate(name_list):
        current_process = Process(target=greeter, args=(name,))
        process_list.append(current_process)
        current_process.start()
    for process in process_list:
        process.join()

In this example, after importing the Process module we created a greeter() function that takes a name and greets that person. It also prints the pid (process identifier) of the process that is running it. Note that we used the os module to get the pid. In the bottom of the code after checking the __name__='__main__' condition, we create a series of Processes and start them. Finally in the last for loop and using the join method, we tell Python to wait for the processes to terminate. This is one of the possible outputs of the code:

$ python3 process_example.py
Process 23451: Hello Harry!
Process 23452: Hello George!
Process 23453: Hello Dirk!
Process 23454: Hello David!

Pool

Consider the Pool class as a pool of worker processes. There are several ways for assigning jobs to the Pool class and we will introduce the most important ones in this section. These methods are categorized as blocking or non-blocking. The former means that after calling the API, it blocks the thread/process until it has the result or answer ready and the control returns only when the call completes. In thenon-blockin` on the other hand, the control returns immediately.

Synchronous Pool.map()

We illustrate the Pool.map method by re-implementing our previous greeter example using Pool.map:

from multiprocessing import Pool
import os

def greeter(name):
    pid = os.getpid()
    print("Process {0}: Hello {1}!".format(pid,name))

if __name__ == '__main__':
    names = ['Jenna', 'David','Marry', 'Ted','Jerry','Tom','Justin']
    pool = Pool(processes=3)
    sync_map = pool.map(greeter,names)
    print("Done!")

As you can see, we have seven names here but we do not want to dedicate each greeting to a separate process. Instead, we do the whole job of “greeting seven people” using “two processes.” We create a pool of 3 processes with Pool(processes=3) syntax and then we map an iterable called names to the greeter function using pool.map(greeter,names). As we expected, the greetings in the output will be printed from three different processes:

$ python poolmap_example.py
Process 30585: Hello Jenna!
Process 30586: Hello David!
Process 30587: Hello Marry!
Process 30585: Hello Ted!
Process 30585: Hello Jerry!
Process 30587: Hello Tom!
Process 30585: Hello Justin!
Done!

Note that Pool.map() is in blocking category and does not return the control to your script until it is done calculating the results. That is why Done! is printed after all of the greetings are over.

Asynchronous Pool.map_async()

As the name implies, you can use the map_async method, when you want assign many function calls to a pool of worker processes asynchronously. Note that unlike map, the order of the results is not guaranteed (as oppose to map) and the control is returned immediately. We now implement the previous example using map_async:

from multiprocessing import Pool
import os

def greeter(name):
    pid = os.getpid()
    print("Process {0}: Hello {1}!".format(pid,name))

if __name__ == '__main__':
    names = ['Jenna', 'David','Marry', 'Ted','Jerry','Tom','Justin']
    pool = Pool(processes=3)
    async_map = pool.map_async(greeter,names)
    print("Done!")
    async_map.wait()

As you probably noticed, the only difference (clearly apart from the map_async method name) is calling the wait() method in the last line. The wait() method tells your script to wait for the result of map_async before terminating:

$ python poolmap_example.py
Done!
Process 30740: Hello Jenna!
Process 30741: Hello David!
Process 30740: Hello Ted!
Process 30742: Hello Marry!
Process 30740: Hello Jerry!
Process 30741: Hello Tom!
Process 30742: Hello Justin!

Note that the order of the results are not preserved. Moreover, Done! is printer before any of the results, meaning that if we do not use the wait() method, you probably will not see the result at all.

Locks

The way multiprocessing module implements locks is almost identical to the way the threading module does. After importing Lock from multiprocessing all you need to do is to acquire it, do some computation and then release the lock. We will clarify the use of Lock by providing an example in next section about process communication.

Process Communication

Process communication in multiprocessing is one of the most important, yet complicated, features for better use of this module. As oppose to threading, the Process objects will not have access to any shared variable by default, i.e. no shared memory space between the processes by default. This effect is illustrated in the following example:

from multiprocessing import Process, Lock, Value
import time

global counter
counter = 0

def incrementer1():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
        print ("Greeter1: Counter is %d"%counter)

def incrementer2():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
        print ("Greeter2: Counter is %d"%counter)


if __name__ == '__main__':

    t1 = Process(target = incrementer1 )
    t2 = Process(target = incrementer2 )
    t1.start()
    t2.start()

Probably you already noticed that this is almost identical to our example in threading section. Now, take a look at the strange output:

$ python communication_example.py
Greeter1: Counter is 3
Greeter1: Counter is 6
Greeter2: Counter is 3
Greeter2: Counter is 6

As you can see, it is as if the processes does not see each other. Instead of having two processes one counting to 6 and the other counting from 6 to 12, we have two processes counting to 6.

Nevertheless, there are several ways that Processes from multiprocessing can communicate with each other, including Pipe, Queue, Value, Array and Manager. Pipe and Queue are appropriate for inter-process message passing. To be more specific, Pipe is useful for process-to-process scenarios while Queue is more appropriate for processes-toprocesses ones. Value and Array are both used to provide synchronized access to a shared data (very much like shared memory) and Managers can be used on different data types. In the following sub-sections, we cover both Value and Array since they are both lightweight, yet useful, approach.

Value

The following example re-implements the broken example in the previous section. We fix the strange output, by using both Lock and Value:

from multiprocessing import Process, Lock, Value
import time

increment_by_3_lock = Lock()


def incrementer1(counter):
    for j in range(3):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.1)
        print ("Greeter1: Counter is %d"%counter.value)
        increment_by_3_lock.release()

def incrementer2(counter):
    for j in range(3):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.05)
        print ("Greeter2: Counter is %d"%counter.value)
        increment_by_3_lock.release()


if __name__ == '__main__':

    counter = Value('i',0)
    t1 = Process(target = incrementer1, args=(counter,))
    t2 = Process(target = incrementer2 , args=(counter,))
    t2.start()
    t1.start()

The usage of Lock object in this example is identical to the example in threading section. The usage of counter is on the other hand the novel part. First, note that counter is not a global variable anymore and instead it is a Value which returns a ctypes object allocated from a shared memory between the processes. The first argument 'i' indicates a signed integer, and the second argument defines the initialization value. In this case we are assigning a signed integer in the shared memory initialized to size 0 to the counter variable. We then modified our two functions and pass this shared variable as an argument. Finally, we change the way we increment the counter since the counter is not a Python integer anymore but a ctypes signed integer where we can access its value using the value attribute. The output of the code is now as we expected:

$ python mp_lock_example.py
Greeter2: Counter is 3
Greeter2: Counter is 6
Greeter1: Counter is 9
Greeter1: Counter is 12

The last example related to parallel processing, illustrates the use of both Value and Array, as well as a technique to pass multiple arguments to a function. Note that the Process object does not accept multiple arguments for a function and therefore we need this or similar techniques for passing multiple arguments. Also, this technique can also be used when you want to pass multiple arguments to map or map_async:

from multiprocessing import Process, Lock, Value, Array
import time
from ctypes import c_char_p


increment_by_3_lock = Lock()


def incrementer1(counter_and_names):
    counter=  counter_and_names[0]
    names = counter_and_names[1]
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.1)
        name_idx = counter.value//3 -1
        print ("Greeter1: Greeting {0}! Counter is {1}".format(names.value[name_idx],counter.value))
        increment_by_3_lock.release()

def incrementer2(counter_and_names):
    counter=  counter_and_names[0]
    names = counter_and_names[1]
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.05)
        name_idx = counter.value//3 -1
        print ("Greeter2: Greeting {0}! Counter is {1}".format(names.value[name_idx],counter.value))
        increment_by_3_lock.release()


if __name__ == '__main__':
    counter = Value('i',0)
    names = Array (c_char_p,4)
    names.value = ['James','Tom','Sam', 'Larry']
    t1 = Process(target = incrementer1, args=((counter,names),))
    t2 = Process(target = incrementer2 , args=((counter,names),))
    t2.start()
    t1.start()

In this example, we created a multiprocessing.Array() object and assigned it to a variable called names. As we mentioned before, the first argument is the ctype data type and since we want to create an array of strings with a length of 4 (second argument), we imported the c_char_p and passed it as the first argument.

Instead of passing the arguments separately, we merged both the Value and Array objects in a tuple and passed the tuple to the functions. We then modified the functions to unpack the objects in the first two lines in both functions. Finally, we changed the print statement in a way that each process greets a particular name. The output of the example is:

$ python3 mp_lock_example.py
Greeter2: Greeting James! Counter is 3
Greeter2: Greeting Tom! Counter is 6
Greeter1: Greeting Sam! Counter is 9
Greeter1: Greeting Larry! Counter is 12
Last modified June 20, 2021 : spelling (d59b3b2d)