Analisis de Taint (Flujo de Datos)¶
Documento tecnico detallado para analistas de seguridad
1. Introduccion¶
El motor de Taint Analysis (analisis de contaminacion) de mcp-scan rastrea como fluyen los datos desde puntos de entrada no confiables (sources) hasta operaciones peligrosas (sinks). Este documento explica en detalle el algoritmo, sus capacidades y limitaciones.
2. Conceptos Fundamentales¶
2.1 Que es Taint Analysis?¶
Taint Analysis es una tecnica de analisis estatico que marca datos provenientes de fuentes no confiables como "contaminados" (tainted) y rastrea su propagacion a traves del programa hasta que llegan a operaciones peligrosas.
SOURCE (entrada no confiable)
|
| propagacion
v
VARIABLE contaminada
|
| propagacion
v
SINK (operacion peligrosa)
= VULNERABILIDAD DETECTADA
2.2 Terminologia¶
| Termino | Definicion |
|---|---|
| Source | Punto de entrada de datos no confiables (ej: input de usuario) |
| Sink | Operacion peligrosa donde datos no sanitizados causan vulnerabilidad |
| Taint | Marca que indica que un dato proviene de un source |
| Propagacion | Como el taint se transmite entre variables/expresiones |
| Sanitizer | Funcion que limpia/valida datos, eliminando el taint |
| Trace | Camino completo que sigue el dato desde source hasta sink |
2.3 Ejemplo Conceptual¶
@server.tool()
def read_file(filename: str): # <-- SOURCE: tool_input
path = "/data/" + filename # <-- propagacion por concatenacion
content = open(path).read() # <-- SINK: filesystem
return content
Traza:
SOURCE: tool_input (filename) @ line 2
|
+-- propagacion: concatenacion "/data/" + filename @ line 3
|
SINK: filesystem (open) @ line 4
Vulnerabilidad: MCP-B001 (Path Traversal)
3. Arquitectura del Motor de Taint¶
3.1 Componentes Principales¶
+------------------+
| Taint Engine |
+------------------+
|
v
+------------------+ +------------------+
| Catalog |<--->| TaintState |
| - Sources | | - Variables |
| - Sinks | | - Properties |
| - Sanitizers | | - Parent scope |
+------------------+ +------------------+
| |
v v
+------------------+ +------------------+
| AST Traversal | | Trace Builder |
| - Statements | | - Steps |
| - Expressions | | - Locations |
+------------------+ +------------------+
3.2 Codigo del Engine¶
Ubicacion: internal/taint/engine.go
type Engine struct {
catalog *catalog.Catalog // Definiciones de sources/sinks
mode Mode // fast o deep
depth int // Profundidad inter-procedural
}
3.3 Modos de Analisis¶
| Modo | Descripcion | Alcance |
|---|---|---|
fast |
Intra-procedural | Solo dentro de cada funcion |
deep |
Inter-procedural | A traves de llamadas a funciones |
4. Catalogo de Sources¶
4.1 Fuentes de Datos No Confiables¶
El catalogo define todas las fuentes de datos que se consideran potencialmente maliciosos:
Ubicacion: internal/catalog/catalog.go
4.1.1 Tool Input (Clase Principal para MCP)¶
Los parametros de herramientas MCP son automaticamente considerados sources:
Deteccion:
1. Buscar funciones con decoradores @tool, @server.tool, etc.
2. Marcar TODOS los parametros como SourceToolInput
3. Confianza: High
4.1.2 Variables de Entorno¶
| ID | Lenguaje | Receiver | Function | Categoria |
|---|---|---|---|---|
py-os-environ |
Python | os |
environ |
SourceEnvVar |
py-os-getenv |
Python | os |
getenv |
SourceEnvVar |
js-process-env |
JS/TS | process |
env |
SourceEnvVar |
Ejemplo:
4.1.3 Requests HTTP¶
| ID | Lenguaje | Receiver | Function | Categoria |
|---|---|---|---|---|
py-request-args |
Python | request |
args |
SourceHTTPRequest |
py-request-form |
Python | request |
form |
SourceHTTPRequest |
py-request-json |
Python | request |
json |
SourceHTTPRequest |
js-req-body |
JS/TS | req |
body |
SourceHTTPRequest |
js-req-query |
JS/TS | req |
query |
SourceHTTPRequest |
js-req-params |
JS/TS | req |
params |
SourceHTTPRequest |
4.1.4 Contenido de Archivos¶
| ID | Lenguaje | Function | Categoria |
|---|---|---|---|
py-file-read |
Python | read |
SourceFileContent |
js-fs-readfile |
JS | readFileSync |
SourceFileContent |
5. Catalogo de Sinks¶
5.1 Operaciones Peligrosas¶
Los sinks son operaciones donde datos no sanitizados pueden causar vulnerabilidades:
5.1.1 Ejecucion de Comandos (Clase A - RCE)¶
| ID | Lenguaje | Receiver | Function | Severidad |
|---|---|---|---|---|
py-os-system |
Python | os |
system |
Critical |
py-os-popen |
Python | os |
popen |
Critical |
py-subprocess-call |
Python | subprocess |
call |
Critical |
py-subprocess-run |
Python | subprocess |
run |
Critical |
py-subprocess-popen |
Python | subprocess |
Popen |
Critical |
js-child-process-exec |
JS | child_process |
exec |
Critical |
js-child-process-execsync |
JS | child_process |
execSync |
Critical |
Ejemplo de vulnerabilidad:
5.1.2 Evaluacion de Codigo (Clase A - RCE)¶
| ID | Lenguaje | Function | Severidad |
|---|---|---|---|
py-eval |
Python | eval |
Critical |
py-exec |
Python | exec |
Critical |
py-compile |
Python | compile |
High |
js-eval |
JS/TS | eval |
Critical |
js-function |
JS/TS | Function (constructor) |
Critical |
5.1.3 Operaciones de Filesystem (Clase B)¶
| ID | Lenguaje | Function | Severidad |
|---|---|---|---|
py-open |
Python | open |
High |
py-pathlib-read |
Python | read_text |
High |
py-shutil-copy |
Python | copy |
High |
py-os-remove |
Python | remove |
High |
js-fs-readfile |
JS | readFileSync |
High |
js-fs-writefile |
JS | writeFileSync |
High |
5.1.4 Operaciones de Red (Clase C - SSRF)¶
| ID | Lenguaje | Receiver | Function | Severidad |
|---|---|---|---|---|
py-requests-get |
Python | requests |
get |
High |
py-requests-post |
Python | requests |
post |
High |
py-urllib-urlopen |
Python | urllib.request |
urlopen |
High |
js-fetch |
JS/TS | - | fetch |
High |
js-axios-get |
JS | axios |
get |
High |
5.1.5 Operaciones de Base de Datos (Clase D - SQLi)¶
| ID | Lenguaje | Receiver | Function | Severidad |
|---|---|---|---|---|
py-cursor-execute |
Python | cursor |
execute |
High |
py-conn-execute |
Python | connection |
execute |
High |
js-query |
JS | - | query |
High |
js-raw |
JS | - | raw |
High |
5.1.6 Logging (Clase E - Exposicion de Secretos)¶
| ID | Lenguaje | Function | Severidad |
|---|---|---|---|
py-print |
Python | print |
Medium |
py-logging-info |
Python | info |
Medium |
js-console-log |
JS | log |
Medium |
5.1.7 LLM Prompts (Clase H - Prompt Injection)¶
Ubicacion: internal/catalog/llm_sinks.go
| ID | Lenguaje | Descripcion | Severidad |
|---|---|---|---|
py-openai-chat-create |
Python | OpenAI ChatCompletion | High |
py-anthropic-messages-create |
Python | Anthropic Messages | High |
py-langchain-llm-invoke |
Python | LangChain invoke | High |
py-langchain-chain-run |
Python | LangChain chain run | High |
py-ollama-chat |
Python | Ollama chat | High |
js-openai-chat-completions |
JS | OpenAI JS SDK | High |
6. Catalogo de Sanitizers¶
6.1 Funciones que Eliminan Taint¶
Los sanitizers son funciones que validan o limpian datos, rompiendo la cadena de taint:
6.1.1 Sanitizers de Path (Clase B)¶
| ID | Lenguaje | Receiver | Function | Protege Contra |
|---|---|---|---|---|
py-os-path-normpath |
Python | os.path |
normpath |
Filesystem |
py-os-path-abspath |
Python | os.path |
abspath |
Filesystem |
py-os-path-realpath |
Python | os.path |
realpath |
Filesystem |
py-pathlib-resolve |
Python | Path |
resolve |
Filesystem |
js-path-normalize |
JS | path |
normalize |
Filesystem |
js-path-resolve |
JS | path |
resolve |
Filesystem |
Ejemplo:
@server.tool()
def read_file(filename: str):
safe_path = os.path.normpath("/data/" + filename)
# safe_path ya NO esta tainted para filesystem sinks
# pero el sanitizer NO protege contra otros sinks
IMPORTANTE: Un sanitizer solo protege contra su categoria de sink. normpath no protege contra exec.
6.1.2 Sanitizers de URL (Clase C)¶
| ID | Lenguaje | Function | Protege Contra |
|---|---|---|---|
py-urllib-parse |
Python | urlparse |
Network |
js-url-parse |
JS | URL constructor |
Network |
6.1.3 Sanitizers de Shell (Clase A)¶
| ID | Lenguaje | Receiver | Function | Protege Contra |
|---|---|---|---|---|
py-shlex-quote |
Python | shlex |
quote |
Exec |
py-shlex-split |
Python | shlex |
split |
Exec |
Ejemplo:
@server.tool()
def run_cmd(arg: str):
safe_arg = shlex.quote(arg)
os.system(f"echo {safe_arg}") # No se reporta como vulnerable
6.1.4 Sanitizers de LLM (Clase H)¶
| ID | Lenguaje | Receiver | Function | Protege Contra |
|---|---|---|---|---|
py-llm-guard-scan |
Python | llm_guard |
scan_prompt |
LLM Prompt |
py-rebuff-detect |
Python | rebuff |
detect_injection |
LLM Prompt |
py-html-escape |
Python | html |
escape |
LLM Prompt (parcial) |
7. Algoritmo de Taint Analysis¶
7.1 Estructura de TaintState¶
El estado de taint se mantiene por scope (funcion/bloque):
type TaintState struct {
Variables map[string]*TaintInfo // variable -> taint
Properties map[string]map[string]*TaintInfo // obj -> prop -> taint
Parent *TaintState // Scope padre (closures)
Language types.Language
}
7.2 Estructura de TaintInfo¶
Cada variable tainted tiene metadata:
type TaintInfo struct {
Source types.Location // Donde se origino
SourceType types.SourceCategory // Tipo de source
Via []types.TraceStep // Pasos de propagacion
Confidence types.Confidence // Alta/Media/Baja
}
7.3 Algoritmo Principal¶
FUNCION AnalyzeFunction(fn, file, surface):
state = NuevoTaintState()
# Si es tool handler, marcar parametros como tainted
SI esToolHandler(fn, surface):
PARA CADA param EN fn.Parameters:
state.SetTaint(param.name, TaintInfo{
Source: fn.Location,
SourceType: SourceToolInput,
Confidence: High
})
# Analizar cada statement
PARA CADA stmt EN fn.Body:
findings += analyzeStatement(stmt, state, file, fn)
RETORNAR findings
7.4 Analisis de Statements¶
7.4.1 Assignment¶
FUNCION analyzeAssignment(assign, state, file):
# Obtener taint del valor
taint = getExpressionTaint(assign.Value, state)
# Verificar si es call a source
SI assign.Value ES Call:
source = catalog.IsSource(call.Receiver, call.Function)
SI source != null:
taint = NuevoTaintInfo(source)
# Propagar taint al target
SI taint != null:
newTaint = taint.Clone()
newTaint.Via.append(TraceStep{
Location: assign.Location,
Action: "assign",
Variable: assign.Target
})
state.SetTaint(assign.Target, newTaint)
Ejemplo:
user_input = request.args.get("name") # user_input recibe taint
path = "/data/" + user_input # path recibe taint propagado
7.4.2 Call¶
FUNCION analyzeCall(call, state, file, fn):
findings = []
# Verificar si es sink
sink = catalog.IsSink(call.Receiver, call.Function)
SI sink == null:
RETORNAR findings
# Verificar si algun argumento esta tainted
PARA CADA arg EN call.Arguments:
taint = getExpressionTaint(arg, state)
SI taint != null:
# Verificar si hay sanitizer
sanitizer = catalog.IsSanitizer(call.Receiver, call.Function, sink.Category)
SI sanitizer != null:
CONTINUAR # Sanitizado, no reportar
# Crear hallazgo
finding = crearFinding(sink, taint, call.Location)
findings.append(finding)
RETORNAR findings
7.4.3 If Statement (Control Flow)¶
FUNCION analyzeIfStatement(ifStmt, state, file, fn):
# Clonar estados para cada rama
thenState = state.Clone()
elseState = state.Clone()
# Analizar rama then
PARA CADA stmt EN ifStmt.ThenBody:
findings += analyzeStatement(stmt, thenState, file, fn)
# Analizar rama else
PARA CADA stmt EN ifStmt.ElseBody:
findings += analyzeStatement(stmt, elseState, file, fn)
# Merge: union de taints de ambas ramas
state.Merge(thenState)
state.Merge(elseState)
RETORNAR findings
Importante: El merge es conservador - si cualquier rama taint una variable, queda tainted.
7.4.4 For/While Loops¶
FUNCION analyzeForLoop(forStmt, state, file, fn):
# Verificar si iteramos sobre datos tainted
iterTaint = getExpressionTaint(forStmt.Iterable, state)
SI iterTaint != null:
# La variable de loop recibe taint
state.SetTaint(forStmt.Variable, TaintInfo{
Source: iterTaint.Source,
Via: [...iterTaint.Via, TraceStep{Action: "iterate"}],
})
# Analizar body 2 veces para propagar taint a traves del loop
loopState = state.Clone()
PARA i = 0; i < 2; i++:
PARA CADA stmt EN forStmt.Body:
findings += analyzeStatement(stmt, loopState, file, fn)
state.Merge(loopState)
RETORNAR findings
7.4.5 Try/Except¶
FUNCION analyzeTryStatement(tryStmt, state, file, fn):
# Analizar try body
tryState = state.Clone()
PARA CADA stmt EN tryStmt.TryBody:
findings += analyzeStatement(stmt, tryState, file, fn)
# La variable de excepcion puede contener datos de usuario
catchState = state.Clone()
SI tryStmt.CatchVar != "":
catchState.SetTaint(tryStmt.CatchVar, TaintInfo{
SourceType: SourceToolInput, # Conservador
Confidence: Low
})
PARA CADA stmt EN tryStmt.CatchBody:
findings += analyzeStatement(stmt, catchState, file, fn)
# Merge todos los estados
state.Merge(tryState)
state.Merge(catchState)
RETORNAR findings
8. Propagacion de Taint¶
8.1 Reglas de Propagacion¶
8.1.1 Asignacion Simple¶
8.1.2 Concatenacion de Strings¶
result = "prefix" + tainted # result recibe taint
result = f"Hello {tainted}" # result recibe taint (f-string)
result = "{}".format(tainted) # result recibe taint
result = `Hello ${tainted}` # result recibe taint (template literal)
8.1.3 Operaciones Binarias¶
result = tainted + clean # result recibe taint
result = clean + tainted # result recibe taint
result = tainted * 2 # result recibe taint
8.1.4 Acceso a Propiedades¶
obj.prop = tainted # obj.prop recibe taint
x = tainted_obj.prop # x recibe taint (objeto tainted = propiedades tainted)
8.1.5 Acceso a Indices¶
arr[0] = tainted # arr queda tainted
x = tainted_arr[0] # x recibe taint
x = tainted_dict["key"] # x recibe taint
8.1.6 Retorno de Funciones Source¶
8.1.7 Await (Async)¶
8.2 Taint en Closures¶
Las closures capturan el taint del scope padre:
def outer():
secret = os.getenv("SECRET") # tainted
def inner():
return secret # inner tiene acceso a taint de outer
return inner
Implementacion:
func (ts *TaintState) NewChildState() *TaintState {
child := NewTaintState()
child.Parent = ts // Enlace al scope padre
return child
}
func (ts *TaintState) GetTaint(name string) *TaintInfo {
if taint, ok := ts.Variables[name]; ok {
return taint
}
if ts.Parent != nil {
return ts.Parent.GetTaint(name) // Buscar en padre
}
return nil
}
9. Taint de Propiedades (Property-Level)¶
9.1 Concepto¶
El taint puede ser a nivel de objeto completo o de propiedad especifica:
user = request.json() # user completo tainted
name = user["name"] # name recibe taint de user
config = {}
config["secret"] = os.getenv("KEY") # solo config.secret tainted
safe = config["other"] # safe NO esta tainted
9.2 Implementacion¶
type TaintState struct {
Variables map[string]*TaintInfo // Taint a nivel variable
Properties map[string]map[string]*TaintInfo // obj -> prop -> taint
}
func (ts *TaintState) GetPropertyTaint(obj, prop string) *TaintInfo {
if props, ok := ts.Properties[obj]; ok {
if taint, ok := props[prop]; ok {
return taint
}
}
// Fallback: si el objeto esta tainted, la propiedad tambien
if objTaint := ts.GetTaint(obj); objTaint != nil {
return objTaint
}
return nil
}
10. Analisis Inter-Procedural (Modo Deep)¶
10.1 Concepto¶
En modo deep, el analisis sigue el flujo de datos a traves de llamadas a funciones:
def process(data):
os.system(data) # SINK
@server.tool()
def handler(input):
process(input) # Llamada a funcion
Sin inter-procedural: No se detectaria porque os.system esta en otra funcion.
Con inter-procedural: Se detecta el flujo completo.
10.2 Summaries de Funciones¶
El modo deep utiliza "summaries" que describen como una funcion propaga taint:
type FunctionSummary struct {
Name string
Parameters []ParamSummary
ReturnsTaint bool
TaintsFrom []int // Indices de params que taintan el retorno
}
type ParamSummary struct {
Index int
FlowsTo []string // Sinks alcanzados
}
10.3 Construccion de Call Graph¶
- Parsear todos los archivos
- Construir grafo de llamadas
- Para cada funcion, determinar:
- Cuales parametros llegan a sinks
- Si el retorno esta tainted
10.4 Limitaciones del Modo Deep¶
- Mas lento que modo fast
- Puede tener falsos positivos por imprecision
- No soporta dispatch dinamico completo
- Profundidad limitada (configurable, default: 3)
11. Generacion de Trazas¶
11.1 Estructura de Traza¶
Cada hallazgo incluye una traza que documenta el camino:
type TaintTrace struct {
Source types.Location // Donde se origino
Sink types.Location // Donde termino
Steps []TraceStep // Pasos intermedios
}
type TraceStep struct {
Location types.Location
Action string // "assign", "concat", "call", etc.
Variable string // Variable afectada
}
11.2 Ejemplo de Traza Completa¶
Codigo:
@server.tool()
def fetch_data(url: str):
full_url = "https://api.com/" + url
response = requests.get(full_url)
return response.json()
Traza generada:
{
"source": {
"file": "server.py",
"line": 2,
"column": 16
},
"sink": {
"file": "server.py",
"line": 4,
"column": 16
},
"steps": [
{
"location": {"file": "server.py", "line": 2},
"action": "tool_input",
"variable": "url"
},
{
"location": {"file": "server.py", "line": 3},
"action": "binary_op:+",
"variable": ""
},
{
"location": {"file": "server.py", "line": 3},
"action": "assign",
"variable": "full_url"
},
{
"location": {"file": "server.py", "line": 4},
"action": "call_arg:requests.get",
"variable": ""
}
]
}
12. Ajustes de Confianza¶
12.1 Factores que Afectan Confianza¶
| Factor | Efecto | Razon |
|---|---|---|
| Longitud de traza | -Confianza | Mas pasos = mas oportunidades de error |
| Taint cross-file | -Confianza | Analisis menos preciso entre archivos |
| Source es tool_input | +Confianza | Directamente controlado por atacante |
| Sanitizer parcial presente | -Confianza | Puede estar sanitizado |
12.2 Implementacion¶
func (e *Engine) adjustConfidence(taint *TaintInfo) types.Confidence {
confidence := taint.Confidence
// Degradar por longitud de traza
if len(taint.Via) > 5 {
confidence = degradeConfidence(confidence)
}
// Degradar si hay salto entre archivos
if hasCrossFileJump(taint.Via) {
confidence = degradeConfidence(confidence)
}
return confidence
}
13. Casos Especiales¶
13.1 Callbacks y Lambdas¶
Analisis:
1. items es tainted (tool_input)
2. forEach pasa elementos a la lambda
3. Parametro x de la lambda recibe taint de items
4. x llega a os.system = vulnerabilidad
13.2 Return de Datos Sensibles¶
Analisis:
- os.environ es source (SourceEnvVar)
- Return a respuesta es sink (SinkResponse)
- Detectado como exposicion de secreto (Clase E)
13.3 Datos en Excepciones¶
@server.tool()
def risky(data):
try:
process(data)
except Exception as e:
os.system(f"echo Error: {e}") # e puede contener data
Analisis:
- e se marca como potencialmente tainted (Confidence: Low)
- Se reporta pero con baja confianza
14. Limitaciones del Taint Analysis¶
14.1 No Detecta¶
-
Taint implicito: Control flow que filtra datos
-
Sanitizacion custom: Funciones de validacion propias
-
Dispatch dinamico completo: Metodos virtuales
-
Reflection: Acceso dinamico a atributos
14.2 Puede Generar Falsos Positivos¶
- Sanitizacion no reconocida: El codigo sanitiza pero no esta en catalogo
- Dead code: Paths que nunca se ejecutan
- Validacion previa: Checks antes de uso peligroso
- Constantes runtime: Valores que parecen variables pero son fijos
14.3 Puede Generar Falsos Negativos¶
- Source no reconocido: Input que no esta en catalogo
- Sink no reconocido: Funcion peligrosa custom
- Propagacion compleja: Estructuras de datos avanzadas
- Alias desconocidos: Imports con nombres custom
15. Recomendaciones para Analistas¶
15.1 Interpretar Hallazgos¶
- Verificar la traza: Asegurar que el flujo es realista
- Buscar sanitizacion: Puede haber validacion no detectada
- Evaluar contexto: Un tool handler es mas critico
- Considerar confianza: Low confidence requiere revision manual
15.2 Falsos Positivos Comunes¶
- Path traversal con whitelist: El codigo verifica contra lista pero no usa
os.path.* - SQL con ORM: El ORM parametriza pero el pattern parece concatenacion
- Logging de errores: Se loguea excepcion que no contiene secretos
15.3 Para Reducir Ruido¶
- Agregar sanitizers custom al catalogo
- Usar baseline para hallazgos aceptados
- Configurar allowlist de hosts/paths
- Ajustar severidad de reglas especificas
Siguiente documento: motor-patrones.md