Let's repaint a function
Do you remember “What color is your function?” blog post, quite popular a few years ago? If you don’t know it, read it now, it’s definitely worth it (and also quite entertaining).
Some time ago, while looking to for a way to mix gevent with a blocking API of an extension module written in C (and thus impossible to be tweaked with monkey patching ), I’ve came across a very interesting code snippet from Mike “zzzeek” Bayer , author of SQLAlchemy (and a few other well-known projects). This code shows a very clever way of mixing functions of different colors (that is, asynchronous and synchronous code).
Let’s take a closer look.
Greenlets
Well, before we begin, let’s recall what greenlet package (which, by the way, gevent builds upon) is all about. It’s essential to understand how greenlets works before delving any deeper.
Let me start with an excerpt from the project’s docs :
A “greenlet” is a small independent pseudo-thread. Think about it as a small stack of frames; the outermost (bottom) frame is the initial function you called, and the innermost frame is the one in which the greenlet is currently paused.
In code, greenlets are represented by objects of class
greenlet
.You work with greenlets by creating a number of such stacks and jumping execution between them. Jumps are never implicit: a greenlet must choose to jump to another greenlet, which will cause the former to suspend and the latter to resume where it was suspended. Jumping between greenlets is called “switching”.
So, we create a new greenlet instance passing it a function to execute:
1from greenlet import greenlet
2
3def foo():
4 ...
5
6gr = greenlet(foo)
At this point a new greenlet is not yet executing, we have to explicitly switch execution to it.
1gr.switch('foo', 'bar')
When we call gr.switch(...)
, current greenlet suspends execution, and execution switches to gr
greenlet (let’s call it a target greenlet). If gr
did not start yet, then it will start to run now and any switch()
arguments will be passed to the greenlet’s run()
function as its arguments.
If the target greenlet was executing before (and has suspended execution by switching to another greenlet), it resumes execution at the point it called switch()
of another greenlet instance. Arguments passed to gr.switch(...)
are returned from the switch()
call that suspended the target greenlet previously.
Each greenlet has a parent greenlet assigned:
Every greenlet, except the main greenlet, has a “parent” greenlet. The parent greenlet defaults to being the one in which the greenlet was created […]. In this way, greenlets are organized in a tree. Top-level code that doesn’t run in a user-created greenlet runs in the implicit main greenlet, which is the root of the tree.
The parent is where execution continues when a greenlet dies, whether by explicitly returning from its function, “falling off the end” of its function, or by raising an uncaught exception.
Initially, there is one greenlet that you don’t have to create: the main greenlet. This is the only greenlet that can ever have a parent of None. The main greenlet can never be dead. This is true for every thread in a process.
The following code example hopefully makes the whole switching concept clearer:
1from greenlet import greenlet, getcurrent
2
3# we start with a single greenlet called "main greenlet"
4main_greenlet = getcurrent()
5
6def foo(arg):
7 print('foo:', arg)
8 main_greenlet.switch(1)
9 print('foo: 2')
10 main_greenlet.switch('bar')
11 return 'done'
12
13# Create gr greenlet executing function foo(). It's not yet executing.
14gr = greenlet(foo)
15
16# When the greenlet is switched to, arguments to switch() call become
17# greenlet function parameters, 'foo: starting' gets printed.
18ret = gr.switch('starting')
19
20# When greenlet calls main_greenlet.switch(1), execution returns here.
21# gr.switch() returns main_greenlet.switch() arguments, thus 'main: 1'
22# gets printed here.
23print('main:', ret)
24
25# Execution is switched back to gr greenlet, it prints 'foo: 2',
26# then switches back to main greenlet, passing 'bar' argument
27# and 'main: bar' gets printed.
28print('main:', gr.switch())
29
30# We switch back to the greenlet again, foo() finishes execution
31# returning 'done'. The next line prints 'main: done'.
32print('main:', gr.switch())
And that’s pretty much all we need to know.
The snippet
Now we are ready to dissect the actual code snippet, slightly modified by me for this blog post (mostly stripped from parts not relevant here, like time measurements and asserts). Let’s start with __main__
:
1import asyncio
2import random
3import sys
4
5import asyncpg
6import greenlet
7
8
9if __name__ == "__main__":
10
11 def add_and_select_data(conn, data):
12 row = await_(conn.fetchrow("insert into mytable(data) values ($1) returning id", data))
13 id_ = row[0]
14
15 result = await_(conn.fetchrow("select data from mytable where id=($1)", id_))
16 return result[0]
17
18 async def run_request():
19 conn = await (asyncpg.connect(database="test"))
20
21 for i in range(100):
22 random_data = "random %d" % (random.randint(1, 1000000))
23
24 retval = await greenlet_spawn(add_and_select_data, conn, random_data)
25 assert retval == random_data, "%s != %s" % (retval, random_data)
26
27 await (conn.close())
28
29 asyncio.run(run_request())
It start execution from the run_request()
asynchronous function, which opens a database connection using asynchronous asyncpg
database driver, then executes a loop - on each iteration it calls greenlet_spawn()
, passing it add_and_select_data()
function as parameter.
Now add_and_select_data()
function is where it becomes interesting. This function is a piece of synchronous code, but it uses conn.fetchrow()
call, which is asynchronous (and it doesn’t use asyncio.run()
or anything like that)! What’s the trick here?
Here’s the rest of the snippet:
1async def greenlet_spawn(fn, *args):
2
3 result_future = asyncio.Future()
4
5 def run_greenlet_target():
6 result_future.set_result(fn(*args))
7 return None
8
9 async def run_greenlet():
10 gl = greenlet.greenlet(run_greenlet_target)
11 greenlet_coroutine = gl.switch()
12
13 while greenlet_coroutine is not None:
14 task = asyncio.create_task(greenlet_coroutine)
15 try:
16 await task
17 except:
18 # this allows an exception to be raised within
19 # the moderated greenlet so that it can continue
20 # its expected flow.
21 greenlet_coroutine = gl.throw(*sys.exc_info())
22 else:
23 greenlet_coroutine = gl.switch(task.result())
24
25 await run_greenlet()
26
27 return result_future.result()
28
29
30def await_(coroutine):
31 current = greenlet.getcurrent()
32 parent = current.parent
33 if not parent:
34 raise Exception("can't use await_() function outside a greenlet")
35
36 return parent.switch(coroutine)
The idea is as follows:
- a call to
greenlet_spawn()
creates a new greenlet (let’s call it a child greenlet) running a given function (wrapped inrun_greenlet_target()
, which is there only to pass thefn
return value toresult_future
), - then the following steps are executed in a loop:
- the execution switches to a child greenlet,
- the greenlet function executes up to the point where it needs to use asynchronous API, at that point it calls
await_()
passing it a coroutine (which is what an asynchronous function returns if executed synchronously) that has to be run to completion in an asynchronous context, await_()
switches back to the parent greenlet, passing it the coroutine (remember, how we can useswitch()
to pass arbitrary data between greenlets?),- the parent greenlet resumes execution (see
run_greenlet()
), picks the coroutine passed from child greenlet and spawns a new asynchronous task to execute the coroutine. It then awaits the task. - if async task completes successfully, its result (i.e. return value of the async code passed from child greenlet via
await_()
) is passed back to the child greenlet. It is then returned fromawait_()
call. From the perspective of the synchronousfn
function it looks like if theawait_()
block executed as a regular blocking, synchronous code. - if async task raised an exception, it is re-thrown in the child greenlet. Again, from the perspective of the synchronous
fn
function it looks like if theawait_()
block raised an exception. - loop inside
run_greenlet()
stops oncegl.switch()
returnsNone
, which happens whenfn
completes (this is theNone
returned from therun_greenlet_target()
wrapper).
- any value returned by
fn
is returned fromgreenlet_spawn()
(by the way, usingFuture
here is not really necessary).
And that’s it, simple and clever. Of course there are some limitations - we have to remember it’s still a single thread, so fn
can’t execute any real blocking code. But that’s not the point here (by the way, now I understand why SQLAlchemy
added greenlet
to its dependencies).
Could we call it repainting a function?