我来详细介绍Python数据去重爬取的实操方法,包括多种去重策略的实现。
基础去重方案
使用集合(Set)去重
import requests
from urllib.parse import urlparse
class BasicDedupCrawler:
    """Single-process crawler that de-duplicates with in-memory sets.

    URLs are remembered once they have been fetched successfully; page
    bodies are remembered by their built-in ``hash()`` value.
    """

    def __init__(self):
        # URLs that were downloaded without error.
        self.visited_urls = set()
        # hash() values of page bodies already handed back to a caller.
        self.visited_contents = set()

    def crawl(self, url):
        """Download *url* unless it has already been fetched.

        Returns the response text, or ``None`` when the URL was seen before
        or the request failed. A failed request does not mark the URL as
        visited.
        """
        if url in self.visited_urls:
            print(f"跳过已访问URL: {url}")
            return None

        try:
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            # Only a successful response counts as a visit.
            self.visited_urls.add(url)
            return resp.text
        except Exception as exc:
            print(f"爬取失败 {url}: {exc}")
            return None

    def crawl_dedup_content(self, url):
        """Download *url* and return its body only if the body is new.

        Returns ``None`` when the fetch yields nothing or when a page with
        the same hash has already been returned.
        """
        body = self.crawl(url)
        if not body:
            return None

        fingerprint = hash(body)
        if fingerprint in self.visited_contents:
            return None

        self.visited_contents.add(fingerprint)
        return body
基于Bloom Filter的高效去重
import hashlib
import mmh3
from bitarray import bitarray
class BloomFilter:
    """Bloom filter backed by a fixed-size bit array.

    Each item is probed at ``hash_count`` positions; position *i* is the MD5
    digest of the item's string form followed by *i*, reduced modulo
    ``size``.
    """

    def __init__(self, size=1000000, hash_count=7):
        self.size = size
        self.hash_count = hash_count
        bits = bitarray(size)
        bits.setall(0)
        self.bit_array = bits

    def _slots(self, item):
        """Yield the bit positions probed for *item*, one per hash round."""
        for salt in range(self.hash_count):
            hex_digest = hashlib.md5(f"{item}{salt}".encode()).hexdigest()
            yield int(hex_digest, 16) % self.size

    def add(self, item):
        """Set every bit position associated with *item*."""
        for slot in self._slots(item):
            self.bit_array[slot] = 1

    def contains(self, item):
        """Return False if any probed bit is clear, True otherwise."""
        return all(self.bit_array[slot] != 0 for slot in self._slots(item))
class BloomFilterCrawler:
    """Decide which URLs to crawl by consulting a Bloom filter."""

    def __init__(self):
        self.bloom = BloomFilter()
        # Number of URLs accepted for crawling so far.
        self.url_count = 0

    def should_crawl(self, url):
        """Return True the first time *url* is offered, False afterwards.

        A URL the filter reports as present is rejected; otherwise it is
        added to the filter and counted.
        """
        already_seen = self.bloom.contains(url)
        if not already_seen:
            self.bloom.add(url)
            self.url_count += 1
        return not already_seen
基于数据库的持久化去重
import sqlite3
import hashlib
from datetime import datetime
class DatabaseDedupCrawler:
    """Persist visited URLs and page-content fingerprints in SQLite.

    Two tables are maintained: ``visited_urls`` keyed by the MD5 of the URL
    and ``content_hashes`` keyed by the MD5 of the page text. MD5 is used
    purely as a compact lookup key here, not for security.

    Args:
        db_path: Path of the SQLite database file (``':memory:'`` works too).
    """

    def __init__(self, db_path='crawler.db'):
        self.conn = sqlite3.connect(db_path)
        self.create_tables()

    @staticmethod
    def _digest(text):
        """Return the hex MD5 digest of *text*, used as a primary key."""
        return hashlib.md5(text.encode()).hexdigest()

    @staticmethod
    def _timestamp():
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS[.ffffff]``.

        The value is formatted explicitly instead of handing a ``datetime``
        to sqlite3, whose implicit datetime adapter is deprecated since
        Python 3.12; the stored text is the same as that adapter produced.
        """
        return datetime.now().isoformat(" ")

    def create_tables(self):
        """Create the de-duplication tables if they do not exist yet."""
        # Using the connection as a context manager commits on success and
        # rolls back if a statement fails.
        with self.conn:
            # URL de-duplication table
            self.conn.execute('''
                CREATE TABLE IF NOT EXISTS visited_urls (
                    url_hash TEXT PRIMARY KEY,
                    url TEXT NOT NULL,
                    visited_time TIMESTAMP
                )
            ''')
            # Content de-duplication table
            self.conn.execute('''
                CREATE TABLE IF NOT EXISTS content_hashes (
                    content_hash TEXT PRIMARY KEY,
                    url TEXT,
                    first_seen TIMESTAMP
                )
            ''')

    def is_url_visited(self, url):
        """Return True if *url* has been recorded as visited."""
        row = self.conn.execute(
            "SELECT 1 FROM visited_urls WHERE url_hash = ?",
            (self._digest(url),),
        ).fetchone()
        return row is not None

    def mark_url_visited(self, url):
        """Record *url* as visited; recording it again is a no-op."""
        with self.conn:
            self.conn.execute(
                "INSERT OR IGNORE INTO visited_urls "
                "(url_hash, url, visited_time) VALUES (?, ?, ?)",
                (self._digest(url), url, self._timestamp()),
            )

    def is_content_duplicate(self, content):
        """Return True if text identical to *content* was recorded before."""
        row = self.conn.execute(
            "SELECT 1 FROM content_hashes WHERE content_hash = ?",
            (self._digest(content),),
        ).fetchone()
        return row is not None

    def mark_content_visited(self, content, url=None):
        """Record the fingerprint of *content* so later copies are detected.

        Args:
            content: Page text whose MD5 is stored.
            url: Optional URL where the content was first seen.
        """
        with self.conn:
            self.conn.execute(
                "INSERT OR IGNORE INTO content_hashes "
                "(content_hash, url, first_seen) VALUES (?, ?, ?)",
                (self._digest(content), url, self._timestamp()),
            )

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()
完整的多线程去重爬虫示例
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from collections import deque
import hashlib
import time
class AdvancedDedupCrawler:
    """Multi-threaded, same-domain crawler with URL and content de-duplication.

    Pages are fetched by a thread pool. A page is accepted only if neither
    its URL nor the MD5 of its text has been accepted before.

    Args:
        max_threads: Size of the worker pool and cap on concurrent fetches.
        max_pages: Stop once this many pages have been crawled successfully.
    """

    def __init__(self, max_threads=5, max_pages=100):
        self.max_threads = max_threads
        self.max_pages = max_pages
        self.visited_urls = set()
        self.visited_content_hashes = set()
        self.url_queue = deque()
        # url_lock guards visited_urls and queue appends from workers;
        # content_lock guards visited_content_hashes. When both are needed
        # they are always taken in that order.
        self.url_lock = threading.Lock()
        self.content_lock = threading.Lock()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    @staticmethod
    def _content_hash(content):
        """Return the hex MD5 digest used as the content fingerprint."""
        return hashlib.md5(content.encode()).hexdigest()

    def extract_links(self, html, base_url):
        """Return the same-host links found in *html*.

        Relative hrefs are resolved against *base_url*. Fragments are
        dropped and repeated links are collapsed (first occurrence wins),
        since ``page#a`` and ``page#b`` are the same document.
        """
        soup = BeautifulSoup(html, 'html.parser')
        base_netloc = urlparse(base_url).netloc
        links = {}  # dict keeps insertion order while removing repeats
        for a_tag in soup.find_all('a', href=True):
            try:
                target = urlparse(urljoin(base_url, a_tag['href']))
            except ValueError:
                # A malformed href (e.g. a broken IPv6 literal) should not
                # discard the rest of the page's links.
                continue
            # Keep only links on the same host.
            if target.netloc != base_netloc:
                continue
            links[target._replace(fragment='').geturl()] = None
        return list(links)

    def is_duplicate(self, url, content):
        """Report whether *url* or *content* has already been accepted.

        Returns:
            ``(True, reason)`` for a duplicate, otherwise ``(False, None)``.
            This is a read-only check; nothing is recorded.
        """
        with self.url_lock:
            if url in self.visited_urls:
                return True, "URL已访问"
        content_hash = self._content_hash(content)
        with self.content_lock:
            if content_hash in self.visited_content_hashes:
                return True, "内容重复"
        return False, None

    def _claim(self, url, content_hash):
        """Check and record a page in one step.

        Holding both locks across the check and the insert prevents two
        workers from both accepting the same URL or the same content.

        Returns:
            ``(reason, visited_total)`` where *reason* is ``None`` when the
            page was newly recorded, or the duplicate reason otherwise.
        """
        with self.url_lock, self.content_lock:
            if url in self.visited_urls:
                return "URL已访问", len(self.visited_urls)
            if content_hash in self.visited_content_hashes:
                return "内容重复", len(self.visited_urls)
            self.visited_urls.add(url)
            self.visited_content_hashes.add(content_hash)
            return None, len(self.visited_urls)

    def process_page(self, url):
        """Fetch one page, record it if new, and queue its links.

        Returns the page text when the page was accepted, or ``None`` when
        it was skipped as a duplicate, returned a non-200 status, or failed.
        """
        try:
            # Skip known URLs before spending a request on them.
            with self.url_lock:
                already_visited = url in self.visited_urls
            if already_visited:
                print(f"跳过 [URL已访问]: {url}")
                return None

            response = self.session.get(url, timeout=10)
            if response.status_code != 200:
                return None
            # Charset detection scans the body, so do it only for pages
            # that will actually be used.
            response.encoding = response.apparent_encoding
            content = response.text

            reason, visited_total = self._claim(url, self._content_hash(content))
            if reason is not None:
                print(f"跳过 [{reason}]: {url}")
                return None
            print(f"爬取成功 [{visited_total}]: {url}")

            # Queue links that have not been visited yet.
            new_links = self.extract_links(content, url)
            with self.url_lock:
                self.url_queue.extend(
                    link for link in new_links if link not in self.visited_urls
                )
            return content
        except Exception as e:
            # Worker boundary: one bad page must not take down the crawl.
            print(f"爬取失败 {url}: {e}")
            return None

    def crawl(self, start_url):
        """Crawl from *start_url* until the queue drains or max_pages is hit."""
        # Imported locally so this method does not depend on the file's
        # import block being changed.
        from concurrent.futures import FIRST_COMPLETED, wait

        self.url_queue.append(start_url)
        crawled_count = 0
        in_flight = {}  # future -> url
        with ThreadPoolExecutor(max_workers=self.max_threads) as executor:
            while True:
                # Submit work, never scheduling more pages than the
                # remaining budget so max_pages cannot be overshot.
                while (self.url_queue
                       and len(in_flight) < self.max_threads
                       and crawled_count + len(in_flight) < self.max_pages):
                    url = self.url_queue.popleft()
                    in_flight[executor.submit(self.process_page, url)] = url

                if not in_flight:
                    # Nothing running and nothing could be submitted: the
                    # queue is empty or the page budget is exhausted.
                    break

                # Block until at least one task finishes. No timeout is
                # used: a slow request is not an error for the crawl.
                done, _ = wait(in_flight, return_when=FIRST_COMPLETED)
                for future in done:
                    url = in_flight.pop(future)
                    try:
                        if future.result():
                            crawled_count += 1
                    except Exception as e:
                        print(f"任务异常 {url}: {e}")
        print(f"爬取完成,共爬取 {crawled_count} 个页面")

    def get_statistics(self):
        """Return counts of visited URLs, unique contents and queued URLs."""
        return {
            "visited_urls": len(self.visited_urls),
            "unique_contents": len(self.visited_content_hashes),
            "queue_size": len(self.url_queue)
        }
# Usage example
if __name__ == "__main__":
    # Three worker threads, stopping after at most 50 pages.
    crawler = AdvancedDedupCrawler(max_threads=3, max_pages=50)
    # Run the crawl from the seed URL.
    crawler.crawl("https://example.com")
    # Report what was collected.
    print(f"统计信息: {crawler.get_statistics()}")
基于Redis的分布式去重
import redis
import hashlib
import json
class RedisDedupCrawler:
    """De-duplicate URLs and page contents through two Redis sets.

    URLs are stored as SHA-256 hex digests under ``crawler:visited_urls``;
    page contents are stored as MD5 hex digests under
    ``crawler:content_hashes``.
    """

    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        self.url_set_key = 'crawler:visited_urls'
        self.content_set_key = 'crawler:content_hashes'
        self.redis_client = redis.StrictRedis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            decode_responses=True
        )

    @staticmethod
    def _url_member(url):
        """Return the set member that represents *url*."""
        return hashlib.sha256(url.encode()).hexdigest()

    @staticmethod
    def _content_member(content):
        """Return the set member that represents *content*."""
        return hashlib.md5(content.encode()).hexdigest()

    def add_url(self, url):
        """Add *url* to the visited set and return the SADD reply."""
        member = self._url_member(url)
        return self.redis_client.sadd(self.url_set_key, member)

    def is_url_visited(self, url):
        """Return the SISMEMBER reply for *url* in the visited set."""
        member = self._url_member(url)
        return self.redis_client.sismember(self.url_set_key, member)

    def add_content_hash(self, content):
        """Add the hash of *content* to the content set; return the SADD reply."""
        member = self._content_member(content)
        return self.redis_client.sadd(self.content_set_key, member)

    def is_content_duplicate(self, content):
        """Return the SISMEMBER reply for the hash of *content*."""
        member = self._content_member(content)
        return self.redis_client.sismember(self.content_set_key, member)

    def get_crawled_count(self):
        """Return the number of members in the visited-URL set."""
        return self.redis_client.scard(self.url_set_key)
去重策略最佳实践
多层级去重
class MultiLayerDedup:
    """Layered de-duplication: memory set, then Bloom filter, then SQLite.

    The memory set answers repeats within this process, the Bloom filter
    screens out URLs that were definitely never stored, and the database is
    the persistent source of truth that confirms Bloom-filter hits.
    """

    def __init__(self):
        # In-memory cache (fastest)
        self.memory_cache = set()
        # Bloom filter (memory-efficient)
        self.bloom_filter = BloomFilter()
        # Database (persistent)
        self.db = DatabaseDedupCrawler()
        # Redis (distributed) - optional, unused unless set by the caller
        self.redis = None
        # The Bloom filter lives only in memory while the database survives
        # restarts. Seed it from the stored URLs; otherwise check_all would
        # never consult the database for URLs recorded by an earlier run.
        for (stored_url,) in self.db.conn.execute("SELECT url FROM visited_urls"):
            self.bloom_filter.add(stored_url)

    def check_all(self, url, content=None):
        """Run the layered duplicate check.

        Args:
            url: URL to test.
            content: Optional page text; when given and non-empty it is
                also checked against the stored content hashes.

        Returns:
            ``(True, reason)`` for a duplicate, otherwise ``(False, None)``.
        """
        # 1. In-memory cache (fastest)
        if url in self.memory_cache:
            return True, "内存缓存命中"
        # 2. Bloom filter; a hit may be a false positive, so confirm it
        #    against the database before reporting a duplicate.
        if self.bloom_filter.contains(url) and self.db.is_url_visited(url):
            self.memory_cache.add(url)
            return True, "Bloom Filter命中"
        # 3. Content de-duplication
        if content and self.db.is_content_duplicate(content):
            return True, "内容重复"
        return False, None

    def mark_visited(self, url, content=None):
        """Record *url* (and, if given, *content*) in every layer."""
        # 1. In-memory cache
        self.memory_cache.add(url)
        # 2. Bloom filter
        self.bloom_filter.add(url)
        # 3. Persist to the database
        self.db.mark_url_visited(url)
        if content:
            # Only the database's connection and content_hashes table are
            # relied on here, so the fingerprint is written directly. The
            # key is computed the same way is_content_duplicate looks it up
            # (MD5 of the encoded text).
            content_hash = hashlib.md5(content.encode()).hexdigest()
            self.db.conn.execute(
                "INSERT OR IGNORE INTO content_hashes "
                "(content_hash, url, first_seen) VALUES (?, ?, ?)",
                (content_hash, url, datetime.now().isoformat(" ")),
            )
            self.db.conn.commit()
实用建议
1. **选择合适的去重策略**:
- 小规模爬虫:使用集合(Set)即可
- 大规模爬虫:Bloom Filter + 数据库
- 分布式系统:Redis + Bloom Filter
2. **URL规范化**:
from urllib.parse import urlparse, urlunparse
def normalize_url(url):
    """Return a canonical form of *url* for de-duplication.

    The fragment is removed, trailing slashes are stripped from the path,
    and the scheme and host are lowercased. Path, query and any userinfo
    keep their case because they may be case-sensitive on the server.
    """
    parsed = urlparse(url)
    # Lowercase only the host part; "user:password@" must stay untouched.
    userinfo, at_sign, host = parsed.netloc.rpartition('@')
    parsed = parsed._replace(
        # urlparse has already lowercased the scheme.
        netloc=f"{userinfo}{at_sign}{host.lower()}",
        # Drop trailing slashes so "/a" and "/a/" compare equal.
        path=parsed.path.rstrip('/'),
        # The fragment never reaches the server.
        fragment='',
    )
    return urlunparse(parsed)
3. **性能优化**:
- 使用缓冲和批量操作
- 选择合适的哈希算法
- 合理设置Bloom Filter参数
这个实操指南提供了从简单到复杂的去重方案,你可以根据实际需求选择合适的策略。

标签: 爬虫实操