
Taint Analysis

MCP-Scan uses taint analysis to track data flow from untrusted sources to dangerous sinks.

Concept Overview

Taint analysis answers the question: "Can attacker-controlled data reach a dangerous operation?"

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Source    │ ──► │ Propagation │ ──► │    Sink     │
│ (untrusted) │     │  (data flow)│     │ (dangerous) │
└─────────────┘     └─────────────┘     └─────────────┘

If tainted data reaches a sink without passing through a sanitizer, a vulnerability is reported.

Sources (Taint Origins)

Sources are entry points for untrusted data:

MCP Tool Input

@tool
def execute(command: str):  # 'command' is tainted
    ...

| Source ID | Language | Pattern | Category |
|-----------|----------|---------|----------|
| py-tool-param | Python | Tool function parameters | tool_input |
| ts-tool-param | TypeScript | Tool function parameters | tool_input |

Environment Variables

value = os.environ["USER_INPUT"]  # Tainted
value = os.getenv("EXTERNAL_DATA")  # Tainted

| Source ID | Language | Pattern | Category |
|-----------|----------|---------|----------|
| py-os-environ | Python | os.environ[...] | env_var |
| py-os-getenv | Python | os.getenv(...) | env_var |
| js-process-env | JavaScript | process.env[...] | env_var |

HTTP Request Data

data = request.json  # Tainted
param = request.args.get("q")  # Tainted

| Source ID | Language | Pattern | Category |
|-----------|----------|---------|----------|
| py-flask-request | Python | request.json, request.args | http_request |
| py-django-request | Python | request.POST, request.GET | http_request |
| ts-express-req | TypeScript | req.body, req.query | http_request |

File Content

content = open(path).read()  # Tainted if path is untrusted
data = Path(file).read_text()  # Tainted

| Source ID | Language | Pattern | Category |
|-----------|----------|---------|----------|
| py-open-read | Python | open(...).read() | file_content |
| py-pathlib-read | Python | Path(...).read_text() | file_content |
| js-fs-read | JavaScript | fs.readFileSync(...) | file_content |

Database Results

row = cursor.fetchone()  # Tainted
results = db.execute(query).fetchall()  # Tainted

| Source ID | Language | Pattern | Category |
|-----------|----------|---------|----------|
| py-db-fetch | Python | cursor.fetchone(), fetchall() | db_result |
| py-sqlalchemy-execute | Python | .execute(...).fetchall() | db_result |

Sinks (Dangerous Operations)

Sinks are operations that are dangerous when receiving tainted data:

Command Execution (RCE)

os.system(cmd)  # Dangerous sink
subprocess.run(cmd, shell=True)  # Dangerous sink
exec(code)  # Dangerous sink

| Sink ID | Language | Pattern | Category | Severity |
|---------|----------|---------|----------|----------|
| py-os-system | Python | os.system(...) | exec | Critical |
| py-subprocess-shell | Python | subprocess.*(shell=True) | exec | Critical |
| py-eval | Python | eval(...) | eval | Critical |
| py-exec | Python | exec(...) | eval | Critical |
| js-child-exec | JavaScript | child_process.exec(...) | exec | Critical |
| js-eval | JavaScript | eval(...) | eval | Critical |

Filesystem Operations

open(path, "w").write(data)  # Dangerous if path tainted
os.remove(path)  # Dangerous if path tainted

| Sink ID | Language | Pattern | Category | Severity |
|---------|----------|---------|----------|----------|
| py-open-write | Python | open(path, "w") | filesystem | High |
| py-os-remove | Python | os.remove(path) | filesystem | High |
| js-fs-write | JavaScript | fs.writeFileSync(...) | filesystem | High |

Network Operations (SSRF)

requests.get(url)  # Dangerous if url tainted
urllib.request.urlopen(url)  # Dangerous if url tainted

| Sink ID | Language | Pattern | Category | Severity |
|---------|----------|---------|----------|----------|
| py-requests | Python | requests.get/post(url) | network | High |
| py-urllib | Python | urllib.request.urlopen(url) | network | High |
| js-fetch | JavaScript | fetch(url) | network | High |

Database Operations (SQLi)

cursor.execute(query)  # Dangerous if query has tainted interpolation

| Sink ID | Language | Pattern | Category | Severity |
|---------|----------|---------|----------|----------|
| py-cursor-execute | Python | cursor.execute(query) | database | Critical |
| py-sqlalchemy-text | Python | text(raw_sql) | database | Critical |

Taint Propagation

Taint flows through code via:

1. Direct Assignment

user_input = request.args.get("q")  # Source: user_input is tainted
cmd = user_input  # Propagation: cmd becomes tainted

2. String Concatenation

cmd = "echo " + user_input  # Taint propagates through concat
cmd = f"echo {user_input}"  # Taint propagates through f-string

3. String Operations

cmd = user_input.upper()  # Still tainted
cmd = user_input.strip()  # Still tainted
cmd = user_input[:10]  # Still tainted (subset)

4. Collection Operations

items = [user_input]  # List is tainted
items.append(user_input)  # Existing list becomes tainted
data = {"key": user_input}  # Dict is tainted

5. Function Calls

def process(data):
    return data.lower()

result = process(user_input)  # result is tainted

6. Function Returns (Deep Mode)

def get_data():
    return request.args.get("q")

data = get_data()  # In deep mode: data is tainted
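
Taken together, these rules can be sketched as a toy set-based tracker (an illustration only, not MCP-Scan's actual engine; all variable names here are hypothetical):

```python
# Minimal set-based taint propagation sketch: a variable becomes
# tainted if any value flowing into it is tainted, and is cleared
# when it is overwritten with only clean data.
tainted = {"user_input"}  # source: e.g. request.args.get("q")

def propagate(target, operands):
    """Apply rules 1-5: assignment, concat, string/collection ops, calls."""
    if tainted.intersection(operands):
        tainted.add(target)
    else:
        tainted.discard(target)  # reassignment from clean data clears taint

propagate("cmd", ["user_input"])          # 1. direct assignment
propagate("full_cmd", ["prefix", "cmd"])  # 2. concatenation
propagate("upper", ["full_cmd"])          # 3. string operation
propagate("cmd", ["constant"])            # overwrite with a literal

print(sorted(tainted))  # cmd was cleared; the others remain tainted
```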

Sanitizers

Sanitizers are functions that neutralize taint for specific sink categories:

Shell Sanitizers

import shlex
safe_cmd = shlex.quote(user_input)  # Sanitized for shell
os.system(f"echo {safe_cmd}")  # Safe

| Sanitizer | Language | Sanitizes |
|-----------|----------|-----------|
| shlex.quote | Python | exec sinks |
| pipes.quote | Python | exec sinks |
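
To see why this neutralizes exec sinks: shlex.quote wraps the value in single quotes, so shell metacharacters become inert text rather than operators:

```python
import shlex

user_input = "foo; rm -rf /"       # attacker-controlled value
safe = shlex.quote(user_input)     # single-quoted: the ';' is now literal
cmd = f"echo {safe}"               # safe to pass to a shell

print(safe)
print(cmd)
```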

Path Sanitizers

import os.path
safe_path = os.path.normpath(user_path)
# Must also verify within allowed directory
if safe_path.startswith("/allowed/"):
    open(safe_path)  # Conditionally safe

| Sanitizer | Language | Sanitizes |
|-----------|----------|-----------|
| os.path.normpath | Python | filesystem (partial) |
| os.path.realpath | Python | filesystem (partial) |
| path.resolve | JavaScript | filesystem (partial) |
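
A minimal containment check might look like the following sketch (the ALLOWED directory and resolve_safe helper are hypothetical; os.path.commonpath is used for the containment test):

```python
import os.path

ALLOWED = "/allowed"  # hypothetical base directory

def resolve_safe(user_path):
    """Normalize, then verify containment; normpath alone is not enough."""
    candidate = os.path.normpath(os.path.join(ALLOWED, user_path))
    # commonpath confirms candidate is inside ALLOWED even after
    # normpath has collapsed any "../" traversal segments.
    if os.path.commonpath([ALLOWED, candidate]) != ALLOWED:
        raise ValueError("path escapes allowed directory")
    return candidate

print(resolve_safe("docs/readme.txt"))   # stays inside /allowed
```

Calling it with a traversal payload such as "../../etc/passwd" raises ValueError instead of opening the file.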

URL Sanitizers

from urllib.parse import quote
safe_param = quote(user_input)  # URL-encoded

| Sanitizer | Language | Sanitizes |
|-----------|----------|-----------|
| urllib.parse.quote | Python | network |
| urllib.parse.urlencode | Python | network |
| encodeURIComponent | JavaScript | network |
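
For instance, with quote and urlencode from the standard library (the sample value is hypothetical):

```python
from urllib.parse import quote, urlencode

user_input = "a/b&c d"                 # attacker-controlled value
encoded = quote(user_input, safe="")   # percent-encode everything, incl. "/"
query = urlencode({"q": user_input})   # builds a query string, space -> "+"

print(encoded)
print(query)
```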

SQL Sanitizers

Parameterized queries are the primary sanitizer:

# Safe: parameterized
cursor.execute("SELECT * FROM users WHERE id = ?", [user_id])

# Safe: ORM methods
User.query.filter_by(id=user_id).first()
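
A self-contained sqlite3 sketch shows why binding defeats injection (the table and values are hypothetical):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'alice')")

injected = "alice' OR '1'='1"  # classic injection payload

# Parameterized: the driver binds the payload as one literal string,
# so it matches no row instead of matching every row.
rows = conn.execute("SELECT id FROM users WHERE name = ?", (injected,)).fetchall()
legit = conn.execute("SELECT id FROM users WHERE name = ?", ("alice",)).fetchall()

print(rows)   # the payload finds nothing
print(legit)  # the real name still works
```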

Analysis Modes

Fast Mode (Intra-Procedural)

Tracks taint within single functions only:

def handler(user_input):  # user_input is tainted
    cmd = f"echo {user_input}"  # cmd is tainted
    os.system(cmd)  # Sink reached by tainted data → FINDING

Limitations:

  - Cannot track taint across function boundaries
  - May miss vulnerabilities in called functions
  - Cannot analyze return-value propagation

Deep Mode (Inter-Procedural)

Tracks taint across function calls:

def build_command(data):
    return f"echo {data}"  # Returns tainted

def handler(user_input):  # user_input is tainted
    cmd = build_command(user_input)  # Deep mode: cmd is tainted
    os.system(cmd)  # Sink reached → FINDING

Capabilities:

  - Follows function calls up to a configurable depth
  - Uses function summaries for efficiency
  - Tracks taint through return values
  - Enables additional rule categories (H, I, J, K)

Configuration:

// Default depth limit
MaxDepth: 3

Taint State

The analyzer maintains taint state per scope:

type TaintState struct {
    Variables  map[string]*TaintInfo
    Properties map[string]map[string]*TaintInfo
    Returns    *TaintInfo
    Parent     *TaintState
}

type TaintInfo struct {
    IsTainted  bool
    Source     *types.Location
    SourceType types.SourceCategory
    Confidence types.Confidence
    Sanitized  []types.SinkCategory
    Via        []TraceStep // propagation steps, used for trace generation
}
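
The Parent pointer gives each nested scope a fallback to its enclosing scope: a lookup walks outward until it finds taint info. A minimal Python analogue of that lookup (illustrative only, not the actual Go implementation):

```python
class TaintState:
    """Per-scope taint table; parent links to the enclosing scope."""

    def __init__(self, parent=None):
        self.variables = {}
        self.parent = parent

    def set_taint(self, name, info):
        self.variables[name] = info

    def get_taint(self, name):
        # Check the local scope first, then walk the parent chain.
        if name in self.variables:
            return self.variables[name]
        return self.parent.get_taint(name) if self.parent else None

outer = TaintState()
outer.set_taint("user_input", {"source": "tool_input"})
inner = TaintState(parent=outer)
print(inner.get_taint("user_input"))  # resolved via the parent scope
```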

Trace Generation

When a vulnerability is found, MCP-Scan generates a complete trace:

{
  "trace": {
    "source": {
      "file": "handler.py",
      "start_line": 10,
      "start_col": 15
    },
    "sink": {
      "file": "handler.py",
      "start_line": 25,
      "start_col": 5
    },
    "steps": [
      {
        "location": {"file": "handler.py", "start_line": 10},
        "action": "source",
        "variable": "user_input"
      },
      {
        "location": {"file": "handler.py", "start_line": 15},
        "action": "assign",
        "variable": "cmd"
      },
      {
        "location": {"file": "handler.py", "start_line": 20},
        "action": "concat",
        "variable": "full_cmd"
      },
      {
        "location": {"file": "handler.py", "start_line": 25},
        "action": "sink",
        "variable": "full_cmd"
      }
    ]
  }
}

Confidence Adjustment

Taint confidence is adjusted based on:

  1. Source confidence: Some sources are more certain than others
  2. Propagation distance: Longer chains reduce confidence
  3. Partial sanitization: Sanitizers for wrong category reduce confidence
  4. Context uncertainty: Deep mode cross-function tracking is less certain
Initial Confidence: High
  ↓ (long propagation chain)
Adjusted: Medium
  ↓ (partial sanitization)
Adjusted: Low
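
The step-down behavior in the diagram can be sketched as a small function (an illustration mirroring the rules above, not MCP-Scan's internal code; the names are hypothetical):

```python
LEVELS = ["low", "medium", "high"]

def step_down(confidence):
    """Drop one confidence level, bottoming out at 'low'."""
    return LEVELS[max(LEVELS.index(confidence) - 1, 0)]

def adjust(confidence, chain_length, cross_function, partial_sanitization):
    # Mirrors rules 2-4 above; rule 1 sets the starting level.
    if chain_length > 5:
        confidence = step_down(confidence)
    if cross_function:
        confidence = step_down(confidence)
    if partial_sanitization:
        confidence = step_down(confidence)
    return confidence

# High, long chain, partially sanitized -> stepped down twice
print(adjust("high", chain_length=7, cross_function=False, partial_sanitization=True))
```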

Example Analysis

Vulnerable Code

@tool
def execute_command(command: str):
    """Execute a shell command."""
    result = subprocess.run(command, shell=True, capture_output=True)
    return result.stdout.decode()

Analysis Trace

  1. Source identification: the command parameter is MCP tool input → tainted

  2. Propagation tracking: command is passed directly to subprocess.run

  3. Sink detection: subprocess.run(..., shell=True) is an exec sink

  4. Sanitizer check: no sanitizer is found on the path

  5. Finding generation:
    {
      "rule_id": "MCP-A003",
      "severity": "critical",
      "confidence": "high",
      "trace": {
        "source": {"line": 2, "variable": "command"},
        "sink": {"line": 4, "variable": "command"}
      }
    }
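
A common remediation for this finding is to drop shell=True and pass an argument vector, so shell metacharacters are ordinary text (a sketch, assuming the tool genuinely must run caller-supplied programs):

```python
import shlex
import subprocess

def execute_command(command: str) -> str:
    # shlex.split tokenizes without interpreting ';', '|', '&&', etc.
    # Without shell=True, the first token is executed directly and the
    # rest are passed as inert arguments, not shell operators.
    argv = shlex.split(command)
    result = subprocess.run(argv, capture_output=True, check=False)
    return result.stdout.decode()

print(execute_command("echo hello"))
print(execute_command("echo hello; rm -rf /"))  # ';' is just text to echo
```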
    


Internal Implementation Details

Engine Architecture

// internal/taint/engine.go
type Engine struct {
    catalog *catalog.Catalog  // Source/sink/sanitizer definitions
    mode    Mode              // Fast (intra) or Deep (inter)
    config  *Config           // MaxDepth, timeout, etc.
}

// Entry point for analysis
func (e *Engine) Analyze(file *ast.File, surface *MCPSurface) []Finding {
    var findings []Finding

    for _, fn := range file.Functions {
        // Check if function is MCP handler
        handler := surface.FindHandler(fn.Name)

        // Initialize taint state
        state := NewTaintState()

        // Mark tool parameters as tainted
        if handler != nil {
            for _, param := range fn.Parameters {
                state.SetTaint(param.Name, &TaintInfo{
                    Source:     param.Location,
                    SourceType: SourceToolInput,
                    Confidence: High,
                })
            }
        }

        // Analyze function body
        fnFindings := e.analyzeStatements(fn.Body, state)
        findings = append(findings, fnFindings...)
    }

    return findings
}

Statement Processing

func (e *Engine) analyzeStatements(stmts []ast.Statement, state *TaintState) []Finding {
    var findings []Finding

    for _, stmt := range stmts {
        switch s := stmt.(type) {
        case *ast.Assignment:
            taint := e.evaluateExpression(s.Value, state)
            if taint != nil {
                state.SetTaint(s.Target.Name, taint.AddStep(TraceStep{
                    Location: s.Location,
                    Action:   "assign",
                    Variable: s.Target.Name,
                }))
            } else {
                state.ClearTaint(s.Target.Name)
            }

        case *ast.ExpressionStmt:
            if call, ok := s.Expression.(*ast.Call); ok {
                if sink := e.catalog.FindSink(call); sink != nil {
                    for idx, arg := range call.Arguments {
                        if !sink.AcceptsArg(idx) {
                            continue
                        }
                        taint := e.getTaintFromExpr(arg, state)
                        if taint != nil && e.taintMatchesSink(taint, sink) {
                            if !e.isSanitized(taint, sink.Category) {
                                findings = append(findings, Finding{
                                    RuleID:     "taint-" + string(sink.Category),
                                    Severity:   sink.Severity,
                                    Confidence: taint.Confidence,
                                    Source:     taint.Source,
                                    Sink:       call.Location,
                                    Trace:      taint.Via,
                                    Snippet:    extractSnippet(call),
                                })
                            }
                        }
                    }
                }
            }

        case *ast.IfStatement:
            // Clone state for branches
            thenState := state.Clone()
            elseState := state.Clone()

            findings = append(findings, e.analyzeStatements(s.Body, thenState)...)
            findings = append(findings, e.analyzeStatements(s.ElseBody, elseState)...)

            // Merge at join point (conservative: union of taints)
            state.Merge(thenState, elseState)

        case *ast.Return:
            taint := e.evaluateExpression(s.Value, state)
            state.SetReturnTaint(taint)
        }
    }

    return findings
}

Expression Evaluation

func (e *Engine) evaluateExpression(expr ast.Expression, state *TaintState) *TaintInfo {
    switch ex := expr.(type) {
    case *ast.Identifier:
        return state.GetTaint(ex.Name)

    case *ast.StringLiteral:
        return nil  // Literals are never tainted

    case *ast.BinaryOp:
        leftTaint := e.evaluateExpression(ex.Left, state)
        rightTaint := e.evaluateExpression(ex.Right, state)

        // Taint propagates if either operand is tainted
        if leftTaint != nil {
            return leftTaint.AddStep(TraceStep{
                Location: ex.Location,
                Action:   "binary_op",
                Variable: ex.Operator,
            })
        }
        if rightTaint != nil {
            return rightTaint.AddStep(TraceStep{
                Location: ex.Location,
                Action:   "binary_op",
                Variable: ex.Operator,
            })
        }
        return nil

    case *ast.Call:
        // Check if call is a source
        if source := e.catalog.FindSource(ex); source != nil {
            return &TaintInfo{
                Source:     ex.Location,
                SourceType: source.Category,
                Confidence: High,
                Via:        []TraceStep{{Location: ex.Location, Action: "source"}},
            }
        }

        // Check if call is a sanitizer
        if sanitizer := e.catalog.FindSanitizer(ex); sanitizer != nil {
            // Simplification: treat the result as fully clean. A fuller
            // version would record the sanitized sink categories in
            // TaintInfo.Sanitized instead of clearing taint entirely.
            return nil
        }

        // Conservative: propagate taint from any argument
        for _, arg := range ex.Arguments {
            taint := e.evaluateExpression(arg, state)
            if taint != nil {
                return taint.AddStep(TraceStep{
                    Location: ex.Location,
                    Action:   "call_return",
                    Variable: getFunctionName(ex),
                })
            }
        }
        return nil

    case *ast.MemberAccess:
        // Check property access (e.g., request.body)
        objTaint := e.evaluateExpression(ex.Object, state)
        if objTaint != nil {
            return objTaint.AddStep(TraceStep{
                Location: ex.Location,
                Action:   "member_access",
                Variable: ex.Property,
            })
        }

        // Check if this property access is a source
        if source := e.catalog.FindPropertySource(ex); source != nil {
            return &TaintInfo{
                Source:     ex.Location,
                SourceType: source.Category,
                Confidence: High,
            }
        }

        return state.GetPropertyTaint(ex.Object, ex.Property)
    }

    return nil
}

Inter-Procedural Analysis (Deep Mode)

// Build function summaries for cross-function analysis
type FunctionSummary struct {
    TaintedParams []int           // Which params propagate taint
    ReturnsTaint  bool            // Whether return value is tainted
    SinksReached  []SinkCategory  // What sinks can be reached
}

func (e *Engine) buildSummaries(files []*ast.File) map[string]*FunctionSummary {
    summaries := make(map[string]*FunctionSummary)

    // Build call graph
    callGraph := e.buildCallGraph(files)

    // Process in reverse topological order
    for _, fn := range reverseTopoSort(callGraph) {
        summary := &FunctionSummary{}

        // Analyze with each parameter tainted
        for idx := range fn.Parameters {
            state := NewTaintState()
            state.SetTaint(fn.Parameters[idx].Name, &TaintInfo{
                SourceType: SourceToolInput,
                Confidence: High,
            })

            findings := e.analyzeStatements(fn.Body, state)

            if state.Returns != nil {
                summary.TaintedParams = append(summary.TaintedParams, idx)
                summary.ReturnsTaint = true
            }

            for _, f := range findings {
                summary.SinksReached = append(summary.SinksReached, f.SinkCategory)
            }
        }

        summaries[fn.Key()] = summary
    }

    return summaries
}

// Use summaries for context-sensitive analysis
func (e *Engine) analyzeWithSummaries(fn *ast.Function, summaries map[string]*FunctionSummary, state *TaintState, depth int) []Finding {
    if depth > e.config.MaxDepth {
        return nil
    }

    var findings []Finding

    for _, stmt := range fn.Body {
        exprStmt, ok := stmt.(*ast.ExpressionStmt)
        if !ok {
            continue
        }
        if call, ok := exprStmt.Expression.(*ast.Call); ok {
            callee := resolveCallee(call)
            if summary, ok := summaries[callee]; ok {
                // Check if tainted args flow through callee
                for idx, arg := range call.Arguments {
                    argTaint := state.GetTaint(argName(arg))
                    if argTaint != nil && contains(summary.TaintedParams, idx) {
                        if len(summary.SinksReached) > 0 {
                            // Tainted data reaches sink through callee
                            findings = append(findings, Finding{
                                RuleID:     "deep-taint-flow",
                                Confidence: Medium,  // Lower confidence for cross-function
                                Trace:      argTaint.Via,
                                Context:    map[string]string{"through": callee},
                            })
                        }
                    }
                }
            }
        }
    }

    return findings
}

Confidence Adjustment

// Adjust confidence based on analysis quality
func adjustConfidence(taint *TaintInfo) Confidence {
    confidence := taint.Confidence

    // Reduce confidence for long propagation chains
    if len(taint.Via) > 5 {
        confidence = reduceConfidence(confidence)
    }

    // Reduce for cross-function flows (deep mode)
    for _, step := range taint.Via {
        if step.Action == "cross_function" {
            confidence = reduceConfidence(confidence)
            break
        }
    }

    return confidence
}

func reduceConfidence(c Confidence) Confidence {
    switch c {
    case High:
        return Medium
    case Medium:
        return Low
    default:
        return Low
    }
}

See Also