本文目录导读:
我来给你一个完整的Python批量爬取案例,以爬取新闻网站为例:
## 1. 基础批量爬取框架
import requests
from bs4 import BeautifulSoup
import time
import random
import json
import pandas as pd
from urllib.parse import urljoin
class BatchCrawler:
    """Crawl paginated list pages and the detail pages they link to.

    A single ``requests.Session`` is shared by all requests so that headers
    and connections are reused. The CSS selectors used by the parsers are
    generic placeholders and are expected to be adapted to the target site.
    """

    def __init__(self, base_url, headers=None):
        """Create a crawler rooted at *base_url*.

        Args:
            base_url: Site root used to build list-page URLs and to resolve
                relative links.
            headers: Optional request headers; a desktop browser User-Agent
                is used when omitted or empty.
        """
        self.base_url = base_url
        self.headers = headers or {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def fetch_page(self, url):
        """Download *url* and return its decoded text, or ``None`` on failure.

        Failures (network errors, timeouts, non-2xx statuses) are printed
        rather than raised so that one bad page does not stop a batch run.
        """
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            # Use the detected encoding instead of the header default, which
            # is frequently wrong for Chinese-language sites.
            response.encoding = response.apparent_encoding
            return response.text
        except Exception as e:  # deliberate best-effort boundary
            print(f"获取页面失败 {url}: {e}")
            return None

    def parse_list_page(self, html):
        """Return the absolute article URLs found in a list page.

        Anchors without an ``href`` and ``javascript:`` pseudo-links are
        skipped; relative links are resolved against ``self.base_url``.
        """
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        # Adjust the selector to the real site structure.
        for item in soup.select('.news-item a, .article a'):
            href = item.get('href')
            if href and not href.startswith('javascript'):
                links.append(urljoin(self.base_url, href))
        return links

    @staticmethod
    def _select_text(soup, selector):
        """Return the stripped text of the first match, or '' if none."""
        node = soup.select_one(selector)
        return node.text.strip() if node is not None else ''

    def parse_detail_page(self, html):
        """Extract title, content, time and author from a detail page.

        Returns:
            A dict with the keys ``title``, ``content``, ``time`` and
            ``author``; a field is an empty string when its selector
            matches nothing.
        """
        soup = BeautifulSoup(html, 'html.parser')
        return {
            'title': self._select_text(soup, 'h1, .title'),
            'content': self._select_text(soup, '.content, .article-content'),
            'time': self._select_text(soup, '.time, .date'),
            'author': self._select_text(soup, '.author, .source'),
        }

    def batch_crawl(self, start_page=1, end_page=10, delay=1):
        """Crawl list pages ``start_page..end_page`` (inclusive).

        Args:
            start_page: First list-page number.
            end_page: Last list-page number; an empty range yields ``[]``.
            delay: Base number of seconds to wait after each detail request
                (a random 0-1 s jitter is added).

        Returns:
            A list of dicts as produced by ``parse_detail_page``, each with
            an extra ``url`` key.
        """
        all_data = []
        # Avoid "//page/N" when base_url was given with a trailing slash.
        list_root = self.base_url.rstrip('/')
        for page in range(start_page, end_page + 1):
            print(f"正在爬取第 {page} 页...")
            list_url = f"{list_root}/page/{page}"
            list_html = self.fetch_page(list_url)
            if not list_html:
                continue
            article_links = self.parse_list_page(list_html)
            for link in article_links:
                try:
                    detail_html = self.fetch_page(link)
                    if detail_html:
                        data = self.parse_detail_page(detail_html)
                        data['url'] = link
                        all_data.append(data)
                        print(f" 爬取成功: {data.get('title', '未知标题')[:30]}")
                except Exception as e:
                    # A malformed page must not abort the remaining links.
                    print(f" 爬取失败 {link}: {e}")
                # Throttle after every request, successful or not, so that
                # failures do not turn into a burst of rapid retries.
                time.sleep(delay + random.random())
            # Extra pause between list pages.
            time.sleep(random.uniform(1, 3))
        return all_data

    def save_data(self, data, filename='output.json'):
        """Write *data* as JSON to *filename* and as CSV next to it.

        The CSV path is *filename* with its extension replaced by ``.csv``.
        If *filename* itself ends in ``.csv``, ``.csv`` is appended instead
        so the CSV never overwrites the JSON file.
        """
        import os  # local import: not among the file's top-level imports

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        root, ext = os.path.splitext(filename)
        if ext.lower() == '.csv':
            csv_filename = f"{filename}.csv"
        else:
            csv_filename = f"{root}.csv"
        df = pd.DataFrame(data)
        # utf-8-sig adds a BOM so spreadsheet tools detect the encoding.
        df.to_csv(csv_filename, index=False, encoding='utf-8-sig')
        print(f"数据已保存到 {filename} 和 {csv_filename}")
## 2. 实战案例:爬取知乎热门问题
import re
import requests
from lxml import etree
class ZhihuHotCrawler:
    """Fetch the Zhihu hot list from the JSON embedded in the /hot page."""

    def __init__(self):
        """Set up request headers and an HTTP session.

        The ``Cookie`` header is a placeholder that must be replaced with a
        value taken from a logged-in browser session.
        """
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Cookie': '你的Cookie'  # obtain after logging in
        }
        self.session = requests.Session()

    def crawl_hot_questions(self, limit=50):
        """Return up to *limit* hot questions.

        The page is expected to carry its state in a
        ``<script id="js-initialData">`` tag; the hot list is read from
        ``initialState.topstory.hotList`` in that JSON.

        Returns:
            A list of dicts with the keys ``title``, ``url``,
            ``answer_count``, ``follower_count`` and ``hot_score``. The list
            is empty when the request fails, the script tag is missing, or
            the payload cannot be parsed.
        """
        import json

        url = 'https://www.zhihu.com/hot'
        pattern = r'<script id="js-initialData" type="text/json">(.*?)</script>'
        try:
            # A timeout keeps a stalled connection from blocking forever.
            response = self.session.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            match = re.search(pattern, response.text, re.DOTALL)
            if not match:
                print("爬取失败: 未找到页面初始数据")
                return []
            data = json.loads(match.group(1))
            hot_list = data.get('initialState', {}).get('topstory', {}).get('hotList', [])
            results = []
            for item in hot_list[:limit]:
                question = item.get('target', {})
                results.append({
                    'title': question.get('title', ''),
                    'url': f"https://www.zhihu.com/question/{question.get('id', '')}",
                    'answer_count': question.get('answerCount', 0),
                    'follower_count': question.get('followerCount', 0),
                    'hot_score': question.get('hot', ''),
                })
            return results
        except Exception as e:
            # Best-effort boundary: network and parsing errors both end up
            # here and are reported instead of propagated.
            print(f"爬取失败: {e}")
            return []
## 3. 互联网新闻批量爬取
class NewsCrawler(BatchCrawler):
    """Example crawler that searches a news site by keyword."""

    @staticmethod
    def _text_or_empty(node):
        """Return ``node.text`` or '' when the selector matched nothing."""
        return node.text if node is not None else ''

    def crawl_news_list(self, keywords=('科技', '互联网'), pages=5):
        """Crawl search-result pages for every keyword.

        Args:
            keywords: Iterable of search terms. The default is an immutable
                tuple so it cannot be mutated across calls.
            pages: Number of result pages to request per keyword.

        Returns:
            A list of dicts with the keys ``title``, ``link``, ``summary``,
            ``source`` and ``keyword``.
        """
        # Local import: only ``urljoin`` is imported at the top of the file.
        from urllib.parse import quote

        all_news = []
        for keyword in keywords:
            # Percent-encode so '&', '#', spaces and non-ASCII text cannot
            # corrupt the query string.
            encoded_keyword = quote(keyword, safe='')
            for page in range(1, pages + 1):
                # URL format assumed for Baidu News.
                url = f"https://news.baidu.com/ns?word={encoded_keyword}&pn={page * 10}"
                print(f"正在爬取: {keyword} - 第{page}页")
                html = self.fetch_page(url)
                if not html:
                    continue
                soup = BeautifulSoup(html, 'html.parser')
                for item in soup.select('.result'):
                    title_link = item.select_one('h3 a')
                    if title_link is None:
                        title, link = '', ''
                    elif title_link.has_attr('href'):
                        title, link = title_link.text, title_link['href']
                    else:
                        # A title anchor without href is unusable; skip the
                        # item, as the original error handler did.
                        continue
                    all_news.append({
                        'title': title,
                        'link': link,
                        'summary': self._text_or_empty(item.select_one('.c-summary')),
                        'source': self._text_or_empty(item.select_one('.c-source')),
                        'keyword': keyword,
                    })
                time.sleep(1)  # avoid requesting too quickly
        return all_news
## 4. 带反爬处理的批量爬取
class AntiCrawlCrawler:
    """Fetch pages with rotating User-Agents, proxies and retries."""

    def __init__(self):
        """Load the proxy list and the pool of User-Agent strings."""
        self.proxies = self.load_proxies()
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]

    def load_proxies(self):
        """Return the proxy configurations to rotate through.

        Each entry is a mapping in the form accepted by the ``proxies``
        argument of ``requests``. The single entry here is a placeholder;
        a real deployment would fetch these from a proxy-pool API.
        """
        return [
            {'http': 'http://proxy1:8080', 'https': 'https://proxy1:8080'},
            # more proxies...
        ]

    def get_random_headers(self):
        """Return browser-like headers with a randomly chosen User-Agent."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

    def crawl_with_retry(self, url, max_retries=3):
        """Fetch *url*, retrying with fresh headers and proxy on failure.

        Args:
            url: Page to download.
            max_retries: Maximum number of attempts; ``0`` performs no
                request at all.

        Returns:
            The response body on HTTP 200, otherwise ``None`` once all
            attempts are exhausted.
        """
        for attempt in range(max_retries):
            try:
                headers = self.get_random_headers()
                proxy = random.choice(self.proxies) if self.proxies else None
                response = requests.get(
                    url,
                    headers=headers,
                    proxies=proxy,
                    timeout=10
                )
                if response.status_code == 200:
                    return response.text
                if response.status_code == 403:
                    print("被拒绝访问,更换代理重试...")
                else:
                    # Report other statuses instead of failing silently.
                    print(f"第{attempt + 1}次尝试失败: HTTP {response.status_code}")
            except Exception as e:
                print(f"第{attempt + 1}次尝试失败: {e}")
            # Exponential back-off after every failed attempt (including
            # 403), but not after the last one, where nothing follows.
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
        return None
## 5. 使用示例
def main():
    """Run the three usage examples in order."""
    # Example 1: basic batch crawl, saved as JSON and CSV.
    basic_crawler = BatchCrawler('https://example.com')
    crawled = basic_crawler.batch_crawl(start_page=1, end_page=3)
    basic_crawler.save_data(crawled, 'output.json')

    # Example 2: keyword-based news crawl.
    keyword_crawler = NewsCrawler('https://news.baidu.com')
    news = keyword_crawler.crawl_news_list(['Python', '编程'], 2)

    # Example 3: fetch with anti-crawl handling (random headers, proxy, retry).
    resilient_crawler = AntiCrawlCrawler()
    content = resilient_crawler.crawl_with_retry('https://example.com')


if __name__ == '__main__':
    main()
## 6. 注意事项
- 遵守robots.txt:爬取前检查网站规则
- 控制频率:设置合理的延迟,避免影响目标网站
- 异常处理:添加完善的错误处理机制
- 数据存储:考虑使用数据库存储大量数据
- 法律合规:不要爬取敏感或受版权保护的数据
这个框架可以根据你的具体需求进行调整和扩展,需要我针对某个特定网站或需求提供更详细的示例吗?
标签: 爬虫批量