python 中的异步上下文管理器是处理并发应用程序中资源的游戏规则改变者。它们就像常规的上下文管理器,但有一点不同 - 它们可以与异步代码无缝协作。
让我们从基础开始。要创建异步上下文管理器,我们需要实现两个特殊方法:__aenter__ 和 __aexit__。这些是我们在常规上下文管理器中使用的 __enter__ 和 __exit__ 的异步版本。
class asyncresource: async def __aenter__(self): print("acquiring resource") await asyncio.sleep(1) # simulating async acquisition return self async def __aexit__(self, exc_type, exc_value, traceback): print("releasing resource") await asyncio.sleep(1) # simulating async release async def main(): async with asyncresource() as resource: print("using resource") asyncio.run(main())
在此示例中,我们模拟资源的异步获取和释放。 async with 语句负责在正确的时间调用 __aenter__ 和 __aexit__。
import asyncpg class databasepool: def __init__(self, dsn): self.dsn = dsn self.pool = none async def __aenter__(self): self.pool = await asyncpg.create_pool(self.dsn) return self.pool async def __aexit__(self, exc_type, exc_value, traceback): await self.pool.close() async def main(): async with databasepool('postgresql://user:password@localhost/db') as pool: async with pool.acquire() as conn: result = await conn.fetch('select * from users') print(result) asyncio.run(main())
异步上下文管理器中的错误处理与常规错误处理类似。如果上下文中发生错误,则 __aexit__ 方法会接收异常信息。我们可以处理这些错误或让它们传播:
class errorhandlingresource: async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, traceback): if exc_type is valueerror: print("caught valueerror, suppressing") return true # suppress the exception return false # let other exceptions propagate async def main(): async with errorhandlingresource(): raise valueerror("oops!") print("this will be printed") async with errorhandlingresource(): raise runtimeerror("unhandled!") print("this won't be printed") asyncio.run(main())
在此示例中,我们抑制 valueerror 但允许其他异常传播。
异步上下文管理器也非常适合实现分布式锁。这是一个使用 redis 的简单示例:
import aioredis class distributedlock: def __init__(self, redis, lock_name, expire=10): self.redis = redis self.lock_name = lock_name self.expire = expire async def __aenter__(self): while true: locked = await self.redis.set(self.lock_name, "1", expire=self.expire, nx=true) if locked: return self await asyncio.sleep(0.1) async def __aexit__(self, exc_type, exc_value, traceback): await self.redis.delete(self.lock_name) async def main(): redis = await aioredis.create_redis_pool('redis://localhost') async with distributedlock(redis, "my_lock"): print("critical section") await redis.close() asyncio.run(main())
class asynctransaction: def __init__(self, conn): self.conn = conn async def __aenter__(self): await self.conn.execute('begin') return self async def __aexit__(self, exc_type, exc_value, traceback): if exc_type is none: await self.conn.execute('commit') else: await self.conn.execute('rollback') async def transfer_funds(from_account, to_account, amount): async with asynctransaction(conn): await conn.execute('update accounts set balance = balance - $1 where id = $2', amount, from_account) await conn.execute('update accounts set balance = balance + $1 where id = $2', amount, to_account)
异步上下文管理器可以与其他异步原语结合使用,以获得更强大的模式。例如,我们可以将它们与 asyncio.gather 一起使用来进行并行资源管理:
async def process_data(data): async with ResourceManager() as rm: results = await asyncio.gather( rm.process(data[0]), rm.process(data[1]), rm.process(data[2]) ) return results
总之,异步上下文管理器是管理异步 python 代码中资源的强大工具。它们提供了一种干净、直观的方式来处理异步设置和拆卸、错误处理和资源清理。通过掌握异步上下文管理器,您将能够构建强大的、可扩展的 python 应用程序,这些应用程序可以轻松处理复杂的并发工作流程。
