前言
很多文档和博客都只介绍如何开发MCP Server,然后集成到VS Code或者Cursor等程序,很少涉及如何开发MCP Host和MCP Client。如果你想要在自己的服务中集成完整的MCP功能,光看这些是远远不够的。所以本文及后续的MCP系列文章都会带你深入了解如何开发MCP Client,让你真正掌握这项技术。
准备开发环境
MCP官方SDK主要支持Python和TypeScript,当然也有其他语言的实现,不过我这里就以Python为例了。我的Python版本是3.13.5
,但其实只要高于3.11应该都没问题。
我个人推荐使用uv
来管理依赖,当然你也可以用传统的pip
。Python SDK有官方的mcp
包和社区的FastMCP
包。官方SDK其实也内置了FastMCP,不过是v1版本,而FastMCP官网已经更新到了v2版本。作为学习,两个都装上试试也无妨。
# 使用 uv
uv add mcp fastmcp# 使用 pip
python -m pip install mcp fastmcp
第一个MCP项目:你好,MCP世界!
在第一个MCP项目中,我们实现一个简单的MCP Client和MCP Server,但还没集成LLM。在这个阶段,Client调用Server的tool或resource都需要手动指定。
MCP Server
下面的MCP Server示例代码定义了一些prompts、resources和tools。这里有个小贴士:函数参数的类型注解、返回类型和docstring都一定要写清楚,否则后续集成LLM时,LLM就无法正确理解如何调用你的工具了。
这段Server可以通过stdio方式被Client调用。在正式让Client调用之前,建议你先手动运行一下Server,测试它能否正常启动,避免Client启动时报一堆让人摸不着头脑的错误。
from mcp.server.fastmcp import FastMCP
from datetime import datetime
import asyncssh
from typing import TypeAlias, Unionmcp = FastMCP("custom")@mcp.prompt()
def greet_user(name: str, style: str = "formal") -> str:"""Greet a user with a specified style."""if style == "formal":return f"Good day, {name}. How do you do?"elif style == "friendly":return f"Hey {name}! What's up?"elif style == "casual":return f"Yo {name}, how's it going?"else:return f"Hello, {name}!"@mcp.resource("greeting://{name}")
def greeting_resource(name: str) -> str:"""A simple greeting resource."""return f"Hello, {name}!"@mcp.resource("config://app")
def get_config() -> str:"""Static configuration data"""return "App configuration here"@mcp.tool()
def add(a: int, b: int) -> int:"""Add two numbers"""return a + b@mcp.tool()
def multiply(a: int, b: int) -> int:"""Multiply two numbers"""return a * bNumber: TypeAlias = Union[int, float]@mcp.tool()
def is_greater_than(a: Number, b: Number) -> Number:"""Check if a is greater than b"""return a > b@mcp.tool()
async def get_weather(city: str) -> str: """Get weather for a given city."""return f"It's always sunny in {city}!"@mcp.tool()
async def get_date() -> str:"""Get today's date."""return datetime.now().strftime("%Y-%m-%d")@mcp.tool()
async def execute_ssh_command_remote(hostname: str, command: str) -> str:"""Execute an SSH command on a remote host.Args:hostname (str): The hostname of the remote host.command (str): The SSH command to execute.Returns:str: The output of the SSH command."""async with asyncssh.connect(hostname, username="rainux", connect_timeout=10) as conn:result = await conn.run(command, timeout=10)stdout = result.stdoutstderr = result.stderrcontent = str(stdout if stdout else stderr)return contentif __name__ == "__main__":mcp.run(transport="stdio")
MCP Client
Client通过STDIO方式调用MCP Server,server_params
中指定了如何运行Server,包括python解释器路径、Server文件名和运行位置。需要注意的是,Client启动时也会启动Server,如果Server报错,Client也会跟着无法启动。
import asyncio
from pathlib import Path
from pydantic import AnyUrlfrom mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_clientserver_params = StdioServerParameters(command=str(Path(__file__).parent / ".venv" / "bin" / "python"),args=[str(Path(__file__).parent / "demo1-server.py")],cwd=str(Path(__file__).parent),
)async def run():async with stdio_client(server_params) as (read, write):async with ClientSession(read, write) as session:# Initialize the connectionawait session.initialize()# List available promptsprompts = await session.list_prompts()print(f"Available prompts: {[p.name for p in prompts.prompts]}")# Get a prompt (greet_user prompt from fastmcp_quickstart)if prompts.prompts:prompt = await session.get_prompt("greet_user", arguments={"name": "Alice", "style": "friendly"})print(f"Prompt result: {prompt.messages[0].content}")# List available resourcesresources = await session.list_resources()print(f"Available resources: {[r.uri for r in resources.resources]}")# List available toolstools = await session.list_tools()print(f"Available tools: {[t.name for t in tools.tools]}")# Read a resource (greeting resource from fastmcp_quickstart)resource_content = await session.read_resource(AnyUrl("greeting://World"))content_block = resource_content.contents[0]if isinstance(content_block, types.TextResourceContents):print(f"Resource content: {content_block.text}")# Call a tool (add tool from fastmcp_quickstart)result = await session.call_tool("add", arguments={"a": 5, "b": 3})result_unstructured = result.content[0]if isinstance(result_unstructured, types.TextContent):print(f"Tool result: {result_unstructured.text}")result_structured = result.structuredContentprint(f"Structured tool result: {result_structured}")if __name__ == "__main__":asyncio.run(run())
运行Client,输出如下:
Processing request of type ListPromptsRequest
Available prompts: ['greet_user']
Processing request of type GetPromptRequest
Prompt result: type='text' text="Hey Alice! What's up?" annotations=None meta=None
Processing request of type ListResourcesRequest
Available resources: [AnyUrl('config://app')]
Processing request of type ListToolsRequest
Available tools: ['add', 'multiply', 'get_weather', 'get_date', 'execute_ssh_command_remote']
Processing request of type ReadResourceRequest
Resource content: Hello, World!
Processing request of type CallToolRequest
Tool result: 8
Structured tool result: {'result': 8}
可以看到,Client成功地调用了Server上的各种功能,包括获取提示、读取资源和调用工具。
使用streamable-http远程调用:让MCP飞起来!
上面的例子中,Client通过STDIO方式在本地调用Server。现在我们稍作修改,让它可以通过HTTP远程调用Server,这样就更加灵活了。
MCP Server
只列出修改的部分:
mcp = FastMCP("custom", host="localhost", port=8001)if __name__ == "__main__":mcp.run(transport="streamable-http")
修改完成后,启动Server,它会监听在localhost:8001
地址上,就像一个小小的Web服务(其实就是个Web服务,暴露的api为/mcp
)。
MCP Client
同样只列出修改的部分。Client需要指定MCP Server的地址。streamablehttp_client
返回的第三个参数get_session_id
用于会话管理,大多数情况下你不需要直接使用它,所以在一些文档中这里会用_
来占位。
from mcp.client.streamable_http import streamablehttp_clientserver_uri = "http://localhost:8001/mcp"async def main():async with streamablehttp_client(server_uri) as (read, write, get_session_id):# 获取当前会话IDsession_id = get_session_id()print(f"Session ID before initialization: {session_id}")async with ClientSession(read, write) as session:# Initialize the connectionawait session.initialize()# 初始化后再次获取会话IDsession_id = get_session_id()print(f"Session ID after initialization: {session_id}")
client运行输出:
Session ID before initialization: None
Session ID after initialization: 60ce4204b907469e9eb46e7e01df040d
Available prompts: ['greet_user']
Prompt result: type='text' text="Hey Alice! What's up?" annotations=None meta=None
Available resources: [AnyUrl('config://app')]
Available tools: ['add', 'multiply', 'get_weather', 'get_date', 'execute_ssh_command_remote']
Resource content: Hello, World!
Tool result: 8
Structured tool result: {'result': 8}
现在我们的MCP应用已经可以通过网络进行远程调用了,架构变得更加灵活。
集成LLM:让AI自己做决定!
前面两个示例中,我们都需要在Client中手动控制调用Server的tool,这在实际应用中显然是不现实的。我们需要集成LLM,让AI自己决定该调用哪个工具。
MCP Server
Server端不需要做任何变更,Client还是通过HTTP方式调用我们之前创建的Server。
MCP Client
这里我们选用阿里的通义千问(Qwen)。Qwen的API Key可以自行申请,氪个5块钱就够个人开发用很久了。为了便于后续开发,我把配置功能单独放到了一个模块里,下面代码中直接使用了,相关模块放在"补充"部分。
"""
MCP (Model Context Protocol) 客户端示例
该客户端演示了如何使用 MCP 协议与 MCP 服务器进行交互,并通过 LLM 调用服务器提供的工具。工作流程:
1. 连接到 MCP 服务器
2. 获取服务器提供的工具列表
3. 用户输入查询
4. 将查询发送给 LLM,LLM 可能会调用 MCP 服务器提供的工具
5. 执行工具调用并获取结果
6. 将结果返回给 LLM 进行最终回答
"""import asyncio
# JSON 处理
import json
# 增强输入功能(在某些系统上提供命令历史等功能)
import readline # 引入readline模块用于增强python的input功能, Windows下的python标准库可能不包含
# 异常追踪信息
import traceback
# 异步上下文管理器,用于资源管理
from contextlib import AsyncExitStack
# 类型提示支持
from typing import List, Optional, cast# MCP 客户端会话和 HTTP 传输
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
# OpenAI 异步客户端,用于与 LLM 通信
from openai import AsyncOpenAI
# OpenAI 聊天完成相关的类型定义
from openai.types.chat import (ChatCompletionAssistantMessageParam,ChatCompletionMessageFunctionToolCall,ChatCompletionMessageParam,ChatCompletionMessageToolCall,ChatCompletionToolMessageParam,ChatCompletionToolParam,ChatCompletionUserMessageParam)# 项目配置和日志模块
from pkg.config import cfg
from pkg.log import loggerclass MCPClient:"""MCP 客户端类,负责管理与 MCP 服务器的连接和交互"""def __init__(self):"""初始化 MCP 客户端"""# 客户端会话,初始为空self.session: Optional[ClientSession] = None# 异步上下文管理栈,用于管理异步资源的生命周期self.exit_stack = AsyncExitStack()# OpenAI 异步客户端,用于与 LLM 通信self.client = AsyncOpenAI(base_url=cfg.llm_base_url,api_key=cfg.llm_api_key,)async def connect_to_server(self, server_uri: str):"""连接到 MCP 服务器Args:server_uri (str): MCP 服务器的 URI"""# 创建 Streamable HTTP 传输连接http_transport = await self.exit_stack.enter_async_context(streamablehttp_client(server_uri))# 获取读写流self.read, self.write, _ = http_transport# 创建并初始化客户端会话self.session = await self.exit_stack.enter_async_context(ClientSession(self.read, self.write))# 初始化会话await self.session.initialize()# 检查会话是否成功初始化if self.session is None:raise RuntimeError("Failed to initialize session")# 获取服务器提供的工具列表response = await self.session.list_tools()tools = response.toolslogger.info(f"\nConnected to server with tools: {[tool.name for tool in tools]}")async def process_query(self, query: str) -> str:"""处理用户查询Args:query (str): 用户的查询Returns:str: 处理结果"""# 初始化消息历史,包含用户的查询messages: List[ChatCompletionMessageParam] = [ChatCompletionUserMessageParam(role="user",content=query)]# 确保会话已初始化if self.session is None:raise RuntimeError("Session not initialized. Please connect to server first.")# 获取服务器提供的工具列表response = await self.session.list_tools()# 构建工具列表,处理可能为None的字段# 这些工具将被传递给 LLM,以便 LLM 知道可以调用哪些工具available_tools: List[ChatCompletionToolParam] = []for tool in response.tools:tool_def: ChatCompletionToolParam = {"type": "function","function": {"name": tool.name,"description": tool.description or "","parameters": tool.inputSchema or {}}}available_tools.append(tool_def)logger.info(f"Available tools: {available_tools}")# 调用 LLM 进行聊天完成response = await self.client.chat.completions.create(model=cfg.llm_model,messages=messages,tools=available_tools,)# 存储最终输出文本final_text = []# 获取 LLM 的响应消息message = response.choices[0].messagefinal_text.append(message.content or "")# 如果 LLM 要求调用工具,则处理工具调用while message.tool_calls:# 处理每个工具调用for tool_call in message.tool_calls:# 确保我们处理的是正确的工具调用类型if hasattr(tool_call, 'function'):# 这是一个函数工具调用function_call = cast(ChatCompletionMessageFunctionToolCall, tool_call)function = function_call.functiontool_name = function.name# 解析工具参数tool_args = json.loads(function.arguments)else:# 跳过不支持的工具调用类型continue# 执行工具调用if self.session is None:raise RuntimeError("Session not initialized. Cannot call tool.")# 调用 MCP 服务器上的工具result = await self.session.call_tool(tool_name, tool_args)final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")# 将工具调用和结果添加到消息历史# 这样 LLM 可以知道它之前调用了哪些工具assistant_msg: ChatCompletionAssistantMessageParam = {"role": "assistant","tool_calls": [{"id": tool_call.id,"type": "function","function": {"name": tool_name,"arguments": json.dumps(tool_args)}}]}messages.append(assistant_msg)# 添加工具调用结果到消息历史tool_msg: ChatCompletionToolMessageParam = {"role": "tool","tool_call_id": tool_call.id,"content": str(result.content) if result.content else ""}messages.append(tool_msg)# 将工具调用的结果交给 LLM,让 LLM 生成最终回答response = await self.client.chat.completions.create(model=cfg.llm_model,messages=messages,tools=available_tools)# 获取新的响应消息message = response.choices[0].messageif message.content:final_text.append(message.content)# 返回最终结果return "\n".join(final_text)async def chat_loop(self):"""运行交互式聊天循环"""print("\nMCP Client Started!")print("Type your queries or 'quit' to exit.")# 持续接收用户输入while True:try:# 获取用户输入query = input("\nQuery: ").strip()# 检查是否退出if query.lower() == 'quit':break# 忽略空输入if not query:continue# 处理用户查询并输出结果response = await self.process_query(query)print("\n" + response)# 异常处理except Exception as e:print(f"\nError: {str(e)}")print(traceback.format_exc())async def cleanup(self):"""清理资源"""await self.exit_stack.aclose()async def main():"""主函数"""# 创建 MCP 客户端实例client = MCPClient()try:# 连接到 MCP 服务器await client.connect_to_server("http://localhost:8001/mcp")# 运行聊天循环await client.chat_loop()except Exception as e:print(f"Error: {str(e)}")finally:# 清理资源await client.cleanup()# 程序入口点
if __name__ == "__main__":asyncio.run(main())
client运行输出:
MCP Client Started!
Type your queries or 'quit' to exit.Query: 今天的日期是什么[Calling tool get_date with args {}]
今天的日期是2025年9月13日。Query: 合肥的天气怎么样?[Calling tool get_weather with args {'city': '合肥'}]
合肥的天气总是阳光明媚!Query: 0.11比0.9大吗[Calling tool is_greater_than with args {'a': 0.11, 'b': 0.9}]
0.11 不比 0.9 大。0.11 小于 0.9。Query: quit
现在AI可以自己决定调用哪个工具了。当你问"今天的日期是什么"时,它会自动调用get_date
工具;当你问"合肥的天气怎么样"时,它会自动调用get_weather
工具。这才是真正的智能!
小结
通过这篇文章,我们从零开始构建了一个完整的MCP应用,涵盖了从基础的Client-Server通信到集成LLM的全过程。我们学习了:
- 如何搭建MCP开发环境
- 如何创建MCP Server并定义tools、resources和prompts
- 如何编写MCP Client并通过stdio和HTTP两种方式与Server通信
- 如何集成LLM,让AI自主决定调用哪个工具
整个过程就像搭积木一样,每一步都有其特定的作用:
- Server负责提供功能(工具和资源)
- Client负责协调和调用这些功能
- LLM负责智能决策,决定何时以及如何使用这些功能
这种架构的优势在于功能扩展非常灵活。当你需要添加新功能时,只需要在Server端添加新的tools或resources,Client和LLM会自动发现并使用它们,而不需要修改Client端的代码。
MCP真正实现了"上下文协议"的概念,让AI可以像人类一样访问和操作各种工具和资源,这是迈向更强大AI应用的重要一步。接下来你可以尝试添加更多有趣的工具,比如文件操作、数据库查询、API调用等,让你的AI助手变得更加强大!
补充
配置模块
pkg/config.py
import json
from pathlib import Pathclass Config:def __init__(self):p = Path(__file__).parent.parent / "conf" / "config.json"if not p.exists():raise FileNotFoundError(f"Config file not found: {p}")self.data = self.read_json(str(p))def read_json(self, filepath: str) -> dict:with open(filepath, "r") as f:return json.load(f)@propertydef llm_model(self) -> str:return self.data["llm"]["model"]@propertydef llm_api_key(self):return self.data["llm"]["api_key"]@propertydef llm_base_url(self) -> str:return self.data["llm"]["base_url"]@propertydef server_host(self) -> str:return self.data["server"]["host"]@propertydef server_port(self) -> int:return self.data["server"]["port"]cfg = Config()
配置文件conf/config.json
{"llm": {"model": "qwen-plus","base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1","api_key": "your token"},"server": {"host": "127.0.0.1","port": 8000}
}
日志模块
pkg/log.py
import logging
import sysdef set_formatter():"""设置formatter"""fmt = "%(asctime)s | %(name)s | %(levelname)s | %(filename)s:%(lineno)d | %(funcName)s | %(message)s"datefmt = "%Y-%m-%d %H:%M:%S"return logging.Formatter(fmt, datefmt=datefmt)def set_stream_handler():return logging.StreamHandler(sys.stdout)def set_file_handler():return logging.FileHandler("app.log", mode="a", encoding="utf-8")def get_logger(name: str = "mylogger", level=logging.DEBUG):logger = logging.getLogger(name)formatter = set_formatter()# handler = set_stream_handler()handler = set_file_handler()handler.setFormatter(formatter)logger.addHandler(handler)logger.setLevel(level)return loggerlogger = get_logger()