python-asyncio

  1. Python Coroutines
  2. 【1】yield and coroutines
  3. 【2】The asyncio module
  4. 【3】Python 3.8+
  5. 【4】aiohttp
  6. 【5】Application examples (async crawlers)
    1. (1) Asynchronously crawling 斗图网 (pkdoutu.com)
    2. (2) Asynchronously downloading m3u8 video

Python Coroutines

Coroutines are also known as micro-threads or fibers (English: coroutine). In one sentence: a coroutine is a lightweight thread scheduled in user space.

A coroutine has its own register context and stack. When the scheduler switches away from a coroutine, its register context and stack are saved elsewhere; when it switches back, the saved register context and stack are restored. As a result:

A coroutine preserves the state of its previous invocation (a particular combination of all of its local state), so every re-entry is equivalent to resuming that state; in other words, it resumes at the exact point in the logic flow where it last left off.

【1】yield and coroutines

def foo():
    print("OK1")
    yield 100  # switch point: state is saved here and restored on re-entry
    print("OK2")
    yield 1000


def bar():
    print("OK3")
    yield 200
    print("OK4")
    yield 2000


gen = foo()
ret = next(gen)    # gen.__next__()
print(ret)

gen2 = bar()
ret2 = next(gen2)  # gen2.__next__()
print(ret2)

ret = next(gen)    # gen.__next__()
print(ret)

ret2 = next(gen2)  # gen2.__next__()
print(ret2)
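
Beyond next(), a generator can also receive a value through yield via send(), which is the mechanism generator-based coroutines were built on before async/await existed. A minimal sketch (the consumer name is made up for illustration):

def consumer():
    total = 0
    while True:
        n = yield total    # pause here; resume with the value passed to send()
        total += n

gen = consumer()
next(gen)            # prime the generator: run it up to the first yield
print(gen.send(10))  # 10
print(gen.send(5))   # 15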

【2】The asyncio module

asyncio, short for Asynchronous I/O, is a Python package for handling concurrent events. It underpins many Python async frameworks and is mostly used for high-concurrency network requests.

To simplify asynchronous IO and mark it more clearly, Python 3.5 introduced the new async/await syntax, which makes coroutine code more concise and readable.

asyncio is used as the foundation of multiple high-performance Python async frameworks, including web services, database connection libraries, distributed task queues, and more.

asyncio is often the best choice for building IO-bound, high-level structured network code.

import asyncio


async def task(i):
    print(f"task {i} start")
    await asyncio.sleep(1)
    print(f"task {i} end")


# create the event loop object
loop = asyncio.get_event_loop()
# add the coroutine objects to the event loop directly
tasks = [task(1), task(2)]
# asyncio.wait collects the coroutine tasks; similar in function to asyncio.gather shown later
# (note: passing bare coroutines to asyncio.wait was deprecated in 3.8 and removed in 3.11)
# run_until_complete is a blocking call that returns only after all coroutines have finished
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Task: a further wrapper around a coroutine object that carries the task's state. asyncio.Task is a subclass of Future used to implement cooperative multitasking. Task objects are not meant to be instantiated by hand; they are created through one of two functions, loop.create_task() or asyncio.ensure_future().
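
As a quick illustration of those states, a minimal sketch (the nap coroutine is made up) checking done() and result() on a Task, using the same pre-3.8 loop style as this section:

import asyncio

async def nap():
    await asyncio.sleep(0.1)
    return "woke up"

async def demo():
    t = asyncio.ensure_future(nap())  # wrap the coroutine in a Task
    print(t.done())   # False: scheduled but not yet finished
    result = await t
    print(t.done())   # True: the task has completed
    print(result)     # woke up

loop = asyncio.get_event_loop()
loop.run_until_complete(demo())
loop.close()

The fuller example below wraps three tasks the same way: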

import asyncio, time


async def work(i, n):  # define a coroutine function with the async keyword
    print('task {} waiting {} seconds'.format(i, n))
    await asyncio.sleep(n)  # sleep for a while
    print('task {} finished after {} seconds'.format(i, n))
    return i + n


start_time = time.time()  # record the start time

tasks = [asyncio.ensure_future(work(1, 1)),
         asyncio.ensure_future(work(2, 2)),
         asyncio.ensure_future(work(3, 3))]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

print('elapsed time: ', time.time() - start_time)
for task in tasks:
    print('task result: ', task.result())

【3】Python 3.8+

asyncio.run() runs a coroutine
asyncio.create_task() creates a Task
asyncio.gather() collects return values

import asyncio, time

async def work(i, n):  # define a coroutine function with the async keyword
    print('task {} waiting {} seconds'.format(i, n))
    await asyncio.sleep(n)  # sleep for a while
    print('task {} finished after {} seconds'.format(i, n))
    return i + n


tasks = []
async def main():
    global tasks
    tasks = [asyncio.create_task(work(1, 1)),
             asyncio.create_task(work(2, 2)),
             asyncio.create_task(work(3, 3))]

    await asyncio.wait(tasks)  # block until all tasks complete


start_time = time.time()  # record the start time
asyncio.run(main())
print('elapsed time: ', time.time() - start_time)
for task in tasks:
    print('task result: ', task.result())

The asyncio.create_task() function was added in Python 3.7; on earlier versions, asyncio.ensure_future() plays the same role.

The asyncio.gather method

# use gather() to collect return values

import asyncio, time


async def work(i, n):  # define a coroutine function with the async keyword
    print('task {} waiting {} seconds'.format(i, n))
    await asyncio.sleep(n)  # sleep for a while
    print('task {} finished after {} seconds'.format(i, n))
    return i + n


async def main():
    tasks = [asyncio.create_task(work(1, 1)),
             asyncio.create_task(work(2, 2)),
             asyncio.create_task(work(3, 3))]

    # pass the tasks to gather; once all of them finish it returns a list of their results
    response = await asyncio.gather(*tasks)
    print("async task results:", response)


start_time = time.time()  # record the start time

asyncio.run(main())

print('elapsed time: ', time.time() - start_time)
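
gather() also accepts return_exceptions=True, in which case an exception raised by one task is returned in its slot of the result list instead of being raised out of gather(). A minimal sketch (the ok and boom coroutines are made up):

import asyncio

async def ok():
    return "fine"

async def boom():
    raise ValueError("something broke")

async def main():
    # the exception object shows up in the results instead of being raised
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    print(results)  # ['fine', ValueError('something broke')]

asyncio.run(main())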

【4】aiohttp

We previously studied requests, the most important module for crawlers, but it issues requests in a blocking way: after each request is sent, it must block and wait for the response, unable to do anything else in the meantime. aiohttp, introduced here, can be understood as the asynchronous counterpart of requests in Python: an asyncio-based async HTTP library that can be used to build asynchronous crawlers, with the advantage of being faster than a synchronous requests crawler. Install it with pip install aiohttp.

aiohttp is an asyncio-based library that brings asynchronous HTTP client/server programming to Python. asyncio enables single-threaded concurrent IO and implements transports such as TCP, UDP, and SSL; aiohttp is an HTTP framework built on top of it.
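
The examples in this section focus on the client side, but aiohttp covers the server side as well. A minimal server sketch (the handler name and port are illustrative):

from aiohttp import web

async def hello(request):
    return web.Response(text="Hello, aiohttp")

app = web.Application()
app.add_routes([web.get("/", hello)])

if __name__ == "__main__":
    web.run_app(app, port=8080)  # blocks and serves requests until interrupted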

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("http://httpbin.org/headers") as response:
            print(await response.text())

asyncio.run(main())
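
A single ClientSession is designed to be reused across many requests rather than opened per request. A sketch combining one shared session with asyncio.gather (the URLs are placeholders):

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["http://httpbin.org/get", "http://httpbin.org/headers"]
    # share one session across all requests instead of opening one per URL
    async with aiohttp.ClientSession() as session:
        pages = await asyncio.gather(*(fetch(session, url) for url in urls))
        for page in pages:
            print(page[:60])

asyncio.run(main())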

【5】Application examples (async crawlers)

(1) Asynchronously crawling 斗图网 (pkdoutu.com)

import aiohttp
import asyncio
import os
from lxml import etree
import time

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}


async def get_img_urls():
    url = "https://www.pkdoutu.com/photo/list/"
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers, ssl=False) as resp:
            text = await resp.content.read()

            selector = etree.HTML(text)
            img_urls = selector.xpath('//li[@class="list-group-item"]/div/div/a/img[@data-backup]/@data-backup')

            return img_urls


async def async_download(url):
    name = os.path.basename(url)  # the basename already carries the file extension
    async with aiohttp.ClientSession() as session:
        async with session.get(url, ssl=False) as resp:
            # save the response body to a file
            with open(f"imgs/{name}", "wb") as f:
                f.write(await resp.content.read())


async def main():
    os.makedirs("imgs", exist_ok=True)  # make sure the output directory exists
    tasks = [asyncio.create_task(async_download(url)) for url in await get_img_urls()]
    await asyncio.wait(tasks)


if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    print(time.time() - start)
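
If the site throttles bursts of requests, an asyncio.Semaphore can cap how many downloads run at once. A sketch of an alternative main() built on the functions above (the limit of 5 is arbitrary):

async def limited_download(sem, url):
    async with sem:              # wait for a free slot before downloading
        await async_download(url)

async def main():
    sem = asyncio.Semaphore(5)   # at most 5 downloads in flight
    urls = await get_img_urls()
    await asyncio.gather(*(limited_download(sem, url) for url in urls))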

(2) Asynchronously downloading m3u8 video

Serial version of the m3u8 crawler

import requests
import re
import os

session = requests.Session()

# (1) scrape the link to the m3u8 file
url = "https://www.9tata.cc/play/14999-1-0.html"
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36'
}
res = session.get(url, headers=headers, verify=False)

m3u8_link = re.search('now="(.*?m3u8)"', res.text).group(1)
print(m3u8_link)  # https://ikcdn01.ikzybf.com/20221015/ecMSO74h/index.m3u8
# also grab a name for the video
name = re.search(r'<td class="col2 hidden-xs">(?P<name>\w+)</td>', res.text).group("name")
print("name", name)
# (2) fetch the m3u8 file
res = session.get(m3u8_link)
print(res.text.split("\n")[-1])

m3u8_detail_link = os.path.join(os.path.dirname(m3u8_link), res.text.split("\n")[-1])
print(m3u8_detail_link)

# (3) fetch the detailed m3u8 playlist
res = requests.get(m3u8_detail_link)
print(res.text)

# (4) parse out the .ts segment names
ret = re.findall(r"\n(.*?\.ts)", res.text)
print(ret)

# (5) download every .ts segment and append them into one file
p = os.path.dirname(m3u8_detail_link)
with open(name, "ab") as f:
    for ts in ret:
        path = os.path.join(p, ts)
        res = requests.get(path)
        f.write(res.content)
        print(f"{ts} downloaded!")

Concurrent version of the m3u8 crawler

import requests
import re
import os
import aiohttp
import asyncio
import time

headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36'
}


async def get_m3u8_link():
    # (1) scrape the link to the m3u8 file
    url = "https://www.9tata.cc/play/14999-1-0.html"

    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers, ssl=False) as response:
            res = await response.text()

            m3u8_link = re.search('now="(.*?m3u8)"', res).group(1)
            print(m3u8_link)  # https://ikcdn01.ikzybf.com/20221015/ecMSO74h/index.m3u8
            # also grab a name for the video
            name = re.search(r'<td class="col2 hidden-xs">(?P<name>\w+)</td>', res).group("name")
            print("name", name)
            return m3u8_link, name


async def get_m3u8_detail_link(m3u8_link):
    # (2) fetch the m3u8 file
    async with aiohttp.ClientSession() as session:
        async with session.get(m3u8_link, headers=headers, ssl=False) as response:
            res = await response.text()
            m3u8_detail_link = os.path.join(os.path.dirname(m3u8_link), res.split("\n")[-1])
            print(m3u8_detail_link)
            return m3u8_detail_link


async def parse_ts(m3u8_detail_link):
    # (3) fetch the detailed m3u8 playlist
    os.makedirs("ts文件", exist_ok=True)  # the directory must exist before writing into it
    async with aiohttp.ClientSession() as session:
        async with session.get(m3u8_detail_link, headers=headers, ssl=False) as response:
            with open("ts文件/index.m3u8", "w") as f:
                res_text = await response.text()
                f.write(res_text)
                # (4) parse out the .ts segment names
                ret = re.findall(r"\n(.*?\.ts)", res_text)
                print(ret)
                return ret


async def download_video(m3u8_detail_link, ts_list):
    p = os.path.dirname(m3u8_detail_link)
    tasks = []
    for ts in ts_list:
        path = os.path.join(p, ts)

        task = asyncio.create_task(download_ts(path, ts))
        tasks.append(task)

    await asyncio.wait(tasks)


async def download_ts(path, ts):
    if not os.path.exists("ts文件"):
        os.mkdir("ts文件")

    async with aiohttp.ClientSession() as session:
        async with session.get(path, ssl=False) as resp:
            # save the response body to a file
            with open("ts文件/" + ts, "wb") as f:
                f.write(await resp.content.read())
            print(f"{ts}下载完成!")


def merge(filename='out'):
    '''
    Merge the downloaded .ts segments into a single video.
    :return:
    '''
    # change into the directory holding the downloaded .ts files
    os.chdir("ts文件")
    # the ffmpeg command that performs the merge
    os.system(f'ffmpeg -i index.m3u8 -c copy {filename}.mp4')
    print(f'video {filename} merged successfully====')


async def main():
    m3u8_link, name = await get_m3u8_link()
    m3u8_detail_link = await get_m3u8_detail_link(m3u8_link)
    ts_list = await parse_ts(m3u8_detail_link)
    await download_video(m3u8_detail_link, ts_list)
    merge(name)


if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    print(time.time() - start)
  1. Key point: ffmpeg -i index.m3u8 -c copy apple.mp4
  2. Key point: os.system runs a terminal command (a subprocess alternative is sketched below)
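
os.system is the simplest way to shell out; subprocess.run offers more control and is generally preferred. An equivalent sketch (out.mp4 is a placeholder name):

import subprocess

# equivalent to the os.system call above, but raises if ffmpeg
# exits with a nonzero status
subprocess.run(
    ["ffmpeg", "-i", "index.m3u8", "-c", "copy", "out.mp4"],
    check=True,
)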