13371 concurrent TCP connections
in Python with coroutines
Péter Szabó, pts@google.com

Zürich, 2010-11-29

Who needs 13371 concurrent connections?

Coroutines are also useful for just a dozen connections:

How to run Ticker and Repeater in parallel?

View the full code listings for this presentation here.

line_count_ary = [0]
def Ticker():
  i = 0
  while True:
    i += 1
    print 'Tick %d with %d lines.' % (i, line_count_ary[0])
    Sleep(3)
def Repeater():
  print 'Hi, please type and press Enter.'
  while True:
    line = ReadLine()
    if not line:
      break
    line_count_ary[0] += 1
    print 'You typed %r.' % line
  print 'End of input.'

AddTask(Repeater)
Ticker()

Entities executing in parallel

Synchronous I/O with threads

  1. Create and start a thread for each I/O operation you want to run in parallel.
  2. Do synchronous, blocking I/O operations, i.e. let the thread wait (block) until it can read or write. Other threads can make progress in the meantime.
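
For comparison, here is a sketch of the thread-based approach applied to Ticker and Repeater, using the standard `threading` module. The ticker runs in its own thread, and the blocking reads happen in the main thread. `Main` and the `readline` parameter are assumptions added for illustration: the parameter makes it easy to feed input other than stdin. The code avoids Python-2-only syntax.

```python
import sys
import threading

line_count_ary = [0]  # Shared by both threads; only Repeater writes it.

def Ticker(stop):
  i = 0
  while not stop.is_set():
    i += 1
    print('Tick %d with %d lines.' % (i, line_count_ary[0]))
    stop.wait(3)  # Like Sleep(3), but wakes up early once stop is set.

def Repeater(readline):
  print('Hi, please type and press Enter.')
  for line in iter(readline, ''):  # Each call blocks until a line or EOF.
    line_count_ary[0] += 1
    print('You typed %r.' % line)
  print('End of input.')

def Main(readline=sys.stdin.readline):
  stop = threading.Event()
  ticker = threading.Thread(target=Ticker, args=(stop,))
  ticker.daemon = True  # Don't keep the process alive because of the ticker.
  ticker.start()
  Repeater(readline)  # Runs in the main thread, in parallel with Ticker.
  stop.set()
  ticker.join()
```

Calling `Main()` at the end of a script runs it on standard input. Each thread blocks independently, which is exactly what makes this style simple but costly in memory per connection.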

Can't have more than a few thousand threads because:

Asynchronous I/O with callbacks

  1. Use only a single thread or one thread per CPU core available.
  2. Do asynchronous I/O operations: for each I/O operation, supply your callback function, to be called when the operation completes or can be completed without waiting.
  3. Use an I/O framework (e.g. libevent, libev or glib) to manage your pending callbacks.

Disadvantage: the control flow of your code gets obfuscated.

I/O with callbacks hides the control flow

line_count_ary = [0]
def Ticker():
  i_ary = [0]
  def Callback():  # This was a while loop with a pre-test.
    i_ary[0] += 1
    print 'Tick %d with %d lines.' % (i_ary[0], line_count_ary[0])
    Sleep(3, Callback)
  Callback()
def Repeater():
  print 'Hi, please type and press Enter.'
  def Callback(line):  # This was a while loop with a post-test.
    if line:
      print 'You typed %r.' % line
      line_count_ary[0] += 1
      ReadLine(Callback)
    else:
      print 'End of input.'
  ReadLine(Callback)

SetNonBlocking(STDIN_FD); AddTask(Ticker); AddTask(Repeater)
MainLoop()
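
The listing above assumes a callback framework that provides `Sleep(secs, callback)` and `MainLoop()`. The following is a minimal sketch of such a loop. It assumes that timers are kept in a heap and that readable fds are detected with `select()` (Unix). `WaitReadable` is a hypothetical helper that a callback-style `ReadLine(callback)` could be built on. This is not the framework from the talk.

```python
import heapq
import itertools
import select
import time

_timers = []              # Heap of (deadline, seq, callback) tuples.
_readers = {}             # fd -> callback to run once the fd is readable.
_seq = itertools.count()  # Tie-breaker, so callbacks are never compared.

def Sleep(secs, callback):
  """Schedules callback() to run after secs seconds; returns at once."""
  heapq.heappush(_timers, (time.time() + secs, next(_seq), callback))

def WaitReadable(fd, callback):
  """Hypothetical: schedules callback(fd) to run once fd is readable."""
  _readers[fd] = callback

def MainLoop():
  """Dispatches callbacks until no timers or readers are left."""
  while _timers or _readers:
    timeout = None  # Block indefinitely if only fds are pending.
    if _timers:
      timeout = max(0.0, _timers[0][0] - time.time())
    readable = select.select(list(_readers), [], [], timeout)[0]
    for fd in readable:
      _readers.pop(fd)(fd)  # One-shot: the callback re-registers if needed.
    now = time.time()
    while _timers and _timers[0][0] <= now:
      heapq.heappop(_timers)[2]()
```

Every loop in the original code turns into a callback that re-registers itself. This is the control-flow obfuscation mentioned above.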

The same in JavaScript with node.js

var util = require('util')  // For util.inspect().
var line_count = 0
function ticker() {
  var i = 0
  function callback() {
    i += 1
    console.log('Tick ' + i + ' with ' + line_count + ' lines.')
    setTimeout(callback, 3000)
  }
  callback()
}
function repeater() {
  var stdin = process.openStdin()
  console.log('Hi, please type and press Enter.')
  function callback(line) {
    if (line.length) {
      ++line_count
      console.log('You typed ' + util.inspect(line) + '.')
      readLine(stdin, callback)
    } else {
      console.log('End of input.')
    }
  }
  readLine(stdin, callback)
}
process.nextTick(ticker)
process.nextTick(repeater)

But we have to implement input buffering

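function readLine(readStream, callback) {
  if (!('buf' in readStream)) {
    readStream.buf = ''        // Data received but not yet returned.
    readStream.ended = false   // Set when the 'end' event fires.
    readStream.setEncoding('utf8')  // Don't split multibyte characters.
  }
  var i = readStream.buf.indexOf('\n')
  if (i >= 0 || readStream.ended) {
    // Serve one line (or, at EOF, the unterminated rest) from the buffer,
    // even if an earlier 'data' event delivered several lines at once.
    var line = i >= 0 ? readStream.buf.slice(0, i + 1) : readStream.buf
    readStream.buf = readStream.buf.slice(line.length)
    process.nextTick(function() { callback(line) })
    return
  }
  function onData(data) {
    readStream.buf += data
    if (readStream.buf.indexOf('\n') >= 0)
      retry()
  }
  function onEnd() {
    readStream.ended = true
    retry()
  }
  function retry() {  // Stop listening, then serve the line from the buffer.
    readStream.pause()
    readStream.removeListener('data', onData)
    readStream.removeListener('end', onEnd)
    readLine(readStream, callback)
  }
  readStream.on('data', onData)
  readStream.on('end', onEnd)
  readStream.resume()
}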

What is a coroutine?

Most threads are waiting

AcceptConnections()
  nbsocket.accept()
    waiting for fd 3 becoming readable

HandleHttpProxyConnection()
  ReadHttpRequest()
    nbfile.readline()
      nbsocket.recv()
        waiting for fd 4 becoming readable

HandleHttpProxyConnection()
  WriteHttpResponseHeader()
    nbfile.write()
      nbsocket.send()
        waiting for fd 5 becoming writable

CleanCachePeriodically()
  sleep()
    sleeping until timestamp 123456789.01 is reached

Asynchronous I/O with coroutines

  1. Create a coroutine for each task (e.g. active incoming HTTP request, timeout). Create millions if needed.
  2. Put all filehandles to non-blocking mode.
  3. Pick a coroutine, run it in the current thread until it would block on I/O. Repeat for other non-blocked coroutines.
  4. When all coroutines would block on I/O, wait for I/O progress (with select(), poll(), kqueue(), epoll_wait()), and continue with the corresponding coroutine.
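
Step 2 relies on the OS reporting "would block" instead of waiting. Below is one possible implementation of the `SetNonBlocking` helper used later in this talk, plus a hypothetical `TryRead`. It is a Unix-only sketch using `fcntl` and `O_NONBLOCK`.

```python
import errno
import fcntl
import os

def SetNonBlocking(fd):
  """Puts fd into non-blocking mode (Unix only)."""
  flags = fcntl.fcntl(fd, fcntl.F_GETFL)
  fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

def TryRead(fd, size=1024):
  """Returns the bytes read, or None if the read would block."""
  try:
    return os.read(fd, size)
  except OSError as e:
    if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
      raise
    return None  # Here a coroutine would yield and let others run.

if __name__ == '__main__':
  r, w = os.pipe()
  SetNonBlocking(r)
  print(TryRead(r))  # None: no data yet, and no waiting either.
  os.write(w, b'hello\n')
  print(TryRead(r))
```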

Get nice code control flow (just like with threads) and good performance.

Same application code, with coroutines

line_count_ary = [0]

def Ticker():
  i = 0
  while True:
    i += 1
    print 'Tick %d with %d lines.' % (i, line_count_ary[0])
    Sleep(3)

def Repeater():
  print 'Hi, please type and press Enter.'
  while True:
    line = ReadLine()
    if not line:
      break
    line_count_ary[0] += 1
    print 'You typed %r.' % line
  print 'End of input.'

AddTask(Repeater)
Ticker()

I/O libraries for Python

See feature comparisons of these here and here.

CPython has a global interpreter lock (GIL), so a single Python process can run Python code on at most one CPU core at a time.

Generators and iterators in Python

def SquaresUpto(n):  # A generator.
  i = 1
  while i * i <= n:
    yield i * i
    i += 1

def CubesUpto(n):  # Another generator.
  i = 1
  while i * i * i <= n:
    yield i * i * i
    i += 1

def Double(items):  # Another generator.
  for i in items:
    yield 2 * i

my_iter = SquaresUpto(100)  # An iterator from a generator.
for i in my_iter:
  print i
#: [2, 16, 54, 128]
print list(Double(CubesUpto(100)))
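
A `for` loop over a generator is shorthand for calling `next()` until `StopIteration` is raised. `Merge` below uses this protocol directly. The sketch that follows makes it explicit. `Drain` is a hypothetical helper, and the built-in `next()` works in Python 2.6+ and 3.

```python
def SquaresUpto(n):  # The generator from above.
  i = 1
  while i * i <= n:
    yield i * i
    i += 1

def Drain(it):
  """Does by hand what `for x in it` does; returns the items as a list."""
  items = []
  while True:
    try:
      items.append(next(it))  # Resume the generator until its next yield.
    except StopIteration:     # Raised once the generator body finishes.
      return items
```

For example, `Drain(SquaresUpto(10))` runs the generator body three times and returns `[1, 4, 9]`.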

Advanced generator example

def Merge(iter1, iter2):
  i1 = i2 = None
  while True:
    if iter1 is not None and i1 is None:
      try:
        i1 = iter1.next()  # Run iter1 until it yields something (i1).
      except StopIteration:
        iter1 = None
    if iter2 is not None and i2 is None:
      try:
        i2 = iter2.next()
      except StopIteration:
        iter2 = None
    if i1 is None and i2 is None:
      break  # Both iterators are exhausted.
    elif i2 is None or (i1 is not None and i1 < i2):
      yield i1
      i1 = None
    else:
      yield i2
      i2 = None

#: [1, 1, 4, 8, 9, 16, 25, 27, 36, 49, 64, 64, 81, 100]
print list(Merge(SquaresUpto(100), CubesUpto(100)))
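
For the common case of merging already-sorted iterables, the standard library's `heapq.merge` (Python 2.6+) does the same job lazily and accepts any number of inputs. In the sketch below, generator expressions stand in for `SquaresUpto(100)` and `CubesUpto(100)`.

```python
import heapq

squares = (i * i for i in range(1, 11))     # 1, 4, ..., 100
cubes = (i * i * i for i in range(1, 5))    # 1, 8, 27, 64
merged = list(heapq.merge(squares, cubes))  # Lazily pulls from both inputs.
```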

Emulating coroutines with generators
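
The emulation works because `yield` is an expression. The code driving a generator can resume it with `gen.send(value)`, and that value becomes the result of the `yield`. The toy sketch below shows this with a stripped-down Repeater. The string `'readline'` stands in for a real I/O request, and the driver `Drive` is a hypothetical stand-in for a scheduler that answers from a list.

```python
def Repeater(log):  # A generator used as a coroutine.
  while True:
    line = yield 'readline'  # Pause; resumes with the value passed to send().
    if not line:
      break
    log.append('You typed %r.' % line)
  log.append('End of input.')

def Drive(gen, lines):
  """Plays the scheduler: answers each 'readline' request from lines."""
  lines = iter(lines)
  request = next(gen)  # Runs the generator up to its first yield.
  while request == 'readline':
    try:
      request = gen.send(next(lines, ''))  # '' signals end of input.
    except StopIteration:  # The generator's body has finished.
      return
  raise ValueError('unexpected request %r' % (request,))
```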

Application code with generators

line_count_ary = [0]
def Ticker():  # A generator.
  i = 0
  while True:
    i += 1
    print 'Tick %d with %d lines.' % (i, line_count_ary[0])
    yield Sleep(3)
def Repeater():  # A generator.
  print 'Hi, please type and press Enter.'
  while True:
    line = yield ReadLine()
    if not line:
      break
    line_count_ary[0] += 1
    print 'You typed %r.' % line
  print 'End of input.'
SetNonBlocking(STDIN_FD)
AddTask(Ticker)
AddTask(Repeater)
MainLoop()
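
The generator version needs a scheduler that resumes each task once its request can proceed. A request is `Sleep(3)`, `WaitForEvent(...)`, or a sub-generator such as `ReadLine()`. The following is a minimal sketch of `AddTask` and `MainLoop`. It assumes Python 3.3+, so that a sub-generator can `return` a value, which is read back from `StopIteration.value`. It also assumes that `Sleep` and `WaitForEvent` are simple request classes. `_Step` and the queue names are hypothetical, and this is not the library from the talk.

```python
import collections
import heapq
import itertools
import select
import time
import types

class Sleep(object):
  """Request: suspend the yielding task for secs seconds."""
  def __init__(self, secs):
    self.secs = secs

class WaitForEvent(object):
  """Request: suspend the task until event['read'] (an fd) is readable."""
  def __init__(self, event):
    self.fd = event['read']

_ready = collections.deque()  # (task, value to send) pairs, ready to run.
_sleeping = []                # Heap of (wake-up time, seq, task).
_waiting = {}                 # fd -> task blocked on reading that fd.
_seq = itertools.count()      # Tie-breaker, so tasks are never compared.

def AddTask(func):
  # A task is a stack of generators: the outermost coroutine at the bottom,
  # the sub-generator it is currently "calling" (e.g. ReadLine()) on top.
  _ready.append(([func()], None))

def _Step(task, value):
  """Runs task until it blocks on a request or finishes."""
  while task:
    try:
      request = task[-1].send(value)
    except StopIteration as e:  # Innermost generator finished:
      task.pop()                # pass its return value to its caller.
      value = e.value
      continue
    value = None
    if isinstance(request, types.GeneratorType):
      task.append(request)      # `yield ReadLine()` works like a call.
    elif isinstance(request, Sleep):
      wake = time.time() + request.secs
      heapq.heappush(_sleeping, (wake, next(_seq), task))
      return
    elif isinstance(request, WaitForEvent):
      _waiting[request.fd] = task  # Sketch: one waiter per fd.
      return
    else:
      raise TypeError('unknown request %r' % (request,))

def MainLoop():
  while True:
    while _ready:
      _Step(*_ready.popleft())
    if not (_sleeping or _waiting):
      return  # All tasks have finished.
    timeout = None
    if _sleeping:
      timeout = max(0.0, _sleeping[0][0] - time.time())
    for fd in select.select(list(_waiting), [], [], timeout)[0]:
      _ready.append((_waiting.pop(fd), None))
    now = time.time()
    while _sleeping and _sleeping[0][0] <= now:
      _ready.append((heapq.heappop(_sleeping)[2], None))
```

Because each task keeps its own stack of generators, `line = yield ReadLine()` behaves like a function call. Under Python 3, the `ReadLine` generator would finish with `return line`. Since Python 3.7 (PEP 479), `raise StopIteration(line)` inside a generator is turned into a `RuntimeError`.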

Input buffering with generators

import errno
import os

STDIN_FD = 0
stdin_buf = ['']  # Data read from stdin but not yet returned as a line.

def ReadLine():  # A generator.
  while '\n' not in stdin_buf[0]:
    try:
      got = os.read(STDIN_FD, 1024)
    except OSError, e:
      if e.errno != errno.EAGAIN: raise
      yield WaitForEvent({'read': STDIN_FD})  # Let other coroutines run.
      continue
    if not got:  # EOF on stdin: return the rest, maybe an unterminated line.
      line, stdin_buf[0] = stdin_buf[0], ''
      raise StopIteration(line)
    stdin_buf[0] += got
  # A complete line is buffered (possibly read by an earlier call).
  i = stdin_buf[0].find('\n') + 1
  line, stdin_buf[0] = stdin_buf[0][:i], stdin_buf[0][i:]
  raise StopIteration(line)  # Disadvantage: a Python 2 generator can't `return line'.

Why generators?

Advantages of using generators to emulate coroutines:

Disadvantages:

Context switching

How is execution context switching from one coroutine to another implemented in Python?

Waiting for I/O progress opportunity

The select() Unix system call (and its Windows equivalents):
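
The following is a minimal sketch of the waiting step with Python's `select.select()`. It blocks until at least one of the given fds is readable or the timeout expires, and it returns only the ready fds. `ReadableFds` is a hypothetical helper. The demo uses pipes, which works on Unix; on Windows, `select()` accepts only sockets.

```python
import os
import select

def ReadableFds(fds, timeout):
  """Waits up to timeout seconds; returns the fds that are readable now."""
  readable, _, _ = select.select(fds, [], [], timeout)
  return readable  # Empty list if the timeout expired first.

if __name__ == '__main__':
  r1, w1 = os.pipe()
  r2, w2 = os.pipe()
  print(ReadableFds([r1, r2], 0.1))  # []
  os.write(w2, b'x')
  print(ReadableFds([r1, r2], 0.1) == [r2])  # True
```

With thousands of fds, `select.poll()` or `select.epoll()` (Linux) scale better. `select()` scans every fd on each call and is usually limited to fd numbers below `FD_SETSIZE`.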

A coroutine-based I/O library

Usability

To make the I/O library easy to use for the programmer:

Performance

To increase performance of the coroutine-based I/O library:

?