Python增量爬取案例如何实现?

wen python案例 2

本文目录导读:

  1. 基于时间戳的增量爬取
  2. 基于ID的增量爬取
  3. 完整的增量爬虫框架
  4. 基于数据库的增量方案
  5. 高级增量爬取(支持断点续爬)
  6. 配置化的增量爬虫

我来介绍几种Python增量爬取的实现方法,从简单到复杂逐步说明。

基于时间戳的增量爬取

基础实现

import requests
import json
from datetime import datetime
import time
class TimestampIncrementalSpider:
    """Incremental crawler that keeps only items newer than the last crawl.

    The newest item timestamp seen so far (the "watermark") is persisted to a
    JSON state file so that a later run skips items that were already
    collected.
    """

    # Format of the item ``time`` field; also used for the persisted watermark.
    TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self):
        # In memory the watermark is a ``datetime`` (None before the first
        # successful crawl); on disk it is a string in TIME_FORMAT.
        self.last_crawl_time = None
        self.state_file = 'crawl_state.json'
        self.load_state()

    def load_state(self):
        """Load the last crawl time from the state file.

        A missing or malformed state file leaves ``last_crawl_time`` as None,
        so the next crawl treats every item as new.
        """
        self.last_crawl_time = None
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # An interrupted write can leave a truncated file behind; starting
            # from scratch is safer than refusing to run.
            return
        raw = state.get('last_crawl_time') if isinstance(state, dict) else None
        if not raw:
            return
        try:
            # Convert back to datetime so it can be compared with item times.
            self.last_crawl_time = datetime.strptime(raw, self.TIME_FORMAT)
        except (TypeError, ValueError):
            self.last_crawl_time = None

    def save_state(self):
        """Write the last crawl time to the state file as a formatted string."""
        stamp = self.last_crawl_time
        if isinstance(stamp, datetime):
            # datetime objects are not JSON serializable.
            stamp = stamp.strftime(self.TIME_FORMAT)
        with open(self.state_file, 'w', encoding='utf-8') as f:
            json.dump({'last_crawl_time': stamp}, f)

    def crawl_news(self, limit=10):
        """Return the news items that are newer than the stored watermark.

        Args:
            limit: Kept for interface compatibility; the simulated data source
                below does not use it.

        Returns:
            A list of item dicts whose ``time`` is later than the previous
            ``last_crawl_time`` (all items when there is no previous crawl).
            When the list is non-empty, the watermark is advanced to the
            newest returned item and saved.
        """
        # Simulated API response.
        news_list = [
            {'id': 1, 'title': '新闻1', 'time': '2024-01-20 10:00:00'},
            {'id': 2, 'title': '新闻2', 'time': '2024-01-20 11:00:00'},
        ]
        new_items = []
        newest = self.last_crawl_time
        for news in news_list:
            news_time = datetime.strptime(news['time'], self.TIME_FORMAT)
            # Compare against the watermark from before this crawl, not the
            # running maximum, so every unseen item in the batch is kept.
            if self.last_crawl_time is None or news_time > self.last_crawl_time:
                new_items.append(news)
                if newest is None or news_time > newest:
                    newest = news_time
        if new_items:
            self.last_crawl_time = newest
            self.save_state()
        return new_items
# Usage example: run one crawl and report how many new items it returned
spider = TimestampIncrementalSpider()
new_news = spider.crawl_news()
print(f"新增新闻: {len(new_news)} 条")

基于ID的增量爬取

带ID追踪的实现

import redis
import requests
from bs4 import BeautifulSoup
import json
class IDBasedIncrementalSpider:
    """Incremental crawler that records already-crawled item IDs in a Redis set."""

    def __init__(self):
        self.redis_client = redis.StrictRedis(
            host='localhost',
            port=6379,
            decode_responses=True
        )
        self.crawled_key = 'crawled_ids'  # key of the Redis set of seen IDs

    def is_crawled(self, item_id):
        """Return whether ``item_id`` is already in the crawled-ID set."""
        return self.redis_client.sismember(self.crawled_key, str(item_id))

    def mark_crawled(self, item_id):
        """Add ``item_id`` to the crawled-ID set."""
        self.redis_client.sadd(self.crawled_key, str(item_id))

    def crawl_new_items(self, url):
        """Fetch ``url`` and return the items whose IDs have not been seen.

        Args:
            url: Address of the list page to fetch.

        Returns:
            A list of dicts with ``id``, ``title`` and ``content`` keys, one
            per ``.item`` element that has a ``data-id`` not yet recorded.
            Elements without a ``data-id``, or without a ``.title`` /
            ``.content`` child, are skipped.
        """
        # Without a timeout a stalled server would block the crawler forever.
        response = requests.get(url, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')
        new_items = []
        for item in soup.select('.item'):  # assumed CSS selector
            item_id = item.get('data-id')
            if not item_id:
                # No usable ID: it would otherwise be recorded as "None" and
                # hide every later ID-less element.
                continue
            if self.is_crawled(item_id):
                continue
            title_node = item.select_one('.title')
            content_node = item.select_one('.content')
            if title_node is None or content_node is None:
                # Malformed entry: leave it unmarked so a later run can retry.
                continue
            new_items.append({
                'id': item_id,
                'title': title_node.text,
                'content': content_node.text
            })
            # Mark only after the item was parsed successfully.
            self.mark_crawled(item_id)
        return new_items
# Usage example: fetch the list page and collect the items not yet recorded
spider = IDBasedIncrementalSpider()
new_data = spider.crawl_new_items('http://example.com/list')

完整的增量爬虫框架

使用Scrapy实现增量爬取

import scrapy
from scrapy_redis.spiders import RedisSpider
import hashlib
import json
class IncrementalSpider(RedisSpider):
    """Incremental spider built on Scrapy-Redis.

    Items are deduplicated by an MD5 fingerprint of their title and URL. The
    fingerprint set is written to ``fingerprint_file`` when the spider closes
    and read back on start-up, so items seen in an earlier run are not
    yielded again.
    """
    name = 'incremental_spider'
    redis_key = 'incremental:start_urls'
    # File used to persist fingerprints between runs.
    fingerprint_file = 'crawled_fingerprints.json'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Start from the fingerprints saved by the previous run; without this
        # the file written in closed() would never be used.
        self.duplicate_filter = self._load_fingerprints()

    def _load_fingerprints(self):
        """Return the saved fingerprint set, or an empty set if unavailable."""
        try:
            with open(self.fingerprint_file, 'r') as f:
                return set(json.load(f))
        except (FileNotFoundError, json.JSONDecodeError, TypeError):
            # Missing or unusable file: behave like a first run.
            return set()

    def generate_fingerprint(self, item):
        """Return the MD5 hex digest of the item's title followed by its URL."""
        text = f"{item['title']}{item['url']}"
        return hashlib.md5(text.encode()).hexdigest()

    def parse(self, response):
        """Yield one dict per ``.item`` element that has not been seen before."""
        for item in response.css('.item'):
            data = {
                'title': item.css('.title::text').get(),
                'url': response.url,
                'content': item.css('.content::text').get(),
                'time': item.css('.time::text').get()
            }
            fingerprint = self.generate_fingerprint(data)
            # Skip anything whose fingerprint is already known.
            if fingerprint not in self.duplicate_filter:
                self.duplicate_filter.add(fingerprint)
                yield data

    def closed(self, reason):
        """Persist the fingerprint set when the spider closes."""
        with open(self.fingerprint_file, 'w') as f:
            json.dump(list(self.duplicate_filter), f)

基于数据库的增量方案

使用SQLite存储状态

import sqlite3
import requests
from datetime import datetime
import hashlib
class DatabaseIncrementalSpider:
    """Incremental crawler that keeps its state and crawled items in SQLite.

    Two tables are used: ``crawl_state`` (one row per source) and
    ``crawled_items`` (one row per item, keyed by an MD5 fingerprint of the
    item's title and URL). The instance can be used as a context manager to
    close the connection automatically.
    """

    def __init__(self, db_path='crawl_state.db'):
        self.conn = sqlite3.connect(db_path)
        self.create_tables()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def create_tables(self):
        """Create the state and item tables if they do not exist yet."""
        cursor = self.conn.cursor()
        # Per-source crawl state.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS crawl_state (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source TEXT UNIQUE,
                last_crawl_time TEXT,
                last_item_id TEXT
            )
        ''')
        # Items that have already been crawled.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS crawled_items (
                fingerprint TEXT PRIMARY KEY,
                title TEXT,
                url TEXT,
                content TEXT,
                crawl_time TEXT
            )
        ''')
        self.conn.commit()

    def get_state(self, source):
        """Return ``(last_crawl_time, last_item_id)`` for ``source``.

        Returns None when no state has been stored for that source.
        """
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT last_crawl_time, last_item_id FROM crawl_state WHERE source = ?',
            (source,)
        )
        return cursor.fetchone()

    def update_state(self, source, last_crawl_time=None, last_item_id=None):
        """Create or update the state row for ``source``.

        An argument left as None keeps the value already stored, so updating
        only the crawl time no longer erases a previously saved item ID.
        """
        cursor = self.conn.cursor()
        # Make sure the row exists, then update only the supplied columns.
        cursor.execute(
            'INSERT OR IGNORE INTO crawl_state (source) VALUES (?)',
            (source,)
        )
        cursor.execute('''
            UPDATE crawl_state
            SET last_crawl_time = COALESCE(?, last_crawl_time),
                last_item_id = COALESCE(?, last_item_id)
            WHERE source = ?
        ''', (last_crawl_time, last_item_id, source))
        self.conn.commit()

    def item_exists(self, fingerprint):
        """Return True if an item with this fingerprint is already stored."""
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT 1 FROM crawled_items WHERE fingerprint = ?',
            (fingerprint,)
        )
        return cursor.fetchone() is not None

    @staticmethod
    def _fingerprint(item):
        """Return the MD5 hex digest of the item's title followed by its URL."""
        content = f"{item['title']}{item['url']}"
        return hashlib.md5(content.encode()).hexdigest()

    def save_item(self, item):
        """Store ``item`` unless an item with the same fingerprint exists.

        Args:
            item: Mapping with ``title`` and ``url`` keys; ``content`` is
                optional and stored as NULL when absent.

        Returns:
            True if a new row was inserted, False if it was a duplicate.
        """
        cursor = self.conn.cursor()
        # A single INSERT OR IGNORE replaces the separate existence check, so
        # a duplicate can never raise an IntegrityError.
        cursor.execute('''
            INSERT OR IGNORE INTO crawled_items
            (fingerprint, title, url, content, crawl_time)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            self._fingerprint(item),
            item['title'],
            item['url'],
            item.get('content'),
            datetime.now().isoformat()
        ))
        self.conn.commit()
        return cursor.rowcount == 1

    def crawl(self, source_url, parser_func):
        """Fetch ``source_url``, store the new items and record the crawl time.

        Args:
            source_url: Address to fetch; also used as the state key.
            parser_func: Callable that receives the HTTP response and returns
                an iterable of item mappings.

        Returns:
            The number of items that were newly stored.
        """
        # Without a timeout a stalled server would block the crawler forever.
        response = requests.get(source_url, timeout=10)
        new_count = sum(1 for item in parser_func(response) if self.save_item(item))
        self.update_state(
            source=source_url,
            last_crawl_time=datetime.now().isoformat()
        )
        return new_count
# Usage example
def parse_news(response):
    """Turn a news page response into a list of items (placeholder, always empty)."""
    # Real extraction logic goes here.
    return []
# Crawl the example news page with the SQLite-backed spider and report the
# number of newly stored items.
spider = DatabaseIncrementalSpider()
new_count = spider.crawl('http://example.com/news', parse_news)
print(f"新增 {new_count} 条内容")

高级增量爬取(支持断点续爬)

import asyncio
import aiohttp
from typing import Set, Dict
import pickle
import os
class AsyncIncrementalSpider:
    """Asynchronous incremental crawler that can resume an interrupted crawl.

    The visited and pending URL sets are pickled to ``state_file`` after every
    batch and reloaded by the constructor.
    """

    def __init__(self, state_file='spider_state.pkl', max_retries=3):
        """Create the spider and restore any saved state.

        Args:
            state_file: Path of the pickle file holding the crawl state.
            max_retries: Number of failed fetches after which a URL is removed
                from the pending set and recorded in ``failed_urls``.
        """
        self.state_file = state_file
        self.max_retries = max_retries
        self.visited_urls: Set[str] = set()
        self.pending_urls: Set[str] = set()
        # URLs that were given up on, and per-URL failure counters. Both are
        # kept in memory only.
        self.failed_urls: Set[str] = set()
        self._failures: Dict[str, int] = {}
        self.load_state()

    def load_state(self):
        """Restore the visited and pending sets from the state file, if present."""
        if os.path.exists(self.state_file):
            try:
                # NOTE(security): pickle.load can execute arbitrary code from
                # the file; only use state files written by this program.
                with open(self.state_file, 'rb') as f:
                    state = pickle.load(f)
                    self.visited_urls = state.get('visited', set())
                    self.pending_urls = state.get('pending', set())
                print(f"恢复状态: 已访问 {len(self.visited_urls)}, 待处理 {len(self.pending_urls)}")
            except Exception as e:
                # Best effort: an unreadable state file means a fresh start.
                print(f"加载状态失败: {e}")

    def save_state(self):
        """Write the visited and pending sets to the state file."""
        import time

        state = {
            'visited': self.visited_urls,
            'pending': self.pending_urls,
            'timestamp': time.time()
        }
        # Write to a temporary file and rename it, so an interruption during
        # the write cannot leave a half-written state file behind.
        tmp_path = f"{self.state_file}.tmp"
        with open(tmp_path, 'wb') as f:
            pickle.dump(state, f)
        os.replace(tmp_path, self.state_file)

    async def fetch(self, session, url):
        """Return the body of ``url`` as text, or None on any failure.

        A non-200 status and a raised exception both yield None; retry
        bookkeeping is left to ``process_page``.
        """
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, timeout=timeout) as response:
                if response.status == 200:
                    return await response.text()
                return None
        except Exception as e:
            print(f"请求失败 {url}: {e}")
            return None

    def _register_failure(self, url):
        """Count a failed fetch; drop the URL once ``max_retries`` is reached."""
        count = self._failures.get(url, 0) + 1
        self._failures[url] = count
        if count >= self.max_retries:
            self.pending_urls.discard(url)
            self.failed_urls.add(url)
        else:
            # Keep it queued for another attempt.
            self.pending_urls.add(url)

    async def process_page(self, session, url):
        """Fetch and parse one page.

        Returns:
            A ``(new_urls, new_items)`` tuple. Both lists are empty when the
            URL was already visited or could not be fetched, so callers can
            always unpack the result.
        """
        if url in self.visited_urls:
            # Already done: make sure it does not stay queued forever.
            self.pending_urls.discard(url)
            return [], []
        html = await self.fetch(session, url)
        if html is None:
            self._register_failure(url)
            return [], []
        # Parse the page for follow-up URLs and data items.
        new_urls = self.extract_urls(html, url)
        new_items = self.extract_items(html)
        self.visited_urls.add(url)
        self.pending_urls.discard(url)
        return new_urls, new_items

    def extract_urls(self, html, base_url):
        """Return the URLs found in ``html`` (placeholder, always empty)."""
        # Simplified; a real implementation would use an HTML parser.
        urls = []
        return urls

    def extract_items(self, html):
        """Return the data items found in ``html`` (placeholder, always empty)."""
        # Simplified placeholder.
        items = []
        return items

    async def crawl(self, start_urls: list, max_concurrent=10):
        """Crawl from ``start_urls`` until no pending URL is left.

        Args:
            start_urls: URLs to add to the pending set before crawling.
            max_concurrent: Connection limit and maximum batch size.

        Returns:
            The list of items extracted during this call.
        """
        self.pending_urls.update(
            url for url in start_urls if url not in self.visited_urls
        )
        collected = []
        connector = aiohttp.TCPConnector(limit=max_concurrent)
        async with aiohttp.ClientSession(connector=connector) as session:
            while self.pending_urls:
                current_batch = list(self.pending_urls)[:max_concurrent]
                results = await asyncio.gather(
                    *(self.process_page(session, url) for url in current_batch)
                )
                for new_urls, items in results:
                    collected.extend(items)
                    for url in new_urls:
                        if url not in self.visited_urls and url not in self.failed_urls:
                            self.pending_urls.add(url)
                # Save after every batch so an interrupted run can resume.
                self.save_state()
                print(f"进度: 已访问 {len(self.visited_urls)}, 待处理 {len(self.pending_urls)}")
        self.save_state()
        print("爬取完成!")
        return collected
# Usage example
async def main():
    """Run two consecutive crawls on a single spider instance."""
    crawler = AsyncIncrementalSpider()
    # The second crawl continues from the state left by the first one.
    for start_url in ('http://example.com/page1', 'http://example.com/page2'):
        await crawler.crawl([start_url])
# asyncio.run(main())

配置化的增量爬虫

# config.yaml
"""
spider:
  name: news_spider
  start_urls:
    - http://example.com/news
  incremental:
    type: id  # timestamp, id, fingerprint
    storage: redis  # redis, sqlite, file
    check_interval: 300  # seconds
  extraction:
    title: h1.title
    content: div.content
    time: span.time
    id: attribute(data-id)
"""
import yaml
from typing import Dict, Any
class ConfigurableIncrementalSpider:
    """Incremental spider whose storage backend and check type come from a YAML file."""

    def __init__(self, config_path='config.yaml'):
        with open(config_path, 'r') as config_file:
            loaded = yaml.safe_load(config_file)
        self.config = loaded
        self.setup_storage()

    def setup_storage(self):
        """Instantiate the storage backend named in the configuration.

        ``redis`` and ``sqlite`` select their own backends; any other value
        selects the file backend.
        """
        backend_name = self.config['spider']['incremental']['storage']
        if backend_name == 'redis':
            self.storage = RedisStorage()
            return
        if backend_name == 'sqlite':
            self.storage = SQLiteStorage()
            return
        self.storage = FileStorage()

    def check_if_new(self, item):
        """Ask the storage backend whether ``item`` is new.

        The configured incremental type decides which storage check is used;
        an unrecognized type treats every item as new.
        """
        strategy = self.config['spider']['incremental']['type']
        if strategy == 'fingerprint':
            return self.storage.is_new_fingerprint(item)
        if strategy == 'timestamp':
            return self.storage.is_new_time(item['time'])
        if strategy == 'id':
            return self.storage.is_new_id(item['id'])
        return True
# Minimal usage
class SimpleIncrementalSpider:
    """Smallest possible incremental crawler: an in-memory set of seen URLs."""

    def __init__(self):
        self.seen = set()

    def should_crawl(self, url):
        """Return True the first time ``url`` is offered, False afterwards."""
        if url in self.seen:
            return False
        self.seen.add(url)
        return True

    def crawl(self, url):
        """Crawl ``url`` unless it has been seen before (crawl step is a stub)."""
        if not self.should_crawl(url):
            return
        # The actual crawl would happen here.
# Usage example
spider = SimpleIncrementalSpider()
urls = ['page1', 'page2', 'page1']  # 'page1' appears twice
for url in urls:
    spider.crawl(url)  # the repeated 'page1' is rejected by should_crawl
实现增量爬取时的关键要点:

  1. 去重策略:选择合适的方法(ID、时间戳、指纹)
  2. 状态持久化:保存爬取状态到文件或数据库
  3. 异常处理:爬取失败时的重试机制
  4. 性能优化:批量处理、异步请求
  5. 断点续爬:支持中断后继续爬取

选择哪种实现方式取决于你的具体需求:

  • 简单场景:时间戳+文件存储
  • 数据量大:Redis去重
  • 需要持久化:数据库方案
  • 大规模爬取:Scrapy-Redis框架

标签: 案例实现

抱歉,评论功能暂时关闭!