株式会社WR

株式会社WR

WEB TOTAL CONSULTING

Python asyncioで並列スクレイピング——処理速度を10倍にする
ブログ一覧へ
技術ブログ

Python asyncioで並列スクレイピング——処理速度を10倍にする

Pythonのasyncioとaiohttp/playwright-pythonを使った非同期並列スクレイピングにより、100商品の収集時間を数分から数十秒に短縮できます。

asyncioとは——Pythonの非同期処理基盤

Pythonのasyncioは、シングルスレッドで並行処理(コンカレンシー)を実現するライブラリです。スクレイピングのような「リクエストを送って、レスポンスを待つ間に別の処理をする」という I/Oバウンドな処理に非常に効果的です。

同期処理 vs 非同期処理

# 同期処理:1件ずつ順番に待つ(100件 × 2秒 = 200秒)
for url in urls:
    html = requests.get(url).text  # 2秒待機
    process(html)

# 非同期処理:待機中に他のリクエストを送る(約20秒で完了)
async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

環境構築

pip install aiohttp beautifulsoup4 lxml asyncio

基本的な非同期スクレイパー

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

async def fetch_page(session: aiohttp.ClientSession, url: str) -> tuple[str, str | None]:
    """1ページを非同期で取得"""
    headers = {
        'User-Agent': (
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
            'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
        ),
        'Accept-Language': 'ja-JP,ja;q=0.9',
    }

    try:
        async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as resp:
            if resp.status == 200:
                html = await resp.text()
                return url, html
            else:
                print(f"HTTP {resp.status}: {url}")
                return url, None
    except asyncio.TimeoutError:
        print(f"タイムアウト: {url}")
        return url, None
    except aiohttp.ClientError as e:
        print(f"接続エラー [{url}]: {e}")
        return url, None


async def scrape_all(urls: list[str], concurrency: int = 10) -> dict[str, str | None]:
    """並列数を制限しながら全URLを取得"""
    semaphore = asyncio.Semaphore(concurrency)  # 同時接続数を制限

    async def fetch_with_semaphore(session, url):
        async with semaphore:
            result = await fetch_page(session, url)
            # サーバー負荷を考慮したウェイト
            await asyncio.sleep(0.5)
            return result

    connector = aiohttp.TCPConnector(limit=50, ssl=False)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_with_semaphore(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    return dict(results)

セマフォで同時接続数を制御する

async def controlled_scraping_demo():
    urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)]

    start = time.time()

    # concurrency=5:同時に5リクエストまで
    results = await scrape_all(urls, concurrency=5)

    elapsed = time.time() - start
    success = sum(1 for v in results.values() if v)
    print(f"完了: {success}/{len(urls)}件, {elapsed:.1f}秒")

asyncio.run(controlled_scraping_demo())

HTMLの解析を並列化する

取得したHTMLのパースもCPUバウンドですが、少量なら同期で十分です。大量のHTMLをパースする場合は ProcessPoolExecutor を使います。

from concurrent.futures import ProcessPoolExecutor
import functools

def parse_product_page(url: str, html: str) -> dict | None:
    """HTMLから商品情報を抽出(同期関数)"""
    if not html:
        return None

    soup = BeautifulSoup(html, 'lxml')

    name_el  = soup.select_one('h1.product-name')
    price_el = soup.select_one('.product-price')

    if not (name_el and price_el):
        return None

    return {
        'url':   url,
        'name':  name_el.get_text(strip=True),
        'price': int(price_el.get_text(strip=True).replace(',', '').replace('¥', '')),
    }


async def scrape_and_parse(urls: list[str]) -> list[dict]:
    """取得とパースを並列で実行"""
    # 非同期でHTMLを取得
    html_map = await scrape_all(urls, concurrency=10)

    # ProcessPoolExecutorでCPU並列パース
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [
            loop.run_in_executor(executor, parse_product_page, url, html)
            for url, html in html_map.items()
        ]
        parsed = await asyncio.gather(*tasks)

    return [p for p in parsed if p]

リトライ付きの堅牢な実装

import random

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3,
    backoff_base: float = 1.5,
) -> tuple[str, str | None]:
    """リトライ付きでページを取得"""
    for attempt in range(max_retries):
        try:
            url, html = await fetch_page(session, url)
            if html:
                return url, html

        except Exception as e:
            if attempt == max_retries - 1:
                return url, None

        # 指数バックオフ + ジッター
        wait = backoff_base ** attempt + random.uniform(0, 1)
        await asyncio.sleep(wait)

    return url, None

実測パフォーマンス比較

弊社での実測例(100URLのスクレイピング、各ページ平均応答時間1.5秒):

実装 所要時間 速度比
同期(requests) 約155秒 1x
asyncio (concurrency=5) 約35秒 4.4x
asyncio (concurrency=10) 約18秒 8.6x
asyncio (concurrency=20) 約12秒 12.9x

concurrencyを上げすぎると対象サーバーに負荷をかけすぎるため、適切な値(5〜15程度)を選んでください。


まとめ

Python asyncioとaiohttpを組み合わせることで、I/Oバウンドなスクレイピング処理を10倍以上高速化できます。セマフォで同時接続数を制御し、リトライで信頼性を確保することが実用的な実装の要点です。弊社では大規模データ収集でasyncioを積極活用しています。

データ収集・自動化システムの開発についてはお気軽にご相談ください。

Category 技術ブログ

Related Posts

関連記事

開発・技術のご相談はお気軽に

お見積りは無料です。まずはお気軽にご相談ください。

お問い合わせ →