import asyncio
import uuid
from datetime import datetime, timedelta, timezone
from typing import Dict

from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
# Import path assumes the official MCP Python SDK; the standalone
# "fastmcp" package exposes the same FastMCP class.
from mcp.server.fastmcp import FastMCP
from pydantic import Field

mcp = FastMCP()
TASK_TTL = timedelta(minutes=10)
_task_store: Dict[str, Dict] = {}
_gc_task = None  # lazily started TTL garbage-collection task
_bg_tasks: set = set()  # strong refs so fire-and-forget crawl tasks are not GC'd
async def _crawl_task(task_id: str, url: str) -> None:
    """
    Run the actual crawl4ai fetch in the background and write the outcome
    to the task store when it finishes. Any exception is caught and the
    task is marked as failed.
    """
    try:
        browser_conf = BrowserConfig(
            headless=False,
        )
        run_conf = CrawlerRunConfig(
            capture_network_requests=True,
        )
        async with AsyncWebCrawler(config=browser_conf) as crawler:
            result = await crawler.arun(url=url, config=run_conf)
        # Tally the captured network events by type.
        net = result.network_requests or []
        req_cnt = sum(1 for r in net if r.get("event_type") == "request")
        resp_cnt = sum(1 for r in net if r.get("event_type") == "response")
        fail_cnt = sum(1 for r in net if r.get("event_type") == "request_failed")
        _task_store[task_id] = {
            "status": "completed",
            "result": {
                "url": result.url,
                "network_requests": net,
                "total_requests": req_cnt,
                "total_responses": resp_cnt,
                "total_failed": fail_cnt,
            },
            "created_at": datetime.now(timezone.utc),
        }
    except Exception as exc:
        _task_store[task_id] = {
            "status": "failed",
            "result": {"error": str(exc)},
            "created_at": datetime.now(timezone.utc),
        }
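# A small illustrative helper (not used by the server above): pulling the URLs
# of failed requests out of a captured event list. It assumes each event dict
# carries a "url" key alongside "event_type", which matches crawl4ai's
# network-capture format; treat it as a sketch, not part of the server's API.
def _failed_request_urls(net: list) -> list:
    """Return the URLs of all request_failed events in a capture list."""
    return [r.get("url") for r in net if r.get("event_type") == "request_failed"]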
async def _gc_tasks() -> None:
    """Periodically drop task entries older than TASK_TTL from the store."""
    while True:
        await asyncio.sleep(60)
        cutoff = datetime.now(timezone.utc) - TASK_TTL
        to_del = [tid for tid, t in _task_store.items() if t["created_at"] < cutoff]
        for tid in to_del:
            _task_store.pop(tid, None)
@mcp.tool("start_crawl") async def start_crawl( url: str = Field(description="要爬取的网址"), ) -> str: """ 启动异步爬取任务,立即返回 task_id,供后续轮询。 """ global _gc_task if _gc_task is None: _gc_task = asyncio.create_task(_gc_tasks())
    task_id = str(uuid.uuid4())
    _task_store[task_id] = {
        "status": "running",
        "created_at": datetime.now(timezone.utc),
    }
    # Keep a strong reference to the task; asyncio itself only holds a weak
    # one, so an unreferenced task can be garbage-collected mid-flight.
    task = asyncio.create_task(_crawl_task(task_id, url))
    _bg_tasks.add(task)
    task.add_done_callback(_bg_tasks.discard)
    return task_id
@mcp.tool("get_crawl_result") async def get_crawl_result( task_id: str = Field(description="start_crawl 返回的任务 ID"), ) -> Dict: """ 查询任务状态与结果。 返回示例 { "status": "running" | "completed" | "failed" | "not_found", "result": <具体数据或错误信息> } """ task = _task_store.get(task_id) if not task: return {"status": "not_found"} return {"status": task["status"], "result": task.get("result")}
def main():
    mcp.run(transport="stdio")
if __name__ == "__main__":
    main()
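# ---------------------------------------------------------------------------
# Client-side usage sketch (a separate script, not part of the server above).
# It assumes the server is saved as crawl_server.py and uses the official MCP
# Python SDK's stdio client; the file name, the 2-second poll interval, and
# the JSON shape of the tool reply are illustrative assumptions.
import asyncio
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

SERVER = StdioServerParameters(command="python", args=["crawl_server.py"])

async def demo() -> None:
    async with stdio_client(SERVER) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # start_crawl returns the task_id as plain text content.
            started = await session.call_tool("start_crawl", {"url": "https://example.com"})
            task_id = started.content[0].text
            # Poll until the task leaves the "running" state.
            while True:
                reply = await session.call_tool("get_crawl_result", {"task_id": task_id})
                payload = json.loads(reply.content[0].text)
                if payload["status"] in ("completed", "failed", "not_found"):
                    print(payload["status"], payload.get("result"))
                    break
                await asyncio.sleep(2)

if __name__ == "__main__":
    asyncio.run(demo())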