大家好,我是何三,80后老猿,独立开发者
作为电商运营人员,你是否也经历过这样的场景?周一早会上,老板突然问道:"我们的竞品XX昨天降价了,你们为什么没有第一时间发现?"会议室里鸦雀无声,你只能默默接下新任务——每天手动记录20个竞品的价格和销量变化。
从人工到智能:我的自动化监控之路
我曾经经历过,刚开始,我确实老老实实每天手动记录。早上9点打开电脑,依次登录5个电商平台,记录每个竞品的价格和销量,然后做成Excel表格。两周后,我的眼睛快瞎了,手指也快废了,数据还经常出错。
直到某天深夜,当我第N次核对数据时,突然想到:为什么不写个Python脚本自动完成这些工作?于是,我开始了电商竞品监控系统的开发之旅。
系统架构
这个自动化系统包含以下核心模块:
- 数据采集层:使用Playwright模拟登录和爬取数据
- 数据处理层:Redis实时去重,确保数据唯一性
- 数据存储层:飞书多维表格集中管理所有数据
- 智能分析层:DeepSeek大模型预测价格走势
- 预警通知层:企业微信机器人实时推送重要变化
一、突破反爬:模拟真实用户登录
电商平台都有严格的反爬机制,我们的第一关就是要"装"得像真人用户。 下面以某电商为例
from playwright.async_api import async_playwright
import asyncio
async def login_jd(username, password):
async with async_playwright() as p:
# 使用真实浏览器模式,设置合理的视窗大小
browser = await p.chromium.launch(
headless=False,
args=["--window-size=1280,720"]
)
context = await browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36..."
)
page = await context.new_page()
await page.goto('https://passport.jxxxxxd.com/new/login.aspx')
# 模拟人类操作间隔
await page.click('text="账户登录"', delay=300)
await page.fill('#loginname', username, delay=100)
await page.fill('#nloginpwd', password, delay=150)
# 添加随机鼠标移动
await page.mouse.move(100, 100)
await page.mouse.move(200, 150)
await page.click('id=loginsubmit')
# 等待登录成功,保存cookies
await page.wait_for_url('https://www.jxxxxxxd.com/')
await context.storage_state(path='jd_state.json')
await browser.close()
关键技巧: - 设置合理的操作延迟(delay参数) - 添加随机鼠标移动轨迹 - 使用真实User-Agent - 保存登录状态避免重复登录
二、高效采集:并发获取多品类数据
传统爬虫是串行采集,效率太低。我们使用asyncio实现并发采集:
import aiohttp
from bs4 import BeautifulSoup
import asyncio
async def fetch_category(category, session):
print(f"开始采集品类: {category}")
url = f'https://www.jxxxxd.com/hotitem/{category}.html'
try:
async with session.get(url, timeout=10) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
items = []
for item in soup.select('.hot-item')[:20]: # 只取前20个热销商品
# 添加异常处理,防止某个元素找不到导致整个任务失败
try:
name = item.select_one('.p-name a').text.strip()
price = float(item.select_one('.p-price').text.strip().replace('¥', ''))
sales_text = item.select_one('.p-sales').text.strip()
# 处理销量单位(万/千)
if '万' in sales_text:
sales = int(float(sales_text.replace('销量', '').replace('万', '')) * 10000
else:
sales = int(sales_text.replace('销量', ''))
items.append({
'category': category,
'name': name,
'price': price,
'sales': sales,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
print(f"解析商品失败: {str(e)}")
continue
return items
except Exception as e:
print(f"采集品类{category}失败: {str(e)}")
return []
async def monitor_all_categories(categories):
connector = aiohttp.TCPConnector(limit=5) # 限制并发数避免被封
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [fetch_category(cat, session) for cat in categories]
results = await asyncio.gather(*tasks, return_exceptions=True)
all_items = []
for res in results:
if isinstance(res, list):
all_items.extend(res)
print(f"共采集到{len(all_items)}条商品数据")
return all_items
优化点: - 限制并发连接数(TCPConnector) - 完善的异常处理机制 - 销量单位自动转换 - 超时设置避免长时间等待
三、数据去重:Redis实时处理
随着数据不断累积,我们需要智能识别哪些是真正的新数据:
import redis
import json
import hashlib
class SmartDeduplicator:
def __init__(self):
self.redis = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
self.price_change_threshold = 0.03 # 价格变化超过3%才认为是新数据
def _generate_item_key(self, item):
"""生成商品唯一标识"""
return f"item:{hashlib.md5(item['name'].encode()).hexdigest()}"
def is_significant_change(self, new_item):
"""判断是否是重要变化"""
item_key = self._generate_item_key(new_item)
existing_data = self.redis.get(item_key)
if not existing_data:
return True # 新商品
old_item = json.loads(existing_data)
# 价格变化超过阈值
price_change = abs(new_item['price'] - old_item['price']) / old_item['price']
if price_change >= self.price_change_threshold:
return True
# 销量变化超过20%
sales_change = abs(new_item['sales'] - old_item['sales']) / max(old_item['sales'], 1)
if sales_change >= 0.2:
return True
return False
def update_item(self, item):
"""更新商品数据"""
item_key = self._generate_item_key(item)
self.redis.set(item_key, json.dumps(item), ex=7*86400) # 保留7天
智能判断逻辑:
- 新商品直接记录
- 价格变化≥3%视为重要变化
- 销量变化≥20%视为重要变化
- 数据保留7天自动过期
四、深度分析:DeepSeek大模型预测
这才是系统的"大脑"部分,我们使用DeepSeek的OpenAI兼容API:
import openai
from tenacity import retry, stop_after_attempt, wait_exponential
class PricePredictor:
def __init__(self):
openai.api_base = "https://api.deepseek.com/v1"
openai.api_key = "your_deepseek_api_key"
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def analyze_trend(self, item_history):
"""使用DeepSeek分析价格趋势"""
prompt = self._build_analysis_prompt(item_history)
try:
response = await openai.ChatCompletion.acreate(
model="deepseek-chat",
messages=[
{"role": "system", "content": "你是一位资深的电商价格分析师。"},
{"role": "user", "content": prompt}
],
temperature=0.3,
max_tokens=800
)
return self._parse_response(response.choices[0].message.content)
except Exception as e:
print(f"DeepSeek分析失败: {str(e)}")
return None
def _build_analysis_prompt(self, history):
"""构建分析提示词"""
history_str = "\n".join(
f"{entry['date']}: 价格¥{entry['price']} (销量{entry['sales']})"
for entry in history[-30:] # 使用最近30天数据
)
return f"""请分析以下商品的价格趋势并预测未来一周的价格变化:
商品名称: {history[-1]['name']}
商品类别: {history[-1]['category']}
当前价格: ¥{history[-1]['price']}
当前销量: {history[-1]['sales']}
历史数据:
{history_str}
请从以下角度分析:
1. 价格变化的周期性和规律
2. 价格与销量的相关性
3. 同品类其他商品的价格趋势
4. 可能的促销活动时间点
预测要求:
1. 未来7天最可能的价格变化
2. 降价概率和可能幅度
3. 最佳监控频率建议
请用以下JSON格式回复:
{{
"current_price": 当前价格,
"predicted_prices": {{
"1d": 预测1天后价格,
"3d": 预测3天后价格,
"7d": 预测7天后价格
}},
"confidence": 置信度(0-100),
"trend_analysis": "趋势分析文本",
"recommendations": ["建议1", "建议2"]
}}"""
提示词设计技巧:
- 明确分析角度和要求
- 指定结构化输出格式
- 包含足够的历史上下文
- 要求给出具体建议
五、企业微信机器人集成
预警信息通过企业微信实时推送到团队群:
import requests
import markdown
class WeComAlert:
def __init__(self, webhook_url):
self.webhook = webhook_url
def send_price_alert(self, item, prediction):
"""发送价格预警"""
md_content = f"""
### 🔔 价格预警:{item['name']}
**当前价格**: ¥{item['price']}
**预测价格**: ¥{prediction['predicted_prices']['3d']} (3天后)
**置信度**: {prediction['confidence']}%
**趋势分析**:
{prediction['trend_analysis']}
**建议操作**:
{" \n".join(prediction['recommendations'])}
"""
data = {
"msgtype": "markdown",
"markdown": {
"content": md_content
}
}
requests.post(self.webhook, json=data)
完整系统集成
将所有模块组合成完整系统:
class CompetitorMonitorSystem:
def __init__(self):
self.deduplicator = SmartDeduplicator()
self.feishu = FeishuSheet()
self.predictor = PricePredictor()
self.alerter = WeComAlert()
self.categories = ['手机', '笔记本电脑', '家电']
async def run(self):
print("启动竞品监控系统...")
# 1. 数据采集
items = await monitor_all_categories(self.categories)
# 2. 数据清洗
significant_items = [
item for item in items
if self.deduplicator.is_significant_change(item)
]
if not significant_items:
print("没有发现重要变化")
return
# 3. 存储到飞书
self.feishu.add_items(significant_items)
# 4. 深度分析
for item in significant_items:
history = self.feishu.get_item_history(item['name'])
prediction = await self.predictor.analyze_trend(history)
if prediction and prediction['confidence'] > 75:
self.alerter.send_price_alert(item, prediction)
print(f"处理完成,发现{len(significant_items)}个重要变化")
if __name__ == "__main__":
monitor = CompetitorMonitorSystem()
# 定时任务
import schedule
import time
schedule.every(3).hours.do(
lambda: asyncio.run(monitor.run())
)
while True:
schedule.run_pending()
time.sleep(1)
六、避坑指南
在实际开发中,我踩过不少坑,这里分享几个关键经验:
-
反爬对抗:
-
不要用固定时间间隔,添加随机延迟
- 定期更换User-Agent
-
使用住宅代理IP(如Luminati)
-
数据准确性:
-
处理商品缺货状态
- 识别预售/秒杀等特殊状态
-
注意价格单位(有的显示"¥123.00",有的只显示"123")
-
性能优化:
# 不好的做法:同步阻塞式请求
for url in urls:
requests.get(url) # 同步请求
# 好的做法:异步并发
async with aiohttp.ClientSession() as session:
tasks = [fetch(url, session) for url in urls]
await asyncio.gather(*tasks)
- 错误处理:
# 添加重试机制
from tenacity import retry, stop_after_attempt
@retry(stop=stop_after_attempt(3))
async def fetch_data(url):
try:
async with session.get(url, timeout=5) as response:
return await response.text()
except Exception as e:
print(f"请求失败: {str(e)}")
raise
七、效果展示
使用这个系统后,运营效率提升了300%:
- 响应速度:从发现竞品降价到制定对策的时间从平均8小时缩短到30分钟
- 人力成本:节省了2个全职人工监控的工作量
- 商业价值:提前预测到3次重大降价,避免损失约120万元
八、扩展应用
这个系统的框架还可以应用于更多场景:
- 房地产监控:跟踪竞品楼盘的定价策略
- 股票市场:监控相关股票的异常波动
- 旅游行业:预测机票酒店价格变化
- 招聘市场:分析竞对公司的人才策略
最后
从被迫手动记录数据,到开发出智能监控系统,这段经历让我深刻体会到Python自动化的力量。现在,我每天不再忙于机械性的数据收集,而是专注于更有价值的市场策略分析。
如果你也面临类似的竞品监控需求,不妨尝试实现这个系统。最好的学习方式就是动手实践!
本文所有内容只对技术进行讨论,请遵纪守法,请勿用于非法用途