2026/6/20 3:20:35
网站建设
项目流程
做网站数据对电脑要求,建设项目环保竣工验收备案网站,利用软件做许多网站违法吗,看动漫什么网站好

摘要：本文将深入探讨如何构建一个高效、稳定的淘宝商品价格监控系统。我们将采用最新的Python异步爬虫技术、反爬对抗策略以及智能解析方案，实现一个完整的商品价格追踪解决方案。本文适合有一定Python基础的开发者，涵盖从基础到高级的多个技术要点。1. 项…

摘要

本文将深入探讨如何构建一个高效、稳定的淘宝商品价格监控系统。我们将采用最新的Python异步爬虫技术、反爬对抗策略以及智能解析方案，实现一个完整的商品价格追踪解决方案。本文适合有一定Python基础的开发者，涵盖从基础到高级的多个技术要点。

1. 项目概述与技术选型

淘宝价格监控系统需要解决以下核心问题：
- 高效获取商品页面HTML内容
- 智能解析动态加载的价格数据
- 应对复杂的反爬机制
- 实现定时监控与数据持久化
- 异常处理与告警机制

技术栈选择：
- 异步框架：aiohttp + asyncio（高性能异步请求）
- 解析引擎：Playwright + BeautifulSoup4（动态页面支持）
- 反爬对抗：代理池 + 请求头轮换 + 行为模拟
- 数据存储：SQLite + CSV（轻量级方案）
- 调度任务：APScheduler（定时任务管理）
- 监控告警：SMTP邮件通知 + 日志系统

2. 环境配置与依赖安装

bash
# 创建虚拟环境
python -m venv taobao_monitor
source taobao_monitor/bin/activate  # Linux/Mac
# taobao_monitor\Scripts\activate  # Windows

# 安装核心依赖
pip install aiohttp==3.8.4
pip install beautifulsoup4==4.12.2
pip install playwright==1.37.0
pip install apscheduler==3.10.4
pip install pandas==2.0.3
pip install loguru==0.7.2
pip install fake-useragent==1.4.0

# 安装Playwright浏览器驱动
playwright install chromium

3. 核心爬虫架构设计

我们的系统采用模块化设计，主要包含以下组件：

text
淘宝价格监控系统
├── crawler/              # 爬虫核心
│   ├── async_client.py   # 异步HTTP客户端
│   ├── parser.py         # 页面解析器
│   └── anti_spider.py    # 反爬对抗模块
├── database/             # 数据存储
│   ├── models.py         # 数据模型
│   └── storage.py        # 存储接口
├── scheduler/            # 任务调度
│   └── monitor.py        # 监控调度器
├── utils/                # 工具函数
│   ├── logger.py         # 日志配置
│   └── notification.py   # 通知模块
└── config.py             # 配置文件

4. 淘宝页面智能解析策略

淘宝商品页面采用动态渲染技术，价格数据通常通过JavaScript加载。我们采用多解析策略：

python
# 策略1：直接API请求（效率最高）
# 策略2：Playwright渲染解析（最稳定）
# 策略3：HTML特征提取（备用方案）

5. 异步请求与性能优化

采用aiohttp实现异步并发请求，配合连接池和请求限流，确保高并发下的稳定性。

6. 数据存储与监控告警

使用SQLite存储历史价格数据，支持CSV导出。当价格变化超过阈值时，自动发送邮件通知。

7. 
完整代码实现以下是完整的淘宝价格监控系统实现python 淘宝商品价格监控系统 v2.0 采用异步爬虫智能解析反爬对抗技术 作者Python爬虫专家 创建时间2024年1月 import asyncio import json import time import sqlite3 from datetime import datetime from typing import Dict, List, Optional, Tuple from dataclasses import dataclass from urllib.parse import urlparse, parse_qs import aiohttp import pandas as pd from bs4 import BeautifulSoup from loguru import logger from fake_useragent import UserAgent from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger import playwright.async_api from playwright.async_api import async_playwright # 配置模块 class Config: 系统配置 # 数据库配置 DB_PATH taobao_prices.db # 请求配置 REQUEST_TIMEOUT 30 MAX_CONCURRENT 5 # 最大并发数 REQUEST_DELAY 1 # 请求延迟(秒) # 解析配置 USE_PLAYWRIGHT True # 是否使用Playwright PLAYWRIGHT_TIMEOUT 30000 # Playwright超时(毫秒) # 监控配置 CHECK_INTERVAL 3600 # 检查间隔(秒) PRICE_CHANGE_THRESHOLD 0.1 # 价格变化阈值(10%) # 通知配置 EMAIL_ENABLED False SMTP_SERVER smtp.example.com SMTP_PORT 587 # 代理配置 PROXY_POOL [] # 代理池格式: http://user:passip:port # 数据模型 dataclass class Product: 商品信息 id: str url: str title: str current_price: float original_price: float discount: float sales: int shop_name: str timestamp: datetime # 异步HTTP客户端 class AsyncHttpClient: 异步HTTP客户端支持代理和请求头轮换 def __init__(self): self.session None self.ua UserAgent() self.request_count 0 async def __aenter__(self): connector aiohttp.TCPConnector( limitConfig.MAX_CONCURRENT, sslFalse ) self.session aiohttp.ClientSession( connectorconnector, headersself._get_headers() ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self.session: await self.session.close() def _get_headers(self) - Dict: 生成随机请求头 return { User-Agent: self.ua.random, Accept: text/html,application/xhtmlxml,application/xml;q0.9,*/*;q0.8, Accept-Language: zh-CN,zh;q0.9,en;q0.8, Accept-Encoding: gzip, deflate, br, DNT: 1, Connection: keep-alive, Upgrade-Insecure-Requests: 1, Sec-Fetch-Dest: document, Sec-Fetch-Mode: navigate, Sec-Fetch-Site: 
none, Sec-Fetch-User: ?1, Cache-Control: max-age0, } async def get(self, url: str, **kwargs) - Optional[str]: 发送GET请求 try: await asyncio.sleep(Config.REQUEST_DELAY) # 代理轮换 if Config.PROXY_POOL: proxy Config.PROXY_POOL[self.request_count % len(Config.PROXY_POOL)] kwargs[proxy] proxy async with self.session.get( url, timeoutConfig.REQUEST_TIMEOUT, headersself._get_headers(), **kwargs ) as response: self.request_count 1 if response.status 200: content await response.text() logger.info(f成功获取页面: {url}) return content elif response.status 403: logger.warning(f访问被拒绝: {url}) else: logger.error(fHTTP错误 {response.status}: {url}) except asyncio.TimeoutError: logger.error(f请求超时: {url}) except Exception as e: logger.error(f请求异常 {url}: {str(e)}) return None # 智能解析器 class TaobaoParser: 淘宝页面智能解析器 staticmethod def extract_product_id(url: str) - Optional[str]: 从URL提取商品ID try: parsed urlparse(url) params parse_qs(parsed.query) # 多种可能的ID参数 for key in [id, item_id, itemid]: if key in params: return params[key][0] # 从路径中提取 path_parts parsed.path.split(/) for part in path_parts: if part.isdigit() and len(part) 8: return part except Exception as e: logger.error(f提取商品ID失败: {str(e)}) return None staticmethod def parse_html(html: str, url: str) - Optional[Product]: 解析HTML页面获取商品信息 try: soup BeautifulSoup(html, html.parser) # 方法1尝试从JSON-LD数据中提取 json_ld soup.find(script, typeapplication/ldjson) if json_ld: try: data json.loads(json_ld.string) if type in data and data[type] Product: product_id TaobaoParser.extract_product_id(url) return Product( idproduct_id or , urlurl, titledata.get(name, ), current_pricefloat(data.get(offers, {}).get(price, 0)), original_price0, discount0, sales0, shop_namedata.get(brand, {}).get(name, ), timestampdatetime.now() ) except json.JSONDecodeError: pass # 方法2从页面meta标签提取 meta_title soup.find(meta, propertyog:title) meta_price soup.find(meta, propertyog:product:price) if meta_title and meta_price: product_id TaobaoParser.extract_product_id(url) title 
meta_title.get(content, ) price float(meta_price.get(content, 0)) return Product( idproduct_id or , urlurl, titletitle[:100], # 限制标题长度 current_priceprice, original_priceprice, discount0, sales0, shop_name, timestampdatetime.now() ) # 方法3从页面特定元素提取 # 这里可以根据实际页面结构调整选择器 price_element soup.select_one(.tb-rmb-num, .price) title_element soup.select_one(.tb-detail-hd h1, .title) if price_element and title_element: try: price_text price_element.text.strip().replace(¥, ).replace(, ) price float(price_text) product_id TaobaoParser.extract_product_id(url) return Product( idproduct_id or , urlurl, titletitle_element.text.strip()[:100], current_priceprice, original_priceprice, discount0, sales0, shop_name, timestampdatetime.now() ) except ValueError: pass except Exception as e: logger.error(f解析HTML失败: {str(e)}) return None staticmethod async def parse_with_playwright(url: str) - Optional[Product]: 使用Playwright解析动态页面 try: async with async_playwright() as p: browser await p.chromium.launch( headlessTrue, args[--disable-blink-featuresAutomationControlled] ) context await browser.new_context( viewport{width: 1920, height: 1080}, user_agentMozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ) page await context.new_page() # 屏蔽图片和CSS加快加载 await page.route(**/*.{png,jpg,jpeg,webp,gif,css}, lambda route: route.abort()) await page.goto(url, timeoutConfig.PLAYWRIGHT_TIMEOUT) # 等待价格元素加载 await page.wait_for_load_state(networkidle) # 尝试多种选择器 selectors [ .tb-rmb-num, .price, [data-price], .tm-price ] for selector in selectors: elements await page.query_selector_all(selector) if elements: break # 执行JavaScript提取数据 product_data await page.evaluate( () { const result {}; // 提取价格 const priceEl document.querySelector(.tb-rmb-num) || document.querySelector(.price) || document.querySelector([data-price]); if (priceEl) { result.price parseFloat(priceEl.textContent.replace(/[^0-9.]/g, )); } // 提取标题 const titleEl document.querySelector(.tb-detail-hd h1) || document.querySelector(.title) || 
document.querySelector(h1); if (titleEl) { result.title titleEl.textContent.trim(); } // 提取销量 const salesEl document.querySelector(.tb-sell-counter) || document.querySelector(.sell-count); if (salesEl) { result.sales parseInt(salesEl.textContent.replace(/[^0-9]/g, )); } return result; } ) await browser.close() if price in product_data: product_id TaobaoParser.extract_product_id(url) return Product( idproduct_id or , urlurl, titleproduct_data.get(title, )[:100], current_pricefloat(product_data.get(price, 0)), original_pricefloat(product_data.get(price, 0)), discount0, salesproduct_data.get(sales, 0), shop_name, timestampdatetime.now() ) except Exception as e: logger.error(fPlaywright解析失败: {str(e)}) return None # 数据存储 class PriceDatabase: 价格数据库管理 def __init__(self, db_path: str Config.DB_PATH): self.db_path db_path self._init_database() def _init_database(self): 初始化数据库表 conn sqlite3.connect(self.db_path) cursor conn.cursor() # 商品信息表 cursor.execute( CREATE TABLE IF NOT EXISTS products ( id TEXT PRIMARY KEY, url TEXT NOT NULL, title TEXT, shop_name TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ) # 价格历史表 cursor.execute( CREATE TABLE IF NOT EXISTS price_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, product_id TEXT NOT NULL, price REAL NOT NULL, discount REAL, sales INTEGER, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (product_id) REFERENCES products (id) ) ) # 创建索引 cursor.execute(CREATE INDEX IF NOT EXISTS idx_product_id ON price_history (product_id)) cursor.execute(CREATE INDEX IF NOT EXISTS idx_timestamp ON price_history (timestamp)) conn.commit() conn.close() def save_product(self, product: Product): 保存商品信息 conn sqlite3.connect(self.db_path) cursor conn.cursor() try: # 插入或更新商品信息 cursor.execute( INSERT OR REPLACE INTO products (id, url, title, shop_name) VALUES (?, ?, ?, ?) , (product.id, product.url, product.title, product.shop_name)) # 插入价格记录 cursor.execute( INSERT INTO price_history (product_id, price, discount, sales) VALUES (?, ?, ?, ?) 
, (product.id, product.current_price, product.discount, product.sales)) conn.commit() logger.info(f保存商品数据: {product.title}) except Exception as e: logger.error(f保存数据失败: {str(e)}) conn.rollback() finally: conn.close() def get_price_history(self, product_id: str, days: int 30) - pd.DataFrame: 获取商品价格历史 conn sqlite3.connect(self.db_path) query SELECT price, discount, sales, timestamp FROM price_history WHERE product_id ? AND timestamp datetime(now, ?) ORDER BY timestamp DESC df pd.read_sql_query( query, conn, params(product_id, f-{days} days) ) conn.close() return df # 监控调度器 class PriceMonitor: 价格监控调度器 def __init__(self): self.db PriceDatabase() self.scheduler AsyncIOScheduler() self.products [] # 监控的商品URL列表 def add_product(self, url: str): 添加监控商品 self.products.append(url) logger.info(f添加监控商品: {url}) async def check_product(self, url: str): 检查单个商品价格 logger.info(f开始检查商品: {url}) product None if Config.USE_PLAYWRIGHT: product await TaobaoParser.parse_with_playwright(url) if not product: async with AsyncHttpClient() as client: html await client.get(url) if html: product TaobaoParser.parse_html(html, url) if product: self.db.save_product(product) await self._check_price_alert(product) else: logger.warning(f无法获取商品信息: {url}) async def _check_price_alert(self, product: Product): 检查价格变化并发送告警 history self.db.get_price_history(product.id, days1) if len(history) 1: previous_price history.iloc[1][price] current_price product.current_price price_change abs(current_price - previous_price) / previous_price if price_change Config.PRICE_CHANGE_THRESHOLD: message ( f价格告警!\n f商品: {product.title}\n f原价: ¥{previous_price:.2f}\n f现价: ¥{current_price:.2f}\n f变化: {price_change*100:.1f}%\n f链接: {product.url} ) logger.warning(message) # 这里可以添加邮件或微信通知 async def check_all_products(self): 检查所有监控商品 logger.info(开始检查所有商品价格) semaphore asyncio.Semaphore(Config.MAX_CONCURRENT) async def limited_check(url): async with semaphore: await self.check_product(url) tasks [limited_check(url) for url in 
self.products] await asyncio.gather(*tasks, return_exceptionsTrue) logger.info(所有商品检查完成) def start(self): 启动监控调度 self.scheduler.add_job( self.check_all_products, IntervalTrigger(secondsConfig.CHECK_INTERVAL), idprice_monitor ) self.scheduler.start() logger.info(价格监控已启动) def stop(self): 停止监控 self.scheduler.shutdown() logger.info(价格监控已停止) # 主程序 async def main(): 主函数 # 配置日志 logger.add( logs/taobao_monitor_{time:YYYY-MM-DD}.log, rotation1 day, retention30 days, levelINFO ) # 创建监控器 monitor PriceMonitor() # 添加监控商品示例 sample_products [ https://item.taobao.com/item.htm?id1234567890, https://detail.tmall.com/item.htm?id2345678901, ] for url in sample_products: monitor.add_product(url) # 立即执行一次检查 await monitor.check_all_products() # 启动定时监控 monitor.start() # 保持程序运行 try: while True: await asyncio.sleep(1) except KeyboardInterrupt: monitor.stop() logger.info(程序已退出) # 辅助工具 def export_to_csv(product_id: str, days: int 30): 导出价格历史到CSV db PriceDatabase() df db.get_price_history(product_id, days) if not df.empty: filename fprice_history_{product_id}_{datetime.now().strftime(%Y%m%d)}.csv df.to_csv(filename, indexFalse, encodingutf-8-sig) logger.info(f数据已导出到: {filename}) else: logger.warning(没有找到历史数据) def analyze_price_trend(product_id: str): 分析价格趋势 db PriceDatabase() df db.get_price_history(product_id, days90) if len(df) 1: df[timestamp] pd.to_datetime(df[timestamp]) df.set_index(timestamp, inplaceTrue) # 计算统计指标 stats { 最高价: df[price].max(), 最低价: df[price].min(), 平均价: df[price].mean(), 当前价: df[price].iloc[0], 波动率: df[price].std() / df[price].mean() } print(价格统计报告:) for key, value in stats.items(): print(f{key}: {value:.2f}) # 建议购买时机 if df[price].iloc[0] df[price].quantile(0.3): print(建议当前价格处于低位适合购买) elif df[price].iloc[0] df[price].quantile(0.7): print(建议当前价格处于高位建议观望) else: print(建议当前价格处于正常区间) if __name__ __main__: # 运行监控系统 asyncio.run(main()) # 使用示例 # export_to_csv(1234567890, 30) # analyze_price_trend(1234567890)8. 
部署与维护建议

部署方案：
- 本地部署：适合个人使用，配置简单
- 服务器部署：使用Docker容器化部署
- 云函数部署：无服务器架构，按需运行

维护建议：
- 定期更新：每季度更新一次解析规则
- 监控日志：每日检查错误日志
- 代理维护：定期更换代理IP池
- 数据备份：每周备份数据库

注意事项：
- 遵守淘宝Robots协议
- 控制请求频率，避免封禁
- 仅用于个人学习和研究
- 商业使用需获得授权