Eventlet/Documentation

Eventlet Overview

Note: This documentation is still under development and refers to the experimental mercurial branch of eventlet. This branch should stabilize soon and be available as a downloadable tarball.

Eventlet is an easy-to-use networking library written in Python. Eventlet is capable of supporting a large number of sockets per process by using nonblocking I/O, cooperatively multiplexing them through a single main loop. This approach allows for the implementation of massively concurrent servers which would require prohibitive amounts of memory under a traditional preemptive multi-threading or multi-process model. However, nonblocking I/O libraries such as asyncore or twisted can be difficult for programmers to use because they require continuation-passing style: code must be broken up into functions which initiate operations and functions which will be called when the operation is complete, known as "callbacks."

Eventlet avoids these difficulties by using a coroutine library called greenlet. Coroutines allow Eventlet to cooperatively reenter the main loop whenever an I/O operation is initiated, switching back to the original coroutine only when the operating system indicates the operation has completed. This means code written using Eventlet looks just like code written using the traditional multi-threading or multi-process model, while avoiding the locking problems associated with preemption and requiring very little memory.

from eventlet import api

participants = [ ]

def read_chat_forever(writer, reader):
    line = reader.readline()
    while line:
        print "Chat:", line.strip()
        for p in participants:
            if p is not writer: # Don't echo
                p.write(line)
        line = reader.readline()
    participants.remove(writer)
    print "Participant left chat."

try:
    print "ChatServer starting up on port 3000"
    server = api.tcp_listener(('0.0.0.0', 3000))
    while True:
        new_connection, address = server.accept()
        print "Participant joined chat."
        new_writer = new_connection.makefile('w')
        participants.append(new_writer)
        api.spawn(read_chat_forever, new_writer, new_connection.makefile('r'))
except KeyboardInterrupt:
    print "\nChatServer exiting."

Listing 1: chatserver.py Example Chat Server

Let's look at a simple example: a chat server.

The server shown in Listing 1 is very easy to understand. If it were written using Python's threading module instead of eventlet, the control flow and code layout would be exactly the same. The call to api.tcp_listener would be replaced with the appropriate calls to Python's built-in socket module, and the call to api.spawn would be replaced with the appropriate call to the thread module. However, if implemented using the thread module, each new connection would require the operating system to allocate another 8 MB stack, meaning this simple program would consume all of the RAM on a machine with 1 GB of memory with only 128 users connected, without even taking into account memory used by any objects on the heap! Using eventlet, this simple program should be able to accommodate many thousands of simultaneous users, consuming very little RAM and very little CPU.

Figure 1: chatserver.py Flowchart

What sort of servers would require concurrency like this? A typical Web server might measure traffic on the order of 10 requests per second; at any given moment, the server might only have a handful of HTTP connections open simultaneously. However, a chat server, instant messenger server, or multiplayer game server will need to maintain one connection per connected user to be able to send messages to them as other users chat or make moves in the game. Also, as advanced Web development techniques such as Ajax, Ajax polling, and Comet (the “Long Poll”) become more popular, Web servers will need to be able to deal with many more simultaneous requests. In fact, since the Comet technique involves the client making a new request as soon as the server closes an old one, a Web server servicing Comet clients has the same characteristics as a chat or game server: one connection per connected user.

Basic usage

Most of the APIs required for basic eventlet usage are exported by the eventlet.api module. We have already seen two of these in Listing 1: api.tcp_listener, for creating a TCP server socket, and api.spawn, for spawning a new coroutine and executing multiple blocks of code conceptually in parallel. There are only a few more basic APIs: connect_tcp for creating a TCP client socket, ssl_listener and connect_ssl for creating encrypted SSL sockets, and sleep, call_after, and exc_after to arrange for code to be called after a delay. Let's look at these in detail.

spawn(function, *args, **keyword)

Create a new coroutine, or cooperative thread of control, within which to execute function. The function will be called with the given args and keyword arguments and will remain in control unless it cooperatively yields by calling a socket method or sleep. spawn returns control to the caller immediately, and function will be called in a future main loop iteration.

sleep(time)

Yield control to another eligible coroutine until at least time seconds have elapsed. time may be specified as an integer, or a float if fractional seconds are desired. Calling sleep with a time of 0 is the canonical way of expressing a cooperative yield. For example, if one is looping over a large list performing an expensive calculation without calling any socket methods, it’s a good idea to call sleep(0) occasionally; otherwise nothing else will run.
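
A minimal sketch of yielding from a CPU-bound loop (the function and data are invented for illustration):

from eventlet import api

def sum_of_squares(numbers):
    total = 0
    for i, n in enumerate(numbers):
        total += n * n            # expensive work that never touches a socket
        if i % 1000 == 0:
            api.sleep(0)          # cooperative yield so other coroutines can run
    return total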

call_after(time, function, *args, **keyword)

Schedule function to be called after time seconds have elapsed. time may be specified as an integer, or a float if fractional seconds are desired. The function will be called with the given args and keyword arguments, and will be executed within the main loop’s coroutine.
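
A minimal sketch (the reminder function and its arguments are invented for illustration):

from eventlet import api

def remind(who, what):
    print "Reminder for", who + ":", what

api.call_after(2.5, remind, "you", what="stretch")
api.sleep(3)   # keep the main coroutine alive long enough for the timer to fire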

exc_after(time, exception_object)

from eventlet import api, httpc

def read_with_timeout():
    cancel = api.exc_after(30, api.TimeoutError())
    try:
        httpc.get('http://www.google.com/')
    except api.TimeoutError:
        print "Timed out!"
    else:
        cancel.cancel()

Listing 2: exc_after.py Cancelling a yielding operation

Schedule exception_object to be raised into the current coroutine after time seconds have elapsed. This only works if the current coroutine is yielding, and is generally used to set timeouts after which a network operation or series of operations will be canceled. Returns a timer object with a cancel method which should be used to prevent the exception if the operation completes successfully.

named(name)

Return an object given its dotted module path, name. For example, passing the string “os.path.join” will return the join function object from the os.path module, “eventlet.api” will return the api module object from the eventlet package, and “mulib.mu.Resource” will return the Resource class from the mulib.mu module.
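
A minimal sketch using the examples above:

from eventlet import api

join = api.named("os.path.join")        # the same object as os.path.join
api_module = api.named("eventlet.api")  # the eventlet.api module itself
print join("/tmp", "example.txt")       # prints /tmp/example.txt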

Socket Functions

Eventlet’s socket objects have the same interface as the standard library socket.socket object, except they will automatically cooperatively yield control to other eligible coroutines instead of blocking. Eventlet also has the ability to monkey patch the standard library socket.socket object so that code which uses it will also automatically cooperatively yield; see Using the Standard Library with Eventlet.

tcp_listener(address)

Listen on the given address, a tuple of (ip, port), with a TCP socket. Returns a socket object on which one should call accept() to accept a connection on the newly bound socket.

connect_tcp(address)

Create a TCP connection to address, a tuple of (ip, port), and return the socket.
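
A minimal client sketch, assuming outbound network access (the host is illustrative):

from eventlet import api

# Fetch the first bytes of a web page over a cooperative client socket.
sock = api.connect_tcp(('www.google.com', 80))
sock.sendall('GET / HTTP/1.0\r\nHost: www.google.com\r\n\r\n')
print sock.recv(1024)
sock.close()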

ssl_listener(address, certificate, private_key)

Listen on the given address, a tuple of (ip, port), with a TCP socket that can do SSL. certificate and private_key should be the filename of the appropriate certificate and private key files to use with the SSL socket.

connect_ssl(address)

TODO: Not implemented yet. The standard library method of wrapping a socket.socket object with a socket.ssl object works, but see Using the Standard Library with Eventlet to learn about using wrap_socket_with_coroutine_socket.

Using the Standard Library with Eventlet

wrap_socket_with_coroutine_socket

Eventlet’s socket object, whose implementation can be found in the eventlet.greenio module, is designed to match the interface of the standard library socket.socket object. However, it is often useful to be able to use existing code which uses socket.socket directly without modifying it to use the eventlet apis. To do this, one must call wrap_socket_with_coroutine_socket. It is only necessary to do this once, at the beginning of the program, and it should be done before any socket objects which will be used are created. At some point we may decide to do this automatically upon import of eventlet; if you have an opinion about whether this is a good or a bad idea, please let us know.
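
A minimal sketch, assuming the function is exported by the eventlet.util module (the text above does not name the module, so adjust the import to match your checkout):

from eventlet import util   # NOTE: import location is an assumption

util.wrap_socket_with_coroutine_socket()   # patch socket.socket once, early

# From here on, standard library code that uses sockets directly (urllib2,
# httplib, and so on) cooperatively yields instead of blocking the process.
import urllib2
print urllib2.urlopen('http://www.google.com/').read(100)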

wrap_select_with_coroutine_select

Some code which is written in a multithreaded style may perform some tricks, such as calling select with only one file descriptor and a timeout to prevent the operation from being unbounded. For this specific situation there is wrap_select_with_coroutine_select; however, it's always a good idea when trying any new library with eventlet to perform some tests to ensure eventlet is properly able to multiplex the operations. If you find a library which appears not to work, please mention it on the mailing list to find out whether someone has already experienced this and worked around it, or whether the library needs to be investigated and accommodated. One idea which could be implemented is a mapping between common module names and corresponding wrapper functions, so that eventlet could automatically apply monkey patches based on the modules that are imported.

TODO: We need to monkey patch os.pipe, stdin and stdout. Support for non-blocking pipes is done, but no monkey patching yet.

Communication Between Coroutines

channel

Used for synchronizing senders and receivers across coroutines. Has a "send" and a "receive" method.

from eventlet import api, channel

done = []

def producer(num, chan):
    for x in range(5):
        chan.send((num, x))
    done.append('P%s' % num)

def consumer(num, chan):
    for x in range(5):
        producer_num, iter_num = chan.receive()
        print "C%sP%s %s" % (num, producer_num, iter_num)
    done.append('C%s' % num)

chan = channel.channel()
for x in range(5):
    api.spawn(producer, x, chan)
    api.spawn(consumer, x, chan)

while len(done) < 10:
    api.sleep()

event

Used to send one event from one coroutine to many others. Has a "send" and a "wait" method. Sending more than once is not allowed and will cause an exception; waiting more than once is allowed and will always produce the same value.

from eventlet import api, coros

def waiter(num, evt):
    result = evt.wait()
    print num, result

evt = coros.event()

for x in range(5):
    api.spawn(waiter, x, evt)

api.sleep(5)
evt.send('hello')
api.sleep(1)

CoroutinePool

A CoroutinePool is an object which maintains a number of coroutines. Functions can be executed on the coroutine pool and will be executed in parallel, up to the limits set by the size of the pool. Once the pool is full, coroutines attempting to execute a function in the pool will block.

CoroutinePools have execute and execute_async methods. execute returns an event object whose wait method will eventually return the result the executed function returned. execute_async has no return value and the return value of the executed function is inaccessible.

from eventlet import api, coros

THE_POOL = coros.CoroutinePool()

def slow_thing(x):
    api.sleep(1)
    print "slow", x


for x in range(20):
    THE_POOL.execute(slow_thing, x)

api.sleep(2)  # give the last batch of coroutines in the pool time to finish before exiting
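
To collect return values, keep the event objects that execute hands back and wait on them; a minimal sketch:

from eventlet import api, coros

def double(x):
    api.sleep(0.1)
    return x * 2

pool = coros.CoroutinePool()
events = [pool.execute(double, i) for i in range(4)]
print [evt.wait() for evt in events]    # prints [0, 2, 4, 6]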

Actor

An actor is an object which has its own coroutine. An actor has a mailbox in which it accepts messages from other coroutines. Inside the actor's coroutine, it removes messages from the mailbox and processes them as it is able to. Actor has a cast method which will enqueue a message for the actor. Actor also has a received method which subclasses should override; received will be called each time the actor's coroutine is able to process a message.

from eventlet import api, coros

class SlowActor(coros.Actor):
    def received(self, event):
        api.sleep(1)
        print "actor", event

actor = SlowActor()

def sender(x):
    actor.cast(x)
    print "sender", x

for x in range(10):
    api.spawn(sender, x)

api.sleep()
while actor._mailbox:
    api.sleep()

Using eventlet.wsgi

eventlet.wsgi.server(sock, site, log=None, environ=None, max_size=None)

Start an HTTP server on the listening TCP socket sock, delegating requests to the wsgi application passed as site. If log is passed, it should be an open file to write log messages to; stderr is used if no log is passed. environ should be a dictionary to use as the default values for all the environment dictionaries passed to the wsgi application. max_size is the maximum number of simultaneous requests the server will serve; it defaults to 1024.
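
A minimal sketch serving a trivial wsgi application on port 8080 (the application and port are illustrative):

from eventlet import api, wsgi

def hello_world(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return ['Hello, World!\r\n']

sock = api.tcp_listener(('0.0.0.0', 8080))
wsgi.server(sock, hello_world)   # serves requests until interrupted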

Using the Backdoor

eventlet.backdoor.backdoor((conn, addr), locals=None)

A backdoor socket is a socket that runs an interactive interpreter. Run a backdoor on the connected socket object conn; addr should be the (ip, port) tuple giving the address of the remote end of the connection. locals, if provided, should be a dictionary which will be used as the locals dictionary for code executed through the backdoor.


from eventlet import api, backdoor

addr = ('127.0.0.1', 33333)

api.spawn(api.tcp_server, api.tcp_listener(addr), backdoor.backdoor)

sock = api.connect_tcp(addr)

sock.send('print "hello world"\n')
print sock.recv(1024)

Integrating Blocking Code with Threads

In the language of programs which use nonblocking I/O, code which takes longer than some very small interval to execute without yielding is said to “block.” An easy way to make these functions cooperate is to use the eventlet.tpool module which implements a thread pool.

eventlet.tpool.erpc(func, *args, **kw) TODO Rename to execute?

Execute function in a thread, suspending the current coroutine until the function has completed. There is currently only one threadpool, whose size is 20 by default. All threads are started at program startup.
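
A minimal sketch (slow_add is an invented stand-in for any function that blocks):

import time
from eventlet import tpool

def slow_add(a, b):
    time.sleep(1)    # a blocking call that would otherwise stall the whole process
    return a + b

# Only the calling coroutine is suspended; other coroutines keep running.
print tpool.erpc(slow_add, 2, 3)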

eventlet.tpool.Proxy(obj, autowrap=())

Wrap any object such that any methods called on the object are executed in the threadpool. Return values of any types provided in the optional autowrap argument will also automatically be wrapped in a Proxy object.
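
A minimal sketch wrapping an ordinary file object (the path is illustrative):

from eventlet import tpool

# Method calls on the proxy (write, close, ...) run in the thread pool,
# suspending only the calling coroutine while the disk I/O happens.
log = tpool.Proxy(open('/tmp/eventlet-example.log', 'w'))
log.write('hello from a coroutine\n')
log.close()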

Multiple thread pools?

It would be nice if the tpool module were refactored into a class whose instances were each thread pools, and if the pool size could grow and shrink. The base class eventlet.pools.Pool would probably be easy to use for this.

Database Access

Most of the existing DB API implementations, especially MySQLdb, block in C. Therefore, eventlet's monkey patching of the socket module is not enough: since the database adapter does not use the Python socket module, calling its methods will block the entire process. Any such blocking calls must instead be made in the thread pool. To facilitate this, eventlet's db_pool module provides some convenient objects.

eventlet.db_pool.DatabaseConnector(module, credentials, min_size=0, max_size=4, conn_pool=None, *args, **kwargs)

Create an object which knows how to connect to databases and return a pool from which one can get a database connection object. module is a module which has a connect function returning a DB API 2.0 Connection object. credentials is a dictionary mapping host names to dictionaries of connection arguments, such as user and password. The min_size and max_size arguments control the minimum and maximum number of simultaneous connections allowed to each database. The conn_pool argument can be either SaranwrappedConnectionPool, which will use one Python process per database connection, or TpooledConnectionPool, which will use eventlet.tpool and threads instead of processes; the thread pool is the default.

import MySQLdb
import eventlet.db_pool

connector = eventlet.db_pool.DatabaseConnector(MySQLdb, {'mydb.example.com': {'user': 'foo', 'password': 'bar'}})

pool = connector.get('mydb.example.com', 'databasename')

conn = pool.get()
curs = conn.cursor()

curs.execute('select * from foo')
results = curs.fetchall()

Hub and Coroutine Library Negotiation

Supported Multiplexing Libraries

Eventlet performs multiplexing using the standard library select.select, select.poll, or the libevent module, which in turn supports kqueue on FreeBSD and OS X and epoll on Linux. Eventlet will try to automatically determine what is installed and use what it thinks will provide the best performance. Eventlet can also run inside of the nginx Web server using the mod_wsgi package for nginx. Using the nginx mod_wsgi package provides by far the best performance for serving HTTP; it almost seems impossible that Python should be able to achieve this level of performance, but it's true. See Using Eventlet with Nginx.

If you wish, you may manipulate the event hub using the following APIs:

get_hub()

Get the current hub singleton object.

get_default_hub()

Return eventlet's best guess about which hub will be best for the current environment.

use_hub(module)

Use the Hub class inside the given module as the singleton event hub, throwing away whatever was used before.
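
A minimal sketch, assuming get_default_hub returns a hub module suitable for passing straight to use_hub:

from eventlet import api

default = api.get_default_hub()   # the hub module eventlet would pick on its own
api.use_hub(default)              # install it (or any other hub module) explicitly
hub = api.get_hub()               # the singleton hub now in use
print hub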

Supported Greenlet Packagings

Eventlet can also use several different coroutine implementations; the original is the greenlet package from py.lib. Eventlet can also use the cheese shop's packaging of greenlet; easy_install greenlet is generally the easiest way to get greenlet installed and running. Eventlet can also run inside of stackless-pypy without threads, and on Stackless Python 2.5.1, although there it runs with an inferior emulation of greenlet implemented using tasklets and is slower than eventlet running on plain Python with the greenlet package installed. Another future candidate for experimentation is the libCoroutine package from the Io language, although it would need to be wrapped for Python first.

Using Eventlet with Nginx

Download nginx from [1] and clone the mercurial (hg) repository at [2]. mod_wsgi works out of the box on nginx 0.5; to run on 0.6 change ngx_http_discard_body to ngx_http_discard_request_body in ngx_http_wsgi_handler.c. Unpack them and run:

./configure --add-module=/path/to/mod_wsgi/ --with-debug

Then make and make install; this will install nginx in /usr/local/nginx.

In the /usr/local/nginx/conf/nginx.conf file, set up a location with the wsgi_pass directive pointing to the nginx_mod_wsgi support module. Also, use the wsgi_var directive to set the eventlet_nginx_wsgi_app variable to the dotted path to your wsgi application. For example, mulib.mu.handle refers to the handle function in the mu module inside the mulib package.

   location / {
       wsgi_pass /path/to/eventlet/support/nginx_mod_wsgi.py;
       wsgi_var eventlet_nginx_wsgi_app mulib.mu.handle;
   }

Finally, run sudo /usr/local/nginx/sbin/nginx and visit [3] in a web browser.

Now, any request to your wsgi application which makes use of the socket module will automatically cooperatively yield back into nginx so it may serve another request; when the socket operation is ready to complete, the original request is resumed.

Advanced APIs

trampoline(fd, read=None, write=None, timeout=None)

Suspend the current coroutine until the given socket object or file descriptor is ready to read, ready to write, or the specified timeout elapses, depending on keyword arguments specified. To wait for fd to be ready to read, pass read=True; ready to write, pass write=True. To specify a timeout, pass the timeout argument in seconds. If the specified timeout elapses before the socket is ready to read or write, TimeoutError will be raised instead of trampoline returning.

Once this returns, if read=True was passed, it's safe to call recv on the file descriptor at least once until it raises EWOULDBLOCK. If write=True was passed, it's safe to call send on the file descriptor until send returns 0.
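
A minimal sketch, assuming trampoline is exported by eventlet.api alongside the basic calls (the host is illustrative):

import socket
from eventlet import api

# Wait cooperatively on a plain, unwrapped standard library socket.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('www.google.com', 80))
sock.send('GET / HTTP/1.0\r\nHost: www.google.com\r\n\r\n')
try:
    api.trampoline(sock, read=True, timeout=10)   # suspend until readable
    print sock.recv(1024)
except api.TimeoutError:
    print "No data arrived within 10 seconds"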

hub.switch()

It's possible to yield control to any other coroutine which is ready to run by calling api.get_hub().switch(). Note that if you just do this, your coroutine will never be resumed and will just be garbage! However, using api.getcurrent(), it's possible to arrange for some other coroutine to resume you by giving them a reference to the greenlet returned from api.getcurrent(). Using this, it's possible to create your own flow control objects, similar to eventlet.coros.event.
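
A minimal sketch in that spirit, assuming greenlet's usual switch semantics (the value passed to switch becomes the return value of the suspended switch call):

from eventlet import api

def wait_for_wakeup():
    me = api.getcurrent()
    # Arrange for the main loop's coroutine to switch back into us in a second;
    # anything passed to me.switch() comes back as the result of hub.switch().
    api.call_after(1, me.switch, 'wakeup')
    result = api.get_hub().switch()
    print "resumed with", result

api.spawn(wait_for_wakeup)
api.sleep(2)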

TODO Using tracked_greenlet and subclassing GreenletContext (need to do an example)