使用mcp工具的agent实例
项目介绍
MCP (Model Context Protocol) 大模型上下文协议
MCP 让大模型能够发现并使用外部工具;而大模型实际"调用"工具的能力,是由 function call(函数调用)机制赋予的。
目前 MCP 支持三种传输模式:stdio、SSE、Streamable HTTP。
项目结构:
local_server.py
main.py
.env
服务端
import os
import ast
import operator
import subprocess
import winreg
from typing import Dict,List
from datetime import datetime
import asyncio
import aiohttp
import httpx
from fastmcp import FastMCP,Client
from motor.motor_asyncio import AsyncIOMotorClient
import docker
def init_mongo_client():
    """Return the demo employee collection from the local MongoDB instance.

    NOTE: AsyncIOMotorClient is lazy — no connection is established until
    the first actual operation (e.g. find_one) is awaited.
    """
    mongo = AsyncIOMotorClient("mongodb://localhost:27017")
    return mongo["MCP"]["database_exp"]
# Module-level MongoDB collection handle, shared by get_employee_info below.
col = init_mongo_client()
# FastMCP server instance; tools are registered on it via @mcp.tool().
mcp = FastMCP("My Local MCP Server")
'''
本地工具案例:
1. 调用本地工具
2. 查看天气
3. 打开应用
4. 数学计算
'''
# @mcp.tool()  # registration intentionally disabled; called directly from main()
async def get_nvidia_smi() -> str:
    """Run the local ``nvidia-smi`` command and return its stdout as a string.

    Returns a human-readable message if the binary is missing; raises
    RuntimeError (including stderr) when the command exits non-zero.
    """
    try:
        process = await asyncio.create_subprocess_exec(
            "nvidia-smi",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        raw_out, raw_err = await process.communicate()
        output = raw_out.decode().strip()
        error_text = raw_err.decode().strip()
        if process.returncode != 0:
            # Surface stderr in the exception to ease debugging.
            raise RuntimeError(f"nvidia-smi 失败(code={process.returncode}):{error_text}")
    except FileNotFoundError:
        return "nvidia-smi 命令未找到,请确保已安装 NVIDIA 驱动程序。"
    return output
# @mcp.tool()
async def get_docker_version():
    """
    Return the output of ``docker --version``.

    Fix: the original used blocking ``subprocess.run`` inside an async
    function, which stalls the event loop while docker runs. Use the async
    subprocess API, consistent with get_nvidia_smi / get_docker_ps.

    :return: stdout of the command as a string (trailing newline included)
    :raises RuntimeError: if the command exits with a non-zero status
    :raises FileNotFoundError: if the docker binary is not installed
    """
    proc = await asyncio.create_subprocess_exec(
        "docker",
        "--version",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"docker --version:{stderr.decode()}")
    return stdout.decode()
@mcp.tool()
async def fetch_weather(latitude: float, longitude: float, timezone: str = "Asia/Shanghai") -> str:
    """Fetch the 7-day max/min temperature forecast for the given coordinates.

    :param latitude: latitude in degrees
    :param longitude: longitude in degrees
    :param timezone: IANA timezone name, defaults to Asia/Shanghai
    :return: one line per day in the form "date → max℃, min℃"
    """
    endpoint = "https://api.open-meteo.com/v1/forecast"
    query = {
        "latitude": latitude,
        "longitude": longitude,
        "daily": ["temperature_2m_max", "temperature_2m_min"],
        "timezone": timezone,
    }
    # The session (and connection pool) is closed automatically on exit.
    async with aiohttp.ClientSession() as session:
        async with session.get(endpoint, params=query) as resp:
            resp.raise_for_status()       # fail fast on HTTP errors
            payload = await resp.json()   # parse JSON asynchronously
            daily = payload["daily"]
            rows = [
                f"\n {day} → 最高 {high}℃, 最低 {low}℃"
                for day, high, low in zip(
                    daily["time"],
                    daily["temperature_2m_max"],
                    daily["temperature_2m_min"],
                )
            ]
            return "".join(rows)
@mcp.tool()
async def get_detailed_weather_async(city: str, days: int = 3, timeout: tuple = (5.0, 10.0)) -> str:
    """
    Fetch weather for *city* from the public wttr.in API and return a string
    containing current conditions plus an hourly forecast for ``days`` days.

    :param city: city name in English or pinyin
    :param days: number of forecast days, default 3
    :param timeout: (connect_timeout, read_timeout) in seconds
    :return: formatted weather report, or an error-message string on failure
    """

    # Plain (non-async) helpers: they never await, so the original
    # ``async def`` versions only added call overhead.
    def _format_current(curr: dict, city_name: str) -> List[str]:
        return [
            f"城市:{city_name}",
            f"时间:{curr['observation_time']} (UTC)",
            f"温度:{curr['temp_C']}°C,体感:{curr['FeelsLikeC']}°C",
            f"天气:{curr['weatherDesc'][0]['value']}",
            f"湿度:{curr['humidity']}%",
            f"能见度:{curr['visibility']} km",
            f"气压:{curr['pressure']} mb",
            f"风速:{curr['windspeedKmph']} km/h,方向:{curr['winddir16Point']}",
            f"云量:{curr['cloudcover']}%",
            f"降水量:{curr['precipMM']} mm",
            "-" * 40
        ]

    def _format_forecast(weather: List[dict], n_days: int) -> List[str]:
        lines: List[str] = [f"未来{n_days}天天气预报:"]
        for day in weather[:n_days]:
            date = datetime.fromisoformat(day['date'])
            lines.append(
                f"{date.month}月{date.day}日:最高 {day['maxtempC']}°C,最低 {day['mintempC']}°C,日照 {day['sunHour']} 小时"
            )
            for h in day['hourly']:
                t = int(h['time']) // 100
                desc = h['weatherDesc'][0]['value']
                temp = h['tempC']
                lines.append(f"  {t:02d}:00 — {temp}°C,{desc}")
            lines.append("-" * 20)
        return lines

    # One value applies to connect/read/write/pool alike; use the read
    # timeout from the tuple. (httpx.Timeout also accepts per-phase args.)
    client_timeout = httpx.Timeout(timeout=timeout[1])
    url = f"http://wttr.in/{city}?format=j1"
    try:
        async with httpx.AsyncClient(timeout=client_timeout) as client:
            resp = await client.get(url)
            resp.raise_for_status()
            data = resp.json()
    except httpx.HTTPStatusError as e:
        # Fix: raise_for_status() raises HTTPStatusError, which is NOT a
        # subclass of RequestError — the original let 4xx/5xx bubble up
        # uncaught instead of returning an error string.
        return f"网络错误或接口不可用:{e}"
    except httpx.RequestError as e:
        return f"网络错误或接口不可用:{e}"
    except ValueError:
        return "返回的数据无法解析为 JSON。"
    current = data.get('current_condition', [])
    weather = data.get('weather', [])
    if not current or not weather:
        return "未能获取到有效的天气数据。"
    lines: List[str] = []
    lines += _format_current(current[0], city)
    lines += _format_forecast(weather, days)
    return "\n".join(lines)
@mcp.tool()
async def add_tow_num(a: int, b: int) -> int:
    """Return the sum of two integers.

    (The misspelled name is kept as-is: it is the registered MCP tool name
    that the LLM calls.)
    """
    total = a + b
    return total
@mcp.tool()
async def get_docker_ps():
    """
    List all Docker containers (including stopped ones) via ``docker ps -a``.

    :return: raw command output, or an error message if docker is missing
    :raises RuntimeError: if the command exits with a non-zero status
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "docker",
            "ps",
            "-a",  # include stopped containers as well
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        out = stdout.decode().strip()
        err = stderr.decode().strip()
        # Fix: removed the stray ``print(out)`` — this server runs on the
        # stdio transport (see the __main__ guard), where writing to stdout
        # corrupts the MCP JSON-RPC message stream.
        if proc.returncode != 0:
            # Include stderr in the exception to ease debugging.
            raise RuntimeError(f"docker ps 失败(code={proc.returncode}):{err}")
    except FileNotFoundError:
        return "docker 命令未找到,请确保已安装 Docker。"
    return out
@mcp.tool()
async def get_employee_info(name: str) -> dict:
    """Look up an employee by name in MongoDB and return the stored document."""
    record = await col.find_one({"name": name})
    if not record:
        return {'error': f"没有找到员工 {name} 的信息"}
    # ObjectId is not JSON-serializable; stringify it before returning.
    record["_id"] = str(record["_id"])
    return record
@mcp.tool()
async def open_application(app_path: str) -> str:
    """
    Open the given application/file with the platform's default mechanism.
    Returns an error message on failure.
    - Windows: ``os.startfile``
    - macOS:   the ``open`` command
    - Linux:   the ``xdg-open`` command

    :param app_path: path of the application or file to open
    :return: a success message, or an error message on failure
    """
    try:
        if os.name == 'nt':  # Windows
            os.startfile(app_path)
        elif os.uname().sysname == 'Darwin':  # macOS
            subprocess.Popen(['open', app_path])
        else:  # Linux and others
            subprocess.Popen(['xdg-open', app_path])
    except Exception as e:
        return f"应用程序 {app_path} 未找到,请检查路径是否正确。,错误信息:{e}"
    # Fix: the original returned None on success despite the ``-> str``
    # annotation, so the LLM received no tool result; report success.
    return f"已尝试打开应用程序:{app_path}"
@mcp.tool()
def get_installed_applications() -> Dict[str, str]:
    """
    Read installed applications (64-bit and 32-bit) from the Windows registry
    and return their executable path or install location.

    Fix: the original never closed any registry handle. PyHKEY objects
    support the context-manager protocol, so ``with`` guarantees cleanup
    even when a query raises.

    :return: mapping of {application name: path}
    """
    uninstall_paths = [
        (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall"),
        (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall"),
        (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Wow6432Node\Microsoft\Windows\CurrentVersion\Uninstall"),
    ]
    apps: Dict[str, str] = {}
    for hive, path in uninstall_paths:
        try:
            with winreg.OpenKey(hive, path) as reg_key:
                for i in range(winreg.QueryInfoKey(reg_key)[0]):
                    try:
                        subkey_name = winreg.EnumKey(reg_key, i)
                        with winreg.OpenKey(reg_key, subkey_name) as subkey:
                            name, _ = winreg.QueryValueEx(subkey, "DisplayName")
                            # Prefer DisplayIcon (usually points at the exe);
                            # fall back to InstallLocation.
                            try:
                                icon, _ = winreg.QueryValueEx(subkey, "DisplayIcon")
                                exe_path = icon.split(",")[0]
                            except FileNotFoundError:
                                exe_path, _ = winreg.QueryValueEx(subkey, "InstallLocation")
                            if name and exe_path:
                                apps[name] = exe_path
                    except Exception:
                        # Many uninstall subkeys lack DisplayName; skip them.
                        continue
        except FileNotFoundError:
            continue
    return apps
@mcp.tool()
async def compute_expression(expr_str: str) -> float:
    """
    Safely evaluate an arithmetic expression. Supports +, -, *, /, **,
    unary +/-, parentheses, and numeric literals only.

    :param expr_str: expression string, e.g. "123123+3414-21312*2+3413-321"
    :return: the computed value
    :raises TypeError: on unsupported syntax or non-numeric constants
    :raises SyntaxError: if the string is not a valid Python expression
    """
    # Supported operator mapping (AST node type -> callable).
    operators = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos,  # generalization: allow unary plus, e.g. "+5"
    }

    def eval_node(node):
        # Fix: ast.Num / node.n are deprecated since Python 3.8; the parser
        # emits ast.Constant. Validate the constant is numeric (and reject
        # bool, which is a subclass of int) so "True+1" cannot slip through.
        if isinstance(node, ast.Constant):
            if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
                return node.value
            raise TypeError(f"Unsupported constant: {node.value!r}")
        if isinstance(node, ast.BinOp):  # <left> <operator> <right>
            return operators[type(node.op)](eval_node(node.left), eval_node(node.right))
        if isinstance(node, ast.UnaryOp):  # -/+ <operand>
            return operators[type(node.op)](eval_node(node.operand))
        if isinstance(node, ast.Expression):
            return eval_node(node.body)
        raise TypeError(f"Unsupported node type: {type(node)}")

    return eval_node(ast.parse(expr_str, mode='eval'))
@mcp.tool()
async def list_docker_containers() -> list:
    """
    Return name, status and short ID for every container on the local Docker
    host, whether running or stopped.

    :return: list of dicts with keys 容器名称 / 状态 / ID
    """
    # from_env() connects to the local daemon (Unix socket by default).
    docker_client = docker.from_env()
    summaries = []
    for c in docker_client.containers.list(all=True):  # all=True -> include stopped
        summaries.append({
            "容器名称": c.name,     # container name, e.g. my_app
            "状态": c.status,       # running, exited, ...
            "ID": c.short_id,       # short ID for easy reference
        })
    return summaries
@mcp.tool()
async def get_docker_logs(container_name: str, tail: int = 100) -> str:
    """
    Return the last ``tail`` lines of a Docker container's logs.

    :param container_name: container name or ID
    :param tail: number of trailing log lines to fetch (default 100)
    :return: UTF-8 decoded log text, or an error message string on failure
    """
    try:
        docker_client = docker.from_env()
        target = docker_client.containers.get(container_name)
        return target.logs(tail=tail).decode('utf-8')
    except docker.errors.NotFound:
        return f"❌ 找不到名为 '{container_name}' 的容器。"
    except Exception as e:
        return f"❌ 获取日志时发生错误:{str(e)}"
async def main():
    """In-process smoke test: list the server's tools, then call one of them."""
    test_client = Client(mcp)
    async with test_client:
        # 1) enumerate the registered tools
        available = await test_client.list_tools()
        print("Available tools:", available)
        # 2) invoke a tool through the client
        gpu_info = await test_client.call_tool("get_nvidia_smi", {})
        print("GPU 信息:", gpu_info)
if __name__ == "__main__":
    # Serve over stdio: a parent process (the agent, via PythonStdioTransport)
    # spawns this script and exchanges JSON-RPC messages on stdin/stdout.
    mcp.run(transport='stdio')
    # asyncio.run(main())  # alternative: run the in-process smoke test instead
Agent 端(客户端)
import os
import json
import asyncio
from fastmcp import Client
from fastmcp.client.transports import PythonStdioTransport
from dotenv import load_dotenv
from openai import AzureOpenAI
# Load environment variables from .env (Azure endpoint, API key, deployment name).
load_dotenv()
# Configure the MCP server script and the stdio transport that will spawn it.
server_script = "local_server.py"
transport = PythonStdioTransport(
    script_path=server_script,
    env={
        **os.environ,
        # Ensure System32 is on PATH so the spawned server can find OS binaries.
        "PATH": os.environ.get("PATH", "") + os.pathsep + r"C:\Windows\System32"
    }
)
client = Client(transport)
# Initialize the Azure OpenAI LLM client (credentials come from .env).
llm = AzureOpenAI(
    azure_endpoint=os.getenv("ENDPOINT_URL"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version="2025-01-01-preview"
)
def prepare_tools(mcp_tools):
    """Convert MCP tool descriptors into the OpenAI function-calling schema."""
    converted = []
    for tool in mcp_tools:
        converted.append({
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description or "",
                "parameters": tool.inputSchema,
            },
        })
    return converted
def invoke_llm(messages, tools):
    """Send the conversation to Azure OpenAI with the tool schemas attached."""
    deployment = os.getenv("DEPLOYMENT_NAME")
    return llm.chat.completions.create(
        model=deployment,
        messages=messages,
        tools=tools,
        tool_choice="auto",
    )
async def handle_tool_calls(tool_calls, messages, mcp_tools):
    """Execute each requested tool via the MCP client and append the results
    to the conversation as 'tool' messages.

    :param tool_calls: tool-call objects from the LLM response
    :param messages: running conversation history (mutated and returned)
    :param mcp_tools: available MCP tool descriptors (kept for signature parity)
    :return: the updated messages list
    """
    for call in tool_calls:
        name = call.function.name
        arguments = json.loads(call.function.arguments)
        print(f"🔧 调用工具: {name}({arguments})")
        try:
            outcome = await client.call_tool(name, arguments)
            text = outcome[0].text if outcome else "无返回结果"
        except Exception as exc:
            text = f"工具调用失败: {exc}"
        # Feed the result back so the LLM can continue reasoning with it.
        messages.append({
            "role": "tool",
            "tool_call_id": call.id,
            "name": name,
            "content": text,
        })
    return messages
async def chat_loop():
    """Interactive REPL: forward user input to the LLM, dispatch any requested
    tool calls through MCP, and print the model's final answer."""
    async with client:
        mcp_tools = await client.list_tools()
        azure_tools = prepare_tools(mcp_tools)
        messages = [
            {"role": "system", "content": "你是一个智能体,可以调用工具完成任务。请在任务完成后回复'任务已完成'或提供明确结论。"}
        ]
        print("🔁 智能体对话服务启动,输入 'exit' 或 'quit' 退出。\n")
        while True:
            try:
                user_input = input("You: ")
            except (EOFError, KeyboardInterrupt):
                print("\n会话已结束。")
                break
            if user_input.strip().lower() in ("exit", "quit"):
                print("✅ 退出对话。")
                break
            messages.append({"role": "user", "content": user_input})
            # Inner loop: keep calling the LLM until it answers with text
            # (each tool round-trip feeds results back and retries).
            done = False
            while not done:
                response = await asyncio.to_thread(invoke_llm, messages, azure_tools)
                msg = response.choices[0].message
                if msg.tool_calls:
                    messages.append({
                        "role": "assistant",
                        "content": None,
                        "tool_calls": msg.tool_calls,
                    })
                    messages = await handle_tool_calls(msg.tool_calls, messages, mcp_tools)
                elif msg.content:
                    print(f"🤖 AI: {msg.content}\n")
                    messages.append({"role": "assistant", "content": msg.content})
                    done = True  # the task is considered finished
if __name__ == "__main__":
    # Entry point: start the interactive agent loop.
    asyncio.run(chat_loop())