Parallel Programming in Python — Lesson 4. Multi-processing

Avner Ben
CodeX

--

This is the fourth in a series of lessons covering the various facilities that the Python programming language offers for parallel programming, and the motivation for using each of them. In the previous lessons, we explored the applicative requirements for event-driven design and learned to distinguish the ones that really require parallel code (e.g. multi-threading). We studied the Python thread facility in detail and discussed the need for synchronization (between threads) and the various synchronization primitives available. In this lesson we take a look at Python’s lightweight multiprocessing alternative (to multi-threading).

Sections in this lesson:

  1. Trading Process for Thread
  2. Using an Event as a synchronized Boolean
  3. Sharing data between processes
  4. Serializing data between processes
  5. Additional multiprocessing functionality
  6. Invoking another program, blocked
  7. Handling failed program execution
  8. Collecting the invoked program output

1. Trading Process for Thread

The Process, featured by Python’s multiprocessing module, closely resembles the Thread, featured by Python’s threading module (which we studied in the previous lessons), but lives in a separate memory space, courtesy of the process supplied by the operating system. It features similar synchronization primitives (at the interface level), plus some additional functionality. But, below the surface, the implementation accounts for the different memory spaces involved, as we shall immediately see.

Let us start with the Producer/Consumer example from the previous lesson (precisely: the version synchronized by a message queue) and adjust it to use processes instead of threads, to see if that works. Apart from that, the program is hardly changed!

Notes (corresponding to the commented numbers; see the sketch after this list):

  1. This time we are using the multiprocessing module. In order to maintain the logic of our threading example, we import the join-able queue (multiprocessing.JoinableQueue); for some reason, the default message queue here is not join-able.
  2. This time, the Producer inherits multiprocessing.Process.
  3. The Consumer (which is a plain function, not encapsulated like the Producer) is explicitly launched in a process.
  4. The Producer is initialized using the Consumer’s queue.
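
A minimal sketch of the adjusted program, with numbered comments matching the notes above. The names (Producer, consumer, toStop), the end-of-input sentinel and the one-second pacing are my assumptions, not the original listing:

import time
import multiprocessing
from multiprocessing import JoinableQueue        # (1) the join-able message queue

class Producer(multiprocessing.Process):         # (2) a Process now, rather than a Thread
    def __init__(self, queue):                   # (4) initialized with the Consumer's queue
        super().__init__()
        self.queue = queue
        self.toStop = False                      # the end-flag: a plain attribute

    def run(self):
        count = 0
        while not self.toStop:                   # tested in the child process
            count += 1
            self.queue.put(f'Round {count}')
            time.sleep(1)
        self.queue.put(None)                     # end-of-input sentinel

    def stop(self):
        self.toStop = True                       # set in whichever process calls it

def consumer(queue):                             # (3) a plain function, launched in a process
    while True:
        message = queue.get()
        queue.task_done()
        if message is None:
            print('[End of input]')
            break
        print(message)

if __name__ == '__main__':
    queue = JoinableQueue()
    multiprocessing.Process(target=consumer, args=(queue,)).start()
    producer = Producer(queue)                   # (4)
    producer.start()
    time.sleep(10)                               # let about ten messages through
    producer.stop()                              # affects only the parent's copy...
    producer.join()                              # ...so this never returns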

Output: (if your IDE does not show the output of the other process, run from the command line!)

Round 1 
Round 2
Round 3
Round 4
Round 5
Round 6
Round 7
Round 8
Round 9
Round 10
Round 11
Round 12
...

Something is wrong! The program continues to run past the tenth message and must be killed. What went wrong? The following debugging messages may help clarify the error (the fragment after the notes shows where they were added).

  1. Printed from the main loop: the program is still running, so the end-flag must be false.
  2. Printed from the stop method: the program should not be running any more, since the end-flag must now be true.
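
Continuing the hypothetical sketch above, the two debug prints might be placed like this (a fragment of the Producer class, everything else unchanged):

    def run(self):
        count = 0
        while not self.toStop:
            print('Running: flag =', self.toStop)    # (1) still running: expect False
            count += 1
            self.queue.put(f'Round {count}')
            time.sleep(1)
        self.queue.put(None)

    def stop(self):
        self.toStop = True
        print('Stopping: flag =', self.toStop)       # (2) just stopped: expect True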

Output: (if your IDE does not show the output of the other process, run from the command line!)

... 
Round 7
Running: flag = False
Round 8
Running: flag = False
Round 9
Running: flag = False
Round 10
Stopping: flag = True
Running: flag = False
Round 11
...

The program continues to run after it should have stopped. Apparently, the end-flag that it sees is still false, in spite of the (verified!) fact that it was set to true a moment earlier!

How come the end-flag was set to true (which was verified), and nevertheless remains false? The explanation is simple: it is not the same end-flag! The end-flag sits in the Producer object, and there are two Producers in this story. And why are there two Producers? Because there are two processes (“multiprocessing”, remember?). It goes like this:

  1. The main process creates a Producer, which initializes its end-flag to false.
  2. The Producer (which is declared as a Process) launches, true to that name, another process in the background and silently creates a copy of itself there.
  3. When requested to start, the Producer’s main loop is launched, but in the child process. Whatever happens to that Producer (e.g. the value of its end-flag) has nothing to do with the Producer that remains in the main process.
  4. The main program orders its Producer to stop. Since this method call is synchronous, it affects the end-flag in the local Producer. The real Producer (the one in the child process) has no way to be aware of that and, therefore, will continue to run indefinitely.

And why did this trick work when the Producer was a thread? Because there was only one Producer (thread), and it sat in the same memory space as its client. In that configuration, we could communicate with the Producer both asynchronously and synchronously.

That there are two separate objects (in two separate processes) is demonstrated beyond doubt by printing the process ID and address of the Producer at the interesting points:
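
A stripped-down sketch of such tracing, using os.getpid(); the message texts follow the output below, the rest of the structure is an assumption:

import os
import time
import multiprocessing

class Producer(multiprocessing.Process):
    def __init__(self):
        super().__init__()
        self.toStop = False
        print('Producer constructor. Process:', os.getpid())   # id(self) would also show the address

    def run(self):
        print('Producer main loop. Process:', os.getpid())
        while not self.toStop:
            time.sleep(1)

    def stop(self):
        print('Producer stop method. Process:', os.getpid())
        self.toStop = True

if __name__ == '__main__':
    print('Main program. Process:', os.getpid())
    producer = Producer()
    producer.start()
    time.sleep(3)
    producer.stop()        # traced in the parent process
    producer.terminate()   # the child's copy would otherwise run forever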

Output: (if your IDE does not show the output of the other process, run from the command line!)

Main program. Process: 5884 
Producer constructor. Process: 5884
Producer main loop. Process: 7372
Round 1
Round 2
Round 3
Round 4
Round 5
Round 6
Round 7
Round 8
Round 9
Round 10
Producer stop method. Process: 5884
Round 11
Round 12
...

As we can tell from the process IDs, the Producer is initialized in the main program. The constructor is not called again for the Producer in the child process (because, once initialized, it was silently copied there: pickled and unpickled). The request to stop was sent to the Producer in the main program. But the Producer main loop runs in another process!

Moral: communication with a process must be asynchronous! We cannot use a plain variable to communicate between processes (because the variable sits in the wrong memory space, relative to the other process). Python’s multiprocessing offers us two solutions:

  1. To use a synchronization primitive (the multiprocessing version) instead.
  2. To use a synchronized variable (and let the multiprocessing module marshal the value between the processes).

2. Using an Event as a synchronized Boolean

This example upgrades the Boolean end-flag into an Event, used in the limited capacity of a global Boolean whose value is accessible to both processes. (But it is not used to actually hold and release anyone, which is what a real Event is expected to do.) The notes below correspond to the numbered comments in the sketch that follows them.

  1. The program uses multiprocessing.Event.
  2. The Producer receives the event from the outside. The Producer is reduced to a function, because it no longer has state (the end-flag), and the “stop” setter (the only addressable method it had) is no longer needed.
  3. The value of the event is queried non-blocking.
  4. The Producer function is launched using the event (which is local to the main program).
  5. The main program signals the event.
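
A minimal sketch of this version, with numbered comments matching the notes above (the names, the sentinel and the ten-second timing are assumptions):

import time
import multiprocessing
from multiprocessing import JoinableQueue

def producer(queue, toStop):                     # (2) reduced to a function; gets the event
    count = 0
    while not toStop.is_set():                   # (3) query the event, non-blocking
        count += 1
        queue.put(f'Round {count}')
        time.sleep(1)
    queue.put(None)                              # end-of-input sentinel

def consumer(queue):
    while True:
        message = queue.get()
        queue.task_done()
        if message is None:
            print('[End of input]')
            break
        print(message)

if __name__ == '__main__':
    queue = JoinableQueue()
    toStop = multiprocessing.Event()             # (1) a multiprocessing Event
    multiprocessing.Process(target=consumer, args=(queue,)).start()
    multiprocessing.Process(target=producer, args=(queue, toStop)).start()   # (4)
    time.sleep(10)
    toStop.set()                                 # (5) signal the event
    queue.join()                                 # wait for the Consumer to drain the queue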

Output: (if your IDE does not show the output of the other process, run from the command line!)

Round 1 
Round 2
Round 3
Round 4
Round 5
Round 6
Round 7
Round 8
Round 9
Round 10
[End of input]

3. Sharing data between processes

Instead of using an Event in the capacity of a Boolean, it turns out that we can use a proper Boolean, courtesy of the multiprocessing module. For some reason (probably performance), the repertoire of variables that may be shared among processes is limited to C-language basic types, as defined in the Python/C interface (and, contrary to Python custom, it is strongly typed!). The notes below correspond to the numbered comments in the sketch that follows them.

  1. We are going to use multiprocessing.Value
  2. The program uses the ctypes (C-language interface) module.
  3. The Producer is upgraded back to a class (because it has state and features a setter).
  4. multiprocessing.Value encapsulates a C variable whose value is automatically shared between processes.
  5. The truth value of a Value returns the truth value of the variable inside (Boolean in this case).
  6. The value stored inside the Value object is assigned True (Python-style True, rather than C-style true), as in: self.toStop.value = True. A disastrous program bug here would be to assign the new value to the value-holder itself, rather than to its content, as in: self.toStop = True. Python will obey silently, but the result may not match your intention: this replaces the multiprocessing.Value with a plain Python Boolean, which is syntactically correct, but puts an end to the synchronization!
  7. The end-flag synchronization no longer concerns the main program.
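
A minimal sketch of this version, with numbered comments matching the notes above (as before, the names, the sentinel and the timing are assumptions):

import time
import ctypes                                    # (2) the C-language type interface
import multiprocessing
from multiprocessing import JoinableQueue

class Producer(multiprocessing.Process):         # (3) a class again: it has state and a setter
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        # (1, 4) a C bool whose value is shared between the processes
        self.toStop = multiprocessing.Value(ctypes.c_bool, False)

    def run(self):
        count = 0
        while not self.toStop.value:             # (5) the truth value of the shared Boolean
            count += 1
            self.queue.put(f'Round {count}')
            time.sleep(1)
        self.queue.put(None)

    def stop(self):
        self.toStop.value = True                 # (6) assign to the content, not the holder!

def consumer(queue):
    while True:
        message = queue.get()
        queue.task_done()
        if message is None:
            print('[End of input]')
            break
        print(message)

if __name__ == '__main__':
    queue = JoinableQueue()
    multiprocessing.Process(target=consumer, args=(queue,)).start()
    producer = Producer(queue)
    producer.start()
    time.sleep(10)
    producer.stop()                              # (7) the flag synchronization is hidden inside
    producer.join()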

Additional shared-value functionality (illustrated by the short sketch after this list):

  • Scalar Values may be of any of the familiar C-language types, such as bool, char, int, uint, long, double, etc.
  • For multiple values, use a shared (homogeneous) Array.
  • The Value type may also be specified by a single-letter code, such as ‘c’ (for character), ‘i’ for signed integer, etc.
  • (Some) Python types (including dict and list) may also be shared, which requires launching a Manager process.
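
A short illustration of these variations (the variable names are mine):

import ctypes
import multiprocessing

if __name__ == '__main__':
    flag = multiprocessing.Value(ctypes.c_bool, False)   # scalar, by ctypes type
    count = multiprocessing.Value('i', 0)                # scalar, by one-letter type code
    samples = multiprocessing.Array('d', 5)              # homogeneous array of 5 doubles
    manager = multiprocessing.Manager()                  # launches a Manager process
    settings = manager.dict()                            # a shared Python dict
    history = manager.list()                             # a shared Python list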

4. Serializing data between processes

Where the communication is limited to a steady stream between two processes, the queue-based synchronization may be somewhat simplified by a Pipe.

Notes (corresponding to the numbered comments in the sketch after this list):

  1. The program uses multiprocessing.Pipe.
  2. The Producer is initialized using (one side of) the Pipe.
  3. The producer sends the message through the Pipe.
  4. This solution replaces the event-like behavior of the message queue with an ack. After sending the message, the Producer waits for the ack (coming back through the same Pipe), blocking.
  5. The remote connection must be explicitly closed (on the Producer side), to free the other side.
  6. The Consumer is initialized using (the other side of) the Pipe.
  7. The Consumer waits for a message from the Pipe, blocking.
  8. Attempting to receive from a connection that was closed on the other side results in an error (EOFError), which the Consumer interprets as the legitimate end of the message stream.
  9. The Consumer waits for two seconds and then sends the ack. (Here, the Consumer sets the pace).
  10. The remote connection is explicitly closed on the Consumer side, as well.
  11. The main program opens a Pipe, giving two connections (unpacked from a two-tuple).
  12. The Consumer is initialized using one connection side of the Pipe.
  13. The Producer is initialized using the other connection side of the Pipe.
  14. The local connections require explicit closing too, to dispose of the Pipe properly!
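
A minimal sketch of this version, with numbered comments matching the notes above (the names, the ten rounds and the pacing are assumptions). Note that end-of-stream detection by EOFError relies on every unused copy of the sending connection being closed, which is what notes 5 and 14 ensure; the sketch assumes the spawn start method (the Windows default), where each child inherits only the connection explicitly passed to it:

import time
import multiprocessing

def producer(connection):                        # (2) gets one side of the Pipe
    for count in range(1, 11):
        connection.send(f'Round {count}')        # (3) send the message through the Pipe
        connection.recv()                        # (4) block until the Consumer's ack arrives
    connection.close()                           # (5) close, to release the other side

def consumer(connection):                        # (6) gets the other side of the Pipe
    while True:
        try:
            message = connection.recv()          # (7) block until a message arrives
        except EOFError:                         # (8) the other side closed: end of stream
            print('[End of input]')
            break
        print(message)
        time.sleep(2)                            # (9) the Consumer sets the pace...
        connection.send('ack')                   # ...before acknowledging
    connection.close()                           # (10) close this side as well

if __name__ == '__main__':
    consumerSide, producerSide = multiprocessing.Pipe()                        # (11)
    multiprocessing.Process(target=consumer, args=(consumerSide,)).start()     # (12)
    multiprocessing.Process(target=producer, args=(producerSide,)).start()     # (13)
    consumerSide.close()                         # (14) dispose of the local connections
    producerSide.close()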

Additional Pipe functionality (illustrated by the short sketch after this list):

  • By default, the Pipe is full-duplex (both connections may be used to send and receive). This may be constrained (e.g. the left side to receive only and the right side to send only).
  • The Pipe may transfer, in addition to Python strings, also raw byte arrays and arbitrary Python objects, provided they are pickle-able (serialize-able using Python’s pickle facility).
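
A short illustration (within a single process; the names are mine) of constraining the direction and of sending raw bytes and a pickle-able object:

import multiprocessing

if __name__ == '__main__':
    receiveOnly, sendOnly = multiprocessing.Pipe(duplex=False)   # left: receive only, right: send only
    sendOnly.send_bytes(b'raw bytes')                            # a raw byte string
    print(receiveOnly.recv_bytes())
    sendOnly.send({'round': 1, 'last': False})                   # any pickle-able Python object
    print(receiveOnly.recv())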

5. Additional multiprocessing functionality

Using the multiprocessing facility (as a replacement for multi-threading) involves the obvious overhead of launching the extra process and of context switching between processes, but this is usually considered a price worth paying for the benefits, which are mainly:

  • Being less prone to shared-resource corruption and race conditions.
  • Obtaining simultaneity (where needed), liberated from the tyranny of the infamous GIL (Python’s Global Interpreter Lock).

The general consensus is to keep using multi-threading for I/O intensive applications and multiprocessing for CPU intensive applications.

Another useful feature of the multiprocessing module (though not applicable to our Producer/Consumer example) is a pool of worker processes, which may be used to invoke a function on multiple values and collect the results through an AsyncResult, a kind of Future value.
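
A small illustration of such a pool (not part of our running example; the square worker is mine):

import multiprocessing

def square(n):
    return n * n

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        result = pool.apply_async(square, (10,))     # returns an AsyncResult immediately
        print(result.get(timeout=1))                 # 100, once a worker has computed it
        print(pool.map(square, range(5)))            # [0, 1, 4, 9, 16]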

6. Invoking another program, blocked

The name of the module, “multiprocessing”, is somewhat misleading. The processes referred to are launched automatically and, while we can see them in the operating system’s process list, they are not intended to be addressed as processes proper. If that is what you need (to launch a process explicitly, monitor its progress and, e.g., collect its output), you must resort to the subprocess module, which interfaces with the OS process functionality.

The subprocess module replaces various system calls with a unified and simple interface. The typical functionality is to invoke another program blocking (i.e. wait for it to return) and retrieve either its return code or its entire console output, after the fact. Obviously, this facility is not suitable for our Producer/Consumer test-case, which requires handling the messages one by one.

Following is an example of invoking a shell command (a program, run in a separate process) and retrieving its return code:

>>> import subprocess
>>> rc = subprocess.call( ['dir', 'C:\\python27'], shell=True )
 Volume in drive C has no label.
 Volume Serial Number is 3AC3-56FA

 Directory of C:\python27

19-Aug-16  21:19    <DIR>          .
19-Aug-16  21:19    <DIR>          ..
19-Aug-16  21:19    <DIR>          DLLs
19-Aug-16  21:19    <DIR>          Doc
19-Aug-16  21:19    <DIR>          include
12-Nov-16  19:18    <DIR>          Lib
19-Aug-16  21:19    <DIR>          libs
27-Jun-16  14:22            38,591 LICENSE.txt
26-Jun-16  21:27           460,946 NEWS.txt
08-Nov-15  19:05            12,139 py2exe-wininst.log
27-Jun-16  14:20            27,136 python.exe
27-Jun-16  14:20            27,648 pythonw.exe
...
              12 File(s)      1,328,385 bytes
              10 Dir(s)  99,487,002,624 bytes free
>>> rc
0

7. Handling failed program execution

The following example invokes a shell command and catches its exception (raised, in practice, for a non-zero return code):

>>> try:
...     rc = subprocess.check_call( ['dir', 'C:\\python270'], shell=True )
... except subprocess.CalledProcessError as error:
...     print('Command failed. rc:', error.returncode)
...
 Volume in drive C has no label.
 Volume Serial Number is 3AC3-56FA

 Directory of C:\

File Not Found
Command failed. rc: 1

8. Collecting the invoked program output

The following example invokes a shell command and collects its entire console output in one piece (a byte string, unless text mode is requested):

>>> try:
...     data = subprocess.check_output( ['dir', 'C:\\python27'], shell=True )
...     print(f'received {len(data.splitlines())} lines')
... except subprocess.CalledProcessError as error:
...     print('command failed. rc:', error.returncode)
...
received 30 lines

What next?

In the next lessons, we are going to consider alternative parallel solutions to the classical multi-threading/processing in Python: cooperative processing with pull and push iterators and dispatch-based cooperative processing (async execution).

  1. Introduction
  2. The Thread
  3. Synchronization Primitives (Multi-threading)
  4. Synchronization Primitives (Multi-processing) — (you are here!)
  5. Cooperative Processing — synchronous
  6. Cooperative Processing — asynchronous

--

Avner Ben
Writer for CodeX

Born 1951. Active since 1983 as programmer, instructor, mentor in object-oriented design/programming in C++, Python etc. Author of DL/0 design language