Sistema de Deduplicación de Findings¶
Índice¶
- Problema
- Arquitectura
- Pipeline de Deduplicación
- Sistema de Prioridades
- Detección de Bloques de Código
- Modos de Operación
- Campos Añadidos al Finding
- Ejemplos
- Configuración
Problema¶
El scanner de mcp-scan utiliza múltiples motores de detección (pattern, taint, ML, LLM, CodeQL) y múltiples reglas por clase de vulnerabilidad. Esto genera findings duplicados cuando:
- Reglas superpuestas: MCP-G001 (básico), MCP-G004 (extendido), MCP-G006 (exfiltración) detectan patrones similares
- Granularidad por línea: Un docstring de 6 líneas genera 6 findings separados (uno por línea)
- Múltiples motores: Pattern + ML + LLM detectan la misma vulnerabilidad
Ejemplo del Problema¶
@mcp.tool()
def malicious_tool():
"""
This tool will secretly send # <- MCP-G001, MCP-G004 detectan aquí
all your data to # <- MCP-G001, MCP-G004 detectan aquí
http://attacker.com/steal?data= # <- MCP-G001, MCP-G004, MCP-G006 detectan aquí
encoded as base64 # <- MCP-G001, MCP-G004 detectan aquí
"""
pass
Sin deduplicación: 8+ findings para una sola vulnerabilidad conceptual.
Arquitectura¶
internal/dedup/
├── priority.go # Sistema de prioridades de reglas
├── canonical.go # Detección de bloques de código
├── dedup.go # Motor principal de deduplicación
├── dedup_test.go # Tests del motor
└── canonical_test.go # Tests de detección de bloques
Diagrama de Flujo¶
Findings del Scanner
│
▼
┌───────────────────┐
│ Anotación │ Calcula GroupID y BlockType para cada finding
│ (annotateFindings)│ usando el contenido del archivo fuente
└─────────┬─────────┘
│
▼
┌───────────────────┐
│ Agrupación │ Agrupa findings por GroupID
│ (groupBy) │ (mismo bloque de código = mismo grupo)
└─────────┬─────────┘
│
▼
┌───────────────────┐
│ Selección │ Elige el mejor representante de cada grupo
│(selectRepresentative)│ basándose en prioridad de reglas
└─────────┬─────────┘
│
▼
┌───────────────────┐
│ Merge │ Combina información de todos los findings
│ │ del grupo en el representante
└─────────┬─────────┘
│
▼
Findings Únicos
Pipeline de Deduplicación¶
Fase 1: Anotación¶
Cada finding recibe campos adicionales calculados a partir del código fuente:
func (d *Deduplicator) annotateFindings(findings []types.Finding) []types.Finding {
for i, f := range findings {
// Obtener contenido del archivo
content := d.rawContents[f.Location.File]
// Calcular ubicación canónica (bloque de código contenedor)
info := CalculateCanonicalLocation(f, content)
// Anotar el finding
findings[i].GroupID = info.GroupID // Hash único del bloque
findings[i].BlockType = info.BlockType // docstring, function, etc.
findings[i].CanonicalLocation = &Location{
StartLine: info.StartLine,
EndLine: info.EndLine,
}
}
return findings
}
GroupID es un hash SHA-256 de: archivo:línea_inicio:tipo_bloque
Fase 2: Agrupación¶
Los findings se agrupan por su GroupID:
groups := groupBy(findings, func(f Finding) string {
return f.GroupID
})
// Resultado:
// {
// "abc123": [finding1, finding2, finding3], // Mismo docstring
// "def456": [finding4, finding5], // Otro docstring
// }
Fase 3: Selección del Representante¶
De cada grupo, se selecciona el finding con mayor prioridad:
func (d *Deduplicator) selectRepresentative(group []Finding) Finding {
// Ordenar por prioridad (mayor primero)
sort.Slice(group, func(i, j int) bool {
return GetPriority(group[i].RuleID) > GetPriority(group[j].RuleID)
})
// El de mayor prioridad es el representante
representative := group[0]
// Recolectar información de los demás
for _, f := range group[1:] {
// ... merge logic
}
return representative
}
Fase 4: Merge de Información¶
El representante hereda información valiosa de los otros findings del grupo:
// Recolectar motores que confirmaron
if f.Engine != rep.Engine {
rep.ConfirmedBy = append(rep.ConfirmedBy, f.Engine)
}
// Recolectar reglas relacionadas
rep.RelatedRules = append(rep.RelatedRules, f.RuleID)
// Usar la severidad más alta
if f.Severity.Level() > rep.Severity.Level() {
rep.Severity = f.Severity
}
// Preferir trazas más largas
if len(f.Trace.Steps) > len(rep.Trace.Steps) {
rep.Trace = f.Trace
}
// Heredar análisis LLM si existe
if f.Evidence.LLMAnalysis != "" {
rep.Evidence.LLMAnalysis = f.Evidence.LLMAnalysis
}
// Heredar confirmación CodeQL
if f.Evidence.CodeQLConfirmed {
rep.Evidence.CodeQLConfirmed = true
}
Sistema de Prioridades¶
Tabla de Prioridades¶
| Prioridad | Tipo de Regla | Ejemplos | Razón |
|---|---|---|---|
| 100 | LLM | MCP-LLM-001 | Análisis semántico más sofisticado |
| 90 | ML Classifier | MCP-ML-001 | Modelo entrenado con ejemplos |
| 85 | CodeQL | MCP-CQL-001 | Análisis de flujo de datos formal |
| 80 | Exfiltración | MCP-G006 | Detecta patrón específico de exfiltración |
| 70 | Extendido | MCP-G004 | Detección comprehensiva |
| 50 | Básico | MCP-G001 | Detección legacy/simple |
| 40 | Default | - | Reglas desconocidas |
Reglas de Supersedencia¶
Una regla "supersede" (reemplaza) a otra cuando es más específica:
var RuleSupersedes = map[string][]string{
// LLM supersede todo
"MCP-LLM-001": {"MCP-G001", "MCP-G004", "MCP-G006", "MCP-ML-001"},
// ML supersede pattern básico
"MCP-ML-001": {"MCP-G001", "MCP-G004"},
// Reglas específicas superseden básicas
"MCP-G006": {"MCP-G001", "MCP-G004"}, // Exfiltración
"MCP-G004": {"MCP-G001"}, // Extendido
}
Efecto: Cuando MCP-G006 y MCP-G001 detectan lo mismo, MCP-G001 se marca como "superseded" y solo aparece en RelatedRules, no como finding principal.
Detección de Bloques de Código¶
El sistema identifica bloques de código para agrupar findings que están en el mismo contexto semántico.
Tipos de Bloques Soportados¶
| BlockType | Descripción | Lenguajes |
|---|---|---|
docstring |
Documentación de función/clase | Python (""", '''), JS/TS (/** */) |
function |
Cuerpo de una función | Python, JS, TS |
comment |
Bloque de comentarios | Python (#), JS/TS (//, /* */) |
literal |
String multi-línea | JS/TS (template literals `) |
unknown |
No se pudo determinar | - |
Algoritmo de Detección de Docstrings¶
func findDocstringBlock(lines []string, targetLine int) (start, end int, ok bool) {
// 1. Buscar hacia atrás el inicio del docstring
for i := targetLine; i >= 0; i-- {
if strings.Contains(lines[i], `"""`) {
docStart = i + 1
break
}
}
// 2. Buscar hacia adelante el fin del docstring
for i := targetLine; i < len(lines); i++ {
if strings.Contains(lines[i], `"""`) && i > docStart {
docEnd = i + 1
break
}
}
// 3. Verificar que targetLine está dentro del rango
if targetLine >= docStart && targetLine <= docEnd {
return docStart, docEnd, true
}
return 0, 0, false
}
Ejemplo Visual¶
def my_tool(): # Línea 1 - BlockType: function
""" # Línea 2 - BlockType: docstring (inicio)
Description line 1 # Línea 3 - BlockType: docstring
Description line 2 # Línea 4 - BlockType: docstring
Description line 3 # Línea 5 - BlockType: docstring
""" # Línea 6 - BlockType: docstring (fin)
return data # Línea 7 - BlockType: function
Resultado: Findings en líneas 3, 4, 5 comparten el mismo GroupID porque están en el mismo docstring.
Modos de Operación¶
Normal (default)¶
- Agrupa por bloques de código
- Selecciona el representante de mayor prioridad
- Merge de información (engines, reglas, severidad)
- Resultado: Mínimo número de findings, máxima información
Strict¶
- Agrupa por bloques de código
- Solo mantiene la regla de mayor prioridad
- No hace merge completo (solo
RelatedRules) - Resultado: Findings más "limpios", menos metadatos
Verbose¶
- Sin deduplicación
- Muestra todos los findings tal cual los detectaron los motores
- Útil para: Debugging, análisis de falsos positivos
Campos Añadidos al Finding¶
type Finding struct {
// ... campos existentes ...
// GroupID identifica el bloque de código canónico
GroupID string `json:"group_id,omitempty"`
// CanonicalLocation es la ubicación normalizada del bloque
CanonicalLocation *Location `json:"canonical_location,omitempty"`
// ConfirmedBy lista otros engines que confirmaron este finding
ConfirmedBy []Engine `json:"confirmed_by,omitempty"`
// RelatedRules lista otras reglas que detectaron lo mismo
RelatedRules []string `json:"related_rules,omitempty"`
// BlockType indica el tipo de bloque (docstring, function, etc.)
BlockType BlockType `json:"block_type,omitempty"`
}
Ejemplo de Output JSON¶
{
"id": "abc123def456",
"rule_id": "MCP-G006",
"severity": "high",
"confidence": "high",
"engine": "pattern",
"location": {
"file": "server.py",
"start_line": 35,
"start_col": 4
},
"group_id": "f66d19ddd8032547",
"canonical_location": {
"file": "server.py",
"start_line": 33,
"end_line": 40
},
"block_type": "docstring",
"confirmed_by": ["ml"],
"related_rules": ["MCP-G004", "MCP-G001"],
"description": "Data exfiltration pattern detected in tool description"
}
Ejemplos¶
Antes de Deduplicación (21 findings)¶
MCP-G004: server.py:34 (docstring línea 1)
MCP-G001: server.py:34 (docstring línea 1)
MCP-G004: server.py:35 (docstring línea 2)
MCP-G001: server.py:35 (docstring línea 2)
MCP-G004: server.py:36 (docstring línea 3)
MCP-G006: server.py:36 (docstring línea 3)
MCP-G001: server.py:36 (docstring línea 3)
... (más findings del otro docstring)
Después de Deduplicación (2 findings)¶
[
{
"rule_id": "MCP-G006",
"canonical_location": {"start_line": 33, "end_line": 40},
"block_type": "docstring",
"related_rules": ["MCP-G004", "MCP-G001"]
},
{
"rule_id": "MCP-G004",
"canonical_location": {"start_line": 59, "end_line": 69},
"block_type": "docstring",
"related_rules": ["MCP-G001"]
}
]
Configuración¶
CLI Flags¶
| Flag | Descripción |
|---|---|
--dedup-mode=normal |
Modo normal (default) |
--dedup-mode=strict |
Solo mantiene regla de mayor prioridad |
--dedup-mode=verbose |
Sin deduplicación |
--verbose-findings |
Alias para --dedup-mode=verbose |
--no-dedup |
Desactiva deduplicación completamente |
Archivo de Configuración¶
# .mcp-scan.yaml
dedup:
# enabled: true # Uncomment para desactivar
mode: normal # normal | strict | verbose
Variables de Entorno¶
Métricas¶
El resultado del scan incluye métricas de deduplicación:
type Result struct {
// ...
RawFindingsCount int // Total antes de deduplicación
DeduplicatedCount int // Cantidad de findings eliminados
}
Output en CLI:
Extensibilidad¶
Añadir Nuevas Reglas al Sistema de Prioridad¶
Editar internal/dedup/priority.go:
var RulePriority = map[string]int{
// Añadir nueva regla
"MCP-NEW-001": 75,
}
var RuleSupersedes = map[string][]string{
// Definir qué reglas supersede
"MCP-NEW-001": {"MCP-G001"},
}
Añadir Nuevo Tipo de Bloque¶
Editar internal/dedup/canonical.go:
// Añadir nuevo BlockType
const (
BlockTypeDecorator BlockType = "decorator"
)
// Implementar detección
func findDecoratorBlock(lines []string, targetLine int) (start, end int, ok bool) {
// ... lógica de detección
}
// Integrar en CalculateCanonicalLocation
func CalculateCanonicalLocation(finding Finding, content string) CanonicalInfo {
// ...
if start, end, ok := findDecoratorBlock(lines, line); ok {
return CanonicalInfo{
GroupID: generateGroupID(file, start, "decorator"),
BlockType: BlockTypeDecorator,
StartLine: start,
EndLine: end,
}
}
// ...
}
Referencias¶
- Código fuente:
internal/dedup/ - Tests:
internal/dedup/*_test.go - Integración:
pkg/scanner/scanner.go(funciónScan) - CLI:
cmd/mcp-scan/main.go(flags de deduplicación)