python-playwright-example

  1. First attempt
  2. A different approach
  3. Comfortable reading on the phone
  4. Some closing thoughts

I came across a manga site that happened to have a series I wanted to read, but the site is extremely unfriendly on mobile: it auto-redirects and the number of ads is disgusting, so I decided to write a script to scrape it.

First attempt

  • Site URL
https://www.pipimanhua.com

The initial code:

import requests
from lxml import etree
import os
import aiohttp
import asyncio
import aiofiles
import json
import async_timeout
import threading
import concurrent.futures

header = {
    "User-Agent": ""  # fill in a real browser User-Agent here
}
HEADER_URL = "https://www.pipimanhua.com"
# index_url = "https://www.pipimanhua.com/manhua/16624/"
index_url = "https://www.pipimanhua.com/manhua/13042/"
PAGE_URL_LIST = []

class Title:
    def __init__(self, title, url):
        self.title = title
        self.url = url


# 1. Test whether requests get blocked
def get_index(url):
    response = requests.get(url, headers=header)
    if response.status_code == 200:
        return response.text
    return None

# 2. Get the URL of every chapter
def get_echo_page_url(url,output_name):
    response = requests.get(url, headers=header)
    if response.status_code == 200:
        html = etree.HTML(response.text)
        li_list = html.xpath("/html/body/div[2]/section/div[3]/div/ul/li")
        print(len(li_list))
        for li in li_list:
            # Get the href and text of the <a> tag inside each <li>
            a = li.xpath("./a")[0]
            href = HEADER_URL + a.xpath("./@href")[0]
            text = a.xpath("./text()")[0]
            # print(href, text)
            PAGE_URL_LIST.append(Title(text, href))
    # Sanity check
    for title in PAGE_URL_LIST:
        print(title.title, title.url)

    with open(f"{output_name}.json", "w", encoding="utf-8") as f:
        json.dump(
            [{"title": t.title, "url": t.url} for t in PAGE_URL_LIST],
            f,
            ensure_ascii=False,
            indent=4
        )
    return None


# 3. Get the images of each chapter page
def get_page_img(title: Title):
    imgs_list = []
    response = requests.get(title.url, headers=header)
    if response.status_code == 200:
        html = etree.HTML(response.text)
        img_list = html.xpath("//*[@id='article']//img[@data-original]")
        for img in img_list:
            src = img.xpath("./@data-original")[0]
            print(src)
            imgs_list.append(src)
    os.makedirs(title.title, exist_ok=True)
    for img in imgs_list:
        # fetch each image once and write it straight to disk
        response = requests.get(img, headers=header)
        with open(os.path.join(title.title, img.split("/")[-1]), "wb") as f:
            f.write(response.content)
    return imgs_list

# 3.1 Speed it up with a thread pool
def download_img_thread(img, folder):
    try:
        response = requests.get(img, headers=header)
        if response.status_code == 200:
            filename = os.path.join(folder, img.split("/")[-1])
            with open(filename, "wb") as f:
                f.write(response.content)
    except Exception as e:
        print(f"Failed to download {img}: {e}")

def get_page_img_threed(title: Title):
    imgs_list = []
    response = requests.get(title.url, headers=header)
    if response.status_code == 200:
        html = etree.HTML(response.text)
        img_list = html.xpath("//*[@id='article']//img[@data-original]")
        for img in img_list:
            src = img.xpath("./@data-original")[0]
            print(src)
            imgs_list.append(src)
    os.makedirs(title.title, exist_ok=True)
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(download_img_thread, img, title.title) for img in imgs_list]
        concurrent.futures.wait(futures)
    return imgs_list




# 3.2 Speed it up with async IO
async def download_img(session, img_url, folder, retries=3):
    filename = os.path.join(folder, img_url.split("/")[-1])
    for attempt in range(retries):
        try:
            async with async_timeout.timeout(30):
                async with session.get(img_url, headers=header) as resp:
                    if resp.status == 200:
                        async with aiofiles.open(filename, "wb") as f:
                            content = await resp.read()
                            await f.write(content)
                        return
        except Exception as e:
            if attempt == retries - 1:
                print(f"Failed to download {img_url}: {e}")
            await asyncio.sleep(2)

async def get_page_img_async(title: Title):
    imgs_list = []
    async with aiohttp.ClientSession() as session:
        async with session.get(title.url, headers=header) as response:
            if response.status == 200:
                text = await response.text()
                html = etree.HTML(text)
                img_list = html.xpath("//*[@id='article']//img[@data-original]")
                for img in img_list:
                    src = img.xpath("./@data-original")[0]
                    print(src)
                    imgs_list.append(src)

        os.makedirs(title.title, exist_ok=True)
        tasks = [download_img(session, img, title.title) for img in imgs_list]
        await asyncio.gather(*tasks)
    return imgs_list

async def batch_download_titles(title_list, limit=5):
    sem = asyncio.Semaphore(limit)
    async def sem_task(title):
        async with sem:
            await get_page_img_async(title)
    tasks = [sem_task(title) for title in title_list]
    await asyncio.gather(*tasks)

# Test whether the connection works
# print(get_index(index_url))
get_echo_page_url(index_url, "斗破苍穹")

def main():
    # Load the chapter-list JSON produced by get_echo_page_url()
    with open("test.json", "r", encoding="utf-8") as f:
        PAGE_URL_LIST = json.load(f)

    titles = [Title(t["title"], t["url"]) for t in PAGE_URL_LIST]
    for title in titles:
        get_page_img_threed(title)

if __name__ == "__main__":
    main()

The approach above has a problem: the async IO version is so fast that the site bans you, so in the end I went with the thread pool.
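
For the record, the async version could also have been throttled instead of dropped. Below is a minimal sketch, not the approach I actually used: the concurrency limit and delay are made-up numbers, and the empty User-Agent mirrors the placeholder above.

import asyncio
import os

import aiofiles
import aiohttp

MAX_CONCURRENT = 3      # hypothetical: at most 3 requests in flight
DELAY_SECONDS = 1.0     # hypothetical: pause after every request

header = {"User-Agent": ""}  # same placeholder header as in the script above

async def download_img_throttled(session, sem, img_url, folder):
    # Keep the overall request rate low: limit concurrency with a semaphore
    # and sleep after each request before releasing the slot.
    async with sem:
        async with session.get(img_url, headers=header) as resp:
            if resp.status == 200:
                filename = os.path.join(folder, img_url.split("/")[-1])
                async with aiofiles.open(filename, "wb") as f:
                    await f.write(await resp.read())
        await asyncio.sleep(DELAY_SECONDS)

async def download_all(img_urls, folder):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    os.makedirs(folder, exist_ok=True)
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(download_img_throttled(session, sem, u, folder)
                               for u in img_urls))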

But I later found that some chapters on this site (roughly one in five) have their pages obfuscated with JS.

Since my JS deobfuscation skills are poor, and the target site is a plain static PHP site with at most a little dynamic behaviour,

I decided to simply use Playwright to grab the page after the JS has finished executing.


Take this page, for example. My deobfuscation skills just weren't up to it, so I gave up on it; try it yourself if you're interested:

https://www.pipimanhua.com/read/16574/932920.html
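
Letting a real browser run the obfuscated JS and then reading the rendered DOM is the whole idea. Here is a minimal sketch using the sync API (the #article img[data-original] selector is the same one the full script below relies on):

from playwright.sync_api import sync_playwright

def get_rendered_img_urls(url):
    # Let Chromium execute whatever JS the page ships, then read the
    # data-original attributes out of the rendered DOM.
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url, wait_until="domcontentloaded")
        page.wait_for_selector("#article")
        imgs = page.query_selector_all("#article img[data-original]")
        urls = [img.get_attribute("data-original") for img in imgs]
        browser.close()
        return [u for u in urls if u]

# e.g. get_rendered_img_urls("https://www.pipimanhua.com/read/16574/932920.html")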

A different approach

I rethought the approach: while crawling, parse out all of the image URLs at a low request rate,

then write a separate script that does the actual downloading.

Data set 1:

  • Chapter titles and URLs
[
    {
        "title": "01",
        "url": "https://www.pipimanhua.com/read/13042/745262.html"
    },
    {
        "title": "02",
        "url": "https://www.pipimanhua.com/read/13042/745263.html"
    },
    ...
]

The script used:

import requests
from lxml import etree
import json

header = {
    "User-Agent": ""
}
HEADER_URL = "https://www.pipimanhua.com"
# index_url = "https://www.pipimanhua.com/manhua/16624/"
index_url = "https://www.pipimanhua.com/manhua/13042/"
PAGE_URL_LIST = []

class Title:
    def __init__(self, title, url):
        self.title = title
        self.url = url


# 1. Test whether requests get blocked
def get_index(url):
    response = requests.get(url, headers=header)
    if response.status_code == 200:
        return response.text
    return None

# 2. Get the URL of every chapter
def get_echo_page_url(url,output_name):
    response = requests.get(url, headers=header)
    if response.status_code == 200:
        html = etree.HTML(response.text)
        li_list = html.xpath("/html/body/div[2]/section/div[3]/div/ul/li")
        print(len(li_list))
        for li in li_list:
            # Get the href and text of the <a> tag inside each <li>
            a = li.xpath("./a")[0]
            href = HEADER_URL + a.xpath("./@href")[0]
            text = a.xpath("./text()")[0]
            # print(href, text)
            PAGE_URL_LIST.append(Title(text, href))
    # Sanity check
    for title in PAGE_URL_LIST:
        print(title.title, title.url)

    with open(f"{output_name}.json", "w", encoding="utf-8") as f:
        json.dump(
            [{"title": t.title, "url": t.url} for t in PAGE_URL_LIST],
            f,
            ensure_ascii=False,
            indent=4
        )
    return None

# Test whether the connection works
# print(get_index(index_url))
get_echo_page_url(index_url, "斗破苍穹")

Data set 2:

  • Chapter titles with their image URLs
[
    {
        "title": "004 喧锣镇技能商店",
        "images": [
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/0_vx.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/1_j1.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/2_x7.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/3_qq.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/4_0u.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/5_al.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/6_u6.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/7_7b.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/8_5u.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/9_mw.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/10_ma.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/11_76.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/12_80.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/13_bb.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/14_le.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/15_j3.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/16_gb.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/17_aw.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/18_1y.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/19_5l.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/20_5m.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/21_y1.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/22_xe.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/23_u8.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/24_l5.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/25_gz.webp",
            "https://res2.tupian.run/res1_gf/0851571ea6/48755/26_8e.webp"
        ]
    },
    ...
]
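
The script used (it opens ten Playwright pages in parallel and reads the data-original attributes out of each rendered chapter page):
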
import os
import asyncio
import json
from playwright.async_api import async_playwright

BASE_DIR = "鲲吞天下"

class Title:
    def __init__(self, title, url):
        self.title = title
        self.url = url

async def get_page_img_playwright(page, title: Title):
    imgs_list = []
    try:
        await page.goto(title.url, timeout=120000, wait_until="domcontentloaded")
        await page.wait_for_selector("#article")
        img_elements = await page.query_selector_all("#article img[data-original]")
        for img in img_elements:
            src = await img.get_attribute("data-original")
            if src:
                imgs_list.append(src)
    except Exception as e:
        print(f"Error: {title.title} - {e}")
    return imgs_list,title

async def worker(queue, results):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        # one browser, ten tabs driven in parallel
        pages = [await browser.new_page() for _ in range(10)]
        while not queue.empty():
            tasks = []
            for page in pages:
                if queue.empty():
                    break
                title = await queue.get()
                tasks.append(asyncio.create_task(get_page_img_playwright(page, title)))
            for i, task in enumerate(tasks):
                imgs,title = await task
                results.append({"title": title.title , "images": imgs})
                print(f"{title.title} with {len(imgs)} ")
        await browser.close()

async def main():
    # Load the chapters that still need parsing
    with open("鲲吞天下.json", "r", encoding="utf-8") as f:
        PAGE_URL_LIST = json.load(f)
    titles = [Title(t["title"], t["url"]) for t in PAGE_URL_LIST]
    
    # Build the task queue
    queue = asyncio.Queue()
    for t in titles:
        await queue.put(t)
    results = []

    # Run the worker that drives the page pool
    await worker(queue, results)
    
    # Confirm that every task was consumed
    if queue.empty():
        print("All tasks completed.")

    # Write the results
    try:
        results.sort(key=lambda x: x["title"])
        with open("imgs_debug.json", "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=4)
    except Exception as e:
        print(f"Error occurred while writing to file: {e}")



if __name__ == "__main__":
    asyncio.run(main())
  • Bulk-download the images
BOOK
	Title1
		img1
		img2
		...
	Title2
		img1
		img2
		...
	...
import json
import os
import asyncio
import aiohttp

BASE_DIR = "鲲吞天下"
MAX_CONCURRENT = 50  # maximum number of concurrent downloads
JSON_FILE = "imgs_debug.json"

class ImageData:
    def __init__(self, title, images):
        self.title = title
        self.images = images

IMAGE_LIST = []
with open(JSON_FILE, "r", encoding="utf-8") as f: 
    data = json.load(f)
    for item in data:
        title = item["title"]
        images = item["images"]
        image_data = ImageData(title, images)
        IMAGE_LIST.append(image_data)

def mkdir_page_dir(title):
    os.makedirs(f"{BASE_DIR}/{title}", exist_ok=True)
    return f"{BASE_DIR}/{title}"

async def get_imgs_from_url(semaphore, folder_path, file_name, image_url):
    async with semaphore:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(image_url) as response:
                    with open(f"{folder_path}/{file_name}", "wb") as out_file:
                        out_file.write(await response.read())
        except Exception as e:
            print(f"Failed to download {image_url}: {e}")

async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    tasks = []
    for image_data in IMAGE_LIST:
        folder_path = mkdir_page_dir(image_data.title)
        for image_url in image_data.images:
            file_name = image_url.split("/")[-1]
            tasks.append(get_imgs_from_url(semaphore, folder_path, file_name, image_url))
    await asyncio.gather(*tasks)
    print("All images downloaded successfully.")

if __name__ == "__main__":
    asyncio.run(main())

Comfortable reading on the phone

  • Perfect Viewer


Pack everything up, drop it on the phone, and read it with this app: the comic pages never show up in the photo gallery, and there is nothing to unzip before reading.
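
To pack things up, here is a small sketch that zips every chapter folder into a .cbz archive (a plain zip, which Perfect Viewer opens directly); the BASE_DIR name assumes the folder layout produced by the download script above:

import os
import zipfile

BASE_DIR = "鲲吞天下"  # book folder created by the batch download script

# Turn every chapter folder into one .cbz file. ZIP_STORED skips
# re-compressing the already-compressed .webp pages.
for chapter in sorted(os.listdir(BASE_DIR)):
    folder = os.path.join(BASE_DIR, chapter)
    if not os.path.isdir(folder):
        continue
    archive = os.path.join(BASE_DIR, f"{chapter}.cbz")
    with zipfile.ZipFile(archive, "w", zipfile.ZIP_STORED) as zf:
        for name in sorted(os.listdir(folder)):
            zf.write(os.path.join(folder, name), arcname=name)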

A killer app for reading comics.

  • The result is pretty good


Some closing thoughts

github