Taint Analysis¶
MCP-Scan uses taint analysis to track data flow from untrusted sources to dangerous sinks.
Concept Overview¶
Taint analysis answers the question: "Can attacker-controlled data reach a dangerous operation?"
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Source    │ ──► │ Propagation │ ──► │    Sink     │
│ (untrusted) │     │ (data flow) │     │ (dangerous) │
└─────────────┘     └─────────────┘     └─────────────┘
If tainted data reaches a sink without passing through a sanitizer, a vulnerability is reported.
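The source, propagation, sink, and sanitizer roles can be illustrated with a toy model. This is only a sketch of the concept, not MCP-Scan's implementation; the `Value` class and the helper names are hypothetical, and the sanitizer is modelled purely as clearing a flag.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Value:
    """A string plus a flag recording whether it came from an untrusted source."""
    text: str
    tainted: bool = False


def source(text: str) -> Value:
    # Anything read from an untrusted entry point starts out tainted.
    return Value(text, tainted=True)


def concat(prefix: str, value: Value) -> Value:
    # Propagation: combining a literal with a tainted value stays tainted.
    return Value(prefix + value.text, tainted=value.tainted)


def sanitize(value: Value) -> Value:
    # Modelled only as clearing the flag; a real sanitizer also rewrites the text.
    return Value(value.text, tainted=False)


def reaches_sink(value: Value) -> bool:
    # A finding is reported only when tainted data arrives at the sink.
    return value.tainted


user_input = source("hello; rm -rf /")
print(reaches_sink(concat("echo ", user_input)))            # True
print(reaches_sink(concat("echo ", sanitize(user_input))))  # False
```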
Sources (Taint Origins)¶
Sources are entry points for untrusted data:
MCP Tool Input¶
| Source ID | Language | Pattern | Category |
|---|---|---|---|
| `py-tool-param` | Python | Tool function parameters | tool_input |
| `ts-tool-param` | TypeScript | Tool function parameters | tool_input |
Environment Variables¶
| Source ID | Language | Pattern | Category |
|---|---|---|---|
| `py-os-environ` | Python | `os.environ[...]` | env_var |
| `py-os-getenv` | Python | `os.getenv(...)` | env_var |
| `js-process-env` | JavaScript | `process.env[...]` | env_var |
HTTP Request Data¶
| Source ID | Language | Pattern | Category |
|---|---|---|---|
| `py-flask-request` | Python | `request.json`, `request.args` | http_request |
| `py-django-request` | Python | `request.POST`, `request.GET` | http_request |
| `ts-express-req` | TypeScript | `req.body`, `req.query` | http_request |
File Content¶
| Source ID | Language | Pattern | Category |
|---|---|---|---|
| `py-open-read` | Python | `open(...).read()` | file_content |
| `py-pathlib-read` | Python | `Path(...).read_text()` | file_content |
| `js-fs-read` | JavaScript | `fs.readFileSync(...)` | file_content |
Database Results¶
| Source ID | Language | Pattern | Category |
|---|---|---|---|
| `py-db-fetch` | Python | `cursor.fetchone()`, `fetchall()` | db_result |
| `py-sqlalchemy-execute` | Python | `.execute(...).fetchall()` | db_result |
Sinks (Dangerous Operations)¶
Sinks are operations that become dangerous when they receive tainted data:
Command Execution (RCE)¶
os.system(cmd) # Dangerous sink
subprocess.run(cmd, shell=True) # Dangerous sink
exec(code) # Dangerous sink
| Sink ID | Language | Pattern | Category | Severity |
|---|---|---|---|---|
| `py-os-system` | Python | `os.system(...)` | exec | Critical |
| `py-subprocess-shell` | Python | `subprocess.*(shell=True)` | exec | Critical |
| `py-eval` | Python | `eval(...)` | eval | Critical |
| `py-exec` | Python | `exec(...)` | eval | Critical |
| `js-child-exec` | JavaScript | `child_process.exec(...)` | exec | Critical |
| `js-eval` | JavaScript | `eval(...)` | eval | Critical |
Filesystem Operations¶
| Sink ID | Language | Pattern | Category | Severity |
|---|---|---|---|---|
| `py-open-write` | Python | `open(path, "w")` | filesystem | High |
| `py-os-remove` | Python | `os.remove(path)` | filesystem | High |
| `js-fs-write` | JavaScript | `fs.writeFileSync(...)` | filesystem | High |
Network Operations (SSRF)¶
| Sink ID | Language | Pattern | Category | Severity |
|---|---|---|---|---|
| `py-requests` | Python | `requests.get/post(url)` | network | High |
| `py-urllib` | Python | `urllib.request.urlopen(url)` | network | High |
| `js-fetch` | JavaScript | `fetch(url)` | network | High |
Database Operations (SQLi)¶
| Sink ID | Language | Pattern | Category | Severity |
|---|---|---|---|---|
| `py-cursor-execute` | Python | `cursor.execute(query)` | database | Critical |
| `py-sqlalchemy-text` | Python | `text(raw_sql)` | database | Critical |
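One way to picture the sink tables is as a lookup keyed by call pattern. The sketch below copies a handful of Python rows into a dictionary; the `SINKS` layout and the `classify_call` helper are hypothetical and say nothing about how MCP-Scan stores its catalog internally.

```python
# Hypothetical in-memory view of a few Python sink rows from the tables above:
# call pattern -> (sink ID, category, severity).
SINKS = {
    "os.system": ("py-os-system", "exec", "Critical"),
    "eval": ("py-eval", "eval", "Critical"),
    "os.remove": ("py-os-remove", "filesystem", "High"),
    "urllib.request.urlopen": ("py-urllib", "network", "High"),
    "cursor.execute": ("py-cursor-execute", "database", "Critical"),
}


def classify_call(dotted_name: str):
    """Return (sink_id, category, severity) for a known sink, else None."""
    return SINKS.get(dotted_name)


print(classify_call("os.system"))  # ('py-os-system', 'exec', 'Critical')
print(classify_call("print"))      # None
```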
Taint Propagation¶
Taint flows through code via:
1. Direct Assignment¶
user_input = request.args.get("q") # Source: user_input is tainted
cmd = user_input # Propagation: cmd becomes tainted
2. String Concatenation¶
cmd = "echo " + user_input # Taint propagates through concat
cmd = f"echo {user_input}" # Taint propagates through f-string
3. String Operations¶
cmd = user_input.upper() # Still tainted
cmd = user_input.strip() # Still tainted
cmd = user_input[:10] # Still tainted (subset)
4. Collection Operations¶
items = [user_input] # List is tainted
items.append(user_input) # Existing list becomes tainted
data = {"key": user_input} # Dict is tainted
5. Function Calls¶
6. Function Returns (Deep Mode)¶
Sanitizers¶
Sanitizers are functions that neutralize taint for specific sink categories:
Shell Sanitizers¶
import os
import shlex
safe_cmd = shlex.quote(user_input) # Sanitized for shell
os.system(f"echo {safe_cmd}") # Safe
| Sanitizer | Language | Sanitizes |
|---|---|---|
| `shlex.quote` | Python | exec sinks |
| `pipes.quote` | Python | exec sinks |
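To see why `shlex.quote` neutralizes taint for exec sinks, compare the command string with and without quoting. This is a small illustration that only builds the strings and never runs them; the sample input is made up.

```python
import shlex

user_input = "hello; rm -rf /"

# Without quoting, a shell would treat ';' as a command separator.
unsafe_cmd = f"echo {user_input}"

# shlex.quote wraps the value in single quotes so the shell sees one argument.
safe_cmd = f"echo {shlex.quote(user_input)}"

print(unsafe_cmd)  # echo hello; rm -rf /
print(safe_cmd)    # echo 'hello; rm -rf /'

# Splitting the quoted command the way a POSIX shell would yields two tokens.
print(shlex.split(safe_cmd))  # ['echo', 'hello; rm -rf /']
```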
Path Sanitizers¶
import os.path
safe_path = os.path.normpath(user_path)
# Must also verify within allowed directory
if safe_path.startswith("/allowed/"):
    open(safe_path)  # Conditionally safe
| Sanitizer | Language | Sanitizes |
|---|---|---|
| `os.path.normpath` | Python | filesystem (partial) |
| `os.path.realpath` | Python | filesystem (partial) |
| `path.resolve` | JavaScript | filesystem (partial) |
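The "partial" label reflects that normalizing a path is not enough on its own; the result still has to be checked against an allowed directory. A stricter containment check might look like the sketch below. The `is_within` helper is hypothetical, and the example assumes POSIX-style paths.

```python
import os


def is_within(base_dir: str, user_path: str) -> bool:
    """Return True if user_path resolves to a location inside base_dir."""
    base = os.path.realpath(base_dir)
    # Join first so relative input is interpreted relative to base_dir,
    # then resolve '..' segments and symlinks.
    candidate = os.path.realpath(os.path.join(base, user_path))
    # commonpath compares whole path components, so '/allowed_evil'
    # is not mistaken for a child of '/allowed'.
    return os.path.commonpath([base, candidate]) == base


print(is_within("/allowed", "notes/a.txt"))      # True
print(is_within("/allowed", "../etc/passwd"))    # False
print(is_within("/allowed", "/allowed_evil/x"))  # False
```

Comparing path components rather than string prefixes avoids the sibling-directory case that a plain `startswith` check can miss when the prefix lacks a trailing separator.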
URL Sanitizers¶
| Sanitizer | Language | Sanitizes |
|---|---|---|
| `urllib.parse.quote` | Python | network |
| `urllib.parse.urlencode` | Python | network |
| `encodeURIComponent` | JavaScript | network |
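The Python entries above encode a value so it cannot alter the structure of the URL it is placed into. A brief illustration follows; the host `api.example.com` and the input values are made up. Note that encoding a component does not by itself validate which host a request goes to.

```python
from urllib.parse import quote, urlencode

user_value = "a b&admin=true"

# urlencode escapes '&' and '=' so the input stays inside the 'q' parameter.
query = urlencode({"q": user_value})
print("https://api.example.com/search?" + query)
# https://api.example.com/search?q=a+b%26admin%3Dtrue

# quote with safe="" also escapes '/', so the value cannot add path segments.
print(quote("../secret", safe=""))  # ..%2Fsecret
```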
SQL Sanitizers¶
Parameterized queries are the primary sanitizer:
# Safe: parameterized
cursor.execute("SELECT * FROM users WHERE id = ?", [user_id])
# Safe: ORM methods
User.query.filter_by(id=user_id).first()
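The difference between a formatted query and a parameterized one can be reproduced with the standard-library `sqlite3` module. This is an illustrative sketch using an in-memory database; the table and values are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'alice')")

user_id = "1 OR 1=1"  # attacker-controlled value

# Unsafe: string formatting lets the input change the query structure.
unsafe_rows = conn.execute(
    f"SELECT name FROM users WHERE id = {user_id}"
).fetchall()

# Safe: the driver binds the value as data, so it is compared literally.
safe_rows = conn.execute(
    "SELECT name FROM users WHERE id = ?", [user_id]
).fetchall()

print(unsafe_rows)  # [('alice',)]
print(safe_rows)    # []
```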
Analysis Modes¶
Fast Mode (Intra-Procedural)¶
Tracks taint within single functions only:
def handler(user_input):        # user_input is tainted
    cmd = f"echo {user_input}"  # cmd is tainted
    os.system(cmd)              # Sink reached by tainted data → FINDING
Limitations:
- Cannot track across function boundaries
- May miss vulnerabilities in called functions
- Cannot analyze return value propagation
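Fast-mode behaviour can be approximated with a small intra-procedural pass built on Python's own `ast` module. The sketch makes simplifying assumptions (straight-line function bodies, every parameter treated as a source, a three-entry sink set) and is not MCP-Scan's engine; `find_flows`, `is_tainted`, and `dotted_name` are hypothetical names.

```python
import ast

SINKS = {"os.system", "eval", "exec"}  # small subset of the sink tables
SANITIZERS = {"shlex.quote"}


def dotted_name(node: ast.AST) -> str:
    """Render a call target such as os.system as a dotted string."""
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Attribute):
        return f"{dotted_name(node.value)}.{node.attr}"
    return ""


def is_tainted(expr: ast.AST, tainted: set) -> bool:
    """Conservatively decide whether an expression carries taint."""
    if isinstance(expr, ast.Name):
        return expr.id in tainted
    if isinstance(expr, ast.Call) and dotted_name(expr.func) in SANITIZERS:
        return False  # sanitizer output is treated as clean
    # Otherwise taint flows up from any sub-expression (concat, f-string,
    # slicing, method calls, collection literals, unknown calls).
    return any(is_tainted(child, tainted) for child in ast.iter_child_nodes(expr))


def find_flows(source: str) -> list:
    """Report (function, sink, line) for tainted arguments reaching a sink."""
    findings = []
    for fn in ast.walk(ast.parse(source)):
        if not isinstance(fn, ast.FunctionDef):
            continue
        tainted = {arg.arg for arg in fn.args.args}  # parameters are sources
        for stmt in fn.body:  # top-level statements only, in source order
            if isinstance(stmt, ast.Assign):
                value_tainted = is_tainted(stmt.value, tainted)
                for target in stmt.targets:
                    if isinstance(target, ast.Name):
                        if value_tainted:
                            tainted.add(target.id)
                        else:
                            tainted.discard(target.id)
            elif isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
                call = stmt.value
                name = dotted_name(call.func)
                if name in SINKS and any(is_tainted(a, tainted) for a in call.args):
                    findings.append((fn.name, name, call.lineno))
    return findings


CODE = '''
def handler(user_input):
    cmd = f"echo {user_input}"
    os.system(cmd)

def safe_handler(user_input):
    cmd = f"echo {shlex.quote(user_input)}"
    os.system(cmd)
'''

print(find_flows(CODE))  # [('handler', 'os.system', 4)]
```

Because each function is analysed in isolation, a helper that builds the command in another function would be treated as an opaque call, which is the limitation listed above.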
Deep Mode (Inter-Procedural)¶
Tracks taint across function calls:
def build_command(data):
    return f"echo {data}"            # Returns tainted

def handler(user_input):             # user_input is tainted
    cmd = build_command(user_input)  # Deep mode: cmd is tainted
    os.system(cmd)                   # Sink reached → FINDING
Capabilities:
- Follows function calls up to configurable depth
- Uses function summaries for efficiency
- Tracks taint through returns
- Enables additional rule categories (H, I, J, K)
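The idea of function summaries can be sketched in a few lines: first record, per function, which parameter positions flow into the return value, then consult that record at call sites. This is an assumption-laden illustration, not the actual algorithm: it handles only top-level statements, expects callees to be defined before callers (a stand-in for call-graph ordering), and all helper names are hypothetical.

```python
import ast

SINKS = {"os.system"}


def call_name(node):
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Attribute):
        return f"{call_name(node.value)}.{node.attr}"
    return ""


def is_tainted(expr, tainted, summaries):
    if isinstance(expr, ast.Name):
        return expr.id in tainted
    if isinstance(expr, ast.Call) and call_name(expr.func) in summaries:
        # Summary lookup: the result is tainted only if a tainted argument
        # sits in a position that flows to the callee's return value.
        flows = summaries[call_name(expr.func)]
        return any(i in flows and is_tainted(a, tainted, summaries)
                   for i, a in enumerate(expr.args))
    # Unknown calls and other expressions: propagate conservatively.
    return any(is_tainted(c, tainted, summaries) for c in ast.iter_child_nodes(expr))


def summarize(fn, summaries):
    """Return the set of parameter indexes whose taint reaches a return."""
    flows = set()
    for idx, param in enumerate(fn.args.args):
        tainted = {param.arg}  # taint one parameter at a time
        for stmt in fn.body:
            if isinstance(stmt, ast.Assign) and is_tainted(stmt.value, tainted, summaries):
                tainted.update(t.id for t in stmt.targets if isinstance(t, ast.Name))
            elif isinstance(stmt, ast.Return) and stmt.value is not None:
                if is_tainted(stmt.value, tainted, summaries):
                    flows.add(idx)
    return flows


def analyze(source, entry):
    tree = ast.parse(source)
    functions = {n.name: n for n in tree.body if isinstance(n, ast.FunctionDef)}
    summaries = {}
    for name, fn in functions.items():  # source order: callees first (assumed)
        summaries[name] = summarize(fn, summaries)
    fn = functions[entry]
    tainted = {a.arg for a in fn.args.args}
    findings = []
    for stmt in fn.body:
        if isinstance(stmt, ast.Assign) and is_tainted(stmt.value, tainted, summaries):
            tainted.update(t.id for t in stmt.targets if isinstance(t, ast.Name))
        elif isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
            call = stmt.value
            if call_name(call.func) in SINKS and any(
                    is_tainted(a, tainted, summaries) for a in call.args):
                findings.append((entry, call_name(call.func), call.lineno))
    return findings


CODE = '''
def build_command(data):
    return f"echo {data}"

def constant_command(data):
    return "echo ok"

def handler(user_input):
    cmd = build_command(user_input)
    os.system(cmd)
    other = constant_command(user_input)
    os.system(other)
'''

print(analyze(CODE, "handler"))  # [('handler', 'os.system', 10)]
```

Only the call through `build_command` is reported, because the summary for `constant_command` records that its parameter never reaches the return value.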
Configuration:
Taint State¶
The analyzer maintains taint state per scope:
type TaintState struct {
    Variables  map[string]*TaintInfo
    Properties map[string]map[string]*TaintInfo
    Returns    *TaintInfo
    Parent     *TaintState
}

type TaintInfo struct {
    IsTainted  bool
    Source     *types.Location
    Category   types.SourceCategory
    Confidence types.Confidence
    Sanitized  []types.SinkCategory
}
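How a scoped state with a parent pointer and a per-category sanitized list might be queried is sketched below. The lookup rule (fall back to the enclosing scope) and the method names are assumptions made for illustration; the structs above only show that the fields exist.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TaintInfo:
    source_line: int
    category: str
    sanitized: set = field(default_factory=set)  # sink categories already neutralized


@dataclass
class TaintState:
    variables: dict = field(default_factory=dict)
    parent: Optional["TaintState"] = None

    def get_taint(self, name: str) -> Optional[TaintInfo]:
        # Assumed rule: walk outward through enclosing scopes until found.
        state = self
        while state is not None:
            if name in state.variables:
                return state.variables[name]
            state = state.parent
        return None

    def is_dangerous_for(self, name: str, sink_category: str) -> bool:
        # Taint matters for a sink only if it was not sanitized for that category.
        info = self.get_taint(name)
        return info is not None and sink_category not in info.sanitized


outer = TaintState(variables={"user_input": TaintInfo(10, "tool_input", {"exec"})})
inner = TaintState(parent=outer)

print(inner.is_dangerous_for("user_input", "exec"))        # False
print(inner.is_dangerous_for("user_input", "filesystem"))  # True
print(inner.is_dangerous_for("unknown", "exec"))           # False
```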
Trace Generation¶
When a vulnerability is found, MCP-Scan generates a complete trace:
{
  "trace": {
    "source": {
      "file": "handler.py",
      "start_line": 10,
      "start_col": 15
    },
    "sink": {
      "file": "handler.py",
      "start_line": 25,
      "start_col": 5
    },
    "steps": [
      {
        "location": {"file": "handler.py", "start_line": 10},
        "action": "source",
        "variable": "user_input"
      },
      {
        "location": {"file": "handler.py", "start_line": 15},
        "action": "assign",
        "variable": "cmd"
      },
      {
        "location": {"file": "handler.py", "start_line": 20},
        "action": "concat",
        "variable": "full_cmd"
      },
      {
        "location": {"file": "handler.py", "start_line": 25},
        "action": "sink",
        "variable": "full_cmd"
      }
    ]
  }
}
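A trace in this shape is straightforward to consume from a script. The sketch below renders the `steps` array as one line per step; `render_trace` is a hypothetical helper, and the embedded JSON repeats only the step fields shown above.

```python
import json

TRACE_JSON = """
{"trace": {"steps": [
  {"location": {"file": "handler.py", "start_line": 10}, "action": "source", "variable": "user_input"},
  {"location": {"file": "handler.py", "start_line": 15}, "action": "assign", "variable": "cmd"},
  {"location": {"file": "handler.py", "start_line": 20}, "action": "concat", "variable": "full_cmd"},
  {"location": {"file": "handler.py", "start_line": 25}, "action": "sink", "variable": "full_cmd"}
]}}
"""


def render_trace(document: dict) -> list:
    """Format each trace step as 'file:line action variable'."""
    lines = []
    for step in document["trace"]["steps"]:
        loc = step["location"]
        lines.append(f"{loc['file']}:{loc['start_line']} {step['action']} {step['variable']}")
    return lines


for line in render_trace(json.loads(TRACE_JSON)):
    print(line)
# handler.py:10 source user_input
# handler.py:15 assign cmd
# handler.py:20 concat full_cmd
# handler.py:25 sink full_cmd
```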
Confidence Adjustment¶
Taint confidence is adjusted based on:
- Source confidence: Some sources are more certain than others
- Propagation distance: Longer chains reduce confidence
- Partial sanitization: Sanitizers for the wrong sink category reduce confidence
- Context uncertainty: Deep mode cross-function tracking is less certain
Initial Confidence: High
↓ (long propagation chain)
Adjusted: Medium
↓ (partial sanitization)
Adjusted: Low
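The step-down behaviour in the diagram can be written out as a small function. This is a sketch under stated assumptions: each factor lowers confidence by exactly one level, and a chain counts as "long" when it exceeds five steps; the function and parameter names are hypothetical.

```python
from enum import IntEnum


class Confidence(IntEnum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3


def reduce_confidence(level: Confidence) -> Confidence:
    # Step down one level, never below LOW.
    return Confidence(max(level - 1, Confidence.LOW))


def adjust_confidence(initial: Confidence, chain_length: int,
                      partially_sanitized: bool = False,
                      cross_function: bool = False,
                      max_chain: int = 5) -> Confidence:
    level = initial
    if chain_length > max_chain:  # long propagation chain
        level = reduce_confidence(level)
    if partially_sanitized:       # sanitizer for a different sink category
        level = reduce_confidence(level)
    if cross_function:            # deep-mode flow across functions
        level = reduce_confidence(level)
    return level


result = adjust_confidence(Confidence.HIGH, chain_length=7, partially_sanitized=True)
print(result.name)  # LOW
```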
Example Analysis¶
Vulnerable Code¶
@tool
def execute_command(command: str):
    """Execute a shell command."""
    result = subprocess.run(command, shell=True, capture_output=True)
    return result.stdout.decode()
Analysis Trace¶
- Source Identification
  - `command` parameter is MCP tool input → tainted
- Propagation Tracking
  - `command` passed directly to `subprocess.run`
- Sink Detection
  - `subprocess.run(..., shell=True)` is exec sink
- Sanitizer Check
  - No sanitizer found in path
- Finding Generation
Internal Implementation Details¶
Engine Architecture¶
// internal/taint/engine.go
type Engine struct {
    catalog *catalog.Catalog // Source/sink/sanitizer definitions
    mode    Mode             // Fast (intra) or Deep (inter)
    config  *Config          // MaxDepth, timeout, etc.
}

// Entry point for analysis
func (e *Engine) Analyze(file *ast.File, surface *MCPSurface) []Finding {
    var findings []Finding
    for _, fn := range file.Functions {
        // Check if function is MCP handler
        handler := surface.FindHandler(fn.Name)
        // Initialize taint state
        state := NewTaintState()
        // Mark tool parameters as tainted
        if handler != nil {
            for _, param := range fn.Parameters {
                state.SetTaint(param.Name, &TaintInfo{
                    Source:     param.Location,
                    SourceType: SourceToolInput,
                    Confidence: High,
                })
            }
        }
        // Analyze function body
        fnFindings := e.analyzeStatements(fn.Body, state)
        findings = append(findings, fnFindings...)
    }
    return findings
}
Statement Processing¶
func (e *Engine) analyzeStatements(stmts []ast.Statement, state *TaintState) []Finding {
    var findings []Finding
    for _, stmt := range stmts {
        switch s := stmt.(type) {
        case *ast.Assignment:
            taint := e.evaluateExpression(s.Value, state)
            if taint != nil {
                state.SetTaint(s.Target.Name, taint.AddStep(TraceStep{
                    Location: s.Location,
                    Action:   "assign",
                    Variable: s.Target.Name,
                }))
            } else {
                state.ClearTaint(s.Target.Name)
            }
        case *ast.ExpressionStmt:
            if call, ok := s.Expression.(*ast.Call); ok {
                if sink := e.catalog.FindSink(call); sink != nil {
                    for idx, arg := range call.Arguments {
                        if !sink.AcceptsArg(idx) {
                            continue
                        }
                        taint := e.getTaintFromExpr(arg, state)
                        if taint != nil && e.taintMatchesSink(taint, sink) {
                            if !e.isSanitized(taint, sink.Category) {
                                findings = append(findings, Finding{
                                    RuleID:     "taint-" + string(sink.Category),
                                    Severity:   sink.Severity,
                                    Confidence: taint.Confidence,
                                    Source:     taint.Source,
                                    Sink:       call.Location,
                                    Trace:      taint.Via,
                                    Snippet:    extractSnippet(call),
                                })
                            }
                        }
                    }
                }
            }
        case *ast.IfStatement:
            // Clone state for branches
            thenState := state.Clone()
            elseState := state.Clone()
            findings = append(findings, e.analyzeStatements(s.Body, thenState)...)
            findings = append(findings, e.analyzeStatements(s.ElseBody, elseState)...)
            // Merge at join point (conservative: union of taints)
            state.Merge(thenState, elseState)
        case *ast.Return:
            taint := e.evaluateExpression(s.Value, state)
            state.SetReturnTaint(taint)
        }
    }
    return findings
}
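The "union of taints" merge at the join point of an `if` statement can be illustrated with plain dictionaries. In this sketch a state maps each tainted variable to a description of its source, and a missing key means the variable is clean; the `merge` helper is hypothetical and does not reproduce the behaviour of `state.Merge`.

```python
def merge(then_state: dict, else_state: dict) -> dict:
    """Union of taints: a variable is tainted after the branch if it is
    tainted on either path. On conflict the then-branch entry is kept."""
    merged = dict(else_state)
    merged.update(then_state)
    return merged


before = {"user_input": "tool parameter"}

# if flag: cmd = user_input
then_state = dict(before)
then_state["cmd"] = "tool parameter"

# else: cmd = "ls"  (a literal, so cmd stays clean on this path)
else_state = dict(before)

after = merge(then_state, else_state)
print(sorted(after))  # ['cmd', 'user_input']
```

The union over-approximates: `cmd` is reported as tainted after the branch even though one path assigns a constant, which trades some false positives for not missing the tainted path.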
Expression Evaluation¶
func (e *Engine) evaluateExpression(expr ast.Expression, state *TaintState) *TaintInfo {
    switch ex := expr.(type) {
    case *ast.Identifier:
        return state.GetTaint(ex.Name)
    case *ast.StringLiteral:
        return nil // Literals are never tainted
    case *ast.BinaryOp:
        leftTaint := e.evaluateExpression(ex.Left, state)
        rightTaint := e.evaluateExpression(ex.Right, state)
        // Taint propagates if either operand is tainted
        if leftTaint != nil {
            return leftTaint.AddStep(TraceStep{
                Location: ex.Location,
                Action:   "binary_op",
                Variable: ex.Operator,
            })
        }
        if rightTaint != nil {
            return rightTaint.AddStep(TraceStep{
                Location: ex.Location,
                Action:   "binary_op",
                Variable: ex.Operator,
            })
        }
        return nil
    case *ast.Call:
        // Check if call is a source
        if source := e.catalog.FindSource(ex); source != nil {
            return &TaintInfo{
                Source:     ex.Location,
                SourceType: source.Category,
                Confidence: High,
                Via:        []TraceStep{{Location: ex.Location, Action: "source"}},
            }
        }
        // Check if call is a sanitizer
        if sanitizer := e.catalog.FindSanitizer(ex); sanitizer != nil {
            // Return nil to clear taint (for applicable categories)
            return nil
        }
        // Conservative: propagate taint from any argument
        for _, arg := range ex.Arguments {
            taint := e.evaluateExpression(arg, state)
            if taint != nil {
                return taint.AddStep(TraceStep{
                    Location: ex.Location,
                    Action:   "call_return",
                    Variable: getFunctionName(ex),
                })
            }
        }
        return nil
    case *ast.MemberAccess:
        // Check property access (e.g., request.body)
        objTaint := e.evaluateExpression(ex.Object, state)
        if objTaint != nil {
            return objTaint.AddStep(TraceStep{
                Location: ex.Location,
                Action:   "member_access",
                Variable: ex.Property,
            })
        }
        // Check if this property access is a source
        if source := e.catalog.FindPropertySource(ex); source != nil {
            return &TaintInfo{
                Source:     ex.Location,
                SourceType: source.Category,
                Confidence: High,
            }
        }
        return state.GetPropertyTaint(ex.Object, ex.Property)
    }
    return nil
}
Inter-Procedural Analysis (Deep Mode)¶
// Build function summaries for cross-function analysis
type FunctionSummary struct {
    TaintedParams []int          // Which params propagate taint
    ReturnsTaint  bool           // Whether return value is tainted
    SinksReached  []SinkCategory // What sinks can be reached
}

func (e *Engine) buildSummaries(files []*ast.File) map[string]*FunctionSummary {
    summaries := make(map[string]*FunctionSummary)
    // Build call graph
    callGraph := e.buildCallGraph(files)
    // Process in reverse topological order
    for _, fn := range reverseTopoSort(callGraph) {
        summary := &FunctionSummary{}
        // Analyze with each parameter tainted
        for idx := range fn.Parameters {
            state := NewTaintState()
            state.SetTaint(fn.Parameters[idx].Name, &TaintInfo{
                SourceType: SourceToolInput,
                Confidence: High,
            })
            findings := e.analyzeStatements(fn.Body, state)
            if state.ReturnTaint != nil {
                summary.TaintedParams = append(summary.TaintedParams, idx)
                summary.ReturnsTaint = true
            }
            for _, f := range findings {
                summary.SinksReached = append(summary.SinksReached, f.SinkCategory)
            }
        }
        summaries[fn.Key()] = summary
    }
    return summaries
}
// Use summaries for context-sensitive analysis
func (e *Engine) analyzeWithSummaries(fn *ast.Function, summaries map[string]*FunctionSummary, state *TaintState, depth int) []Finding {
    if depth > e.config.MaxDepth {
        return nil
    }
    var findings []Finding
    for _, stmt := range fn.Body {
        // Use checked type assertions: a bare stmt.(*ast.ExpressionStmt)
        // panics on any other statement type.
        exprStmt, ok := stmt.(*ast.ExpressionStmt)
        if !ok {
            continue
        }
        call, ok := exprStmt.Expression.(*ast.Call)
        if !ok {
            continue
        }
        callee := resolveCallee(call)
        summary, ok := summaries[callee]
        if !ok {
            continue
        }
        // Check if tainted args flow through callee
        for idx, arg := range call.Arguments {
            argTaint := state.GetTaint(argName(arg))
            if argTaint != nil && contains(summary.TaintedParams, idx) {
                if len(summary.SinksReached) > 0 {
                    // Tainted data reaches sink through callee
                    findings = append(findings, Finding{
                        RuleID:     "deep-taint-flow",
                        Confidence: Medium, // Lower confidence for cross-function
                        Trace:      argTaint.Via,
                        Context:    map[string]string{"through": callee},
                    })
                }
            }
        }
    }
    return findings
}
Confidence Adjustment¶
// Adjust confidence based on analysis quality
func adjustConfidence(taint *TaintInfo) Confidence {
    confidence := taint.Confidence
    // Reduce confidence for long propagation chains
    if len(taint.Via) > 5 {
        confidence = reduceConfidence(confidence)
    }
    // Reduce for cross-function flows (deep mode)
    for _, step := range taint.Via {
        if step.Action == "cross_function" {
            confidence = reduceConfidence(confidence)
            break
        }
    }
    return confidence
}

func reduceConfidence(c Confidence) Confidence {
    switch c {
    case High:
        return Medium
    case Medium:
        return Low
    default:
        return Low
    }
}
See Also¶
- Architecture - Full system architecture
- Vulnerability Classes - All vulnerability types
- Rules Reference - Complete rule documentation