Taint Analysis System¶
Overview¶
The taint analysis system (internal/taint/) performs data flow analysis to track how untrusted data (sources) flows through the program to potentially dangerous operations (sinks). This is the core mechanism for detecting vulnerabilities like RCE, SQL injection, path traversal, and SSRF.
Architecture¶
┌─────────────────────────────────────────────────────────────────┐
│                      Taint Analysis Engine                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐   ┌────────┐  │
│  │  Sources   │──▶│   Taint    │──▶│   Sinks    │──▶│Findings│  │
│  │(Untrusted) │   │Propagation │   │(Dangerous) │   │        │  │
│  └────────────┘   └────────────┘   └────────────┘   └────────┘  │
│        │                │                │                      │
│        ▼                ▼                ▼                      │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐               │
│  │  Catalog   │   │ TaintState │   │ Sanitizers │               │
│  │   (Defs)   │   │ (Tracking) │   │(Break Flow)│               │
│  └────────────┘   └────────────┘   └────────────┘               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Analysis Modes¶
Fast Mode (Intra-procedural)¶
Analyzes each function in isolation:
- Quick execution
- Suitable for CI/CD pipelines
- May miss cross-function flows
Deep Mode (Inter-procedural)¶
Follows data flow across function calls:
- Builds function summaries
- Uses call graph for complete analysis
- Suitable for certification
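To make the difference between the two modes concrete, here is a hypothetical handler in which the untrusted value reaches the sink only by passing through a helper function. The names find_user and build_query are invented for this sketch; whether a particular flow is reported depends on the engine and its configuration.

```python
def build_query(name: str) -> str:
    # Helper: the return value is built from the parameter, so it carries
    # whatever taint the caller's argument had.
    return "SELECT * FROM users WHERE name = '" + name + "'"


def find_user(name: str, cursor) -> None:
    # `name` is assumed to be untrusted. Looking at find_user alone, the
    # analysis cannot tell that build_query() returns tainted data; a
    # function summary for build_query (deep mode) makes that visible.
    query = build_query(name)
    cursor.execute(query)  # cursor.execute() is listed as a database sink
```

Fast mode analyzes find_user without looking inside build_query, which is the kind of cross-function flow it may miss.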
Core Components¶
TaintState¶
Tracks tainted variables in a scope:
type TaintState struct {
    Variables  map[string]*TaintInfo            // Variable name → taint info
    Properties map[string]map[string]*TaintInfo // Object → property → taint
    Parent     *TaintState                      // For closure capture
    Language   types.Language                   // Source file language
}
Operations:
- GetTaint(name) - Get taint for variable (checks parent scopes)
- SetTaint(name, info) - Set taint for variable
- GetPropertyTaint(obj, prop) - Get taint for object property
- SetPropertyTaint(obj, prop, info) - Set taint for property
- Clone() - Deep copy for control flow analysis
- Merge(other) - Merge states at join points
- NewChildState() - Create child for closures
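The scope-chain lookup and the join-point merge can be sketched as a small Python model. This is an illustration only, not the Go implementation in internal/taint/: taint info is reduced to a plain string, property tracking is omitted, and the merge rule (a variable is tainted if either state tainted it) is an assumption based on the conservative merge described under Control Flow.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class TaintState:
    variables: dict[str, str] = field(default_factory=dict)
    parent: TaintState | None = None

    def get_taint(self, name: str) -> str | None:
        # Walk outward through parent scopes until the variable is found.
        state = self
        while state is not None:
            if name in state.variables:
                return state.variables[name]
            state = state.parent
        return None

    def set_taint(self, name: str, info: str) -> None:
        self.variables[name] = info

    def clone(self) -> TaintState:
        # Copy the variable map so each branch can diverge independently.
        return TaintState(dict(self.variables), self.parent)

    def merge(self, other: TaintState) -> None:
        # Conservative union: keep existing taint, add what the other state has.
        for name, info in other.variables.items():
            self.variables.setdefault(name, info)

    def new_child_state(self) -> TaintState:
        return TaintState(parent=self)
```

A child state created for a closure resolves names it does not define through its parent, while clone() followed by merge() models analyzing two branches and joining them.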
TaintInfo¶
Contains information about a tainted value:
type TaintInfo struct {
    Source     types.Location       // Where taint originated
    SourceType types.SourceCategory // Type of source
    Via        []types.TraceStep    // Propagation path
    Confidence types.Confidence     // Detection confidence
}
TraceStep¶
Records each step in the taint propagation:
type TraceStep struct {
    Location types.Location
    Action   string // "assign", "property_access", "binary_op:+", etc.
    Variable string // Affected variable
}
Sources (Untrusted Data Origins)¶
Source Categories¶
| Category | Description | Risk |
|---|---|---|
| SourceToolInput | MCP tool parameters | High - user controlled |
| SourceEnvVar | Environment variables | Medium - may be external |
| SourceHTTPRequest | HTTP request data | High - user controlled |
| SourceFileContent | File contents | Medium - external data |
| SourceDBResult | Database query results | Low - usually controlled |
Built-in Sources¶
Python Sources¶
| ID | Pattern | Category |
|---|---|---|
| py-os-environ | os.environ | EnvVar |
| py-os-getenv | os.getenv() | EnvVar |
| py-request-args | request.args | HTTPRequest |
| py-request-form | request.form | HTTPRequest |
| py-request-json | request.json | HTTPRequest |
| py-file-read | file.read() | FileContent |
JavaScript/TypeScript Sources¶
| ID | Pattern | Category |
|---|---|---|
| js-process-env | process.env | EnvVar |
| js-req-body | req.body | HTTPRequest |
| js-req-query | req.query | HTTPRequest |
| js-req-params | req.params | HTTPRequest |
| js-fs-readfile | fs.readFileSync() | FileContent |
MCP Tool Input Sources¶
Parameters of functions decorated with @tool are automatically marked as tainted.
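As an illustration, the hypothetical handler below uses a stand-in tool decorator (real MCP frameworks provide their own, such as the @server.tool() form used in the complete-flow example). The comments describe how the analysis is expected to treat each value; they are not engine output.

```python
def tool(func):
    """Stand-in for an MCP framework's tool decorator (assumption for this sketch)."""
    func.is_tool = True
    return func


@tool
def greet(name: str) -> str:
    # name is tainted on entry (SourceToolInput) because it is a tool parameter
    message = "Hello, " + name  # message is tainted (concatenation)
    return message
```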
Sinks (Dangerous Operations)¶
Sink Categories¶
| Category | Vulnerability | Severity |
|---|---|---|
| SinkExec | Command Injection (RCE) | Critical |
| SinkEval | Code Evaluation | Critical |
| SinkFilesystem | Path Traversal | High |
| SinkNetwork | SSRF | High |
| SinkDatabase | SQL Injection | High |
| SinkLogging | Information Disclosure | Medium |
| SinkResponse | Data Leakage | Medium |
| SinkLLMPrompt | Prompt Injection | High |
Built-in Sinks¶
Exec Sinks (Class A)¶
| ID | Pattern | Language |
|---|---|---|
| py-os-system | os.system() | Python |
| py-os-popen | os.popen() | Python |
| py-subprocess-call | subprocess.call() | Python |
| py-subprocess-run | subprocess.run() | Python |
| py-subprocess-popen | subprocess.Popen() | Python |
| js-child-process-exec | child_process.exec() | JavaScript |
| js-child-process-execsync | child_process.execSync() | JavaScript |
| js-child-process-spawn | child_process.spawn() | JavaScript |
Eval Sinks (Class A)¶
| ID | Pattern | Language |
|---|---|---|
| py-eval | eval() | Python |
| py-exec | exec() | Python |
| py-compile | compile() | Python |
| js-eval | eval() | JavaScript |
| js-function | new Function() | JavaScript |
Filesystem Sinks (Class B)¶
| ID | Pattern | Language |
|---|---|---|
| py-open | open() | Python |
| py-pathlib-read | Path.read_text() | Python |
| py-shutil-copy | shutil.copy() | Python |
| py-os-remove | os.remove() | Python |
| js-fs-readfile | fs.readFileSync() | JavaScript |
| js-fs-writefile | fs.writeFileSync() | JavaScript |
Network Sinks (Class C)¶
| ID | Pattern | Language |
|---|---|---|
| py-requests-get | requests.get() | Python |
| py-requests-post | requests.post() | Python |
| py-urllib-urlopen | urllib.request.urlopen() | Python |
| py-httpx-get | httpx.get() | Python |
| js-fetch | fetch() | JavaScript |
| js-axios-get | axios.get() | JavaScript |
Database Sinks (Class D)¶
| ID | Pattern | Language |
|---|---|---|
| py-cursor-execute | cursor.execute() | Python |
| py-conn-execute | connection.execute() | Python |
| js-query | .query() | JavaScript |
| js-raw | .raw() | JavaScript |
Logging Sinks (Class E)¶
| ID | Pattern | Language |
|---|---|---|
| py-print | print() | Python |
| py-logging-info | logging.info() | Python |
| py-logger-info | logger.info() | Python |
| js-console-log | console.log() | JavaScript |
| js-console-error | console.error() | JavaScript |
LLM Sinks (Class H)¶
| ID | Pattern | Language |
|---|---|---|
| py-openai-chat | openai.ChatCompletion.create() | Python |
| py-anthropic-messages | anthropic.messages.create() | Python |
| py-langchain-invoke | langchain.llms.invoke() | Python |
| js-openai-chat | openai.chat.completions.create() | JavaScript |
Sanitizers (Taint Breakers)¶
Sanitizers break the taint chain for specific sink categories:
Path Sanitizers (Class B)¶
| ID | Pattern | Sanitizes |
|---|---|---|
| py-os-path-normpath | os.path.normpath() | Filesystem |
| py-os-path-abspath | os.path.abspath() | Filesystem |
| py-os-path-realpath | os.path.realpath() | Filesystem |
| py-pathlib-resolve | Path.resolve() | Filesystem |
| js-path-normalize | path.normalize() | Filesystem |
| js-path-resolve | path.resolve() | Filesystem |
URL Sanitizers (Class C)¶
| ID | Pattern | Sanitizes |
|---|---|---|
| py-urllib-parse | urllib.parse.urlparse() | Network |
| js-url-parse | new URL() | Network |
Shell Sanitizers (Class A)¶
| ID | Pattern | Sanitizes |
|---|---|---|
| py-shlex-quote | shlex.quote() | Exec |
| py-shlex-split | shlex.split() | Exec |
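Because a sanitizer clears taint only for the categories it lists, the same value can be safe for one sink and still flagged at another. The sketch below uses shlex.quote() from the Python standard library; the function name and log path are made up for illustration, and the comments describe the expected analysis outcome based on the tables above.

```python
import shlex


def build_grep_command(pattern: str) -> str:
    # pattern is assumed to be tainted (for example, a tool parameter)
    quoted = shlex.quote(pattern)  # py-shlex-quote: sanitized for Exec sinks only
    # Passing `quoted` to os.system() should no longer be reported as MCP-A001.
    # Passing `quoted` to open() would still count as tainted for Filesystem sinks.
    return "grep -- " + quoted + " /var/log/app.log"
```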
Taint Propagation¶
Assignment¶
Taint propagates through assignment: the target of an assignment takes on the taint of the assigned value.
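A minimal illustration, with a hypothetical function whose parameter is assumed to be tainted; the comments show the expected taint of each variable:

```python
def handle(user_input: str) -> str:
    a = user_input  # a is tainted (assigned from a tainted value)
    b = a           # b is tainted (taint follows the chain of assignments)
    c = "static"    # c is not tainted (assigned from a literal)
    return b + c
```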
String Operations¶
Taint propagates through concatenation and formatting:
user_input = request.args.get("q") # tainted
cmd = "grep " + user_input # tainted (concat)
cmd2 = f"grep {user_input}" # tainted (f-string)
Property Access¶
Taint propagates through object properties:
obj = tainted_object # obj is tainted
value = obj.property # value is tainted
value2 = obj["key"] # value2 is tainted
Control Flow¶
Taint is merged at join points:
if condition:
    x = tainted_value # x tainted in then-branch
else:
    x = safe_value # x not tainted in else-branch
# x is tainted (conservative merge)
Loops¶
Loop bodies are analyzed twice, so that taint introduced late in one iteration is visible to statements that run earlier in the next iteration.
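The hypothetical function below shows why a single pass over a loop body is not enough. The variable prev becomes tainted only at the end of the body, so its use at the top of the body is seen as tainted on the second pass, not the first. The names are invented for this sketch.

```python
def collect(items, user_input):
    prev = ""  # not tainted before the loop
    seen = []
    for _ in items:
        seen.append(prev)  # pass 1: prev is clean; pass 2: prev is tainted
        prev = user_input  # prev becomes tainted at the end of the body
    return seen
```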
Closures/Lambdas¶
Closures capture taint from the parent scope: a variable tainted in the enclosing function is still tainted when read inside the closure.
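For instance, in this hypothetical snippet the inner function never receives user_input directly, yet the value it returns is built from a captured tainted variable:

```python
def make_greeter(user_input: str):
    prefix = "Hello, " + user_input  # prefix is tainted (concatenation)

    def greet() -> str:
        # prefix is resolved through the parent scope and is still tainted here
        return prefix + "!"

    return greet
```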
Callback Arguments¶
Iterator methods propagate taint to the parameters of the callbacks they invoke.
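A sketch of the idea in Python, assuming the built-in map() is treated the same way as iterator methods such as forEach or map in JavaScript (this document does not list which callables qualify). The function name is hypothetical.

```python
def build_commands(filenames):
    # filenames is assumed to be tainted, so each `name` passed to the
    # lambda is expected to be treated as tainted inside the callback body.
    return list(map(lambda name: "cat " + name, filenames))
```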
Analysis Flow¶
Per-Function Analysis¶
- Initialize state: Create TaintState for function
- Check tool handler: If @tool decorated, mark parameters as tainted
- Analyze body: Process each statement in order
- Report findings: Generate findings for source→sink flows
Statement Analysis¶
func analyzeStatement(stmt, state, file, fn) []Finding {
    switch s := stmt.(type) {
    case *ast.Assignment:
        // Propagate taint from value to target
    case *ast.ExpressionStatement:
        // Check for sinks in calls
    case *ast.Return:
        // Check for sensitive data in return
    case *ast.IfStatement:
        // Clone state, analyze branches, merge
    case *ast.ForStatement:
        // Propagate taint through iteration
    case *ast.TryStatement:
        // Analyze try/catch/finally with separate states
    }
}
Expression Taint Extraction¶
func getExpressionTaint(expr, state) *TaintInfo {
    switch ex := expr.(type) {
    case *ast.Identifier:
        return state.GetTaint(ex.Name)
    case *ast.MemberAccess:
        // Check property taint or object taint
    case *ast.Call:
        // Check if returns tainted data (source)
    case *ast.BinaryOp:
        // Tainted if either operand is tainted
    case *ast.FormattedString:
        // Tainted if any interpolation is tainted
    }
}
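The same case analysis can be tried out on real Python expressions with the standard-library ast module. This is a simplified model rather than the engine's code: the state is reduced to a set of tainted variable names, the result is a boolean instead of a TaintInfo, and calls to source functions are not modeled.

```python
from __future__ import annotations

import ast


def expression_taint(expr: ast.expr, tainted: set[str]) -> bool:
    if isinstance(expr, ast.Name):
        # Identifier: look the name up in the state
        return expr.id in tainted
    if isinstance(expr, (ast.Attribute, ast.Subscript)):
        # Member access: tainted if the object is tainted
        return expression_taint(expr.value, tainted)
    if isinstance(expr, ast.BinOp):
        # Binary operation: tainted if either operand is tainted
        return expression_taint(expr.left, tainted) or expression_taint(expr.right, tainted)
    if isinstance(expr, ast.JoinedStr):
        # f-string: tainted if any interpolated value is tainted
        return any(
            expression_taint(part.value, tainted)
            for part in expr.values
            if isinstance(part, ast.FormattedValue)
        )
    return False


def parse_expr(source: str) -> ast.expr:
    # Parse a single expression and return its root node
    return ast.parse(source, mode="eval").body
```

For example, expression_taint(parse_expr('"grep " + user_input'), {"user_input"}) evaluates to True, while the same expression with an empty set of tainted names evaluates to False.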
Call Analysis¶
func analyzeCall(call, state, file, fn) []Finding {
    // 1. Get receiver and function name
    // 2. Analyze callback arguments (lambdas)
    // 3. Check if call is a sink
    // 4. For each argument, check if tainted
    // 5. Check if sanitizer applies
    // 6. Generate finding if tainted and not sanitized
}
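Steps 3 to 6 can be modeled in a few lines of Python. Everything here is an assumption made for illustration: the SINKS dictionary holds two entries taken from the sink tables, the Taint class records which sink categories a value has been sanitized for, and findings are plain strings. Callback analysis (step 2) is left out.

```python
from __future__ import annotations

from dataclasses import dataclass, field

# (receiver, function) -> sink category; layout is hypothetical
SINKS = {("os", "system"): "exec", ("", "open"): "filesystem"}


@dataclass
class Taint:
    source: str
    sanitized_for: set[str] = field(default_factory=set)


def check_call(receiver: str, function: str, args: list[Taint | None]) -> list[str]:
    category = SINKS.get((receiver, function))
    if category is None:
        return []  # step 3: not a sink, nothing to report
    findings = []
    for index, taint in enumerate(args):
        # steps 4-6: report arguments that are tainted and not sanitized
        # for this sink's category
        if taint is not None and category not in taint.sanitized_for:
            findings.append(f"{category}: argument {index} tainted by {taint.source}")
    return findings
```

With this model, a value sanitized for "exec" passes through os.system without a finding but is still reported when it reaches open.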
Finding Generation¶
When tainted data reaches a sink, a finding is generated:
type Finding struct {
    RuleID      string           // e.g., "MCP-A001"
    Class       types.VulnClass  // e.g., ClassA (RCE)
    Severity    types.Severity   // Critical, High, Medium, Low
    Confidence  types.Confidence // High, Medium, Low
    Location    types.Location   // Sink location
    Trace       *TaintTrace      // Full path from source to sink
    SinkID      string           // Catalog sink ID
    Description string           // Human-readable description
    Remediation string           // How to fix
}
Rule Mapping¶
| Sink Category | Rule ID | Class | Description |
|---|---|---|---|
| SinkExec | MCP-A001 | A | Tool input flows to command execution |
| SinkEval | MCP-A002 | A | Tool input flows to code evaluation |
| SinkFilesystem | MCP-B001 | B | Tool input flows to filesystem operation |
| SinkNetwork | MCP-C001 | C | Tool input flows to network request |
| SinkDatabase | MCP-D001 | D | Tool input flows to database query |
| SinkLogging | MCP-E003 | E | Potentially sensitive data logged |
| SinkResponse | MCP-E004 | E | Potentially sensitive data in response |
Configuration¶
# mcp-scan.yaml
taint:
  mode: fast # fast or deep
  depth: 3 # Max inter-procedural depth
  track_properties: true
  track_closures: true
  # Source categories to track
  sources:
    - tool_input
    - http_request
    - env_var
    - file_content
  # Sink categories to detect
  sinks:
    - exec
    - eval
    - filesystem
    - network
    - database
API Usage¶
Basic Analysis¶
// Create catalog and engine
cat := catalog.New()
cfg := taint.DefaultConfig()
engine := taint.New(cat, cfg)
// Analyze files
files := []*ast.File{...}
surface := surface.Extract(files)
findings := engine.Analyze(files, surface)
Per-File Analysis¶
// Thread-safe per-file analysis
for _, file := range files {
    findings := engine.AnalyzeFile(file, surface)
}
Extending the Catalog¶
Adding a Source¶
catalog.Sources = append(catalog.Sources, catalog.SourceDef{
    ID:          "custom-source",
    Language:    types.Python,
    Receiver:    "mylib",
    Function:    "get_input",
    Category:    types.SourceToolInput,
    Description: "Custom input source",
})
Adding a Sink¶
catalog.Sinks = append(catalog.Sinks, catalog.SinkDef{
    ID:          "custom-sink",
    Language:    types.Python,
    Receiver:    "mylib",
    Function:    "dangerous_op",
    Category:    types.SinkExec,
    Severity:    types.SeverityCritical,
    ArgIndex:    0,
    Description: "Custom dangerous operation",
})
Adding a Sanitizer¶
catalog.Sanitizers = append(catalog.Sanitizers, catalog.SanitizerDef{
    ID:          "custom-sanitizer",
    Language:    types.Python,
    Receiver:    "mylib",
    Function:    "sanitize",
    Sanitizes:   []types.SinkCategory{types.SinkExec, types.SinkFilesystem},
    Description: "Custom sanitizer",
})
Example: Complete Flow¶
# Source: Tool input
@server.tool()
def process_file(filename: str):  # filename is TAINTED
    # Propagation: Assignment
    path = "/data/" + filename  # path is TAINTED (concat)
    # Sanitizer check (if used)
    # safe_path = os.path.normpath(path)  # Would break taint
    # Sink: Filesystem operation
    with open(path) as f:  # FINDING: MCP-B001
        return f.read()
Generated Trace:
Source: process_file:filename (line 2, SourceToolInput)
→ assign to path (line 4)
→ binary_op:+ (line 4)
Sink: open(path) (line 8, SinkFilesystem)
Related Documentation¶
- Pattern Engine - Pattern-based detection
- ML Classifier - ML-based detection
- Import Resolver - Cross-file analysis
- Call Graph - Inter-procedural analysis
- Vulnerability Classes - Class definitions