大家好,我是何三,80后老猿,独立开发者

作为电商运营人员,你是否也经历过这样的场景?周一早会上,老板突然问道:"我们的竞品XX昨天降价了,你们为什么没有第一时间发现?"会议室里鸦雀无声,你只能默默接下新任务——每天手动记录20个竞品的价格和销量变化。

从人工到智能:我的自动化监控之路

我曾经经历过,刚开始,我确实老老实实每天手动记录。早上9点打开电脑,依次登录5个电商平台,记录每个竞品的价格和销量,然后做成Excel表格。两周后,我的眼睛快瞎了,手指也快废了,数据还经常出错。

直到某天深夜,当我第N次核对数据时,突然想到:为什么不写个Python脚本自动完成这些工作?于是,我开始了电商竞品监控系统的开发之旅。

系统架构

这个自动化系统包含以下核心模块:

  1. 数据采集层:使用Playwright模拟登录和爬取数据
  2. 数据处理层:Redis实时去重,确保数据唯一性
  3. 数据存储层:飞书多维表格集中管理所有数据
  4. 智能分析层:DeepSeek大模型预测价格走势
  5. 预警通知层:企业微信机器人实时推送重要变化

一、突破反爬:模拟真实用户登录

电商平台都有严格的反爬机制,我们的第一关就是要"装"得像真人用户。 下面以某电商为例

from playwright.async_api import async_playwright
import asyncio

async def login_jd(username, password):
    async with async_playwright() as p:
        # 使用真实浏览器模式,设置合理的视窗大小
        browser = await p.chromium.launch(
            headless=False,
            args=["--window-size=1280,720"]
        )
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36..."
        )

        page = await context.new_page()
        await page.goto('https://passport.jxxxxxd.com/new/login.aspx')

        # 模拟人类操作间隔
        await page.click('text="账户登录"', delay=300)
        await page.fill('#loginname', username, delay=100)
        await page.fill('#nloginpwd', password, delay=150)

        # 添加随机鼠标移动
        await page.mouse.move(100, 100)
        await page.mouse.move(200, 150)

        await page.click('id=loginsubmit')

        # 等待登录成功,保存cookies
        await page.wait_for_url('https://www.jxxxxxxd.com/')
        await context.storage_state(path='jd_state.json')
        await browser.close()

关键技巧: - 设置合理的操作延迟(delay参数) - 添加随机鼠标移动轨迹 - 使用真实User-Agent - 保存登录状态避免重复登录

二、高效采集:并发获取多品类数据

传统爬虫是串行采集,效率太低。我们使用asyncio实现并发采集:

import aiohttp
from bs4 import BeautifulSoup
import asyncio

async def fetch_category(category, session):
    print(f"开始采集品类: {category}")
    url = f'https://www.jxxxxd.com/hotitem/{category}.html'

    try:
        async with session.get(url, timeout=10) as response:
            html = await response.text()
            soup = BeautifulSoup(html, 'html.parser')

            items = []
            for item in soup.select('.hot-item')[:20]:  # 只取前20个热销商品
                # 添加异常处理,防止某个元素找不到导致整个任务失败
                try:
                    name = item.select_one('.p-name a').text.strip()
                    price = float(item.select_one('.p-price').text.strip().replace('¥', ''))
                    sales_text = item.select_one('.p-sales').text.strip()

                    # 处理销量单位(万/千)
                    if '万' in sales_text:
                        sales = int(float(sales_text.replace('销量', '').replace('万', '')) * 10000
                    else:
                        sales = int(sales_text.replace('销量', ''))

                    items.append({
                        'category': category,
                        'name': name,
                        'price': price,
                        'sales': sales,
                        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    })
                except Exception as e:
                    print(f"解析商品失败: {str(e)}")
                    continue

            return items
    except Exception as e:
        print(f"采集品类{category}失败: {str(e)}")
        return []

async def monitor_all_categories(categories):
    connector = aiohttp.TCPConnector(limit=5)  # 限制并发数避免被封
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_category(cat, session) for cat in categories]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        all_items = []
        for res in results:
            if isinstance(res, list):
                all_items.extend(res)

        print(f"共采集到{len(all_items)}条商品数据")
        return all_items

优化点: - 限制并发连接数(TCPConnector) - 完善的异常处理机制 - 销量单位自动转换 - 超时设置避免长时间等待

三、数据去重:Redis实时处理

随着数据不断累积,我们需要智能识别哪些是真正的新数据:

import redis
import json
import hashlib

class SmartDeduplicator:
    def __init__(self):
        self.redis = redis.Redis(
            host='localhost',
            port=6379,
            db=0,
            decode_responses=True
        )
        self.price_change_threshold = 0.03  # 价格变化超过3%才认为是新数据

    def _generate_item_key(self, item):
        """生成商品唯一标识"""
        return f"item:{hashlib.md5(item['name'].encode()).hexdigest()}"

    def is_significant_change(self, new_item):
        """判断是否是重要变化"""
        item_key = self._generate_item_key(new_item)
        existing_data = self.redis.get(item_key)

        if not existing_data:
            return True  # 新商品

        old_item = json.loads(existing_data)

        # 价格变化超过阈值
        price_change = abs(new_item['price'] - old_item['price']) / old_item['price']
        if price_change >= self.price_change_threshold:
            return True

        # 销量变化超过20%
        sales_change = abs(new_item['sales'] - old_item['sales']) / max(old_item['sales'], 1)
        if sales_change >= 0.2:
            return True

        return False

    def update_item(self, item):
        """更新商品数据"""
        item_key = self._generate_item_key(item)
        self.redis.set(item_key, json.dumps(item), ex=7*86400)  # 保留7天

智能判断逻辑

  • 新商品直接记录
  • 价格变化≥3%视为重要变化
  • 销量变化≥20%视为重要变化
  • 数据保留7天自动过期

四、深度分析:DeepSeek大模型预测

这才是系统的"大脑"部分,我们使用DeepSeek的OpenAI兼容API:

import openai
from tenacity import retry, stop_after_attempt, wait_exponential

class PricePredictor:
    def __init__(self):
        openai.api_base = "https://api.deepseek.com/v1"
        openai.api_key = "your_deepseek_api_key"

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def analyze_trend(self, item_history):
        """使用DeepSeek分析价格趋势"""
        prompt = self._build_analysis_prompt(item_history)

        try:
            response = await openai.ChatCompletion.acreate(
                model="deepseek-chat",
                messages=[
                    {"role": "system", "content": "你是一位资深的电商价格分析师。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=800
            )

            return self._parse_response(response.choices[0].message.content)
        except Exception as e:
            print(f"DeepSeek分析失败: {str(e)}")
            return None

    def _build_analysis_prompt(self, history):
        """构建分析提示词"""
        history_str = "\n".join(
            f"{entry['date']}: 价格¥{entry['price']} (销量{entry['sales']})"
            for entry in history[-30:]  # 使用最近30天数据
        )

        return f"""请分析以下商品的价格趋势并预测未来一周的价格变化:

商品名称: {history[-1]['name']}
商品类别: {history[-1]['category']}
当前价格: ¥{history[-1]['price']}
当前销量: {history[-1]['sales']}

历史数据:
{history_str}

请从以下角度分析:
1. 价格变化的周期性和规律
2. 价格与销量的相关性
3. 同品类其他商品的价格趋势
4. 可能的促销活动时间点

预测要求:
1. 未来7天最可能的价格变化
2. 降价概率和可能幅度
3. 最佳监控频率建议

请用以下JSON格式回复:
{{
    "current_price": 当前价格,
    "predicted_prices": {{
        "1d": 预测1天后价格,
        "3d": 预测3天后价格,
        "7d": 预测7天后价格
    }},
    "confidence": 置信度(0-100),
    "trend_analysis": "趋势分析文本",
    "recommendations": ["建议1", "建议2"]
}}"""

提示词设计技巧

  1. 明确分析角度和要求
  2. 指定结构化输出格式
  3. 包含足够的历史上下文
  4. 要求给出具体建议

五、企业微信机器人集成

预警信息通过企业微信实时推送到团队群:

import requests
import markdown

class WeComAlert:
    def __init__(self, webhook_url):
        self.webhook = webhook_url

    def send_price_alert(self, item, prediction):
        """发送价格预警"""
        md_content = f"""
### 🔔 价格预警:{item['name']}
**当前价格**: ¥{item['price']}  
**预测价格**: ¥{prediction['predicted_prices']['3d']} (3天后)  
**置信度**: {prediction['confidence']}%  

**趋势分析**:  
{prediction['trend_analysis']}

**建议操作**:  
{"  \n".join(prediction['recommendations'])}
"""

        data = {
            "msgtype": "markdown",
            "markdown": {
                "content": md_content
            }
        }

        requests.post(self.webhook, json=data)

完整系统集成

将所有模块组合成完整系统:

class CompetitorMonitorSystem:
    def __init__(self):
        self.deduplicator = SmartDeduplicator()
        self.feishu = FeishuSheet()
        self.predictor = PricePredictor()
        self.alerter = WeComAlert()
        self.categories = ['手机', '笔记本电脑', '家电']

    async def run(self):
        print("启动竞品监控系统...")

        # 1. 数据采集
        items = await monitor_all_categories(self.categories)

        # 2. 数据清洗
        significant_items = [
            item for item in items 
            if self.deduplicator.is_significant_change(item)
        ]

        if not significant_items:
            print("没有发现重要变化")
            return

        # 3. 存储到飞书
        self.feishu.add_items(significant_items)

        # 4. 深度分析
        for item in significant_items:
            history = self.feishu.get_item_history(item['name'])
            prediction = await self.predictor.analyze_trend(history)

            if prediction and prediction['confidence'] > 75:
                self.alerter.send_price_alert(item, prediction)

        print(f"处理完成,发现{len(significant_items)}个重要变化")

if __name__ == "__main__":
    monitor = CompetitorMonitorSystem()

    # 定时任务
    import schedule
    import time

    schedule.every(3).hours.do(
        lambda: asyncio.run(monitor.run())
    )

    while True:
        schedule.run_pending()
        time.sleep(1)

六、避坑指南

在实际开发中,我踩过不少坑,这里分享几个关键经验:

  1. 反爬对抗

  2. 不要用固定时间间隔,添加随机延迟

  3. 定期更换User-Agent
  4. 使用住宅代理IP(如Luminati)

  5. 数据准确性

  6. 处理商品缺货状态

  7. 识别预售/秒杀等特殊状态
  8. 注意价格单位(有的显示"¥123.00",有的只显示"123")

  9. 性能优化

# 不好的做法:同步阻塞式请求
for url in urls:
 requests.get(url)  # 同步请求

# 好的做法:异步并发
async with aiohttp.ClientSession() as session:
 tasks = [fetch(url, session) for url in urls]
 await asyncio.gather(*tasks)
  1. 错误处理
# 添加重试机制
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
async def fetch_data(url):
 try:
     async with session.get(url, timeout=5) as response:
         return await response.text()
 except Exception as e:
     print(f"请求失败: {str(e)}")
     raise

七、效果展示

使用这个系统后,运营效率提升了300%:

  1. 响应速度:从发现竞品降价到制定对策的时间从平均8小时缩短到30分钟
  2. 人力成本:节省了2个全职人工监控的工作量
  3. 商业价值:提前预测到3次重大降价,避免损失约120万元

八、扩展应用

这个系统的框架还可以应用于更多场景:

  1. 房地产监控:跟踪竞品楼盘的定价策略
  2. 股票市场:监控相关股票的异常波动
  3. 旅游行业:预测机票酒店价格变化
  4. 招聘市场:分析竞对公司的人才策略

最后

从被迫手动记录数据,到开发出智能监控系统,这段经历让我深刻体会到Python自动化的力量。现在,我每天不再忙于机械性的数据收集,而是专注于更有价值的市场策略分析。

如果你也面临类似的竞品监控需求,不妨尝试实现这个系统。最好的学习方式就是动手实践!

本文所有内容只对技术进行讨论,请遵纪守法,请勿用于非法用途