Let's repaint a function

Do you remember “What color is your function?” blog post, quite popular a few years ago? If you don’t know it, read it now, it’s definitely worth it (and also quite entertaining).

Some time ago, while looking to for a way to mix gevent with a blocking API of an extension module written in C (and thus impossible to be tweaked with monkey patching ), I’ve came across a very interesting code snippet from Mike “zzzeek” Bayer , author of SQLAlchemy (and a few other well-known projects). This code shows a very clever way of mixing functions of different colors (that is, asynchronous and synchronous code).

Let’s take a closer look.

Greenlets

Well, before we begin, let’s recall what greenlet package (which, by the way, gevent builds upon) is all about. It’s essential to understand how greenlets works before delving any deeper.

Let me start with an excerpt from the project’s docs :

A “greenlet” is a small independent pseudo-thread. Think about it as a small stack of frames; the outermost (bottom) frame is the initial function you called, and the innermost frame is the one in which the greenlet is currently paused.

In code, greenlets are represented by objects of class greenlet.

You work with greenlets by creating a number of such stacks and jumping execution between them. Jumps are never implicit: a greenlet must choose to jump to another greenlet, which will cause the former to suspend and the latter to resume where it was suspended. Jumping between greenlets is called “switching”.

So, we create a new greenlet instance passing it a function to execute:

1from greenlet import greenlet
2
3def foo():
4    ...
5
6gr = greenlet(foo)

At this point a new greenlet is not yet executing, we have to explicitly switch execution to it.

1gr.switch('foo', 'bar')

When we call gr.switch(...), current greenlet suspends execution, and execution switches to gr greenlet (let’s call it a target greenlet). If gr did not start yet, then it will start to run now and any switch() arguments will be passed to the greenlet’s run() function as its arguments.

If the target greenlet was executing before (and has suspended execution by switching to another greenlet), it resumes execution at the point it called switch() of another greenlet instance. Arguments passed to gr.switch(...) are returned from the switch() call that suspended the target greenlet previously.

Each greenlet has a parent greenlet assigned:

Every greenlet, except the main greenlet, has a “parent” greenlet. The parent greenlet defaults to being the one in which the greenlet was created […]. In this way, greenlets are organized in a tree. Top-level code that doesn’t run in a user-created greenlet runs in the implicit main greenlet, which is the root of the tree.

The parent is where execution continues when a greenlet dies, whether by explicitly returning from its function, “falling off the end” of its function, or by raising an uncaught exception.

Initially, there is one greenlet that you don’t have to create: the main greenlet. This is the only greenlet that can ever have a parent of None. The main greenlet can never be dead. This is true for every thread in a process.

The following code example hopefully makes the whole switching concept clearer:

 1from greenlet import greenlet, getcurrent
 2
 3# we start with a single greenlet called "main greenlet"
 4main_greenlet = getcurrent()
 5
 6def foo(arg):
 7    print('foo:', arg)
 8    main_greenlet.switch(1)
 9    print('foo: 2')
10    main_greenlet.switch('bar')
11    return 'done'
12
13# Create gr greenlet executing function foo(). It's not yet executing. 
14gr = greenlet(foo)
15
16# When the greenlet is switched to, arguments to switch() call become
17# greenlet function parameters, 'foo: starting' gets printed.
18ret = gr.switch('starting')
19
20# When greenlet calls main_greenlet.switch(1), execution returns here.
21# gr.switch() returns main_greenlet.switch() arguments, thus 'main: 1' 
22# gets printed here.
23print('main:', ret)
24
25# Execution is switched back to gr greenlet, it prints 'foo: 2', 
26# then switches back to main greenlet, passing 'bar' argument
27# and 'main: bar' gets printed.
28print('main:', gr.switch())
29
30# We switch back to the greenlet again, foo() finishes execution 
31# returning 'done'. The next line prints 'main: done'.
32print('main:', gr.switch())

And that’s pretty much all we need to know.

The snippet

Now we are ready to dissect the actual code snippet, slightly modified by me for this blog post (mostly stripped from parts not relevant here, like time measurements and asserts). Let’s start with __main__:

 1import asyncio
 2import random
 3import sys
 4
 5import asyncpg
 6import greenlet
 7
 8
 9if __name__ == "__main__":
10
11    def add_and_select_data(conn, data):
12        row = await_(conn.fetchrow("insert into mytable(data) values ($1) returning id", data))
13        id_ = row[0]
14
15        result = await_(conn.fetchrow("select data from mytable where id=($1)", id_))
16        return result[0]
17
18    async def run_request():
19        conn = await (asyncpg.connect(database="test"))
20
21        for i in range(100):
22            random_data = "random %d" % (random.randint(1, 1000000))
23
24            retval = await greenlet_spawn(add_and_select_data, conn, random_data)
25            assert retval == random_data, "%s != %s" % (retval, random_data)
26
27        await (conn.close())
28
29    asyncio.run(run_request())

It start execution from the run_request() asynchronous function, which opens a database connection using asynchronous asyncpg database driver, then executes a loop - on each iteration it calls greenlet_spawn(), passing it add_and_select_data() function as parameter.

Now add_and_select_data() function is where it becomes interesting. This function is a piece of synchronous code, but it uses conn.fetchrow() call, which is asynchronous (and it doesn’t use asyncio.run() or anything like that)! What’s the trick here?

Here’s the rest of the snippet:

 1async def greenlet_spawn(fn, *args):
 2
 3    result_future = asyncio.Future()
 4
 5    def run_greenlet_target():
 6        result_future.set_result(fn(*args))
 7        return None
 8
 9    async def run_greenlet():
10        gl = greenlet.greenlet(run_greenlet_target)
11        greenlet_coroutine = gl.switch()
12
13        while greenlet_coroutine is not None:
14            task = asyncio.create_task(greenlet_coroutine)
15            try:
16                await task
17            except:
18                # this allows an exception to be raised within
19                # the moderated greenlet so that it can continue
20                # its expected flow.
21                greenlet_coroutine = gl.throw(*sys.exc_info())
22            else:
23                greenlet_coroutine = gl.switch(task.result())
24
25    await run_greenlet()
26
27    return result_future.result()
28
29
30def await_(coroutine):
31    current = greenlet.getcurrent()
32    parent = current.parent
33    if not parent:
34        raise Exception("can't use await_() function outside a greenlet")
35    
36    return parent.switch(coroutine)

The idea is as follows:

And that’s it, simple and clever. Of course there are some limitations - we have to remember it’s still a single thread, so fn can’t execute any real blocking code. But that’s not the point here (by the way, now I understand why SQLAlchemy added greenlet to its dependencies).

Could we call it repainting a function?