Python的协程详细解释

在实际中遇到这样一个问题,公司软件发布上线自动化。

说简单点,就是需要去登录一个上线的内部网站,然后爬下所有的上线数据。

然后根据爬下来的数据整理好,可以一起上线的,就并发多线程,其实就是去传参数点击一个链接等返回。

不能并发的就单线程点链接。

那这个事情必须更有效率,单线程的没问题,用 python 的 request 就可以实现了。

我们仔细研究一下协程,先讲一下历史:

使用Python的人往往纠结在多线程、多进程,哪个效率更高?到底用哪个好呢?

其实 Python 的多进程和多线程,相对于别家的协程和异步处理机制,都不行,线程之间切换耗费 CPU 和寄存器,OS 的调度不可控,多进程之间通讯也不便。性能根本不行。

后来呢 Python 改进了语法,出现了 yiled from 充当协程调度,有人就根据这个特性开发了第三方的协程框架,Tornado,Gevent等。

官方也不能坐视不理啊,任凭别人出风头,于是 Python 之父深入简出3年,苦心钻研自家的协程,async/await 和 asyncio 库,并放到 Python3.5 后成为官方原生的协程。

对于 http请求、读写文件、读写数据库这种高延时的 IO 操作,协程是个大杀器,优点非常多;它可以在预料到一个阻塞将发生时,挂起当前协程,跑去执行其它协程,同时把事件注册到循环中,实现了多协程并发,其实这玩意是跟 Nodejs 的回调学的。

看下图,详细解释下,左边我们有100个网页请求,并发100个协程请求(其实也是1个1个发),当需要等待长时间回应回应时,挂起当前协程,并注册一个回调函数到事件循环(Event Loop)中,执行下一个协程,当有协程事件完成再通过回调函数唤醒挂起的协程,然后返回结果。

这个跟 nodejs 的回调函数基本一样,我们必须注意主进程和协程的关系,如果我在一个主进程中,触发协程函数,有100个协程,那么必须等待100个协程都结束后,才能回到正常的那个主进程中。当然,主进程也可能也是一个协程。

那么协程的基本用法

  • async f(n) 声明一个函数是协程的

  • await f(n) 挂起当前协程,把控制权交回 event loop,并且执行f(n)和注册之后的f(n)回调。

    举个例子:如果在 g() 这个函数中执行了 await f(),那么g()函数会被挂起,并等待 f() 函数有结果结束,然后返回 g() 继续执行。

1async def get(url):
2    async with aiohttp.ClientSession() as session:
3        async with session.get(url) as response: 
4            return await response.text() 

最后一行 await 是挂起命令,挂起当前函数 get() ,并执行 response.text() 和注册回调,等待 response.text() 执行完成后重新激活当前函數get()继续执行,返回。

所以 await 只叫做挂起是不太对的,感觉应该叫做 挂起并注册回调 比较合适。

看以下程序,在 Python 3.7 之前,协程是这么用的:

 1import time
 2import asyncio
 3
 4now = lambda : time.time()
 5
 6async def do_some_work(x): 
 7    print('Waiting: ', x) 
 8
 9start = now()
10coroutine = do_some_work(2)
11loop = asyncio.get_event_loop()
12loop.run_until_complete(coroutine)
13print('TIME: ', now() - start)

我们指定了一个协程 coroutine ,然后定义了一个事件循环 loop,loop 是需要 run_until_complete 所有的协程,然后交出控制权,返回正常的主进程。

跟上图完全匹配。

在 Python 3.7 之后,简化了用法,一句 asyncio.run 就可以了:

1asyncio.run(do_some_work(2))

上面程序就变了,省了好多,但是副作用是第一次看到的人会不明白它是怎么进化过来的:

 1import time
 2import asyncio
 3
 4now = lambda : time.time()
 5
 6async def do_some_work(x): 
 7    print('Waiting: ', x) 
 8
 9start = now()
10asyncio.run(do_some_work(2))
11print('TIME: ', now() - start)

如果我们要访问一个网站的100个网页,单线程的做法是:请求一次,回来一次,然后进行下一个

1for url in urls
2  response=get(url)
3  results=parse(response)

这样效率很低,协程呢,做法就不同了,一次发起100个请求(准确的说也是一个一个发),不同的是协程不会死等返回,而是发一个请求,挂起,再发一个再挂起,发起100个,就挂起100个,然后注册并等待100个返回,效率提升了100倍。可以理解为同时做了100件事,做到由自己调度而不是交给CPU,程序的并发由自己来控制,而不是交由 OS 去调度,效率极大的提高了。

进化到协程,我们把费 IO 的 get 函数抽出来放到协程里:

 1async def get(url:str):
 2    my_conn = aiohttp.TCPConnector(limit=10)
 3    async with aiohttp.ClientSession(connector=my_conn) as session:
 4        async with session.get(url) as resp:
 5            return await resp.text()
 6
 7
 8for url in urls
 9  response=asyncio.run(get(url))
10  results=parse(response)
11        

具体到我们的项目,我们首先要登录一个网页拿到 cookie,这个过程其实就一个协程,没人会登录个几百次吧。然后把放了 cookie 的 session 取出来,供后面的协程再复用就可以了,示例代码如下:

 1import aiohttp
 2import asyncio 
 3
 4async def login():
 5    my_conn = aiohttp.TCPConnector(limit=10)
 6    async with aiohttp.ClientSession(connector=my_conn) as session:
 7        data = {'loginname':'wangbadan','password':'Fuckyouall'}
 8        async with session.post('http://192.168.1.3/user/login',data=data) as resp:
 9            print(resp.url)
10            print(resp.status)
11            print(await resp.text())
12            return session
13
14session = asyncio.run(login())
15print(f"{session}")

再给一个完全版的主函数是进程,下载是协程的例子,注意里面的 aiohttp.TCPConnector(limit=10),限制一下并发是10个,否则会被服务器 Ban 掉:

 1import asyncio
 2import time 
 3import aiohttp
 4from aiohttp.client import ClientSession
 5
 6async def download_link(url:str,session:ClientSession):
 7    async with session.get(url) as response:
 8        result = await response.text()
 9        print(f'Read {len(result)} from {url}')
10
11async def download_all(urls:list):
12    my_conn = aiohttp.TCPConnector(limit=10)
13    async with aiohttp.ClientSession(connector=my_conn) as session:
14        tasks = []
15        for url in urls:
16            task = asyncio.ensure_future(download_link(url=url,session=session))
17            tasks.append(task)
18        await asyncio.gather(*tasks,return_exceptions=True) # the await must be nest inside of the session
19
20url_list = ["https://www.google.com","https://www.bing.com"]*50
21print(url_list)
22start = time.time()
23asyncio.run(download_all(url_list))
24end = time.time()
25print(f'download {len(url_list)} links in {end - start} seconds')

协程里的 session 也有很多种用法,参考下面的链接就好:

https://blog.csdn.net/weixin_39643613/article/details/109171090

我们也给出简单易用的线程池版,说不定以后会用上:

image-20211112091410467

 1import requests
 2from requests.sessions import Session
 3import time
 4from concurrent.futures import ThreadPoolExecutor
 5from threading import Thread,local
 6
 7url_list = ["https://www.google.com/","https://www.bing.com"]*50
 8thread_local = local()
 9
10def get_session() -> Session:
11    if not hasattr(thread_local,'session'):
12        thread_local.session = requests.Session()
13    return thread_local.session
14
15def download_link(url:str):
16    session = get_session()
17    with session.get(url) as response:
18        print(f'Read {len(response.content)} from {url}')
19
20def download_all(urls:list) -> None:
21    with ThreadPoolExecutor(max_workers=10) as executor:
22        executor.map(download_link,url_list)
23
24start = time.time()
25download_all(url_list)
26end = time.time()
27print(f'download {len(url_list)} links in {end - start} seconds')

Kubernetes使用filebeat Multiline自定义收集日志
GlusterFS文件系统的优化
comments powered by Disqus