import asyncio
import json
import logging
import sys
import os
from typing import Any, Dict, List, Optional

from llama_cpp import Llama
from mcp.server.models import InitializationOptions
from mcp.server import NotificationOptions, Server
from mcp.types import (
    Resource,
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
    LoggingLevel
)
import mcp.types as types
from pydantic import AnyUrl
import mcp.server.stdio

from config import config

# Setup logging
logging.basicConfig(
    level=getattr(logging, config.log_level),
    format='%(asctime)s - %(levelname)s - %(message)s'
)


class MCPLLMInterface:
    """MCP interface for local LLM model using llama-cpp-python"""

    def __init__(self, model_path: str):
        self.model_path = model_path
        self.llm = None
        self.server = Server("deepseek-mcp-server")
        self.logger = logging.getLogger(__name__)
        self._setup_handlers()

    def _setup_handlers(self):
        """Setup MCP server handlers"""

        @self.server.list_resources()
        async def handle_list_resources() -> List[Resource]:
            """List available resources"""
            return [
                Resource(
                    uri=AnyUrl("llm://deepseek/chat"),
                    name="DeepSeek Chat",
                    description="Chat interface to DeepSeek 7B model",
                    mimeType="text/plain"
                )
            ]

        @self.server.read_resource()
        async def handle_read_resource(uri: AnyUrl) -> str:
            """Read resource content"""
            if str(uri) == "llm://deepseek/chat":
                return "DeepSeek 7B Chat Model - Ready for conversation"
            else:
                raise ValueError(f"Unknown resource: {uri}")

        @self.server.list_tools()
        async def handle_list_tools() -> List[Tool]:
            """List available tools"""
            return [
                Tool(
                    name="chat",
                    description="Chat with the DeepSeek 7B model",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "message": {
                                "type": "string",
                                "description": "Message to send to the model"
                            },
                            "max_tokens": {
                                "type": "integer",
                                "description": "Maximum tokens to generate",
                                "default": 512
                            },
                            "temperature": {
                                "type": "number",
                                "description": "Temperature for sampling",
                                "default": 0.7
                            }
                        },
                        "required": ["message"]
                    }
                ),
                Tool(
                    name="generate",
                    description="Generate text completion with the DeepSeek model",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "prompt": {
                                "type": "string",
                                "description": "Prompt for text generation"
                            },
                            "max_tokens": {
                                "type": "integer",
                                "description": "Maximum tokens to generate",
                                "default": 256
                            },
                            "temperature": {
                                "type": "number",
                                "description": "Temperature for sampling",
                                "default": 0.7
                            }
                        },
                        "required": ["prompt"]
                    }
                )
            ]

        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: Dict[str, Any]
        ) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]:
            """Handle tool calls"""
            try:
                if not self.llm:
                    await self._load_model()

                if name == "chat":
                    return await self._handle_chat(arguments)
                elif name == "generate":
                    return await self._handle_generate(arguments)
                else:
                    raise ValueError(f"Unknown tool: {name}")
            except Exception as e:
                self.logger.error(f"Error handling tool call '{name}': {e}")
                return [TextContent(
                    type="text",
                    text=f"Error: {str(e)}"
                )]

    async def _load_model(self):
        """Load the LLM model"""
        if self.llm is None:
            try:
                self.logger.info(f"Loading model from: {self.model_path}")

                # Use configuration for model parameters
                n_gpu_layers = config.n_gpu_layers

                # Detect GPU availability and adjust layers if needed
                try:
                    import llama_cpp
                    self.logger.info(f"Attempting to use {n_gpu_layers} GPU layers")
                except Exception as e:
                    self.logger.warning(f"GPU detection issue: {e}")
                    n_gpu_layers = 0
                    self.logger.info("Falling back to CPU only")

                # Load model in executor to avoid blocking
                loop = asyncio.get_event_loop()
                self.llm = await loop.run_in_executor(
                    None,
                    lambda: Llama(
                        model_path=self.model_path,
                        n_ctx=config.n_ctx,
                        n_gpu_layers=n_gpu_layers,
                        n_threads=config.n_threads,
                        n_batch=config.n_batch,
                        verbose=False,
                        use_mlock=config.use_mlock,
                        low_vram=config.low_vram,
                    )
                )

                self.logger.info("Model loaded successfully")
            except Exception as e:
                self.logger.error(f"Failed to load model: {e}")
                raise RuntimeError(f"Model loading failed: {e}")

    async def _handle_chat(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle chat requests"""
        try:
            message = arguments["message"]
            max_tokens = arguments.get("max_tokens", config.default_max_tokens)
            temperature = arguments.get("temperature", config.default_temperature)

            self.logger.info(f"Processing chat request: {message[:50]}...")

            # Format as chat prompt for DeepSeek
            prompt = f"<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n{message}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n"

            # Run inference in executor to avoid blocking
            loop = asyncio.get_event_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.llm(
                    prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_p=config.default_top_p,
                    repeat_penalty=config.default_repeat_penalty,
                    stop=["<|eot_id|>", "<|end_of_text|>", "user:", "User:"]
                )
            )

            response_text = response["choices"][0]["text"].strip()
            self.logger.info(f"Generated response: {len(response_text)} characters")

            return [TextContent(
                type="text",
                text=response_text
            )]
        except Exception as e:
            self.logger.error(f"Error in chat handling: {e}")
            return [TextContent(
                type="text",
                text=f"Sorry, I encountered an error: {str(e)}"
            )]

    async def _handle_generate(self, arguments: Dict[str, Any]) -> List[TextContent]:
        """Handle text generation requests"""
        try:
            prompt = arguments["prompt"]
            max_tokens = arguments.get("max_tokens", min(config.default_max_tokens, 256))
            temperature = arguments.get("temperature", config.default_temperature)

            self.logger.info(f"Processing generation request: {prompt[:50]}...")

            # Run inference in executor to avoid blocking
            loop = asyncio.get_event_loop()
            response = await loop.run_in_executor(
                None,
                lambda: self.llm(
                    prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_p=0.9,
                    repeat_penalty=1.1
                )
            )

            response_text = response["choices"][0]["text"]
            self.logger.info(f"Generated text: {len(response_text)} characters")

            return [TextContent(
                type="text",
                text=response_text
            )]
        except Exception as e:
            self.logger.error(f"Error in text generation: {e}")
            return [TextContent(
                type="text",
                text=f"Sorry, I encountered an error: {str(e)}"
            )]

    async def run(self):
        """Run the MCP server"""
        try:
            self.logger.info("Starting DeepSeek MCP Server...")
            async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
                await self.server.run(
                    read_stream,
                    write_stream,
                    InitializationOptions(
                        server_name=config.server_name,
                        server_version=config.server_version,
                        capabilities=self.server.get_capabilities(
                            notification_options=NotificationOptions(),
                            experimental_capabilities={}
                        )
                    )
                )
        except Exception as e:
            self.logger.error(f"Server error: {e}")
            raise
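

# The module defines MCPLLMInterface but no entry point. Below is a minimal
# launch sketch, not part of the original file: it assumes the imported
# `config` object exposes a `model_path` attribute (never referenced above)
# and falls back to a hypothetical MODEL_PATH environment variable. Adapt it
# to however the model path is actually supplied in this project.
if __name__ == "__main__":
    # Resolve the model path from config if available, else the environment.
    model_path = getattr(config, "model_path", None) or os.environ.get("MODEL_PATH", "")
    interface = MCPLLMInterface(model_path)
    # Run the stdio-based MCP server until the client disconnects.
    asyncio.run(interface.run())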