Taint Analysis System

Overview

The taint analysis system (internal/taint/) performs data flow analysis to track how untrusted data (sources) flows through the program to potentially dangerous operations (sinks). This is the core mechanism for detecting vulnerabilities like RCE, SQL injection, path traversal, and SSRF.

Architecture

┌───────────────────────────────────────────────────────────────────┐
│                       Taint Analysis Engine                       │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌────────────┐    ┌────────────┐    ┌────────────┐    ┌────────┐ │
│  │  Sources   │───▶│   Taint    │───▶│   Sinks    │───▶│Findings│ │
│  │(Untrusted) │    │Propagation │    │(Dangerous) │    │        │ │
│  └────────────┘    └────────────┘    └────────────┘    └────────┘ │
│         │                 │                 │                     │
│         ▼                 ▼                 ▼                     │
│  ┌────────────┐    ┌────────────┐    ┌────────────┐               │
│  │  Catalog   │    │ TaintState │    │ Sanitizers │               │
│  │   (Defs)   │    │ (Tracking) │    │(Break Flow)│               │
│  └────────────┘    └────────────┘    └────────────┘               │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘

Analysis Modes

Fast Mode (Intra-procedural)

Analyzes each function in isolation:
- Quick execution
- Suitable for CI/CD pipelines
- May miss cross-function flows

cfg := taint.Config{
    Mode:  taint.ModeFast,
    Depth: 3,
}
engine := taint.New(catalog, cfg)

Deep Mode (Inter-procedural)

Follows data flow across function calls:
- Builds function summaries
- Uses call graph for complete analysis
- Suitable for certification

cfg := taint.Config{
    Mode:  taint.ModeDeep,
    Depth: 10,
}
engine := taint.New(catalog, cfg)
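
To make the difference between the two modes concrete, here is a minimal Python sketch of a flow that crosses a function boundary. The names build_command and search are hypothetical and not part of the scanner. A pass that looks at search in isolation sees the parameter handed to a helper it knows nothing about, while an inter-procedural pass could rely on a summary along the lines of "the return value of build_command carries the taint of its first argument".

```python
def build_command(pattern: str) -> str:
    # Helper: the parameter's taint leaves through the return value.
    return "grep -r " + pattern + " /data"


def search(query: str) -> str:
    # query stands in for a tainted tool parameter.
    cmd = build_command(query)  # the flow crosses a function boundary here
    # A real handler would pass cmd to a sink such as os.system();
    # it is returned instead so the sketch has no side effects.
    return cmd


print(search("TODO; id"))
```

The injected "; id" survives the helper call unchanged, which is exactly the kind of flow that is only visible when the helper's behavior is taken into account.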

Core Components

TaintState

Tracks tainted variables in a scope:

type TaintState struct {
    Variables  map[string]*TaintInfo          // Variable name → taint info
    Properties map[string]map[string]*TaintInfo // Object → property → taint
    Parent     *TaintState                     // For closure capture
    Language   types.Language                  // Source file language
}

Operations:
- GetTaint(name) - Get taint for variable (checks parent scopes)
- SetTaint(name, info) - Set taint for variable
- GetPropertyTaint(obj, prop) - Get taint for object property
- SetPropertyTaint(obj, prop, info) - Set taint for property
- Clone() - Deep copy for control flow analysis
- Merge(other) - Merge states at join points
- NewChildState() - Create child for closures
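
The scope-chain lookup and the clone operation can be modeled in a few lines. The following is a Python stand-in for the Go struct, written for illustration only: taint info is reduced to a string label, property tracking is left out, and sharing the parent link between a state and its clone is an assumption rather than documented behavior.

```python
import copy
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class TaintState:
    # Illustrative model; the real TaintState stores *TaintInfo values.
    variables: Dict[str, str] = field(default_factory=dict)
    parent: Optional["TaintState"] = None

    def get_taint(self, name: str) -> Optional[str]:
        # Walk outward through enclosing scopes until the name is found.
        state: Optional["TaintState"] = self
        while state is not None:
            if name in state.variables:
                return state.variables[name]
            state = state.parent
        return None

    def set_taint(self, name: str, info: str) -> None:
        self.variables[name] = info

    def new_child_state(self) -> "TaintState":
        # A closure body gets its own scope that can still see the parent.
        return TaintState(parent=self)

    def clone(self) -> "TaintState":
        # Copy the local variables; the parent link is shared (assumption).
        return TaintState(copy.deepcopy(self.variables), self.parent)


outer = TaintState()
outer.set_taint("q", "http_request")

inner = outer.new_child_state()
print(inner.get_taint("q"))    # resolved through the parent scope

branch = outer.clone()
branch.set_taint("tmp", "env_var")
print(outer.get_taint("tmp"))  # the original state is unaffected by the clone
```

Cloning before each branch is what lets the two arms of an if statement be analyzed without contaminating each other.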

TaintInfo

Contains information about a tainted value:

type TaintInfo struct {
    Source     types.Location       // Where taint originated
    SourceType types.SourceCategory // Type of source
    Via        []types.TraceStep    // Propagation path
    Confidence types.Confidence     // Detection confidence
}

TraceStep

Records each step in the taint propagation:

type TraceStep struct {
    Location types.Location
    Action   string  // "assign", "property_access", "binary_op:+", etc.
    Variable string  // Affected variable
}
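
As a rough illustration of how a trace might grow, the Python sketch below appends one step per propagation. It mirrors the field names above but is not the Go implementation: locations are reduced to line numbers, and the propagate helper is hypothetical.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class TraceStep:
    line: int       # simplified stand-in for types.Location
    action: str     # e.g. "assign", "binary_op:+"
    variable: str   # affected variable


@dataclass(frozen=True)
class TaintInfo:
    source_line: int
    source_type: str
    via: Tuple[TraceStep, ...] = ()


def propagate(info: TaintInfo, line: int, action: str, variable: str) -> TaintInfo:
    # Return a new TaintInfo so that variables holding the old value
    # keep their own, shorter trace.
    return replace(info, via=info.via + (TraceStep(line, action, variable),))


# Models:  1: user_input = request.args.get("q")
#          2: query = user_input
#          3: cmd = "grep " + query
user_input = TaintInfo(source_line=1, source_type="SourceHTTPRequest")
query = propagate(user_input, 2, "assign", "query")
cmd = propagate(query, 3, "binary_op:+", "cmd")

print([(step.line, step.action) for step in cmd.via])
```

Keeping the values immutable means user_input still has an empty trace after cmd has accumulated two steps.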

Sources (Untrusted Data Origins)

Source Categories

| Category | Description | Risk |
| --- | --- | --- |
| SourceToolInput | MCP tool parameters | High - user controlled |
| SourceEnvVar | Environment variables | Medium - may be external |
| SourceHTTPRequest | HTTP request data | High - user controlled |
| SourceFileContent | File contents | Medium - external data |
| SourceDBResult | Database query results | Low - usually controlled |

Built-in Sources

Python Sources

| ID | Pattern | Category |
| --- | --- | --- |
| py-os-environ | os.environ | EnvVar |
| py-os-getenv | os.getenv() | EnvVar |
| py-request-args | request.args | HTTPRequest |
| py-request-form | request.form | HTTPRequest |
| py-request-json | request.json | HTTPRequest |
| py-file-read | file.read() | FileContent |

JavaScript/TypeScript Sources

| ID | Pattern | Category |
| --- | --- | --- |
| js-process-env | process.env | EnvVar |
| js-req-body | req.body | HTTPRequest |
| js-req-query | req.query | HTTPRequest |
| js-req-params | req.params | HTTPRequest |
| js-fs-readfile | fs.readFileSync() | FileContent |

MCP Tool Input Sources

Parameters of functions decorated with @tool (for example @server.tool()) are automatically marked as tainted:

@server.tool()
def search_files(query: str):  # query is tainted
    ...  # handler body
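
One way to picture this step is a small pass over the syntax tree that collects the parameters of every tool-decorated function. The sketch below uses Python's standard ast module purely for illustration; the scanner itself is written in Go and its parser is not shown here, and the helper names is_tool_decorator and tainted_parameters are hypothetical.

```python
import ast

SOURCE = '''
@server.tool()
def search_files(query: str, limit: int = 10):
    ...

def helper(path):
    ...
'''


def is_tool_decorator(node: ast.expr) -> bool:
    # Accept @tool, @tool(), @server.tool and @server.tool().
    if isinstance(node, ast.Call):
        node = node.func
    if isinstance(node, ast.Attribute):
        return node.attr == "tool"
    return isinstance(node, ast.Name) and node.id == "tool"


def tainted_parameters(source: str) -> dict:
    # Map each tool handler's name to its positional parameter names.
    tainted = {}
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            if any(is_tool_decorator(d) for d in node.decorator_list):
                tainted[node.name] = [arg.arg for arg in node.args.args]
    return tainted


print(tainted_parameters(SOURCE))  # {'search_files': ['query', 'limit']}
```

The sketch ignores keyword-only parameters and does not skip self on methods; a production pass would need to handle both.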

Sinks (Dangerous Operations)

Sink Categories

| Category | Vulnerability | Severity |
| --- | --- | --- |
| SinkExec | Command Injection (RCE) | Critical |
| SinkEval | Code Evaluation | Critical |
| SinkFilesystem | Path Traversal | High |
| SinkNetwork | SSRF | High |
| SinkDatabase | SQL Injection | High |
| SinkLogging | Information Disclosure | Medium |
| SinkResponse | Data Leakage | Medium |
| SinkLLMPrompt | Prompt Injection | High |

Built-in Sinks

Exec Sinks (Class A)

| ID | Pattern | Language |
| --- | --- | --- |
| py-os-system | os.system() | Python |
| py-os-popen | os.popen() | Python |
| py-subprocess-call | subprocess.call() | Python |
| py-subprocess-run | subprocess.run() | Python |
| py-subprocess-popen | subprocess.Popen() | Python |
| js-child-process-exec | child_process.exec() | JavaScript |
| js-child-process-execsync | child_process.execSync() | JavaScript |
| js-child-process-spawn | child_process.spawn() | JavaScript |

Eval Sinks (Class A)

| ID | Pattern | Language |
| --- | --- | --- |
| py-eval | eval() | Python |
| py-exec | exec() | Python |
| py-compile | compile() | Python |
| js-eval | eval() | JavaScript |
| js-function | new Function() | JavaScript |

Filesystem Sinks (Class B)

| ID | Pattern | Language |
| --- | --- | --- |
| py-open | open() | Python |
| py-pathlib-read | Path.read_text() | Python |
| py-shutil-copy | shutil.copy() | Python |
| py-os-remove | os.remove() | Python |
| js-fs-readfile | fs.readFileSync() | JavaScript |
| js-fs-writefile | fs.writeFileSync() | JavaScript |

Network Sinks (Class C)

| ID | Pattern | Language |
| --- | --- | --- |
| py-requests-get | requests.get() | Python |
| py-requests-post | requests.post() | Python |
| py-urllib-urlopen | urllib.request.urlopen() | Python |
| py-httpx-get | httpx.get() | Python |
| js-fetch | fetch() | JavaScript |
| js-axios-get | axios.get() | JavaScript |

Database Sinks (Class D)

| ID | Pattern | Language |
| --- | --- | --- |
| py-cursor-execute | cursor.execute() | Python |
| py-conn-execute | connection.execute() | Python |
| js-query | .query() | JavaScript |
| js-raw | .raw() | JavaScript |

Logging Sinks (Class E)

| ID | Pattern | Language |
| --- | --- | --- |
| py-print | print() | Python |
| py-logging-info | logging.info() | Python |
| py-logger-info | logger.info() | Python |
| js-console-log | console.log() | JavaScript |
| js-console-error | console.error() | JavaScript |

LLM Sinks (Class H)

| ID | Pattern | Language |
| --- | --- | --- |
| py-openai-chat | openai.ChatCompletion.create() | Python |
| py-anthropic-messages | anthropic.messages.create() | Python |
| py-langchain-invoke | langchain.llms.invoke() | Python |
| js-openai-chat | openai.chat.completions.create() | JavaScript |

Sanitizers (Taint Breakers)

Sanitizers break the taint chain for specific sink categories:

Path Sanitizers (Class B)

| ID | Pattern | Sanitizes |
| --- | --- | --- |
| py-os-path-normpath | os.path.normpath() | Filesystem |
| py-os-path-abspath | os.path.abspath() | Filesystem |
| py-os-path-realpath | os.path.realpath() | Filesystem |
| py-pathlib-resolve | Path.resolve() | Filesystem |
| js-path-normalize | path.normalize() | Filesystem |
| js-path-resolve | path.resolve() | Filesystem |

URL Sanitizers (Class C)

| ID | Pattern | Sanitizes |
| --- | --- | --- |
| py-urllib-parse | urllib.parse.urlparse() | Network |
| js-url-parse | new URL() | Network |

Shell Sanitizers (Class A)

| ID | Pattern | Sanitizes |
| --- | --- | --- |
| py-shlex-quote | shlex.quote() | Exec |
| py-shlex-split | shlex.split() | Exec |
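
Because each sanitizer only covers specific sink categories, a value that is safe for one kind of sink can still be dangerous for another. The Python sketch below models that idea under the assumption that the engine records which categories a value has been sanitized for; the Taint class and both helper functions are hypothetical and only a subset of the tables is included.

```python
from dataclasses import dataclass

# Subset of the sanitizer tables above: function -> sink category it covers.
SANITIZERS = {
    "os.path.normpath": "Filesystem",
    "os.path.realpath": "Filesystem",
    "urllib.parse.urlparse": "Network",
    "shlex.quote": "Exec",
}


@dataclass(frozen=True)
class Taint:
    source: str
    sanitized_for: frozenset = frozenset()


def apply_call(function: str, taint: Taint) -> Taint:
    # Passing a tainted value through a sanitizer records the covered category.
    category = SANITIZERS.get(function)
    if category is None:
        return taint
    return Taint(taint.source, taint.sanitized_for | {category})


def reaches_sink(taint: Taint, sink_category: str) -> bool:
    # The value still counts as tainted for every category not yet covered.
    return sink_category not in taint.sanitized_for


quoted = apply_call("shlex.quote", Taint("tool_input"))
print(reaches_sink(quoted, "Exec"))        # False
print(reaches_sink(quoted, "Filesystem"))  # True
```

In this model a shell-quoted value no longer triggers an Exec finding, but passing it to open() would still be reported.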

Taint Propagation

Assignment

Taint propagates through assignment:

user_input = request.args.get("q")  # tainted
query = user_input                   # tainted (assigned)

String Operations

Taint propagates through concatenation and formatting:

user_input = request.args.get("q")  # tainted
cmd = "grep " + user_input          # tainted (concat)
cmd2 = f"grep {user_input}"         # tainted (f-string)

Property Access

Taint propagates through object properties:

obj = tainted_object                # obj is tainted
value = obj.property                # value is tainted
value2 = obj["key"]                 # value2 is tainted

Control Flow

Taint is merged at join points. The merge is conservative, so a variable tainted in any branch is treated as tainted afterwards:

if condition:
    x = tainted_value  # x tainted in then-branch
else:
    x = safe_value     # x not tainted in else-branch
# x is tainted (conservative merge)
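
A conservative merge amounts to a union of the branch states. The Python sketch below represents each state as a plain dict from variable name to taint label, with untainted variables simply absent; this representation and the merge helper are simplifications for illustration, not the engine's Merge implementation.

```python
def merge(then_state: dict, else_state: dict) -> dict:
    # Conservative join: a variable is tainted after the branch if it is
    # tainted on at least one incoming path.
    merged = dict(else_state)
    merged.update(then_state)
    return merged


before = {"tainted_value": "tool_input"}

then_state = dict(before)
then_state["x"] = before["tainted_value"]  # x = tainted_value

else_state = dict(before)                  # x = safe_value adds no taint

after = merge(then_state, else_state)
print(after.get("x"))  # tool_input
```

The trade-off is a possible false positive when only the safe branch can actually run, in exchange for never missing the unsafe one.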

Loops

Loop bodies are analyzed twice so that taint set in one iteration is visible to the next. The loop variable inherits the taint of the collection being iterated:

items = tainted_list
for item in items:  # item is tainted (from iteration)
    process(item)
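
One plausible reason for a second pass is a loop body that reads a variable before writing it, so the taint only becomes visible on the following iteration. The Python sketch below models a body as a list of (target, operands) assignments; run_body is a hypothetical helper and the model never removes taint.

```python
def run_body(assignments, tainted):
    # One pass over the loop body: a target becomes tainted when any
    # variable on the right-hand side is already tainted.
    result = set(tainted)
    for target, operands in assignments:
        if any(name in result for name in operands):
            result.add(target)
    return result


# Models the body of:
#     for item in items:
#         sink_arg = previous   # reads a value written later in the body
#         previous = item
body = [("sink_arg", ["previous"]), ("previous", ["item"])]

first = run_body(body, {"items", "item"})
second = run_body(body, first)

print(sorted(first))   # ['item', 'items', 'previous']
print(sorted(second))  # ['item', 'items', 'previous', 'sink_arg']
```

After a single pass sink_arg still looks clean; the second pass picks up the taint carried over from the previous iteration.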

Closures/Lambdas

Closures capture taint from parent scope:

tainted = request.args.get("q")
callback = lambda: execute(tainted)  # captures taint

Callback Arguments

Iterator methods such as forEach propagate taint from the collection to the callback's parameters (JavaScript shown):

const taintedList = getTaintedData();
taintedList.forEach((x) => sink(x));  // x is tainted

Analysis Flow

Per-Function Analysis

  1. Initialize state: Create TaintState for function
  2. Check tool handler: If @tool decorated, mark parameters as tainted
  3. Analyze body: Process each statement in order
  4. Report findings: Generate findings for source→sink flows

Statement Analysis

func analyzeStatement(stmt, state, file, fn) []Finding {
    switch s := stmt.(type) {
    case *ast.Assignment:
        // Propagate taint from value to target
    case *ast.ExpressionStatement:
        // Check for sinks in calls
    case *ast.Return:
        // Check for sensitive data in return
    case *ast.IfStatement:
        // Clone state, analyze branches, merge
    case *ast.ForStatement:
        // Propagate taint through iteration
    case *ast.TryStatement:
        // Analyze try/catch/finally with separate states
    }
}

Expression Taint Extraction

func getExpressionTaint(expr, state) *TaintInfo {
    switch ex := expr.(type) {
    case *ast.Identifier:
        return state.GetTaint(ex.Name)
    case *ast.MemberAccess:
        // Check property taint or object taint
    case *ast.Call:
        // Check if returns tainted data (source)
    case *ast.BinaryOp:
        // Tainted if either operand is tainted
    case *ast.FormattedString:
        // Tainted if any interpolation is tainted
    }
}
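
The same case analysis can be tried out on real Python expressions with the standard ast module. This is an illustrative analogue rather than the Go implementation: the state is a plain dict from variable name to taint label, call expressions (sources) are left out, and the function names are hypothetical.

```python
import ast
from typing import Optional


def expression_taint(node: ast.expr, state: dict) -> Optional[str]:
    # state maps variable names to a taint label; absent means untainted.
    if isinstance(node, ast.Name):
        return state.get(node.id)
    if isinstance(node, (ast.Attribute, ast.Subscript)):
        # obj.prop and obj["key"] inherit the taint of obj.
        return expression_taint(node.value, state)
    if isinstance(node, ast.BinOp):
        # Tainted if either operand is tainted.
        return (expression_taint(node.left, state)
                or expression_taint(node.right, state))
    if isinstance(node, ast.JoinedStr):
        # f-string: tainted if any interpolated value is tainted.
        for part in node.values:
            if isinstance(part, ast.FormattedValue):
                taint = expression_taint(part.value, state)
                if taint:
                    return taint
    return None


def taint_of(source: str, state: dict) -> Optional[str]:
    # Parse a single expression and evaluate its taint.
    return expression_taint(ast.parse(source, mode="eval").body, state)


state = {"user_input": "http_request"}
print(taint_of('"grep " + user_input', state))  # http_request
print(taint_of('f"grep {user_input}"', state))  # http_request
print(taint_of('"grep " + "fixed"', state))     # None
```

Constants fall through to the final return, so an expression built only from literals is reported as untainted.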

Call Analysis

func analyzeCall(call, state, file, fn) []Finding {
    // 1. Get receiver and function name
    // 2. Analyze callback arguments (lambdas)
    // 3. Check if call is a sink
    // 4. For each argument, check if tainted
    // 5. Check if sanitizer applies
    // 6. Generate finding if tainted and not sanitized
}
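
Steps 1, 3, 4 and 6 can be sketched for Python source with the standard ast module. The SINKS table below is a small subset of the catalog with argument indexes chosen for illustration, and check_calls is a hypothetical helper that only recognizes plain variable arguments; callback analysis and sanitizer checks are omitted.

```python
import ast

# Subset of the sink tables above: dotted name -> (category, argument index).
SINKS = {
    "os.system": ("SinkExec", 0),
    "eval": ("SinkEval", 0),
    "open": ("SinkFilesystem", 0),
}


def dotted_name(node: ast.expr) -> str:
    # Rebuild "os.system" from Attribute(Name("os"), "system").
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Attribute):
        return dotted_name(node.value) + "." + node.attr
    return ""


def check_calls(source: str, tainted: set) -> list:
    findings = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        sink = SINKS.get(dotted_name(node.func))
        if sink is None:
            continue
        category, index = sink
        if index < len(node.args):
            arg = node.args[index]
            # Report only when the checked argument is a tainted variable.
            if isinstance(arg, ast.Name) and arg.id in tainted:
                findings.append((category, node.lineno, arg.id))
    return findings


code = "import os\nos.system(cmd)\nopen('fixed.txt')\n"
print(check_calls(code, {"cmd"}))  # [('SinkExec', 2, 'cmd')]
```

The call to open() is a sink as well, but its argument is a literal, so no finding is produced for it.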

Finding Generation

When tainted data reaches a sink, a finding is generated:

type Finding struct {
    RuleID      string           // e.g., "MCP-A001"
    Class       types.VulnClass  // e.g., ClassA (RCE)
    Severity    types.Severity   // Critical, High, Medium, Low
    Confidence  types.Confidence // High, Medium, Low
    Location    types.Location   // Sink location
    Trace       *TaintTrace      // Full path from source to sink
    SinkID      string           // Catalog sink ID
    Description string           // Human-readable description
    Remediation string           // How to fix
}

Rule Mapping

| Sink Category | Rule ID | Class | Description |
| --- | --- | --- | --- |
| SinkExec | MCP-A001 | A | Tool input flows to command execution |
| SinkEval | MCP-A002 | A | Tool input flows to code evaluation |
| SinkFilesystem | MCP-B001 | B | Tool input flows to filesystem operation |
| SinkNetwork | MCP-C001 | C | Tool input flows to network request |
| SinkDatabase | MCP-D001 | D | Tool input flows to database query |
| SinkLogging | MCP-E003 | E | Potentially sensitive data logged |
| SinkResponse | MCP-E004 | E | Potentially sensitive data in response |

Configuration

# mcp-scan.yaml
taint:
  mode: fast           # fast or deep
  depth: 3             # Max inter-procedural depth
  track_properties: true
  track_closures: true

  # Source categories to track
  sources:
    - tool_input
    - http_request
    - env_var
    - file_content

  # Sink categories to detect
  sinks:
    - exec
    - eval
    - filesystem
    - network
    - database

API Usage

Basic Analysis

// Create catalog and engine
cat := catalog.New()
cfg := taint.DefaultConfig()
engine := taint.New(cat, cfg)

// Analyze files
files := []*ast.File{...}
surface := surface.Extract(files)
findings := engine.Analyze(files, surface)

Per-File Analysis

// Thread-safe per-file analysis
for _, file := range files {
    findings := engine.AnalyzeFile(file, surface)
}

Extending the Catalog

Adding a Source

catalog.Sources = append(catalog.Sources, catalog.SourceDef{
    ID:          "custom-source",
    Language:    types.Python,
    Receiver:    "mylib",
    Function:    "get_input",
    Category:    types.SourceToolInput,
    Description: "Custom input source",
})

Adding a Sink

catalog.Sinks = append(catalog.Sinks, catalog.SinkDef{
    ID:          "custom-sink",
    Language:    types.Python,
    Receiver:    "mylib",
    Function:    "dangerous_op",
    Category:    types.SinkExec,
    Severity:    types.SeverityCritical,
    ArgIndex:    0,
    Description: "Custom dangerous operation",
})

Adding a Sanitizer

catalog.Sanitizers = append(catalog.Sanitizers, catalog.SanitizerDef{
    ID:          "custom-sanitizer",
    Language:    types.Python,
    Receiver:    "mylib",
    Function:    "sanitize",
    Sanitizes:   []types.SinkCategory{types.SinkExec, types.SinkFilesystem},
    Description: "Custom sanitizer",
})

Example: Complete Flow

# Source: Tool input
@server.tool()
def process_file(filename: str):  # filename is TAINTED
    # Propagation: Assignment
    path = "/data/" + filename    # path is TAINTED (concat)

    # Sanitizer check (if used)
    # safe_path = os.path.normpath(path)  # Would break taint

    # Sink: Filesystem operation
    with open(path) as f:         # FINDING: MCP-B001
        return f.read()

Generated Trace:

Source: process_file:filename (line 2, SourceToolInput)
  → assign to path (line 4)
  → binary_op:+ (line 4)
Sink: open(path) (line 8, SinkFilesystem)
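
When remediating a finding like this one, note that normalization collapses ".." segments but does not by itself keep the result under /data, so it is usually paired with a containment check. The sketch below shows one common pattern; BASE_DIR and resolve_under_base are hypothetical names, posixpath is used so the behavior is the same on every platform, and symlinks are not resolved (os.path.realpath would be needed for that).

```python
import posixpath

BASE_DIR = "/data"  # hypothetical root the tool is allowed to read


def resolve_under_base(filename: str) -> str:
    # Join and normalize first, then verify the result is still inside BASE_DIR.
    candidate = posixpath.normpath(posixpath.join(BASE_DIR, filename))
    if posixpath.commonpath([BASE_DIR, candidate]) != BASE_DIR:
        raise ValueError("path escapes " + BASE_DIR + ": " + filename)
    return candidate


print(resolve_under_base("reports/q1.txt"))  # /data/reports/q1.txt

try:
    resolve_under_base("../etc/passwd")
except ValueError as err:
    print("rejected:", err)
```

Absolute inputs such as "/etc/passwd" are rejected by the same check, because joining an absolute path discards the base directory.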