Building a stdio MCP server

Kashish Hora

Kashish Hora

Co-founder of MCPcat

Try out MCPcat

The Quick Answer

Create a stdio MCP server that communicates through standard input/output streams. Install the SDK and implement your server:

$npm install @modelcontextprotocol/sdk zod
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

const server = new McpServer({
  name: "my-stdio-server",
  version: "1.0.0"
});

const transport = new StdioServerTransport();
await server.connect(transport);

The stdio transport enables subprocess communication where the client launches your server and exchanges JSON-RPC messages through stdin/stdout streams.

Prerequisites

  • Node.js v18 or higher (for TypeScript implementation)
  • Python 3.8+ with asyncio support (for Python implementation)
  • Basic understanding of JSON-RPC protocol
  • Familiarity with subprocess communication patterns

Installation

TypeScript

# Create new project
$mkdir my-mcp-server && cd my-mcp-server
$npm init -y
 
# Install dependencies
$npm install @modelcontextprotocol/sdk zod
$npm install -D typescript @types/node tsx
 
# Initialize TypeScript
$npx tsc --init

Python

# Create virtual environment
$python -m venv venv
$source venv/bin/activate # On Windows: venv\Scripts\activate
 
# Install MCP SDK
$pip install mcp

Configuration

MCP stdio servers require specific configuration in the client application. The client spawns your server as a subprocess and manages the communication channels.

Client Configuration (Claude Desktop)

{
  "mcpServers": {
    "my-server": {
      "command": "node",
      "args": ["path/to/server.js"],
      "env": {
        "API_KEY": "your-api-key"
      }
    }
  }
}

The command specifies the executable, args contains command-line arguments, and env sets environment variables. The client automatically connects stdin/stdout streams for bidirectional communication.

TypeScript Server Configuration

import { McpServer, ServerOptions } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

const serverOptions: ServerOptions = {
  name: "my-stdio-server",
  version: "1.0.0",
  description: "A stdio-based MCP server",
  vendor: "YourCompany"
};

const server = new McpServer(serverOptions);

Server metadata helps clients identify and display information about your server. Include descriptive names and semantic versioning for better compatibility tracking.

Usage

Building a functional stdio MCP server involves registering tools, resources, and prompts that clients can discover and invoke. The server processes incoming JSON-RPC requests and returns structured responses.

Implementing Tools

import { z } from "zod";

server.tool(
  "calculate",
  {
    operation: z.enum(["add", "subtract", "multiply", "divide"]),
    a: z.number(),
    b: z.number()
  },
  async ({ operation, a, b }) => {
    let result: number;
    switch (operation) {
      case "add": result = a + b; break;
      case "subtract": result = a - b; break;
      case "multiply": result = a * b; break;
      case "divide": 
        if (b === 0) throw new Error("Division by zero");
        result = a / b; 
        break;
    }
    
    return {
      content: [{ type: "text", text: `Result: ${result}` }]
    };
  }
);

Tools expose executable functions to the client. Use Zod schemas for input validation, ensuring type safety and automatic error handling for malformed requests.

Python Implementation

from mcp.server import Server
from mcp.server.stdio import stdio_server
import asyncio

app = Server("my-stdio-server")

@app.list_tools()
async def list_tools():
    return [{
        "name": "calculate",
        "description": "Perform arithmetic operations",
        "inputSchema": {
            "type": "object",
            "properties": {
                "operation": {"type": "string", "enum": ["add", "subtract"]},
                "a": {"type": "number"},
                "b": {"type": "number"}
            },
            "required": ["operation", "a", "b"]
        }
    }]

@app.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "calculate":
        op = arguments["operation"]
        a, b = arguments["a"], arguments["b"]
        result = a + b if op == "add" else a - b
        return {"content": [{"type": "text", "text": f"Result: {result}"}]}

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream)

if __name__ == "__main__":
    asyncio.run(main())

Python's async context managers handle stream lifecycle automatically. The decorator pattern simplifies handler registration while maintaining protocol compliance.

Handling Resources

server.resource(
  "config://settings",
  "Application configuration",
  async () => {
    const config = {
      debug: process.env.DEBUG === "true",
      maxConnections: parseInt(process.env.MAX_CONN || "10"),
      timeout: parseInt(process.env.TIMEOUT || "30000")
    };
    
    return {
      contents: [{
        uri: "config://settings",
        mimeType: "application/json",
        text: JSON.stringify(config, null, 2)
      }]
    };
  }
);

Resources provide read-only data access. Use URI schemes to categorize resources (e.g., file://, config://, data://) and return appropriate MIME types for content negotiation.

Common Issues

Error: "Invalid JSON-RPC message"

The stdio transport requires precise message formatting with newline delimiters. Logging to stdout corrupts the protocol stream.

// WRONG - corrupts stdout
console.log("Debug info");

// CORRECT - use stderr
console.error("Debug info");

// BETTER - use proper logging
import { writeFileSync } from "fs";
function log(message: string) {
  writeFileSync("server.log", `${new Date().toISOString()} ${message}\n`, { flag: "a" });
}

Always redirect debug output to stderr or log files. The client expects only JSON-RPC messages on stdout, and any other content causes parsing failures.

Error: "Server process terminated unexpectedly"

Unhandled exceptions crash the subprocess, breaking the client connection. Implement global error handlers to maintain stability.

process.on("uncaughtException", (error) => {
  console.error("Uncaught exception:", error);
  // Log error details but don't exit
});

process.on("unhandledRejection", (reason, promise) => {
  console.error("Unhandled rejection at:", promise, "reason:", reason);
});

// Graceful shutdown
process.on("SIGTERM", async () => {
  await transport.close();
  process.exit(0);
});

Proper error boundaries prevent cascading failures. Log errors for debugging while keeping the server operational for subsequent requests.

Error: "Message framing error"

The stdio protocol expects exactly one JSON object per line without embedded newlines. Multiline JSON breaks message parsing.

// WRONG - contains newlines
const response = {
  content: [{
    type: "text",
    text: `Line 1
Line 2
Line 3`
  }]
};

// CORRECT - escape newlines
const response = {
  content: [{
    type: "text",
    text: "Line 1\nLine 2\nLine 3"
  }]
};

// Ensure proper serialization
const message = JSON.stringify(response);
process.stdout.write(message + "\n");

String content must escape special characters. The SDK handles this automatically, but manual implementations require careful encoding.

Examples

File System MCP Server

This example demonstrates a production-ready file system server with proper error handling and security considerations:

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { readFile, readdir, stat } from "fs/promises";
import { join, resolve, relative } from "path";
import { z } from "zod";

const server = new McpServer({
  name: "filesystem-server",
  version: "1.0.0"
});

// Security: restrict to specific directory
const ALLOWED_ROOT = process.env.FS_ROOT || process.cwd();

function validatePath(requestedPath: string): string {
  const resolved = resolve(ALLOWED_ROOT, requestedPath);
  const rel = relative(ALLOWED_ROOT, resolved);
  if (rel.startsWith("..")) {
    throw new Error("Access denied: Path outside allowed directory");
  }
  return resolved;
}

server.tool(
  "readFile",
  { path: z.string() },
  async ({ path }) => {
    try {
      const safePath = validatePath(path);
      const content = await readFile(safePath, "utf-8");
      return {
        content: [{ type: "text", text: content }]
      };
    } catch (error) {
      return {
        content: [{ 
          type: "text", 
          text: `Error reading file: ${error.message}` 
        }],
        isError: true
      };
    }
  }
);

server.tool(
  "listDirectory",
  { path: z.string().default(".") },
  async ({ path }) => {
    const safePath = validatePath(path);
    const entries = await readdir(safePath, { withFileTypes: true });
    
    const formatted = await Promise.all(
      entries.map(async (entry) => {
        const fullPath = join(safePath, entry.name);
        const stats = await stat(fullPath);
        return {
          name: entry.name,
          type: entry.isDirectory() ? "directory" : "file",
          size: stats.size,
          modified: stats.mtime.toISOString()
        };
      })
    );
    
    return {
      content: [{ 
        type: "text", 
        text: JSON.stringify(formatted, null, 2) 
      }]
    };
  }
);

const transport = new StdioServerTransport();
await server.connect(transport);

This implementation validates all paths against a root directory, preventing directory traversal attacks. Error handling ensures graceful failures without exposing system internals. The server returns structured data suitable for further processing by AI models.

Database Query Server

A practical example showing database integration with connection pooling and prepared statements:

from mcp.server import Server
from mcp.server.stdio import stdio_server
import asyncio
import asyncpg
import json
from typing import Dict, Any

app = Server("database-server")

class DatabasePool:
    def __init__(self):
        self.pool = None
    
    async def initialize(self):
        self.pool = await asyncpg.create_pool(
            host="localhost",
            database="myapp",
            user="readonly",
            password=os.environ.get("DB_PASSWORD"),
            min_size=1,
            max_size=10,
            command_timeout=10
        )
    
    async def execute_query(self, query: str, params: list = None):
        async with self.pool.acquire() as conn:
            # Set readonly transaction
            async with conn.transaction(readonly=True):
                rows = await conn.fetch(query, *(params or []))
                return [dict(row) for row in rows]

db = DatabasePool()

@app.list_tools()
async def list_tools():
    return [{
        "name": "query",
        "description": "Execute a SELECT query",
        "inputSchema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "params": {"type": "array", "items": {"type": "string"}}
            },
            "required": ["query"]
        }
    }]

@app.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]):
    if name == "query":
        query = arguments["query"]
        params = arguments.get("params", [])
        
        # Validate query is SELECT only
        if not query.strip().upper().startswith("SELECT"):
            return {
                "content": [{
                    "type": "text",
                    "text": "Error: Only SELECT queries are allowed"
                }],
                "isError": True
            }
        
        try:
            results = await db.execute_query(query, params)
            return {
                "content": [{
                    "type": "text",
                    "text": json.dumps(results, default=str, indent=2)
                }]
            }
        except Exception as e:
            return {
                "content": [{
                    "type": "text",
                    "text": f"Query error: {str(e)}"
                }],
                "isError": True
            }

async def main():
    await db.initialize()
    async with stdio_server() as (reader, writer):
        await app.run(reader, writer)

if __name__ == "__main__":
    asyncio.run(main())

Connection pooling optimizes database resource usage across multiple requests. Read-only transactions and query validation provide security layers. The parameterized queries prevent SQL injection while maintaining flexibility for dynamic queries.

Integration with External APIs

This example shows how to wrap external services as MCP tools with rate limiting and caching:

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
import fetch from "node-fetch";
import NodeCache from "node-cache";

const server = new McpServer({
  name: "weather-server",
  version: "1.0.0"
});

// Cache responses for 10 minutes
const cache = new NodeCache({ stdTTL: 600 });

// Simple rate limiter
const rateLimiter = {
  requests: new Map<string, number[]>(),
  
  check(key: string, limit: number, window: number): boolean {
    const now = Date.now();
    const requests = this.requests.get(key) || [];
    const recent = requests.filter(t => now - t < window);
    
    if (recent.length >= limit) {
      return false;
    }
    
    recent.push(now);
    this.requests.set(key, recent);
    return true;
  }
};

server.tool(
  "getWeather",
  {
    location: z.string(),
    units: z.enum(["metric", "imperial"]).default("metric")
  },
  async ({ location, units }) => {
    // Check cache first
    const cacheKey = `${location}-${units}`;
    const cached = cache.get(cacheKey);
    if (cached) {
      return {
        content: [{ 
          type: "text", 
          text: `${cached} (cached)` 
        }]
      };
    }
    
    // Rate limit: 10 requests per minute
    if (!rateLimiter.check("weather-api", 10, 60000)) {
      return {
        content: [{ 
          type: "text", 
          text: "Rate limit exceeded. Please try again later." 
        }],
        isError: true
      };
    }
    
    try {
      const apiKey = process.env.WEATHER_API_KEY;
      const response = await fetch(
        `https://api.openweathermap.org/data/2.5/weather?q=${location}&units=${units}&appid=${apiKey}`
      );
      
      if (!response.ok) {
        throw new Error(`API error: ${response.status}`);
      }
      
      const data = await response.json();
      const result = `Weather in ${data.name}: ${data.main.temp}°, ${data.weather[0].description}`;
      
      // Cache the result
      cache.set(cacheKey, result);
      
      return {
        content: [{ type: "text", text: result }]
      };
    } catch (error) {
      return {
        content: [{ 
          type: "text", 
          text: `Failed to fetch weather: ${error.message}` 
        }],
        isError: true
      };
    }
  }
);

const transport = new StdioServerTransport();
await server.connect(transport);

External API integration requires careful consideration of rate limits and failure modes. Caching reduces API calls and improves response times. The rate limiter prevents abuse while maintaining service availability for legitimate requests.