What is it like to work with Python for ten years? A veteran programmer's 10,000-word experience share (Part 4)

  • Introduction to Python 3 asyncio ()
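All of the examples above are based on Python 3.7. If you are still on Python 2, you can also use coroutines through gevent or tornado.
Personally I prefer tornado, because it is more of a white box and its style is close to Python 3. If you agree, you could try kipp, a library I once wrote for my company, which wraps tornado with additional tools.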
Gevent Demo:
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Gevent Pool & Child Tasks
=========================

You can use gevent.pool.Pool to limit the concurrency of coroutines.
And you can create unlimited subtasks in each coroutine.

Benchmark
=========

cost 2.675039052963257s for url
cost 2.66813588142395s for url ip
cost 2.674264907836914s for url user-agent
cost 2.6776888370513916s for url get
cost 3.97711181640625s for url headers
total cost 3.9886841773986816s
"""

import time

import gevent
from gevent.pool import Pool
import gevent.monkey

pool = Pool(10)  # set the concurrency limit
gevent.monkey.patch_socket()  # make socket I/O cooperative

try:
    import urllib2
except ImportError:
    import urllib.request as urllib2

TARGET_URLS = (
    '',
    'ip',
    'user-agent',
    'headers',
    'get',
)


def demo_child_task():
    """Sub coroutine task"""
    gevent.sleep(2)


def demo_task(url):
    """Main coroutine

    You should wrap each task into one entry coroutine,
    then spawn its own sub coroutine tasks.
    """
    start_ts = time.time()
    r = urllib2.urlopen(url)
    demo_child_task()
    print('cost {}s for url {}'.format(time.time() - start_ts, url))


def main():
    start_ts = time.time()
    pool.map(demo_task, TARGET_URLS)
    print('total cost {}s'.format(time.time() - start_ts))


if __name__ == '__main__':
    main()
```

Tornado demo:
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
cost 0.5578329563140869s, get get
cost 0.5621621608734131s, get ip
cost 0.5613000392913818s, get user-agent
cost 0.5709919929504395s, get
cost 0.572376012802124s, get headers
total cost 0.5809519290924072s
"""

import time

import tornado.gen
import tornado.httpclient
import tornado.ioloop

TARGET_URLS = [
    '',
    'ip',
    'user-agent',
    'headers',
    'get',
]


@tornado.gen.coroutine
def demo_handler(ioloop):
    # iterate over a copy, because demo_task removes finished URLs;
    # each task is started without waiting on it, so all requests run concurrently
    for url in list(TARGET_URLS):
        demo_task(url, ioloop=ioloop)


@tornado.gen.coroutine
def demo_task(url, ioloop=None):
    start_ts = time.time()
    http_client = tornado.httpclient.AsyncHTTPClient()
    r = yield http_client.fetch(url)  # r is the response object
    end_ts = time.time()
    print('cost {}s, get {}'.format(end_ts - start_ts, url))
    TARGET_URLS.remove(url)
    if not TARGET_URLS:
        ioloop.stop()


def main():
    start_ts = time.time()
    ioloop = tornado.ioloop.IOLoop.instance()
    ioloop.add_future(demo_handler(ioloop), lambda f: None)
    ioloop.start()
    # total cost will equal the longest task
    print('total cost {}s'.format(time.time() - start_ts))


if __name__ == '__main__':
    main()
```

kipp demo:
```python
import time

from kipp.aio import coroutine2, run_until_complete, sleep, return_in_coroutine
from kipp.utils import ThreadPoolExecutor, get_logger

executor = ThreadPoolExecutor(10)
logger = get_logger()


@coroutine2
def coroutine_demo():
    logger.info('start coroutine_demo')
    yield sleep(1)
    logger.info('coroutine_demo done')
    yield executor.submit(blocking_func)
    return_in_coroutine('yeo')


def blocking_func():
    logger.info('start blocking task...')
    # a truly blocking sleep; kipp's sleep above is the one you yield inside coroutines
    time.sleep(1)
    logger.info('blocking task return')
    return 'hard'


@coroutine2
def coroutine_main():
    logger.info('start coroutine_main')
    r = yield coroutine_demo()
    logger.info('coroutine_demo return: {}'.format(r))
    yield sleep(1)
    return_in_coroutine('coroutine_main yo')


def main():
    f = coroutine_main()
    run_until_complete(f)
    logger.info('coroutine_main return: {}'.format(f.result()))


if __name__ == '__main__':
    main()
```

When using tornado, keep in mind that it relies on generators to emulate coroutines. In Python 2 a generator cannot return a value, so you have to emulate the return with raise gen.Return(value). Python 3.3 introduced yield from (which also lets a generator return a value), and the async/await syntax in Python 3.5 finally solved the problem for good. Also beware that tornado's Future is not thread-safe.
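To see why generator-based coroutines needed a workaround like gen.Return, here is a minimal stdlib-only sketch. It does not use tornado, and the names gen_child, gen_parent, drive, async_child and async_parent are hypothetical. Since Python 3.3 a generator can return a value that yield from passes back to its caller, and async/await expresses the same thing natively:

```python
import asyncio


# Generator-based style: since Python 3.3 (PEP 380) a generator may `return`
# a value, and `yield from` hands that value back to the caller.
def gen_child():
    yield "step"   # stands in for "wait on some I/O"
    return 42      # a plain return, no gen.Return-style exception needed


def gen_parent():
    result = yield from gen_child()  # receives gen_child's return value
    return result * 2


def drive(gen):
    """Run a generator to completion and return its return value."""
    try:
        while True:
            next(gen)
    except StopIteration as stop:
        return stop.value  # the generator's return value travels on StopIteration


# Native coroutine style (async/await, Python 3.5+): same shape, more direct.
async def async_child():
    await asyncio.sleep(0)
    return 42


async def async_parent():
    result = await async_child()
    return result * 2


if __name__ == "__main__":
    print(drive(gen_parent()))          # 84
    print(asyncio.run(async_parent()))  # 84
```

The drive helper plays the role of a tiny scheduler; a real event loop such as tornado's or asyncio's does the same job while also waiting on I/O.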
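As for gevent, let me vent a little: please stop bringing up monkey_patch...
The official documentation, library/asyncio-task.html, explains asyncio clearly and is well worth a read. A small tip: calling an async function only creates a coroutine object, and that coroutine does not run yet. It runs when it is awaited, or when you wrap it in a Task with ensure_future or create_task, at which point the event loop schedules it.
Also, do not create more than one ioloop in a single process.
To sum up, a simple approach: when the program starts, create a process pool (with at most as many processes as available CPU cores), a thread pool, and an ioloop. The ioloop schedules all coroutines; whenever you hit a blocking call, hand I/O-bound work to the thread pool and CPU-bound work to the process pool. This keeps the code logic simple while making the most of the machine. A simple complete example: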
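A minimal sketch of that behavior using only asyncio (the work and main functions and the events list are hypothetical, for illustration only): the coroutine stored in coro does nothing until it is awaited, while the one wrapped with create_task starts as soon as the loop gets control.

```python
import asyncio

events = []  # records what actually ran, in order


async def work(name):
    events.append(f"{name} started")
    await asyncio.sleep(0.01)
    events.append(f"{name} done")
    return name


async def main():
    coro = work("a")  # only creates a coroutine object; nothing runs yet
    events.append("coroutine created")
    task = asyncio.create_task(work("b"))  # wrapped in a Task, scheduled on the loop
    await asyncio.sleep(0)  # yield once so the loop can start task "b"
    events.append("after first yield")
    result_a = await coro  # awaiting is what finally runs "a"
    result_b = await task
    return result_a, result_b


if __name__ == "__main__":
    print(asyncio.run(main()))
    print(events)
```

"b started" is logged before "after first yield", while "a started" only appears once coro is awaited.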
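asyncio enforces part of this rule on its own: within one thread you cannot start a second loop while one is already running. A minimal sketch, with hypothetical inner and main coroutines, that uses asyncio.run as the single entry point and schedules extra work on the already-running loop instead of creating a new one:

```python
import asyncio


async def inner():
    return "inner"


async def main():
    loop = asyncio.get_running_loop()  # reuse the loop that is already running
    error = ""
    coro = inner()
    try:
        asyncio.run(coro)  # tries to start a second loop in this thread
    except RuntimeError as exc:
        error = str(exc)
    finally:
        coro.close()  # it never ran; close it to avoid a "never awaited" warning
    # The right way: schedule the work on the existing loop.
    value = await loop.create_task(inner())
    return error, value


if __name__ == "__main__":
    print(asyncio.run(main()))  # a single entry point means a single loop
```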
"""
$ python process_thread_coroutine.py
[2019-08-11 09:09:37,670Z - INFO - kipp] - main running...
[2019-08-11 09:09:37,671Z - INFO - kipp] - coroutine_main running...
[2019-08-11 09:09:37,671Z - INFO - kipp] - io_blocking_task running...
[2019-08-11 09:09:37,690Z - INFO - kipp] - coroutine_task running...
[2019-08-11 09:09:37,691Z - INFO - kipp] - coroutine_error running...
[2019-08-11 09:09:37,691Z - INFO - kipp] - coroutine_error end, cost 0.00s
[2019-08-11 09:09:37,693Z - INFO - kipp] - cpu_blocking_task running...
[2019-08-11 09:09:38,674Z - INFO - kipp] - io_blocking_task end, cost 1.00s
[2019-08-11 09:09:38,695Z - INFO - kipp] - coroutine_task end, cost 1.00s
[2019-08-11 09:09:39,580Z - INFO - kipp] - cpu_blocking_task end, cost 1.89s
[2019-08-11 09:09:39,582Z - INFO - kipp] - coroutine_main got [None, AttributeError('yo'), None, None]
[2019-08-11 09:09:39,582Z - INFO - kipp] - coroutine_main end, cost 1.91s
[2019-08-11 09:09:39,582Z - INFO - kipp] - main end, cost 1.91s
"""

from time import sleep, time
from asyncio import get_event_loop, sleep as asleep, gather, ensure_future, iscoroutine
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, wait
from functools import wraps

from kipp.utils import get_logger

logger = get_logger()

N_FORK = 4
N_THREADS = 10

thread_executor = ThreadPoolExecutor(max_workers=N_THREADS)
process_executor = ProcessPoolExecutor(max_workers=N_FORK)
ioloop = get_event_loop()


def timer(func):
    @wraps(func)
    def wrapper(*args, **kw):
        logger.info(f"{func.__name__} running...")
        start_at = time()
        try:
            return func(*args, **kw)
        finally:
            logger.info(f"{func.__name__} end, cost {time() - start_at:.2f}s")

    return wrapper


def async_timer(func):
    @wraps(func)
    async def wrapper(*args, **kw):
        logger.info(f"{func.__name__} running...")
        start_at = time()
        try:
            return await func(*args, **kw)
        finally:
            logger.info(f"{func.__name__} end, cost {time() - start_at:.2f}s")

    return wrapper


@timer
def io_blocking_task():
    """I/O-bound blocking call"""
    sleep(1)


@timer
def cpu_blocking_task():
    """CPU-bound blocking call"""
    for _ in range(1