💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
# 协程(微线程,纤程) [TOC] --- ## 基础概念 多线程和多进程的模型虽然解决了并发问题,但是系统不能无上限地增加线程。由于系统切换线程的开销也很大,所以,一旦线程数量过多,CPU的时间就花在线程切换上了,真正运行代码的时间就少了,结果导致性能严重下降。由于我们要解决的问题是CPU高速执行能力和IO设备的龟速严重不匹配,多线程和多进程只是解决这一问题的一种方法。 另一种解决IO问题的方法是异步IO。当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就去执行其他代码了。一段时间后,当IO返回结果时,再通知CPU进行处理。 ### 同步IO模型和异步IO模型 ```python # 同步IO模型: do_some_code() f = open('/path/to/file', 'r') r = f.read() # <== 线程停在此处等待IO操作结果 # IO操作完成后线程才能继续执行: do_some_code(r) # 异步io模型 loop = get_event_loop() # 需要一个消息循环 while True: # 主线程不断地重复“读取消息-处理消息”这一过程 event = loop.get_event() process_event(event) ``` 在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,**在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。对于大多数IO密集型的应用程序,使用异步IO将大大提升系统的多任务处理能力。** 协程是实现异步IO的高级形式,又称微线程,纤程。英文名Coroutine。 子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。 子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。 ``` def A(): print('1') print('2') print('3') def B(): print('x') print('y') print('z') # 协程执行结果,可能如下 1 2 x y 3 z ``` 看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势? 主要行效率比多线程高很多,主要表现一下两点: 1. 不用切换线程,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。 2. 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了。 因为协程是一个线程执行,那怎么利用多核CPU呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。 Python对协程的支持是通过generator实现的。 在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。 ```python import inspect def consumer(): r = '' while True: # 3. 通过yield拿到消息n(最开始send进来的n为none,不返回只做启动用) # yield关键字右边可以不需要加表达式(yield默认返回None) n = yield r if not n: return # 4. 拿到n之后进行处理 print('[CONSUMER] Consuming %s...' % n) # 5. 处理完成,再下个循环又通过yield返回 r = '200 OK' def produce(c): # 1. 调用c.send(None)启动生成器; # GEN_CREATED: 等待开始执行 print(inspect.getgeneratorstate(c)) c.send(None) n = 0 while n < 3: n = n + 1 print('[PRODUCER] Producing %s...' % n) # 2. 一旦生产了东西,通过c.send(n)切换到consumer执行 r = c.send(n) print(inspect.getgeneratorstate(c)) # 6. 得到consumer处理的结果,再通过下一个循环,继续生产下一条消息 print('[PRODUCER] Consumer return: %s' % r) # 在close前,状态 都是 GEN_SUSPENDED # 在yield表达式处暂停 print(inspect.getgeneratorstate(c)) # 7. produce决定不生产了,通过c.close()关闭consumer,整个过程结束。 c.close() # close后 状态为GEN_CLOSED # 执行结束 print(inspect.getgeneratorstate(c)) c = consumer() produce(c) ``` > 整个流程无锁,由一个线程执行,`produce`和`consumer`协作完成任务,所以称为“协程”,而非线程的抢占式多任务。 ### 协程生成器的基本行为 协程有四个状态,可以使用`inspect.getgeneratorstate(...)`函数确定: GEN_CREATED # 等待开始执行 GEN_RUNNING # 解释器正在执行(只有在多线程应用中才能看到这个状态) GEN_SUSPENDED # 在yield表达式处暂停 GEN_CLOSED # 执行结束 ### 生成器api 1. `.send(value)`方法,生成器可以使用`.send(...)`方法发送数据,发送的数据会成为生成器函数中yield表达式的值。如上列中的n和r 2. `.throw(...)`方法,让调用方抛出异常,在生成器中处理 3. `.close()`方法,终止生成器 ## asyncio asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持,asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。 ### @asyncio.coroutine 用asyncio提供的@asyncio.coroutine可以把一个generator标记为coroutine类型,然后在coroutine内部用yield from调用另一个coroutine实现异步操作。 ```python import asyncio import threading # @asyncio.coroutine把一个generator标记为coroutine类型 @asyncio.coroutine def baby(num): print('baby %s sleep! (%s)' % (num,threading.currentThread())) # 异步调用asyncio.sleep(2)生成器: 假设是一个耗时2秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。 yield from asyncio.sleep(2) print('baby %s week up! (%s)' % (num,threading.currentThread())) # 获取EventLoop: 事件循环对象 loop = asyncio.get_event_loop() tasks = [baby(1), baby(2), baby(3)] # 把上面coroutine扔到EventLoop中执行 loop.run_until_complete(asyncio.wait(tasks)) loop.close() ''' baby 1 sleep! (<_MainThread(MainThread, started 29028)>) baby 2 sleep! (<_MainThread(MainThread, started 29028)>) baby 3 sleep! (<_MainThread(MainThread, started 29028)>) # (暂停约2秒,并且是在同一线程里面,实现了并发) baby 1 week up! (<_MainThread(MainThread, started 29028)>) baby 2 week up! (<_MainThread(MainThread, started 29028)>) baby 3 week up! (<_MainThread(MainThread, started 29028)>) 1. baby(1)执行到yield,线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环baby(2) 2. baby(2)执行到yield,线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环baby(3) 3. baby(3)执行到yield,线程不会等待asyncio.sleep(),而是直接中断并执行消息循环baby(1),至此所有操作都是以极快的时间完成的,花费了极少时间,此时三个baby同时都在睡眠,(asyncio.sleep) 4. 等待baby(1)睡眠完成,此时几乎同时其他baby也的睡眠也结束了,所以接着执行各个baby的打印wake up操作.结束整个消息循环,run_until_complete结束. ''' ``` **用asyncio的异步网络连接来获取sina、sohu和163的网站首页** ```python import asyncio @asyncio.coroutine def wget(host): print('wget %s...' % host) connect = asyncio.open_connection(host, 80) reader, writer = yield from connect header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host writer.write(header.encode('utf-8')) # 刷新底层传输的写缓冲区。也就是把需要发送出去的数据,从缓冲区发送出去。没有手工刷新,asyncio为你自动刷新了。当执行到reader.readline()时,asyncio知道应该把发送缓冲区的数据发送出去了。 yield from writer.drain() while True: line = yield from reader.readline() if line == b'\r\n': break print('%s header > %s' % (host, line.decode('utf-8').rstrip())) # Ignore the body, close the socket writer.close() loop = asyncio.get_event_loop() tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']] loop.run_until_complete(asyncio.wait(tasks)) loop.close() ``` ### async/await async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换: 1. 把@asyncio.coroutine替换为async; 2. 把yield from替换为await。 使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。**协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行**。 ```python import asyncio import threading async def baby(num): print('baby %s sleep! (%s)' % (num,threading.currentThread())) await asyncio.sleep(1) print('baby %s week up! (%s)' % (num,threading.currentThread())) loop = asyncio.get_event_loop() # ???? 执行完的顺序让人疑惑 tasks = [baby(2), baby(1), baby(3),baby(4),baby(5)] loop.run_until_complete(asyncio.wait(tasks)) loop.close() ``` > tips: await 和 yield from 可以理解为 “不等了” (主线程是一个事件循环,执行到await,就“我不等了,您慢慢执行,我先走一步,好了再给我说”) ### 绑定回调 ```python import time import asyncio now = lambda : time.time() async def do_some_work(x): print('Waiting: ', x) return 'Done after {}s'.format(x) def callback(future): print('Callback: ', future.result()) start = now() coroutine = do_some_work(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) task.add_done_callback(callback) loop.run_until_complete(task) print('TIME: ', now() - start) ``` **利用future对象回调别的函数** ?? future对象的特性,以下代码不太懂 ?? ```python import asyncio import functools def callback(future, n): print('{}: future done: {}'.format(n, future.result())) async def register_callbacks(all_done): print('registering callbacks on future') # 偏函数配合回调,all_done是future对象 all_done.add_done_callback(functools.partial(callback, n=1)) all_done.add_done_callback(functools.partial(callback, n=2)) async def main(all_done): # 到此同步中断,异步执行回调函数注册 await register_callbacks(all_done) print('setting result of future') all_done.set_result('the result') event_loop = asyncio.get_event_loop() try: all_done = asyncio.Future() event_loop.run_until_complete(main(all_done)) finally: event_loop.close() ''' registering callbacks on future setting result of future 1: future done: the result 2: future done: the result ''' ``` ### 多线程与asyncio对比 **多线程** ```python # sinner_thread.py import threading import itertools import time import sys # 这个类定义一个可变对象,用于从外部控制线程 class Signal: go = True # 这个函数会在单独的线程中运行,signal 参数是前边定义的Signal类的实例 def spin(msg, signal): write, flush = sys.stdout.write, sys.stdout.flush # itertools.cycle 函数从指定的序列中反复不断地生成元素 for char in itertools.cycle('|/-\\'): status = char + ' ' + msg write(status) flush() write('\x08' * len(status)) # 使用退格符把光标移回行首 time.sleep(.1) # 每 0.1 秒刷新一次 if not signal.go: # 如果 go属性不是 True,退出循环 break write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除状态消息,把光标移回开头 def slow_function(): # 模拟耗时操作 # 假装等待I/O一段时间 time.sleep(20) # 调用sleep 会阻塞主线程,这么做事为了释放GIL,创建从属线程 return 42 # 这个函数设置从属线程,显示线程对象,运行耗时计算,最后杀死进程 def supervisor(): signal = Signal() spinner = threading.Thread(target=spin, args=('thinking!', signal)) print('spinner object:', spinner) # 显示线程对象 输出 spinner object: <Thread(Thread-1, initial)> spinner.start() # 启动从属进程 result = slow_function() # 运行slow_function 行数,阻塞主线程。同时从属线程以动画形式旋转指针 # python 并没有提供终止线程的API,所以若想关闭线程,必须给线程发送消息。这里我们使用signal.go 属性:在主线程中把它设置为False后,spinner 线程会接收到,然后退出 signal.go = False spinner.join() # 等待spinner 线程结束 return result def main(): result = supervisor() print('Answer', result) if __name__ == '__main__': main() ``` **协程** ```python # spinner_asyncio.py # 通过协程以动画的形式显示文本式旋转指针 import asyncio import itertools import sys async def spin(msg): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle('|/-\\'): # itertools.cycle 函数从指定的序列中反复不断地生成元素 status = char + ' ' + msg write(status) flush() write('\x08' * len(status)) # 使用退格符把光标移回行首 try: # 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 这样的休眠不会阻塞事件循环 # 除非想阻塞主线程,从而冻结事件循环或整个应用,否则不要再 asyncio 协程中使用 time.sleep().如果协程需要在一段时间内什么都不做,应该使用 yield from asyncio.sleep(DELAY) # 此处相当于另一协程 await asyncio.sleep(0.1) # 如果 spin 函数苏醒后抛出 asyncio.CancelledError 异常,其原因是发出了取消请求 except asyncio.CancelledError as e: print(str(e)) break write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除状态消息,把光标移回开头 async def slow_function(): # 5 现在此函数是协程,使用休眠假装进行I/O 操作时,使用 yield from 继续执行事件循环 # 假装等待I/O一段时间 await asyncio.sleep(3) # 此表达式把控制权交给主循环,在休眠结束后回复这个协程 return 42 # ?? 不能改为asynic supervisor 否则asyncio.async会报错 ,已找到原因已被asyncio.ensure_future替代?? async def supervisor(): spinner = asyncio.ensure_future(spin('thinking!')) # asyncio.async() 函数排定协程的运行时间,使用一个 Task 对象包装spin 协程,并立即返回 print('spinner object:', spinner) # Task 对象,输出类似 spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>> # 驱动slow_function() 函数,结束后,获取返回值。同事事件循环继续运行, # 因为slow_function 函数最后使用yield from asyncio.sleep(3) 表达式把控制权交给主循环 result = await slow_function() # Task 对象可以取消;取消后会在协程当前暂停的yield处抛出 asyncio.CancelledError 异常 # 协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消 spinner.cancel() return result def main(): loop = asyncio.get_event_loop() # 获取事件循环引用 # 驱动supervisor 协程,让它运行完毕;这个协程的返回值是这次调用的返回值 result = loop.run_until_complete(supervisor()) loop.close() print('Answer', result) if __name__ == '__main__': main() ``` **分析两段代码** 1. Task对象不由自己动手实例化,而是通过把协程传给 asyncio.async(...) 函数或 loop.create_task(...) 方法获取Task 对象已经排定了运行时间;而Thread 实例必须调用start方法,明确告知它运行 2. 在线程版supervisor函数中,slow_function 是普通的函数,由线程直接调用,而异步版的slow_function 函数是协程,由yield from 驱动。 3. 没有API能从外部终止线程,因为线程随时可能被中断。而协程如果想终止任务,可以使用Task.cancel() 实例方法,在协程内部抛出CancelledError 异常。协程可以在暂停的yield 处捕获这个异常,处理终止请求 4. supervisor 协程必须在main 函数中由loop.run_until_complete 方法执行。 5. **协程和线程相比关键的一个优点是**线程必须记住保留锁,去保护程序中的重要部分,防止多步操作在执行的过程中中断,而协程默认会做好保护,我们必须显式产出(使用yield 或 yield from 交出控制权)才能让程序的余下部分运行。 #### asyncio.Future:故意不阻塞 asyncio.Future 类与 concurrent.futures.Future 类的接口基本一致,不过实现方式不同,不可互换。在 concurrent.futures.Future 中,future只是调度执行某物的结果。在 asyncio 包中,BaseEventLoop.create_task(...) 方法接收一个协程,排定它的运行时间,然后返回一个asyncio.Task 实例(也是asyncio.Future 类的实例,因为 Task 是 Future 的子类,用于包装协程。(在 concurrent.futures.Future 中,类似的操作是Executor.submit(...))。 与concurrent.futures.Future 类似,asyncio.Future 类也提供了: * `.done()` 返回布尔值,表示Future 是否已经执行 * `.add_done_callback()` 这个方法只有一个参数,类型是可调用对象,Future运行结束后会回调这个对象。 * `.result()` 这个方法没有参数,因此不能指定超时时间。 如果调用 .result() 方法时期还没有运行完毕,会抛出 asyncio.InvalidStateError 异常。 ### 协程嵌套 (协程的常用方式) ```python import asyncio import time now = lambda: time.time() async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] ''' # wait 有timeout参数 dones, pendings = await asyncio.wait(tasks) for task in dones: print('Task ret: ', task.result()) ''' # 如果使用的是 asyncio.gather创建协程对象,那么await的返回值就是协程运行的结果。 # 使用asyncio.wait(tasks)返回的顺序有点难以理解,但使用asyncio.gather(*tasks)返回值的顺序就好理解得多 results = await asyncio.gather(*tasks) for result in results: print('Task ret: ', result) start = now() loop = asyncio.get_event_loop() loop.run_until_complete(main()) print('TIME: ', now() - start) ``` **抛出返回值到run_until_complete:** ```python async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] return await asyncio.gather(*tasks) start = now() loop = asyncio.get_event_loop() # 也可以直接返回到run_until_complete处理协程结果 results = loop.run_until_complete(main()) for result in results: print('Task ret: ', result) print('TIME: ', now() - start) ``` ***使用as_completed** ```python async def main(num): tasks = [] i = 1 while i <= num : tasks.append(asyncio.ensure_future(do_some_work(i))) i += 1 for task in asyncio.as_completed(tasks): result = await task print('Task ret: {}'.format(result)) start = now() loop = asyncio.get_event_loop() done = loop.run_until_complete(main(10)) print('TIME: ', now() - start) ``` ### 协程停止 future对象有几个状态:Pending,Running,Done,Cancelled. 创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消。可以使用asyncio.Task获取事件循环的task ```python try: loop = asyncio.get_event_loop() loop.run_until_complete(main(5)) except KeyboardInterrupt as e: for task in asyncio.Task.all_tasks(): # 启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt。然后通过循环asyncio.Task取消future。 print(task) # 返回true或false,已执行的返回false print(task.cancel()) #loop stop之后还需要再次开启事件循环,最后再close,不然还会抛出异常: # 抛出异常后要重新启动循环 loop.stop() loop.run_forever() finally: loop.close() ``` **批量停止** ```python loop = asyncio.get_event_loop() task = asyncio.ensure_future(main(5)) try: loop.run_until_complete(task) except KeyboardInterrupt as e: print(asyncio.Task.all_tasks()) print('-------------------') # 批量停止,如果全部停止成功就直接返回true,与上列不同 print(asyncio.gather(*asyncio.Task.all_tasks()).cancel()) loop.stop() loop.run_forever() finally: loop.close() ``` ### 不同线程的事件循环 很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被block。 ```python import asyncio from threading import Thread import time now = lambda: time.time() def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() def more_work(x): print('More work {}'.format(x)) time.sleep(x) print('Finished more work {}'.format(x)) start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() new_loop.call_soon_threadsafe(more_work, 6) new_loop.call_soon_threadsafe(more_work, 3) # 这里的计时没有意义,因为more_work具体的执行是在新的两个线程里面 print('TIME: {}'.format(now() - start)) ``` 启动上述代码之后,当前线程不会被block,新线程中会按照顺序执行call_soon_threadsafe方法注册的more_work方法,后者因为time.sleep操作是同步阻塞的,因此运行完毕more_work需要大致6 + 3 ### 新线程协程 ```python def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def do_some_work(x): print('Waiting {}'.format(x)) await asyncio.sleep(x) print('Done after {}s'.format(x)) start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop) asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop) print('TIME: {}'.format(time.time() - start)) ``` 上述的例子,主线程中创建一个new_loop,然后在另外的两个子线程中开启一个无限事件循环。主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block。一共执行的时间大概在6s左右。 ### master-worker主从模式 对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。 为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。 ```python while True: task = rcon.rpop("queue") if not task: time.sleep(1) continue asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) ``` ### 停止子线程 如果一切正常,那么上面的例子很完美。可是,需要停止程序,直接ctrl+c,会抛出KeyboardInterrupt错误,我们修改一下主循环: ```python try: while True: task = rcon.rpop("queue") if not task: time.sleep(1) continue asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) except KeyboardInterrupt as e: print(e) new_loop.stop() ``` 可是实际上并不好使,虽然主线程try了KeyboardInterrupt异常,但是子线程并没有退出,为了解决这个问题,可以设置子线程为守护线程,这样当主线程结束的时候,子线程也随机退出。 ```python new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.setDaemon(True) # 设置子线程为守护线程 t.start() try: while True: # print('start rpop') task = rcon.rpop("queue") if not task: time.sleep(1) continue asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) except KeyboardInterrupt as e: print(e) new_loop.stop() ``` 线程停止程序的时候,主线程退出后,子线程也随机退出才了,并且停止了子线程的协程任务。 ```python try: while True: # 用brpop方法,会block住task,如果主线程有消息,才会消费。 # 这种方式更适合队列消费,不用上面的要停顿一秒 _, task = rcon.brpop("queue") asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop) except Exception as e: print('error', e) new_loop.stop() finally: pass ``` ### 协程消费 主线程用于监听队列,然后子线程的做事件循环的worker是一种方式。还有一种方式实现这种类似master-worker的方案。即把监听队列的无限循环挪进协程中。程序初始化就创建若干个协程,实现类似并行的效果。一般这个方案就可以了 ```python import time import asyncio import redis now = lambda : time.time() # 最多开多少个协程 MAX_COROUTINES = 10 def get_redis(): connection_pool = redis.ConnectionPool(host='127.0.0.1', db=1,port=6379) # connection_pool = redis.ConnectionPool(host='172.28.3.24', db=1,port=6379) return redis.Redis(connection_pool=connection_pool) rcon = get_redis() async def worker(): print('Start worker') while True: start = now() task = rcon.rpop("queue") if not task: await asyncio.sleep(1) continue print('Wait ', int(task)) await asyncio.sleep(int(task)) print('Done ', task, now() - start) def main(): i = 0 while i < MAX_COROUTINES: asyncio.ensure_future(worker()) i += 1 loop = asyncio.get_event_loop() try: loop.run_forever() except KeyboardInterrupt as e: print(asyncio.gather(*asyncio.Task.all_tasks()).cancel()) loop.stop() loop.run_forever() finally: loop.close() if __name__ == '__main__': main() ``` ## aiohttp asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。 asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。 ### 实现web服务器 ```python import asyncio from aiohttp import web async def index(request): await asyncio.sleep(0.5) return web.Response(body=b'<h1>Index</h1>') async def hello(request): await asyncio.sleep(0.5) text = '<h1>hello, %s!</h1>' % request.match_info['name'] return web.Response(body=text.encode('utf-8')) async def init(loop): app = web.Application(loop=loop) app.router.add_route('GET', '/', index) app.router.add_route('GET', '/hello/{name}', hello) srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000) print('Server started at http://127.0.0.1:8000...') return srv loop = asyncio.get_event_loop() loop.run_until_complete(init(loop)) loop.run_forever() ``` ## 例1,[使用asyncio 和 aiohttp 包下载国旗][1] ## 例2,[使用python-aiohttp爬取网易云音乐,今日头条,搭建微信公众平台][2] [1]: https://gitee.com/nixi8/Python/tree/master/script [2]: https://github.com/SigalHu/WeiXin