🛡️ MCP para Pentesters — Resources, Prompts e CVE Integration
Status: ✅ Concluído Ambiente: 💻 Kali Linux 2025.x · Python 3.13 · MCP SDK 1.26.0 · NVD API v2.0 Tags: 🏷️ #mcp #pentest #redteam #python #nvd #cve #resources #prompts
Na parte anterior criamos um MCP Server com Tools básicas de pentest. Agora vamos implementar as outras duas primitivas do protocolo — Resources e Prompts — e integrar com a API pública da NVD para busca de CVEs em tempo real. Ao final, o LLM consegue consultar o escopo autorizado, executar tools e registrar achados automaticamente.
⚙️ As 3 Primitivas do MCP
🧰 Tools: A IA executa uma ação — nmap, whois, cve_search. Tem efeito colateral.
📁 Resources: A IA lê dados — escopo, loot, logs. Somente leitura, sem efeitos.
📋 Prompts: A IA segue um workflow — recon completo, análise de serviço. Ativado pelo usuário.
💡 Diferença chave: Uma Tool age, um Resource informa. O LLM usa o Resource para entender o contexto — como verificar se o alvo está no escopo — antes de decidir qual Tool chamar.
📁 Passo 1 — Criar Estrutura de Diretórios e Escopo
Antes de subir o server, crie o diretório de trabalho e o arquivo de escopo autorizado:
⚠️ Bug corrigido — CVSS Severidade: No formato cvssMetricV2 (CVEs mais antigos), o campo baseSeverity fica fora do objeto cvssData, diferente do V3. O código abaixo já traz essa correção aplicada.
from mcp.server.fastmcp import FastMCP
import subprocess
import socket
import json
import re
import urllib.request
import urllib.parse
mcp = FastMCP("PentestServer-v2")
# ─────────────────────────────────────────
# HELPERS DE SEGURANÇA
# ─────────────────────────────────────────
def validar_alvo(target: str) -> bool:
"""Valida se o alvo é um IP ou domínio legítimo"""
ip = r'^(\d{1,3}\.){3}\d{1,3}(/\d{1,2})?$'
domain = r'^[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+$'
return bool(re.match(ip, target) or re.match(domain, target))
def sanitizar_flags(flags: str) -> str:
"""Allowlist de flags nmap permitidas"""
allowed = {'-sV', '-sC', '-sS', '-sT', '-sU', '-T1', '-T2',
'-T3', '-T4', '-p', '-A', '-O', '--open', '-Pn', '--script'}
partes = flags.split()
return ' '.join(p for p in partes if any(p.startswith(a) for a in allowed))
@mcp.tool()
def nmap_scan(target: str, flags: str = "-sV -T4") -> str:
"""
Executa scan nmap em um alvo autorizado.
Flags recomendadas:
'-sV -T4' -> versoes de servicos
'-sC -sV' -> scripts padrao + versoes
'-p 80,443,8080,8443' -> portas especificas
'-A' -> deteccao agressiva (OS + versao + scripts)
"""
if not validar_alvo(target):
return f"Alvo invalido: {target}. Use IP ou dominio valido."
flags_safe = sanitizar_flags(flags)
cmd = ["nmap"] + flags_safe.split() + [target]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
return result.stdout or result.stderr
@mcp.tool()
def dns_lookup(domain: str) -> str:
"""Resolve DNS de um dominio. Retorna todos os IPs associados."""
try:
infos = socket.getaddrinfo(domain, None)
ips = list(set([i[4][0] for i in infos]))
return json.dumps({"domain": domain, "ips": ips}, indent=2)
except Exception as e:
return f"Erro: {str(e)}"
@mcp.tool()
def whois_lookup(target: str) -> str:
"""Executa whois em dominio ou IP. Util para OSINT e reconhecimento."""
if not validar_alvo(target):
return f"Alvo invalido: {target}"
result = subprocess.run(
["whois", target], capture_output=True, text=True, timeout=30
)
return result.stdout[:3000] or result.stderr
@mcp.tool()
def check_open_port(host: str, port: int, timeout: float = 2.0) -> str:
"""Verifica se uma porta TCP esta aberta. Mais rapido que nmap para checks pontuais."""
if not validar_alvo(host):
return f"Host invalido: {host}"
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
sock.close()
return f"Porta {port} ABERTA em {host}" if result == 0 else f"Porta {port} FECHADA/FILTRADA em {host}"
except Exception as e:
return f"Erro: {str(e)}"
@mcp.tool()
def cve_search(keyword: str, results_per_page: int = 5) -> str:
"""
Busca CVEs na NVD (National Vulnerability Database) por palavra-chave.
Exemplos: 'apache 2.4.49', 'openssh 8.2', 'log4j', 'wordpress 6.0'
Retorna CVE ID, CVSS score, severidade e descricao.
"""
try:
query = urllib.parse.urlencode({
"keywordSearch": keyword,
"resultsPerPage": results_per_page
})
url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?{query}"
req = urllib.request.Request(
url, headers={"User-Agent": "MCP-PentestServer/1.0"}
)
with urllib.request.urlopen(req, timeout=15) as response:
data = json.loads(response.read().decode())
vulns = data.get("vulnerabilities", [])
if not vulns:
return f"Nenhum CVE encontrado para: {keyword}"
resultados = []
for v in vulns:
cve = v.get("cve", {})
cve_id = cve.get("id", "N/A")
descs = cve.get("descriptions", [])
desc = next((d["value"] for d in descs if d["lang"] == "en"), "Sem descricao")
desc = desc[:200] + "..." if len(desc) > 200 else desc
metrics = cve.get("metrics", {})
score = "N/A"
severity = "N/A"
for version in ["cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]:
if version in metrics:
m = metrics[version][0]
score = m.get("cvssData", {}).get("baseScore", "N/A")
# V3: baseSeverity dentro de cvssData
# V2: baseSeverity fora, no nivel raiz do metric
severity = (
m.get("cvssData", {}).get("baseSeverity")
or m.get("baseSeverity", "N/A")
)
break
except Exception as e:
return f"Erro ao consultar NVD: {str(e)}"
@mcp.tool()
def salvar_loot(titulo: str, conteudo: str) -> str:
"""
Salva um achado no arquivo de loot do pentest atual.
Use para registrar: credenciais, servicos vulneraveis, misconfigurations.
"""
import os
from datetime import datetime
@mcp.resource("file://escopo")
def get_escopo() -> str:
"""Escopo autorizado do pentest atual. Leia antes de executar qualquer tool."""
try:
with open("/root/pentest/escopo.txt", "r") as f:
return f.read()
except FileNotFoundError:
return "Nenhum escopo definido.\nCrie /root/pentest/escopo.txt com os alvos autorizados."
@mcp.resource("file://loot")
def get_loot() -> str:
"""Achados e loot registrados no pentest atual."""
try:
with open("/root/pentest/loot.md", "r") as f:
return f.read()
except FileNotFoundError:
return "Nenhum loot registrado ainda."
@mcp.prompt()
def recon_completo(alvo: str) -> str:
"""Inicia workflow de reconhecimento estruturado em um alvo."""
return f"""
Voce e um pentester conduzindo um teste devidamente autorizado.
ALVO: {alvo}
Siga este workflow na ordem exata:
1. Leia o resource 'file://escopo' e confirme que {alvo} esta autorizado
2. dns_lookup em {alvo}
3. whois_lookup em {alvo}
4. nmap_scan com '-sV -sC -T4 -p 21,22,23,25,53,80,443,445,3306,5432,8080,8443'
5. Para cada servico identificado, execute cve_search com 'nome versao'
6. Salve os achados criticos com salvar_loot
7. Entregue relatorio com: servicos encontrados, CVEs por severidade, proximos passos
"""
@mcp.prompt()
def analise_servico(servico: str, versao: str) -> str:
"""Analisa um servico especifico em busca de vulnerabilidades conhecidas."""
return f"""
Analise o servico abaixo em busca de vulnerabilidades conhecidas:
Servico: {servico}
Versao: {versao}
1. Execute cve_search com '{servico} {versao}'
2. Execute cve_search com '{servico}' para variacoes
3. Ordene por CVSS score (critico primeiro)
4. Para CVEs com score >= 7.0, descreva o impacto e se ha exploit publico
5. Sugira mitigacoes para cada vulnerabilidade critica encontrada
"""
if __name__ == "__main__":
mcp.run()
💡 Dica: O LLM lê o nome da função, o docstring e os tipos dos parâmetros para decidir quando e como usar cada primitiva. Nomes e docstrings descritivos são parte funcional do código.
🔍 Passo 4 — Subir o Inspector
1. Ativar o venv:
source ~/mcp-env/bin/activate
2. Subir o inspector com o server v2:
mcp dev /usr/local/scripts/pentest_server_v2.py
3. Acessar no browser: 🌐 http://localhost:5173
🔌 Passo 5 — Configurar Conexão no Inspector
Preencha os campos no painel lateral esquerdo e clique em Connect:
Transport Type:STDIO
Command:python3
Arguments:/usr/local/scripts/pentest_server_v2.py
✅ Status verde = conectado. As tabs Tools, Resources e Prompts aparecerão no topo da interface.
Clique em Tools → cve_search e preencha os campos:
keyword:apache 2.4.49
results_per_page:3
Resultado esperado (confirmado em laboratório):
[
{
"id": "CVE-2021-41773",
"score": 9.8,
"severity": "CRITICAL",
"description": "A flaw was found in a change made to path normalization in Apache HTTP Server 2.4.49...",
"url": "https://nvd.nist.gov/vuln/detail/CVE-2021-41773"
},
{
"id": "CVE-2021-42013",
"score": 9.8,
"severity": "CRITICAL",
"description": "It was found that the fix for CVE-2021-41773 in Apache HTTP Server 2.4.50 was insufficient...",
"url": "https://nvd.nist.gov/vuln/detail/CVE-2021-42013"
}
]