【GitHub项目推荐--AI-Goofish-Monitor:闲鱼智能监控机器人完全指南】
是一个基于 Playwright 和 AI 技术的闲鱼(Goofish)多任务实时监控与智能分析工具。该项目由 dingyufei615 开发,通过先进的浏览器自动化技术和多模态大语言模型,为用户提供智能化的闲鱼商品监控解决方案。该工具不仅具备强大的数据采集能力,还配备了功能完善的 Web 管理界面,让用户能够轻松管理和配置监控任务。🔗 GitHub地址⚡ 核心价值:AI智能分析 ·
简介
AI-Goofish-Monitor 是一个基于 Playwright 和 AI 技术的闲鱼(Goofish)多任务实时监控与智能分析工具。该项目由 dingyufei615 开发,通过先进的浏览器自动化技术和多模态大语言模型,为用户提供智能化的闲鱼商品监控解决方案。该工具不仅具备强大的数据采集能力,还配备了功能完善的 Web 管理界面,让用户能够轻松管理和配置监控任务。
🔗 GitHub地址:
https://github.com/dingyufei615/ai-goofish-monitor
⚡ 核心价值:
AI智能分析 · 多任务监控 · 实时通知 · Web管理界面
技术特色:
-
AI驱动:集成多模态大语言模型(GPT-4o、Gemini等),深度分析商品信息
-
Web管理:完整的可视化界面,无需命令行操作
-
多平台通知:支持 ntfy.sh、企业微信、Bark 等多种通知方式
-
智能过滤:基于自然语言的任务创建和AI分析标准生成
-
云原生支持:提供 Docker 容器化部署方案
-
反爬虫优化:模拟真人操作,提高稳定性和成功率
主要功能
1. 核心架构

2. 功能矩阵
|
功能类别 |
核心能力 |
技术实现 |
|---|---|---|
|
数据采集 |
闲鱼商品实时监控,多关键词同时采集 |
Playwright, 浏览器自动化 |
|
AI分析 |
多模态商品分析,智能推荐过滤 |
GPT-4o, Gemini, 多模态模型 |
|
任务管理 |
Web界面管理,自然语言创建任务 |
Flask, Vue.js, 可视化界面 |
|
消息通知 |
多平台即时通知,自定义通知规则 |
ntfy, 企业微信, Bark, Webhook |
|
用户认证 |
Basic认证保护,安全访问管理界面 |
Basic Auth, 环境变量配置 |
|
部署支持 |
Docker容器化,一键部署运行 |
Docker, docker-compose |
3. 技术特性
-
自然语言交互:用简单描述创建复杂监控任务
-
多模态分析:同时处理文本和图像信息
-
实时处理:发现新商品立即进入分析流程
-
高度可定制:每个任务独立配置AI分析指令
-
健壮稳定:模拟真人操作,多种反爬策略
-
易于扩展:模块化设计,支持自定义功能扩展
安装与配置
1. 环境要求
# 基础要求
操作系统: Windows/Linux/macOS
Python: 3.10+ (推荐3.10或更高版本)
内存: 4GB+ RAM
存储: 1GB+ 可用空间
网络: 稳定互联网连接
# 浏览器要求
Chrome/Chromium: 最新版本
Playwright: 自动管理浏览器
# 生产环境推荐
CPU: 4核+
内存: 8GB+
存储: SSD, 10GB+ 可用空间
网络: 100Mbps+ 带宽
2. 安装方式
源码安装:
# 克隆仓库
git clone https://github.com/dingyufei615/ai-goofish-monitor.git
cd ai-goofish-monitor
# 安装依赖
pip install -r requirements.txt
# 安装Playwright浏览器
playwright install chromium
Docker安装:
# 使用docker-compose(推荐)
docker-compose up --build -d
# 或者直接使用Docker
docker build -t ai-goofish-monitor .
docker run -d -p 8000:8000 --name monitor ai-goofish-monitor
3. 环境配置
创建环境变量文件 (.env):
# AI服务配置
OPENAI_API_KEY=your_api_key_here
OPENAI_BASE_URL=https://api.example.com/v1
OPENAI_MODEL_NAME=gemini-2.5-pro
# 代理配置(可选)
PROXY_URL=http://127.0.0.1:7890
# 通知配置
NTFY_TOPIC_URL=https://ntfy.sh/your_topic
BARK_URL=https://api.day.app/your_key
WX_BOT_URL="https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key"
# Web界面认证
WEB_USERNAME=admin
WEB_PASSWORD=your_strong_password_here
# 运行配置
RUN_HEADLESS=true
AI_DEBUG_MODE=false
SERVER_PORT=8000
配置文件 (config.json):
{
"tasks": [
{
"name": "索尼相机监控",
"keywords": ["索尼A7M4", "Sony相机"],
"price_range": [5000, 13000],
"cron_expression": "0 */2 * * *",
"ai_prompt": "寻找95新以上,快门数低于5000的索尼相机"
}
],
"notification": {
"enabled": true,
"services": ["ntfy", "wechat"],
"min_confidence": 0.8
}
}
4. 登录配置
获取闲鱼登录状态:
# 方法1:使用登录脚本(需要图形界面)
python login.py
# 方法2:通过Web界面更新(推荐)
# 启动服务后访问 http://localhost:8000
# 在系统设置中手动更新登录状态
Chrome扩展安装:
-
下载闲鱼登录状态提取扩展
-
在Chrome中加载扩展程序
-
登录闲鱼官网
-
点击扩展图标提取登录状态
-
复制状态信息到Web界面
使用指南
1. 启动服务
# 启动Web服务
python web_server.py
# 或者使用gunicorn(生产环境)
gunicorn -w 4 -b 0.0.0.0:8000 web_server:app
# 访问管理界面
# 打开浏览器访问 http://localhost:8000
# 用户名: admin
# 密码: 配置的WEB_PASSWORD
2. 任务管理
创建监控任务:
-
进入"任务管理"页面
-
点击"创建新任务"
-
用自然语言描述需求:
我想买一台95新以上的索尼A7M4相机,预算1万3以内,快门数低于5000 -
系统自动生成监控参数和AI分析标准
-
设置定时执行计划(Cron表达式)
任务配置选项:
{
"name": "任务名称",
"keywords": ["关键词1", "关键词2"], // 搜索关键词
"price_range": [1000, 5000], // 价格范围
"location": "上海", // 地理位置限制
"cron_expression": "0 */1 * * *", // 定时执行计划
"ai_prompt": "自定义AI分析指令", // AI过滤标准
"enabled": true // 是否启用
}
3. AI分析配置
自定义Prompt模板:
# prompts/analysis_template.txt
你是一个专业的二手商品评估专家。请分析以下闲鱼商品信息:
商品标题: {{title}}
商品价格: {{price}}元
商品描述: {{description}}
卖家信息: {{seller_info}}
图片信息: {{image_descriptions}}
请根据以下标准进行评估:
1. 商品成色是否达到95新以上
2. 价格是否在合理范围内
3. 卖家信用是否良好
4. 是否存在潜在风险
请输出JSON格式的分析结果:
{
"recommendation": "推荐/不推荐",
"confidence": 0.95,
"reasons": ["原因1", "原因2"],
"risk_factors": ["风险因素1", "风险因素2"]
}
4. 监控流程

5. 通知配置
多平台通知设置:
# 配置示例
notification_config = {
"ntfy": {
"enabled": True,
"topic": "your_ntfy_topic",
"priority": "high"
},
"wechat": {
"enabled": True,
"webhook": "企业微信群机器人Webhook",
"mention_users": ["@all"]
},
"bark": {
"enabled": True,
"device_key": "your_bark_key",
"sound": "alarm"
},
"webhook": {
"enabled": False,
"url": "https://example.com/webhook",
"method": "POST",
"headers": {"Authorization": "Bearer token"}
}
}
应用场景实例
案例1:二手相机监控
场景:摄影爱好者寻找特定型号的二手相机
解决方案:
class CameraMonitor:
def __init__(self):
self.config = {
"name": "索尼相机专业监控",
"keywords": ["索尼A7M4", "Sony A7M4", "A7M4"],
"price_range": [8000, 13000],
"conditions": {
"min_condition": "95新",
"max_shutter_count": 5000,
"require_original_box": True,
"exclude_professional_sellers": True
},
"schedule": "0 */3 * * *", // 每3小时执行一次
"notification_channels": ["wechat", "ntfy"]
}
def create_monitor_task(self):
"""创建相机监控任务"""
natural_language = """
寻找95新以上的索尼A7M4相机,要求:
- 快门数低于5000
- 有原装包装盒
- 个人卖家,非专业商家
- 价格在8000-13000元之间
- 上海地区优先
"""
# 通过Web API创建任务
response = requests.post(
"http://localhost:8000/api/tasks/create",
json={
"description": natural_language,
"config": self.config
},
auth=("admin", "password123")
)
return response.json()
def handle_notification(self, item):
"""处理通知消息"""
message = f"""
📸 发现符合条件的相机!
型号: {item['title']}
价格: {item['price']}元
成色: {item['condition']}
快门数: {item['shutter_count']}
卖家: {item['seller']}
链接: {item['url']}
AI推荐度: {item['confidence']}%
"""
# 发送到所有配置的通知渠道
self.send_notification(message)
# 使用示例
monitor = CameraMonitor()
task_id = monitor.create_monitor_task()
print(f"监控任务已创建: {task_id}")
实施效果:
-
监控精度 达到90%
-
响应时间 <5分钟
-
漏报率 <2%
-
用户满意度 95%
案例2:限量版球鞋监控
场景:球鞋收藏家寻找限量版球鞋
解决方案:
class SneakerMonitor:
def __init__(self):
self.sneaker_models = [
"Nike Dunk SB", "Air Jordan 1", "Yeezy 350",
"限量版球鞋", "联名款球鞋"
]
def setup_monitoring(self):
"""设置球鞋监控"""
# 创建多个监控任务
tasks = []
for model in self.sneaker_models:
task_config = {
"name": f"{model}监控",
"keywords": [model, "限量", "联名"],
"price_range": [500, 3000],
"filters": {
"exclude_fake": True,
"min_authenticity_score": 0.9,
"require_real_images": True
},
"ai_prompt": """
分析球鞋真伪和收藏价值:
1. 检查图片是否实拍图
2. 评估价格是否合理
3. 判断卖家信用
4. 识别假货特征
""",
"schedule": "*/30 * * * *" // 每30分钟执行
}
tasks.append(task_config)
return tasks
def process_sneaker_alerts(self, items):
"""处理球鞋提醒"""
for item in items:
if self.is_worth_buying(item):
self.send_urgency_notification(item)
def is_worth_buying(self, item):
"""判断是否值得购买"""
criteria = [
item['authenticity_score'] > 0.95,
item['price'] <= item['market_value'] * 0.8,
item['seller_rating'] >= 4.8,
item['condition'] in ['全新', '99新']
]
return all(criteria)
# 使用示例
sneaker_bot = SneakerMonitor()
tasks = sneaker_bot.setup_monitoring()
print(f"创建了 {len(tasks)} 个球鞋监控任务")
实施效果:
-
假货识别准确率 98%
-
价格合理性评估 准确率92%
-
紧急通知响应 <1分钟
-
收藏价值评估 专业级
案例3:电子产品降价监控
场景:等待特定电子产品降价时购买
解决方案:
class PriceDropMonitor:
def __init__(self, target_product, target_price):
self.target_product = target_product
self.target_price = target_price
self.price_history = {}
def monitor_price_changes(self):
"""监控价格变化"""
config = {
"name": f"{self.target_product}降价监控",
"keywords": [self.target_product],
"price_range": [0, self.target_price * 1.2],
"ai_prompt": """
重点关注价格变化趋势和降价时机:
1. 记录历史价格
2. 识别真实降价(非虚假促销)
3. 评估降价幅度
4. 判断是否达到目标价位
""",
"schedule": "0 */1 * * *" // 每小时执行
}
return config
def analyze_price_trend(self, items):
"""分析价格趋势"""
for item in items:
product_id = item['id']
current_price = item['price']
# 记录价格历史
if product_id not in self.price_history:
self.price_history[product_id] = []
self.price_history[product_id].append({
'price': current_price,
'timestamp': datetime.now(),
'item': item
})
# 检查是否达到目标价格
if current_price <= self.target_price:
self.send_price_alert(item)
def send_price_alert(self, item):
"""发送降价提醒"""
message = f"""
🎉 价格达标提醒!
商品: {item['title']}
当前价格: {item['price']}元
目标价格: {self.target_price}元
降价幅度: {self.calculate_discount(item)}%
链接: {item['url']}
⏰ 尽快购买,可能很快售罄!
"""
self.send_notification(message)
# 使用示例
monitor = PriceDropMonitor("iPad Pro 2022", 4000)
config = monitor.monitor_price_changes()
print("降价监控已设置")
实施效果:
-
价格监控精度 100%
-
降价识别准确率 95%
-
趋势预测准确性 88%
-
用户节省成本 平均20-30%
高级功能与定制
1. 自定义AI分析
# 自定义分析模板
CUSTOM_ANALYSIS_TEMPLATE = """
你是一个专业的{{item_category}}评估专家。请分析以下闲鱼商品:
商品信息:
- 标题: {{title}}
- 价格: {{price}}元
- 描述: {{description}}
- 卖家: {{seller_name}} (信用: {{seller_rating}})
- 图片: {{image_count}}张
评估要求:
{{specific_requirements}}
请输出JSON格式的分析结果:
{
"assessment": {
"quality_rating": 0.95,
"price_score": 0.88,
"seller_trustworthiness": 0.92,
"overall_recommendation": "推荐"
},
"details": {
"strengths": ["优点1", "优点2"],
"concerns": ["需要注意的点"],
"verification_items": ["需要验证的事项"]
},
"pricing_advice": {
"fair_price_range": [1000, 1200],
"bargaining_strategy": "还价建议"
}
}
"""
2. 高级通知策略
# 智能通知路由
class SmartNotifier:
def __init__(self):
self.notification_rules = {
"urgent": {
"channels": ["wechat", "bark", "sms"],
"priority": "high",
"retry_times": 3
},
"normal": {
"channels": ["ntfy", "wechat"],
"priority": "normal",
"retry_times": 1
},
"low_priority": {
"channels": ["ntfy"],
"priority": "low",
"retry_times": 0
}
}
def route_notification(self, item, confidence):
"""根据置信度路由通知"""
if confidence > 0.95:
return self.notification_rules["urgent"]
elif confidence > 0.8:
return self.notification_rules["normal"]
else:
return self.notification_rules["low_priority"]
def send_smart_notification(self, item, analysis_result):
"""发送智能通知"""
confidence = analysis_result['confidence']
rules = self.route_notification(item, confidence)
message = self.format_message(item, analysis_result, rules['priority'])
for channel in rules['channels']:
success = self.send_to_channel(channel, message, rules)
if not success and rules['retry_times'] > 0:
self.retry_send(channel, message, rules)
3. 数据导出与分析
# 数据导出功能
class DataExporter:
def __init__(self, db_path="monitoring_data.db"):
self.conn = sqlite3.connect(db_path)
self.setup_database()
def setup_database(self):
"""设置数据库表结构"""
self.conn.execute("""
CREATE TABLE IF NOT EXISTS items (
id TEXT PRIMARY KEY,
title TEXT,
price REAL,
description TEXT,
seller_info TEXT,
analysis_result JSON,
created_at DATETIME,
updated_at DATETIME
)
""")
self.conn.execute("""
CREATE TABLE IF NOT EXISTS price_history (
item_id TEXT,
price REAL,
recorded_at DATETIME,
FOREIGN KEY (item_id) REFERENCES items (id)
)
""")
def export_to_csv(self, output_file="monitoring_report.csv"):
"""导出数据到CSV"""
query = """
SELECT i.title, i.price, i.seller_info,
json_extract(i.analysis_result, '$.confidence') as confidence,
COUNT(ph.item_id) as price_changes
FROM items i
LEFT JOIN price_history ph ON i.id = ph.item_id
GROUP BY i.id
"""
df = pd.read_sql_query(query, self.conn)
df.to_csv(output_file, index=False, encoding='utf-8-sig')
return output_file
def generate_report(self):
"""生成监控报告"""
report = {
"summary": {
"total_items": self.get_total_items(),
"avg_price": self.get_average_price(),
"recommendation_rate": self.get_recommendation_rate(),
"price_trend": self.get_price_trend()
},
"top_findings": self.get_top_findings(10),
"seller_analysis": self.analyze_sellers(),
"category_breakdown": self.get_category_breakdown()
}
return report
故障排除与优化
1. 常见问题解决
登录问题:
# 检查登录状态
curl -X GET http://localhost:8000/api/system/status
# 重新登录
python login.py --force
# 或者通过Web界面更新登录状态
AI分析失败:
# 检查AI服务配置
def check_ai_service():
import requests
test_payload = {
"model": os.getenv("OPENAI_MODEL_NAME"),
"messages": [{"role": "user", "content": "test"}]
}
try:
response = requests.post(
f"{os.getenv('OPENAI_BASE_URL')}/chat/completions",
headers={"Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"},
json=test_payload,
timeout=10
)
return response.status_code == 200
except Exception as e:
print(f"AI服务检查失败: {e}")
return False
性能优化:
# 优化配置
PERFORMANCE_OPTIMIZATION = {
"browser": {
"headless": True,
"timeout": 30000,
"concurrent_tabs": 3,
"retry_attempts": 2
},
"ai": {
"batch_size": 5,
"timeout": 30000,
"max_retries": 2
},
"network": {
"proxy_rotation": True,
"request_delay": [1000, 3000], # 随机延迟1-3秒
"timeout": 30000
}
}
2. 监控与日志
日志配置:
import logging
from logging.handlers import RotatingFileHandler
# 设置日志
def setup_logging():
logger = logging.getLogger("goofish_monitor")
logger.setLevel(logging.INFO)
# 文件日志
file_handler = RotatingFileHandler(
"monitor.log",
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5
)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
# 控制台日志
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
'%(levelname)s - %(message)s'
))
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
# 使用示例
logger = setup_logging()
logger.info("监控服务启动成功")
性能监控:
# 性能统计
class PerformanceMonitor:
def __init__(self):
self.metrics = {
"requests": 0,
"successful": 0,
"failed": 0,
"avg_response_time": 0,
"last_alert": None
}
def track_request(self, success, response_time):
"""跟踪请求指标"""
self.metrics["requests"] += 1
if success:
self.metrics["successful"] += 1
else:
self.metrics["failed"] += 1
# 更新平均响应时间
old_avg = self.metrics["avg_response_time"]
n = self.metrics["successful"] + self.metrics["failed"]
self.metrics["avg_response_time"] = (
(old_avg * (n - 1) + response_time) / n
)
def get_success_rate(self):
"""计算成功率"""
if self.metrics["requests"] == 0:
return 0
return self.metrics["successful"] / self.metrics["requests"] * 100
def check_health(self):
"""检查系统健康状态"""
success_rate = self.get_success_rate()
if success_rate < 80:
return "critical"
elif success_rate < 90:
return "warning"
else:
return "healthy"
def generate_report(self):
"""生成性能报告"""
return {
"total_requests": self.metrics["requests"],
"success_rate": f"{self.get_success_rate():.1f}%",
"avg_response_time": f"{self.metrics['avg_response_time']:.2f}ms",
"health_status": self.check_health(),
"timestamp": datetime.now().isoformat()
}
# 使用示例
monitor = PerformanceMonitor()
monitor.track_request(True, 1500)
print(monitor.generate_report())
社区与支持
1. 获取帮助
官方资源:
常见问题解决:
# 检查服务状态
curl http://localhost:8000/health
# 查看日志
tail -f monitor.log
# 调试模式运行
AI_DEBUG_MODE=true python web_server.py
# 重置登录状态
rm -f xianyu_state.json
2. 贡献指南
代码贡献:
# Fork并克隆仓库
git clone https://github.com/your-username/ai-goofish-monitor.git
cd ai-goofish-monitor
# 创建特性分支
git checkout -b feature/your-feature
# 安装开发依赖
pip install -r requirements-dev.txt
# 运行测试
pytest tests/ -v
# 提交更改
git commit -m "feat: add your feature"
# 推送到分支
git push origin feature/your-feature
# 创建Pull Request
文档贡献:
# 文档编写指南
## 格式要求
- 使用Markdown语法
- 包含代码示例
- 添加截图和图表
- 保持语言简洁明了
## 目录结构
docs/
├── installation.md # 安装指南
├── configuration.md # 配置说明
├── usage.md # 使用教程
├── api-reference.md # API参考
└── faq.md # 常见问题
3. 发展路线图
近期计划:
-
🔄 支持更多电商平台(淘宝、京东等)
-
🔄 增强移动端支持
-
🔄 优化AI分析准确性
-
🔄 增加数据导出功能
长期愿景:
-
🔲 构建完整的电商监控生态系统
-
🔲 实现跨平台统一管理
-
🔲 开发移动应用
-
🔲 提供云端SaaS服务
🌟 GitHub地址:
https://github.com/dingyufei615/ai-goofish-monitor
📚 完整文档:
查看项目README.md获取详细文档
🚀 快速开始:
按照本文档的安装指南快速部署
AI-Goofish-Monitor 代表了电商监控技术的最新进展。正如开发团队所述:
"我们通过AI技术和浏览器自动化,让每个人都能轻松监控闲鱼商品,找到心仪的二手宝贝"
该工具已在多个场景证明其价值:
-
个人用户:寻找特定二手商品,节省时间和金钱
-
收藏爱好者:监控限量版商品,抓住购买机会
-
小型商家:发现优质货源,优化采购成本
-
研究人员:分析市场价格趋势,进行市场研究
立即体验AI-Goofish-Monitor,开启智能监控之旅!
免责声明
⚠️ 重要提示:
-
请遵守闲鱼用户协议和robots.txt规则
-
不要进行过于频繁的请求,避免对服务器造成负担
-
本项目仅供学习和技术研究使用
-
请勿用于非法用途或商业滥用
技术支持:
-
邮箱:查看项目GitHub页面获取支持联系方式
-
社区:加入GitHub Discussions参与讨论
-
文档:详细使用说明请参考项目文档
许可证:
本项目采用 MIT 许可证发布,按"现状"提供,不提供任何形式的担保。项目作者及贡献者不对因使用本软件而导致的任何损害或损失承担责任。
更多推荐


所有评论(0)