Python协程
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
【1】yield与协程
def foo():
print("OK1")
yield 100 # 切换: 保存/恢复的功能
print("OK2")
yield 1000
def bar():
print("OK3")
yield 200
print("OK4")
yield 2000
gen = foo()
ret = next(gen) # gen.__next__()
print(ret)
gen2 = bar()
ret2 = next(gen2) # gen.__next__()
print(ret2)
ret = next(gen) # gen.__next__()
print(ret)
ret2 = next(gen2) # gen.__next__()
print(ret2)
【2】asyncio模块
asyncio即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,是很多python异步架构的基础,多用于处理高并发网络请求方面的问题。
为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。
asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。
import asyncio
async def task(i):
print(f"task {i} start")
await asyncio.sleep(1)
print(f"task {i} end")
# 创建事件循环对象
loop = asyncio.get_event_loop()
# 直接将协程对象加入时间循环中
tasks = [task(1), task(2)]
# asyncio.wait:将协程任务进行收集,功能类似后面的asyncio.gather
# run_until_complete阻塞调用,直到协程全部运行结束才返回
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
task
: 任务,对协程对象的进一步封装,包含任务的各个状态;asyncio.Task是Future的一个子类,用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数loop.create_task() 或 asyncio.ensure_future()
创建。
import asyncio, time
async def work(i, n): # 使用async关键字定义异步函数
print('任务{}等待: {}秒'.format(i, n))
await asyncio.sleep(n) # 休眠一段时间
print('任务{}在{}秒后返回结束运行'.format(i, n))
return i + n
start_time = time.time() # 开始时间
tasks = [asyncio.ensure_future(work(1, 1)),
asyncio.ensure_future(work(2, 2)),
asyncio.ensure_future(work(3, 3))]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('运行时间: ', time.time() - start_time)
for task in tasks:
print('任务执行结果: ', task.result())
【3】3.8版本+
async.run()
运行协程async.create_task()
创建taskasync.gather()
获取返回值
import asyncio, time
async def work(i, n): # 使用async关键字定义异步函数
print('任务{}等待: {}秒'.format(i, n))
await asyncio.sleep(n) # 休眠一段时间
print('任务{}在{}秒后返回结束运行'.format(i, n))
return i + n
tasks = []
async def main():
global tasks
tasks = [asyncio.create_task(work(1, 1)),
asyncio.create_task(work(2, 2)),
asyncio.create_task(work(3, 3))]
await asyncio.wait(tasks) # 阻塞
start_time = time.time() # 开始时间
asyncio.run(main())
print('运行时间: ', time.time() - start_time)
for task in tasks:
print('任务执行结果: ', task.result())
asyncio.create_task() 函数在 Python 3.7 中被加入。
asyncio.gather
方法
# 用gather()收集返回值
import asyncio, time
async def work(i, n): # 使用async关键字定义异步函数
print('任务{}等待: {}秒'.format(i, n))
await asyncio.sleep(n) # 休眠一段时间
print('任务{}在{}秒后返回结束运行'.format(i, n))
return i + n
async def main():
tasks = [asyncio.create_task(work(1, 1)),
asyncio.create_task(work(2, 2)),
asyncio.create_task(work(3, 3))]
# 将task作为参数传入gather,等异步任务都结束后返回结果列表
response = await asyncio.gather(tasks[0], tasks[1], tasks[2])
print("异步任务结果:", response)
start_time = time.time() # 开始时间
asyncio.run(main())
print('运行时间: ', time.time() - start_time)
【4】aiohttp
我们之前学习过爬虫最重要的模块requests,但它是阻塞式的发起请求,每次请求发起后需阻塞等待其返回响应,不能做其他的事情。本文要介绍的aiohttp可以理解成是和requests对应Python异步网络请求库,它是基于 asyncio 的异步模块,可用于实现异步爬虫,有点就是更快于 requests 的同步爬虫。安装方式,pip install aiohttp。
aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio
的异步库。asyncio可以实现单线程并发IO操作,其实现了TCP、UDP、SSL等协议,aiohttp就是基于asyncio实现的http框架。
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
async with session.get("http://httpbin.org/headers") as response:
print(await response.text())
asyncio.run(main())
【5】应用案例(异步爬虫)
(1)异步爬取斗图网
import aiohttp
import asyncio
import os
from lxml import etree
import time
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}
async def get_img_urls():
url = "https://www.pkdoutu.com/photo/list/"
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, ssl=False) as resp:
text = await resp.content.read()
selector = etree.HTML(text)
img_urls = selector.xpath('//li[@class="list-group-item"]/div/div/a/img[@data-backup]/@data-backup')
return img_urls
async def async_download(url):
name = os.path.basename(url)
async with aiohttp.ClientSession() as session:
async with session.get(url, ssl=False) as resp:
# 将得到的请求保存到文件中
with open(f"imgs/{name}.jpg", "wb") as f:
f.write(await resp.content.read())
async def main():
tasks = [asyncio.create_task(async_download(url)) for url in await get_img_urls()]
await asyncio.wait(tasks)
if __name__ == '__main__':
start = time.time()
asyncio.run(main())
print(time.time() - start)
(2)异步爬取m3u8视频
m3u8串行爬取版本
import requests
import re
import os
session = requests.Session()
# (1) 爬取m3u8文件的链接
url = "https://www.9tata.cc/play/14999-1-0.html"
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36'
}
res = session.get(url, headers=headers, verify=False)
m3u8_link = re.search('now="(.*?m3u8)"', res.text).group(1)
print(m3u8_link) # https://ikcdn01.ikzybf.com/20221015/ecMSO74h/index.m3u8
# 顺便抓一个名字
name = re.search(r'<td class="col2 hidden-xs">(?P<name>\w+)</td>', res.text).group("name")
print("name", name)
# (2) 爬取m3u8文件
res = session.get(m3u8_link)
print(res.text.split("\n")[-1])
m3u8_detail_link = os.path.join(os.path.dirname(m3u8_link), res.text.split("\n")[-1])
print(m3u8_detail_link)
# (3) 爬取m3u8具体文件
res = requests.get(m3u8_detail_link)
print(res.text)
# (4) 解析ts文件
ret = re.findall(r"\n(.*?\.ts)", res.text)
print(ret)
# (5) 下载每一个ts文件
p = os.path.dirname(m3u8_detail_link)
with open(name, "ab") as f:
for ts in ret:
path = os.path.join(p, ts)
res = requests.get(path)
f.write(res.content)
print(f"{ts}下载完成!")
m3u8并发爬取版本
import requests
import re
import os
import aiohttp
import asyncio
import time
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36'
}
async def get_m3u8_link():
# (1) 爬取m3u8文件的链接
url = "https://www.9tata.cc/play/14999-1-0.html"
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, ssl=False) as response:
res = await response.text()
m3u8_link = re.search('now="(.*?m3u8)"', res).group(1)
print(m3u8_link) # https://ikcdn01.ikzybf.com/20221015/ecMSO74h/index.m3u8
# 顺便抓一个名字
name = re.search(r'<td class="col2 hidden-xs">(?P<name>\w+)</td>', res).group("name")
print("name", name)
return m3u8_link, name
async def get_m3u8_detail_link(m3u8_link):
# (2) 爬取m3u8文件
async with aiohttp.ClientSession() as session:
async with session.get(m3u8_link, headers=headers, ssl=False) as response:
res = await response.text()
m3u8_detail_link = os.path.join(os.path.dirname(m3u8_link), res.split("\n")[-1])
print(m3u8_detail_link)
return m3u8_detail_link
async def parse_ts(m3u8_detail_link):
# (3) 爬取m3u8具体文件
async with aiohttp.ClientSession() as session:
async with session.get(m3u8_detail_link, headers=headers, ssl=False) as response:
with open("ts文件/index.m3u8", "w") as f:
res_text = await response.text()
f.write(res_text)
# (4) 解析ts文件
ret = re.findall(r"\n(.*?\.ts)", res_text)
print(ret)
return ret
async def download_video(m3u8_detail_link, ts_list):
p = os.path.dirname(m3u8_detail_link)
tasks = []
for ts in ts_list:
path = os.path.join(p, ts)
task = asyncio.create_task(download_ts(path, ts))
tasks.append(task)
await asyncio.wait(tasks)
async def download_ts(path, ts):
if not os.path.exists("ts文件"):
os.mkdir("ts文件")
async with aiohttp.ClientSession() as session:
async with session.get(path, ssl=False) as resp:
# 将得到的请求保存到文件中
with open("ts文件/" + ts, "wb") as f:
f.write(await resp.content.read())
print(f"{ts}下载完成!")
def merge(filename='out'):
'''
视频合并
:return:
'''
# 进入到下载后ts 的目录
os.chdir("ts文件")
# 视频合并的命令
os.system(f'ffmpeg -i index.m3u8 -c copy {filename}.mp4')
print(f'视频{filename} 合并成功====')
async def main():
m3u8_link, name = await get_m3u8_link()
m3u8_detail_link = await get_m3u8_detail_link(m3u8_link)
ts_list = await parse_ts(m3u8_detail_link)
await download_video(m3u8_detail_link, ts_list)
merge(name)
if __name__ == '__main__':
start = time.time()
asyncio.run(main())
print(time.time() - start)
- 知识点:ffmpeg -i index.m3u8 -c copy apple.mp4
- 知识点:os.system执行终端命令