2026/6/20 5:35:21
网站建设
项目流程
能发布自做的视频网站,wordpress path主题,网站建设情况说明书,有了域名后怎样做网站一、财经数据爬取的技术挑战与演进
在当今数字化金融时代，财经资讯的实时获取对于投资决策、市场分析具有重要意义。传统爬虫技术面临JavaScript动态渲染、反爬机制升级、数据源异构等挑战。本文将介绍使用Python最新技术栈构建的高性能财经资讯爬虫系统，…一、财经数据爬取的技术挑战与演进在当今数字化金融时代，财经资讯的实时获取对于投资决策、市场分析具有重要意义。传统爬虫技术面临JavaScript动态渲染、反爬机制升级、数据源异构等挑战。本文将介绍使用Python最新技术栈构建的高性能财经资讯爬虫系统，涵盖Playwright异步渲染、智能代理轮换、数据结构化解析等前沿技术。目录一、财经数据爬取的技术挑战与演进二、技术栈选型与架构设计2.1 核心组件2.2 系统架构三、完整爬虫系统实现3.1 环境配置与安装3.2 配置文件设计3.3 数据模型定义3.4 核心爬虫引擎3.5 财经数据源解析器3.6 异步任务调度器3.7 主爬虫程序3.8 反爬虫对抗策略四、部署与监控4.1 Docker部署配置4.2 监控与日志五、最佳实践与注意事项5.1 遵守robots.txt5.2 数据清洗与验证5.3 错误处理与重试机制六、总结二、技术栈选型与架构设计2.1 核心组件Playwright: 微软开源的现代化浏览器自动化工具，完美处理动态渲染Asyncio: Python原生异步IO框架，实现高并发爬取Pydantic: 数据验证与结构化，确保数据质量FastAPI: 可选API封装，提供数据服务接口Redis: 分布式缓存与任务队列管理2.2 系统架构text数据源层 → 爬取调度层 → 解析处理层 → 存储服务层 → API接口层三、完整爬虫系统实现3.1 环境配置与安装python# requirements.txt playwright1.40.0 asyncio3.4.3 aiohttp3.9.1 pydantic2.5.0 pandas2.1.4 redis5.0.1 asyncpg0.29.0 beautifulsoup44.12.2 lxml4.9.3 fastapi0.104.13.2 配置文件设计python# config/settings.py from pydantic_settings import BaseSettings from typing import List, Optional from enum import Enum class DataSource(str, Enum): EASTMONEY eastmoney SINA_FINANCE sina_finance JIN10 jin10 YAHOO_FINANCE yahoo_finance class CrawlerSettings(BaseSettings): # 数据源配置 DATA_SOURCES: List[DataSource] [ DataSource.EASTMONEY, DataSource.SINA_FINANCE, DataSource.JIN10 ] # 代理配置 PROXY_ENABLED: bool True PROXY_POOL_URL: str http://api.proxypool.com/get PROXY_MAX_RETRY: int 3 # 并发控制 MAX_CONCURRENT_TASKS: int 10 REQUEST_DELAY: float 1.5 # 浏览器配置 HEADLESS: bool True BROWSER_TIMEOUT: int 30000 # 存储配置 REDIS_URL: str redis://localhost:6379/0 POSTGRES_URL: str postgresql://user:passlocalhost/finance class Config: env_file .env3.3 数据模型定义python# models/finance_news.py from pydantic import BaseModel, Field, HttpUrl from datetime import datetime from typing import Optional, List from enum import Enum class NewsCategory(str, Enum): STOCK 股票 BOND 债券 FOREX 外汇 COMMODITY 大宗商品 ECONOMY 宏观经济 POLICY 政策法规 COMPANY 公司动态 class FinanceNews(BaseModel): 财经新闻数据模型 id: Optional[str] None title: str 
Field(..., min_length5, max_length200) content: str Field(..., min_length50) summary: Optional[str] None source: str source_url: HttpUrl category: NewsCategory publish_time: datetime keywords: List[str] Field(default_factorylist) sentiment_score: Optional[float] Field(defaultNone, ge-1, le1) related_stocks: List[str] Field(default_factorylist) crawl_time: datetime Field(default_factorydatetime.now) class Config: json_schema_extra { example: { title: 央行宣布降准0.5个百分点, content: 中国人民银行决定..., source: 东方财富, source_url: https://finance.eastmoney.com/a/20231201..., category: 宏观经济, publish_time: 2023-12-01T09:30:00 } }3.4 核心爬虫引擎python# core/crawler_engine.py import asyncio import logging from typing import Dict, List, Optional, Any from playwright.async_api import async_playwright, Browser, Page from dataclasses import dataclass from datetime import datetime import aiohttp from urllib.parse import urljoin import hashlib logger logging.getLogger(__name__) dataclass class CrawlResult: url: str html: str screenshot: Optional[bytes] None metadata: Dict[str, Any] None status: int 200 class SmartCrawlerEngine: 智能爬虫引擎 def __init__(self, headless: bool True): self.headless headless self.browser: Optional[Browser] None self.context_pool [] self.session_id hashlib.md5(str(datetime.now()).encode()).hexdigest()[:8] async def __aenter__(self): await self.init_browser() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() async def init_browser(self): 初始化浏览器实例 playwright await async_playwright().start() self.browser await playwright.chromium.launch( headlessself.headless, args[ --disable-blink-featuresAutomationControlled, --disable-dev-shm-usage, --no-sandbox, --disable-setuid-sandbox ] ) # 创建多个上下文用于并发 for _ in range(5): context await self.browser.new_context( viewport{width: 1920, height: 1080}, user_agentMozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ) self.context_pool.append(context) async def fetch_with_playwright(self, url: str, 
wait_for_selector: str None, screenshot: bool False) - CrawlResult: 使用Playwright获取动态渲染页面 context self.context_pool.pop() if self.context_pool else \ await self.browser.new_context() try: page await context.new_page() # 设置随机延迟模拟人类行为 await page.route(**/*, self._block_unnecessary_resources) # 设置请求头 await page.set_extra_http_headers({ Accept: text/html,application/xhtmlxml,application/xml;q0.9,*/*;q0.8, Accept-Language: zh-CN,zh;q0.9,en;q0.8, Accept-Encoding: gzip, deflate, br, Referer: https://www.google.com/, DNT: 1, Connection: keep-alive, Upgrade-Insecure-Requests: 1 }) response await page.goto(url, wait_untilnetworkidle, timeout30000) if wait_for_selector: await page.wait_for_selector(wait_for_selector, timeout10000) # 滚动页面加载更多内容 await self._auto_scroll(page) html await page.content() screenshot_bytes None if screenshot: screenshot_bytes await page.screenshot(full_pageTrue) result CrawlResult( urlurl, htmlhtml, screenshotscreenshot_bytes, metadata{ title: await page.title(), url: page.url, status: response.status if response else 0, crawl_time: datetime.now().isoformat() } ) return result finally: await page.close() self.context_pool.append(context) async def _auto_scroll(self, page: Page): 自动滚动页面以触发懒加载 await page.evaluate( async () { await new Promise((resolve) { let totalHeight 0; const distance 100; const timer setInterval(() { const scrollHeight document.body.scrollHeight; window.scrollBy(0, distance); totalHeight distance; if(totalHeight scrollHeight){ clearInterval(timer); setTimeout(resolve, 1000); } }, 100); }); } ) async def _block_unnecessary_resources(self, route): 阻止不必要的资源加载 resource_type route.request.resource_type if resource_type in [image, media, font, stylesheet]: await route.abort() else: await route.continue_() async def close(self): 关闭浏览器 for context in self.context_pool: await context.close() if self.browser: await self.browser.close()3.5 财经数据源解析器python# parsers/eastmoney_parser.py import re import json from bs4 import BeautifulSoup from 
typing import List, Dict, Optional from datetime import datetime import jieba import jieba.analyse class EastMoneyParser: 东方财富新闻解析器 def __init__(self): jieba.initialize() # 加载财经专业词典 jieba.load_userdict(data/finance_dict.txt) def parse_news_list(self, html: str) - List[Dict]: 解析新闻列表页 soup BeautifulSoup(html, lxml) news_items [] # 解析新闻列表 news_elements soup.select(.news-list li, .article-item, [class*newsItem]) for element in news_elements[:50]: # 限制数量 try: news_item self._parse_news_element(element) if news_item: news_items.append(news_item) except Exception as e: logger.error(f解析新闻元素失败: {e}) continue return news_items def _parse_news_element(self, element) - Optional[Dict]: 解析单个新闻元素 title_elem element.select_one(a[href*news]) if not title_elem: return None title title_elem.get_text(stripTrue) link title_elem.get(href, ) if not link.startswith(http): link fhttps://finance.eastmoney.com{link} time_elem element.select_one(.time, .date) pub_time self._parse_time(time_elem.get_text(stripTrue) if time_elem else ) # 提取摘要 summary_elem element.select_one(.summary, .content) summary summary_elem.get_text(stripTrue) if summary_elem else return { title: title, url: link, publish_time: pub_time, summary: summary[:200] if summary else , source: 东方财富 } def parse_news_detail(self, html: str, url: str) - Dict: 解析新闻详情页 soup BeautifulSoup(html, lxml) # 提取标题 title soup.select_one(h1).get_text(stripTrue) if soup.select_one(h1) else # 提取发布时间 time_elem soup.select_one(.time, .pub-date, .date) pub_time self._parse_time(time_elem.get_text(stripTrue) if time_elem else ) # 提取正文内容 content_elem soup.select_one(.article-content, .news_content, .Body) content if content_elem: # 移除无关元素 for tag in content_elem.select(script, style, .ad, .related-news): tag.decompose() content content_elem.get_text(\n, stripTrue) # 提取关键词 keywords self._extract_keywords(f{title} {content}) # 情感分析 sentiment self._analyze_sentiment(content) # 提取相关股票 stocks self._extract_stocks(content) return { title: title, content: 
content, publish_time: pub_time, source_url: url, keywords: keywords, sentiment_score: sentiment, related_stocks: stocks, word_count: len(content), crawl_time: datetime.now().isoformat() } def _parse_time(self, time_str: str) - str: 解析时间字符串 patterns [ r(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}), r(\d{4}年\d{2}月\d{2}日\s\d{2}:\d{2}), r(\d{2}-\d{2}\s\d{2}:\d{2}), ] for pattern in patterns: match re.search(pattern, time_str) if match: time_str match.group(1) break try: return datetime.strptime(time_str, %Y-%m-%d %H:%M:%S).isoformat() except: return datetime.now().isoformat() def _extract_keywords(self, text: str, top_k: int 10) - List[str]: 提取关键词 keywords jieba.analyse.extract_tags( text, topKtop_k, withWeightFalse, allowPOS(n, ns, vn, v, eng) ) return keywords def _analyze_sentiment(self, text: str) - float: 简单情感分析 positive_words [上涨, 利好, 增长, 盈利, 突破, 复苏, 稳健] negative_words [下跌, 利空, 亏损, 下滑, 风险, 危机, 衰退] score 0 words jieba.lcut(text) for word in words: if word in positive_words: score 0.1 elif word in negative_words: score - 0.1 return max(-1, min(1, score / 10)) def _extract_stocks(self, text: str) - List[str]: 提取股票代码 # 匹配A股代码 pattern r(?:股票代码|代码|股票)[:]?\s*(\d{6}) stocks re.findall(pattern, text) # 匹配股票简称 stock_pattern r([\u4e00-\u9fa5]{2,4})\((\d{6})\) stocks.extend(re.findall(stock_pattern, text)) return list(set(stocks))3.6 异步任务调度器python# core/task_scheduler.py import asyncio import logging from typing import List, Dict, Any, Callable from datetime import datetime from collections import deque import aiohttp import asyncpg from redis import asyncio as aioredis logger logging.getLogger(__name__) class AsyncTaskScheduler: 异步任务调度器 def __init__(self, max_concurrent: int 10): self.max_concurrent max_concurrent self.semaphore asyncio.Semaphore(max_concurrent) self.task_queue deque() self.results [] self.redis_client None self.db_pool None async def init_storage(self): 初始化存储连接 # 连接Redis self.redis_client await aioredis.from_url( redis://localhost:6379/0, decode_responsesTrue ) 
# 连接PostgreSQL self.db_pool await asyncpg.create_pool( useruser, passwordpass, databasefinance, hostlocalhost ) async def add_task(self, task_func: Callable, *args, **kwargs): 添加任务到队列 task_id ftask_{len(self.task_queue)}_{datetime.now().timestamp()} self.task_queue.append({ id: task_id, func: task_func, args: args, kwargs: kwargs }) async def run(self) - List[Any]: 运行所有任务 tasks [] while self.task_queue: task self.task_queue.popleft() tasks.append( self._execute_task_with_semaphore( task[func], *task[args], **task[kwargs] ) ) self.results await asyncio.gather(*tasks, return_exceptionsTrue) return self.results async def _execute_task_with_semaphore(self, func, *args, **kwargs): 使用信号量控制并发执行 async with self.semaphore: try: result await func(*args, **kwargs) # 存储到Redis缓存 if self.redis_client: cache_key fcrawl_result:{hash(str(args))} await self.redis_client.setex( cache_key, 3600, # 1小时过期 str(result) ) return result except Exception as e: logger.error(f任务执行失败: {e}) return None async def store_to_database(self, data: List[Dict]): 存储数据到数据库 if not self.db_pool: return async with self.db_pool.acquire() as conn: async with conn.transaction(): for item in data: await conn.execute( INSERT INTO finance_news (title, content, source, source_url, publish_time, keywords, sentiment_score, related_stocks, crawl_time) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (source_url) DO UPDATE SET title EXCLUDED.title, content EXCLUDED.content, crawl_time EXCLUDED.crawl_time , item[title], item[content], item.get(source, ), item[source_url], item[publish_time], item.get(keywords, []), item.get(sentiment_score), item.get(related_stocks, []), item.get(crawl_time) )3.7 主爬虫程序python# main.py import asyncio import logging from typing import List import sys from pathlib import Path sys.path.append(str(Path(__file__).parent)) from core.crawler_engine import SmartCrawlerEngine from core.task_scheduler import AsyncTaskScheduler from parsers.eastmoney_parser import EastMoneyParser from 
parsers.sina_parser import SinaFinanceParser from models.finance_news import FinanceNews, NewsCategory import json logging.basicConfig( levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(logs/crawler.log), logging.StreamHandler() ] ) logger logging.getLogger(__name__) class FinanceCrawler: 财经资讯爬虫主程序 def __init__(self): self.crawler_engine None self.scheduler AsyncTaskScheduler(max_concurrent5) self.parsers { eastmoney: EastMoneyParser(), sina: SinaFinanceParser() } async def crawl_all_sources(self) - List[FinanceNews]: 爬取所有数据源 logger.info(开始爬取财经资讯...) all_news [] # 东方财富 eastmoney_news await self._crawl_eastmoney() all_news.extend(eastmoney_news) # 新浪财经 sina_news await self._crawl_sina_finance() all_news.extend(sina_news) # 今日头条财经 jinri_news await self._crawl_jinri_toutiao() all_news.extend(jinri_news) logger.info(f共爬取 {len(all_news)} 条新闻) return all_news async def _crawl_eastmoney(self) - List[FinanceNews]: 爬取东方财富 urls [ https://finance.eastmoney.com/, https://finance.eastmoney.com/news/cgnjj.html, # 国内经济 https://finance.eastmoney.com/news/cywjh.html, # 财经要闻 ] news_list [] async with SmartCrawlerEngine(headlessTrue) as crawler: for url in urls: try: result await crawler.fetch_with_playwright( url, wait_for_selector.news-list, screenshotFalse ) parser self.parsers[eastmoney] news_items parser.parse_news_list(result.html) # 并发爬取详情页 detail_tasks [] for item in news_items[:10]: # 限制数量 detail_tasks.append( self._fetch_news_detail(crawler, parser, item) ) details await asyncio.gather(*detail_tasks) news_list.extend([d for d in details if d]) await asyncio.sleep(2) # 礼貌延迟 except Exception as e: logger.error(f爬取东方财富失败 {url}: {e}) return news_list async def _fetch_news_detail(self, crawler, parser, news_item) - Optional[FinanceNews]: 爬取新闻详情 try: result await crawler.fetch_with_playwright( news_item[url], wait_for_selector.article-content ) detail parser.parse_news_detail(result.html, news_item[url]) # 创建数据模型 news 
FinanceNews( titledetail[title], contentdetail[content], sourcenews_item[source], source_urlnews_item[url], categoryself._classify_category(detail[title]), publish_timedetail[publish_time], keywordsdetail[keywords], sentiment_scoredetail[sentiment_score], related_stocksdetail[related_stocks] ) return news except Exception as e: logger.error(f爬取详情失败 {news_item[url]}: {e}) return None def _classify_category(self, title: str) - NewsCategory: 分类新闻 category_keywords { NewsCategory.STOCK: [股票, 股市, A股, 港股, 美股, 创业板], NewsCategory.BOND: [债券, 国债, 转债, 信用债], NewsCategory.FOREX: [汇率, 美元, 人民币, 外汇, 离岸], NewsCategory.COMMODITY: [黄金, 原油, 大宗, 期货, 铜, 铝], NewsCategory.ECONOMY: [GDP, CPI, 经济, 增长, 通胀, PMI], NewsCategory.POLICY: [政策, 央行, 财政部, 降准, 降息], } for category, keywords in category_keywords.items(): if any(keyword in title for keyword in keywords): return category return NewsCategory.COMPANY async def export_data(self, news_list: List[FinanceNews], format: str json): 导出数据 if format json: with open(data/finance_news.json, w, encodingutf-8) as f: json_data [news.dict() for news in news_list] json.dump(json_data, f, ensure_asciiFalse, indent2, defaultstr) elif format csv: import pandas as pd df pd.DataFrame([news.dict() for news in news_list]) df.to_csv(data/finance_news.csv, indexFalse, encodingutf-8-sig) logger.info(f数据已导出到 data/finance_news.{format}) async def main(): 主函数 crawler FinanceCrawler() try: # 初始化存储 await crawler.scheduler.init_storage() # 爬取数据 news_list await crawler.crawl_all_sources() # 导出数据 await crawler.export_data(news_list, json) await crawler.export_data(news_list, csv) # 存储到数据库 await crawler.scheduler.store_to_database( [news.dict() for news in news_list] ) logger.info(爬虫任务完成) except KeyboardInterrupt: logger.info(用户中断程序) except Exception as e: logger.error(f程序运行失败: {e}) raise if __name__ __main__: asyncio.run(main())3.8 反爬虫对抗策略python# core/anti_anti_crawler.py import random import time from typing import Optional from fake_useragent import UserAgent import 
hashlib class AntiAntiCrawler: 反反爬虫策略 def __init__(self): self.ua UserAgent() self.proxy_list self._load_proxies() self.request_history [] def get_random_headers(self) - dict: 生成随机请求头 return { User-Agent: self.ua.random, Accept: text/html,application/xhtmlxml,application/xml;q0.9,*/*;q0.8, Accept-Language: random.choice([zh-CN,zh;q0.9, en-US,en;q0.8]), Accept-Encoding: gzip, deflate, br, DNT: random.choice([1, 0]), Connection: random.choice([keep-alive, close]), Upgrade-Insecure-Requests: 1, Sec-Fetch-Dest: document, Sec-Fetch-Mode: navigate, Sec-Fetch-Site: none, Sec-Fetch-User: ?1, Cache-Control: max-age0, TE: trailers, } def get_random_delay(self, base: float 1.0) - float: 生成随机延迟 return base random.uniform(0, 2.0) def rotate_proxy(self) - Optional[str]: 轮换代理IP if not self.proxy_list: return None return random.choice(self.proxy_list) def _load_proxies(self) - list: 加载代理列表 # 可以从文件、API或数据库加载 return [ http://proxy1.example.com:8080, http://proxy2.example.com:8080, # 更多代理... ] def generate_fingerprint(self) - str: 生成浏览器指纹 components [ str(time.time()), str(random.random()), self.ua.random ] fingerprint hashlib.md5(.join(components).encode()).hexdigest() return fingerprint四、部署与监控4.1 Docker部署配置dockerfile# Dockerfile FROM python:3.11-slim WORKDIR /app # 安装系统依赖 RUN apt-get update apt-get install -y \ wget \ gnupg \ libnss3 \ libxss1 \ libasound2 \ libxtst6 \ fonts-noto-cjk \ rm -rf /var/lib/apt/lists/* # 安装Playwright浏览器 RUN pip install playwright1.40.0 \ playwright install chromium COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . 
CMD [python, main.py]4.2 监控与日志python# monitoring/metrics.py from prometheus_client import Counter, Histogram, start_http_server import time class CrawlerMetrics: 爬虫监控指标 def __init__(self, port: int 9090): self.requests_total Counter( crawler_requests_total, Total number of requests, [source, status] ) self.request_duration Histogram( crawler_request_duration_seconds, Request duration in seconds, [source] ) self.news_extracted Counter( crawler_news_extracted_total, Total news extracted, [source, category] ) # 启动指标服务器 start_http_server(port) def record_request(self, source: str, duration: float, status: int): 记录请求指标 self.requests_total.labels(sourcesource, statusstatus).inc() self.request_duration.labels(sourcesource).observe(duration) def record_news(self, source: str, category: str): 记录新闻提取指标 self.news_extracted.labels(sourcesource, categorycategory).inc()五、最佳实践与注意事项5.1 遵守robots.txtpython# utils/robots_checker.py import urllib.robotparser def check_robots_permission(url: str, user_agent: str *) - bool: 检查robots.txt权限 try: rp urllib.robotparser.RobotFileParser() base_url /.join(url.split(/)[:3]) rp.set_url(f{base_url}/robots.txt) rp.read() return rp.can_fetch(user_agent, url) except: return True5.2 数据清洗与验证python# utils/data_cleaner.py import re from datetime import datetime def clean_text(text: str) - str: 清洗文本 if not text: return # 移除HTML标签 text re.sub(r[^], , text) # 移除多余空白 text re.sub(r\s, , text) # 移除特殊字符 text re.sub(r[\x00-\x08\x0b\x0c\x0e-\x1f\x7f], , text) return text.strip()5.3 错误处理与重试机制python# utils/retry_decorator.py import asyncio from functools import wraps from typing import Type, Tuple def async_retry( max_retries: int 3, delay: float 1.0, exceptions: Tuple[Type[Exception]] (Exception,) ): 异步重试装饰器 def decorator(func): wraps(func) async def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return await func(*args, **kwargs) except exceptions as e: if attempt max_retries - 1: raise wait delay * (2 ** attempt) random.uniform(0, 0.1) await 
asyncio.sleep(wait) return None return wrapper return decorator六、总结本文详细介绍了基于Python最新技术栈的财经资讯爬虫实现方案。该系统具有以下特点：高性能：使用asyncio和Playwright实现异步并发，显著提升爬取效率；智能化：集成文本分析、情感分析、自动分类等功能；鲁棒性强：完善的错误处理、重试机制和反爬虫对抗策略；可扩展：模块化设计，易于添加新的数据源和解析器；易监控：集成Prometheus监控指标，实时掌握爬虫状态。