Zürich, 2010-11-29
Coroutines are also useful for just a dozen connections:
View the full code listings for this presentation here.
line_count_ary = [0]

def Ticker():
  i = 0
  while True:
    i += 1
    print 'Tick %d with %d lines.' % (i, line_count_ary[0])
    Sleep(3)

def Repeater():
  print 'Hi, please type and press Enter.'
  while True:
    line = ReadLine()
    if not line:
      break
    line_count_ary[0] += 1
    print 'You typed %r.' % line
  print 'End of input.'

AddTask(Repeater)
Ticker()
yield
)
import greenlet
)
Can't have more than a few thousand threads because:
Disadvantage: the control flow of your code gets obfuscated.
line_count_ary = [0]

def Ticker():
  i_ary = [0]
  def Callback():  # This was a while loop with a pre-test.
    i_ary[0] += 1
    print 'Tick %d with %d lines.' % (i_ary[0], line_count_ary[0])
    Sleep(3, Callback)
  Callback()

def Repeater():
  print 'Hi, please type and press Enter.'
  def Callback(line):  # This was a while loop with a post-test.
    if line:
      print 'You typed %r.' % line
      line_count_ary[0] += 1
      ReadLine(Callback)
    else:
      print 'End of input.'
  ReadLine(Callback)

SetNonBlocking(STDIN_FD); AddTask(Ticker); AddTask(Repeater)
MainLoop()
var util = require('util')  // Needed for util.inspect() below.

var line_count = 0

function ticker() {
  var i = 0
  function callback() {
    i += 1
    console.log('Tick ' + i + ' with ' + line_count + ' lines.')
    setTimeout(callback, 3000)
  }
  callback()
}

function repeater() {
  var stdin = process.openStdin()
  console.log('Hi, please type and press Enter.')
  function callback(line) {
    if (line.length) {
      ++line_count
      console.log('You typed ' + util.inspect(line) + '.')
      readLine(stdin, callback)
    } else {
      console.log('End of input.')
    }
  }
  readLine(stdin, callback)
}

process.nextTick(ticker)
process.nextTick(repeater)
function readLine(readStream, callback) {
  if (!('buf' in readStream)) readStream.buf = []
  function onData(data) {
    data = data.toString('UTF-8')
    readStream.buf.push(data)
    if (data.indexOf('\n') < 0) return
    readStream.buf = readStream.buf.join('').split('\n').reverse()
    while (readStream.buf.length > 1)
      callback(readStream.buf.pop() + '\n')
    readStream.removeListener('data', onData)
    readStream.removeListener('end', onEnd)
  }
  function onEnd() {
    readStream.removeListener('data', onData)
    readStream.removeListener('end', onEnd)
    var data = readStream.buf.join('')
    readStream.buf = null
    callback(data)
  }
  readStream.on('data', onData)
  readStream.on('end', onEnd)
}
select(), poll(), kqueue(), epoll_wait()), and continue with the corresponding coroutine.
Get nice code control flow (just like with threads) and good performance.
line_count_ary = [0]

def Ticker():
  i = 0
  while True:
    i += 1
    print 'Tick %d with %d lines.' % (i, line_count_ary[0])
    Sleep(3)

def Repeater():
  print 'Hi, please type and press Enter.'
  while True:
    line = ReadLine()
    if not line:
      break
    line_count_ary[0] += 1
    print 'You typed %r.' % line
  print 'End of input.'

AddTask(Repeater)
Ticker()
See feature comparisons of these here and here.
(C)Python has the global interpreter lock, so 1 Python process can use at most 1 CPU core for running Python code.
def SquaresUpto(n):  # A generator.
  i = 1
  while i * i <= n:
    yield i * i
    i += 1

def CubesUpto(n):  # Another generator.
  i = 1
  while i * i * i <= n:
    yield i * i * i
    i += 1

def Double(iter):  # Another generator.
  for i in iter:
    yield 2 * i

my_iter = SquaresUpto(100)  # An iterator from a generator.
for i in my_iter:
  print i

#: [2, 16, 54, 128]
print list(Double(CubesUpto(100)))
def Merge(iter1, iter2):
  i1 = i2 = None
  while True:
    if iter1 is not None and i1 is None:
      try:
        i1 = iter1.next()  # Run iter1 until it yields something (i1).
      except StopIteration:
        iter1 = None
    if iter2 is not None and i2 is None:
      try:
        i2 = iter2.next()
      except StopIteration:
        iter2 = None
    if i1 is None and i2 is None:
      break  # Can't `return' from an iterator.
    # The `i1 is not None' check keeps an exhausted iter1 from yielding None.
    elif i2 is None or (i1 is not None and i1 < i2):
      yield i1
      i1 = None
    else:
      yield i2
      i2 = None

#: [1, 1, 4, 8, 9, 16, 25, 27, 36, 49, 64, 64, 81, 100]
print list(Merge(SquaresUpto(100), CubesUpto(100)))
.next() (good).
ReadHttpRequest() calls readline(), which calls recv(), which would have to wait for input, so the whole traceback (all 3 stack frames) has to be suspended.
When recv() is done, call readline_iter.send(new_data).
new_data = yield f.readline()
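The suspend-and-resume idea above can be sketched in Python 3 (unlike the Python 2 listings in these slides). This is only an illustration under assumptions: `read_http_request`, `readline`, `recv` and `drive` are hypothetical stand-ins, there is no real socket, and `yield from` (Python 3.3+) is used so that all three generator frames stay suspended while the driver waits for input.

```python
def recv():
    # Innermost frame: suspend until the driver sends in a chunk of input.
    data = yield 'need_input'
    return data

def readline():
    # Middle frame: keep asking recv() for chunks until a full line arrived.
    buf = ''
    while '\n' not in buf:
        buf += yield from recv()
    return buf

def read_http_request():
    # Outermost frame: parse the request line once readline() completes.
    request_line = yield from readline()
    return request_line.rstrip('\r\n').split(' ')

def drive(gen, chunks):
    """Resume gen with one chunk each time it suspends; return its result."""
    try:
        next(gen)                # Run until the first suspension.
        for chunk in chunks:
            gen.send(chunk)      # Resume all three frames with new data.
    except StopIteration as e:
        return e.value           # The value returned by read_http_request().
    raise EOFError('input ended before the request was complete')

result = drive(read_http_request(), ['GET /ind', 'ex.html HT', 'TP/1.0\r\n'])
print(result)
```

Each `gen.send(chunk)` plays the role of "recv() is done": the value becomes the result of the pending `yield` in the innermost frame.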
line_count_ary = [0]

def Ticker():  # A generator.
  i = 0
  while True:
    i += 1
    print 'Tick %d with %d lines.' % (i, line_count_ary[0])
    yield Sleep(3)

def Repeater():  # A generator.
  print 'Hi, please type and press Enter.'
  while True:
    line = yield ReadLine()
    if not line:
      break
    line_count_ary[0] += 1
    print 'You typed %r.' % line
  print 'End of input.'

SetNonBlocking(STDIN_FD)
AddTask(Ticker)
AddTask(Repeater)
MainLoop()
stdin_read = []

def ReadLine():  # A generator.
  while True:
    try:
      got = os.read(STDIN_FD, 1024)
      if not got or '\n' in got:
        break
      stdin_read.append(got)
    except OSError, e:
      if e[0] != errno.EAGAIN:
        raise
      yield WaitForEvent({'read': STDIN_FD})
  if got:
    i = got.find('\n') + 1
    stdin_read.append(got[:i])
    line = ''.join(stdin_read)
    del stdin_read[:]
    if i < len(got):
      stdin_read.append(got[i:])
  else:  # EOF on stdin
    line = ''.join(stdin_read)
    del stdin_read[:]
  raise StopIteration(line)  # Disadvantage: can't use `return'.
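The listings above rely on a main loop that resumes each generator when the event it yielded has happened. A minimal sketch of such a loop, in Python 3, might look like the following. It is not the presentation's implementation: `Sleep`, `ticker` and `main_loop` here are hypothetical, only sleep events are handled, and a simulated clock replaces real waiting so that the example finishes instantly.

```python
import heapq

class Sleep:
    """Hypothetical event: 'resume me after this many seconds'."""
    def __init__(self, seconds):
        self.seconds = seconds

def ticker(name, interval, log):
    # A generator-based task, in the style of Ticker() above.
    i = 0
    while True:
        i += 1
        log.append('%s tick %d' % (name, i))
        yield Sleep(interval)

def main_loop(tasks, until):
    """Resume tasks in wake-up order on a simulated clock, up to `until`."""
    # Heap entries are (wake_time, sequence_number, task); the unique
    # sequence number breaks ties so generators are never compared.
    queue = [(0, seq, task) for seq, task in enumerate(tasks)]
    heapq.heapify(queue)
    seq = len(queue)
    while queue and queue[0][0] <= until:
        now, _, task = heapq.heappop(queue)
        try:
            event = next(task)       # Run the task until its next yield.
        except StopIteration:
            continue                 # The task finished; drop it.
        heapq.heappush(queue, (now + event.seconds, seq, task))
        seq += 1

log = []
main_loop([ticker('fast', 1, log), ticker('slow', 3, log)], until=3)
print(log)
```

A real loop would also track tasks blocked on file descriptors and sleep until the earliest wake-up time instead of advancing a simulated clock.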
Advantages of using generators to emulate coroutines:
Disadvantages:
yield
are where not to
raise StopIteration(...) instead of return
How is execution context switching from one coroutine to another implemented in Python?
locals() dict and the bytecode instruction pointer
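For generator-based coroutines, the saved locals and instruction pointer can be observed from Python itself. The sketch below (Python 3, with a hypothetical `counter` generator) reads them from the suspended generator's frame object through the `gi_frame`, `f_locals` and `f_lasti` attributes of CPython; it only inspects state and does not show how the interpreter performs the switch.

```python
def counter():
    i = 0
    while True:
        i += 1
        yield i

gen = counter()

next(gen)                                    # Run until the first yield.
first_locals = dict(gen.gi_frame.f_locals)   # Snapshot of the saved locals.
position = gen.gi_frame.f_lasti              # Saved bytecode offset.

next(gen)                                    # Resume from the saved offset.
second_locals = dict(gen.gi_frame.f_locals)

print(first_locals, second_locals, position)
```

Between the two `next()` calls the frame is not running on any call stack, yet `i` keeps its value because it lives in the frame object owned by the generator.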
The select() Unix system call (and its Windows equivalents):
read() and write() and the non-blocking I/O mode) to make progress on multiple I/O channels in a single thread
poll(), kqueue(), epoll_wait())
select() (or its alternatives) to figure out which blocked coroutines become ready to run
file and socket classes (vs callback-based I/O libraries, which provide a different interface)
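How select() can decide which blocked coroutines become ready to run is sketched below in Python 3. The names `reader` and `main_loop` are hypothetical, only read events are handled, and `socket.socketpair()` stands in for real network connections; a real library would also handle writes, timeouts and errors.

```python
import select
import socket

def reader(sock, name, log):
    """Coroutine: yield the socket to block until it is readable."""
    while True:
        yield sock                  # Suspend until select() reports sock.
        data = sock.recv(1024)      # Does not block: data or EOF is ready.
        if not data:
            break                   # EOF: the peer closed the connection.
        log.append((name, data))

def main_loop(tasks):
    waiting = {}                    # Maps a socket to the coroutine blocked on it.
    for task in tasks:
        waiting[next(task)] = task  # Run each task until its first wait.
    while waiting:
        readable, _, _ = select.select(list(waiting), [], [])
        for sock in readable:
            task = waiting.pop(sock)
            try:
                waiting[next(task)] = task   # Resume until the next wait.
            except StopIteration:
                pass                         # The coroutine finished.

a_read, a_write = socket.socketpair()
b_read, b_write = socket.socketpair()
log = []
b_write.sendall(b'second')
b_write.close()
a_write.sendall(b'first')
a_write.close()
main_loop([reader(a_read, 'a', log), reader(b_read, 'b', log)])
a_read.close()
b_read.close()
print(sorted(log))
```

The single thread never blocks in `recv()`; it blocks only in `select()`, and only when no coroutine can make progress.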
To make the I/O library easy to use for the programmer:
file, socket, SSL.SSLSocket etc. with proper error reporting and timeout handling
.rst format, and upload it to the web in HTML
To increase performance of the coroutine-based I/O library:
select() with a system-specific alternative (epoll_wait() on Linux, kqueue() on FreeBSD or the Mac etc.)
readline) in C (Pyrex)?
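One way to pick a system-specific alternative to select() without writing per-platform code is the `selectors` module of the Python 3 standard library, which postdates this presentation. In the sketch below, `selectors.DefaultSelector` resolves to the most efficient mechanism available (for example epoll on Linux or kqueue on BSD and macOS). The string stored in `data` is a hypothetical placeholder for the blocked coroutine that a real library would keep there.

```python
import selectors
import socket

selector = selectors.DefaultSelector()   # epoll, kqueue, poll or select.
left, right = socket.socketpair()

# Register interest in read events; `data` would hold the blocked coroutine.
selector.register(left, selectors.EVENT_READ, data='left-reader')

right.sendall(b'ping')
events = selector.select(timeout=1)      # List of (key, event_mask) pairs.
ready = [(key.data, key.fileobj.recv(1024)) for key, mask in events]

backend = type(selector).__name__        # For example 'EpollSelector' on Linux.
selector.unregister(left)
selector.close()
left.close()
right.close()
print(backend, ready)
```

The calling code stays the same on every platform; only the class behind `DefaultSelector` changes.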