大家好,我是何三,80后老猿,独立开发者

作为一个经常需要关注行业动态的内容创作者,我发现自己陷入了一个怪圈:每天花大量时间刷新闻、看热点,生怕错过什么重要事件;但真正坐下来写分析报告时,又觉得素材零散、思路混乱。直到有一天,我意识到——这种重复性工作不正是编程最擅长解决的吗?

于是,我决定用Python打造一个"懒人神器":它能自动监控热点事件,用DeepSeek进行智能分析生成报告,最后通过邮件推送给我。整个过程完全自动化,我只需要定期查看收件箱就能掌握最新动态。下面分享这个让我工作效率翻倍的解决方案。

核心思路与准备工作

这个自动化系统需要完成三个关键任务:获取热点数据、分析生成报告、发送邮件通知。我们需要用到以下几个Python库:

import requests  # 用于获取网络数据
from deepseek import DeepSeek  # 用于智能分析
import smtplib  # 发送邮件
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import schedule  # 定时任务
import time

首先,我们需要一个热点数据的来源。这里我选择百度热搜榜,因为它覆盖面广且接口简单。当然,你也可以替换为微博、知乎或其他平台的热搜接口。

获取热点数据

获取百度热搜榜的Python函数如下:

def get_baidu_hot():
    url = "https://top.baidu.com/api/board?platform=wise&tab=realtime"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }

    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        data = response.json()

        hot_topics = []
        for item in data.get("data", {}).get("cards", [{}])[0].get("content", []):
            topic = {
                "title": item.get("word"),
                "url": item.get("url"),
                "hot_score": item.get("hotScore")
            }
            hot_topics.append(topic)

        return hot_topics[:10]  # 返回前10个热点

    except Exception as e:
        print(f"获取百度热搜失败: {e}")
        return []

这个函数会返回当前百度热搜榜前10的话题,包含标题、链接和热度值。你可以根据需要调整返回的数量。

智能分析生成报告

有了热点数据后,我们需要用DeepSeek进行深度分析。这里我设计了一个分析模板,让AI能够按照固定格式输出专业报告:

def generate_report_with_deepseek(topics):
    if not topics:
        return "今日无热点数据"

    # 初始化DeepSeek
    ds = DeepSeek(api_key="your_deepseek_api_key")  # 替换为你的API密钥

    prompt = f"""你是一位专业的行业分析师,请根据以下热点事件列表生成一份简洁的分析报告:

    热点列表:
    {topics}

    报告要求:
    1. 选出3个最具行业影响力的事件
    2. 对每个事件进行200字左右的深度分析
    3. 预测未来一周可能的发展趋势
    4. 给出对相关行业的建议

    报告格式:
    ## 今日热点分析报告
    ### 重点事件1:[标题]
    [分析内容]

    ### 重点事件2:[标题]
    [分析内容]

    ### 重点事件3:[标题]
    [分析内容]

    ## 趋势预测与建议
    [综合分析及建议]
    """

    try:
        response = ds.generate(prompt, max_tokens=2000)
        return response.choices[0].text
    except Exception as e:
        print(f"DeepSeek分析失败: {e}")
        return "报告生成失败"

这个函数会将热点数据整理成适合AI分析的格式,并指定输出结构,确保报告的专业性和一致性。

邮件发送功能

报告生成后,我们需要通过邮件发送给自己。以下是配置邮件发送功能的代码:

def send_email(subject, content):
    # 邮件配置
    sender_email = "your_email@example.com"  # 发件邮箱
    receiver_email = "your_email@example.com"  # 收件邮箱
    password = "your_email_password"  # 发件邮箱密码或授权码

    # 创建邮件内容
    message = MIMEMultipart()
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = subject

    # 添加正文
    message.attach(MIMEText(content, "plain"))

    try:
        # 发送邮件 (这里以QQ邮箱为例)
        with smtplib.SMTP_SSL("smtp.qq.com", 465) as server:
            server.login(sender_email, password)
            server.sendmail(sender_email, receiver_email, message.as_string())
        print("邮件发送成功")
    except Exception as e:
        print(f"邮件发送失败: {e}")

注意:在实际使用时,建议使用环境变量存储敏感信息如API密钥和邮箱密码,而不是直接写在代码中。

整合与定时执行

现在我们将所有功能整合在一起,并添加定时任务,让系统每天自动运行:

def daily_hotspot_report():
    print("开始执行每日热点报告任务...")

    # 1. 获取热点
    hotspots = get_baidu_hot()
    print(f"获取到{len(hotspots)}条热点数据")

    # 2. 生成报告
    report = generate_report_with_deepseek(hotspots)

    # 3. 发送邮件
    today = time.strftime("%Y-%m-%d")
    email_subject = f"{today} 热点分析报告"
    send_email(email_subject, report)

    print("任务执行完成")

# 设置每天上午10点执行
schedule.every().day.at("10:00").do(daily_hotspot_report)

# 主循环
if __name__ == "__main__":
    print("热点监控系统已启动...")
    while True:
        schedule.run_pending()
        time.sleep(60)  # 每分钟检查一次

实际应用与优化

这个基础版本已经可以工作,但在实际使用中,我做了几点优化:

  1. 错误处理增强:添加了更完善的异常捕获和重试机制
  2. 多数据源整合:不仅抓取百度,还加入了微博和知乎的热搜
  3. 报告模板多样化:根据不同类型的热点(娱乐、科技、财经等)使用不同的分析模板
  4. 本地存储:除了发送邮件,还会将报告保存到本地Markdown文件中

完整优化后的代码如下:

import requests
from deepseek import DeepSeek
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import schedule
import time
import json
from datetime import datetime
import os

# 配置部分
CONFIG = {
    "deepseek_api_key": os.getenv("DEEPSEEK_API_KEY"),
    "email": {
        "sender": os.getenv("EMAIL_SENDER"),
        "receiver": os.getenv("EMAIL_RECEIVER"),
        "password": os.getenv("EMAIL_PASSWORD")
    },
    "data_sources": ["baidu", "weibo"],  # 可扩展的数据源
    "output_dir": "reports"  # 报告存储目录
}

class HotspotReporter:
    def __init__(self):
        self.ds = DeepSeek(api_key=CONFIG["deepseek_api_key"])
        os.makedirs(CONFIG["output_dir"], exist_ok=True)

    def fetch_hotspots(self, source):
        """从不同平台获取热点数据"""
        if source == "baidu":
            return self._fetch_baidu_hot()
        elif source == "weibo":
            return self._fetch_weibo_hot()
        else:
            return []

    def _fetch_baidu_hot(self):
        """获取百度热搜"""
        url = "https://top.baidu.com/api/board?platform=wise&tab=realtime"
        headers = {"User-Agent": "Mozilla/5.0"}

        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            data = response.json()

            return [
                {
                    "source": "baidu",
                    "title": item.get("word"),
                    "url": item.get("url"),
                    "hot_score": item.get("hotScore")
                }
                for item in data.get("data", {}).get("cards", [{}])[0].get("content", [])
            ][:10]

        except Exception as e:
            print(f"获取百度热搜失败: {e}")
            return []

    def _fetch_weibo_hot(self):
        """获取微博热搜"""
        url = "https://weibo.com/ajax/side/hotSearch"
        headers = {"User-Agent": "Mozilla/5.0"}

        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            data = response.json()

            return [
                {
                    "source": "weibo",
                    "title": item.get("word"),
                    "url": f"https://s.weibo.com/weibo?q={item.get('word')}",
                    "hot_score": item.get("raw_hot")
                }
                for item in data.get("data", {}).get("realtime", [])
            ][:10]

        except Exception as e:
            print(f"获取微博热搜失败: {e}")
            return []

    def generate_report(self, all_hotspots):
        """生成综合分析报告"""
        if not all_hotspots:
            return "今日无热点数据"

        prompt = self._build_prompt(all_hotspots)

        try:
            response = self.ds.generate(prompt, max_tokens=2500)
            return response.choices[0].text
        except Exception as e:
            print(f"DeepSeek分析失败: {e}")
            return "报告生成失败"

    def _build_prompt(self, hotspots):
        """构建分析提示词"""
        hotspots_str = "\n".join(
            f"{idx+1}. [{item['source']}] {item['title']} (热度: {item.get('hot_score', 'N/A')})"
            for idx, item in enumerate(hotspots)
        )

        return f"""你是一位资深行业分析师,请根据以下热点事件生成一份专业报告:

        今日热点榜单:
        {hotspots_str}

        报告要求:
        1. 选出3-5个最具影响力的事件,优先选择跨平台热点
        2. 对每个事件进行200字左右的深度分析,包括:
           - 事件背景
           - 可能影响
           - 相关方反应
        3. 综合分析行业趋势
        4. 给出可操作的业务建议

        报告格式:
        # {datetime.now().strftime('%Y-%m-%d')} 热点分析报告

        ## 重点事件分析
        ### 1. [事件标题]
        [分析内容]

        ### 2. [事件标题]
        [分析内容]

        ## 综合趋势与建议
        [您的专业分析]
        """

    def save_report(self, report_content):
        """保存报告到本地"""
        today = datetime.now().strftime("%Y-%m-%d")
        filename = f"{CONFIG['output_dir']}/{today}_hotspot_report.md"

        with open(filename, "w", encoding="utf-8") as f:
            f.write(report_content)

        print(f"报告已保存到 {filename}")

    def send_report(self, report_content):
        """发送邮件报告"""
        today = datetime.now().strftime("%Y-%m-%d")
        subject = f"{today} 热点分析报告"

        msg = MIMEMultipart()
        msg["From"] = CONFIG["email"]["sender"]
        msg["To"] = CONFIG["email"]["receiver"]
        msg["Subject"] = subject
        msg.attach(MIMEText(report_content, "plain"))

        try:
            with smtplib.SMTP_SSL("smtp.qq.com", 465) as server:
                server.login(CONFIG["email"]["sender"], CONFIG["email"]["password"])
                server.sendmail(
                    CONFIG["email"]["sender"],
                    CONFIG["email"]["receiver"],
                    msg.as_string()
                )
            print("邮件发送成功")
        except Exception as e:
            print(f"邮件发送失败: {e}")

def main_job():
    print(f"\n{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 开始执行热点报告任务...")

    reporter = HotspotReporter()

    # 从各平台获取热点
    all_hotspots = []
    for source in CONFIG["data_sources"]:
        hotspots = reporter.fetch_hotspots(source)
        all_hotspots.extend(hotspots)
        print(f"从 {source} 获取到 {len(hotspots)} 条热点")

    # 按热度排序
    all_hotspots.sort(key=lambda x: x.get("hot_score", 0), reverse=True)

    # 生成报告
    report = reporter.generate_report(all_hotspots)

    # 保存和发送
    if report and "报告生成失败" not in report:
        reporter.save_report(report)
        reporter.send_report(report)
    else:
        print("报告生成失败,未保存和发送")

    print("任务执行完成")

if __name__ == "__main__":
    # 首次立即执行
    main_job()

    # 设置定时任务 (每天上午10点和下午4点各一次)
    schedule.every().day.at("10:00").do(main_job)
    schedule.every().day.at("16:00").do(main_job)

    print("热点监控系统已启动,等待定时任务...")
    while True:
        schedule.run_pending()
        time.sleep(60)

为什么这个方案值得尝试

这个自动化系统给我带来了几个意想不到的好处:

  1. 时间节省:每天至少节省2小时的信息收集和分析时间
  2. 分析质量提升:AI生成的报告往往能发现我自己可能忽略的关联点
  3. 知识积累:本地存储的报告形成了有价值的知识库,方便后续查阅
  4. 反应速度:热点出现后能第一时间获得分析,不再担心错过重要事件

最重要的是,它让我从重复劳动中解放出来,可以专注于更有创造性的工作。如果你也厌倦了每天手动追踪热点,不妨试试这个"懒人方案"。刚开始可能需要一些调试,但一旦运行起来,你会惊讶于它带来的效率提升。

记住,最好的工具不是替代思考,而是把我们从机械劳动中解放出来,让我们有更多时间做真正重要的事。这个热点监控系统正是这样的工具——它处理信息收集的苦活,让我们专注于决策和创造。