Parallel Programming in Python — Lesson 3. Synchronization primitives

Avner Ben
CodeX


This is the third in a series of lessons covering the facilities that the Python programming language offers for parallel programming and the motivation for using each of them. In the previous lessons, we explored the applicative requirements for event-driven design and learned to distinguish the ones that really require parallel code (e.g. by multi-threading). Then, we studied the Python thread facility in detail. In this lesson, we continue by exploring the applicative need for synchronization (between threads) and the Python support for it.

Sections in this lesson:

  1. “Producer/Consumer” — the problem domain
  2. A critical section (of code)
  3. Lock: the basic synchronization primitive
  4. Condition Variable
  5. Event
  6. Encapsulating the message
  7. Message Queue
  8. When to use?
  9. Publish/Subscribe
  10. Socket I/O as synchronization primitive
  11. Additional synchronization primitives
  12. Homework: Tail Server

1. “Producer/Consumer” — the problem domain

The problem: Information emanates from its source in a stream of discrete “messages”, targeted for display (or other processing) at the designated end, requiring tight synchronization: (1) messages arrive in the correct order, and (2) messages are not lost along the way.

Guidelines to the solution: this object-oriented solution distinguishes between two object classes: Producers (one or more classes and objects) specialize in producing and Consumers (one or more classes and objects) specialize in consuming. (As far as possible, we try not to mix the two functions. We would not want the same object to both produce and consume, with understandable exceptions, such as acknowledgement.) Encapsulating the functionality in distinct classes makes them relatively simple, reducing the design to the management of the communication between (or among) them. Event-driven design is invited, in order to uncouple the Consumer from the Producer (and vice versa). Producers manufacture data and send it out according to some protocol, and Consumers consume ready data (according to the same protocol). Although simultaneity (Producer and Consumer performing their respective jobs at exactly the same time) is seldom required here, parallelism is! If we want to separate the production of the common resource and its consumption into separate objects, as loosely coupled as possible, then each must be sent on its separate way (a production loop on the one end and a consumption loop on the other end), and the design challenge is reduced to synchronizing the two (or more) loops.

We begin with the archetypal and “old-fashioned” solution: multi-threading. (Later, we will consider more modern solutions that reduce the overhead of multi-threading by “cooperative” programming).

For the sake of simplicity, we restrict our example to one Producer and one Consumer. (Multiple Producer/Consumer solutions are, naturally, more complicated, but do not introduce a radically different design idiom). The Producer emits enumerated text messages every two seconds, to be displayed by the Consumer, hopefully in the same sequence. The starter prototype uses a global variable to store the current message. (Not the best of practices, admittedly, but schoolbook examples always start this way…). Obviously the Producer and Consumer live on separate threads.

Here is a starter prototype in Python (3.9), followed by notes and sample output:
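A minimal sketch of what such a prototype could look like, reconstructed from the notes below rather than taken from the original listing. The names `the_message`, `Producer`, `Consumer` and `run_demo`, and the `interval` and `duration` parameters (two and twenty seconds in the text), are assumptions made for illustration. The numbered comments follow the notes:

```python
import threading
import time

the_message = ""                              # (1) global container for the current message


class Producer(threading.Thread):             # (2)
    def __init__(self, interval):
        super().__init__()
        self.interval = interval
        self.running = True

    def stop(self):
        self.running = False

    def run(self):
        global the_message
        count = 0
        while self.running:
            count += 1
            the_message = f"Round {count}"    # (3) overwrite the global
            time.sleep(self.interval)
        the_message = "STOP"                  # (4) in-band instruction to stop


class Consumer(threading.Thread):
    def __init__(self, interval):
        super().__init__()
        self.interval = interval
        self.received = []

    def run(self):
        while True:
            if the_message == "":             # (5) busy-wait: the Producer is not up yet
                continue
            if the_message == "STOP":         # (6)
                break
            # Unsynchronized: the global may have changed since the tests above.
            self.received.append(the_message)
            print(the_message)
            time.sleep(self.interval)
        print("[End of input]")               # (7)


def run_demo(interval=2.0, duration=20.0):
    global the_message
    the_message = ""
    consumer, producer = Consumer(interval), Producer(interval)   # (8)
    consumer.start()
    producer.start()
    time.sleep(duration)
    producer.stop()                           # (9)
    consumer.join()                           # (10)
    return consumer.received
```

Calling `run_demo()` with the defaults takes a little over twenty seconds. Because nothing is synchronized, the list it returns (and the printed output) may differ from run to run.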

Notes (corresponding to commented numbers):

  1. Global container for the current message.
  2. The Producer class follows the same pattern as the thread classes exemplified in the previous lessons. (See first example in lesson two, for details).
  3. The producer issues an enumerated message (overwriting the global variable), every two seconds.
  4. As part of the proprietary protocol, the Producer uses the global message, also as instruction for the Consumer to stop. Such shortcuts are common in communication. The alternative would be to dedicate another global (Boolean) for the purpose.
  5. The producer is not up yet. (Indicated by the message still being empty). (To do: replace this CPU-expensive workaround with a proper solution. The program may enter and exit the loop thousands of times per second for no good reason. Sleeping is not an option, because it will disrupt the two-second rate).
  6. The Consumer side of the protocol. A message reading “STOP” is interpreted as an instruction to exit the consumption loop (and is not displayed).
  7. Its job done, the Consumer prints an end-message, so we can verify the test program.
  8. The main program creates a Consumer and Producer and launches them. (In this non-synchronized prototype, the order of launching is significant). It then waits for twenty seconds (which enables ten messages to be passed along — one every two seconds).
  9. The main program signals the Producer to stop. (There is no need to signal the Consumer to stop. This is already taken care of by the Producer).
  10. The main program joins the Consumer, because it is the last thread to finalize.

Output:

Round 1 
Round 1
Round 2
Round 4
Round 4
Round 6
Round 7
Round 8
Round 9
Round 9
Round 10
[End of input]

Oops! The output leaves something to be desired… Obviously, this design must be missing something very fundamental about the problem domain!

We can easily demonstrate the wrong premise of this design, with the following example, which introduces a simple improvement to Consumer logic. (It still does not solve the problem, but it gives a clue to its source).

Notes:
  1. This time, we capture the state at this point (in the local variable “message”). Further references to “message” retrieve the captured state. (By contrast, in the original design, each reference to the global “theMessage” retrieved the then-current state of the same object, yielding inconsistent values if the global variable changed in between. Judging by the missing and doubled items in the output above, it did, at least three times.)

And why does this happen? A programmatic function is a series of capabilities to be realized in the order written. The good news is that this behavior is guaranteed at function invocation level (a.k.a. “thread”), just as we are used to. The bad news is that it is by no means guaranteed at program level! (Or computer level, for that matter.) The operating system is free to hold the thread at any suitable point (usually when doing blocking I/O, but not exclusively so), and pass control over the CPU to another thread. (On a multi-core platform, you can affiliate your thread with a specific core, but even that is not guaranteed to ensure exclusive access to it.) So, it is quite possible (and very likely) that the operating system will unleash the Producer to execute two expressions (more exactly, the two dozen or so machine instructions compiled from them), hold it, unleash the Consumer to execute one expression, hold it, pass control to another thread, etc.

Moral: Not only is it not guaranteed that the instructions in your thread will execute in the order written at computer level; quite the opposite is to be expected. Multi-threaded design must start with the recognition that the thread is not alone in the world! It should now become obvious that some programmatic practices had better be avoided here. In particular, communicating through an unprotected global variable (which is a quick-and-dirty programmatic trick to begin with) becomes, in parallel programming, an invitation to disaster! While references to global memory are sometimes inevitable (as in the case of communication with a memory-mapped device), not protecting the access to it is poor programmatic practice. (In general, there are no poor programming practices that are unique to parallelism. Parallel programming has the nasty habit of emphasizing poor programmatic practices that one may somehow get away with elsewhere…)

We are now going to discuss various ways to solve the problem of protecting the common resource, which all come down to either of the following:

  1. To protect (access to) the global resource.
  2. To avoid the global-resource medium altogether (communicating through other media).

Output:

Round 1 
Round 2
Round 3
Round 3
Round 5
Round 6
Round 7
Round 8
Round 9
Round 10
[End of input]

The problem: Access to the common data (which the consumer perceives as its own input and the producer perceives as its own output) is not synchronized, creating a so-called race condition: both parties are competing for the same resource, resulting in a random result. (Depending on which horse’s head happened to pass the finish line first: “por una cabeza”.) Here, specifically, there are three ways for it to go wrong:

  1. The message may change (by the Producer) while being processed (by the consumer).
  2. The consumer may miss the new message (by polling its container prematurely), thus repeating the old message.
  3. The consumer may skip to the next message (waiting for a tad too long).

And why does this example invite a race condition, unlike the “File Watcher” example of the previous lesson? Well, the other example was custom-tailored to avoid race conditions, to make it simple (but not quite realistic):

  1. We gave the File Watcher plenty of time to feast over its input (two and a half probes on average, one every two seconds, for a message that stays intact for a whole five seconds).
  2. The File Watcher did not open the content of the input. It just took notice of the mere fact of its change.

Under such favorable conditions, it is hard to lose! But a reliable Producer/Consumer architecture in the real world requires synchronization, to be discussed next.

2. A critical section (of code)

Let us confront the problem with a negative requirement: keeping the common data (our message) intact and true to fact requires preventing the data from being consumed while not yet ready. Specifically:

  • To hold the consumer (from accessing its input) while its next input is being produced.
  • To hold the producer (from issuing its output) while its latest output is being consumed.

Why the ambiguity? Why the complexity? Can we not just hold the data? Well, data is inanimate matter, to be manipulated by procedure. In procedural programming, one cannot assign responsibility to data (with the exception, in the parallel context, of Atomic Integer, which is too simple to be of help here). We can only (1) find the relevant pieces of procedure (series of capabilities, waiting to be realized) that are responsible for the data, and (2) hold them from executing, when and if they may interfere (with the parallel unfolding of a peer thread)! In this solution, we have two critical pieces of procedure that must be held: one within the Producer and another within the Consumer. Such a piece of procedure is known as a critical section (of code, to be held on request).

A critical section is a series of capabilities in a function that may be held according to a condition that is determined from the outside. (As opposed to conditions that are determined internally, such as if, while, etc.)

You may be familiar with the Critical Section synchronization primitive (not supported by Python). It indeed features two methods: to enter the Critical Section and to exit the Critical Section. As the names suggest, these two function calls delimit the Critical Section.

Synchronization, in procedural programming, may be reduced to the following design decisions:

  1. To identify the various critical sections.
  2. To select the conditions that will hold them. (Usually, a ready-made facility from the standard library). (No one prevents you from designing smarter synchronization mechanisms, based upon these primitives, if that is what you need. But coming up with something completely novel is rather unlikely, because these facilities are intimately tied to the operating system and hardware, beyond normal programmer’s reach).

3. Lock: the basic synchronization primitive

The Python Lock (called in other languages and platforms by such names as “Mutex”, “Critical Section” and “Binary Semaphore”) is the most basic synchronization primitive. It is seldom used all by itself (as we shall soon find out) and is a building block of the other synchronization primitives we shall explore later. Here, we are going to use a Lock to delimit our two critical sections (the Producer net producing and the Consumer net consuming).

A Lock responds to acquire and release requests. By acquire, a thread requests exclusive right to enter the critical section. Naturally, this right will only be given to the requesting thread in the happy occasion that it has not already been given to someone else!

  • When this is not the case (another thread has already acquired the Lock and is now progressing deep within the critical section), the requesting thread is blocked (by the call to acquire, which does not return to it) and waits.
  • When the current thread releases the Lock, one of the pending threads (Python does not define which) gets the Lock and is free to proceed into the critical section (its pending call to acquire finally returns).
  • Of course, in the trivial case of acquiring a free Lock, the call to acquire returns immediately and the thread is free to proceed (potentially blocking other threads that may come in the middle).
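The acquire/release behavior described above can be observed in a few lines. This is only an illustrative sketch; the function name `blocked_then_released` and the `trace` list are made up for the purpose:

```python
import threading


def blocked_then_released():
    """Show that acquire blocks a second thread until the holder releases."""
    lock = threading.Lock()
    trace = []

    def worker():
        with lock:                            # blocks while the main thread holds the lock
            trace.append("worker entered")

    lock.acquire()                            # free lock: the call returns immediately
    thread = threading.Thread(target=worker)
    thread.start()
    trace.append("main inside critical section")
    lock.release()                            # lets the blocked worker proceed
    thread.join()
    return trace


print(blocked_then_released())
# ['main inside critical section', 'worker entered']
```

The worker cannot record its entry before the main thread releases the Lock, so the order of the two entries is always the same.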

Here is a refactored starter prototype, followed by notes and sample output:

Notes:

  1. The threading standard library module includes, in addition to Thread, a useful assortment of thread-related synchronization primitives.
  2. The Lock is a global object, because it keeps the common state for both Producer and Consumer. It is initialized by default. (A typical bug, that every self-respecting real-time programmer must go through as kind of rite of passage, is to try to synchronize the consumer and producer with two local locks. Each of these locks is protecting its own imaginary resource, but certainly not the common physical resource!)
  3. The with block defines the critical section on the Producer side. Lock is a “context manager”, which gives it the privilege of adding some actions on entry to — and on exit from — the block. Actually, the Lock is acquired on entry to the with-block and is released on exit from the with-block.
  4. The other with block defines the critical section on the Consumer side.
  5. Blank message ignored (Producer not yet ready). (To do: Fix this CPU-expensive workaround).
  6. The main program is unaffected by the improvements to the Producer/Consumer protocol.

Output:

Round 1 
Round 2
Round 3
Round 3
Round 5
Round 5
Round 6
Round 8
Round 8
Round 10
Round 10
[End of input]

Oops! It’s not over yet!

The problem: The Lock, in this example, indeed guarantees that the critical sections never interleave: the Producer never interferes while the Consumer retrieves the message and the Consumer never interferes while the Producer prepares the message. But this still leaves two race conditions that must be taken care of:

  1. The Consumer re-acquires the lock before the producer (receiving the old message again)!
  2. The Producer re-acquires the lock before the consumer (and the current message is lost)!

A solution: The protocol shall feature an acknowledge (“ack” for short). The Producer shall only enter the critical section when the Consumer confirms handling the message (i.e. signals the ack). This finally solves the remaining race conditions.

Here is a refactored prototype, followed by notes and sample output:
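A minimal sketch of a Lock-plus-ack prototype, reconstructed from the notes below rather than taken from the original listing. All names (`the_lock`, `ack`, `run_demo` and so on) and the `interval` and `duration` parameters are assumptions. Numbered comments point to the corresponding notes where the match is clear:

```python
import threading
import time

the_message = ""
the_lock = threading.Lock()
ack = True                  # (1) True: the Consumer is ready for the next message


class Producer(threading.Thread):
    def __init__(self):
        super().__init__()
        self.running = True

    def stop(self):
        self.running = False

    def run(self):
        global the_message, ack
        count = 0
        while True:
            if not ack:                       # (2) busy-wait for the Consumer's ack
                continue
            with the_lock:
                count += 1
                the_message = f"Round {count}" if self.running else "STOP"
                finished = the_message == "STOP"
                ack = False                   # (3) now it is the Consumer's turn
            if finished:
                break


class Consumer(threading.Thread):
    def __init__(self, interval):
        super().__init__()
        self.interval = interval
        self.received = []

    def run(self):
        global ack
        while True:
            if ack:                           # (4) busy-wait: the Producer is not done yet
                continue
            with the_lock:
                message = the_message
            if message == "STOP":
                break
            self.received.append(message)
            print(message)
            time.sleep(self.interval)         # (6) the Consumer sets the pace
            ack = True                        # and only then lets the Producer proceed
        print("[End of input]")


def run_demo(interval=2.0, duration=20.0):
    global the_message, ack
    the_message, ack = "", True
    consumer, producer = Consumer(interval), Producer()
    consumer.start()
    producer.start()
    time.sleep(duration)
    producer.stop()
    consumer.join()
    return consumer.received
```

Because the two sides strictly alternate, the returned list is always an unbroken run "Round 1", "Round 2" and so on; only its length depends on `duration`. Both busy-wait loops burn CPU, as the notes admit.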

Notes:

  1. Another global variable: a Boolean ack.
  2. The Producer avoids the critical section, as long as the Consumer has not acknowledged processing the pending message. (To do: Fix this CPU-expensive workaround).
  3. Its job done, the Producer turns off the ack. The ball is now in the Consumer’s court.
  4. The Consumer avoids the critical section, as long as its own ack is still pending. (The Producer is not done yet). (To do: Fix this CPU-expensive workaround).
  5. Empty message is avoided (Producer not yet ready). (To do: Fix this CPU-expensive workaround).
  6. Its job done, the Consumer signals the Producer to proceed (the ack), but not before delaying both of them for two seconds. (In this specific configuration, it is the Consumer that sets the pace!)
  7. The main program is unaffected by the protocol change.

Output:

Round 1 
Round 2
Round 3
Round 4
Round 5
Round 6
Round 7
Round 8
Round 9
Round 10
[End of input]

Additional Python Lock functionality:

  • Using Lock as context manager (with-block) is the common — and common-sense — way. Most importantly, it prevents the common bug of exiting the critical section prematurely, neglecting to release the Lock, which leaves the program stuck indefinitely. However, no one prevents you from calling acquire and release explicitly, if that’s what it takes. (For example, in order to specify timeout).
  • It is possible to acquire the Lock non-blocking (using a Boolean argument). The call to acquire returns a Boolean success status: are you (or aren’t you) in control of the Lock now. (But of course, proceeding in spite of failure voids the protocol!)
  • It is possible to acquire the Lock blocking (the default), but with timeout (infinite by default). (Consult the result status). A common practice is to iterate on acquiring with a short timeout (to prevent your thread from getting stuck due to the other side’s neglect).
  • Although the Python Lock is just a wrapper for the operating system lock, it has some peculiarities: (1) The Python Lock is restricted to process scope. It cannot be given a name (to be controlled from another process). (2) Release may be called from any thread (rather than just from the thread that acquired the Lock).
  • RLock (for “Reentrant” Lock) solves the problem of recursive attempts to acquire the Lock from the same thread. Normally, this puts the current thread in deadlock, resulting in a stuck program, because the thread, now holding itself, will never release it again. On the contrary, the reentrant version adds an acquire counter (by the current thread). So that when the thread that currently holds the Lock tries to acquire it again, the count is increased (instead of holding the thread) and each release request by the current thread just decreases this count. The reentrant Lock is finally released, when the count reaches zero. Use RLock in recursive algorithms.
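The features listed above can be tried out in isolation. The following is a sketch only; `lock_features` and the keys of the returned dictionary are hypothetical names chosen for the illustration:

```python
import threading


def lock_features():
    results = {}
    lock = threading.Lock()

    results["first"] = lock.acquire(blocking=False)     # free lock: True
    results["second"] = lock.acquire(blocking=False)    # already held: False, no blocking
    results["timed"] = lock.acquire(timeout=0.05)       # gives up after 50 ms: False

    # A plain Lock may be released by a thread other than the one that acquired it.
    releaser = threading.Thread(target=lock.release)
    releaser.start()
    releaser.join()
    results["locked_after_foreign_release"] = lock.locked()

    # An RLock counts nested acquisitions by the owning thread instead of deadlocking.
    rlock = threading.RLock()

    def nested(depth):
        with rlock:
            return 1 if depth == 0 else 1 + nested(depth - 1)

    results["nested_levels"] = nested(3)
    return results


print(lock_features())
# {'first': True, 'second': False, 'timed': False,
#  'locked_after_foreign_release': False, 'nested_levels': 4}
```

Replacing the RLock with a plain Lock in `nested` would leave the thread blocked on its own second acquire.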

4. Condition Variable

Python already has a synchronization primitive that encapsulates Lock and condition. So, we may use this to simplify the previous prototype (and, in addition, solve its performance problems).

A Condition (also called Condition Variable) associates a critical section (delimited by Lock) with wait/notify capability similar to Event (see next section), often (but not necessarily) together with a Boolean test. The unique feature of the Condition is that waiting (for the holder of the resource to release it) “punches a hole” in the critical section and allows the other side to acquire the lock, proceeding with whatever it takes (for example, to prepare the common resource). When the other side notifies, the Condition re-acquires the lock and control progresses into the remainder of the critical section. This makes it a flexible and very general synchronization solution. (On the contrary, attempting to synchronize explicitly from within a critical section, delimited by a naked Lock, invites deadlock!)

Here is a refactored prototype, followed by notes (output remains the same):
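A minimal sketch of a Condition-based prototype, reconstructed from the notes below rather than taken from the original listing. The names `receive_condition`, `last_message` and `run_demo`, and the `interval` and `duration` parameters, are assumptions. Numbered comments follow the notes:

```python
import threading
import time

the_message = ""
receive_condition = threading.Condition()     # (2) creates its own Lock by default


class Producer(threading.Thread):
    def __init__(self, interval):
        super().__init__()
        self.interval = interval
        self.running = True

    def stop(self):
        self.running = False

    def run(self):
        global the_message
        count = 0
        while True:
            with receive_condition:                   # (3)
                if not self.running:
                    the_message = "STOP"
                    receive_condition.notify()        # (4)
                    break
                count += 1
                the_message = f"Round {count}"
                receive_condition.notify()            # (5)
            time.sleep(self.interval)                 # (6) outside the critical section


class Consumer(threading.Thread):
    def __init__(self):
        super().__init__()
        self.received = []

    def run(self):
        last_message = ""
        while True:
            with receive_condition:                   # (7)
                # (8) waiting releases the lock and re-acquires it on wake-up
                receive_condition.wait_for(lambda: the_message != last_message)
                message = the_message
            if message == "STOP":
                break
            self.received.append(message)
            print(message)
            last_message = message                    # (9)
        print("[End of input]")


def run_demo(interval=2.0, duration=20.0):
    global the_message
    the_message = ""
    consumer, producer = Consumer(), Producer(interval)
    consumer.start()
    producer.start()
    time.sleep(duration)
    producer.stop()
    consumer.join()                                   # (10)
    return consumer.received
```

There is no busy-waiting here: the Consumer sleeps inside `wait_for` until notified. Since the Producer does not wait for an ack, a slow Consumer could skip a round, but it can never repeat one or see rounds out of order.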

Notes:

  1. This synchronization primitive is also imported from the threading library.
  2. The receive condition is global, initialized by default.
  3. The (Lock within) the condition delimits the Production critical section.
  4. Instructed to end, the Producer releases the Consumer and exits the production loop.
  5. Its cycle job done, the Producer releases the Consumer to receive.
  6. Here the Producer sets the pace (which is more appropriate). Sleeping happens outside the critical section! Note that this Producer issues a message every two seconds regardless of the other side. (Unlike the other solutions in this lesson, it does not wait for an ack!) Since this Consumer does very little, this design works. Try running it with reduced sleep times (fractions of a second) and it will not break. It will take very high rates to make this Consumer lose messages (in which case, the ack may be due back).
  7. The (Lock within) the condition delimits the Consumption critical section.
  8. This example trades the ack from the receiver with verification of message change. (Assuming that, in this protocol, no two consecutive messages are the same!)
  9. At end of cycle, the Consumer records the current message.
  10. The main program is unaffected by the change of synchronization method.

Additional Condition functionality:

  • It is possible to associate multiple Conditions with the same lock (specified during construction time). By default, the Condition creates its own Lock.
  • wait_for takes a predicate (a function, explicit or lambda) and (optionally) a timeout, and keeps waiting for notification as long as the predicate returns false. (Actually, we do not expect it to wait more than once.) Of course, you can write the loop yourself, calling wait instead. Testing the condition is a (common) convention. In principle, you may wait unconditionally, using the Condition as an Event with a Lock.
  • The call to notify releases one waiting thread (which is all we’ve got here). To release multiple waiting threads, either specify their number or use notify_all.
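As a small illustration of `wait_for` and `notify_all`, the following sketch holds several threads on one Condition and releases them together. The function name `release_all` and the `ready` flag are hypothetical:

```python
import threading


def release_all(n_waiters=3):
    condition = threading.Condition()
    ready = False
    released = []

    def waiter(name):
        with condition:
            # wait_for returns the predicate's last value: False only on timeout.
            if condition.wait_for(lambda: ready, timeout=5.0):
                released.append(name)

    threads = [threading.Thread(target=waiter, args=(i,)) for i in range(n_waiters)]
    for thread in threads:
        thread.start()
    with condition:
        ready = True
        condition.notify_all()                # wake every thread that is already waiting
    for thread in threads:
        thread.join()
    return sorted(released)


print(release_all())
# [0, 1, 2]
```

A waiter that reaches `wait_for` after the flag is set does not block at all, because the predicate is tested before waiting.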

5. Event

Event looks like Condition Variable without the condition and the lock (leaving only wait/notify functionality). Indeed, it predates it by a couple of decades. The newer Condition Variable is often marketed as an improved version of the obsolete Event (e.g. see the documentation of C++11). But Event is still part of the threading infrastructure of Python (and other languages), so make your own judgement!

Event is a global Boolean blessed with the capability to hold interested threads (until True), in addition to being polled for truth like any Boolean. When “reset” (False), it holds the execution of all the threads that wait for it. When signaled (set to True) by a third party, the event lets them go (the calls to wait return to each of them).

In the following example, the use of two Events (one to control sending and the latter to control receiving) delimits the critical section and eliminates the need for a Lock.

Here is a refactored prototype, followed by notes (output remains the same):
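A minimal sketch of a two-Event prototype, reconstructed from the notes below rather than taken from the original listing. The names `send_event`, `receive_event` and `run_demo`, and the `interval` and `duration` parameters, are assumptions. Numbered comments follow the notes:

```python
import threading
import time

the_message = ""
send_event = threading.Event()                # (2) both Events start out cleared (False)
receive_event = threading.Event()


class Producer(threading.Thread):
    def __init__(self, interval):
        super().__init__()
        self.interval = interval
        self.running = True
        send_event.set()                      # (3) enable the first message

    def stop(self):
        self.running = False

    def run(self):
        global the_message
        count = 0
        while True:
            send_event.wait()                 # (4) wait for permission to send
            count += 1
            message = f"Round {count}" if self.running else "STOP"
            the_message = message
            if message != "STOP":
                time.sleep(self.interval)     # (7) the Producer sets the pace
            send_event.clear()                # (6) hold sending first,
            receive_event.set()               #     then permit receiving
            if message == "STOP":
                break                         # (5)


class Consumer(threading.Thread):
    def __init__(self):
        super().__init__()
        self.received = []

    def run(self):
        while True:
            receive_event.wait()              # (8) wait for permission to receive
            message = the_message
            if message == "STOP":
                break
            self.received.append(message)
            print(message)
            receive_event.clear()             # (9) hold receiving first,
            send_event.set()                  #     then permit sending
        print("[End of input]")


def run_demo(interval=2.0, duration=20.0):
    global the_message
    the_message = ""
    send_event.clear()
    receive_event.clear()
    consumer, producer = Consumer(), Producer(interval)
    consumer.start()
    producer.start()
    time.sleep(duration)
    producer.stop()
    consumer.join()                           # (10)
    return consumer.received
```

The order of `clear` and `set` matters in this sketch: each side clears its own Event before setting the other's, so that a permission granted by the peer is never wiped out by a late `clear`.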

Notes:

  1. This synchronization primitive is also imported from the threading library.
  2. The receive and send Events are global variables (false by default).
  3. Sending is enabled on Producer initialization, to enable the first message.
  4. At the start of each cycle, the Producer waits for permission to send. There is no lock. The critical section is delimited by the event (between wait and clear). (To do: specify timeout. When wait failed: error).
  5. Instructed to stop, the Producer neutralizes the events and exits the loop.
  6. At the end of each cycle, the Producer permits receiving and holds sending.
  7. The pace is determined by the Producer, which waits for 2 seconds at the end of each cycle. (The Consumer is held by Event in the meanwhile).
  8. At the start of each cycle, the Consumer waits for permission to receive. There is no lock. The critical section is delimited by the event (between wait and clear). (To do: specify timeout. When wait failed: error).
  9. At the end of each cycle the Consumer permits sending and holds receiving.
  10. The main program is not affected by the change of synchronization method.

6. Encapsulating the message

I stated solemnly in the beginning of the lesson that “data is inanimate matter” to be bullied by procedure, and that (therefore) the sole purpose of synchronization primitives is to delimit the critical sections of code that manipulate it (the data). This, indeed, represents the state of the art. However, using object-oriented programming, we can assign the responsibility for synchronization to the data, by encapsulating the synchronization primitives with the data and adding methods for activating them. The result is a Message object that must be sent messages (in the object-oriented way), becoming responsible for its own integrity.

The following example refactors the example in section 5, encapsulating the two events (that used to be global) with the message:
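A minimal sketch of such an encapsulation, written from the description and the notes below rather than taken from the original listing. The class name `SynchronizedMessage` comes from the notes; the method names (`set`, `get`, `permit_sending`, `permit_receiving`) and the helper functions are assumptions. For brevity, this sketch uses plain functions instead of thread classes and a fixed number of rounds instead of a stop request from the main program:

```python
import threading
import time


class SynchronizedMessage:
    def __init__(self):
        self._text = ""
        self._can_send = threading.Event()
        self._can_receive = threading.Event()

    def set(self, text, timeout=None):
        """Blocking set: wait until sending is permitted, then store the text."""
        if not self._can_send.wait(timeout):
            raise TimeoutError("sending was not permitted in time")
        self._text = text

    def get(self, timeout=None):
        """Blocking get: wait until receiving is permitted, then return the text."""
        if not self._can_receive.wait(timeout):
            raise TimeoutError("no message arrived in time")
        return self._text

    def permit_sending(self):
        self._can_receive.clear()
        self._can_send.set()

    def permit_receiving(self):
        self._can_send.clear()
        self._can_receive.set()


def produce(message, rounds, interval):
    message.permit_sending()                  # reset the message to sending state
    for text in [f"Round {n}" for n in range(1, rounds + 1)] + ["STOP"]:
        message.set(text)                     # blocks until the consumer permits sending
        time.sleep(interval)
        message.permit_receiving()


def consume(message, received):
    while True:
        text = message.get()                  # blocks until the producer permits receiving
        if text == "STOP":
            break
        received.append(text)
        message.permit_sending()


def run_demo(rounds=10, interval=2.0):
    message = SynchronizedMessage()           # local to the main program, not global
    received = []
    consumer = threading.Thread(target=consume, args=(message, received))
    producer = threading.Thread(target=produce, args=(message, rounds, interval))
    consumer.start()
    producer.start()
    producer.join()
    consumer.join()
    return received
```

For example, `run_demo(rounds=3, interval=0.1)` returns `['Round 1', 'Round 2', 'Round 3']`. The Producer and Consumer never touch an Event directly; they only send requests to the message.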

Notes:

  1. The entire Synchronized Message functionality is now a class
  2. Class Synchronized Message encapsulates the former global message-related stuff.
  3. Blocking set message. The requestor (the Producer, apparently) must wait for the “send” ack. (To do: timeout).
  4. Blocking get message. The requestor (the Consumer, apparently) must wait for the “receive” ack. (To do: timeout).
  5. The other send/receive event functionality is given methods with application-significant names.
  6. The Producer is initialized using the message. (It is no longer global).
  7. The Producer resets the message to sending state.
  8. The producer sets the message, blocking (until released by the Consumer).
  9. The wait implicit in this request is redundant, but, in the present context, we can be sure that the event is signaled anyway.
  10. The Producer sets the message to enable sending and disable receiving.
  11. The Consumer is initialized using the message.
  12. The consumer gets the message, blocking (until released by the Producer). This idiom (obtaining the promised value, potentially blocked by the maker of the promise) is characteristic of the Future design pattern.
  13. The consumer sets the message to disable receiving and enable sending.
  14. The message, which used to be global, is now local to the main program and is passed to the Producer and Consumer.

And the following example refactors the example in section 4, encapsulating the condition (that used to be global) with the message, and interfacing for it:
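A minimal sketch of this variant, written from the description and the notes below rather than taken from the original listing. The delegated methods (`wait_for`, `notify`) mirror the real `threading.Condition` interface; the public `text` attribute, the helper functions and the fixed number of rounds are assumptions made to keep the sketch short:

```python
import threading
import time


class SynchronizedMessage:
    def __init__(self):
        self.text = ""                            # public: no information hiding here
        self._condition = threading.Condition()

    def __enter__(self):                          # context manager interface,
        return self._condition.__enter__()        # delegated to the Condition inside

    def __exit__(self, *exc_info):
        return self._condition.__exit__(*exc_info)

    def wait_for(self, predicate, timeout=None):
        return self._condition.wait_for(predicate, timeout)

    def notify(self, n=1):
        self._condition.notify(n)


def produce(message, rounds, interval):
    for text in [f"Round {n}" for n in range(1, rounds + 1)] + ["STOP"]:
        with message:                             # the Producer's critical section
            message.text = text
            message.notify()
        time.sleep(interval)                      # no ack: the Producer sets the pace


def consume(message, received):
    last = ""
    while True:
        with message:                             # the Consumer's critical section
            message.wait_for(lambda: message.text != last)
            text = message.text
        if text == "STOP":
            break
        received.append(text)
        last = text


def run_demo(rounds=10, interval=2.0):
    message = SynchronizedMessage()
    received = []
    consumer = threading.Thread(target=consume, args=(message, received))
    producer = threading.Thread(target=produce, args=(message, rounds, interval))
    consumer.start()
    producer.start()
    producer.join()
    consumer.join()
    return received
```

Because the message object supports the `with` statement, the calling code reads almost exactly like code that uses a bare Condition, only the synchronization is now requested from the message itself.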

Notes:

  1. The Synchronized Message implements the context manager interface, to imitate Condition behavior.
  2. The Synchronized Message implements the useful Condition interface, delegating to the Condition inside.
  3. The message is used to delimit the Producer’s critical section. Synchronization is requested from the message.
  4. This solution does not respect the message’s information hiding and does not feature blocking get and set.
  5. The message is used to delimit the Consumer’s critical section. Synchronization is requested from the message.

7. Message Queue

All synchronization primitive solutions presented thus far (Lock, Condition Variable and Event) specialized in various ways to protect access to a singular resource (a global string object, in our case) that is available to both Producer and Consumer sides. Although the program is emanating a series of messages, inside the program, there is a single message, that is overwritten in each round (with race conditions smartly avoided).

An alternative strategy is to serialize the output internally, to begin with. Here, nothing is overwritten! In each round, a new message is created internally and queued. The Producer is exempt from worrying about message integrity (besides flooding the queue with more messages than the Consumer can digest). It is the Consumer’s responsibility to extract the pending message from the queue, and on time.

While serialization (of the communication between Producer and Consumer) is more resource-expensive and complicated internally, it simplifies the programmer’s job (no critical sections to take care of) and reduces race-conditions. (If handled properly, of course).

There are two serialization strategies:

  1. To use the queue to uncouple Consumer from Producer. The Producer writes messages to the queue at its own pace. The Consumer extracts the messages from the queue at its own pace. The producer is oblivious to whether the message was consumed (it does not wait for an ack from the other side). Such an architecture is suitable for multi-casting messages to multiple consumers. For example, when your web browser gets a page from a remote web site server, the server (producer) could not care less about what you (a random consumer) do with the page and whether it has arrived in one piece to begin with. The only concern here is to verify that the queue does not overflow. (Messaging infrastructures tend to restrict the size of the queue!) In addition, the queue must live up to externally imposed throughput requirements, if any.
  2. To use the queue as a sort of “Event with backup”. The producer does wait for an ack from the consumer before pushing the next message. While this could, alternatively, be implemented simply by additional “ack” messages (and possibly dedicated queues for them), the Python queue (in accord with the UNIX queue) optionally provides Event-like functionality: the sending side can join it (i.e. wait until every queued message has been acknowledged) and the receiving side can signal it with task_done (releasing the sending side, as we do in the example below). This architecture enables multiple Producers (and one or multiple Consumers). Multiple Producers can send their messages to the same Consumer without choking it. The various messages accumulate in the Consumer’s queue, and it is the Consumer’s responsibility to ack each Producer in turn, as necessary. Such architectures are frequent in real-time software.

Here is a refactored prototype, followed by notes (output remains the same):
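A minimal sketch of a queue-based prototype, reconstructed from the notes below rather than taken from the original listing. The attribute names (`input_queue`, `output_queue`), `run_demo` and the fixed `rounds` parameter (in place of a stop request from the main program) are assumptions. Numbered comments follow the notes:

```python
import queue
import threading
import time


class Consumer(threading.Thread):
    def __init__(self):
        super().__init__()
        self.input_queue = queue.Queue()          # (5)
        self.received = []

    def run(self):
        while True:
            message = self.input_queue.get()      # (6) blocks until a message arrives
            if message == "STOP":
                self.input_queue.task_done()      # (7) release the Producer before exiting
                break
            self.received.append(message)
            print(message)
            self.input_queue.task_done()          # (8) release the Producer
        print("[End of input]")


class Producer(threading.Thread):
    def __init__(self, output_queue, rounds, interval):   # (2)
        super().__init__()
        self.output_queue = output_queue
        self.rounds = rounds
        self.interval = interval

    def run(self):
        for text in [f"Round {n}" for n in range(1, self.rounds + 1)] + ["STOP"]:
            self.output_queue.put(text)           # (3)
            self.output_queue.join()              # (4) returns once task_done was called
            time.sleep(self.interval)


def run_demo(rounds=10, interval=2.0):
    consumer = Consumer()                         # (9) the Consumer first,
    producer = Producer(consumer.input_queue, rounds, interval)   # then the Producer
    consumer.start()
    producer.start()
    producer.join()
    consumer.join()
    return consumer.received
```

For example, `run_demo(rounds=3, interval=0.1)` prints three rounds followed by "[End of input]" and returns the three messages in order. Dropping the `join` call would turn this into the first strategy, where the Producer never waits for the Consumer.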

Notes:

  1. The message queue functionality lives in a separate library, queue (and is not part of the threading library).
  2. The Producer receives the queue to write to (i.e. the Consumer’s).
  3. The producer sends the current message to its output queue.
  4. The Producer waits (for sign from the Consumer).
  5. The Consumer opens a queue (for input).
  6. The Consumer extracts the pending message from the queue (and is blocked until it arrives, if ever). (To do: Specify timeout and check for errors).
  7. Before exiting, the Consumer uses the queue to release the Producer.
  8. Message read and printed, the Consumer uses the queue to release the Producer.
  9. The main program first initializes the Consumer and then the Producer, using the Consumer’s message queue. This strict division of labor ensures a measure of information hiding! (The main program is immune from knowing how messages are passed and the Producer is immune from knowing where the messages it is sending go to).

Additional message queue features:

  • Query queue status: quantity of messages currently in queue / is empty, is full.
  • Blocking put (when full), with optional timeout.
  • Non-blocking get (or blocking get with optional timeout).
  • Priority queue, where messages are retrieved by order of priority (assigned number).
  • LIFO queue (i.e. stack), where messages are retrieved in reverse order.
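The features listed above map onto the standard `queue` module roughly as follows. This is an illustrative sketch; `queue_features` and the keys of the returned dictionary are hypothetical names:

```python
import queue


def queue_features():
    results = {}

    q = queue.Queue(maxsize=2)
    q.put("a")
    q.put("b")
    results["status"] = (q.qsize(), q.empty(), q.full())

    try:
        q.put("c", timeout=0.05)          # blocking put on a full queue, with timeout
    except queue.Full:
        results["put_timed_out"] = True

    results["first"] = q.get_nowait()     # non-blocking get, first in first out
    q.get_nowait()
    try:
        q.get_nowait()                    # nothing left in the queue
    except queue.Empty:
        results["get_found_empty"] = True

    prioritized = queue.PriorityQueue()
    prioritized.put((2, "low"))
    prioritized.put((1, "high"))
    results["priority"] = prioritized.get()[1]    # the lowest number comes out first

    stack = queue.LifoQueue()
    stack.put(1)
    stack.put(2)
    results["lifo"] = stack.get()         # the last item put comes out first
    return results


print(queue_features())
# {'status': (2, False, True), 'put_timed_out': True, 'first': 'a',
#  'get_found_empty': True, 'priority': 'high', 'lifo': 2}
```

Note that the status queries are only snapshots: in a multi-threaded program, the answer may be out of date by the time it is acted upon.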

8. When to use?

As discussed in the introduction to this series, parallel design is invited in the presence of non-blocking input. We have a function “A” that produces output and another function “B” that consumes this very output as its input (it may be the same object; the difference is one of perspective), but, for various reasons, the two may not be sequenced procedurally (temporally coupled), as in (1) another function “C” calls “A” then “B”, (2) “A” calls “B” at the end, (3) “B” calls “A” at the start, etc. Consequently, the design must resort to a discontinued process: the two functions seem unrelated, but, since they are still data-coupled, the order of their execution and the transfer of the data between them are taken care of by subtler programmatic devices: a proprietary (or standard, or third-party) communication protocol. For example, the functions may execute on different threads, synchronized by some of the primitives discussed above.

The solution to design problems involving non-blocking input depends upon the number of consumers (competing for this input), the number of producers (blocked, if at all, by the consumers), the rates involved on both sides, quality of service (can messages be lost? must the order be kept?) and the producer and consumer obligations to other tasks. Here are some brief guidelines:

  • The simple examples in this lesson involve (1) one producer and one consumer that (2) are obliged to nothing else, and (3) a rate of transfer dictated by the producer alone. This straightforward use case can do with a single instance of the common resource, synchronized by a Condition Variable (as in example 4 above). An ack may be added to defend the Consumer from excessive rates of transfer.
  • Multiple consumers (fed by one producer) may still do with a singular global resource, but may require tighter synchronization. To name a few common restrictions: (1) the common resource must persist as long as not fully consumed (by all consumers) and (2) consumers must not change the common resource. The default synchronization primitives (Condition, Event, bare Lock) may still do, because they are built to hold multiple threads.
  • Multiple producers (feeding one consumer) require serialization (e.g. message queue). While keeping multiple global resources (and their locks) is possible, it will pose a maintenance nightmare and limit extensibility. It is better to concentrate the complexity in a dispatch logic, immediately after retrieving the next message from the queue.
  • Condition and Event are means for the consumer to block the producer (holding the next output while the current one has not yet been consumed). Where the rate of transfer requires the producer to produce the next output before the consumer can take it (but messages must never be lost), serialization is due (e.g. by message queue).
  • Synchronization may be relaxed in polling use cases, where the producer (e.g. on dedicated hardware) outputs at high rates and the consumer samples the output at some interval. Here it is agreed in advance that some data is going to be lost. When the input is buffered, some synchronization may still have to be observed, e.g. by signaling the duration of reading (or, conversely, the duration of writing), to prevent the consumer from getting incoherent (partially written-over) data. Such architectures are frequent in real-time software.
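To illustrate the third guideline (multiple producers serialized through one message queue, with dispatch logic right after retrieval), here is a minimal sketch. The names `produce`, `consume`, the message kinds “reading” and “alarm” and the `STOP` sentinel are assumptions made for this illustration; they do not come from the lesson's prototype.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel: "this producer is done"


def produce(name, kind, items, inbox):
    """Producer: post tagged messages to the consumer's queue, then sign off."""
    for item in items:
        inbox.put((kind, name, item))
    inbox.put((STOP, name, None))


def consume(inbox, producer_count):
    """Consumer: one event loop, dispatching on message kind after each get."""
    handled = {"reading": [], "alarm": []}
    handlers = {
        "reading": lambda source, payload: handled["reading"].append((source, payload)),
        "alarm": lambda source, payload: handled["alarm"].append((source, payload)),
    }
    finished = 0
    while finished < producer_count:
        kind, source, payload = inbox.get()  # blocks until a message arrives
        if kind is STOP:
            finished += 1
        else:
            handlers[kind](source, payload)  # the dispatch logic
    return handled


def run_demo():
    inbox = queue.Queue()
    producers = [
        threading.Thread(target=produce, args=("sensor", "reading", [1, 2, 3], inbox)),
        threading.Thread(target=produce, args=("watchdog", "alarm", ["hot"], inbox)),
    ]
    for thread in producers:
        thread.start()
    handled = consume(inbox, producer_count=len(producers))
    for thread in producers:
        thread.join()
    return handled


if __name__ == "__main__":
    print(run_demo())
```

The queue keeps the messages of each producer in order, but the interleaving between producers is decided by the scheduler. All the complexity of telling messages apart is concentrated in the `handlers` table.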

9. Publish/Subscribe

The Message Queue discussed above is specifically defined as Point-to-Point Message Queue. Although the producer does not send messages straight to the consumer (God forbid!), it does send them to a queue that represents, belongs to (and was very likely created by) the consumer. Two consumers invite two message queues. If two consumers use the same queue for input (which is legal), then one of them is going to starve (it will learn about a message, but, by the time it gets to the queue, the message may already have been taken by its peer).

For the next logical step beyond the simple point-to-point message queue, we go beyond the standard Python offering, to the world of “Publish/Subscribe”. Publish/Subscribe messaging infrastructures, such as RabbitMQ and Kafka, to name some of the better known, have long been famous for their logging capabilities, but recently have been taking off as key components in service (or micro-service) application frameworks, even in embedded and hard real-time applications, which speaks for a mature technology (apparently responding to a real need!).

A Publish/Subscribe infrastructure carries the message queue notion to the extreme, by uncoupling Consumer from Producer for good. The Producer publishes as much as it takes to a named topic, (practically, a message queue, but open to whoever knows its name). Consumers register to topics of interest (by name) and are notified when new messages arrive. Because synchronization (as well as low-level communication and protocol, in full-fledged platforms) is taken care of by the messaging platform, the Consumers in the system are reduced to event handlers — services (or micro-services in a distributed environment). Producers are also simplified (by the underlying automation), but to lesser extent.
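The notion of named topics and registered consumers can be sketched in a few lines of plain Python. This is a toy, in-process illustration only: the class `TopicBoard` and its methods are hypothetical and do not reproduce the API of RabbitMQ, Kafka or any other platform. It also assumes the simplest possible protocol, where publishing calls the listeners synchronously.

```python
from collections import defaultdict


class TopicBoard:
    """Toy topic registry: consumers subscribe by name, producers publish by name."""

    def __init__(self):
        self._listeners = defaultdict(list)  # topic name -> registered callbacks

    def subscribe(self, topic, listener):
        """Register a callback to be notified of every message on the topic."""
        self._listeners[topic].append(listener)

    def publish(self, topic, **payload):
        """Notify every listener of the topic, one by one (assumed synchronous)."""
        for listener in list(self._listeners[topic]):
            listener(**payload)


def run_demo():
    board = TopicBoard()
    printed, logged = [], []

    # Two independent consumers; neither knows who the producer is.
    board.subscribe("EnumeratedMessages", lambda message: printed.append(message))
    board.subscribe("EnumeratedMessages", lambda message: logged.append(message.upper()))

    # The producer only knows the topic name.
    for number in range(1, 4):
        board.publish("EnumeratedMessages", message=f"message {number}")
    return printed, logged


if __name__ == "__main__":
    print(run_demo())
```

Unlike the point-to-point queue, every subscriber sees every message, so two consumers no longer compete for the same input.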

In addition, anyone (with access to the system) may query and retrieve from this topic database. Whether messages remain available (once all registered Consumers have reviewed them) is configurable and may be used for such purposes as logging, in-memory database, testing and simulation (e.g. by injecting messages from the outside, actually activating the appropriate consumers and producers, in order to re-run recorded or imaginary use-cases, and log the results for verification, e.g. by automatic tools).

While Python does not have built-in support for Publish/Subscribe, there are a number of third-party offerings. For this example we use pypubsub (install with pip, etc.), which is quite mature software. It is limited to process scope and features a strict and simple synchronization protocol which, if you can live with it, can be used to reduce the application into a simple event-driven design, with all the benefits of extensibility and maintainability.

Publishing is done by sending a message to a topic. (There are no designated publishers: anyone can publish to any topic.) Consuming is done by registering callbacks. (Plain functions will do: consumption does not require encapsulation.) The publish/subscribe protocol is straightforwardly synchronous: sending a message blocks the sender and calls all registered listeners one by one, each blocking until it has handled the message. (pypubsub does not support asynchronous handling. You can implement asynchronous consumption in the obvious way, by reducing your listeners to interrupt-service-routines that just signal the real consumers, possibly duplicating the message objects, which are weak-referenced, to pick up from there, but in their own threads.)

In the example below, the consumer is no longer a thread (just registering a callback), but the Producer is still a thread, to maintain the current logic.

Here is a refactored prototype, followed by notes (output remains the same):

Notes:

  1. Importing from the pubsub library.
  2. Instructed to end, the Producer publishes an end-message to the “EnumeratedMessages” topic, with argument named “message”. The end code is extended and clarified (from just “[STOP]” to the honorable “[END-ENUMERATED-MESSAGES]”), to make it survive the distributed open-to-all environment.
  3. Its cycle job done, the Producer publishes an enumerated message to the “EnumeratedMessages” topic, with argument named “message”.
  4. The Consumer is not required to be a thread, in this design.
  5. The Consumer registers a listener callback to the “EnumeratedMessages” topic. This, incidentally, creates the topic and defines the listener’s signature.
  6. The listener callback takes one argument, called “message”.
  7. The listener consumes a single message (no event loop) and is devoid of synchronization.
  8. Alternatively, the Producer does not have to be a separate thread. In that case, the call to start would block the main program. But this would not achieve much. In order to maintain the present logic, the main program should first launch a timer thread (to stop the producer after 20 seconds). So a second thread is required, one way or the other! In addition, such a design makes it hard to use multiple Producers.
  9. The main program joins the Producer. (Because the Consumer is no longer a thread).

Additional Publish/Subscribe functionality:

  1. The pypubsub library supports multiple producers and multiple consumers and a hierarchical topic database, so there is much more to it than presented above. Consult its documentation for further details.

10. Socket I/O as synchronization primitive

Parallel design is by no means limited to the half a dozen primitives supplied by the built-in threading library. Any platform that supports the functionality of blocking and notification allows (and often invites) parallel designs such as those featured in this lesson.

For example, consider a producer and a consumer that communicate through TCP/IP, using sockets. In this (rather simplified) example, the use of blocking send and receive takes care of synchronization in the obvious way.

Notes:

  1. The socket library is used.
  2. The Producer is initialized using an IP address.
  3. The Producer opens a network node for itself (“socket”) on the IP address supplied. This is a server socket (it does not connect to the outside world — the outside world is invited to connect to it). (To do: validate that the socket has indeed been granted. Possible errors: “port already in use”, etc.).
  4. The socket starts listening for connection requests to this address. (The call returns immediately; the operating system queues incoming connection requests for us, in the background).
  5. The Producer responds to the first (and only) connection request, blocking, resulting in another socket (for the connection). (To do: Specify timeout and decide what to do on failure. Optionally, check first if a connection request is pending).
  6. The producer retrieves the “READY” message, which lasts for exactly 5 bytes, blocking. (To do: Specify timeout. Validate that the message indeed reads “READY”).
  7. When it is time to stop, the Producer sends the “STOP” message (in bytes). The extra spaces at the end ensure that the Consumer, waiting for 7 bytes (no less!), does get the message.
  8. The Producer sends the next message through the socket (encoded to bytes). (To do: Validate that all bytes have indeed been sent, using the returned number).
  9. Its job done, the Producer closes the sockets (server and connection).
  10. The Consumer opens a client socket, connecting to the IP address of the Producer (as supplied by the application). (To do: Specify timeout and check for error).
  11. The Consumer sends the “READY” message. (To do: Validate that five bytes have indeed been sent).
  12. The Consumer extracts the next message, expecting exactly 8 bytes, blocking. (To do: specify timeout, etc.).
  13. Its job done, the Consumer closes the (client) socket.
  14. The main program initializes the producer and consumer with the (fictitious) address of the local computer (“localhost”), port 8899 (hoping it is not in use). Besides that, the main program logic remains the same.
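The steps in the notes above can be sketched as follows. This is a minimal sketch under stated assumptions, not the original listing: the frame length of 8 bytes, the message text “msg 0001”, the helper `recv_exact` and the function names are all made up for illustration. It also binds to port 0 (letting the operating system pick a free port) instead of the fixed port 8899, so that the sketch does not fail when that port is taken.

```python
import socket
import threading

FRAME_SIZE = 8                      # assumed fixed message length, in bytes
STOP = b"STOP".ljust(FRAME_SIZE)    # padded with spaces to fill a whole frame


def recv_exact(conn, size):
    """Read exactly `size` bytes; a single recv() may return fewer."""
    chunks = []
    remaining = size
    while remaining:
        chunk = conn.recv(remaining)        # blocks until some bytes arrive
        if not chunk:
            raise ConnectionError("peer closed the connection early")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def producer(server, count):
    """Accept one consumer, wait for READY, send `count` frames, then STOP."""
    conn, _address = server.accept()        # blocks until the consumer connects
    with conn:
        if recv_exact(conn, 5) != b"READY":
            raise ConnectionError("unexpected greeting")
        for number in range(1, count + 1):
            conn.sendall(f"msg {number:04d}".encode())  # exactly 8 bytes
        conn.sendall(STOP)


def consumer(address, received):
    """Connect, announce READY, then collect frames until STOP arrives."""
    with socket.create_connection(address, timeout=5) as client:
        client.sendall(b"READY")
        while True:
            frame = recv_exact(client, FRAME_SIZE)
            if frame == STOP:
                break
            received.append(frame.decode())


def run_demo(count=3):
    received = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 0))       # port 0: any free port (assumption)
        server.listen(1)
        server.settimeout(5)                # do not wait forever in accept()
        address = server.getsockname()
        threads = [
            threading.Thread(target=producer, args=(server, count)),
            threading.Thread(target=consumer, args=(address, received)),
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
    return received


if __name__ == "__main__":
    print(run_demo())
```

Two design choices are worth noting. `sendall` keeps sending until every byte has gone out (or raises), which covers the “validate that all bytes have been sent” item. `recv_exact` loops because TCP is a byte stream: a single `recv(8)` is allowed to return fewer than 8 bytes.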

11. Additional Synchronization Primitives

Python also supports the following thread-level synchronization primitives:

  1. Semaphore. The original synchronization primitive. To allow a predefined number of threads in, then block additional requests until one is released. Not applicable to our Producer/Consumer example. The simple Lock we have used is also defined as “Binary” Semaphore (e.g. in VxWorks), because it allows one thread in, then blocks the second one.
  2. Barrier. To block a predefined number of threads (that register individually) until the number is satisfied. Does not seem to apply to our Producer/Consumer example. A typical application is to wait for all participants to finish preparations and be ready to start. E.g. an application framework launches so many tasks. Due to dependencies among the tasks, it is essential that all tasks finish all their preparations (“end-initialize”), before they are allowed to start their event loops. E.g. it is in error to open a queue for output to some task, when the queue is not already open for input on the other side. In this case, all tasks are expected to open their queues for input during end-initialize. Then, it becomes safe to open them for output at the other side. Here a barrier is handy: given the number of tasks, it serves as a barrier between end-initialization and start of service (on system level).
  3. “concurrent.futures”. Thread/process pool infrastructure, which the *async/await* facility (to be discussed) can also draw upon. This facility specializes in launching one-time functions in threads (or processes) and collecting their results. It does not seem to apply readily to our Producer/Consumer example where two parallel tasks exchange messages on a permanent basis (and continue to exist).
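The first two primitives can be sketched briefly with the standard `threading` module. The function names, the task names and the logging scheme below are assumptions made for this illustration.

```python
import threading
import time


def start_after_everyone_is_ready(task_count=3):
    """Barrier: no task starts its service before all have finished initializing."""
    barrier = threading.Barrier(task_count)
    log = []
    log_lock = threading.Lock()

    def task(name):
        with log_lock:
            log.append(("initialized", name))   # e.g. queue opened for input
        barrier.wait(timeout=5)                 # blocks until all tasks arrive
        with log_lock:
            log.append(("started", name))       # now safe to open output queues

    threads = [threading.Thread(target=task, args=(f"task-{i}",))
               for i in range(task_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return log


def limit_entry(thread_count=5, slots=2):
    """Semaphore: at most `slots` threads are inside at any one time."""
    gate = threading.Semaphore(slots)
    counter_lock = threading.Lock()
    inside = 0
    peak = 0

    def visitor():
        nonlocal inside, peak
        with gate:                              # blocks when all slots are taken
            with counter_lock:
                inside += 1
                peak = max(peak, inside)
            time.sleep(0.01)                    # pretend to work
            with counter_lock:
                inside -= 1

    threads = [threading.Thread(target=visitor) for _ in range(thread_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return peak


if __name__ == "__main__":
    print(start_after_everyone_is_ready())
    print("peak occupancy:", limit_entry())
```

In the barrier log, every “initialized” entry precedes every “started” entry, whatever the scheduling. In the semaphore example, the peak occupancy never exceeds the number of slots.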

12. Exercise: Tail Server

For the sake of simplicity, we have limited the scope of the design, in this lesson, to a single Producer feeding a single Consumer. This was sufficient for demonstrating the bare facilities. Fortunately, the challenge of extending the logic to account for use-cases involving multiplicity does not change the design radically, but requires close attention to synchronization, and fire-proofing the protocol. Specifically:

  • One producer feeding multiple consumers.
  • One consumer fed by multiple producers.
  • Multiple consumers fed by multiple producers.

Starter prototype: In the previous lesson, you were asked to extend the File Watcher example to supply UNIX-like Tail functionality. A Tail is initialized using a file name and proceeds to print the lines appended to this file (assuming the file is only updated at its end), at a fixed interval.

Here is the schoolbook solution (devoid of synchronization):
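As a stand-in, here is a minimal sketch of such a Tail. It is an assumption of what a schoolbook version might look like, not the File Watcher code from the previous lesson: the names `Tail`, `interval`, `output` and `running` are hypothetical. It assumes that whole lines are appended to the file and, being devoid of synchronization, it stops by means of a plain flag.

```python
import threading
import time


class Tail(threading.Thread):
    """Print the lines appended to a file, polling at a fixed interval."""

    def __init__(self, file_name, interval=1.0, output=print):
        super().__init__()
        self.interval = interval
        self.output = output          # where the lines go (console, by default)
        self.running = True           # plain flag, no synchronization
        self.source = open(file_name, "r")
        self.source.seek(0, 2)        # skip what is already there: start at the end

    def stop(self):
        """Ask the thread to finish (it notices on its next cycle)."""
        self.running = False

    def _drain(self):
        """Output every complete line appended since the last visit."""
        line = self.source.readline()
        while line:
            self.output(line.rstrip("\n"))
            line = self.source.readline()

    def run(self):
        with self.source:
            while self.running:
                self._drain()
                time.sleep(self.interval)
            self._drain()             # pick up lines appended just before stop()


if __name__ == "__main__":
    import sys
    tail = Tail(sys.argv[1])          # usage: python tail.py some.log
    tail.start()
    try:
        time.sleep(20)                # tail the file for 20 seconds
    finally:
        tail.stop()
        tail.join()
```

Passing the output function as a parameter is a convenience of this sketch: it lets a future Tail Server replace `print` with its own notification, without touching the Tail.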

The challenge: This design features a single Tail function that is launched by the application and outputs to the console. Extend it into a Tail server that outputs the result of multiple tails (for multiple files) to the console. The files to tail are requested dynamically by the application. Experiment with the synchronization facilities suggested above.

Design highlights:

  • To launch Tail threads: responsibility of the Tail Server, commissioned (synchronously) by the application.
  • To print change messages (asynchronously): responsibility of the Tail Server. (The Tails only notify!)
  • To terminate the Tail server and its threads (gracefully!)

What next?

In the next lessons, we are going to consider alternative parallel solutions to multi-threading in Python: multi-processing, cooperative processing with pull and push iterators and dispatch-based cooperative processing (asynch invocation).

  1. Introduction
  2. The Thread
  3. Synchronization Primitives (Multi-threading) — (you are here!)
  4. Synchronization Primitives (Multi-processing)
  5. Cooperative Processing — synchronous
  6. Cooperative Processing — asynchronous

--

Avner Ben

Born 1951. Active since 1983 as programmer, instructor, mentor in object-oriented design/programming in C++, Python etc. Author of DL/0 design language