Skip to content

Analisis de Taint (Flujo de Datos)

Documento tecnico detallado para analistas de seguridad


1. Introduccion

El motor de Taint Analysis (analisis de contaminacion) de mcp-scan rastrea como fluyen los datos desde puntos de entrada no confiables (sources) hasta operaciones peligrosas (sinks). Este documento explica en detalle el algoritmo, sus capacidades y limitaciones.


2. Conceptos Fundamentales

2.1 Que es Taint Analysis?

Taint Analysis es una tecnica de analisis estatico que marca datos provenientes de fuentes no confiables como "contaminados" (tainted) y rastrea su propagacion a traves del programa hasta que llegan a operaciones peligrosas.

SOURCE (entrada no confiable)
    |
    | propagacion
    v
VARIABLE contaminada
    |
    | propagacion
    v
SINK (operacion peligrosa)

    = VULNERABILIDAD DETECTADA

2.2 Terminologia

Termino Definicion
Source Punto de entrada de datos no confiables (ej: input de usuario)
Sink Operacion peligrosa donde datos no sanitizados causan vulnerabilidad
Taint Marca que indica que un dato proviene de un source
Propagacion Como el taint se transmite entre variables/expresiones
Sanitizer Funcion que limpia/valida datos, eliminando el taint
Trace Camino completo que sigue el dato desde source hasta sink

2.3 Ejemplo Conceptual

@server.tool()
def read_file(filename: str):  # <-- SOURCE: tool_input
    path = "/data/" + filename  # <-- propagacion por concatenacion
    content = open(path).read() # <-- SINK: filesystem
    return content

Traza:

SOURCE: tool_input (filename) @ line 2
   |
   +-- propagacion: concatenacion "/data/" + filename @ line 3
   |
SINK: filesystem (open) @ line 4

Vulnerabilidad: MCP-B001 (Path Traversal)


3. Arquitectura del Motor de Taint

3.1 Componentes Principales

+------------------+
|   Taint Engine   |
+------------------+
        |
        v
+------------------+     +------------------+
|    Catalog       |<--->|   TaintState     |
| - Sources        |     | - Variables      |
| - Sinks          |     | - Properties     |
| - Sanitizers     |     | - Parent scope   |
+------------------+     +------------------+
        |                        |
        v                        v
+------------------+     +------------------+
|  AST Traversal   |     |  Trace Builder   |
|  - Statements    |     |  - Steps         |
|  - Expressions   |     |  - Locations     |
+------------------+     +------------------+

3.2 Codigo del Engine

Ubicacion: internal/taint/engine.go

type Engine struct {
    catalog *catalog.Catalog  // Definiciones de sources/sinks
    mode    Mode              // fast o deep
    depth   int               // Profundidad inter-procedural
}

3.3 Modos de Analisis

Modo Descripcion Alcance
fast Intra-procedural Solo dentro de cada funcion
deep Inter-procedural A traves de llamadas a funciones

4. Catalogo de Sources

4.1 Fuentes de Datos No Confiables

El catalogo define todas las fuentes de datos que se consideran potencialmente maliciosos:

Ubicacion: internal/catalog/catalog.go

4.1.1 Tool Input (Clase Principal para MCP)

Los parametros de herramientas MCP son automaticamente considerados sources:

@server.tool()
def my_tool(user_input: str):  # user_input es SOURCE
    ...

Deteccion: 1. Buscar funciones con decoradores @tool, @server.tool, etc. 2. Marcar TODOS los parametros como SourceToolInput 3. Confianza: High

4.1.2 Variables de Entorno

ID Lenguaje Receiver Function Categoria
py-os-environ Python os environ SourceEnvVar
py-os-getenv Python os getenv SourceEnvVar
js-process-env JS/TS process env SourceEnvVar

Ejemplo:

api_key = os.environ["API_KEY"]  # api_key queda tainted

4.1.3 Requests HTTP

ID Lenguaje Receiver Function Categoria
py-request-args Python request args SourceHTTPRequest
py-request-form Python request form SourceHTTPRequest
py-request-json Python request json SourceHTTPRequest
js-req-body JS/TS req body SourceHTTPRequest
js-req-query JS/TS req query SourceHTTPRequest
js-req-params JS/TS req params SourceHTTPRequest

4.1.4 Contenido de Archivos

ID Lenguaje Function Categoria
py-file-read Python read SourceFileContent
js-fs-readfile JS readFileSync SourceFileContent

5. Catalogo de Sinks

5.1 Operaciones Peligrosas

Los sinks son operaciones donde datos no sanitizados pueden causar vulnerabilidades:

5.1.1 Ejecucion de Comandos (Clase A - RCE)

ID Lenguaje Receiver Function Severidad
py-os-system Python os system Critical
py-os-popen Python os popen Critical
py-subprocess-call Python subprocess call Critical
py-subprocess-run Python subprocess run Critical
py-subprocess-popen Python subprocess Popen Critical
js-child-process-exec JS child_process exec Critical
js-child-process-execsync JS child_process execSync Critical

Ejemplo de vulnerabilidad:

@server.tool()
def run_command(cmd: str):
    os.system(cmd)  # SOURCE -> SINK directo = RCE

5.1.2 Evaluacion de Codigo (Clase A - RCE)

ID Lenguaje Function Severidad
py-eval Python eval Critical
py-exec Python exec Critical
py-compile Python compile High
js-eval JS/TS eval Critical
js-function JS/TS Function (constructor) Critical

5.1.3 Operaciones de Filesystem (Clase B)

ID Lenguaje Function Severidad
py-open Python open High
py-pathlib-read Python read_text High
py-shutil-copy Python copy High
py-os-remove Python remove High
js-fs-readfile JS readFileSync High
js-fs-writefile JS writeFileSync High

5.1.4 Operaciones de Red (Clase C - SSRF)

ID Lenguaje Receiver Function Severidad
py-requests-get Python requests get High
py-requests-post Python requests post High
py-urllib-urlopen Python urllib.request urlopen High
js-fetch JS/TS - fetch High
js-axios-get JS axios get High

5.1.5 Operaciones de Base de Datos (Clase D - SQLi)

ID Lenguaje Receiver Function Severidad
py-cursor-execute Python cursor execute High
py-conn-execute Python connection execute High
js-query JS - query High
js-raw JS - raw High

5.1.6 Logging (Clase E - Exposicion de Secretos)

ID Lenguaje Function Severidad
py-print Python print Medium
py-logging-info Python info Medium
js-console-log JS log Medium

5.1.7 LLM Prompts (Clase H - Prompt Injection)

Ubicacion: internal/catalog/llm_sinks.go

ID Lenguaje Descripcion Severidad
py-openai-chat-create Python OpenAI ChatCompletion High
py-anthropic-messages-create Python Anthropic Messages High
py-langchain-llm-invoke Python LangChain invoke High
py-langchain-chain-run Python LangChain chain run High
py-ollama-chat Python Ollama chat High
js-openai-chat-completions JS OpenAI JS SDK High

6. Catalogo de Sanitizers

6.1 Funciones que Eliminan Taint

Los sanitizers son funciones que validan o limpian datos, rompiendo la cadena de taint:

6.1.1 Sanitizers de Path (Clase B)

ID Lenguaje Receiver Function Protege Contra
py-os-path-normpath Python os.path normpath Filesystem
py-os-path-abspath Python os.path abspath Filesystem
py-os-path-realpath Python os.path realpath Filesystem
py-pathlib-resolve Python Path resolve Filesystem
js-path-normalize JS path normalize Filesystem
js-path-resolve JS path resolve Filesystem

Ejemplo:

@server.tool()
def read_file(filename: str):
    safe_path = os.path.normpath("/data/" + filename)
    # safe_path ya NO esta tainted para filesystem sinks
    # pero el sanitizer NO protege contra otros sinks

IMPORTANTE: Un sanitizer solo protege contra su categoria de sink. normpath no protege contra exec.

6.1.2 Sanitizers de URL (Clase C)

ID Lenguaje Function Protege Contra
py-urllib-parse Python urlparse Network
js-url-parse JS URL constructor Network

6.1.3 Sanitizers de Shell (Clase A)

ID Lenguaje Receiver Function Protege Contra
py-shlex-quote Python shlex quote Exec
py-shlex-split Python shlex split Exec

Ejemplo:

@server.tool()
def run_cmd(arg: str):
    safe_arg = shlex.quote(arg)
    os.system(f"echo {safe_arg}")  # No se reporta como vulnerable

6.1.4 Sanitizers de LLM (Clase H)

ID Lenguaje Receiver Function Protege Contra
py-llm-guard-scan Python llm_guard scan_prompt LLM Prompt
py-rebuff-detect Python rebuff detect_injection LLM Prompt
py-html-escape Python html escape LLM Prompt (parcial)

7. Algoritmo de Taint Analysis

7.1 Estructura de TaintState

El estado de taint se mantiene por scope (funcion/bloque):

type TaintState struct {
    Variables  map[string]*TaintInfo       // variable -> taint
    Properties map[string]map[string]*TaintInfo // obj -> prop -> taint
    Parent     *TaintState                 // Scope padre (closures)
    Language   types.Language
}

7.2 Estructura de TaintInfo

Cada variable tainted tiene metadata:

type TaintInfo struct {
    Source     types.Location     // Donde se origino
    SourceType types.SourceCategory // Tipo de source
    Via        []types.TraceStep  // Pasos de propagacion
    Confidence types.Confidence   // Alta/Media/Baja
}

7.3 Algoritmo Principal

FUNCION AnalyzeFunction(fn, file, surface):
    state = NuevoTaintState()

    # Si es tool handler, marcar parametros como tainted
    SI esToolHandler(fn, surface):
        PARA CADA param EN fn.Parameters:
            state.SetTaint(param.name, TaintInfo{
                Source: fn.Location,
                SourceType: SourceToolInput,
                Confidence: High
            })

    # Analizar cada statement
    PARA CADA stmt EN fn.Body:
        findings += analyzeStatement(stmt, state, file, fn)

    RETORNAR findings

7.4 Analisis de Statements

7.4.1 Assignment

FUNCION analyzeAssignment(assign, state, file):
    # Obtener taint del valor
    taint = getExpressionTaint(assign.Value, state)

    # Verificar si es call a source
    SI assign.Value ES Call:
        source = catalog.IsSource(call.Receiver, call.Function)
        SI source != null:
            taint = NuevoTaintInfo(source)

    # Propagar taint al target
    SI taint != null:
        newTaint = taint.Clone()
        newTaint.Via.append(TraceStep{
            Location: assign.Location,
            Action: "assign",
            Variable: assign.Target
        })
        state.SetTaint(assign.Target, newTaint)

Ejemplo:

user_input = request.args.get("name")  # user_input recibe taint
path = "/data/" + user_input            # path recibe taint propagado

7.4.2 Call

FUNCION analyzeCall(call, state, file, fn):
    findings = []

    # Verificar si es sink
    sink = catalog.IsSink(call.Receiver, call.Function)
    SI sink == null:
        RETORNAR findings

    # Verificar si algun argumento esta tainted
    PARA CADA arg EN call.Arguments:
        taint = getExpressionTaint(arg, state)
        SI taint != null:
            # Verificar si hay sanitizer
            sanitizer = catalog.IsSanitizer(call.Receiver, call.Function, sink.Category)
            SI sanitizer != null:
                CONTINUAR  # Sanitizado, no reportar

            # Crear hallazgo
            finding = crearFinding(sink, taint, call.Location)
            findings.append(finding)

    RETORNAR findings

7.4.3 If Statement (Control Flow)

FUNCION analyzeIfStatement(ifStmt, state, file, fn):
    # Clonar estados para cada rama
    thenState = state.Clone()
    elseState = state.Clone()

    # Analizar rama then
    PARA CADA stmt EN ifStmt.ThenBody:
        findings += analyzeStatement(stmt, thenState, file, fn)

    # Analizar rama else
    PARA CADA stmt EN ifStmt.ElseBody:
        findings += analyzeStatement(stmt, elseState, file, fn)

    # Merge: union de taints de ambas ramas
    state.Merge(thenState)
    state.Merge(elseState)

    RETORNAR findings

Importante: El merge es conservador - si cualquier rama taint una variable, queda tainted.

7.4.4 For/While Loops

FUNCION analyzeForLoop(forStmt, state, file, fn):
    # Verificar si iteramos sobre datos tainted
    iterTaint = getExpressionTaint(forStmt.Iterable, state)
    SI iterTaint != null:
        # La variable de loop recibe taint
        state.SetTaint(forStmt.Variable, TaintInfo{
            Source: iterTaint.Source,
            Via: [...iterTaint.Via, TraceStep{Action: "iterate"}],
        })

    # Analizar body 2 veces para propagar taint a traves del loop
    loopState = state.Clone()
    PARA i = 0; i < 2; i++:
        PARA CADA stmt EN forStmt.Body:
            findings += analyzeStatement(stmt, loopState, file, fn)

    state.Merge(loopState)
    RETORNAR findings

7.4.5 Try/Except

FUNCION analyzeTryStatement(tryStmt, state, file, fn):
    # Analizar try body
    tryState = state.Clone()
    PARA CADA stmt EN tryStmt.TryBody:
        findings += analyzeStatement(stmt, tryState, file, fn)

    # La variable de excepcion puede contener datos de usuario
    catchState = state.Clone()
    SI tryStmt.CatchVar != "":
        catchState.SetTaint(tryStmt.CatchVar, TaintInfo{
            SourceType: SourceToolInput,  # Conservador
            Confidence: Low
        })

    PARA CADA stmt EN tryStmt.CatchBody:
        findings += analyzeStatement(stmt, catchState, file, fn)

    # Merge todos los estados
    state.Merge(tryState)
    state.Merge(catchState)
    RETORNAR findings

8. Propagacion de Taint

8.1 Reglas de Propagacion

8.1.1 Asignacion Simple

x = tainted_var  # x recibe taint

8.1.2 Concatenacion de Strings

result = "prefix" + tainted  # result recibe taint
result = f"Hello {tainted}"  # result recibe taint (f-string)
result = "{}".format(tainted) # result recibe taint
result = `Hello ${tainted}`  # result recibe taint (template literal)

8.1.3 Operaciones Binarias

result = tainted + clean  # result recibe taint
result = clean + tainted  # result recibe taint
result = tainted * 2      # result recibe taint

8.1.4 Acceso a Propiedades

obj.prop = tainted  # obj.prop recibe taint
x = tainted_obj.prop  # x recibe taint (objeto tainted = propiedades tainted)

8.1.5 Acceso a Indices

arr[0] = tainted  # arr queda tainted
x = tainted_arr[0]  # x recibe taint
x = tainted_dict["key"]  # x recibe taint

8.1.6 Retorno de Funciones Source

x = os.getenv("VAR")  # x recibe taint porque getenv es source

8.1.7 Await (Async)

result = await tainted_coro  # result recibe taint

8.2 Taint en Closures

Las closures capturan el taint del scope padre:

def outer():
    secret = os.getenv("SECRET")  # tainted

    def inner():
        return secret  # inner tiene acceso a taint de outer

    return inner

Implementacion:

func (ts *TaintState) NewChildState() *TaintState {
    child := NewTaintState()
    child.Parent = ts  // Enlace al scope padre
    return child
}

func (ts *TaintState) GetTaint(name string) *TaintInfo {
    if taint, ok := ts.Variables[name]; ok {
        return taint
    }
    if ts.Parent != nil {
        return ts.Parent.GetTaint(name)  // Buscar en padre
    }
    return nil
}


9. Taint de Propiedades (Property-Level)

9.1 Concepto

El taint puede ser a nivel de objeto completo o de propiedad especifica:

user = request.json()  # user completo tainted
name = user["name"]    # name recibe taint de user

config = {}
config["secret"] = os.getenv("KEY")  # solo config.secret tainted
safe = config["other"]  # safe NO esta tainted

9.2 Implementacion

type TaintState struct {
    Variables  map[string]*TaintInfo  // Taint a nivel variable
    Properties map[string]map[string]*TaintInfo  // obj -> prop -> taint
}

func (ts *TaintState) GetPropertyTaint(obj, prop string) *TaintInfo {
    if props, ok := ts.Properties[obj]; ok {
        if taint, ok := props[prop]; ok {
            return taint
        }
    }
    // Fallback: si el objeto esta tainted, la propiedad tambien
    if objTaint := ts.GetTaint(obj); objTaint != nil {
        return objTaint
    }
    return nil
}

10. Analisis Inter-Procedural (Modo Deep)

10.1 Concepto

En modo deep, el analisis sigue el flujo de datos a traves de llamadas a funciones:

def process(data):
    os.system(data)  # SINK

@server.tool()
def handler(input):
    process(input)  # Llamada a funcion

Sin inter-procedural: No se detectaria porque os.system esta en otra funcion. Con inter-procedural: Se detecta el flujo completo.

10.2 Summaries de Funciones

El modo deep utiliza "summaries" que describen como una funcion propaga taint:

type FunctionSummary struct {
    Name       string
    Parameters []ParamSummary
    ReturnsTaint bool
    TaintsFrom []int  // Indices de params que taintan el retorno
}

type ParamSummary struct {
    Index     int
    FlowsTo   []string  // Sinks alcanzados
}

10.3 Construccion de Call Graph

  1. Parsear todos los archivos
  2. Construir grafo de llamadas
  3. Para cada funcion, determinar:
  4. Cuales parametros llegan a sinks
  5. Si el retorno esta tainted

10.4 Limitaciones del Modo Deep

  • Mas lento que modo fast
  • Puede tener falsos positivos por imprecision
  • No soporta dispatch dinamico completo
  • Profundidad limitada (configurable, default: 3)

11. Generacion de Trazas

11.1 Estructura de Traza

Cada hallazgo incluye una traza que documenta el camino:

type TaintTrace struct {
    Source types.Location  // Donde se origino
    Sink   types.Location  // Donde termino
    Steps  []TraceStep     // Pasos intermedios
}

type TraceStep struct {
    Location types.Location
    Action   string  // "assign", "concat", "call", etc.
    Variable string  // Variable afectada
}

11.2 Ejemplo de Traza Completa

Codigo:

@server.tool()
def fetch_data(url: str):
    full_url = "https://api.com/" + url
    response = requests.get(full_url)
    return response.json()

Traza generada:

{
  "source": {
    "file": "server.py",
    "line": 2,
    "column": 16
  },
  "sink": {
    "file": "server.py",
    "line": 4,
    "column": 16
  },
  "steps": [
    {
      "location": {"file": "server.py", "line": 2},
      "action": "tool_input",
      "variable": "url"
    },
    {
      "location": {"file": "server.py", "line": 3},
      "action": "binary_op:+",
      "variable": ""
    },
    {
      "location": {"file": "server.py", "line": 3},
      "action": "assign",
      "variable": "full_url"
    },
    {
      "location": {"file": "server.py", "line": 4},
      "action": "call_arg:requests.get",
      "variable": ""
    }
  ]
}


12. Ajustes de Confianza

12.1 Factores que Afectan Confianza

Factor Efecto Razon
Longitud de traza -Confianza Mas pasos = mas oportunidades de error
Taint cross-file -Confianza Analisis menos preciso entre archivos
Source es tool_input +Confianza Directamente controlado por atacante
Sanitizer parcial presente -Confianza Puede estar sanitizado

12.2 Implementacion

func (e *Engine) adjustConfidence(taint *TaintInfo) types.Confidence {
    confidence := taint.Confidence

    // Degradar por longitud de traza
    if len(taint.Via) > 5 {
        confidence = degradeConfidence(confidence)
    }

    // Degradar si hay salto entre archivos
    if hasCrossFileJump(taint.Via) {
        confidence = degradeConfidence(confidence)
    }

    return confidence
}

13. Casos Especiales

13.1 Callbacks y Lambdas

@server.tool()
def process(items: list):
    items.forEach(lambda x: os.system(x))

Analisis: 1. items es tainted (tool_input) 2. forEach pasa elementos a la lambda 3. Parametro x de la lambda recibe taint de items 4. x llega a os.system = vulnerabilidad

13.2 Return de Datos Sensibles

@server.tool()
def get_secret():
    return os.environ["SECRET"]

Analisis: - os.environ es source (SourceEnvVar) - Return a respuesta es sink (SinkResponse) - Detectado como exposicion de secreto (Clase E)

13.3 Datos en Excepciones

@server.tool()
def risky(data):
    try:
        process(data)
    except Exception as e:
        os.system(f"echo Error: {e}")  # e puede contener data

Analisis: - e se marca como potencialmente tainted (Confidence: Low) - Se reporta pero con baja confianza


14. Limitaciones del Taint Analysis

14.1 No Detecta

  1. Taint implicito: Control flow que filtra datos

    if secret == "password":
        print("correct")  # Filtra informacion sin taint explicito
    

  2. Sanitizacion custom: Funciones de validacion propias

    def my_sanitizer(x):
        return x.replace("..", "")
    

  3. Dispatch dinamico completo: Metodos virtuales

    handler = get_handler()  # Tipo desconocido
    handler.process(data)    # No se sabe que metodo es
    

  4. Reflection: Acceso dinamico a atributos

    getattr(obj, user_input)
    

14.2 Puede Generar Falsos Positivos

  1. Sanitizacion no reconocida: El codigo sanitiza pero no esta en catalogo
  2. Dead code: Paths que nunca se ejecutan
  3. Validacion previa: Checks antes de uso peligroso
  4. Constantes runtime: Valores que parecen variables pero son fijos

14.3 Puede Generar Falsos Negativos

  1. Source no reconocido: Input que no esta en catalogo
  2. Sink no reconocido: Funcion peligrosa custom
  3. Propagacion compleja: Estructuras de datos avanzadas
  4. Alias desconocidos: Imports con nombres custom

15. Recomendaciones para Analistas

15.1 Interpretar Hallazgos

  1. Verificar la traza: Asegurar que el flujo es realista
  2. Buscar sanitizacion: Puede haber validacion no detectada
  3. Evaluar contexto: Un tool handler es mas critico
  4. Considerar confianza: Low confidence requiere revision manual

15.2 Falsos Positivos Comunes

  1. Path traversal con whitelist: El codigo verifica contra lista pero no usa os.path.*
  2. SQL con ORM: El ORM parametriza pero el pattern parece concatenacion
  3. Logging de errores: Se loguea excepcion que no contiene secretos

15.3 Para Reducir Ruido

  1. Agregar sanitizers custom al catalogo
  2. Usar baseline para hallazgos aceptados
  3. Configurar allowlist de hosts/paths
  4. Ajustar severidad de reglas especificas

Siguiente documento: motor-patrones.md