Python批量爬取案例怎么写?

wen python案例 5

本文目录导读:

  1. 基础批量爬取框架
  2. 实战案例:爬取知乎热门问题
  3. 互联网新闻批量爬取
  4. 带反爬处理的批量爬取
  5. 使用示例
  6. 注意事项

我来给你一个完整的Python批量爬取案例,以爬取新闻网站为例:

基础批量爬取框架

import requests
from bs4 import BeautifulSoup
import time
import random
import json
import pandas as pd
from urllib.parse import urljoin
class BatchCrawler:
    """Generic list-page -> detail-page crawler built on one shared session.

    The CSS selectors and the ``/page/<n>`` list URL pattern are placeholders
    and must be adapted to the structure of the target site.
    """

    def __init__(self, base_url, headers=None):
        """Store the site root and prepare a session with default headers.

        Args:
            base_url: Root URL of the site; relative links are resolved
                against it and list-page URLs are built from it.
            headers: Optional request headers. When omitted (or empty), a
                desktop browser User-Agent is used.
        """
        self.base_url = base_url
        self.headers = headers or {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def fetch_page(self, url):
        """Download one page and return its text, or None on any failure.

        Failures (network errors, timeouts, HTTP error statuses) are printed
        and reported as None so a batch run keeps going.
        """
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            # Let requests guess the charset from the body; many pages
            # declare a wrong or missing encoding in their headers.
            response.encoding = response.apparent_encoding
            return response.text
        except Exception as e:
            print(f"获取页面失败 {url}: {e}")
            return None

    def parse_list_page(self, html):
        """Extract absolute article URLs from a list page.

        Args:
            html: Markup of the list page.

        Returns:
            A list of absolute URLs, in document order.
        """
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        # Adjust the selector to match the real site structure.
        for item in soup.select('.news-item a, .article a'):
            href = item.get('href')
            if href and not href.startswith('javascript'):
                links.append(urljoin(self.base_url, href))
        return links

    @staticmethod
    def _first_text(soup, selector):
        """Return the stripped text of the first match of *selector*, or ''."""
        node = soup.select_one(selector)
        return node.text.strip() if node is not None else ''

    def parse_detail_page(self, html):
        """Extract title, content, time and author from a detail page.

        Fields whose selector matches nothing are returned as empty strings.
        """
        soup = BeautifulSoup(html, 'html.parser')
        return {
            'title': self._first_text(soup, 'h1, .title'),
            'content': self._first_text(soup, '.content, .article-content'),
            'time': self._first_text(soup, '.time, .date'),
            'author': self._first_text(soup, '.author, .source'),
        }

    def batch_crawl(self, start_page=1, end_page=10, delay=1):
        """Crawl list pages ``start_page..end_page`` and every linked article.

        Args:
            start_page: First list page number (inclusive).
            end_page: Last list page number (inclusive).
            delay: Base number of seconds to wait after each article; up to
                one extra random second is added.

        Returns:
            A list of dicts as produced by ``parse_detail_page`` plus a
            ``'url'`` key.
        """
        all_data = []
        # Strip a trailing slash so the list URL never contains "//page/".
        root = self.base_url.rstrip('/')
        for page in range(start_page, end_page + 1):
            print(f"正在爬取第 {page} 页...")
            list_url = f"{root}/page/{page}"
            list_html = self.fetch_page(list_url)
            if not list_html:
                continue
            for link in self.parse_list_page(list_html):
                try:
                    detail_html = self.fetch_page(link)
                    if detail_html:
                        data = self.parse_detail_page(detail_html)
                        data['url'] = link
                        all_data.append(data)
                        print(f"  爬取成功: {data.get('title', '未知标题')[:30]}")
                    # Randomised delay to reduce the chance of being blocked.
                    time.sleep(delay + random.random())
                except Exception as e:
                    # One broken article must not abort the whole batch.
                    print(f"  爬取失败 {link}: {e}")
                    continue
            # Pause between list pages.
            time.sleep(random.uniform(1, 3))
        return all_data

    def save_data(self, data, filename='output.json'):
        """Write *data* as JSON to *filename* and as CSV next to it.

        The CSV path is *filename* with its extension replaced by ``.csv``.
        If that would equal *filename* itself, ``.csv`` is appended instead
        so the CSV can never overwrite the JSON file.

        Args:
            data: List of record dicts.
            filename: Destination path of the JSON file.
        """
        import os

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        stem, _ext = os.path.splitext(filename)
        csv_filename = stem + '.csv'
        if csv_filename == filename:
            csv_filename = filename + '.csv'
        # utf-8-sig adds a BOM so spreadsheet tools detect UTF-8 correctly.
        pd.DataFrame(data).to_csv(csv_filename, index=False, encoding='utf-8-sig')
        print(f"数据已保存到 {filename} 和 {csv_filename}")

实战案例:爬取知乎热门问题

import re
import requests
from lxml import etree
class ZhihuHotCrawler:
    """Read the Zhihu hot list from the JSON state embedded in the hot page."""

    # The page ships its initial state as JSON inside this script tag.
    _INITIAL_DATA_RE = re.compile(
        r'<script id="js-initialData" type="text/json">(.*?)</script>',
        re.DOTALL,
    )

    def __init__(self):
        """Prepare request headers and a session."""
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Cookie': '你的Cookie'  # placeholder: copy the cookie from a logged-in browser session
        }
        self.session = requests.Session()

    def crawl_hot_questions(self, limit=50):
        """Fetch the hot page and return up to *limit* hot questions.

        Returns:
            A list of dicts with the keys ``title``, ``url``,
            ``answer_count``, ``follower_count`` and ``hot_score``. An empty
            list is returned when the request fails, the embedded data is
            missing, or it cannot be parsed.
        """
        url = 'https://www.zhihu.com/hot'
        try:
            # A timeout keeps a stalled connection from hanging the caller.
            response = self.session.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            return self._parse_hot_list(response.text, limit)
        except Exception as e:
            print(f"爬取失败: {e}")
            return []

    @classmethod
    def _parse_hot_list(cls, html, limit):
        """Extract hot-list entries from page markup; [] if no data is embedded."""
        import json

        match = cls._INITIAL_DATA_RE.search(html)
        if not match:
            return []
        data = json.loads(match.group(1))
        hot_list = data.get('initialState', {}).get('topstory', {}).get('hotList') or []
        results = []
        for item in hot_list[:limit]:
            # Tolerate entries whose 'target' is missing or null.
            question = item.get('target') or {}
            results.append({
                'title': question.get('title', ''),
                'url': f"https://www.zhihu.com/question/{question.get('id', '')}",
                'answer_count': question.get('answerCount', 0),
                'follower_count': question.get('followerCount', 0),
                'hot_score': question.get('hot', '')
            })
        return results

互联网新闻批量爬取

class NewsCrawler(BatchCrawler):
    """Example news crawler that searches by keyword."""

    def crawl_news_list(self, keywords=('科技', '互联网'), pages=5):
        """Crawl search-result pages for each keyword and collect the hits.

        Args:
            keywords: Iterable of search keywords. The default is a tuple so
                the default value cannot be mutated between calls.
            pages: Number of result pages to fetch per keyword.

        Returns:
            A list of dicts with the keys ``title``, ``link``, ``summary``,
            ``source`` and ``keyword``. Fields missing from a result are
            empty strings.
        """
        from urllib.parse import quote

        def text_of(node):
            """Return the node's text, or '' when the selector matched nothing."""
            return node.text if node is not None else ''

        all_news = []
        for keyword in keywords:
            for page in range(1, pages + 1):
                # Assumed Baidu News URL format: `pn` is taken to be a
                # zero-based result offset (10 results per page), so page 1
                # starts at offset 0. The keyword is percent-encoded so that
                # characters such as '&' or spaces cannot break the query.
                offset = (page - 1) * 10
                url = f"https://news.baidu.com/ns?word={quote(keyword)}&pn={offset}"
                print(f"正在爬取: {keyword} - 第{page}页")
                html = self.fetch_page(url)
                if not html:
                    continue
                soup = BeautifulSoup(html, 'html.parser')
                for item in soup.select('.result'):
                    anchor = item.select_one('h3 a')
                    all_news.append({
                        'title': text_of(anchor),
                        # .get() avoids a KeyError on anchors without href.
                        'link': anchor.get('href', '') if anchor is not None else '',
                        'summary': text_of(item.select_one('.c-summary')),
                        'source': text_of(item.select_one('.c-source')),
                        'keyword': keyword
                    })
                time.sleep(1)  # throttle requests
        return all_news
## 4. 带反爬处理的批量爬取
class AntiCrawlCrawler:
    """Fetcher that rotates User-Agents and proxies and retries with backoff."""

    def __init__(self):
        """Load the proxy list and the pool of User-Agent strings."""
        self.proxies = self.load_proxies()
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]

    def load_proxies(self):
        """Return the proxy configurations to rotate through.

        Each entry is a ``proxies`` mapping as accepted by requests. The
        values here are placeholders; a real deployment could load them
        from a proxy-pool API.
        """
        return [
            {'http': 'http://proxy1:8080', 'https': 'https://proxy1:8080'},
            # more proxies...
        ]

    def get_random_headers(self):
        """Return browser-like request headers with a random User-Agent."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

    def crawl_with_retry(self, url, max_retries=3):
        """Fetch *url*, retrying with exponential backoff on any failure.

        Every attempt uses fresh random headers and a randomly chosen proxy
        (none when the proxy list is empty). After a failed attempt, whether
        an exception or a non-200 status, the method waits ``2 ** attempt``
        seconds before the next one. It does not wait after the last attempt.

        Args:
            url: Address to fetch.
            max_retries: Maximum number of attempts.

        Returns:
            The response text on HTTP 200, otherwise None once all attempts
            are used up.
        """
        for attempt in range(max_retries):
            try:
                headers = self.get_random_headers()
                proxy = random.choice(self.proxies) if self.proxies else None
                response = requests.get(
                    url,
                    headers=headers,
                    proxies=proxy,
                    timeout=10
                )
            except Exception as e:
                print(f"第{attempt + 1}次尝试失败: {e}")
            else:
                if response.status_code == 200:
                    return response.text
                if response.status_code == 403:
                    print("被拒绝访问,更换代理重试...")
                else:
                    print(f"请求返回状态码 {response.status_code},准备重试...")
            # Back off before the next attempt only; sleeping after the
            # final one would just delay returning the failure.
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
        return None

使用示例

if __name__ == '__main__':
    # Example 1: crawl list pages 1-3 of a site and persist the records.
    basic = BatchCrawler(base_url='https://example.com')
    records = basic.batch_crawl(start_page=1, end_page=3)
    basic.save_data(records, filename='output.json')

    # Example 2: keyword-driven news crawl, two result pages per keyword.
    baidu_news = NewsCrawler(base_url='https://news.baidu.com')
    news = baidu_news.crawl_news_list(keywords=['Python', '编程'], pages=2)

    # Example 3: single fetch with header/proxy rotation and retries.
    resilient = AntiCrawlCrawler()
    content = resilient.crawl_with_retry(url='https://example.com')

注意事项

  1. 遵守robots.txt:爬取前检查网站规则
  2. 控制频率:设置合理的延迟,避免影响目标网站
  3. 异常处理:添加完善的错误处理机制
  4. 数据存储:考虑使用数据库存储大量数据
  5. 法律合规:不要爬取敏感或受版权保护的数据

这个框架可以根据你的具体需求进行调整和扩展,需要我针对某个特定网站或需求提供更详细的示例吗?

标签: 爬虫批量

抱歉,评论功能暂时关闭!