Python Website Scraper: A Complete Solution from Crawling to Local Storage

Below is a complete Python website-scraping tool that covers both fetching page content and storing it locally. It uses the Requests library to download pages, BeautifulSoup to parse the HTML, and saves the scraped resources (text, images, etc.) into a specified local folder.

I. Basic Scraper Implementation

1. Install the Required Libraries

First, install the following Python libraries:

pip install requests beautifulsoup4

2. Basic Scraper Code

import os
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse

class WebScraper:
    def __init__(self, base_url, save_dir='downloads'):
        """
        初始化爬虫
        :param base_url: 要抓取的网站URL
        :param save_dir: 本地存储目录
        """
        self.base_url = base_url
        self.save_dir = save_dir
        self.visited_urls = set()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        
        # Create the output directory
        os.makedirs(self.save_dir, exist_ok=True)

    def is_valid_url(self, url):
        """
        Check that the URL is valid and belongs to the same domain
        """
        parsed = urlparse(url)
        return parsed.netloc == urlparse(self.base_url).netloc and parsed.scheme in ['http', 'https']

    def download_resource(self, url):
        """
        Download a resource file (image, CSS, JS, etc.)
        """
        try:
            response = self.session.get(url, stream=True)
            if response.status_code == 200:
                # Extract the filename from the URL
                filename = os.path.basename(urlparse(url).path)
                if not filename:
                    filename = f"resource_{hash(url)}.bin"
                
                # Create a resources subdirectory
                resource_dir = os.path.join(self.save_dir, 'resources')
                os.makedirs(resource_dir, exist_ok=True)
                
                # Save the file
                filepath = os.path.join(resource_dir, filename)
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(1024):
                        f.write(chunk)
                print(f"下载资源: {url} -> {filepath}")
                return filepath
        except Exception as e:
            print(f"下载资源失败: {url}, 错误: {e}")
        return None

    def scrape_page(self, url):
        """
        Scrape a single page
        """
        if url in self.visited_urls:
            return
        self.visited_urls.add(url)
        
        try:
            print(f"抓取页面: {url}")
            response = self.session.get(url)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, 'html.parser')
                
                # Save the HTML content
                filename = f"page_{len(self.visited_urls)}.html"
                filepath = os.path.join(self.save_dir, filename)
                with open(filepath, 'w', encoding='utf-8') as f:
                    f.write(response.text)
                print(f"保存页面: {filepath}")
                
                # Download the resources on the page
                for tag in soup.find_all(['img', 'link', 'script']):
                    if tag.name == 'img' and tag.get('src'):
                        resource_url = urljoin(url, tag['src'])
                        self.download_resource(resource_url)
                    elif tag.name == 'link' and tag.get('href') and 'stylesheet' in tag.get('rel', []):
                        resource_url = urljoin(url, tag['href'])
                        self.download_resource(resource_url)
                    elif tag.name == 'script' and tag.get('src'):
                        resource_url = urljoin(url, tag['src'])
                        self.download_resource(resource_url)
                
                # Extract and follow the links on the page
                for link in soup.find_all('a', href=True):
                    next_url = urljoin(url, link['href'])
                    if self.is_valid_url(next_url):
                        self.scrape_page(next_url)
                        
        except Exception as e:
            print(f"抓取页面失败: {url}, 错误: {e}")

    def run(self):
        """
        Start the scraper
        """
        self.scrape_page(self.base_url)
        print(f"抓取完成!共抓取 {len(self.visited_urls)} 个页面,保存在目录: {os.path.abspath(self.save_dir)}")

# Usage example
if __name__ == '__main__':
    target_url = 'https://example.com'  # Replace with the website you want to scrape
    scraper = WebScraper(target_url, 'my_website_data')
    scraper.run()

II. Feature Extensions and Optimization

1. Support Downloading More Resource Types

We can extend the download_resource method to support downloading more resource types:

def download_resource(self, url):
    try:
        response = self.session.get(url, stream=True)
        if response.status_code == 200:
            # Determine the file extension from the content type
            content_type = response.headers.get('Content-Type', '')
            ext_map = {
                'text/css': '.css',
                'application/javascript': '.js',
                'image/jpeg': '.jpg',
                'image/png': '.png',
                'image/gif': '.gif',
                'application/pdf': '.pdf'
            }
            
            # Extract the filename from the URL
            filename = os.path.basename(urlparse(url).path)
            if not filename:
                filename = f"resource_{hash(url)}"
            
            # Append the appropriate extension
            for ct, ext in ext_map.items():
                if ct in content_type:
                    filename += ext
                    break
            else:
                filename += '.bin'  # default to a binary file
            
            # Create a resources subdirectory
            resource_dir = os.path.join(self.save_dir, 'resources')
            os.makedirs(resource_dir, exist_ok=True)
            
            # Save the file
            filepath = os.path.join(resource_dir, filename)
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            print(f"下载资源: {url} -> {filepath}")
            return filepath
    except Exception as e:
        print(f"下载资源失败: {url}, 错误: {e}")
    return None

2. Add Delays and Random User-Agents

To reduce the risk of being blocked by the site, we can add request delays and rotate the User-Agent:

import random
import time

class WebScraper:
    def __init__(self, base_url, save_dir='downloads'):
        # ... other initialization code ...
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
        ]
        self.delay = (1, 3)  # random delay of 1-3 seconds

    def get_random_user_agent(self):
        return random.choice(self.user_agents)

    def request_with_delay(self, url):
        """带延迟的请求"""
        time.sleep(random.uniform(*self.delay))
        self.session.headers.update({'User-Agent': self.get_random_user_agent()})
        return self.session.get(url, stream=True)
    
    # Modify download_resource and scrape_page to call request_with_delay instead of session.get (see the sketch below)

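For reference, here is a minimal sketch (not part of the original listing) of how scrape_page could be adapted; only the request call changes, and the rest of the method stays the same as in the basic version:

def scrape_page(self, url):
    if url in self.visited_urls:
        return
    self.visited_urls.add(url)
    # Delayed request with a randomly chosen User-Agent
    response = self.request_with_delay(url)
    if response.status_code != 200:
        return
    soup = BeautifulSoup(response.text, 'html.parser')
    # ... save the page, download resources, and follow links exactly as before ...
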
3. Proxy Support

If the site needs to be accessed through a proxy, proxy support can be added (a sample proxies configuration follows the code):

class WebScraper:
    def __init__(self, base_url, save_dir='downloads', proxies=None):
        # ... other initialization code ...
        self.proxies = proxies or {}
        
    def request_with_delay(self, url):
        """带延迟的请求"""
        time.sleep(random.uniform(*self.delay))
        self.session.headers.update({'User-Agent': self.get_random_user_agent()})
        return self.session.get(url, stream=True, proxies=self.proxies)

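The proxies argument uses the standard Requests format; for example (the address below is only a placeholder, not a real proxy server):

proxies = {
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
}
scraper = WebScraper('https://example.com', 'my_website_data', proxies=proxies)
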
III. Advanced Option: Using the Scrapy Framework

For more complex scraping needs, the Scrapy framework provides more powerful features and better performance.

1. Install Scrapy

pip install scrapy

2. Create a Scrapy Project

scrapy startproject website_scraper
cd website_scraper

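For orientation, the startproject command generates a layout like the following (names come from Scrapy's default project template):

website_scraper/
    scrapy.cfg
    website_scraper/
        __init__.py
        items.py
        middlewares.py
        pipelines.py
        settings.py
        spiders/
            __init__.py
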
3. Write the Spider

Create website_spider.py in the spiders directory (the ResourceDownloadPipeline class at the end should live in pipelines.py so that it matches the ITEM_PIPELINES path):

import os
import scrapy
from urllib.parse import urljoin, urlparse
from scrapy.pipelines.files import FilesPipeline

class WebsiteScraperSpider(scrapy.Spider):
    name = "website_scraper"
    allowed_domains = ["example.com"]  # Replace with the target domain
    start_urls = ["https://example.com/"]  # Replace with the start URL
    
    custom_settings = {
        'DOWNLOAD_DELAY': 2,  # Download delay
        'CONCURRENT_REQUESTS_PER_DOMAIN': 1,
        'FILES_STORE': os.path.join('downloads', 'resources'),  # Resource storage directory
        'ITEM_PIPELINES': {
            'website_scraper.pipelines.ResourceDownloadPipeline': 1,
        }
    }
    
    def parse(self, response):
        # Save the HTML page
        os.makedirs('downloads', exist_ok=True)
        page_filename = f"page_{hash(response.url)}.html"
        page_path = os.path.join('downloads', page_filename)
        with open(page_path, 'wb') as f:
            f.write(response.body)
        self.logger.info(f"Saved page {response.url} to {page_path}")
        
        # Extract resource links
        resource_urls = []
        for tag in response.xpath('//img|//link|//script'):
            if tag.root.tag == 'img' and tag.root.get('src'):
                resource_urls.append(urljoin(response.url, tag.root.get('src')))
            elif tag.root.tag == 'link' and tag.root.get('href') and 'stylesheet' in tag.root.get('rel', ''):
                resource_urls.append(urljoin(response.url, tag.root.get('href')))
            elif tag.root.tag == 'script' and tag.root.get('src'):
                resource_urls.append(urljoin(response.url, tag.root.get('src')))
        
        # Yield items so the files pipeline downloads the resources
        for url in resource_urls:
            yield {'file_urls': [url], 'page_url': response.url}
        
        # Follow in-site links
        for link in response.css('a::attr(href)').getall():
            next_url = urljoin(response.url, link)
            if urlparse(next_url).netloc == urlparse(self.start_urls[0]).netloc:
                yield scrapy.Request(next_url, callback=self.parse)

class ResourceDownloadPipeline(FilesPipeline):
    def get_media_requests(self, item, info):
        for file_url in item['file_urls']:
            yield scrapy.Request(file_url, meta={'page_url': item['page_url']})
    
    def file_path(self, request, response=None, info=None, *, item=None):
        # Organize storage names by resource type; response can be None when
        # Scrapy only computes the path, so fall back to the URL's extension.
        ext_map = {
            'text/css': '.css',
            'application/javascript': '.js',
            'image/jpeg': '.jpg',
            'image/png': '.png',
            'image/gif': '.gif',
            'application/pdf': '.pdf'
        }
        if response is not None:
            content_type = response.headers.get('Content-Type', b'').decode().split(';')[0].strip()
            file_ext = ext_map.get(content_type, '.bin')
        else:
            file_ext = os.path.splitext(urlparse(request.url).path)[1] or '.bin'
        
        filename = os.path.basename(urlparse(request.url).path)
        if not filename or os.path.splitext(filename)[1] != file_ext:
            filename = f"{hash(request.url)}{file_ext}"
        
        return filename

4. Run the Scrapy Spider

scrapy crawl website_scraper

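If needed, individual settings can also be overridden on the command line with -s, for example:

scrapy crawl website_scraper -s DOWNLOAD_DELAY=5
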
IV. Caveats and Best Practices

  1. Respect robots.txt: Before scraping a site, check its robots.txt file (e.g. https://example.com/robots.txt) and make sure your scraping complies with the site's rules (a minimal check is sketched after this list).

  2. Set a reasonable crawl interval: Avoid putting excessive load on the server; use an appropriate DOWNLOAD_DELAY (e.g. 2-5 seconds).

  3. Handle exceptions and errors: Add proper exception handling so the scraper can recover or continue gracefully when problems occur.

  4. Limit crawl depth: For large sites, set a maximum crawl depth or page count to avoid crawling indefinitely.

  5. Optimize data storage

    • Organize files by type (e.g. html/, images/, css/)

    • Consider using a database for metadata and indexing

    • For large volumes of data, consider compressed storage

  6. Legal and ethical considerations

    • Respect the site's copyright and privacy policy

    • Do not scrape sensitive or personal data

    • Consider obtaining the site owner's permission before scraping

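As referenced in item 1, here is a minimal sketch of a robots.txt check built on the standard-library urllib.robotparser; the helper name build_robots_checker is ours and is not part of the code above:

from urllib.parse import urljoin
from urllib.robotparser import RobotFileParser

def build_robots_checker(base_url, user_agent='*'):
    """Read the site's robots.txt once and return a can_fetch(url) helper."""
    parser = RobotFileParser()
    parser.set_url(urljoin(base_url, '/robots.txt'))
    parser.read()
    return lambda url: parser.can_fetch(user_agent, url)

# Example: skip disallowed URLs at the top of scrape_page
# can_fetch = build_robots_checker('https://example.com')
# if not can_fetch(url):
#     return
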
V. Complete Example: A Configurable Scraper

Below is a more complete, configurable scraper implementation:

import os
import time
import random
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import json
from datetime import datetime

class AdvancedWebScraper:
    def __init__(self, config):
        """
        初始化高级爬虫
        :param config: 配置字典,包含以下键:
            - base_url: 起始URL
            - save_dir: 保存目录
            - max_pages: 最大抓取页面数 (0表示无限制)
            - max_depth: 最大抓取深度
            - delay: 请求延迟范围 (min, max)
            - user_agents: User-Agent列表
            - proxies: 代理配置
            - allowed_extensions: 允许下载的文件扩展名
        """
        self.base_url = config['base_url']
        self.save_dir = config.get('save_dir', 'downloads')
        self.max_pages = config.get('max_pages', 0)
        self.max_depth = config.get('max_depth', 3)
        self.delay = config.get('delay', (1, 3))
        self.user_agents = config.get('user_agents', [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        ])
        self.proxies = config.get('proxies', {})
        self.allowed_extensions = config.get('allowed_extensions', ['.jpg', '.png', '.gif', '.css', '.js', '.pdf'])
        
        self.session = requests.Session()
        self.visited_urls = set()
        self.page_count = 0
        self.resource_count = 0
        self.start_time = datetime.now()
        
        # Create the output directory structure
        os.makedirs(os.path.join(self.save_dir, 'pages'), exist_ok=True)
        os.makedirs(os.path.join(self.save_dir, 'resources'), exist_ok=True)
        os.makedirs(os.path.join(self.save_dir, 'logs'), exist_ok=True)

    def get_random_user_agent(self):
        return random.choice(self.user_agents)

    def request_with_delay(self, url):
        """带延迟的请求"""
        time.sleep(random.uniform(*self.delay))
        self.session.headers.update({
            'User-Agent': self.get_random_user_agent(),
            'Referer': self.base_url
        })
        try:
            response = self.session.get(url, stream=True, proxies=self.proxies, timeout=10)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            self.log_error(f"请求失败: {url}, 错误: {str(e)}")
            return None

    def log_error(self, message):
        """记录错误日志"""
        log_file = os.path.join(self.save_dir, 'logs', 'errors.log')
        with open(log_file, 'a', encoding='utf-8') as f:
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            f.write(f"[{timestamp}] {message}\n")

    def save_page(self, url, content):
        """保存页面内容"""
        filename = f"page_{self.page_count}_{hash(url)}.html"
        filepath = os.path.join(self.save_dir, 'pages', filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)
        self.page_count += 1
        return filepath

    def download_resource(self, url):
        """下载资源文件"""
        parsed = urlparse(url)
        ext = os.path.splitext(parsed.path)[1].lower()
        
        if self.allowed_extensions and ext not in self.allowed_extensions:
            return None
            
        response = self.request_with_delay(url)
        if response and response.status_code == 200:
            # Determine the filename
            filename = os.path.basename(parsed.path)
            if not filename:
                filename = f"resource_{self.resource_count}{ext}" if ext else f"resource_{self.resource_count}.bin"
            
            # Save the file
            filepath = os.path.join(self.save_dir, 'resources', filename)
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            
            self.resource_count += 1
            return filepath
        return None

    def scrape_page(self, url, depth=0):
        """递归抓取页面"""
        if url in self.visited_urls:
            return
        if self.max_pages > 0 and self.page_count >= self.max_pages:
            return
        if depth > self.max_depth:
            return
            
        self.visited_urls.add(url)
        print(f"抓取: {url} (深度: {depth})")
        
        response = self.request_with_delay(url)
        if not response:
            return
            
        # Save the page
        page_path = self.save_page(url, response.text)
        print(f"Saved page: {page_path}")
        
        # Parse the HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # Download resources
        for tag in soup.find_all(['img', 'link', 'script', 'source']):
            resource_url = None
            if tag.name == 'img' and tag.get('src'):
                resource_url = urljoin(url, tag['src'])
            elif tag.name == 'link' and tag.get('href') and 'stylesheet' in tag.get('rel', []):
                resource_url = urljoin(url, tag['href'])
            elif tag.name == 'script' and tag.get('src'):
                resource_url = urljoin(url, tag['src'])
            elif tag.name == 'source' and tag.get('src'):
                resource_url = urljoin(url, tag['src'])
                
            if resource_url:
                filepath = self.download_resource(resource_url)
                if filepath:
                    print(f"下载资源: {resource_url} -> {filepath}")
        
        # Follow links
        if depth < self.max_depth:
            for link in soup.find_all('a', href=True):
                next_url = urljoin(url, link['href'])
                parsed = urlparse(next_url)
                if parsed.netloc == urlparse(self.base_url).netloc and parsed.scheme in ['http', 'https']:
                    self.scrape_page(next_url, depth + 1)

    def run(self):
        """运行爬虫"""
        print(f"开始抓取: {self.base_url}")
        self.scrape_page(self.base_url)
        
        # Generate a report
        report = {
            'base_url': self.base_url,
            'start_time': self.start_time.strftime('%Y-%m-%d %H:%M:%S'),
            'end_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'pages_crawled': self.page_count,
            'resources_downloaded': self.resource_count,
            'visited_urls': list(self.visited_urls)
        }
        
        report_path = os.path.join(self.save_dir, 'report.json')
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2)
        
        print(f"抓取完成!共抓取 {self.page_count} 个页面,下载 {self.resource_count} 个资源")
        print(f"详细报告已保存到: {report_path}")

# Configuration and usage example
if __name__ == '__main__':
    config = {
        'base_url': 'https://example.com',  # Replace with the target website
        'save_dir': 'example_com_data',
        'max_pages': 50,  # Scrape at most 50 pages
        'max_depth': 2,   # Maximum depth of 2 levels
        'delay': (1, 3),  # Random delay of 1-3 seconds
        'user_agents': [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        ],
        'allowed_extensions': ['.jpg', '.png', '.gif', '.css', '.js', '.pdf']
    }
    
    scraper = AdvancedWebScraper(config)
    scraper.run()

VI. Summary

This article presented Python website-scraping solutions from basic to advanced, including:

  1. Basic scraper: fetching page content and resources with the Requests and BeautifulSoup libraries

  2. Feature extensions: request delays, random User-Agents, proxy support, and other enhancements

  3. Scrapy-based solution: a more powerful scraper built on the professional Scrapy framework

  4. Local storage management: a sensible directory layout for the scraped data

  5. Best practices: respecting robots.txt, setting reasonable crawl intervals, and more

You can choose the approach that fits your needs. For simple tasks the basic scraper is enough; for complex or large-scale scraping, the Scrapy framework is recommended.

