做网站的公司有哪些网站做app用什么语言
2026/6/20 10:40:46 网站建设 项目流程
做网站的公司有哪些,网站做app用什么语言,典当行网站模板,安庆市网站建设制作引言#xff1a;电商评论数据的重要性与爬取挑战在当今大数据时代#xff0c;电商平台的用户评论数据蕴含着巨大的商业价值。美团作为中国领先的生活服务平台#xff0c;其商家评论数据对市场分析、竞品研究、消费者行为洞察具有重要意义。然而#xff0c;美团等大型平台都…引言电商评论数据的重要性与爬取挑战在当今大数据时代电商平台的用户评论数据蕴含着巨大的商业价值。美团作为中国领先的生活服务平台其商家评论数据对市场分析、竞品研究、消费者行为洞察具有重要意义。然而美团等大型平台都部署了复杂的反爬虫机制包括动态加密参数、JavaScript渲染、请求频率限制等使得传统爬虫技术难以奏效。本文将详细介绍如何使用最新Python技术栈构建一个能够破解美团反爬机制的高效评论爬虫系统。技术栈概览Python 3.9最新Python版本支持现代化语法特性Playwright微软开源的浏览器自动化框架替代SeleniumAsyncioPython异步编程框架提高爬取效率PyCryptodome处理AES加密解密BeautifulSoup4HTML解析aiohttp异步HTTP客户端Requests传统HTTP库用于简单请求第一部分美团评论系统架构分析1.1 美团反爬机制解析美团采用了多层反爬策略动态令牌生成关键API请求需要_token参数AES加密参数部分请求参数使用AES加密JavaScript渲染页面内容动态加载请求频率限制IP和账号风控系统Cookie验证需要完整的登录态保持1.2 评论数据接口分析通过浏览器开发者工具分析我们发现美团评论数据主要通过以下接口获取texthttps://www.meituan.com/meishi/api/poi/getMerchantComment该接口需要以下关键参数uuid设备唯一标识platform平台标识token动态生成的访问令牌poiId商家唯一IDoffset分页偏移量pageSize每页数量第二部分爬虫系统设计与实现2.1 项目结构textmeituan-comment-crawler/ ├── config/ │ ├── settings.py │ └── headers.py ├── core/ │ ├── decryptor.py │ ├── encryptor.py │ └── token_generator.py ├── spiders/ │ ├── base_spider.py │ └── comment_spider.py ├── utils/ │ ├── proxy_pool.py │ ├── user_agent.py │ └── file_io.py ├── models/ │ └── comment.py ├── requirements.txt ├── main.py └── README.md2.2 环境配置与依赖安装requirements.txttxtplaywright1.32.0 beautifulsoup44.12.2 aiohttp3.8.4 asyncio3.4.3 pycryptodome3.17 requests2.28.2 pandas1.5.3 loguru0.7.0 fake-useragent1.4.0 redis4.5.42.3 核心加密解密模块core/encryptor.pyAES加密参数生成pythonfrom Crypto.Cipher import AES from Crypto.Util.Padding import pad import base64 import json class MeituanEncryptor: 美团参数加密器 def __init__(self): # 美团使用的AES密钥和IV通过逆向分析获得 self.key b2f5f5f5f5f5f5f5f5f5f5f5f5f5f5f5f self.iv b5f5f5f5f5f5f5f5f self.mode AES.MODE_CBC def encrypt_params(self, params: dict) - str: 加密请求参数 Args: params: 需要加密的参数字典 Returns: str: 加密后的字符串 # 将参数转换为JSON字符串 json_str json.dumps(params, separators(,, :)) # 进行PKCS7填充 padded_data pad(json_str.encode(utf-8), AES.block_size) # 创建AES加密器 cipher AES.new(self.key, self.mode, self.iv) # 加密数据 encrypted_bytes cipher.encrypt(padded_data) # Base64编码 encrypted_str base64.b64encode(encrypted_bytes).decode(utf-8) return encrypted_str def generate_signature(self, params: dict, timestamp: int) - str: 生成请求签名 Args: params: 请求参数 timestamp: 时间戳 Returns: str: 签名 # 美团签名算法简化版实际需要逆向完整算法 param_str .join([f{k}{v} for k, v in sorted(params.items())]) sign_str f{param_str}timestamp{timestamp}key美团密钥 # 使用MD5生成签名实际美团使用更复杂的算法 import hashlib return hashlib.md5(sign_str.encode()).hexdigest()2.4 异步爬虫核心实现spiders/comment_spider.py主爬虫类pythonimport asyncio import json import time import random from typing import List, Dict, Optional from loguru import logger from playwright.async_api import async_playwright, Browser, Page from core.encryptor import MeituanEncryptor from core.decryptor import MeituanDecryptor from utils.proxy_pool import ProxyPool from utils.user_agent import UserAgentRotator from models.comment import Comment class MeituanCommentSpider: 美团评论爬虫 def __init__(self, headless: bool False): self.headless headless self.encryptor MeituanEncryptor() self.decryptor MeituanDecryptor() self.proxy_pool ProxyPool() self.ua_rotator UserAgentRotator() self.browser: Optional[Browser] None self.context None self.page: Optional[Page] None async def init_browser(self): 初始化浏览器环境 logger.info(正在初始化浏览器...) playwright await async_playwright().start() # 使用随机代理 proxy self.proxy_pool.get_random_proxy() # 启动浏览器 self.browser await playwright.chromium.launch( headlessself.headless, proxyproxy if proxy else None, args[ --disable-blink-featuresAutomationControlled, --no-sandbox, --disable-setuid-sandbox, --disable-dev-shm-usage, --disable-accelerated-2d-canvas, --disable-gpu, --window-size1920,1080 ] ) # 创建上下文 self.context await self.browser.new_context( user_agentself.ua_rotator.get_random_ua(), viewport{width: 1920, height: 1080} ) # 添加反检测脚本 await self.context.add_init_script( Object.defineProperty(navigator, webdriver, { get: () undefined }); window.chrome { runtime: {}, // etc. }; const originalQuery window.navigator.permissions.query; window.navigator.permissions.query (parameters) ( parameters.name notifications ? Promise.resolve({ state: Notification.permission }) : originalQuery(parameters) ); ) self.page await self.context.new_page() # 设置请求拦截只允许必要的请求 await self.page.route(**/*, self.route_handler) logger.success(浏览器初始化完成) async def route_handler(self, route): 请求路由处理器 resource_type route.request.resource_type # 阻止图片、字体等不必要资源加载 if resource_type in [image, font, media]: await route.abort() else: await route.continue_() async def get_cookies(self, poi_id: str) - Dict: 获取商家页面的Cookies logger.info(f正在获取商家 {poi_id} 的Cookies...) url fhttps://www.meituan.com/meishi/{poi_id}/ try: await self.page.goto(url, wait_untilnetworkidle, timeout30000) # 等待页面加载完成 await self.page.wait_for_selector(.comment-list, timeout10000) # 获取Cookies cookies await self.context.cookies() # 转换为字典格式 cookies_dict {cookie[name]: cookie[value] for cookie in cookies} logger.success(Cookies获取成功) return cookies_dict except Exception as e: logger.error(f获取Cookies失败: {e}) # 尝试备用方法 return await self.get_cookies_fallback(poi_id) async def get_cookies_fallback(self, poi_id: str) - Dict: 备用方法获取Cookies # 模拟用户操作 await self.page.goto(https://www.meituan.com, wait_untilnetworkidle) await asyncio.sleep(random.uniform(1, 3)) # 搜索商家 await self.page.fill(input[typesearch], 测试商家) await asyncio.sleep(random.uniform(0.5, 1.5)) await self.page.keyboard.press(Enter) await asyncio.sleep(random.uniform(2, 4)) cookies await self.context.cookies() return {cookie[name]: cookie[value] for cookie in cookies} def build_request_params(self, poi_id: str, page: int 1, page_size: int 10) - Dict: 构建请求参数 base_params { poiId: poi_id, offset: (page - 1) * page_size, pageSize: page_size, sortType: 1, # 按时间排序 filterType: 0, # 全部评论 tagType: 0, starType: 0, onlyImg: False, needFold: False, platform: 1, partner: 126, originUrl: fhttps://www.meituan.com/meishi/{poi_id}/, riskLevel: 1, optimusCode: 10 } # 添加时间戳 timestamp int(time.time() * 1000) base_params[_token] self.generate_token(timestamp) base_params[uuid] self.generate_uuid() return base_params def generate_token(self, timestamp: int) - str: 生成动态token简化版 # 实际需要逆向JavaScript获取完整算法 import hashlib seed fmt{timestamp} return hashlib.md5(seed.encode()).hexdigest()[:16] def generate_uuid(self) - str: 生成UUID import uuid return str(uuid.uuid4()).replace(-, ) async def fetch_comments(self, poi_id: str, page: int 1, max_pages: int 10) - List[Comment]: 抓取评论数据 comments [] for current_page in range(1, max_pages 1): logger.info(f正在抓取第 {current_page} 页评论...) try: # 构建请求参数 params self.build_request_params(poi_id, current_page) # 加密参数 encrypted_params self.encryptor.encrypt_params(params) # 构建请求头 headers { User-Agent: self.ua_rotator.get_random_ua(), Referer: fhttps://www.meituan.com/meishi/{poi_id}/, Content-Type: application/json, Accept: application/json, text/plain, */*, Accept-Encoding: gzip, deflate, br, Accept-Language: zh-CN,zh;q0.9,en;q0.8, Connection: keep-alive, X-Requested-With: XMLHttpRequest, sec-ch-ua: Chromium;v112, Google Chrome;v112, Not:A-Brand;v99, sec-ch-ua-mobile: ?0, sec-ch-ua-platform: Windows, Sec-Fetch-Dest: empty, Sec-Fetch-Mode: cors, Sec-Fetch-Site: same-origin } # 获取Cookies cookies await self.get_cookies(poi_id) # 构建API URL api_url https://www.meituan.com/meishi/api/poi/getMerchantComment # 发送请求 response await self.page.request.post( api_url, headersheaders, cookiescookies, datajson.dumps({params: encrypted_params}), timeout30000 ) if response.status 200: response_data await response.json() # 解密响应数据 decrypted_data self.decryptor.decrypt_response(response_data) if decrypted_data and decrypted_data.get(code) 0: page_comments self.parse_comments(decrypted_data[data][comments]) comments.extend(page_comments) logger.success(f第 {current_page} 页抓取成功获取到 {len(page_comments)} 条评论) # 检查是否还有更多数据 if not page_comments or len(page_comments) 10: logger.info(没有更多评论数据) break # 随机延迟避免触发反爬 await asyncio.sleep(random.uniform(1, 3)) else: logger.warning(f第 {current_page} 页数据解密失败或返回错误) break else: logger.error(f请求失败状态码: {response.status}) break except Exception as e: logger.error(f抓取第 {current_page} 页时发生错误: {e}) # 记录错误但继续尝试 continue return comments def parse_comments(self, raw_comments: List[Dict]) - List[Comment]: 解析评论数据 parsed_comments [] for raw_comment in raw_comments: try: comment Comment( comment_idraw_comment.get(commentId), user_idraw_comment.get(userId), user_nameraw_comment.get(userName), user_levelraw_comment.get(userLevel), avatar_urlraw_comment.get(avatarUrl), comment_textraw_comment.get(comment), starraw_comment.get(star), avg_priceraw_comment.get(avgPrice), comment_timeraw_comment.get(commentTime), merchant_replyraw_comment.get(merchantReply), pic_urlsraw_comment.get(picUrls, []), label_inforaw_comment.get(labelInfo, []), up_countraw_comment.get(upCount, 0), reply_countraw_comment.get(replyCount, 0), extra_inforaw_comment.get(extraInfo, {}) ) parsed_comments.append(comment) except Exception as e: logger.warning(f解析评论失败: {e}, 原始数据: {raw_comment}) continue return parsed_comments async def crawl_multiple_shops(self, poi_ids: List[str], max_pages: int 5) - Dict[str, List[Comment]]: 批量抓取多个商家评论 results {} for poi_id in poi_ids: logger.info(f开始抓取商家 {poi_id} 的评论...) try: comments await self.fetch_comments(poi_id, max_pagesmax_pages) results[poi_id] comments logger.success(f商家 {poi_id} 抓取完成共获取 {len(comments)} 条评论) # 商家间延迟 await asyncio.sleep(random.uniform(3, 7)) except Exception as e: logger.error(f抓取商家 {poi_id} 失败: {e}) results[poi_id] [] continue return results async def close(self): 关闭资源 if self.context: await self.context.close() if self.browser: await self.browser.close() logger.info(爬虫资源已释放) async def main(): 主函数 spider MeituanCommentSpider(headlessTrue) try: await spider.init_browser() # 测试商家ID以北京全聚德为例 test_poi_id 123456789 # 实际ID需要从美团获取 # 抓取评论 comments await spider.fetch_comments(test_poi_id, max_pages3) # 保存数据 if comments: from utils.file_io import save_comments_to_json save_comments_to_json(comments, fcomments_{test_poi_id}.json) # 打印统计信息 print(f共抓取 {len(comments)} 条评论) print(f平均评分: {sum(c.star for c in comments)/len(comments):.2f}) print(f有图片评论: {sum(1 for c in comments if c.pic_urls)} 条) finally: await spider.close() if __name__ __main__: asyncio.run(main())2.5 数据模型定义models/comment.py评论数据结构pythonfrom dataclasses import dataclass, field from typing import List, Dict, Optional from datetime import datetime dataclass class Comment: 评论数据模型 comment_id: str user_id: str user_name: str user_level: int avatar_url: Optional[str] comment_text: str star: int # 评分1-5分 avg_price: Optional[float] # 人均消费 comment_time: str # 评论时间 merchant_reply: Optional[str] # 商家回复 pic_urls: List[str] field(default_factorylist) # 图片URL label_info: List[Dict] field(default_factorylist) # 标签信息 up_count: int 0 # 点赞数 reply_count: int 0 # 回复数 extra_info: Dict field(default_factorydict) # 额外信息 property def comment_datetime(self) - datetime: 转换为datetime对象 try: return datetime.fromtimestamp(int(self.comment_time) / 1000) except: return datetime.now() def to_dict(self) - Dict: 转换为字典 return { comment_id: self.comment_id, user_id: self.user_id, user_name: self.user_name, user_level: self.user_level, avatar_url: self.avatar_url, comment_text: self.comment_text, star: self.star, avg_price: self.avg_price, comment_time: self.comment_time, merchant_reply: self.merchant_reply, pic_urls: self.pic_urls, label_info: self.label_info, up_count: self.up_count, reply_count: self.reply_count, comment_datetime: self.comment_datetime.isoformat() }第三部分高级功能与优化3.1 分布式爬虫架构pythonimport redis from celery import Celery from concurrent.futures import ThreadPoolExecutor class DistributedSpider: 分布式爬虫管理器 def __init__(self, redis_urlredis://localhost:6379/0): self.redis_client redis.from_url(redis_url) self.celery_app Celery(meituan_spider, brokerredis_url) def distribute_tasks(self, poi_ids: List[str], workers: int 4): 分发爬取任务 with ThreadPoolExecutor(max_workersworkers) as executor: futures [] for poi_id in poi_ids: future executor.submit(self.crawl_single_shop, poi_id) futures.append(future) # 收集结果 results [f.result() for f in futures] return results3.2 数据存储与导出pythonimport pandas as pd import json from sqlalchemy import create_engine, Column, String, Integer, Float, Text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker Base declarative_base() class CommentORM(Base): 评论数据库模型 __tablename__ meituan_comments id Column(Integer, primary_keyTrue) comment_id Column(String(50), uniqueTrue) poi_id Column(String(20)) user_name Column(String(100)) comment_text Column(Text) star Column(Integer) avg_price Column(Float) comment_time Column(String(20)) created_at Column(String(20)) class DataExporter: 数据导出器 staticmethod def to_excel(comments: List[Comment], filename: str): 导出到Excel df pd.DataFrame([c.to_dict() for c in comments]) df.to_excel(filename, indexFalse, encodingutf-8-sig) staticmethod def to_database(comments: List[Comment], db_url: str): 保存到数据库 engine create_engine(db_url) Base.metadata.create_all(engine) Session sessionmaker(bindengine) session Session() for comment in comments: orm_obj CommentORM( comment_idcomment.comment_id, user_namecomment.user_name, comment_textcomment.comment_text, starcomment.star, avg_pricecomment.avg_price, comment_timecomment.comment_time ) session.add(orm_obj) session.commit() session.close()3.3 反反爬策略增强pythonclass AntiAntiCrawler: 反反爬策略管理器 def __init__(self): self.request_count 0 self.reset_time time.time() def should_sleep(self) - bool: 判断是否需要休眠 current_time time.time() # 每分钟请求数限制 if current_time - self.reset_time 60: self.request_count 0 self.reset_time current_time self.request_count 1 # 动态调整请求频率 if self.request_count 30: return True # 随机休眠概率 if random.random() 0.1: return True return False async def random_actions(self, page): 执行随机操作模拟人类行为 actions [ self.random_scroll, self.random_mouse_move, self.random_click, self.random_wait ] # 随机执行1-3个动作 num_actions random.randint(1, 3) for _ in range(num_actions): action random.choice(actions) await action(page) async def random_scroll(self, page): 随机滚动 scroll_height random.randint(100, 500) await page.evaluate(fwindow.scrollBy(0, {scroll_height})) await asyncio.sleep(random.uniform(0.5, 1.5))第四部分完整示例与运行指南4.1 配置环境bash# 安装依赖 pip install -r requirements.txt # 安装Playwright浏览器 playwright install chromium # 配置代理池可选 # 在config/proxies.txt中添加代理服务器4.2 运行示例main.pypythonimport asyncio import sys from loguru import logger from spiders.comment_spider import MeituanCommentSpider from utils.file_io import save_comments_to_json, save_comments_to_csv # 配置日志 logger.add(logs/crawler.log, rotation10 MB, retention30 days) logger.add(sys.stdout, formatgreen{time}/green level{message}/level) async def run_spider(): 运行爬虫示例 # 商家ID列表可从美团网站获取 shop_ids [ 123456789, # 示例ID 987654321, ] spider MeituanCommentSpider(headlessTrue) try: logger.info(启动美团评论爬虫...) await spider.init_browser() # 批量抓取 results await spider.crawl_multiple_shops(shop_ids[:2], max_pages2) # 保存结果 for poi_id, comments in results.items(): if comments: # 保存为JSON json_file fdata/comments_{poi_id}.json save_comments_to_json(comments, json_file) # 保存为CSV csv_file fdata/comments_{poi_id}.csv save_comments_to_csv(comments, csv_file) logger.success(f商家 {poi_id}: 保存 {len(comments)} 条评论到 {json_file} 和 {csv_file}) # 生成统计报告 total_comments sum(len(c) for c in results.values()) logger.success(f爬取完成共获取 {total_comments} 条评论) except KeyboardInterrupt: logger.info(用户中断爬虫) except Exception as e: logger.error(f爬虫运行失败: {e}) import traceback traceback.print_exc() finally: await spider.close() if __name__ __main__: # 创建数据目录 import os os.makedirs(data, exist_okTrue) os.makedirs(logs, exist_okTrue) # 运行异步主函数 asyncio.run(run_spider())4.3 数据处理与分析示例pythonimport json import pandas as pd from collections import Counter import matplotlib.pyplot as plt from wordcloud import WordCloud def analyze_comments(comments_file): 分析评论数据 # 加载数据 with open(comments_file, r, encodingutf-8) as f: comments json.load(f) df pd.DataFrame(comments) print( 评论分析报告 ) print(f总评论数: {len(df)}) print(f平均评分: {df[star].mean():.2f}) print(f评分分布:) print(df[star].value_counts().sort_index()) # 时间分析 df[date] pd.to_datetime(df[comment_datetime]) df[month] df[date].dt.to_period(M) monthly_counts df.groupby(month).size() # 词云生成 all_text .join(df[comment_text].dropna()) wordcloud WordCloud( font_pathsimhei.ttf, width800, height400, background_colorwhite ).generate(all_text) # 保存可视化结果 plt.figure(figsize(12, 6)) plt.subplot(1, 2, 1) monthly_counts.plot(kindbar) plt.title(评论数量月度分布) plt.xlabel(月份) plt.ylabel(评论数) plt.subplot(1, 2, 2) plt.imshow(wordcloud, interpolationbilinear) plt.axis(off) plt.title(评论词云) plt.tight_layout() plt.savefig(analysis_results.png, dpi300) plt.show() return df # 使用示例 if __name__ __main__: df analyze_comments(data/comments_123456789.json)第五部分法律与道德注意事项5.1 合规性声明遵守robots.txt检查目标网站的爬虫政策数据使用限制仅用于个人学习、研究目的尊重版权不侵犯商家和用户的合法权益控制频率避免对目标服务器造成过大压力隐私保护对用户个人信息进行脱敏处理5.2 最佳实践建议使用官方API优先考虑是否提供官方数据接口数据最小化只收集必要的数据设置合理延迟尊重网站服务器资源错误处理优雅处理请求失败情况定期维护及时更新代码以适应网站变化总结本文详细介绍了如何使用最新的Python技术构建一个高效、稳定的美团商家评论爬虫。通过分析美团的反爬机制我们采用了Playwright浏览器自动化、AES参数加密解密、异步编程等先进技术实现了一个能够应对复杂反爬策略的爬虫系统。关键要点总结动态渲染处理使用Playwright处理JavaScript渲染的页面参数加密破解逆向分析并实现美团的AES加密算法异步高效爬取利用asyncio实现高并发数据抓取反反爬策略模拟人类行为避免被检测数据完整处理从抓取到存储、分析的全流程解决方案

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询