MCP实现原理深度解析
Model Context Protocol(MCP)作为连接AI助手与外部世界的桥梁,其底层实现涉及多个技术层面。本文将深入解析MCP的实现原理,帮助开发者理解这一前沿协议的技术细节。
协议基础架构
JSON-RPC 2.0 基础
MCP 建立在 JSON-RPC 2.0 协议之上,这为其提供了标准化的消息传递机制:
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {
"roots": {
"listChanged": true
},
"sampling": {}
},
"clientInfo": {
"name": "example-client",
"version": "1.0.0"
}
}
}消息类型定义
MCP 定义了四种基本消息类型:
from enum import Enum
from typing import Dict, Any, Optional, Union
from dataclasses import dataclass
class MessageType(Enum):
REQUEST = "request"
RESPONSE = "response"
NOTIFICATION = "notification"
ERROR = "error"
@dataclass
class MCPMessage:
jsonrpc: str = "2.0"
id: Optional[Union[str, int]] = None
method: Optional[str] = None
params: Optional[Dict[str, Any]] = None
result: Optional[Any] = None
error: Optional[Dict[str, Any]] = None流程图
1. 完整协议交互流程
sequenceDiagram
participant Client as MCP客户端
participant Server as MCP服务器
participant Transport as 传输层
participant Resource as 资源提供者
participant Tool as 工具执行器
Note over Client,Tool: MCP协议初始化阶段
Client->>Transport: 建立连接
Transport->>Server: 连接就绪
Client->>Server: initialize请求
Note right of Client: 发送客户端能力和信息
Server->>Client: initialize响应
Note left of Server: 返回服务器能力和信息
Client->>Server: notifications/initialized
Note over Client,Server: 初始化完成通知
Note over Client,Tool: 资源操作流程
Client->>Server: resources/list
Server->>Resource: 获取资源列表
Resource->>Server: 返回资源信息
Server->>Client: 资源列表响应
Client->>Server: resources/read
Note right of Client: 请求读取特定资源
Server->>Resource: 读取资源内容
Resource->>Server: 返回资源内容
Server->>Client: 资源内容响应
Note over Client,Tool: 工具调用流程
Client->>Server: tools/list
Server->>Tool: 获取可用工具
Tool->>Server: 返回工具列表
Server->>Client: 工具列表响应
Client->>Server: tools/call
Note right of Client: 调用特定工具
Server->>Tool: 执行工具逻辑
Tool->>Server: 返回执行结果
Server->>Client: 工具执行结果
Note over Client,Server: 连接关闭
Client->>Server: 断开连接
Server->>Transport: 关闭传输2. 初始化协商流程
flowchart TD
A[客户端启动] --> B[建立传输连接]
B --> C[发送initialize请求]
C --> D{服务器验证}
D -->|成功| E[返回服务器能力]
D -->|失败| F[返回错误响应]
E --> G[客户端发送initialized通知]
G --> H[协议握手完成]
F --> I[连接终止]
style A fill:#e1f5fe
style H fill:#c8e6c9
style I fill:#ffcdd23. 资源访问流程
flowchart TD
A[客户端请求资源列表] --> B[服务器查询所有资源提供者]
B --> C[汇总资源列表]
C --> D[返回资源清单]
D --> E{客户端选择资源}
E -->|读取资源| F[发送resources/read请求]
E -->|不读取| G[流程结束]
F --> H[服务器验证URI]
H --> I{URI有效?}
I -->|有效| J[定位资源提供者]
I -->|无效| K[返回错误]
J --> L[读取资源内容]
L --> M{读取成功?}
M -->|成功| N[返回资源内容]
M -->|失败| O[返回读取错误]
N --> P[客户端处理内容]
style A fill:#e3f2fd
style N fill:#c8e6c9
style K fill:#ffcdd2
style O fill:#ffcdd24. 工具执行流程
flowchart TD
A[获取工具列表] --> B[tools/list请求]
B --> C[服务器返回工具清单]
C --> D[客户端选择工具]
D --> E[构造工具参数]
E --> F[发送tools/call请求]
F --> G[服务器验证工具名称]
G --> H{工具存在?}
H -->|存在| I[验证输入参数]
H -->|不存在| J[返回工具不存在错误]
I --> K{参数有效?}
K -->|有效| L[执行工具逻辑]
K -->|无效| M[返回参数验证错误]
L --> N{执行成功?}
N -->|成功| O[返回执行结果]
N -->|失败| P[返回执行错误]
O --> Q[客户端处理结果]
style A fill:#f3e5f5
style O fill:#c8e6c9
style J fill:#ffcdd2
style M fill:#ffcdd2
style P fill:#ffcdd25. 错误处理流程
flowchart TD
A[接收请求] --> B[解析JSON消息]
B --> C{解析成功?}
C -->|失败| D[返回解析错误 -32700]
C -->|成功| E[验证消息格式]
E --> F{格式有效?}
F -->|无效| G[返回无效请求 -32600]
F -->|有效| H[查找方法处理器]
H --> I{方法存在?}
I -->|不存在| J[返回方法未找到 -32601]
I -->|存在| K[验证参数]
K --> L{参数有效?}
L -->|无效| M[返回参数无效 -32602]
L -->|有效| N[执行方法]
N --> O{执行成功?}
O -->|失败| P[返回内部错误 -32603]
O -->|成功| Q[返回正常结果]
style Q fill:#c8e6c9
style D fill:#ffcdd2
style G fill:#ffcdd2
style J fill:#ffcdd2
style M fill:#ffcdd2
style P fill:#ffcdd26. 传输层抽象架构
graph TB
subgraph "应用层"
A[MCP客户端]
B[MCP服务器]
end
subgraph "协议层"
C[JSON-RPC 2.0]
D[MCP协议处理器]
end
subgraph "传输层"
E[Stdio传输]
F[WebSocket传输]
G[HTTP/SSE传输]
H[自定义传输]
end
subgraph "网络层"
I[TCP/IP]
J[Unix Socket]
K[命名管道]
end
A --> C
B --> C
C --> D
D --> E
D --> F
D --> G
D --> H
E --> I
F --> I
G --> I
H --> J
H --> K
style A fill:#e1f5fe
style B fill:#e8f5e8
style C fill:#fff3e0
style D fill:#f3e5f57. 并发处理架构
graph TD
A[客户端请求] --> B[请求队列]
B --> C[连接池管理器]
C --> D[工作线程池]
subgraph "并发处理"
D --> E[工作线程1]
D --> F[工作线程2]
D --> G[工作线程N]
end
E --> H[资源提供者1]
F --> I[工具执行器1]
G --> J[提示处理器1]
subgraph "资源层"
H --> K[文件系统]
I --> L[数据库]
J --> M[外部API]
end
K --> N[响应聚合器]
L --> N
M --> N
N --> O[响应队列]
O --> P[客户端响应]
style A fill:#e3f2fd
style P fill:#c8e6c9
style B fill:#fff3e0
style O fill:#fff3e08. 状态机流程
stateDiagram-v2
[*] --> Disconnected
Disconnected --> Connecting: 建立连接
Connecting --> Connected: 连接成功
Connecting --> Disconnected: 连接失败
Connected --> Initializing: 发送initialize
Initializing --> Ready: 初始化成功
Initializing --> Disconnected: 初始化失败
Ready --> Processing: 处理请求
Processing --> Ready: 请求完成
Processing --> Error: 处理错误
Error --> Ready: 错误恢复
Error --> Disconnected: 严重错误
Ready --> Disconnected: 主动断开
note right of Ready
可以处理各种MCP请求:
- resources/list
- resources/read
- tools/list
- tools/call
- prompts/list
- prompts/get
end note核心协议实现
1. 连接初始化流程
import asyncio
import json
from typing import Dict, Any, Callable
class MCPConnection:
"""MCP 连接管理器"""
def __init__(self):
self.capabilities = {}
self.client_info = {}
self.server_info = {}
self.initialized = False
self.message_handlers = {}
self.request_id_counter = 0
async def initialize(self, client_capabilities: Dict[str, Any]) -> Dict[str, Any]:
"""初始化 MCP 连接"""
# 构造初始化请求
initialize_request = {
"jsonrpc": "2.0",
"id": self._next_request_id(),
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": client_capabilities,
"clientInfo": {
"name": "python-mcp-client",
"version": "1.0.0"
}
}
}
# 发送初始化请求
response = await self._send_request(initialize_request)
if "error" in response:
raise Exception(f"初始化失败: {response['error']}")
# 保存服务器信息和能力
result = response["result"]
self.server_info = result.get("serverInfo", {})
self.capabilities = result.get("capabilities", {})
# 发送初始化完成通知
await self._send_notification({
"jsonrpc": "2.0",
"method": "notifications/initialized"
})
self.initialized = True
return result
def _next_request_id(self) -> int:
"""生成下一个请求ID"""
self.request_id_counter += 1
return self.request_id_counter2. 资源管理实现
from abc import ABC, abstractmethod
from typing import List, Optional
import mimetypes
import os
class Resource:
"""MCP 资源定义"""
def __init__(self, uri: str, name: str, description: str = "",
mime_type: str = None, annotations: Dict[str, Any] = None):
self.uri = uri
self.name = name
self.description = description
self.mime_type = mime_type or self._detect_mime_type(uri)
self.annotations = annotations or {}
def _detect_mime_type(self, uri: str) -> str:
"""自动检测MIME类型"""
mime_type, _ = mimetypes.guess_type(uri)
return mime_type or "application/octet-stream"
class ResourceProvider(ABC):
"""资源提供者抽象基类"""
@abstractmethod
async def list_resources(self) -> List[Resource]:
"""列出所有可用资源"""
pass
@abstractmethod
async def read_resource(self, uri: str) -> Dict[str, Any]:
"""读取指定资源内容"""
pass
class FileResourceProvider(ResourceProvider):
"""文件系统资源提供者"""
def __init__(self, base_path: str, allowed_extensions: List[str] = None):
self.base_path = os.path.abspath(base_path)
self.allowed_extensions = allowed_extensions or ['.txt', '.md', '.py', '.json']
async def list_resources(self) -> List[Resource]:
"""列出目录中的所有文件资源"""
resources = []
for root, dirs, files in os.walk(self.base_path):
for file in files:
if self._is_allowed_file(file):
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, self.base_path)
uri = f"file://{file_path}"
resource = Resource(
uri=uri,
name=relative_path,
description=f"文件: {relative_path}"
)
resources.append(resource)
return resources
async def read_resource(self, uri: str) -> Dict[str, Any]:
"""读取文件资源内容"""
if not uri.startswith("file://"):
raise ValueError("无效的文件URI")
file_path = uri[7:] # 移除 "file://" 前缀
if not self._is_safe_path(file_path):
raise ValueError("路径不安全")
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return {
"contents": [{
"type": "text",
"text": content
}]
}
except Exception as e:
raise Exception(f"读取文件失败: {str(e)}")
def _is_allowed_file(self, filename: str) -> bool:
"""检查文件是否允许访问"""
_, ext = os.path.splitext(filename)
return ext.lower() in self.allowed_extensions
def _is_safe_path(self, path: str) -> bool:
"""检查路径是否安全(防止路径遍历攻击)"""
abs_path = os.path.abspath(path)
return abs_path.startswith(self.base_path)3. 工具执行引擎
from typing import Dict, Any, Callable, Awaitable
import inspect
import json
from jsonschema import validate, ValidationError
class Tool:
"""MCP 工具定义"""
def __init__(self, name: str, description: str,
input_schema: Dict[str, Any], handler: Callable):
self.name = name
self.description = description
self.input_schema = input_schema
self.handler = handler
self._validate_handler()
def _validate_handler(self):
"""验证处理函数签名"""
sig = inspect.signature(self.handler)
if not inspect.iscoroutinefunction(self.handler):
raise ValueError(f"工具处理函数 {self.name} 必须是异步函数")
class ToolRegistry:
"""工具注册表"""
def __init__(self):
self.tools: Dict[str, Tool] = {}
def register_tool(self, tool: Tool):
"""注册工具"""
self.tools[tool.name] = tool
def get_tool_list(self) -> List[Dict[str, Any]]:
"""获取工具列表"""
return [
{
"name": tool.name,
"description": tool.description,
"inputSchema": tool.input_schema
}
for tool in self.tools.values()
]
async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""调用工具"""
if name not in self.tools:
raise ValueError(f"未找到工具: {name}")
tool = self.tools[name]
# 验证输入参数
try:
validate(arguments, tool.input_schema)
except ValidationError as e:
raise ValueError(f"参数验证失败: {str(e)}")
# 调用工具处理函数
try:
result = await tool.handler(**arguments)
return {
"content": [
{
"type": "text",
"text": json.dumps(result, ensure_ascii=False, indent=2)
}
]
}
except Exception as e:
raise Exception(f"工具执行失败: {str(e)}")
# 工具装饰器
def mcp_tool(name: str, description: str, input_schema: Dict[str, Any]):
"""MCP 工具装饰器"""
def decorator(func: Callable):
tool = Tool(name, description, input_schema, func)
# 这里可以自动注册到全局注册表
return tool
return decorator
# 使用示例
@mcp_tool(
name="calculate",
description="执行数学计算",
input_schema={
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "数学表达式"
}
},
"required": ["expression"]
}
)
async def calculate_tool(expression: str) -> Dict[str, Any]:
"""计算器工具"""
try:
# 安全的表达式求值(仅限基本数学运算)
allowed_names = {
k: v for k, v in __builtins__.items()
if k in ['abs', 'round', 'min', 'max', 'sum']
}
allowed_names.update({'__builtins__': {}})
result = eval(expression, allowed_names)
return {"result": result}
except Exception as e:
return {"error": f"计算错误: {str(e)}"}4. 传输层抽象
from abc import ABC, abstractmethod
import asyncio
import sys
from typing import AsyncIterator, Tuple, Optional
class Transport(ABC):
"""传输层抽象接口"""
@abstractmethod
async def read_message(self) -> Optional[Dict[str, Any]]:
"""读取一条消息"""
pass
@abstractmethod
async def write_message(self, message: Dict[str, Any]):
"""写入一条消息"""
pass
@abstractmethod
async def close(self):
"""关闭传输连接"""
pass
class StdioTransport(Transport):
"""标准输入输出传输实现"""
def __init__(self):
self.reader = None
self.writer = None
async def initialize(self):
"""初始化stdio传输"""
self.reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(self.reader)
loop = asyncio.get_event_loop()
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
transport, protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin, sys.stdout
)
self.writer = asyncio.StreamWriter(transport, protocol, self.reader, loop)
async def read_message(self) -> Optional[Dict[str, Any]]:
"""从stdin读取JSON消息"""
try:
line = await self.reader.readline()
if not line:
return None
message_text = line.decode('utf-8').strip()
if not message_text:
return None
return json.loads(message_text)
except json.JSONDecodeError as e:
raise Exception(f"JSON解析错误: {str(e)}")
except Exception as e:
raise Exception(f"读取消息失败: {str(e)}")
async def write_message(self, message: Dict[str, Any]):
"""向stdout写入JSON消息"""
try:
message_text = json.dumps(message, ensure_ascii=False)
self.writer.write((message_text + '\n').encode('utf-8'))
await self.writer.drain()
except Exception as e:
raise Exception(f"写入消息失败: {str(e)}")
async def close(self):
"""关闭传输连接"""
if self.writer:
self.writer.close()
await self.writer.wait_closed()
class HTTPTransport(Transport):
"""HTTP传输实现(WebSocket或Server-Sent Events)"""
def __init__(self, url: str, transport_type: str = "websocket"):
self.url = url
self.transport_type = transport_type
self.websocket = None
self.session = None
async def initialize(self):
"""初始化HTTP传输"""
if self.transport_type == "websocket":
import websockets
self.websocket = await websockets.connect(self.url)
elif self.transport_type == "sse":
import aiohttp
self.session = aiohttp.ClientSession()
async def read_message(self) -> Optional[Dict[str, Any]]:
"""从WebSocket或SSE读取消息"""
if self.transport_type == "websocket" and self.websocket:
try:
message_text = await self.websocket.recv()
return json.loads(message_text)
except Exception as e:
raise Exception(f"WebSocket读取失败: {str(e)}")
# SSE实现略...
return None
async def write_message(self, message: Dict[str, Any]):
"""向WebSocket写入消息"""
if self.transport_type == "websocket" and self.websocket:
try:
message_text = json.dumps(message, ensure_ascii=False)
await self.websocket.send(message_text)
except Exception as e:
raise Exception(f"WebSocket写入失败: {str(e)}")
async def close(self):
"""关闭HTTP传输"""
if self.websocket:
await self.websocket.close()
if self.session:
await self.session.close()5. 完整的MCP服务器实现
import logging
from typing import Dict, Any, List, Optional, Callable
class MCPServer:
"""完整的MCP服务器实现"""
def __init__(self, name: str, version: str = "1.0.0"):
self.name = name
self.version = version
self.transport = None
self.resource_providers = []
self.tool_registry = ToolRegistry()
self.prompt_registry = {}
self.logger = logging.getLogger(f"mcp.{name}")
# 消息处理器映射
self.message_handlers = {
"initialize": self._handle_initialize,
"resources/list": self._handle_list_resources,
"resources/read": self._handle_read_resource,
"tools/list": self._handle_list_tools,
"tools/call": self._handle_call_tool,
"prompts/list": self._handle_list_prompts,
"prompts/get": self._handle_get_prompt
}
def add_resource_provider(self, provider: ResourceProvider):
"""添加资源提供者"""
self.resource_providers.append(provider)
def register_tool(self, tool: Tool):
"""注册工具"""
self.tool_registry.register_tool(tool)
async def run(self, transport: Transport):
"""运行MCP服务器"""
self.transport = transport
await transport.initialize()
self.logger.info(f"MCP服务器 {self.name} 已启动")
try:
while True:
message = await transport.read_message()
if message is None:
break
await self._handle_message(message)
except Exception as e:
self.logger.error(f"服务器运行错误: {str(e)}")
finally:
await transport.close()
async def _handle_message(self, message: Dict[str, Any]):
"""处理接收到的消息"""
try:
method = message.get("method")
message_id = message.get("id")
params = message.get("params", {})
if method in self.message_handlers:
handler = self.message_handlers[method]
result = await handler(params)
if message_id is not None:
# 发送响应
response = {
"jsonrpc": "2.0",
"id": message_id,
"result": result
}
await self.transport.write_message(response)
else:
# 未知方法
if message_id is not None:
error_response = {
"jsonrpc": "2.0",
"id": message_id,
"error": {
"code": -32601,
"message": f"未知方法: {method}"
}
}
await self.transport.write_message(error_response)
except Exception as e:
self.logger.error(f"处理消息错误: {str(e)}")
if message.get("id") is not None:
error_response = {
"jsonrpc": "2.0",
"id": message.get("id"),
"error": {
"code": -32603,
"message": f"内部错误: {str(e)}"
}
}
await self.transport.write_message(error_response)
async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理初始化请求"""
client_capabilities = params.get("capabilities", {})
return {
"protocolVersion": "2024-11-05",
"capabilities": {
"resources": {
"subscribe": False,
"listChanged": True
},
"tools": {
"listChanged": True
},
"prompts": {
"listChanged": True
},
"logging": {}
},
"serverInfo": {
"name": self.name,
"version": self.version
}
}
async def _handle_list_resources(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理资源列表请求"""
all_resources = []
for provider in self.resource_providers:
resources = await provider.list_resources()
for resource in resources:
all_resources.append({
"uri": resource.uri,
"name": resource.name,
"description": resource.description,
"mimeType": resource.mime_type
})
return {"resources": all_resources}
async def _handle_read_resource(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理资源读取请求"""
uri = params.get("uri")
if not uri:
raise ValueError("缺少URI参数")
for provider in self.resource_providers:
try:
return await provider.read_resource(uri)
except Exception:
continue
raise ValueError(f"未找到资源: {uri}")
async def _handle_list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理工具列表请求"""
return {"tools": self.tool_registry.get_tool_list()}
async def _handle_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理工具调用请求"""
name = params.get("name")
arguments = params.get("arguments", {})
if not name:
raise ValueError("缺少工具名称")
return await self.tool_registry.call_tool(name, arguments)
async def _handle_list_prompts(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理提示列表请求"""
return {"prompts": list(self.prompt_registry.values())}
async def _handle_get_prompt(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理获取提示请求"""
name = params.get("name")
arguments = params.get("arguments", {})
if name not in self.prompt_registry:
raise ValueError(f"未找到提示: {name}")
prompt = self.prompt_registry[name]
# 这里需要根据arguments渲染提示模板
return prompt使用示例
async def main():
# 创建MCP服务器
server = MCPServer("example-server")
# 添加文件资源提供者
file_provider = FileResourceProvider("/tmp/mcp-resources")
server.add_resource_provider(file_provider)
# 注册计算器工具
server.register_tool(calculate_tool)
# 使用stdio传输运行服务器
transport = StdioTransport()
await server.run(transport)
if __name__ == "__main__":
asyncio.run(main())协议扩展机制
自定义能力声明
class ExtendedMCPServer(MCPServer):
"""扩展MCP服务器"""
async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理初始化请求(包含自定义能力)"""
base_result = await super()._handle_initialize(params)
# 添加自定义能力
base_result["capabilities"]["experimental"] = {
"customFeature": {
"version": "1.0"
}
}
return base_result消息拦截器
class MessageInterceptor:
"""消息拦截器"""
async def before_handle(self, message: Dict[str, Any]) -> Dict[str, Any]:
"""消息处理前拦截"""
return message
async def after_handle(self, message: Dict[str, Any], result: Any) -> Any:
"""消息处理后拦截"""
return result
class InterceptorMCPServer(MCPServer):
"""支持拦截器的MCP服务器"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.interceptors: List[MessageInterceptor] = []
def add_interceptor(self, interceptor: MessageInterceptor):
"""添加消息拦截器"""
self.interceptors.append(interceptor)
async def _handle_message(self, message: Dict[str, Any]):
"""处理消息(包含拦截器支持)"""
# 前置拦截
for interceptor in self.interceptors:
message = await interceptor.before_handle(message)
# 原始处理逻辑...
result = await super()._handle_message(message)
# 后置拦截
for interceptor in self.interceptors:
result = await interceptor.after_handle(message, result)
return result性能优化策略
1. 连接池管理
class ConnectionPool:
"""MCP连接池"""
def __init__(self, max_connections: int = 10):
self.max_connections = max_connections
self.active_connections = {}
self.connection_semaphore = asyncio.Semaphore(max_connections)
async def get_connection(self, client_id: str) -> MCPConnection:
"""获取或创建连接"""
async with self.connection_semaphore:
if client_id not in self.active_connections:
self.active_connections[client_id] = MCPConnection()
return self.active_connections[client_id]
async def release_connection(self, client_id: str):
"""释放连接"""
if client_id in self.active_connections:
await self.active_connections[client_id].close()
del self.active_connections[client_id]2. 缓存机制
from functools import wraps
import time
from typing import Dict, Any, Tuple
class MCPCache:
"""MCP响应缓存"""
def __init__(self, default_ttl: int = 300):
self.cache: Dict[str, Tuple[Any, float]] = {}
self.default_ttl = default_ttl
def get(self, key: str) -> Optional[Any]:
"""获取缓存值"""
if key in self.cache:
value, expire_time = self.cache[key]
if time.time() < expire_time:
return value
else:
del self.cache[key]
return None
def set(self, key: str, value: Any, ttl: int = None):
"""设置缓存值"""
ttl = ttl or self.default_ttl
expire_time = time.time() + ttl
self.cache[key] = (value, expire_time)
def cache_response(cache: MCPCache, ttl: int = None):
"""响应缓存装饰器"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"
# 尝试从缓存获取
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
# 执行函数并缓存结果
result = await func(*args, **kwargs)
cache.set(cache_key, result, ttl)
return result
return wrapper
return decorator安全考虑
1. 访问控制
class AccessController:
"""访问控制器"""
def __init__(self):
self.permissions = {}
def grant_permission(self, client_id: str, resource: str, actions: List[str]):
"""授予权限"""
if client_id not in self.permissions:
self.permissions[client_id] = {}
self.permissions[client_id][resource] = actions
def check_permission(self, client_id: str, resource: str, action: str) -> bool:
"""检查权限"""
client_permissions = self.permissions.get(client_id, {})
resource_actions = client_permissions.get(resource, [])
return action in resource_actions2. 输入验证
import re
from typing import Any
class InputValidator:
"""输入验证器"""
@staticmethod
def validate_uri(uri: str) -> bool:
"""验证URI格式"""
uri_pattern = re.compile(
r'^[a-zA-Z][a-zA-Z\d+\-.]*:' # scheme
r'(?://(?:[^\s/?#]+@)?[^\s/?#]+)?' # authority
r'[^\s?#]*' # path
r'(?:\?[^\s#]*)?' # query
r'(?:#[^\s]*)?$' # fragment
)
return bool(uri_pattern.match(uri))
@staticmethod
def sanitize_string(text: str, max_length: int = 1000) -> str:
"""清理字符串输入"""
if len(text) > max_length:
raise ValueError(f"文本长度超过限制: {max_length}")
# 移除控制字符
sanitized = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)
return sanitized总结
MCP的实现涉及多个技术层面:
- 协议层:基于JSON-RPC 2.0的消息传递机制
- 传输层:支持多种传输方式(stdio、WebSocket、HTTP)
- 业务层:资源管理、工具执行、提示处理
- 安全层:访问控制、输入验证、安全传输