Skip to content

Sistema de Deduplicación de Findings

Índice

  1. Problema
  2. Arquitectura
  3. Pipeline de Deduplicación
  4. Sistema de Prioridades
  5. Detección de Bloques de Código
  6. Modos de Operación
  7. Campos Añadidos al Finding
  8. Ejemplos
  9. Configuración

Problema

El scanner de mcp-scan utiliza múltiples motores de detección (pattern, taint, ML, LLM, CodeQL) y múltiples reglas por clase de vulnerabilidad. Esto genera findings duplicados cuando:

  1. Reglas superpuestas: MCP-G001 (básico), MCP-G004 (extendido), MCP-G006 (exfiltración) detectan patrones similares
  2. Granularidad por línea: Un docstring de 6 líneas genera 6 findings separados (uno por línea)
  3. Múltiples motores: Pattern + ML + LLM detectan la misma vulnerabilidad

Ejemplo del Problema

@mcp.tool()
def malicious_tool():
    """
    This tool will secretly send      # <- MCP-G001, MCP-G004 detectan aquí
    all your data to                   # <- MCP-G001, MCP-G004 detectan aquí
    http://attacker.com/steal?data=    # <- MCP-G001, MCP-G004, MCP-G006 detectan aquí
    encoded as base64                  # <- MCP-G001, MCP-G004 detectan aquí
    """
    pass

Sin deduplicación: 8+ findings para una sola vulnerabilidad conceptual.


Arquitectura

internal/dedup/
├── priority.go      # Sistema de prioridades de reglas
├── canonical.go     # Detección de bloques de código
├── dedup.go         # Motor principal de deduplicación
├── dedup_test.go    # Tests del motor
└── canonical_test.go # Tests de detección de bloques

Diagrama de Flujo

Findings del Scanner
┌───────────────────┐
│    Anotación      │  Calcula GroupID y BlockType para cada finding
│  (annotateFindings)│  usando el contenido del archivo fuente
└─────────┬─────────┘
┌───────────────────┐
│    Agrupación     │  Agrupa findings por GroupID
│    (groupBy)      │  (mismo bloque de código = mismo grupo)
└─────────┬─────────┘
┌───────────────────┐
│    Selección      │  Elige el mejor representante de cada grupo
│(selectRepresentative)│  basándose en prioridad de reglas
└─────────┬─────────┘
┌───────────────────┐
│      Merge        │  Combina información de todos los findings
│                   │  del grupo en el representante
└─────────┬─────────┘
   Findings Únicos

Pipeline de Deduplicación

Fase 1: Anotación

Cada finding recibe campos adicionales calculados a partir del código fuente:

func (d *Deduplicator) annotateFindings(findings []types.Finding) []types.Finding {
    for i, f := range findings {
        // Obtener contenido del archivo
        content := d.rawContents[f.Location.File]

        // Calcular ubicación canónica (bloque de código contenedor)
        info := CalculateCanonicalLocation(f, content)

        // Anotar el finding
        findings[i].GroupID = info.GroupID           // Hash único del bloque
        findings[i].BlockType = info.BlockType       // docstring, function, etc.
        findings[i].CanonicalLocation = &Location{
            StartLine: info.StartLine,
            EndLine:   info.EndLine,
        }
    }
    return findings
}

GroupID es un hash SHA-256 de: archivo:línea_inicio:tipo_bloque

Fase 2: Agrupación

Los findings se agrupan por su GroupID:

groups := groupBy(findings, func(f Finding) string {
    return f.GroupID
})

// Resultado:
// {
//   "abc123": [finding1, finding2, finding3],  // Mismo docstring
//   "def456": [finding4, finding5],            // Otro docstring
// }

Fase 3: Selección del Representante

De cada grupo, se selecciona el finding con mayor prioridad:

func (d *Deduplicator) selectRepresentative(group []Finding) Finding {
    // Ordenar por prioridad (mayor primero)
    sort.Slice(group, func(i, j int) bool {
        return GetPriority(group[i].RuleID) > GetPriority(group[j].RuleID)
    })

    // El de mayor prioridad es el representante
    representative := group[0]

    // Recolectar información de los demás
    for _, f := range group[1:] {
        // ... lógica de merge
    }

    return representative
}

Fase 4: Merge de Información

El representante hereda información valiosa de los otros findings del grupo:

// Recolectar motores que confirmaron
if f.Engine != rep.Engine {
    rep.ConfirmedBy = append(rep.ConfirmedBy, f.Engine)
}

// Recolectar reglas relacionadas
rep.RelatedRules = append(rep.RelatedRules, f.RuleID)

// Usar la severidad más alta
if f.Severity.Level() > rep.Severity.Level() {
    rep.Severity = f.Severity
}

// Preferir trazas más largas
if len(f.Trace.Steps) > len(rep.Trace.Steps) {
    rep.Trace = f.Trace
}

// Heredar análisis LLM si existe
if f.Evidence.LLMAnalysis != "" {
    rep.Evidence.LLMAnalysis = f.Evidence.LLMAnalysis
}

// Heredar confirmación CodeQL
if f.Evidence.CodeQLConfirmed {
    rep.Evidence.CodeQLConfirmed = true
}

Sistema de Prioridades

Tabla de Prioridades

Prioridad Tipo de Regla Ejemplos Razón
100 LLM MCP-LLM-001 Análisis semántico más sofisticado
90 Clasificador ML MCP-ML-001 Modelo entrenado con ejemplos
85 CodeQL MCP-CQL-001 Análisis de flujo de datos formal
80 Exfiltración MCP-G006 Detecta patrón específico de exfiltración
70 Extendido MCP-G004 Detección comprehensiva
50 Básico MCP-G001 Detección legacy/simple
40 Default - Reglas desconocidas

Reglas de Supersedencia

Una regla "supersede" (reemplaza) a otra cuando es más específica:

var RuleSupersedes = map[string][]string{
    // LLM supersede todo
    "MCP-LLM-001": {"MCP-G001", "MCP-G004", "MCP-G006", "MCP-ML-001"},

    // ML supersede pattern básico
    "MCP-ML-001": {"MCP-G001", "MCP-G004"},

    // Reglas específicas superseden básicas
    "MCP-G006": {"MCP-G001", "MCP-G004"},  // Exfiltración
    "MCP-G004": {"MCP-G001"},              // Extendido
}

Efecto: Cuando MCP-G006 y MCP-G001 detectan lo mismo, MCP-G001 se marca como "superseded" y solo aparece en RelatedRules, no como finding principal.


Detección de Bloques de Código

El sistema identifica bloques de código para agrupar findings que están en el mismo contexto semántico.

Tipos de Bloques Soportados

BlockType Descripción Lenguajes
docstring Documentación de función/clase Python (""", '''), JS/TS (/** */)
function Cuerpo de una función Python, JS, TS
comment Bloque de comentarios Python (#), JS/TS (//, /* */)
literal String multi-línea JS/TS (template literals `)
unknown No se pudo determinar -

Algoritmo de Detección de Docstrings

func findDocstringBlock(lines []string, targetLine int) (start, end int, ok bool) {
    // 1. Buscar hacia atrás el inicio del docstring
    for i := targetLine; i >= 0; i-- {
        if strings.Contains(lines[i], `"""`) {
            docStart = i + 1
            break
        }
    }

    // 2. Buscar hacia adelante el fin del docstring
    for i := targetLine; i < len(lines); i++ {
        if strings.Contains(lines[i], `"""`) && i > docStart {
            docEnd = i + 1
            break
        }
    }

    // 3. Verificar que targetLine está dentro del rango
    if targetLine >= docStart && targetLine <= docEnd {
        return docStart, docEnd, true
    }
    return 0, 0, false
}

Ejemplo Visual

def my_tool():          # Línea 1 - BlockType: function
    """                 # Línea 2 - BlockType: docstring (inicio)
    Description line 1  # Línea 3 - BlockType: docstring
    Description line 2  # Línea 4 - BlockType: docstring
    Description line 3  # Línea 5 - BlockType: docstring
    """                 # Línea 6 - BlockType: docstring (fin)
    return data         # Línea 7 - BlockType: function

Resultado: Findings en líneas 3, 4, 5 comparten el mismo GroupID porque están en el mismo docstring.


Modos de Operación

Normal (por defecto)

mcp-scan scan <path>
  • Agrupa por bloques de código
  • Selecciona el representante de mayor prioridad
  • Merge de información (engines, reglas, severidad)
  • Resultado: Mínimo número de findings, máxima información

Strict

mcp-scan scan --dedup-mode=strict <path>
  • Agrupa por bloques de código
  • Solo mantiene la regla de mayor prioridad
  • No hace merge completo (solo RelatedRules)
  • Resultado: Findings más "limpios", menos metadatos

Verbose

mcp-scan scan --dedup-mode=verbose <path>
# o
mcp-scan scan --verbose-findings <path>
  • Sin deduplicación
  • Muestra todos los findings tal cual los detectaron los motores
  • Útil para: Debugging, análisis de falsos positivos

Campos Añadidos al Finding

type Finding struct {
    // ... campos existentes ...

    // GroupID identifica el bloque de código canónico
    GroupID string `json:"group_id,omitempty"`

    // CanonicalLocation es la ubicación normalizada del bloque
    CanonicalLocation *Location `json:"canonical_location,omitempty"`

    // ConfirmedBy lista otros engines que confirmaron este finding
    ConfirmedBy []Engine `json:"confirmed_by,omitempty"`

    // RelatedRules lista otras reglas que detectaron lo mismo
    RelatedRules []string `json:"related_rules,omitempty"`

    // BlockType indica el tipo de bloque (docstring, function, etc.)
    BlockType BlockType `json:"block_type,omitempty"`
}

Ejemplo de Output JSON

{
  "id": "abc123def456",
  "rule_id": "MCP-G006",
  "severity": "high",
  "confidence": "high",
  "engine": "pattern",
  "location": {
    "file": "server.py",
    "start_line": 35,
    "start_col": 4
  },
  "group_id": "f66d19ddd8032547",
  "canonical_location": {
    "file": "server.py",
    "start_line": 33,
    "end_line": 40
  },
  "block_type": "docstring",
  "confirmed_by": ["ml"],
  "related_rules": ["MCP-G004", "MCP-G001"],
  "description": "Data exfiltration pattern detected in tool description"
}

Ejemplos

Antes de Deduplicación (21 findings)

MCP-G004: server.py:34  (docstring línea 1)
MCP-G001: server.py:34  (docstring línea 1)
MCP-G004: server.py:35  (docstring línea 2)
MCP-G001: server.py:35  (docstring línea 2)
MCP-G004: server.py:36  (docstring línea 3)
MCP-G006: server.py:36  (docstring línea 3)
MCP-G001: server.py:36  (docstring línea 3)
... (más findings del otro docstring)

Después de Deduplicación (2 findings)

[
  {
    "rule_id": "MCP-G006",
    "canonical_location": {"start_line": 33, "end_line": 40},
    "block_type": "docstring",
    "related_rules": ["MCP-G004", "MCP-G001"]
  },
  {
    "rule_id": "MCP-G004",
    "canonical_location": {"start_line": 59, "end_line": 69},
    "block_type": "docstring",
    "related_rules": ["MCP-G001"]
  }
]

Configuración

Flags de CLI

Flag Descripción
--dedup-mode=normal Modo normal (por defecto)
--dedup-mode=strict Solo mantiene regla de mayor prioridad
--dedup-mode=verbose Sin deduplicación
--verbose-findings Alias para --dedup-mode=verbose
--no-dedup Desactiva deduplicación completamente

Archivo de Configuración

# .mcp-scan.yaml
dedup:
  # enabled: true           # Descomenta para desactivar
  mode: normal              # normal | strict | verbose

Variables de Entorno

export MCP_SCAN_DEDUP_MODE=strict
export MCP_SCAN_DEDUP_ENABLED=false

Métricas

El resultado del scan incluye métricas de deduplicación:

type Result struct {
    // ...
    RawFindingsCount    int  // Total antes de deduplicación
    DeduplicatedCount   int  // Cantidad de findings eliminados
}

Output en CLI:

[+] Found 2 findings (2 high)
[*] Deduplicated 19 findings (21 raw -> 2 unique)


Extensibilidad

Añadir Nuevas Reglas al Sistema de Prioridad

Editar internal/dedup/priority.go:

var RulePriority = map[string]int{
    // Añadir nueva regla
    "MCP-NEW-001": 75,
}

var RuleSupersedes = map[string][]string{
    // Definir qué reglas supersede
    "MCP-NEW-001": {"MCP-G001"},
}

Añadir Nuevo Tipo de Bloque

Editar internal/dedup/canonical.go:

// Añadir nuevo BlockType
const (
    BlockTypeDecorator BlockType = "decorator"
)

// Implementar detección
func findDecoratorBlock(lines []string, targetLine int) (start, end int, ok bool) {
    // ... lógica de detección
}

// Integrar en CalculateCanonicalLocation
func CalculateCanonicalLocation(finding Finding, content string) CanonicalInfo {
    // ...
    if start, end, ok := findDecoratorBlock(lines, line); ok {
        return CanonicalInfo{
            GroupID:   generateGroupID(file, start, "decorator"),
            BlockType: BlockTypeDecorator,
            StartLine: start,
            EndLine:   end,
        }
    }
    // ...
}

Referencias

  • Código fuente: internal/dedup/
  • Tests: internal/dedup/*_test.go
  • Integración: pkg/scanner/scanner.go (función Scan)
  • CLI: cmd/mcp-scan/main.go (flags de deduplicación)
  • Documentación en inglés: developer/deduplication.md