Simple HTTP PubSub server with Twisted

Jabber and its XMPP protocol pioneered the basic idea behind most contemporary HTTP push implementations. They call it BOSH:

The technique employed by this protocol achieves both low latency and low bandwidth consumption by encouraging the connection manager not to respond to a request until it actually has data to send to the client. As soon as the client receives a response from the connection manager it sends another request, thereby ensuring that the connection manager is (almost) always holding a request that it can use to “push” data to the client.

Basically it goes like this:

  1. The client makes a HTTP request to the server
  2. The server does not send any response
  3. When the server has asynchronous data to send to the client, it looks for and open, pending connection from step 2, and sends the data as the request response
  4. The client receives the data and immediately starts another request like in step 1

This is not the task for Apache

The traditional fork-and-serve model for HTTP servers does not work with the BOSH pattern. You can force it into submission by using shared memory or switching to threads with critical sections. But in the end you are using a process and/or a thread for every push client connection. And those connections are going to be open and silent for 5 minutes, tying up with them the entire child process that was forked to handle them. In those same 5 minutes that thread or process could have handled thousands of normal, short-lived requests.

The right tool for the job

Single-process, single-threaded event servers are all on the rage nowadays. They offer a processing model based on simultaneously handling thousands of request in a single process, using the very efficient asynchronous I/O primitives that most current operating systems offer.

But a BOSH-like server has to go a step beyond. The actual logic of the server, not just its socket plumbing, has to be asynchronous and event-based. For this reason you just cannot put a FCGI PHP spawner behind a nginx frontend and expect to scale beyond 10 push clients. You won’t DoS the nginx process with your push connections, but the FCGI processes are sill running under the 1 connection = 1 process paradigm.

For this reason I decided to do my own server, helped by the many asynchronous programming libraries and frameworks available. I discarded a C, libevent based implementation early on an went on to look into EventMachine. It’s a very focused and lean Ruby framework implemented on top of libevent, and it offers standard servers and clients for some protocols, like HTTP. In the end I went with Twisted. I’ve wanted to try Python for a long time and Twisted looks like a very stable and well tested framework.

The PubSub paradigm

For this small server I used a very simple model for the push server. The server is independent from the application model and it only concerns itself with a single task: maintaining a list of open client requests, sorted by a channel ID. This ID is opaque to the push server. Clients ask the push server to be put on the waiting list of a certain channel, identified by its ID. And the application server asks the push server to wake up the waiting clients of a certain channel, identified by its ID.

This is a simple interpretation of the Publish/Subscribe paradigm, and in this implementation the server is not going to care about queuing messages, complex grouping of clients, and all the other features it would need to be a robust, generic PubSub system like Bayeux.

Implementation

The server has 2 open ports:

  • Port 8080: open to the internet. This is the port clients connect to subscribe and wait for a channel notification.
  • Port 9100: open only to localhost. This is the port that the application server, which is implemented in Apache + PHP, connects to deliver a notification to a channel. Connections to this port are fast and short-lived.

For testing and developing the server I modified Peak Notes to give it a Comet-like behavior. Peak Notes is already a full Ajax application, with a full client model, so it was just a matter of requesting a new snapshot of the current note board when a notification arrived. A more intelligent and efficient implementation would return the data that actually changed in the notification request.

The PHP application server code was very easily modified: every operation that changed the database in any way also sent a notification to the appropriate channel, by using the notification server on port 9100. Channels ID are just a salted MD5 of the user ID, and this ID is also transferred to the client during login. A shared note board would need some other naming scheme for the channel IDs but those are currently unsupported.

The client resources, code and Ajax requests are all served from notes.olivepeak.com on port 80, but the PubSub server was on port 8080. This posed an interesting cross-domain problem typical in Javascript. It was solved by using a JSONP wrapper for the push connections.

simple-pubsub.py

This is the Twisted HTTP resource that handles client subscriptions. On an incoming GET it checks for a fixed path in the URI, /subscriptions/channel/, and tries to extract the channel ID from it. On success it adds the request to the channel list and returns server.NOT_DONE_YET. This tells Twisted to not return anything to the client, but leave the connection open for later processing.

class ClientRequester(resource.Resource):
	isLeaf = True

	def __init__(self, i):
		resource.Resource.__init__(self)
		self.internal = i 

	def render_GET(self, request):
		channel = getChannelID('/subscriptions/channel/', request.uri)
		if (channel == False):
			return '{ "status" : "error" }'
	
		self.internal.addClientDelegate(channel, ClientDelegate(request))

		return server.NOT_DONE_YET

ClientDelegate wraps the client connection and has the methods for sending back the notification and closing the connection. It also wraps the notification JSON in the client-provided callback function.

class ClientDelegate:

	def __init__(self, r):
		self.request = r
		# JSONP stuff, check errors bla bla bla
		self.callbackName = r.args['callback'][0]

	def end(self, status, revision):
		try:
			rev = ''
			if revision != False:
				rev = ', "revision" : "' + revision + '"'
			self.request.write(self.callbackName + '({ "status" : "' + status + '"' + rev + '})')
			self.request.finish()
		except:
			print "Client lost patience"

	def notify(self, revision):
		self.end('changed', revision)

NotificationRequester is the Twisted HTTP resource that handles the application server notification requests. It also expects a channel ID after the /notifications/channel/ part of the URI. There’s also support for sending a revision ID to the clients. This is an extra bit of information specific to Peak Notes, and it is used for optimizing and skipping some model synchronizations. A more featured, fine-grained delivery would be better.

class NotificationRequester(resource.Resource):
	isLeaf = True

	def __init__(self):
		resource.Resource.__init__(self)
		self.clients = { }

	def render_GET(self, request):
		channel = getChannelID('/notifications/channel/', request.uri)
		revision = request.args['revision'][0]

		if (channel == False):
			return '{ "status" : "error" }'

		print "Waking up clients on channel " + channel

		if (channel in self.clients):
			oldL = self.clients[channel]
			self.clients[channel] = [ ]
			for client in oldL:
				client.notify(revision)

		return '{ "status" : "ok" }'

	def addClientDelegate(self, channel, delegate):
		if (channel in self.clients):
			self.clients[channel].append(delegate)  
		else:
			self.clients[channel] = [ delegate ]

		print "Registered new client into channel " + channel
		print "Current clients:"
		print self.clients

You can find the full source of the server here. There is some additional code to handle server-side cleanups of client timeouts and the twistd initialization code. To run it invoke twistd as twistd -noy simple-pubsub.py. Simple testing on localhost with a browser is possible. For example, open a browser window with the following URL:

http://localhost:8080/subscriptions/channel/9cdfb439c7876e703e307864c9167a15?callback=test

The browser will try to load it, awaiting for data to arrive. Then in a second browser window open:

http://localhost:9100/notifications/channel/9cdfb439c7876e703e307864c9167a15?revision=100

This request will be immediately served, and the first one will finish its loading and show a Javascript snipet: test({ "status" : "changed", "revision" : "100"})