大家好,我是何三,80后老猿,独立开发者
作为一个经常需要关注行业动态的内容创作者,我发现自己陷入了一个怪圈:每天花大量时间刷新闻、看热点,生怕错过什么重要事件;但真正坐下来写分析报告时,又觉得素材零散、思路混乱。直到有一天,我意识到——这种重复性工作不正是编程最擅长解决的吗?
于是,我决定用Python打造一个"懒人神器":它能自动监控热点事件,用DeepSeek进行智能分析生成报告,最后通过邮件推送给我。整个过程完全自动化,我只需要定期查看收件箱就能掌握最新动态。下面分享这个让我工作效率翻倍的解决方案。
核心思路与准备工作
这个自动化系统需要完成三个关键任务:获取热点数据、分析生成报告、发送邮件通知。我们需要用到以下几个Python库:
import requests # 用于获取网络数据
from deepseek import DeepSeek # 用于智能分析
import smtplib # 发送邮件
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import schedule # 定时任务
import time
首先,我们需要一个热点数据的来源。这里我选择百度热搜榜,因为它覆盖面广且接口简单。当然,你也可以替换为微博、知乎或其他平台的热搜接口。
获取热点数据
获取百度热搜榜的Python函数如下:
def get_baidu_hot():
url = "https://top.baidu.com/api/board?platform=wise&tab=realtime"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
hot_topics = []
for item in data.get("data", {}).get("cards", [{}])[0].get("content", []):
topic = {
"title": item.get("word"),
"url": item.get("url"),
"hot_score": item.get("hotScore")
}
hot_topics.append(topic)
return hot_topics[:10] # 返回前10个热点
except Exception as e:
print(f"获取百度热搜失败: {e}")
return []
这个函数会返回当前百度热搜榜前10的话题,包含标题、链接和热度值。你可以根据需要调整返回的数量。
智能分析生成报告
有了热点数据后,我们需要用DeepSeek进行深度分析。这里我设计了一个分析模板,让AI能够按照固定格式输出专业报告:
def generate_report_with_deepseek(topics):
if not topics:
return "今日无热点数据"
# 初始化DeepSeek
ds = DeepSeek(api_key="your_deepseek_api_key") # 替换为你的API密钥
prompt = f"""你是一位专业的行业分析师,请根据以下热点事件列表生成一份简洁的分析报告:
热点列表:
{topics}
报告要求:
1. 选出3个最具行业影响力的事件
2. 对每个事件进行200字左右的深度分析
3. 预测未来一周可能的发展趋势
4. 给出对相关行业的建议
报告格式:
## 今日热点分析报告
### 重点事件1:[标题]
[分析内容]
### 重点事件2:[标题]
[分析内容]
### 重点事件3:[标题]
[分析内容]
## 趋势预测与建议
[综合分析及建议]
"""
try:
response = ds.generate(prompt, max_tokens=2000)
return response.choices[0].text
except Exception as e:
print(f"DeepSeek分析失败: {e}")
return "报告生成失败"
这个函数会将热点数据整理成适合AI分析的格式,并指定输出结构,确保报告的专业性和一致性。
邮件发送功能
报告生成后,我们需要通过邮件发送给自己。以下是配置邮件发送功能的代码:
def send_email(subject, content):
# 邮件配置
sender_email = "your_email@example.com" # 发件邮箱
receiver_email = "your_email@example.com" # 收件邮箱
password = "your_email_password" # 发件邮箱密码或授权码
# 创建邮件内容
message = MIMEMultipart()
message["From"] = sender_email
message["To"] = receiver_email
message["Subject"] = subject
# 添加正文
message.attach(MIMEText(content, "plain"))
try:
# 发送邮件 (这里以QQ邮箱为例)
with smtplib.SMTP_SSL("smtp.qq.com", 465) as server:
server.login(sender_email, password)
server.sendmail(sender_email, receiver_email, message.as_string())
print("邮件发送成功")
except Exception as e:
print(f"邮件发送失败: {e}")
注意:在实际使用时,建议使用环境变量存储敏感信息如API密钥和邮箱密码,而不是直接写在代码中。
整合与定时执行
现在我们将所有功能整合在一起,并添加定时任务,让系统每天自动运行:
def daily_hotspot_report():
print("开始执行每日热点报告任务...")
# 1. 获取热点
hotspots = get_baidu_hot()
print(f"获取到{len(hotspots)}条热点数据")
# 2. 生成报告
report = generate_report_with_deepseek(hotspots)
# 3. 发送邮件
today = time.strftime("%Y-%m-%d")
email_subject = f"{today} 热点分析报告"
send_email(email_subject, report)
print("任务执行完成")
# 设置每天上午10点执行
schedule.every().day.at("10:00").do(daily_hotspot_report)
# 主循环
if __name__ == "__main__":
print("热点监控系统已启动...")
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次
实际应用与优化
这个基础版本已经可以工作,但在实际使用中,我做了几点优化:
- 错误处理增强:添加了更完善的异常捕获和重试机制
- 多数据源整合:不仅抓取百度,还加入了微博和知乎的热搜
- 报告模板多样化:根据不同类型的热点(娱乐、科技、财经等)使用不同的分析模板
- 本地存储:除了发送邮件,还会将报告保存到本地Markdown文件中
完整优化后的代码如下:
import requests
from deepseek import DeepSeek
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import schedule
import time
import json
from datetime import datetime
import os
# 配置部分
CONFIG = {
"deepseek_api_key": os.getenv("DEEPSEEK_API_KEY"),
"email": {
"sender": os.getenv("EMAIL_SENDER"),
"receiver": os.getenv("EMAIL_RECEIVER"),
"password": os.getenv("EMAIL_PASSWORD")
},
"data_sources": ["baidu", "weibo"], # 可扩展的数据源
"output_dir": "reports" # 报告存储目录
}
class HotspotReporter:
def __init__(self):
self.ds = DeepSeek(api_key=CONFIG["deepseek_api_key"])
os.makedirs(CONFIG["output_dir"], exist_ok=True)
def fetch_hotspots(self, source):
"""从不同平台获取热点数据"""
if source == "baidu":
return self._fetch_baidu_hot()
elif source == "weibo":
return self._fetch_weibo_hot()
else:
return []
def _fetch_baidu_hot(self):
"""获取百度热搜"""
url = "https://top.baidu.com/api/board?platform=wise&tab=realtime"
headers = {"User-Agent": "Mozilla/5.0"}
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
return [
{
"source": "baidu",
"title": item.get("word"),
"url": item.get("url"),
"hot_score": item.get("hotScore")
}
for item in data.get("data", {}).get("cards", [{}])[0].get("content", [])
][:10]
except Exception as e:
print(f"获取百度热搜失败: {e}")
return []
def _fetch_weibo_hot(self):
"""获取微博热搜"""
url = "https://weibo.com/ajax/side/hotSearch"
headers = {"User-Agent": "Mozilla/5.0"}
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
return [
{
"source": "weibo",
"title": item.get("word"),
"url": f"https://s.weibo.com/weibo?q={item.get('word')}",
"hot_score": item.get("raw_hot")
}
for item in data.get("data", {}).get("realtime", [])
][:10]
except Exception as e:
print(f"获取微博热搜失败: {e}")
return []
def generate_report(self, all_hotspots):
"""生成综合分析报告"""
if not all_hotspots:
return "今日无热点数据"
prompt = self._build_prompt(all_hotspots)
try:
response = self.ds.generate(prompt, max_tokens=2500)
return response.choices[0].text
except Exception as e:
print(f"DeepSeek分析失败: {e}")
return "报告生成失败"
def _build_prompt(self, hotspots):
"""构建分析提示词"""
hotspots_str = "\n".join(
f"{idx+1}. [{item['source']}] {item['title']} (热度: {item.get('hot_score', 'N/A')})"
for idx, item in enumerate(hotspots)
)
return f"""你是一位资深行业分析师,请根据以下热点事件生成一份专业报告:
今日热点榜单:
{hotspots_str}
报告要求:
1. 选出3-5个最具影响力的事件,优先选择跨平台热点
2. 对每个事件进行200字左右的深度分析,包括:
- 事件背景
- 可能影响
- 相关方反应
3. 综合分析行业趋势
4. 给出可操作的业务建议
报告格式:
# {datetime.now().strftime('%Y-%m-%d')} 热点分析报告
## 重点事件分析
### 1. [事件标题]
[分析内容]
### 2. [事件标题]
[分析内容]
## 综合趋势与建议
[您的专业分析]
"""
def save_report(self, report_content):
"""保存报告到本地"""
today = datetime.now().strftime("%Y-%m-%d")
filename = f"{CONFIG['output_dir']}/{today}_hotspot_report.md"
with open(filename, "w", encoding="utf-8") as f:
f.write(report_content)
print(f"报告已保存到 {filename}")
def send_report(self, report_content):
"""发送邮件报告"""
today = datetime.now().strftime("%Y-%m-%d")
subject = f"{today} 热点分析报告"
msg = MIMEMultipart()
msg["From"] = CONFIG["email"]["sender"]
msg["To"] = CONFIG["email"]["receiver"]
msg["Subject"] = subject
msg.attach(MIMEText(report_content, "plain"))
try:
with smtplib.SMTP_SSL("smtp.qq.com", 465) as server:
server.login(CONFIG["email"]["sender"], CONFIG["email"]["password"])
server.sendmail(
CONFIG["email"]["sender"],
CONFIG["email"]["receiver"],
msg.as_string()
)
print("邮件发送成功")
except Exception as e:
print(f"邮件发送失败: {e}")
def main_job():
print(f"\n{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 开始执行热点报告任务...")
reporter = HotspotReporter()
# 从各平台获取热点
all_hotspots = []
for source in CONFIG["data_sources"]:
hotspots = reporter.fetch_hotspots(source)
all_hotspots.extend(hotspots)
print(f"从 {source} 获取到 {len(hotspots)} 条热点")
# 按热度排序
all_hotspots.sort(key=lambda x: x.get("hot_score", 0), reverse=True)
# 生成报告
report = reporter.generate_report(all_hotspots)
# 保存和发送
if report and "报告生成失败" not in report:
reporter.save_report(report)
reporter.send_report(report)
else:
print("报告生成失败,未保存和发送")
print("任务执行完成")
if __name__ == "__main__":
# 首次立即执行
main_job()
# 设置定时任务 (每天上午10点和下午4点各一次)
schedule.every().day.at("10:00").do(main_job)
schedule.every().day.at("16:00").do(main_job)
print("热点监控系统已启动,等待定时任务...")
while True:
schedule.run_pending()
time.sleep(60)
为什么这个方案值得尝试
这个自动化系统给我带来了几个意想不到的好处:
- 时间节省:每天至少节省2小时的信息收集和分析时间
- 分析质量提升:AI生成的报告往往能发现我自己可能忽略的关联点
- 知识积累:本地存储的报告形成了有价值的知识库,方便后续查阅
- 反应速度:热点出现后能第一时间获得分析,不再担心错过重要事件
最重要的是,它让我从重复劳动中解放出来,可以专注于更有创造性的工作。如果你也厌倦了每天手动追踪热点,不妨试试这个"懒人方案"。刚开始可能需要一些调试,但一旦运行起来,你会惊讶于它带来的效率提升。
记住,最好的工具不是替代思考,而是把我们从机械劳动中解放出来,让我们有更多时间做真正重要的事。这个热点监控系统正是这样的工具——它处理信息收集的苦活,让我们专注于决策和创造。