◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。
主动对象模式是一种并发设计模式,它将方法执行与方法调用解耦。此模式的主要目标是通过在单独的线程中执行操作来引入异步行为,同时向客户端提供同步接口。这是通过消息传递、请求队列和调度机制的组合来实现的。
假设我们需要进行计算,可能是 api 调用、数据库查询等。我不会实现任何异常处理,因为我太懒了。
def compute(x, y): time.sleep(2) # some time taking task return x + y
下面是我们如何在不使用主动对象模式的情况下处理并发请求的示例。
import threading import time def main(): # start threads directly results = {} def worker(task_id, x, y): results[task_id] = compute(x, y) print("submitting tasks...") thread1 = threading.thread(target=worker, args=(1, 5, 10)) thread2 = threading.thread(target=worker, args=(2, 15, 20)) thread1.start() thread2.start() print("doing other work...") thread1.join() thread2.join() # retrieve results print("result 1:", results[1]) print("result 2:", results[2]) if __name__ == "__main__": main()
线程管理:直接管理线程会增加复杂性,尤其是随着任务数量的增加。
缺乏抽象:客户端负责管理线程的生命周期,将任务管理与业务逻辑耦合。
可扩展性问题:如果没有适当的队列或调度机制,就无法控制任务执行顺序。
响应能力有限:客户端必须等待线程加入才能访问结果。
下面是主动对象模式的 python 实现,使用线程和队列来执行与上面相同的操作。我们将一一介绍每个部分:
methodrequest: 封装方法、参数和用于存储结果的 future。
class methodrequest: def __init__(self, method, args, kwargs, future): self.method = method self.args = args self.kwargs = kwargs self.future = future def execute(self): try: result = self.method(*self.args, **self.kwargs) self.future.set_result(result) except exception as e: self.future.set_exception(e)
调度程序:在单独的线程中持续处理来自activation_queue的请求。
import threading import queue class scheduler(threading.thread): def __init__(self): super().__init__() self.activation_queue = queue.queue() self._stop_event = threading.event() def enqueue(self, request): self.activation_queue.put(request) def run(self): while not self._stop_event.is_set(): try: request = self.activation_queue.get(timeout=0.1) request.execute() except queue.empty: continue def stop(self): self._stop_event.set() self.join()
servant:实现实际逻辑(例如,计算方法)。
import time class servant: def compute(self, x, y): time.sleep(2) return x + y
proxy:将方法调用转换为请求并返回结果的 future。
from concurrent.futures import future class proxy: def __init__(self, servant, scheduler): self.servant = servant self.scheduler = scheduler def compute(self, x, y): future = future() request = methodrequest(self.servant.compute, (x, y), {}, future) self.scheduler.enqueue(request) return future
客户端:异步提交任务并在需要时检索结果。
def main(): # Initialize components scheduler = Scheduler() scheduler.start() servant = Servant() proxy = Proxy(servant, scheduler) # Client makes an asynchronous call print("Submitting tasks...") future1 = proxy.compute(5, 10) future2 = proxy.compute(15, 20) # Perform other tasks while computation is ongoing print("Doing other work...") # Retrieve results print("Result 1:", future1.result()) print("Result 2:", future2.result()) # Shutdown scheduler scheduler.stop() if __name__ == "__main__": main()
主动对象模式是用于管理多线程环境中的异步操作的强大工具。通过将方法调用与执行分离,它可以确保更好的响应能力、可扩展性和更干净的代码库。虽然它具有一定的复杂性和潜在的性能开销,但它的好处使其成为需要高并发和可预测执行的场景的绝佳选择。然而,它的使用取决于当前的具体问题。与大多数模式和算法一样,不存在一刀切的解决方案。
维基百科 - 活动对象
◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。