Python协程中使用上下文

2023-06-08,,

在Python 3.7中,asyncio 协程加入了对上下文的支持。使用上下文就可以在一些场景下隐式地传递变量,比如数据库连接session等,而不需要在所有方法调用显示地传递这些变量。使用得当的话,可以提高接口的可读性和扩展性。

基本使用方式

协和的上下文是通过 contextvars 中的 ContextVar 对象来管理的。最基本的使用方式是在某一调用层次中设置上下文,然后在后续调用中使用。如下例所示:

import asyncio
import contextvars
from random import randint
from unittest import TestCase request_id_context = contextvars.ContextVar('request-id') async def inner(x):
request_id = request_id_context.get()
if request_id != x:
raise AssertionError('request_id %d from context does NOT equal with parameter x %d' % (request_id, x)) print('start handling inner request-%d, with x: %d' % (request_id, x))
await asyncio.sleep(randint(0, 3))
print('finish handling inner request-%d, with x: %d' % (request_id, x)) async def outer(i):
print('start handling outer request-%d' % i)
request_id_context.set(i)
await inner(i)
print('finish handling outer request-%d with request_id in context %d' % (i, request_id_context.get())) async def dispatcher():
await asyncio.gather(*[
outer(i) for i in range(0, 10)
]) class ContextTest(TestCase): def test(self):
asyncio.run(dispatcher())

上例中,在最后定义了一个单元测试用例对象 ContextTest 。它的方法 test 是程序的入口,使用 asyncio.run 方法来在协程中执行被测试的异步方法 dispatcherdispatcher 则并发启动10个异步方法 outerouter方法首先将在模块层定义的上下文变量 request_id_context 设置为当前调用指定的值,这个值对于每个 outer 的调用都是不同的。 然后在后续被调用的 inner 方法,以及 outer 方法内部访问了这个上下文变更。在 inner 方法内容,则比较了显示传入的 i 和从上下文变量中取出的 request_id

测试用例的执行结果如下:

start handling outer request-0
start handling inner request-0, with x: 0
start handling outer request-1
start handling inner request-1, with x: 1
start handling outer request-2
start handling inner request-2, with x: 2
start handling outer request-3
start handling inner request-3, with x: 3
start handling outer request-4
start handling inner request-4, with x: 4
start handling outer request-5
start handling inner request-5, with x: 5
start handling outer request-6
start handling inner request-6, with x: 6
start handling outer request-7
start handling inner request-7, with x: 7
start handling outer request-8
start handling inner request-8, with x: 8
start handling outer request-9
start handling inner request-9, with x: 9
finish handling inner request-3, with x: 3
finish handling outer request-3 with request_id in context 3
finish handling inner request-7, with x: 7
finish handling outer request-7 with request_id in context 7
finish handling inner request-1, with x: 1
finish handling outer request-1 with request_id in context 1
finish handling inner request-4, with x: 4
finish handling outer request-4 with request_id in context 4
finish handling inner request-5, with x: 5
finish handling outer request-5 with request_id in context 5
finish handling inner request-9, with x: 9
finish handling outer request-9 with request_id in context 9
finish handling inner request-0, with x: 0
finish handling outer request-0 with request_id in context 0
finish handling inner request-2, with x: 2
finish handling outer request-2 with request_id in context 2
finish handling inner request-6, with x: 6
finish handling outer request-6 with request_id in context 6
finish handling inner request-8, with x: 8
finish handling outer request-8 with request_id in context 8

可以看到,虽然每次 outer 方法对模块层同定义的同一个上下文变量 request_id_context 设置了不同的值,但后续并发访问相互之间并不会混淆或冲突。

不同调用层次间对上下文的修改

前一节展示了在设置了上下文变量后,在后续使用中读取这个变量的情况。这一节,我们看一下不用调用层次间对同一个上下文变量进行修改的情况。

在上一节代码上做了一些调整后如下:

import asyncio
import contextvars
from random import randint
from unittest import TestCase request_id_context = contextvars.ContextVar('request-id') obj_context = contextvars.ContextVar('obj') class A(object): def __init__(self, x):
self.x = x def __repr__(self):
return '<A|x: %d>' % self.x async def inner(x):
request_id = request_id_context.get()
if request_id != x:
raise AssertionError('request_id %d from context does NOT equal with parameter x %d' % (request_id, x)) print('start handling inner request-%d, with x: %d' % (request_id, x))
request_id_context.set(request_id * 10)
await asyncio.sleep(randint(0, 3)) obj = A(x)
obj_context.set(obj)
print('finish handling inner request-%d, with x: %d' % (request_id, x)) async def outer(i):
print('start handling outer request-%d with request_id in context %d' % (i, request_id_context.get()))
request_id_context.set(i)
await inner(i)
print('obj: %s in outer request-%d' % (obj_context.get(), i))
print('finish handling outer request-%d with request_id in context %d' % (i, request_id_context.get())) async def dispatcher():
request_id_context.set(-1)
await asyncio.gather(*[
outer(i) for i in range(0, 10)
])
print('finish all coroutines with request_id in context: %d' % (request_id_context.get())) class ContextTest(TestCase): def test(self):
asyncio.run(dispatcher())

具体调整

    dispatcher 中,开始启动协程前,将 request_id_context 设置为 -1 。 然后在所有的协程调用完毕后,再查看 request_context_id 的值。
    outer 中,在设置 request_id_context 之前,先查看它的值。
    inner 中,在检查和查看 request_id_context 之后,将它修改为其原始值的10倍。
    定义了一个对象 A ,以及一个用来传递 A 对象实例的上下文变量 obj_context
    inner 中,创建A的实例并保存到obj_context中。
    outer中,调用完inner方法后,查看obj_context上下文变量。

代码的执行结果如下:

start handling outer request-0 with request_id in context -1
start handling inner request-0, with x: 0
start handling outer request-1 with request_id in context -1
start handling inner request-1, with x: 1
start handling outer request-2 with request_id in context -1
start handling inner request-2, with x: 2
start handling outer request-3 with request_id in context -1
start handling inner request-3, with x: 3
start handling outer request-4 with request_id in context -1
start handling inner request-4, with x: 4
start handling outer request-5 with request_id in context -1
start handling inner request-5, with x: 5
start handling outer request-6 with request_id in context -1
start handling inner request-6, with x: 6
start handling outer request-7 with request_id in context -1
start handling inner request-7, with x: 7
start handling outer request-8 with request_id in context -1
start handling inner request-8, with x: 8
start handling outer request-9 with request_id in context -1
start handling inner request-9, with x: 9
finish handling inner request-6, with x: 6
obj: <A|x: 6> in outer request-6
finish handling outer request-6 with request_id in context 60
finish handling inner request-0, with x: 0
obj: <A|x: 0> in outer request-0
finish handling outer request-0 with request_id in context 0
finish handling inner request-2, with x: 2
obj: <A|x: 2> in outer request-2
finish handling outer request-2 with request_id in context 20
finish handling inner request-3, with x: 3
obj: <A|x: 3> in outer request-3
finish handling outer request-3 with request_id in context 30
finish handling inner request-5, with x: 5
obj: <A|x: 5> in outer request-5
finish handling outer request-5 with request_id in context 50
finish handling inner request-7, with x: 7
obj: <A|x: 7> in outer request-7
finish handling outer request-7 with request_id in context 70
finish handling inner request-8, with x: 8
obj: <A|x: 8> in outer request-8
finish handling outer request-8 with request_id in context 80
finish handling inner request-9, with x: 9
obj: <A|x: 9> in outer request-9
finish handling outer request-9 with request_id in context 90
finish handling inner request-1, with x: 1
obj: <A|x: 1> in outer request-1
finish handling outer request-1 with request_id in context 10
finish handling inner request-4, with x: 4
obj: <A|x: 4> in outer request-4
finish handling outer request-4 with request_id in context 40
finish all coroutines with request_id in context: -1

观察执行结果,可以看到对上下文变量的修改,有两种情况:

    对于已经设置过值的上下文变量,后续对其做的修改是单向传播的。尽管每个 outer 方法都 request_id_context 设置成了不同的值,但最后在 dispatcher 调用完所有的 outer 后,它取到的 request_id_context 仍然为 -1。 同样,inner方法虽然修改了request_id_context,但这个修改对调用它的outer是不可见的。另外一个方向,outer可以读取到调用它的dispatcher修改的值,inner也可以读取到outer的修改。
    如果是新设置的上下文变量,它的值可以传递到其所在方法的调用者。比如在inner中设置的obj_context,在outer中可以读取。

内存泄漏和上下文清理

根据Python文档, ContextVar对象会持有变量值的强引用,所以如果没有适当清理,会导致内存漏泄。我们使用以下代码演示这种问题。

import asyncio
import contextvars
from unittest import TestCase
import weakref obj_context = contextvars.ContextVar('obj')
obj_ref_dict = {} class A(object): def __init__(self, x):
self.x = x def __repr__(self):
return '<A|x: %d>' % self.x async def inner(x):
obj = A(x)
obj_context.set(obj)
obj_ref_dict[x] = weakref.ref(obj) async def outer(i):
await inner(i)
print('obj: %s in outer request-%d from obj_ref_dict' % (obj_ref_dict[i](), i)) async def dispatcher():
await asyncio.gather(*[
outer(i) for i in range(0, 10)
])
for i in range(0, 10):
print('obj-%d: %s in obj_ref_dict' % (i, obj_ref_dict[i]())) class ContextTest(TestCase): def test(self):
asyncio.run(dispatcher())

和上一节中的代码一样,inner方法在调用栈的最内部设置了上下文变量obj_context。不同的是,在设置上下文的同时,也将保存在上下文中的对象A的实例保存到一个弱引用中,以便后续通过弱引用来检查对象实例是否被回收。

代码的执行结果如下:

obj: <A|x: 0> in outer request-0 from obj_ref_dict
obj: <A|x: 1> in outer request-1 from obj_ref_dict
obj: <A|x: 2> in outer request-2 from obj_ref_dict
obj: <A|x: 3> in outer request-3 from obj_ref_dict
obj: <A|x: 4> in outer request-4 from obj_ref_dict
obj: <A|x: 5> in outer request-5 from obj_ref_dict
obj: <A|x: 6> in outer request-6 from obj_ref_dict
obj: <A|x: 7> in outer request-7 from obj_ref_dict
obj: <A|x: 8> in outer request-8 from obj_ref_dict
obj: <A|x: 9> in outer request-9 from obj_ref_dict
obj-0: <A|x: 0> in obj_ref_dict
obj-1: <A|x: 1> in obj_ref_dict
obj-2: <A|x: 2> in obj_ref_dict
obj-3: <A|x: 3> in obj_ref_dict
obj-4: <A|x: 4> in obj_ref_dict
obj-5: <A|x: 5> in obj_ref_dict
obj-6: <A|x: 6> in obj_ref_dict
obj-7: <A|x: 7> in obj_ref_dict
obj-8: <A|x: 8> in obj_ref_dict
obj-9: <A|x: 9> in obj_ref_dict

可以看到,无论是在outer中,还是在dispatcher中,所有inner方法保存的上下文变量都被没有被回收。所以我们必需在使用完上下文变量后,显示清理上下文,否则会导致内存泄漏。

这里,我们在inner方法的最后,将obj_context设置为None,就可以保证不会因为上下文而导致内存不会被回收:

async def inner(x):
obj = A(x)
obj_context.set(obj)
obj_ref_dict[x] = weakref.ref(obj)
obj_context.set(None)

修改后的代码执行结果如下:

obj: None in outer request-0 from obj_ref_dict
obj: None in outer request-1 from obj_ref_dict
obj: None in outer request-2 from obj_ref_dict
obj: None in outer request-3 from obj_ref_dict
obj: None in outer request-4 from obj_ref_dict
obj: None in outer request-5 from obj_ref_dict
obj: None in outer request-6 from obj_ref_dict
obj: None in outer request-7 from obj_ref_dict
obj: None in outer request-8 from obj_ref_dict
obj: None in outer request-9 from obj_ref_dict
obj-0: None in obj_ref_dict
obj-1: None in obj_ref_dict
obj-2: None in obj_ref_dict
obj-3: None in obj_ref_dict
obj-4: None in obj_ref_dict
obj-5: None in obj_ref_dict
obj-6: None in obj_ref_dict
obj-7: None in obj_ref_dict
obj-8: None in obj_ref_dict
obj-9: None in obj_ref_dict

可以看到,当outerdispatcher尝试通过弱引用来访问曾经保存在上下文中的对象实例时,这些对象都已经被回收了。

总结

在协程中使用 contextvars 模块中的_ContextVar_对象可以让我们方便在协程间保存上下文数据。在使用时要注意以下几点:

    contextvars 对协程的支持是从Python 3.7才开始的,使用时要注意Python版本。
    ContextVar 应当在模块级别定义和创建,一定不能在闭包中定义。
    保存在上下文中的变量一定要在使用完成后显示清理,否则会导致内存泄漏。

参考资料

https://docs.python.org/3/library/contextvars.html#asyncio-support
https://docs.python.org/3/library/contextvars.html#contextvars.ContextVar

Python协程中使用上下文的相关教程结束。

《Python协程中使用上下文.doc》

下载本文的Word格式文档,以方便收藏与打印。