
Prompt Flow Detection Guide

Overview

The Prompt Flow Detection module tracks how user input flows to LLM API calls, detecting potential indirect prompt injection vulnerabilities. It extends the taint analysis engine to understand LLM-specific sinks.

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                  Prompt Flow Detection                           │
│                                                                  │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  Sources (User Input)                                        ││
│  │  - MCP tool parameters                                       ││
│  │  - HTTP request data                                         ││
│  │  - Environment variables                                     ││
│  │  - File content                                              ││
│  └──────────────────────────┬──────────────────────────────────┘│
│                             │                                    │
│  ┌──────────────────────────▼──────────────────────────────────┐│
│  │  Taint Propagation                                           ││
│  │  - Assignments                                               ││
│  │  - String concatenation                                      ││
│  │  - Function calls                                            ││
│  │  - Template formatting                                       ││
│  └──────────────────────────┬──────────────────────────────────┘│
│                             │                                    │
│  ┌──────────────────────────▼──────────────────────────────────┐│
│  │  LLM Sinks                                                   ││
│  │  - OpenAI API                                                ││
│  │  - Anthropic API                                             ││
│  │  - LangChain                                                 ││
│  │  - Local LLM calls                                           ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘

LLM Sink Catalog

OpenAI

// internal/catalog/llm_sinks.go

// OpenAISinks catalogs OpenAI API calls treated as LLM prompt sinks by the
// taint engine. Each entry names the Receiver/Function pair to match and the
// argument (ArgName) whose value carries the prompt payload.
var OpenAISinks = []SinkDef{
    // Legacy module-level chat API: openai.ChatCompletion.create(...).
    {
        ID:        "py-openai-chat",
        Language:  types.Python,
        Receiver:  "openai.ChatCompletion",
        Function:  "create",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgName:   "messages",
        VulnClass: types.ClassH,
    },
    // New client style: client.chat.completions.create(...).
    // NOTE(review): matches the literal variable name "client" — presumably
    // a heuristic; confirm how the matcher resolves receivers.
    {
        ID:        "py-openai-client-chat",
        Language:  types.Python,
        Receiver:  "client.chat.completions",
        Function:  "create",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgName:   "messages",
        VulnClass: types.ClassH,
    },
    // Legacy completions API: openai.Completion.create(prompt=...).
    {
        ID:        "py-openai-completion",
        Language:  types.Python,
        Receiver:  "openai.Completion",
        Function:  "create",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgName:   "prompt",
        VulnClass: types.ClassH,
    },
}

Detected patterns:

from openai import OpenAI

client = OpenAI()

@tool
def chat(user_message: str):
    """Vulnerable example: the tool parameter reaches the LLM call verbatim."""
    # user_message flows to LLM - detected as MCP-H-001
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": user_message}]
    )

Anthropic

// AnthropicSinks catalogs Anthropic API calls treated as LLM prompt sinks.
var AnthropicSinks = []SinkDef{
    // NOTE(review): this entry splits the call as Receiver "anthropic.Anthropic"
    // plus a dotted Function "messages.create", unlike every other entry which
    // uses a single method name — confirm the matcher supports dotted Function
    // values, otherwise this sink may never fire.
    {
        ID:        "py-anthropic-messages",
        Language:  types.Python,
        Receiver:  "anthropic.Anthropic",
        Function:  "messages.create",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgName:   "messages",
        VulnClass: types.ClassH,
    },
    // Instance style: client.messages.create(...).
    {
        ID:        "py-anthropic-client",
        Language:  types.Python,
        Receiver:  "client.messages",
        Function:  "create",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgName:   "messages",
        VulnClass: types.ClassH,
    },
}

Detected patterns:

import anthropic

client = anthropic.Anthropic()

@tool
def ask_claude(question: str):
    """Vulnerable example: the tool parameter reaches the Anthropic API verbatim."""
    # question flows to LLM - detected
    response = client.messages.create(
        model="claude-3-opus",
        messages=[{"role": "user", "content": question}]
    )

LangChain

// LangChainSinks catalogs LangChain entry points treated as LLM prompt sinks.
// Receivers here are bare variable names ("llm", "chain", "agent", "prompt") —
// presumably a naming heuristic rather than type resolution; matches key off
// the first positional argument (ArgIndex 0) instead of a keyword name.
var LangChainSinks = []SinkDef{
    // llm.invoke(prompt) — direct model invocation.
    {
        ID:        "py-langchain-llm-invoke",
        Language:  types.Python,
        Receiver:  "llm",
        Function:  "invoke",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgIndex:  0,
        VulnClass: types.ClassH,
    },
    // chain.run(input) — chain execution.
    {
        ID:        "py-langchain-chain-run",
        Language:  types.Python,
        Receiver:  "chain",
        Function:  "run",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgIndex:  0,
        VulnClass: types.ClassH,
    },
    // agent.run(input) — agent execution.
    {
        ID:        "py-langchain-agent",
        Language:  types.Python,
        Receiver:  "agent",
        Function:  "run",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgIndex:  0,
        VulnClass: types.ClassH,
    },
    // prompt.format(...) — template interpolation; lower severity because the
    // tainted text still passes through a template before reaching the model.
    // NOTE(review): ArgName "*" presumably means "any keyword argument" — confirm.
    {
        ID:        "py-langchain-prompt-format",
        Language:  types.Python,
        Receiver:  "prompt",
        Function:  "format",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityMedium,
        ArgName:   "*",
        VulnClass: types.ClassH,
    },
}

Detected patterns:

from langchain.llms import OpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

llm = OpenAI()
prompt = PromptTemplate(template="Answer: {question}")
chain = LLMChain(llm=llm, prompt=prompt)

@tool
def answer(question: str):
    """Vulnerable example: the parameter flows through an LLMChain to the model."""
    # question flows through chain to LLM - detected
    return chain.run(question)

TypeScript/JavaScript Sinks

// TypeScriptLLMSinks catalogs TypeScript/JavaScript LLM prompt sinks.
var TypeScriptLLMSinks = []SinkDef{
    // OpenAI SDK: openai.chat.completions.create({ messages }).
    {
        ID:        "ts-openai-chat",
        Language:  types.TypeScript,
        Receiver:  "openai.chat.completions",
        Function:  "create",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgName:   "messages",
        VulnClass: types.ClassH,
    },
    // Anthropic SDK: anthropic.messages.create({ messages }).
    {
        ID:        "ts-anthropic-messages",
        Language:  types.TypeScript,
        Receiver:  "anthropic.messages",
        Function:  "create",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgName:   "messages",
        VulnClass: types.ClassH,
    },
    // Vercel AI SDK: generateText({ prompt }).
    // NOTE(review): empty Receiver presumably matches a bare (imported) call
    // with no qualifier — confirm the matcher's empty-receiver semantics.
    {
        ID:        "ts-vercel-ai",
        Language:  types.TypeScript,
        Receiver:  "",
        Function:  "generateText",
        Category:  SinkLLMPrompt,
        Severity:  types.SeverityHigh,
        ArgName:   "prompt",
        VulnClass: types.ClassH,
    },
}

Prompt Flow Detector

Implementation

// internal/pattern/prompt_flow.go

// PromptFlowDetector tracks user input flowing from MCP tool parameters into
// LLM API calls, flagging indirect prompt-injection risk.
type PromptFlowDetector struct {
    taintEngine *taint.Engine     // propagates taint from sources to call sites
    llmSinks    []catalog.SinkDef // registered LLM sink definitions
}

// NewPromptFlowDetector returns a detector wired with a fresh taint engine
// and every LLM sink currently registered in the catalog.
func NewPromptFlowDetector() *PromptFlowDetector {
	det := new(PromptFlowDetector)
	det.taintEngine = taint.NewEngine()
	det.llmSinks = catalog.GetLLMSinks()
	return det
}

// Detect seeds the taint engine with every MCP tool parameter in surface,
// runs taint propagation over file, and returns one Match for each LLM-sink
// call argument that carries taint.
func (d *PromptFlowDetector) Detect(file *ast.File, surface *surface.MCPSurface) []Match {
	// Every MCP tool parameter is attacker-controlled input: seed it as a source.
	for _, tool := range surface.Tools {
		for _, param := range tool.Parameters {
			d.taintEngine.AddSource(param.Name, taint.SourceInfo{
				Location:  tool.Location,
				Category:  taint.SourceToolInput,
				ToolName:  tool.Name,
				ParamName: param.Name,
			})
		}
	}

	// Propagate taint through assignments, concatenation, calls, etc.
	d.taintEngine.Analyze(file)

	// Walk every call site; report each tainted argument that lands in an LLM sink.
	var found []Match
	for _, fn := range file.Functions {
		for _, call := range d.findCalls(fn) {
			sink := d.matchLLMSink(call)
			if sink == nil {
				continue
			}
			for _, arg := range call.Arguments {
				if info := d.taintEngine.GetTaint(arg); info != nil {
					found = append(found, d.createMatch(call, sink, info))
				}
			}
		}
	}

	return found
}

Detection Rules

MCP-H-001: Unsanitized Tool Input to LLM

@tool
def ask(question: str):  # Source: tool parameter
    """Vulnerable example for MCP-H-001: direct parameter-to-LLM flow."""
    # Direct flow to LLM - HIGH severity
    response = openai.chat.completions.create(
        messages=[{"role": "user", "content": question}]  # Sink
    )

Finding:

{
    "rule_id": "MCP-H-001",
    "title": "Unsanitized Tool Input to LLM",
    "severity": "high",
    "description": "User input from tool parameter 'question' flows directly to LLM API without sanitization",
    "trace": [
        {"location": "tool.py:2", "action": "source", "variable": "question"},
        {"location": "tool.py:4", "action": "sink", "variable": "messages[0].content"}
    ]
}

MCP-H-002: Concatenated Prompt Injection Risk

SYSTEM_PROMPT = "You are a helpful assistant."

@tool
def chat(user_input: str):  # Source
    """Vulnerable example for MCP-H-002: input concatenated into the prompt."""
    # String concatenation - MEDIUM severity
    prompt = SYSTEM_PROMPT + "\n\nUser: " + user_input
    response = llm.invoke(prompt)  # Sink

Finding:

{
    "rule_id": "MCP-H-002",
    "title": "Concatenated Prompt Injection Risk",
    "severity": "medium",
    "description": "User input concatenated into prompt without proper separation",
    "evidence": {
        "snippet": "prompt = SYSTEM_PROMPT + \"\\n\\nUser: \" + user_input"
    }
}

MCP-H-003: Template Injection via Format String

TEMPLATE = "Answer the question: {question}\nBe concise."

@tool
def answer(question: str):  # Source
    """Vulnerable example for MCP-H-003: input interpolated via str.format."""
    # Format string - MEDIUM severity
    prompt = TEMPLATE.format(question=question)
    return llm.invoke(prompt)  # Sink

Sanitization Detection

The detector recognizes common sanitization patterns:

// PromptSanitizers lists regex patterns that, when seen on a tainted path,
// are treated as mitigation and suppress (or downgrade) a finding.
// NOTE(review): these are purely textual matches — e.g. the length-check
// pattern fires on any len(...) comparison regardless of which variable is
// checked; confirm the engine ties the sanitizer to the tainted variable.
var PromptSanitizers = []SanitizerDef{
    // Input validation: a length bound on the input.
    {
        ID:       "input-length-check",
        Pattern:  `len\(.*\)\s*[<>]=?\s*\d+`,
        Category: SanitizerInputValidation,
    },
    // Content filtering: calls to recognizably-named cleaning helpers.
    {
        ID:       "content-filter",
        Pattern:  `filter_content|sanitize_input|clean_input`,
        Category: SanitizerContentFilter,
    },
    // Structured output: constraining the model to JSON output.
    {
        ID:       "json-output",
        Pattern:  `response_format.*json`,
        Category: SanitizerStructuredOutput,
    },
    // Prompt separation: a distinct system message keeps user text out of
    // the instruction channel.
    {
        ID:       "system-message",
        Pattern:  `"role":\s*"system"`,
        Category: SanitizerPromptSeparation,
    },
}

Sanitized example (no finding):

@tool
def safe_chat(user_input: str):
    """Sanitized example: length check plus system/user role separation,
    so the detector reports no finding."""
    # Length validation - recognized as sanitizer
    if len(user_input) > 1000:
        raise ValueError("Input too long")

    # System prompt separation - recognized as safe pattern
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": user_input}  # Properly separated
    ]

    response = client.chat.completions.create(
        model="gpt-4",
        messages=messages
    )

Exfiltration Detection

Exfiltration Patterns

// internal/pattern/exfiltration.go

// ExfiltrationPatterns are case-insensitive regexps matched against tool
// descriptions and string literals to flag phrasing that coaches the model
// into leaking data. Compiled once at package init (MustCompile panics on a
// bad pattern, which is the desired fail-fast here).
var ExfiltrationPatterns = []*regexp.Regexp{
    // Output manipulation: "include X in your response".
    regexp.MustCompile(`(?i)include\s+.{1,50}\s+in\s+(your|the)\s+response`),
    regexp.MustCompile(`(?i)output\s+.{1,50}\s+to\s+me`),

    // Information extraction: requests for secrets or hidden context.
    regexp.MustCompile(`(?i)reveal\s+.{1,50}\s+information`),
    regexp.MustCompile(`(?i)show\s+me\s+.{1,50}\s+secret`),
    regexp.MustCompile(`(?i)what\s+is\s+your\s+.{1,50}\s+prompt`),

    // System prompt extraction attempts.
    regexp.MustCompile(`(?i)tell\s+me\s+your\s+instructions`),
    regexp.MustCompile(`(?i)print\s+your\s+system`),

    // Resource access: instructions to pull files/URLs/resources.
    regexp.MustCompile(`(?i)access\s+the\s+resource\s+.{1,100}`),
    regexp.MustCompile(`(?i)read\s+file\s+.{1,100}`),
    regexp.MustCompile(`(?i)fetch\s+url\s+.{1,100}`),
}

Exfiltration Detector

// ExfiltrationDetector flags exfiltration-style phrasing in tool descriptions
// and string literals using a configurable set of regex patterns.
type ExfiltrationDetector struct {
    patterns []*regexp.Regexp // patterns to match; see ExfiltrationPatterns
}

// Detect scans MCP tool descriptions and the file's string literals against
// the detector's patterns, emitting one Match per (text, pattern) hit.
func (d *ExfiltrationDetector) Detect(file *ast.File, surface *surface.MCPSurface) []Match {
	var out []Match

	// Tool descriptions are instructions shown to the model, so a hit here
	// is high severity (MCP-G-004) and we record which pattern fired.
	for _, t := range surface.Tools {
		for _, re := range d.patterns {
			if !re.MatchString(t.Description) {
				continue
			}
			out = append(out, Match{
				RuleID:   "MCP-G-004",
				Title:    "Exfiltration Pattern in Tool Description",
				Location: t.Location,
				Severity: types.SeverityHigh,
				Evidence: Evidence{
					Snippet: truncate(t.Description, 100),
					Pattern: re.String(),
				},
			})
		}
	}

	// Arbitrary string literals are weaker evidence: medium severity (MCP-G-005).
	for _, lit := range file.Strings {
		for _, re := range d.patterns {
			if !re.MatchString(lit.Value) {
				continue
			}
			out = append(out, Match{
				RuleID:   "MCP-G-005",
				Title:    "Exfiltration Pattern in String",
				Location: lit.Location,
				Severity: types.SeverityMedium,
				Evidence: Evidence{
					Snippet: truncate(lit.Value, 100),
				},
			})
		}
	}

	return out
}

Trace Generation

Flow Trace

// FlowTrace is the JSON-serializable record of one source-to-sink taint path.
type FlowTrace struct {
    Source   TraceNode   `json:"source"` // where the tainted value entered
    Sink     TraceNode   `json:"sink"`   // where it reached an LLM call
    Steps    []TraceStep `json:"steps"`  // ordered propagation steps
    Risk     string      `json:"risk"`   // human-readable risk summary
}

// TraceNode identifies a single endpoint (source or sink) of a flow.
type TraceNode struct {
    Location types.Location `json:"location"`
    Variable string         `json:"variable"` // variable or argument path, e.g. "messages[0].content"
    Type     string         `json:"type"`     // endpoint kind, e.g. "tool_parameter", "llm_api_call"
}

// TraceStep is one propagation hop (assignment, format, call argument, ...).
type TraceStep struct {
    Location types.Location `json:"location"`
    Action   string         `json:"action"` // "source", "assign", "format", "call_arg", ...
    From     string         `json:"from"`   // variable taint came from (empty for the source step)
    To       string         `json:"to"`     // variable taint moved into
    Note     string         `json:"note,omitempty"`
}

Example Trace

{
    "source": {
        "location": {"file": "server.py", "line": 10, "col": 5},
        "variable": "user_query",
        "type": "tool_parameter"
    },
    "sink": {
        "location": {"file": "server.py", "line": 25, "col": 12},
        "variable": "messages[0].content",
        "type": "llm_api_call"
    },
    "steps": [
        {
            "location": {"file": "server.py", "line": 10},
            "action": "source",
            "to": "user_query",
            "note": "Tool parameter from MCP client"
        },
        {
            "location": {"file": "server.py", "line": 15},
            "action": "assign",
            "from": "user_query",
            "to": "query"
        },
        {
            "location": {"file": "server.py", "line": 20},
            "action": "format",
            "from": "query",
            "to": "prompt",
            "note": "String interpolation"
        },
        {
            "location": {"file": "server.py", "line": 25},
            "action": "call_arg",
            "from": "prompt",
            "to": "messages[0].content",
            "note": "Passed to OpenAI API"
        }
    ],
    "risk": "User input flows to LLM without sanitization"
}

Configuration

Enabling Prompt Flow Detection

# mcp-scan.yaml
analysis:
  prompt_flow:
    enabled: true
    track_llm_sinks: true
    detect_exfiltration: true
    min_confidence: medium

llm_sinks:
  # Additional custom sinks
  custom:
    - receiver: "my_llm_client"
      function: "generate"
      arg_name: "prompt"
      severity: high

CLI Usage

# Enable prompt flow detection
mcp-scan scan /path/to/project --prompt-flow

# Show detailed traces
mcp-scan scan /path/to/project --prompt-flow --show-traces

# JSON output with traces
mcp-scan scan /path/to/project --prompt-flow --output json

Integration with Other Detectors

Combined Detection Pipeline

// CombinedPromptSecurityScan layers every prompt-security detector over one
// file, ordered from cheapest to most expensive, then deduplicates overlapping
// findings so a single issue caught by multiple layers is reported once.
//
// NOTE(review): mlDetector and llmDetector are not declared in this snippet —
// presumably package-level singletons configured elsewhere; confirm where they
// are initialized before relying on this example.
func CombinedPromptSecurityScan(file *ast.File, surface *surface.MCPSurface) []types.Finding {
    var findings []types.Finding

    // Layer 1: Pattern-based detection (fast)
    injectionDetector := NewExtendedInjectionDetector()
    findings = append(findings, injectionDetector.Detect(file, surface)...)

    // Layer 2: Prompt flow analysis (medium)
    flowDetector := NewPromptFlowDetector()
    findings = append(findings, flowDetector.Detect(file, surface)...)

    // Layer 3: Exfiltration detection (medium)
    exfilDetector := NewExfiltrationDetector()
    findings = append(findings, exfilDetector.Detect(file, surface)...)

    // Layer 4: ML classification (if enabled)
    if mlDetector.IsEnabled() {
        findings = append(findings, mlDetector.Detect(file, surface)...)
    }

    // Layer 5: LLM analysis (expensive, most accurate)
    if llmDetector.IsEnabled() {
        findings = append(findings, llmDetector.Detect(file, surface)...)
    }

    return deduplicateFindings(findings)
}

API Reference

PromptFlowDetector

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| NewPromptFlowDetector | - | *PromptFlowDetector | Create detector |
| Detect | file, surface | []Match | Run detection |
| GetTraces | - | []FlowTrace | Get flow traces |
| AddSink | SinkDef | - | Add custom sink |
| SetMinConfidence | Confidence | - | Set threshold |

ExfiltrationDetector

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| NewExfiltrationDetector | - | *ExfiltrationDetector | Create detector |
| Detect | file, surface | []Match | Run detection |
| AddPattern | *regexp.Regexp | - | Add custom pattern |

LLM Sink Functions

| Function | Description |
| --- | --- |
| GetLLMSinks() | Get all registered LLM sinks |
| GetSinksByLanguage(lang) | Get sinks for specific language |
| GetSinksByProvider(provider) | Get sinks for specific provider |
| RegisterSink(sink) | Register custom sink |

See Also