Parallel Computing in Python
Gregor von Laszewski (laszewski@gmail.com)
In this module, we will review the available Python modules that can be used for parallel computing. Parallel computing can take the form of either multi-threading or multi-processing. In the multi-threading approach, the threads share the same memory heap, whereas in multi-processing each process has its own separate and independent memory heap, so communication between processes is somewhat more complex.
Multi-threading in Python
Threading in Python is well suited to I/O operations where the process is expected to be idle regularly, e.g. web scraping. This is a very useful feature because many applications and scripts spend the majority of their runtime waiting for network or data I/O. In several cases, e.g. web scraping, the tasks, i.e. downloads from different websites, are independent of each other most of the time. Therefore the program can run the downloads in parallel and join the results at the end.
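The idea of overlapping idle time can be sketched without any network access. In the following minimal sketch, time.sleep() stands in for a slow download; the function names fake_download and download_all and the site names are hypothetical and only serve the illustration.

```python
import threading
import time


def fake_download(site, delay, results):
    """Stand-in for a network request: sleeping leaves the CPU idle like real I/O."""
    time.sleep(delay)
    results[site] = "content of " + site


def download_all(sites, delay=0.2):
    """Start one thread per site, wait for all of them, and return the results."""
    results = {}
    threads = [
        threading.Thread(target=fake_download, args=(site, delay, results))
        for site in sites
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # join the results at the end
    return results


if __name__ == "__main__":
    sites = ["site-a", "site-b", "site-c", "site-d"]  # hypothetical names
    start = time.perf_counter()
    pages = download_all(sites)
    elapsed = time.perf_counter() - start
    # The waits overlap, so the total is close to one delay, not four.
    print("Fetched {0} pages in {1:.2f} s".format(len(pages), elapsed))
```

Because the four waits overlap, the elapsed time should stay close to a single delay rather than the sum of all delays.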
Thread vs Threading
There are two built-in modules in Python that are related to threading, namely thread and threading. The former has been deprecated for some time in Python 2, and in Python 3 it was renamed to _thread for the sake of backward compatibility. The _thread module provides a low-level threading API for multi-threading in Python, whereas the threading module builds a high-level threading interface on top of it.
Thread is the main class of the threading module. Two important arguments of its constructor are target, for specifying the callable object, and args, for passing the arguments to the target callable. We illustrate these in the following example:
import threading

def hello_thread(thread_num):
    print("Hello from Thread", thread_num)

if __name__ == '__main__':
    for thread_num in range(5):
        # args must be a tuple, hence the trailing comma
        t = threading.Thread(target=hello_thread, args=(thread_num,))
        t.start()
This is the output of the previous example:
In [1]: %run threading.py
Hello from Thread 0
Hello from Thread 1
Hello from Thread 2
Hello from Thread 3
Hello from Thread 4
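Besides target and args, the Thread constructor also accepts kwargs, and join() lets the caller wait for a thread to finish. The following is a small sketch of both; the helper names square_into and run_squares are made up for this illustration.

```python
import threading


def square_into(results, index, value, verbose=False):
    """Store value squared at results[index]; each thread writes its own slot."""
    results[index] = value * value
    if verbose:
        print("Thread {0} computed {1}".format(index, results[index]))


def run_squares(values, verbose=False):
    """Square every value in its own thread and return the collected results."""
    results = [None] * len(values)
    threads = []
    for index, value in enumerate(values):
        t = threading.Thread(
            target=square_into,
            args=(results, index, value),   # positional arguments
            kwargs={"verbose": verbose},    # keyword arguments
        )
        threads.append(t)
        t.start()
    for t in threads:
        t.join()  # block until this thread has finished
    return results


if __name__ == "__main__":
    print(run_squares([1, 2, 3, 4], verbose=True))
```

Since every thread writes to a different list slot and the caller joins all threads before reading, the returned list is complete and in input order even though the threads may finish in any order.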
In case you are not familiar with the if __name__ == '__main__': statement, it makes sure that the code nested under this condition runs only if you run your module as a program; it does not run when your module is imported into another file.
Locks
As mentioned before, the memory space is shared between the threads. This is at the same time beneficial and problematic: it is beneficial in the sense that communication between the threads becomes easy; however, you might experience strange outcomes if you let several threads change the same variable without caution, e.g. thread 2 changes variable x while thread 1 is working with it. This is when a lock comes into play. Using a lock, you can allow only one thread at a time to work with a variable. In other words, only a single thread can hold the lock. If other threads need to work with that variable, they have to wait until the first thread is done and the lock is released.
We first illustrate the problem with a simple example that does not use a lock:
import threading

counter = 0

def incrementer1():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
            print("Greeter 1 incremented the counter by 1")
        print("Counter is %d" % counter)

def incrementer2():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
            print("Greeter 2 incremented the counter by 1")
        print("Counter is %d" % counter)

if __name__ == '__main__':
    t1 = threading.Thread(target=incrementer1)
    t2 = threading.Thread(target=incrementer2)
    t1.start()
    t2.start()
Suppose we want to print the multiples of 3 between 1 and 12, i.e. 3, 6, 9 and 12. For the sake of argument, we do this using two threads and a nested for loop. We create a global variable called counter and initialize it to 0. Whenever incrementer1 or incrementer2 is called, counter is incremented by 3 twice (i.e. by 6 in each function call). If you run the previous code, you would have to be really lucky to get the following as part of your output:
Counter is 3
Counter is 6
Counter is 9
Counter is 12
The reason is the race between the threads while they increment counter in the nested for loop. As you probably noticed, each iteration of the outer for loop is meant to add 3 to the counter, but the other thread can modify counter in between the three inner increments, so the value printed after the inner loop is not necessarily a multiple of 3. Accordingly, the output of the previous code can differ from run to run. This is an example output:
$ python3 lock_example.py
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 4
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 8
Greeter 1 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 10
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 12
We can fix this issue using a lock: whenever one of the functions is going to increment the value by 3, it will acquire() the lock, and when it is done it will release() the lock. This mechanism is illustrated in the following code:
import threading

increment_by_3_lock = threading.Lock()
counter = 0

def incrementer1():
    global counter
    for j in range(2):
        # only one thread at a time may run the three increments
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter += 1
            print("Greeter 1 incremented the counter by 1")
        print("Counter is %d" % counter)
        increment_by_3_lock.release()

def incrementer2():
    global counter
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter += 1
            print("Greeter 2 incremented the counter by 1")
        print("Counter is %d" % counter)
        increment_by_3_lock.release()

if __name__ == '__main__':
    t1 = threading.Thread(target=incrementer1)
    t2 = threading.Thread(target=incrementer2)
    t1.start()
    t2.start()
No matter how many times you run this code, the counter values are always printed in the correct order (which thread prints each block may vary between runs):
$ python3 lock_example.py
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 3
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 6
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 9
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 12
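A Lock can also be used as a context manager, which releases it automatically even if the protected code raises an exception. The following sketch is a variation of the example above under that assumption; the names add_three, run and counter_lock are chosen for this illustration only.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_three(times):
    """Increment the shared counter by 3, `times` times, one locked step at a time."""
    global counter
    for _round in range(times):
        with counter_lock:  # acquire(); released automatically on exit
            for _step in range(3):
                counter += 1


def run(num_threads=2, times=2):
    """Reset the counter, run the threads to completion, return the final value."""
    global counter
    counter = 0
    threads = [threading.Thread(target=add_three, args=(times,))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == "__main__":
    print("Final counter:", run())  # prints "Final counter: 12"
```

The with statement is equivalent to the acquire()/release() pair used above, but there is no way to forget the release.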
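Using the threading module adds overhead for thread management and increases the complexity of the program. In addition, in the standard CPython interpreter the global interpreter lock (GIL) allows only one thread at a time to execute Python bytecode, so threads do not speed up CPU-bound code. That is why, in many situations, employing the multiprocessing module might be a better approach.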
Multi-processing in Python
We already mentioned that multi-threading might not be sufficient in many applications and that we might need to use multiprocessing sometimes, or, for CPU-bound work, most of the time. That is why we dedicate this subsection to this module. It provides an API for spawning processes that mirrors the way you spawn threads using the threading module. Moreover, it offers some functionality that has no counterpart in the threading module, e.g. the Pool class, which allows you to run a batch of jobs using a pool of worker processes.
Process
Similar to the threading module, which employs thread (aka _thread) under the hood and exposes the Thread class, multiprocessing is built around the Process class. Consider the following example:
from multiprocessing import Process
import os

def greeter(name):
    proc_idx = os.getpid()
    print("Process {0}: Hello {1}!".format(proc_idx, name))

if __name__ == '__main__':
    name_list = ['Harry', 'George', 'Dirk', 'David']
    process_list = []
    for name in name_list:
        current_process = Process(target=greeter, args=(name,))
        process_list.append(current_process)
        current_process.start()
    # wait for all processes to terminate
    for process in process_list:
        process.join()
In this example, after importing the Process class we created a greeter() function that takes a name and greets that person. It also prints the pid (process identifier) of the process that is running it. Note that we used the os module to get the pid. At the bottom of the code, after checking the __name__ == '__main__' condition, we create a series of Processes and start them. Finally, in the last for loop, we use the join method to tell Python to wait for the processes to terminate. This is one of the possible outputs of the code:
$ python3 process_example.py
Process 23451: Hello Harry!
Process 23452: Hello George!
Process 23453: Hello Dirk!
Process 23454: Hello David!
Pool
Consider the Pool class as a pool of worker processes. There are several ways of assigning jobs to a Pool, and we introduce the most important ones in this section. These methods are categorized as blocking or non-blocking. A blocking call holds the calling thread/process until the result is ready, and control returns only when the call completes. A non-blocking call, on the other hand, returns control immediately.
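The difference between the two categories can be sketched with the simplest pair of Pool methods, apply() (blocking) and apply_async() (non-blocking). This is only an illustration; slow_square is a made-up job that sleeps briefly so that the difference in control flow becomes visible.

```python
from multiprocessing import Pool
import time


def slow_square(x):
    """A deliberately slow job so the difference in control flow is visible."""
    time.sleep(0.2)
    return x * x


def main():
    with Pool(processes=2) as pool:
        # Blocking: apply() returns only when the result is ready.
        result = pool.apply(slow_square, (3,))
        print("apply returned", result)

        # Non-blocking: apply_async() returns an AsyncResult right away.
        pending = pool.apply_async(slow_square, (4,))
        print("apply_async returned; result ready yet?", pending.ready())
        # get() blocks until the value is available (or the timeout expires).
        print("get() returned", pending.get(timeout=30))


if __name__ == "__main__":
    main()
```

The ready() call will usually report False here, because the worker is still sleeping when control has already returned to the caller.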
Synchronous Pool.map()
We illustrate the Pool.map method by re-implementing our previous greeter example using Pool.map:
from multiprocessing import Pool
import os

def greeter(name):
    pid = os.getpid()
    print("Process {0}: Hello {1}!".format(pid, name))

if __name__ == '__main__':
    names = ['Jenna', 'David', 'Marry', 'Ted', 'Jerry', 'Tom', 'Justin']
    pool = Pool(processes=3)
    # blocks until every name has been greeted
    sync_map = pool.map(greeter, names)
    print("Done!")
As you can see, we have seven names here, but we do not want to dedicate a separate process to each greeting. Instead, we do the whole job of “greeting seven people” using “three processes.” We create a pool of 3 processes with the Pool(processes=3) syntax and then map the greeter function over an iterable called names using pool.map(greeter,names). As expected, the greetings in the output are printed from three different processes:
$ python poolmap_example.py
Process 30585: Hello Jenna!
Process 30586: Hello David!
Process 30587: Hello Marry!
Process 30585: Hello Ted!
Process 30585: Hello Jerry!
Process 30587: Hello Tom!
Process 30585: Hello Justin!
Done!
Note that Pool.map() is in the blocking category and does not return control to your script until it is done calculating the results. That is why Done! is printed after all of the greetings.
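The greeter above only prints, so the return value of Pool.map() is not used. As a complementary sketch, the following version returns a value from each call and collects the results in the parent; the function name name_length is hypothetical, and the with statement is used to shut the pool down when the work is finished.

```python
from multiprocessing import Pool


def name_length(name):
    """Return a value instead of printing, so the parent can collect it."""
    return len(name)


def main():
    names = ['Jenna', 'David', 'Marry', 'Ted', 'Jerry', 'Tom', 'Justin']
    with Pool(processes=3) as pool:  # the pool is shut down when the block exits
        lengths = pool.map(name_length, names)
    # map() returns one result per input, in the same order as `names`.
    for name, length in zip(names, lengths):
        print("{0}: {1}".format(name, length))
    return lengths


if __name__ == '__main__':
    main()
```

Even though the jobs may be executed by different workers in any order, the returned list lines up with the input list.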
Asynchronous Pool.map_async()
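As the name implies, you can use the map_async method when you want to assign many function calls to a pool of worker processes asynchronously. Unlike map, map_async returns control immediately and hands back a result object instead of the results themselves. As with map, the order in which the workers execute (and print) is not guaranteed; the list of return values later obtained from the result object is still in input order. We now implement the previous example using map_async: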
from multiprocessing import Pool
import os

def greeter(name):
    pid = os.getpid()
    print("Process {0}: Hello {1}!".format(pid, name))

if __name__ == '__main__':
    names = ['Jenna', 'David', 'Marry', 'Ted', 'Jerry', 'Tom', 'Justin']
    pool = Pool(processes=3)
    # returns immediately with a result object
    async_map = pool.map_async(greeter, names)
    print("Done!")
    async_map.wait()
As you probably noticed, the only difference (apart from the map_async method name) is the call to the wait() method in the last line. The wait() method tells your script to wait for the result of map_async before terminating:
$ python poolmap_example.py
Done!
Process 30740: Hello Jenna!
Process 30741: Hello David!
Process 30740: Hello Ted!
Process 30742: Hello Marry!
Process 30740: Hello Jerry!
Process 30741: Hello Tom!
Process 30742: Hello Justin!
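Note that the greetings are not printed in input order. Moreover, Done! is printed before any of the greetings, which shows that map_async returned immediately. If we did not call the wait() method, the script could reach its end before the workers finish, and you would probably not see the greetings at all.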
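When the jobs return values, the result object handed back by map_async() can be queried with get(), which waits for completion and returns the values. The following is a minimal sketch under that assumption; the function name greeting is made up for the illustration.

```python
from multiprocessing import Pool


def greeting(name):
    """Build the greeting in the worker and return it to the parent."""
    return "Hello {0}!".format(name)


def main():
    names = ['Jenna', 'David', 'Marry', 'Ted']
    with Pool(processes=3) as pool:
        async_map = pool.map_async(greeting, names)
        print("map_async returned; the parent is free to do other work")
        # get() blocks until all results are ready (or the timeout expires)
        # and returns them in input order.
        greetings = async_map.get(timeout=30)
    for line in greetings:
        print(line)
    return greetings


if __name__ == '__main__':
    main()
```

Using get() with a timeout instead of a bare wait() also re-raises, in the parent, an exception that occurred in a worker.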
Locks
The way the multiprocessing module implements locks is almost identical to the way the threading module does. After importing Lock from multiprocessing, all you need to do is acquire it, do some computation, and then release the lock. We will clarify the use of Lock with an example in the next section about process communication.
Process Communication
Process communication in multiprocessing is one of the most important, yet complicated, features for making good use of this module. As opposed to threading, Process objects do not have access to any shared variable by default, i.e. there is no shared memory space between the processes by default. This effect is illustrated in the following example:
from multiprocessing import Process

counter = 0

def incrementer1():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
        print("Greeter1: Counter is %d" % counter)

def incrementer2():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
        print("Greeter2: Counter is %d" % counter)

if __name__ == '__main__':
    t1 = Process(target=incrementer1)
    t2 = Process(target=incrementer2)
    t1.start()
    t2.start()
You probably already noticed that this is almost identical to our example in the threading section. Now, take a look at the strange output:
$ python communication_example.py
Greeter1: Counter is 3
Greeter1: Counter is 6
Greeter2: Counter is 3
Greeter2: Counter is 6
As you can see, it is as if the processes do not see each other. Instead of having two processes, one counting to 6 and the other counting from 6 to 12, we have two processes each counting to 6.
Nevertheless, there are several ways in which Processes from multiprocessing can communicate with each other, including Pipe, Queue, Value, Array and Manager. Pipe and Queue are appropriate for inter-process message passing. To be more specific, Pipe is useful for process-to-process scenarios (a connection between two endpoints), while Queue is more appropriate for processes-to-processes ones (several producers and consumers). Value and Array both provide synchronized access to shared data (very much like shared memory), and Managers can be used with different data types. In the following sub-sections, we cover both Value and Array since they are both lightweight, yet useful, approaches.
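Although the following sub-sections concentrate on Value and Array, message passing with a Queue can be sketched in a few lines. In this illustration several worker processes each put one message on a shared queue and the parent collects them; the names make_message and worker are hypothetical.

```python
from multiprocessing import Process, Queue


def make_message(name):
    return "Hello from {0}".format(name)


def worker(name, queue):
    """Send one message back to the parent through the shared queue."""
    queue.put(make_message(name))


def main():
    queue = Queue()
    names = ['Harry', 'George', 'Dirk']
    processes = [Process(target=worker, args=(name, queue)) for name in names]
    for p in processes:
        p.start()
    # Read exactly one message per worker before joining the processes.
    messages = [queue.get(timeout=30) for _ in processes]
    for p in processes:
        p.join()
    # Arrival order depends on scheduling, so sort for a stable printout.
    for message in sorted(messages):
        print(message)
    return messages


if __name__ == '__main__':
    main()
```

The queue is passed to each worker as an argument, which is the usual way to hand a communication channel to a child process.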
Value
The following example re-implements the broken example in the previous section. We fix the strange output by using both Lock and Value:
from multiprocessing import Process, Lock, Value
import time

increment_by_3_lock = Lock()

def incrementer1(counter):
    # two rounds of three increments, matching the threading example
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.1)
        print("Greeter1: Counter is %d" % counter.value)
        increment_by_3_lock.release()

def incrementer2(counter):
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.05)
        print("Greeter2: Counter is %d" % counter.value)
        increment_by_3_lock.release()

if __name__ == '__main__':
    # a signed integer in shared memory, initialized to 0
    counter = Value('i', 0)
    t1 = Process(target=incrementer1, args=(counter,))
    t2 = Process(target=incrementer2, args=(counter,))
    t2.start()
    t1.start()
The usage of the Lock object in this example is identical to the example in the threading section. The usage of counter, on the other hand, is the novel part. First, note that counter is not a global variable anymore; instead it is a Value, which returns a ctypes object allocated from memory shared between the processes. The first argument 'i' indicates a signed integer, and the second argument defines the initial value. In this case we are assigning a signed integer in shared memory, initialized to 0, to the counter variable. We then modified our two functions to take this shared variable as an argument. Finally, we changed the way we increment the counter, since it is not a Python integer anymore but a ctypes signed integer whose value we access through the value attribute. The output of the code is now as we expected:
$ python mp_lock_example.py
Greeter2: Counter is 3
Greeter2: Counter is 6
Greeter1: Counter is 9
Greeter1: Counter is 12
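A Value created with the default lock=True already carries its own lock, available through get_lock(), so a separate Lock object is not strictly needed. The following sketch is a variation on the example above under that assumption; the function name add_three is made up for the illustration.

```python
from multiprocessing import Process, Value


def add_three(counter, times):
    """Add 3 to the shared counter `times` times, holding the Value's own lock."""
    for _round in range(times):
        with counter.get_lock():
            for _step in range(3):
                counter.value += 1


def main():
    counter = Value('i', 0)  # lock=True by default, so get_lock() is available
    workers = [Process(target=add_three, args=(counter, 2)) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print("Final counter:", counter.value)  # prints "Final counter: 12"
    return counter.value


if __name__ == '__main__':
    main()
```

Because the lock travels together with the Value that is passed to each process, this version does not depend on a module-level lock being shared between the processes.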
The last example related to parallel processing illustrates the use of both Value and Array, as well as a technique for passing multiple values to a function as a single argument. Note that Process itself can pass several positional arguments through its args tuple; packing them into one tuple is mainly useful with map or map_async, which pass exactly one item of the iterable to each function call:
from multiprocessing import Process, Lock, Value, Array
import time
from ctypes import c_char_p

increment_by_3_lock = Lock()

def incrementer1(counter_and_names):
    # unpack the single tuple argument
    counter = counter_and_names[0]
    names = counter_and_names[1]
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.1)
        name_idx = counter.value // 3 - 1
        print("Greeter1: Greeting {0}! Counter is {1}".format(names.value[name_idx], counter.value))
        increment_by_3_lock.release()

def incrementer2(counter_and_names):
    counter = counter_and_names[0]
    names = counter_and_names[1]
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.05)
        name_idx = counter.value // 3 - 1
        print("Greeter2: Greeting {0}! Counter is {1}".format(names.value[name_idx], counter.value))
        increment_by_3_lock.release()

if __name__ == '__main__':
    counter = Value('i', 0)
    names = Array(c_char_p, 4)
    names.value = ['James', 'Tom', 'Sam', 'Larry']
    t1 = Process(target=incrementer1, args=((counter, names),))
    t2 = Process(target=incrementer2, args=((counter, names),))
    t2.start()
    t1.start()
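In this example, we created a multiprocessing.Array() object and assigned it to a variable called names. As we mentioned before, the first argument is the ctypes data type, and since we want to create an array of strings with a length of 4 (second argument), we imported c_char_p and passed it as the first argument. Be aware that c_char_p stores pointers, which are only meaningful inside the process that created them, and that assigning a list to names.value merely attaches an ordinary Python attribute to the array wrapper. The example therefore relies on the child processes inheriting the parent's memory (the fork start method, the traditional default on Linux) and should not be expected to work with the spawn start method.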
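For data that should really live in shared memory, an Array of a simple numeric type is the more typical use. The following sketch, with the hypothetical helper double_slice, lets two processes modify different halves of a shared integer array in place.

```python
from multiprocessing import Process, Array


def double_slice(shared, start, stop):
    """Double the elements shared[start:stop] in place."""
    with shared.get_lock():
        for i in range(start, stop):
            shared[i] *= 2


def main():
    numbers = Array('i', [1, 2, 3, 4])  # 'i' = signed int, as with Value
    workers = [
        Process(target=double_slice, args=(numbers, 0, 2)),
        Process(target=double_slice, args=(numbers, 2, 4)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    result = numbers[:]  # copy the shared array into an ordinary list
    print(result)        # prints [2, 4, 6, 8]
    return result


if __name__ == '__main__':
    main()
```

Here the second argument of Array is an initializer sequence rather than a length, and the changes made by the children are visible to the parent after join().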
Instead of passing the arguments separately, we merged the Value and Array objects into a tuple and passed the tuple to the functions. We then modified the functions to unpack the objects in their first two lines. Finally, we changed the print statement so that each process greets a particular name. The output of the example is:
$ python3 mp_lock_example.py
Greeter2: Greeting James! Counter is 3
Greeter2: Greeting Tom! Counter is 6
Greeter1: Greeting Sam! Counter is 9
Greeter1: Greeting Larry! Counter is 12
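The same packing technique carries over to Pool.map(), which hands exactly one item to each call. As a sketch, the following compares a single-argument wrapper that unpacks a tuple with Pool.starmap(), which unpacks each tuple into positional arguments itself; the function names greet and greet_packed are made up for the illustration.

```python
from multiprocessing import Pool


def greet(greeter_id, name):
    return "Greeter{0}: Greeting {1}!".format(greeter_id, name)


def greet_packed(id_and_name):
    """Single-argument wrapper: unpack the tuple, as in the example above."""
    greeter_id, name = id_and_name
    return greet(greeter_id, name)


def main():
    jobs = [(2, 'James'), (2, 'Tom'), (1, 'Sam'), (1, 'Larry')]
    with Pool(processes=2) as pool:
        packed = pool.map(greet_packed, jobs)  # one tuple per call
        starred = pool.starmap(greet, jobs)    # tuple unpacked into arguments
    # Both approaches produce the same list of greetings.
    for line in starred:
        print(line)
    return packed, starred


if __name__ == '__main__':
    main()
```

Which of the two reads better is a matter of taste; starmap() avoids the wrapper function, while the tuple wrapper also works with map_async().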