The actor model in Python with gevent

What’s so amazing about actors and why you should care.

Ian Juma
Aug 18, 2016

tl;dr

Actors on a thread

Writing concurrent systems is hard. Few people want to reason about low-level concurrency primitives such as condition variables, mutexes and semaphores. Higher-level mechanisms such as the actor model offer many benefits, from supervision to recovery of services and processes. This is a sample project that uses the event-based actor model in Python, built with gevent.

An actor is a high-level concurrency primitive that lets you model concurrent computations as entities that interact through message passing. It's just a thing that receives a message and acts on it. It's very similar to an object, which receives data through constructor parameters and acts on it through methods. The only difference is that actors don't share internal state.

Actors have an address so they can receive messages; these messages are stored in mailboxes (queues). Actors can also create other actors. They really behave like threads, right? Yes and no. Actors don't map one-to-one to threads; several actors can share an execution context on a single thread.
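
To make the "several actors on one thread" point concrete, here is a minimal sketch. The actor function and the None stop signal are assumptions made for illustration, not part of the project. Each actor is a greenlet with its own mailbox, and each records the OS thread it ran on.

```python
import threading

import gevent
from gevent.queue import Queue


def actor(inbox):
    """Hypothetical actor: drain the mailbox until a None stop signal arrives."""
    handled = []
    while True:
        message = inbox.get()
        if message is None:
            break
        # Record which OS thread handled the message.
        handled.append((message, threading.get_ident()))
    return handled


# Three actors, each with a private mailbox.
inboxes = [Queue() for _ in range(3)]
actors = [gevent.spawn(actor, inbox) for inbox in inboxes]

for i, inbox in enumerate(inboxes):
    inbox.put("hello %d" % i)
    inbox.put(None)

gevent.joinall(actors)

results = [g.value for g in actors]
thread_ids = {tid for handled in results for _, tid in handled}
print(len(thread_ids))  # number of distinct OS threads used by the three actors
```

All three actors report the same thread id because greenlets are scheduled cooperatively inside one thread.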

Writing a simple I/O-bound group of actors to fetch stats off an API (and write to MySQL). The full project is on GitHub.

gevent is an async library that lets you spin up lightweight green threads called greenlets.

By subclassing Greenlet we can create an actor such as:

import gevent
from gevent.queue import Queue


class Actor(gevent.Greenlet):

    def __init__(self):
        self.inbox = Queue()  # FIFO mailbox
        gevent.Greenlet.__init__(self)

    def receive(self, message):
        # Subclasses implement the actor's behavior here.
        raise NotImplementedError()

    def _run(self):
        # Greenlet entry point: block on the mailbox, handle one message at a time.
        self.running = True
        while self.running:
            message = self.inbox.get()
            self.receive(message)

At the most basic level, this is really what an actor is. Of course, a lot is missing: how should we manage fault tolerance? Do we have a supervision strategy? Who monitors the process? We won't cover that here. Note that this is an event-based actor: it only does work when a message arrives in its inbox.
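
As a quick illustration of the base class in use, here is a small sketch. The Counter actor and the "stop" message are hypothetical and exist only for this example. The actor sums the numbers it receives and ends its loop when told to stop.

```python
import gevent
from gevent.queue import Queue


class Actor(gevent.Greenlet):
    def __init__(self):
        self.inbox = Queue()  # FIFO mailbox
        gevent.Greenlet.__init__(self)

    def receive(self, message):
        raise NotImplementedError()

    def _run(self):
        self.running = True
        while self.running:
            message = self.inbox.get()
            self.receive(message)


class Counter(Actor):
    """Hypothetical actor that sums the numbers it is sent."""

    def __init__(self):
        Actor.__init__(self)
        self.total = 0

    def receive(self, message):
        if message == "stop":
            # Ends the loop in _run, so the greenlet finishes.
            self.running = False
        else:
            self.total += message


counter = Counter()
counter.start()
for n in (1, 2, 3):
    counter.inbox.put(n)
counter.inbox.put("stop")
counter.join()  # returns once the "stop" message has been handled
print(counter.total)  # 6
```

The state in total is only ever touched inside receive, so nothing outside the actor mutates it while it runs.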

So, if we have a service, say PullStats, it could inherit from a base class, BaseService, and implement an actor service that fetches data from an API.

Here's an example of the base service, which subclasses the Actor class. It builds a URL, fetches it, and processes the response by dispatching to one of its own methods depending on the kind of response. (This API returns a list of users; processUserResponse handles the response for a specific user.)

import gevent
import gevent.monkey

# Patch the socket module before requests is imported so its I/O is cooperative.
gevent.monkey.patch_socket()

import requests
from requests.exceptions import Timeout
from urllib.parse import urlencode  # Python 2: from urllib import urlencode

import settings
from logger import logger
from actor import Actor

requests.adapters.DEFAULT_RETRIES = 5


class BaseService(Actor):
    base_url = settings.base_url
    date = ''

    def build_url(self, username=None):
        if username is None:
            return self.base_url + self.get_view()
        # Assumes get_view() already ends with a query string.
        params = {'username': username}
        return self.base_url + self.get_view() + "&" + urlencode(params)

    def fetch_(self, url):
        headers = {'apikey': settings.api_key}
        try:
            # reporting to snoop - re-run
            r = requests.get(url, headers=headers, timeout=120)
            return r.json()
        except Timeout as e:
            logger.error('timeout exception as {}'.format(e))
            raise
        except Exception as ex:
            logger.error('other exception as {}'.format(ex))
            raise

    def receive(self, message):
        username = message.get('username')
        self.date = message.get('date')
        # Fetch in a separate greenlet, wait for it, then yield to other greenlets.
        thread = gevent.spawn(self.fetch_, self.build_url(username=username))
        thread.join()
        gevent.sleep(0)
        if username is not None:
            return self.processUserResponse(thread.value, username)
        return self.processResponse(thread.value)

    def get_view(self):
        raise NotImplementedError

    def processResponse(self, response):
        raise NotImplementedError

    def processUserResponse(self, response, username):
        raise NotImplementedError

For any service we might want to write, we extend the base behavior and implement the methods the base class leaves unimplemented: get_view, processResponse and processUserResponse.
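
Here is a rough sketch of what such a service could look like. To keep it runnable without the project's settings, logger or a real API, it uses a stand-in StubService whose fetch_ returns a canned payload. The endpoint string, the None stop signal and the handled list are all assumptions made for illustration; the real PullStats lives in the project.

```python
import gevent
from gevent.queue import Queue


class Actor(gevent.Greenlet):
    def __init__(self):
        self.inbox = Queue()
        gevent.Greenlet.__init__(self)

    def receive(self, message):
        raise NotImplementedError()

    def _run(self):
        self.running = True
        while self.running:
            self.receive(self.inbox.get())


class StubService(Actor):
    """Stand-in for BaseService: same dispatch, but no network access."""

    def fetch_(self, url):
        # Assumption: a canned payload replaces requests.get(url).json().
        return {"url": url}

    def receive(self, message):
        if message is None:  # hypothetical stop signal
            self.running = False
            return
        username = message.get("username")
        url = self.get_view()
        if username is not None:
            url += "&username=" + username
        response = self.fetch_(url)
        if username is not None:
            self.processUserResponse(response, username)
        else:
            self.processResponse(response)

    def get_view(self):
        raise NotImplementedError

    def processResponse(self, response):
        raise NotImplementedError

    def processUserResponse(self, response, username):
        raise NotImplementedError


class PullStats(StubService):
    """Hypothetical service: fills in the three hooks."""

    def __init__(self):
        StubService.__init__(self)
        self.handled = []

    def get_view(self):
        return "/stats?format=json"  # hypothetical endpoint

    def processResponse(self, response):
        self.handled.append(("all", response["url"]))

    def processUserResponse(self, response, username):
        self.handled.append((username, response["url"]))


service = PullStats()
service.start()
service.inbox.put({"username": None})
service.inbox.put({"username": "alice"})
service.inbox.put(None)
service.join()
print(service.handled)
```

The subclass only says which view to hit and what to do with each kind of response; the message loop and dispatch stay in the base class.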

Monkey patching the socket module makes the standard socket cooperative by replacing it with gevent's socket.
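
A small sketch of what the patch does, assuming a reasonably recent gevent (is_module_patched was added in gevent 1.1). After patching, the standard library's socket class is gevent's cooperative one.

```python
from gevent import monkey

# Patch as early as possible, before other modules grab a reference to socket.
monkey.patch_socket()

import socket

import gevent.socket

patched = monkey.is_module_patched("socket")
same_class = socket.socket is gevent.socket.socket
print(patched, same_class)
```

Libraries such as requests sit on top of the standard socket module, so once it is patched their blocking calls yield to other greenlets instead of stalling the whole thread.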

After spawning a greenlet we join it to wait for it to finish, then we sleep to switch context. Since the actors are event-based, this works well for I/O-bound tasks. CPU-bound tasks are fine too, as long as no single task holds the thread for too long without yielding.
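
The context switch can be seen in a small sketch. The cpu_task function and the shared events log are hypothetical and exist only to observe scheduling. Two greenlets each do a slice of CPU work and then call gevent.sleep(0) to give the other a turn.

```python
import gevent

events = []  # demo-only log of which task ran which chunk


def cpu_task(name, chunks):
    for i in range(chunks):
        sum(range(10000))  # a small slice of CPU work
        events.append((name, i))
        gevent.sleep(0)  # yield so other greenlets can run


tasks = [gevent.spawn(cpu_task, "a", 2), gevent.spawn(cpu_task, "b", 2)]
gevent.joinall(tasks)
print(events)
```

Without the sleep(0) call, task "a" would run all of its chunks before "b" got started, which is exactly the "blocking too long" case to avoid.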

So, actors can receive messages, but how do we send one?

Remember the queue in the actor definition? We take the service instance and put an element on its queue; by calling the put method we add a letter (message) to the actor's mailbox.

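statsService = PullStats()
statsService.start()
# job is a dict; receive() reads its 'username' and 'date' keys
statsService.inbox.put(job)
# we then join the service
# (with several services: gevent.joinall([service0, service1, service2]))
gevent.joinall([statsService])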
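
One thing to keep in mind: an actor's loop keeps waiting on its inbox, so joining it blocks until the actor stops. The sketch below uses a hypothetical Recorder actor to show one way to handle that: join with a timeout, then kill the actor once it has drained its mailbox.

```python
import gevent
from gevent.queue import Queue


class Actor(gevent.Greenlet):
    def __init__(self):
        self.inbox = Queue()
        gevent.Greenlet.__init__(self)

    def receive(self, message):
        raise NotImplementedError()

    def _run(self):
        self.running = True
        while self.running:
            self.receive(self.inbox.get())


class Recorder(Actor):
    """Hypothetical actor that remembers every message it receives."""

    def __init__(self):
        Actor.__init__(self)
        self.seen = []

    def receive(self, message):
        self.seen.append(message)


recorder = Recorder()
recorder.start()
recorder.inbox.put("a")  # put() returns immediately; the actor runs later
recorder.inbox.put("b")

# Give the actor up to 0.1s; it handles both messages, then waits for more.
gevent.joinall([recorder], timeout=0.1)
still_waiting = not recorder.dead

recorder.kill()  # stop the actor explicitly
print(recorder.seen, still_waiting, recorder.dead)
```

A stop message handled inside receive is a gentler alternative to kill, since the actor gets to finish what it is doing first.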

Clone the project, play around with it; I got very interesting results. Using the actor model made this really easy to reason about, and much easier to implement.
