python_async_spider_example

参考博客: https://blog.csdn.net/qq_36964509/article/details/140364392

思路请看参考博客

视频片段加密 —–> 探寻解密方法 : 加密方法为 AES-128

AES
在加密视频流里常见的就是CBC模式
解密需要:
数据
key 【密钥】
iv 【初始偏移量】

总结:
key 要从index.m38u中获取
iv = b’0000000000000000’

实现如下

"""
    异步函数版本

    2024.9.9 文件加密 ----> AES.CBC 模式
        最终爬取完整视频
"""
import os
import re
import time
import asyncio
from os import path

import aiohttp

import requests

from lxml import etree
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from win32comext.shell.demos.servers.folder_view import tasks

'''
    爬取的视频 是有加密的版本
        还要没有加密的版本
'''

headers = {
    "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Mobile Safari/537.36 Edg/128.0.0.0",
    "referer": "https://www.9tata.cc/"
}


# 解密偏移
iv = b'0000000000000000'
key = b''
total_len = 0
now = 0

# 最终获取的数据类型
class LastDate:
    def __init__(self,index,data):
        self.index = index
        self.data = data

async def get_first_m3u8_link_name():
    url = "https://www.9tata.cc/play/14999-1-0.html"
    async with aiohttp.ClientSession() as session:
        async with session.get(url,headers=headers,ssl=False) as response:
            tree = etree.HTML(await response.text())
            # 获取name
            name = tree.xpath("/html/body/div[2]/div[1]/h2/a[2]/text()")[0]
            print(name)

            # 获取m3u8实例
            m38u = tree.xpath('//*[@id="cms_player"]/script[1]/text()')[0]
            # print(m38u)
            first_m3u8_link = re.findall('now="(.*?m3u8)"', m38u)[0]
            print(first_m3u8_link)
            return first_m3u8_link,name


async def get_second_m3u8_link(first_m3u8_link):
    async with aiohttp.ClientSession() as session:
        async with session.get(first_m3u8_link,ssl=False,headers=headers) as res:
            data = await res.text()
            # split的模块有些问题提取不出来,还得用正则表达式
            next_url = "https://s1.fsvod1.com" + re.findall('(/.*?m3u8)', data)[0]
            return next_url

# 解析ts文件
async def parse_ts(second_m3u8_link):
    async with aiohttp.ClientSession() as session:
        async with session.get(second_m3u8_link,ssl=False,headers=headers) as res:
            data = await res.text()

            # 定义正则表达式模式
            pattern = re.compile(r'^.*/([^\n/]+\.ts)$', re.MULTILINE)

            ''' 这一段想分别存储ts再用ffmpeg合并'''
            # 执行替换操作
            # new_text = pattern.sub(r'\1',data)

            # 再将key 改为 本地的 key
            # new_text = re.sub(r'URI="(.*)"', r'URI="key.key"', new_text)

            # 将 index.m3u8 存储到本地
            if not path.exists("./ts"):
                os.mkdir("./ts")
            # with open("./ts/index.m3u8", "w") as f:
            #     f.write(new_text)

            # 解析出 其他的 ts 片段
            ret = re.findall(r"\n(.*?\.ts)",data)

            url = 'https://s1.fsvod1.com'
            key_url = url + re.findall(r'URI="(.*)"',data)[0]

            return ret,key_url

# 存储并获取
def get_key(key_url):
    res = requests.get(key_url,headers=headers)
    print('key : ',res.content)
    with open("./ts/key.key",'wb') as f:
        f.write(res.content)
    return res.content


# ts解密
def decrypt(data, key):
    # 创建 AES 解密器
    cipher = AES.new(key, AES.MODE_CBC,iv)

    # 解密数据
    decrypted_data = cipher.decrypt(data)

    # 去除填充
    try:
        dec_data = unpad(decrypted_data, AES.block_size)
    except ValueError:
        print("Padding is incorrect.")
        return None

    return dec_data


# 主要的耗时步骤
async def download_ts(p,ts,data_list):
    # 下载一个ts文件
    name = "./ts/"+ts.split('/')[-1]
    async with aiohttp.ClientSession() as session:
        print(p+ts,"开始下载")
        async with session.get(p+ts,ssl=False,headers=headers) as res:
            res_bdata = await res.content.read()
            res_bdata = decrypt(res_bdata,key=key)
            with open(name, "wb") as f:
                # 避免 打开文件然后一顿操作,会降低运行速度
                f.write(res_bdata)
            print(f"{ts}下载完成!")


# 下载如何放到队列中
async def download_ts_by_index(p,ts,index,data_list):
    try:
        async with aiohttp.ClientSession() as session:
            print(p+ts,"开始下载")
            timeout = aiohttp.ClientTimeout(total=60)
            async with session.get(p+ts,ssl=False,headers=headers,timeout=timeout) as res:
                res_bdata = await res.content.read()
                res_bdata = decrypt(res_bdata,key=key)
                data = LastDate(index=index,data=res_bdata)
                data_list.append(data)
                print(f"{ts}下载完成!")
    except Exception as e:
        print(p+ts,"下载超时")


def write_in_file(filename,res_data_list):
    # 数据要按照索引来排序
    res_data_list.sort(key=lambda x:x.index)
    with open(filename,'wb') as f:
        for data in res_data_list:
            f.write(data.data)
    print(filename,"下载完成!")


async def main(filename):
    global key # 全局变量
    # 1. 爬取
    first_m3u8_link,name = await get_first_m3u8_link_name()
    # 2. 获取第二个m3u8链接
    second_m3u8_link= await get_second_m3u8_link(first_m3u8_link)
    # 3. 解析ts文件 ---> 获取 各个url链接
    ts_list,key_url = await parse_ts(second_m3u8_link)
    # 4. 获取key
    key = get_key(key_url)
    # 5. 下载
    print(ts_list)
    print(len(ts_list))  # 384

    p = "https://s1.fsvod1.com"
    tasks = []
    res_data_list = []
    for i,v in enumerate(ts_list):
        tasks.append(asyncio.create_task(download_ts_by_index(p,v,i,res_data_list)))
    result = await asyncio.gather(*tasks)
    # 将文件写入
    write_in_file(filename,res_data_list)




if __name__ == '__main__':
    filename = "./boom.mp4"
    start = time.time()
    asyncio.run(main(filename))
    end = time.time()
    print(f"cost:",end-start)  #  cost: 21.449084281921387  # 459个文件
github