The actor model in Python with gevent
tl;dr
Writing concurrent systems is hard. Few people want to reason about low-level concurrency control primitives such as conditional locks, mutexes and semaphores; higher-level mechanisms such as the actor model offer many benefits, from supervision to recovery of services and processes. This is a sample project that uses the event-based actor model in Python, built with gevent.
An actor is a high-level concurrency primitive that lets you model concurrent computations using entities that interact through message passing. It’s just a thing that receives a message and acts on it. It’s very similar to an object: it can receive a message through constructor parameters and act on data through methods. The only difference is that actors don’t share internal state.
Actors have an address so they can receive messages; these messages are stored in mailboxes (queues). Actors can also create other actors. They really behave like threads, right? Yes and no. Actors don’t map one-to-one to threads; several actors can share an execution context on one thread.
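To make the "several actors on one thread" point concrete, here is a minimal sketch that uses only the standard library. The names MiniActor and run_all are made up for illustration and are not part of the project; each actor owns a mailbox and private state, and a simple round-robin loop gives each one a turn on the current thread.

```python
from collections import deque


class MiniActor:
    """A toy actor: private state plus a mailbox, no thread of its own."""

    def __init__(self, name):
        self.name = name
        self.mailbox = deque()  # messages wait here until the actor gets a turn
        self.seen = []          # internal state, never touched by other actors

    def send(self, message):
        self.mailbox.append(message)

    def step(self):
        """Handle at most one message; return True if work was done."""
        if not self.mailbox:
            return False
        self.seen.append(self.mailbox.popleft())
        return True


def run_all(actors):
    """Round-robin over the actors on this thread until every mailbox is empty."""
    order = []
    busy = True
    while busy:
        busy = False
        for actor in actors:
            if actor.step():
                order.append(actor.name)
                busy = True
    return order


a, b = MiniActor("a"), MiniActor("b")
a.send(1)
a.send(2)
b.send("x")
order = run_all([a, b])
```

Both actors make progress in turn even though there is only one thread; gevent does the same kind of scheduling for greenlets, with the event loop deciding who runs next.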
Writing a simple I/O-bound group of actors to fetch stats off an API (and write to MySQL). GitHub project here.
gevent is an async library that lets you spin up lightweight green threads called greenlets.
By subclassing gevent’s Greenlet we can create an actor such as:
import gevent
from gevent.queue import Queue


class Actor(gevent.Greenlet):
    def __init__(self):
        self.inbox = Queue()  # FIFO mailbox
        gevent.Greenlet.__init__(self)

    def receive(self, message):
        # Subclasses decide how a message is handled.
        raise NotImplementedError()

    def _run(self):
        self.running = True
        while self.running:
            # Blocks only this greenlet until a message arrives.
            message = self.inbox.get()
            self.receive(message)
At the most basic level this is really what an actor is. Of course, lots of parts are missing: how should we manage fault tolerance? Do we have a supervisor strategy? Who’s monitoring that process? We won’t cover that here. One caveat: this is an event-based actor.
So, if we have a service, say PullStats, it can inherit from a base class (BaseService below) and implement an actor service that fetches data from an API.
Here’s an example of the base actor service that subclasses the Actor class. The base service builds a URL, fetches it, and processes the response by dispatching to one of its own methods depending on the kind of response. (This API returns a list of users; processUserResponse handles a specific user.)
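As a rough illustration of the base class in use, the sketch below repeats the Actor definition so that it runs on its own and adds two things that are assumptions, not part of the project: a STOP sentinel that ends the loop, and a Collector subclass that simply records what it receives.

```python
import gevent
from gevent.queue import Queue

STOP = object()  # hypothetical sentinel used to end the loop


class Actor(gevent.Greenlet):
    def __init__(self):
        self.inbox = Queue()
        gevent.Greenlet.__init__(self)

    def receive(self, message):
        raise NotImplementedError()

    def _run(self):
        self.running = True
        while self.running:
            message = self.inbox.get()
            if message is STOP:
                self.running = False
            else:
                self.receive(message)


class Collector(Actor):
    """Hypothetical actor that records every message it receives."""

    def __init__(self):
        Actor.__init__(self)
        self.received = []

    def receive(self, message):
        self.received.append(message)


collector = Collector()
collector.start()
for word in ("hello", "world"):
    collector.inbox.put(word)
collector.inbox.put(STOP)
collector.join()  # returns once _run exits
```

Without some stop message the loop would wait on the mailbox forever, so joining the actor would never return; a sentinel is one simple way around that.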
import gevent.monkey
gevent.monkey.patch_socket()  # patch before requests is imported

import gevent
import requests
import settings
from requests.exceptions import Timeout
from urllib.parse import urlencode  # Python 2: from urllib import urlencode

from actor import Actor
from logger import logger

requests.adapters.DEFAULT_RETRIES = 5


class BaseService(Actor):
    base_url = settings.base_url
    date = ''

    def build_url(self, username=None):
        if username is None:
            return self.base_url + self.get_view()
        f = {'username': username}
        return self.base_url + self.get_view() + "&" + urlencode(f)

    def fetch_(self, url):
        headers = {'apikey': settings.api_key}
        try:
            # reporting to snoop - re-run
            r = requests.get(url, headers=headers, timeout=120)
            return r.json()
        except Timeout as e:
            logger.error('timeout exception as {}'.format(e))
            raise
        except Exception as ex:
            logger.error('other exception as {}'.format(ex))
            raise

    def receive(self, message):
        username = message.get('username')
        self.date = message.get('date')

        if username is not None:
            thread = gevent.spawn(self.fetch_, self.build_url(username=username))
            thread.join()
            gevent.sleep(0)
            return self.processUserResponse(thread.value, username)
        else:
            thread = gevent.spawn(self.fetch_, self.build_url(username=None))
            thread.join()
            gevent.sleep(0)
            return self.processResponse(thread.value)

    def get_view(self):
        raise NotImplementedError

    def processResponse(self, response):
        raise NotImplementedError

    # receive() passes the username as well, so the hook accepts it.
    def processUserResponse(self, response, username):
        raise NotImplementedError
For any service we might want to write, we extend the base behavior and implement the methods the base class leaves unimplemented (get_view, processResponse and processUserResponse).
Monkey patching the socket module makes the standard socket cooperative by replacing it with gevent’s socket.
After spawning a greenlet we join it to wait for it to finish, then we sleep to switch context. Since the actors are event based, this works well for I/O-bound tasks. CPU-bound tasks are fine as long as they don’t block for too long, because a greenlet that never yields holds up every other actor on the same thread.
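A concrete service might look roughly like the sketch below. To keep it runnable without the project, it uses a stripped-down stand-in for BaseService with no networking; the endpoint, the view string and the shape of the response are all assumptions made for illustration.

```python
from urllib.parse import urlencode


class BaseService:
    # Stand-in for the article's BaseService, without the networking parts.
    base_url = "https://api.example.com/"  # hypothetical endpoint

    def build_url(self, username=None):
        if username is None:
            return self.base_url + self.get_view()
        return self.base_url + self.get_view() + "&" + urlencode({"username": username})

    def get_view(self):
        raise NotImplementedError

    def processResponse(self, response):
        raise NotImplementedError

    def processUserResponse(self, response, username):
        raise NotImplementedError


class PullStats(BaseService):
    def get_view(self):
        # build_url appends "&username=...", so the view already carries a query string.
        return "stats?format=json"  # hypothetical view

    def processResponse(self, response):
        # Assumes the API returns a list of user dicts with a "username" key.
        return [user["username"] for user in response]

    def processUserResponse(self, response, username):
        return {username: response}


service = PullStats()
all_users_url = service.build_url()
one_user_url = service.build_url(username="ada")
```

Because build_url joins the username with "&", each view is expected to include its own "?" query part; a view without one would produce a malformed URL.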
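A small sketch of what the patch does, assuming gevent is installed: after patch_socket() runs, the name socket.socket in the standard library refers to gevent's cooperative socket class, which is why libraries built on the standard socket module stop blocking the whole thread.

```python
from gevent import monkey

monkey.patch_socket()  # ideally the first thing the program does

import socket
import gevent.socket

# After patching, the standard name resolves to gevent's cooperative class.
patched = socket.socket is gevent.socket.socket
```

Patching as early as possible matters because modules imported earlier may already hold references to the unpatched objects.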
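The effect of yielding can be seen in a tiny sketch (the worker function and log list are made up for illustration). Each greenlet records a step and then calls gevent.sleep(0), which hands control back to the event loop so the other greenlet can run.

```python
import gevent

log = []


def worker(name, steps):
    for i in range(steps):
        log.append((name, i))
        gevent.sleep(0)  # yield so other greenlets get a turn


gevent.joinall([gevent.spawn(worker, "a", 2), gevent.spawn(worker, "b", 2)])
```

With the sleep in place the two workers interleave. If the sleep is removed, the first worker runs all of its steps before the second one starts, which is the behaviour a long CPU-bound task would impose on every other actor.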
So, actors can receive messages, but how do we send one?
Remember the queue on the actor definition? We take the service instance and put an element in its queue; by calling the put method we’ve added a letter (message) to the actor’s mailbox.
statsService = PullStats()
statsService.start()
statsService.inbox.put(job)

# we then join the service(s)
gevent.joinall([service0, service1, service2])
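Putting the pieces together, here is a self-contained sketch of one actor sending messages to another and of joinall returning once both have finished. The Stats and Writer classes, the STOP sentinel and the job contents are hypothetical stand-ins, loosely modelled on the fetch-then-store flow described earlier; no API or database is involved.

```python
import gevent
from gevent.queue import Queue

STOP = object()  # hypothetical "shut down" message


class Actor(gevent.Greenlet):
    def __init__(self):
        self.inbox = Queue()
        gevent.Greenlet.__init__(self)

    def receive(self, message):
        raise NotImplementedError()

    def _run(self):
        while True:
            message = self.inbox.get()
            if message is STOP:
                break
            self.receive(message)


class Writer(Actor):
    """Hypothetical sink standing in for the database writer."""

    def __init__(self):
        Actor.__init__(self)
        self.rows = []

    def receive(self, message):
        self.rows.append(message)


class Stats(Actor):
    """Hypothetical service that turns a job into a row and forwards it."""

    def __init__(self, writer):
        Actor.__init__(self)
        self.writer = writer

    def receive(self, message):
        row = {"username": message["username"], "date": message["date"]}
        self.writer.inbox.put(row)  # message passing, no shared state


writer = Writer()
stats = Stats(writer)
writer.start()
stats.start()

stats.inbox.put({"username": "ada", "date": "2020-01-01"})
stats.inbox.put(STOP)
gevent.joinall([stats])   # wait until stats has drained its mailbox
writer.inbox.put(STOP)    # only now tell the writer to finish
gevent.joinall([writer])
```

Stopping the actors in pipeline order means the writer's STOP is queued behind every row the stats actor forwarded, so nothing is dropped.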
Clone the project, play around with it; I got very interesting results. Using the actor model made this really easy to reason about, and much easier to implement.