Serving HTTP with continuations, or how to make Twisted and Stackless Python play nice

In my never-ending quest for The One True Way To Serve Web Applications I've recently become interested in continuations and how they can make single-process, asynchronous programming as easy and intuitive as simple blocking calls. A recent example is NeverBlock, built on Ruby 1.9 fibers.

Getting really close: plain Python >=2.5 and Twisted

Already having some experience with Twisted, I decided to try to (ab)use Python 2.5 generators to implement continuations. The basic idea was to wrap the request logic inside a generator and have it yield the Twisted deferred whenever it performed an I/O operation (all the boilerplate Twisted code has been left out):

def render_GET(self, request):
    self.continuation = self.handle(request)
    # self.continuation now contains the generator object
    # advance it to at least the first yield
    df = self.continuation.next()
    # resume the generator on the df callback
    df.addCallback(self.resumer)
    return server.NOT_DONE_YET

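def handle(self, request):
    result = yield dbPool.runQuery('SELECT * FROM test')
    # runQuery returns a list of rows, so convert it before writing
    request.write(str(result))
    request.finish()

def resumer(self, result):
    try:
        # resume the generator; it may yield another deferred
        df = self.continuation.send(result)
    except StopIteration:
        # the generator ran to completion, nothing left to resume
        return
    df.addCallback(self.resumer)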

This worked fine. It was also useless for any practical purpose: a generator can only yield from its own call frame, so it was impossible to move any complex code out of the handle() method. Any request processing that might perform I/O (and thus yield the deferred) would have needed to reside directly in the body of the method.
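The single-frame limitation is easy to reproduce without Twisted. The following is only a sketch: FakeDeferred, fake_query and drive are hypothetical stand-ins written for this illustration (they are not Twisted APIs), and the deferreds are fired by hand. It shows that a yield written directly in the handler suspends it, while a yield moved into a helper function does nothing for the caller, because calling the helper merely creates a new generator object.

```python
import types


class FakeDeferred(object):
    """Hypothetical stand-in for a Twisted Deferred, fired by hand."""

    def __init__(self):
        self.callbacks = []

    def addCallback(self, fn):
        self.callbacks.append(fn)

    def fire(self, result):
        for fn in self.callbacks:
            fn(result)


pending = []  # deferreds handed out by fake_query, oldest first


def fake_query(sql):
    """Hypothetical replacement for dbPool.runQuery."""
    d = FakeDeferred()
    pending.append(d)
    return d


def drive(gen, log):
    """Run gen up to its next yield and resume it when that deferred fires."""
    def step(value=None):
        try:
            d = gen.send(value)
        except StopIteration:
            log.append('finished')
            return
        d.addCallback(step)
    step()


def handle_flat(log):
    # Works: the yield sits directly in the body of the handler.
    rows = yield fake_query('SELECT 1')
    log.append(rows)


def load_rows(log):
    # A helper that contains a yield is a generator function of its own.
    rows = yield fake_query('SELECT 2')
    log.append(rows)


def handle_nested(log):
    # Calling the helper only builds a generator object: its body does not
    # run, no query is issued, and handle_nested is not suspended.
    helper = load_rows(log)
    log.append(isinstance(helper, types.GeneratorType))
    rows = yield fake_query('SELECT 3')
    log.append(rows)


log = []
drive(handle_flat(log), log)
pending.pop(0).fire([(1,)])      # simulate the query completing

log2 = []
drive(handle_nested(log2), log2)
issued = len(pending)            # only 'SELECT 3' was ever issued
pending.pop(0).fire([(3,)])
```

After this runs, log holds the rows followed by 'finished', while log2 starts with True (the helper call returned a generator) and never contains the result of 'SELECT 2'.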

This is where anybody else would just use threads and call it a day. Not me, of course. I wanted to follow the Twisted philosophy of never blocking and handling thousands of requests in a single process/thread, but deferreds force you to use callbacks and chop up your logic into many methods.

Enter Stackless Python

Stackless Python is a partial reimplementation of Python that adds a continuation library to the language. Its principle is very simple: you can have any number of very lightweight, in-process, cooperative threads, called tasklets. In the main tasklet (the one your app starts on) you call stackless.run() as the main loop of your app, and inside each tasklet you call stackless.schedule() when you want to give up the CPU to the runloop. A simple inter-tasklet communication primitive, stackless.channel, rounds out the library.
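To get a feel for cooperative round-robin scheduling without installing Stackless, here is a rough emulation in plain Python. It is a sketch under loud assumptions: run and worker are hypothetical names, generators play the part of tasklets, and a bare yield plays the part of stackless.schedule(). It is not the Stackless API, and unlike a real tasklet a generator can only give up the CPU from its own frame.

```python
from collections import deque


def run(tasks):
    """Round-robin over generator 'tasklets' until all of them finish."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)          # run until the task gives up the CPU
        except StopIteration:
            continue            # the task finished, drop it
        queue.append(task)      # otherwise reschedule it at the back


def worker(name, steps, trace):
    for i in range(steps):
        trace.append((name, i))
        yield                   # stands in for stackless.schedule()


trace = []
run([worker('a', 2, trace), worker('b', 3, trace)])
# trace == [('a', 0), ('b', 0), ('a', 1), ('b', 1), ('b', 2)]
```

The two workers interleave strictly at the points where they give up control, which is the property the rest of this post relies on.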

My idea then was to create a new tasklet for every request Twisted received, and have each one call schedule() whenever it found itself waiting on the completion of a deferred. The deferred callback would restart the tasklet, somehow. It turned out there was already an entire collection of examples combining Twisted and Stackless on Google Code.

In particular the TwistedWebserverThreaded.py example was basically what I wanted, but it highlighted a problem: both Stackless and Twisted make use of a runloop, and both of them assume they are the runloop. The example made use of a separate thread to run the Twisted runloop. I decided there was a better way, if giving up on a Stackless feature was an option.

Giving up on preemption

Stackless supports preemption by letting its runloop run a tasklet for a limited number of bytecode instructions before rescheduling a different one. This also introduces all the annoying problems associated with preemptive code, without any of the benefits (no true multicore execution). I was more than happy to give up on it since all I wanted was continuations.

And so I had an idea: make the Twisted runloop run for as long as possible, but make sure it always calls schedule() on the completion of any I/O. Since all I asked from Stackless was multiplexing I/O in a single process, and not threading emulation, this was perfectly fine for me.

Example

I've implemented an example of how to integrate Twisted and Stackless in an HTTP server. Every request spawns a tasklet, which blocks on a channel whenever it has to wait for the completion of a deferred. The deferred's callback sends the result to that channel. Since the callback executes in the context of the Twisted runloop tasklet, it gives us the opportunity to call schedule() inside it, thus giving the request tasklets the chance to run again.

In the example I present a nonsensical request processor that makes 3 asynchronous SQL requests and, from time to time, 1 HTTP request, plus some very simple inter-tasklet communication in the form of a very lousy chat.

Base controller

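class ResumableController(object):

    def tasklet(self):
        # channel on which the deferred callbacks deliver their result
        self.return_channel = NWChannel()
        # remember which tasklet runs this request (checked in reschedule())
        self.me = stackless.getcurrent()
        # random id used to tell requests apart in the chat messages
        self.stamp = random.randint(0, 1000000000)
        self.handle()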

The chat receiver is just a blocking call into a Stackless channel.

# chat methods
def waitForChat(self):
    return chatChannel.receive()

The database uses the standard Twisted Enterprise asynchronous database pool.

# db methods
def waitForSQL(self, sql):
    d = dbPool.runQuery(sql)
    return self.waitForDeferred(d, self.succesfulSQLDeferred)

def succesfulSQLDeferred(self, result):
    r = pickle.dumps(result)
    self.return_channel.send_nowait(r)
    self.reschedule()

The HTTP requester uses the Twisted HTTP client.

# http client methods
def waitForHTTPClient(self, url):
    d = client.getPage(url)
    return self.waitForDeferred(d, self.succesfulHTTPDeferred)

def succesfulHTTPDeferred(self, result):
    r = html.PRE(result)
    self.return_channel.send_nowait(r)
    self.reschedule()

Common methods, including the common handling for deferreds.

def waitForDeferred(self, d, success):
    d.addCallback(success)
    d.addErrback(self.errorDeferred)
    return self.waitForChannel()

def waitForChannel(self):
    return self.return_channel.receive()

def errorDeferred(self, fault):
    self.return_channel.send_exception_nowait(fault.type, fault.value)
    self.reschedule()

def reschedule(self):
    if stackless.getcurrent() != self.me:
        stackless.schedule()
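One consequence of errorDeferred is that a failed deferred should surface as an ordinary exception at the wait* call, so handle() can use a plain try/except. The sketch below illustrates that idea with a hypothetical ResultMailbox class that stands in for the return channel. It assumes send_exception_nowait behaves like Stackless's channel.send_exception (the exception is raised in the receiver), and unlike a real channel it never blocks.

```python
class ResultMailbox(object):
    """Hypothetical stand-in for the return channel: holds one outcome."""

    def __init__(self):
        self._outcome = None

    def send(self, value):
        self._outcome = ('ok', value)

    def send_exception(self, exc_type, *args):
        self._outcome = ('error', exc_type(*args))

    def receive(self):
        if self._outcome is None:
            # a real channel would block the tasklet here instead
            raise RuntimeError('nothing has been sent yet')
        kind, payload = self._outcome
        self._outcome = None
        if kind == 'error':
            raise payload       # the failure reappears in the receiver
        return payload


def wait_for_sql(mailbox):
    """Reads like blocking code, yet still sees asynchronous failures."""
    try:
        return mailbox.receive()
    except ValueError as e:
        return 'query failed: %s' % e


box = ResultMailbox()
box.send('rows')
ok = wait_for_sql(box)

box.send_exception(ValueError, 'connection lost')
failed = wait_for_sql(box)
```

Here ok ends up as 'rows' and failed as 'query failed: connection lost'. In the real controller the same shape would apply to a try/except around self.waitForSQL(sql).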

Twisted resource

The reactor.callLater(0.0, stackless.schedule) call plays the same role as the schedule() in the deferred callbacks: it hands control to the Stackless runloop. It has to go through callLater because we need to return from the render_GET method first.

class ClientRequestHandler(resource.Resource):
    isLeaf = True

    def __init__(self):
        resource.Resource.__init__(self)

    def render_GET(self, request):
        request.write('request arrives<br>')
        c = ExampleController(request)
        stackless.tasklet(c.tasklet)()
        request.write('still in the reactor tasklet<br>')
        reactor.callLater(0.0, stackless.schedule)
        return server.NOT_DONE_YET

Example controller

handle is where all the fun is. All the wait* methods block, but they also immediately give up control to the Stackless runloop, which ultimately gives control back to the Twisted runloop. When a deferred calls back, control passes again from Twisted to Stackless, which wakes up the request tasklet. The result is a working continuation-based HTTP server in Python, with full I/O-based scheduling!

class ExampleController(ResumableController):

    def __init__(self, request):
        self.request = request

    def handle(self):
        self.request.write('hi, we are now inside the request tasklet<br>')

        # replace this query with something valid for your DB
        sql = 'select * from test limit 10'

        self.request.write('<br><br>QUERY 1:')
        self.request.write(self.waitForSQL(sql))

        if (random.randint(0, 9) <= 1):
            self.request.write('<br><br>HTTP:')
            self.request.write(self.waitForHTTPClient('http://www.google.com/'))

        self.request.write('<br><br>QUERY 2:')
        self.request.write(self.waitForSQL(sql))

        if (random.randint(0, 9) <= 1):
            self.request.write('<br><br>CHAT PRODUCER:')
            self.request.write('sending to {0:d} clients'.format(chatChannel.balance))
            chatChannel.send('hi from {0:d} (and to other {1:d} chaps)'.format(self.stamp, chatChannel.balance))

        if (random.randint(0, 9) <= 2):
            self.request.write('<br><br>CHAT CONSUMER:')
            self.request.write(self.waitForChat())

        self.request.write('<br><br>QUERY 3:')
        self.request.write(self.waitForSQL(sql))

        self.request.finish()
