Prompt Flow Detection Guide¶
Overview¶
The Prompt Flow Detection module tracks how user input flows to LLM API calls, detecting potential indirect prompt injection vulnerabilities. It extends the taint analysis engine to understand LLM-specific sinks.
Architecture¶
┌─────────────────────────────────────────────────────────────────┐
│ Prompt Flow Detection │
│ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Sources (User Input) ││
│ │ - MCP tool parameters ││
│ │ - HTTP request data ││
│ │ - Environment variables ││
│ │ - File content ││
│ └──────────────────────────┬──────────────────────────────────┘│
│ │ │
│ ┌──────────────────────────▼──────────────────────────────────┐│
│ │ Taint Propagation ││
│ │ - Assignments ││
│ │ - String concatenation ││
│ │ - Function calls ││
│ │ - Template formatting ││
│ └──────────────────────────┬──────────────────────────────────┘│
│ │ │
│ ┌──────────────────────────▼──────────────────────────────────┐│
│ │ LLM Sinks ││
│ │ - OpenAI API ││
│ │ - Anthropic API ││
│ │ - LangChain ││
│ │ - Local LLM calls ││
│ └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
LLM Sink Catalog¶
OpenAI¶
// internal/catalog/llm_sinks.go
// OpenAISinks declares the OpenAI API call sites treated as LLM prompt
// sinks. Each SinkDef names the receiver/function pair to match in Python
// source and the argument (by name) that carries prompt text to the model.
var OpenAISinks = []SinkDef{
	// Chat Completions (legacy module-level API: openai.ChatCompletion.create)
	{
		ID:        "py-openai-chat",
		Language:  types.Python,
		Receiver:  "openai.ChatCompletion",
		Function:  "create",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgName:   "messages", // prompt content travels in the messages list
		VulnClass: types.ClassH,
	},
	// New client style (client instance: client.chat.completions.create)
	{
		ID:        "py-openai-client-chat",
		Language:  types.Python,
		Receiver:  "client.chat.completions",
		Function:  "create",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgName:   "messages",
		VulnClass: types.ClassH,
	},
	// Completions (legacy text-completion endpoint; prompt is a plain string)
	{
		ID:        "py-openai-completion",
		Language:  types.Python,
		Receiver:  "openai.Completion",
		Function:  "create",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgName:   "prompt",
		VulnClass: types.ClassH,
	},
}
Detected patterns:
from openai import OpenAI
client = OpenAI()
@tool
def chat(user_message: str):
# user_message flows to LLM - detected as MCP-H-001
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": user_message}]
)
Anthropic¶
// AnthropicSinks declares the Anthropic API call sites treated as LLM
// prompt sinks.
var AnthropicSinks = []SinkDef{
	// Module-style access. NOTE(review): Function here is a dotted path
	// ("messages.create") while other entries put the path in Receiver —
	// presumably the matcher supports dotted function names; confirm.
	{
		ID:        "py-anthropic-messages",
		Language:  types.Python,
		Receiver:  "anthropic.Anthropic",
		Function:  "messages.create",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgName:   "messages",
		VulnClass: types.ClassH,
	},
	// Client-instance style (client.messages.create)
	{
		ID:        "py-anthropic-client",
		Language:  types.Python,
		Receiver:  "client.messages",
		Function:  "create",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgName:   "messages",
		VulnClass: types.ClassH,
	},
}
Detected patterns:
import anthropic
client = anthropic.Anthropic()
@tool
def ask_claude(question: str):
# question flows to LLM - detected
response = client.messages.create(
model="claude-3-opus",
messages=[{"role": "user", "content": question}]
)
LangChain¶
// LangChainSinks declares LangChain entry points treated as LLM prompt
// sinks. Receivers here are conventional variable names ("llm", "chain",
// "agent", "prompt"), so matching is heuristic rather than type-based.
var LangChainSinks = []SinkDef{
	// LLM invoke: llm.invoke(prompt) — taint checked on the first argument.
	{
		ID:        "py-langchain-llm-invoke",
		Language:  types.Python,
		Receiver:  "llm",
		Function:  "invoke",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgIndex:  0,
		VulnClass: types.ClassH,
	},
	// Chain run: chain.run(input)
	{
		ID:        "py-langchain-chain-run",
		Language:  types.Python,
		Receiver:  "chain",
		Function:  "run",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgIndex:  0,
		VulnClass: types.ClassH,
	},
	// Agent execute: agent.run(input)
	{
		ID:        "py-langchain-agent",
		Language:  types.Python,
		Receiver:  "agent",
		Function:  "run",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgIndex:  0,
		VulnClass: types.ClassH,
	},
	// Prompt template formatting: prompt.format(...). ArgName "*" flags any
	// argument; severity is lower because formatting is one step removed
	// from the actual LLM call.
	{
		ID:        "py-langchain-prompt-format",
		Language:  types.Python,
		Receiver:  "prompt",
		Function:  "format",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityMedium,
		ArgName:   "*",
		VulnClass: types.ClassH,
	},
}
Detected patterns:
from langchain.llms import OpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
llm = OpenAI()
prompt = PromptTemplate(template="Answer: {question}")
chain = LLMChain(llm=llm, prompt=prompt)
@tool
def answer(question: str):
# question flows through chain to LLM - detected
return chain.run(question)
TypeScript/JavaScript Sinks¶
// TypeScriptLLMSinks declares LLM prompt sinks for TypeScript/JavaScript
// code.
var TypeScriptLLMSinks = []SinkDef{
	// OpenAI: openai.chat.completions.create({ messages: ... })
	{
		ID:        "ts-openai-chat",
		Language:  types.TypeScript,
		Receiver:  "openai.chat.completions",
		Function:  "create",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgName:   "messages",
		VulnClass: types.ClassH,
	},
	// Anthropic: anthropic.messages.create({ messages: ... })
	{
		ID:        "ts-anthropic-messages",
		Language:  types.TypeScript,
		Receiver:  "anthropic.messages",
		Function:  "create",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgName:   "messages",
		VulnClass: types.ClassH,
	},
	// Vercel AI SDK: generateText({ prompt: ... }). Empty Receiver means
	// the function is matched as a bare (imported) call.
	{
		ID:        "ts-vercel-ai",
		Language:  types.TypeScript,
		Receiver:  "",
		Function:  "generateText",
		Category:  SinkLLMPrompt,
		Severity:  types.SeverityHigh,
		ArgName:   "prompt",
		VulnClass: types.ClassH,
	},
}
Prompt Flow Detector¶
Implementation¶
// internal/pattern/prompt_flow.go
// PromptFlowDetector finds user-input-to-LLM flows by combining the
// generic taint engine with the LLM sink catalog.
type PromptFlowDetector struct {
	taintEngine *taint.Engine     // propagates taint from sources toward sinks
	llmSinks    []catalog.SinkDef // LLM API call sites to treat as sinks
}
// NewPromptFlowDetector builds a detector backed by a fresh taint engine
// and the full registered LLM sink catalog.
func NewPromptFlowDetector() *PromptFlowDetector {
	d := &PromptFlowDetector{}
	d.taintEngine = taint.NewEngine()
	d.llmSinks = catalog.GetLLMSinks()
	return d
}
// Detect runs taint analysis over file and reports every LLM sink call
// that receives a tainted argument. MCP tool parameters from surface are
// seeded as taint sources before the analysis runs.
func (d *PromptFlowDetector) Detect(file *ast.File, surface *surface.MCPSurface) []Match {
	// Seed every MCP tool parameter as a user-input taint source.
	for _, t := range surface.Tools {
		for _, p := range t.Parameters {
			d.taintEngine.AddSource(p.Name, taint.SourceInfo{
				Location:  t.Location,
				Category:  taint.SourceToolInput,
				ToolName:  t.Name,
				ParamName: p.Name,
			})
		}
	}

	// Propagate taint through the file.
	d.taintEngine.Analyze(file)

	// Report one match per tainted argument reaching a known LLM sink.
	var matches []Match
	for _, fn := range file.Functions {
		for _, call := range d.findCalls(fn) {
			sink := d.matchLLMSink(call)
			if sink == nil {
				continue
			}
			for _, arg := range call.Arguments {
				if info := d.taintEngine.GetTaint(arg); info != nil {
					matches = append(matches, d.createMatch(call, sink, info))
				}
			}
		}
	}
	return matches
}
Detection Rules¶
MCP-H-001: Unsanitized Tool Input to LLM¶
@tool
def ask(question: str): # Source: tool parameter
# Direct flow to LLM - HIGH severity
response = openai.chat.completions.create(
messages=[{"role": "user", "content": question}] # Sink
)
Finding:
{
"rule_id": "MCP-H-001",
"title": "Unsanitized Tool Input to LLM",
"severity": "high",
"description": "User input from tool parameter 'question' flows directly to LLM API without sanitization",
"trace": [
{"location": "tool.py:2", "action": "source", "variable": "question"},
{"location": "tool.py:4", "action": "sink", "variable": "messages[0].content"}
]
}
MCP-H-002: Concatenated Prompt Injection Risk¶
SYSTEM_PROMPT = "You are a helpful assistant."
@tool
def chat(user_input: str): # Source
# String concatenation - MEDIUM severity
prompt = SYSTEM_PROMPT + "\n\nUser: " + user_input
response = llm.invoke(prompt) # Sink
Finding:
{
"rule_id": "MCP-H-002",
"title": "Concatenated Prompt Injection Risk",
"severity": "medium",
"description": "User input concatenated into prompt without proper separation",
"evidence": {
"snippet": "prompt = SYSTEM_PROMPT + \"\\n\\nUser: \" + user_input"
}
}
MCP-H-003: Template Injection via Format String¶
TEMPLATE = "Answer the question: {question}\nBe concise."
@tool
def answer(question: str): # Source
# Format string - MEDIUM severity
prompt = TEMPLATE.format(question=question)
return llm.invoke(prompt) # Sink
Sanitization Detection¶
The detector recognizes common sanitization patterns:
// PromptSanitizers lists source-text patterns the detector treats as
// evidence of sanitization. These are regexes matched against source code,
// not semantic checks, so they are heuristic by nature.
var PromptSanitizers = []SanitizerDef{
	// Input validation: any length comparison, e.g. `len(x) > 1000`.
	{
		ID:       "input-length-check",
		Pattern:  `len\(.*\)\s*[<>]=?\s*\d+`,
		Category: SanitizerInputValidation,
	},
	// Content filtering: calls to conventionally named cleaning helpers.
	{
		ID:       "content-filter",
		Pattern:  `filter_content|sanitize_input|clean_input`,
		Category: SanitizerContentFilter,
	},
	// Structured output: constrained response formats (e.g. JSON mode).
	{
		ID:       "json-output",
		Pattern:  `response_format.*json`,
		Category: SanitizerStructuredOutput,
	},
	// System prompt separation: an explicit system-role message in the
	// messages list.
	{
		ID:       "system-message",
		Pattern:  `"role":\s*"system"`,
		Category: SanitizerPromptSeparation,
	},
}
Sanitized example (no finding):
@tool
def safe_chat(user_input: str):
# Length validation - recognized as sanitizer
if len(user_input) > 1000:
raise ValueError("Input too long")
# System prompt separation - recognized as safe pattern
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": user_input} # Properly separated
]
response = client.chat.completions.create(
model="gpt-4",
messages=messages
)
Exfiltration Detection¶
Exfiltration Patterns¶
// internal/pattern/exfiltration.go
// internal/pattern/exfiltration.go

// ExfiltrationPatterns holds case-insensitive regexes that flag prompt
// phrasing typical of data exfiltration or system-prompt extraction.
// Compiled once at package scope; MustCompile panics on an invalid
// pattern at init, which is the intended fail-fast behavior.
var ExfiltrationPatterns = []*regexp.Regexp{
	// Output manipulation: coaxing the model to embed data in its reply.
	regexp.MustCompile(`(?i)include\s+.{1,50}\s+in\s+(your|the)\s+response`),
	regexp.MustCompile(`(?i)output\s+.{1,50}\s+to\s+me`),
	// Information extraction.
	regexp.MustCompile(`(?i)reveal\s+.{1,50}\s+information`),
	regexp.MustCompile(`(?i)show\s+me\s+.{1,50}\s+secret`),
	regexp.MustCompile(`(?i)what\s+is\s+your\s+.{1,50}\s+prompt`),
	// System prompt extraction.
	regexp.MustCompile(`(?i)tell\s+me\s+your\s+instructions`),
	regexp.MustCompile(`(?i)print\s+your\s+system`),
	// Resource access: instructing the agent to pull in external data.
	regexp.MustCompile(`(?i)access\s+the\s+resource\s+.{1,100}`),
	regexp.MustCompile(`(?i)read\s+file\s+.{1,100}`),
	regexp.MustCompile(`(?i)fetch\s+url\s+.{1,100}`),
}
Exfiltration Detector¶
// ExfiltrationDetector scans tool descriptions and string literals for
// exfiltration phrasing using a fixed set of regex patterns.
type ExfiltrationDetector struct {
	patterns []*regexp.Regexp // typically seeded from ExfiltrationPatterns
}
// Detect scans MCP tool descriptions and the file's string literals for
// known exfiltration phrasing, returning one Match per pattern hit.
func (d *ExfiltrationDetector) Detect(file *ast.File, surface *surface.MCPSurface) []Match {
	var out []Match

	// Tool descriptions first: a hit here is high severity because the
	// description is presented to the model as part of the tool schema.
	for _, t := range surface.Tools {
		for _, re := range d.patterns {
			if !re.MatchString(t.Description) {
				continue
			}
			out = append(out, Match{
				RuleID:   "MCP-G-004",
				Title:    "Exfiltration Pattern in Tool Description",
				Location: t.Location,
				Severity: types.SeverityHigh,
				Evidence: Evidence{
					Snippet: truncate(t.Description, 100),
					Pattern: re.String(),
				},
			})
		}
	}

	// Then arbitrary string literals, at medium severity.
	for _, lit := range file.Strings {
		for _, re := range d.patterns {
			if !re.MatchString(lit.Value) {
				continue
			}
			out = append(out, Match{
				RuleID:   "MCP-G-005",
				Title:    "Exfiltration Pattern in String",
				Location: lit.Location,
				Severity: types.SeverityMedium,
				Evidence: Evidence{
					Snippet: truncate(lit.Value, 100),
				},
			})
		}
	}
	return out
}
Trace Generation¶
Flow Trace¶
// FlowTrace is the JSON-serializable record of one source-to-sink taint
// path, used in report output.
type FlowTrace struct {
	Source TraceNode   `json:"source"` // where tainted data enters
	Sink   TraceNode   `json:"sink"`   // the LLM call it reaches
	Steps  []TraceStep `json:"steps"`  // intermediate propagation hops
	Risk   string      `json:"risk"`   // human-readable risk summary
}
// TraceNode identifies one endpoint (source or sink) of a flow trace.
type TraceNode struct {
	Location types.Location `json:"location"`
	Variable string         `json:"variable"` // variable or field holding the data
	Type     string         `json:"type"`     // e.g. "tool_parameter", "llm_api_call"
}
// TraceStep records one propagation hop (assignment, format, call
// argument, ...) along a flow trace.
type TraceStep struct {
	Location types.Location `json:"location"`
	Action   string         `json:"action"` // e.g. "source", "assign", "format", "call_arg"
	From     string         `json:"from"`
	To       string         `json:"to"`
	Note     string         `json:"note,omitempty"`
}
Example Trace¶
{
"source": {
"location": {"file": "server.py", "line": 10, "col": 5},
"variable": "user_query",
"type": "tool_parameter"
},
"sink": {
"location": {"file": "server.py", "line": 25, "col": 12},
"variable": "messages[0].content",
"type": "llm_api_call"
},
"steps": [
{
"location": {"file": "server.py", "line": 10},
"action": "source",
"to": "user_query",
"note": "Tool parameter from MCP client"
},
{
"location": {"file": "server.py", "line": 15},
"action": "assign",
"from": "user_query",
"to": "query"
},
{
"location": {"file": "server.py", "line": 20},
"action": "format",
"from": "query",
"to": "prompt",
"note": "String interpolation"
},
{
"location": {"file": "server.py", "line": 25},
"action": "call_arg",
"from": "prompt",
"to": "messages[0].content",
"note": "Passed to OpenAI API"
}
],
"risk": "User input flows to LLM without sanitization"
}
Configuration¶
Enabling Prompt Flow Detection¶
# mcp-scan.yaml
analysis:
prompt_flow:
enabled: true
track_llm_sinks: true
detect_exfiltration: true
min_confidence: medium
llm_sinks:
# Additional custom sinks
custom:
- receiver: "my_llm_client"
function: "generate"
arg_name: "prompt"
severity: high
CLI Usage¶
# Enable prompt flow detection
mcp-scan scan /path/to/project --prompt-flow
# Show detailed traces
mcp-scan scan /path/to/project --prompt-flow --show-traces
# JSON output with traces
mcp-scan scan /path/to/project --prompt-flow --output json
Integration with Other Detectors¶
Combined Detection Pipeline¶
// CombinedPromptSecurityScan layers every prompt-security detector over a
// single file, cheapest first, and returns the deduplicated findings.
//
// NOTE(review): mlDetector and llmDetector are referenced but never
// constructed in this snippet — presumably package-level singletons;
// confirm they are initialized before this function runs.
func CombinedPromptSecurityScan(file *ast.File, surface *surface.MCPSurface) []types.Finding {
	var findings []types.Finding
	// Layer 1: Pattern-based detection (fast)
	injectionDetector := NewExtendedInjectionDetector()
	findings = append(findings, injectionDetector.Detect(file, surface)...)
	// Layer 2: Prompt flow analysis (medium)
	flowDetector := NewPromptFlowDetector()
	findings = append(findings, flowDetector.Detect(file, surface)...)
	// Layer 3: Exfiltration detection (medium)
	exfilDetector := NewExfiltrationDetector()
	findings = append(findings, exfilDetector.Detect(file, surface)...)
	// Layer 4: ML classification (if enabled)
	if mlDetector.IsEnabled() {
		findings = append(findings, mlDetector.Detect(file, surface)...)
	}
	// Layer 5: LLM analysis (expensive, most accurate)
	if llmDetector.IsEnabled() {
		findings = append(findings, llmDetector.Detect(file, surface)...)
	}
	return deduplicateFindings(findings)
}
API Reference¶
PromptFlowDetector¶
| Method | Parameters | Returns | Description |
|---|---|---|---|
| `NewPromptFlowDetector` | - | `*PromptFlowDetector` | Create detector |
| `Detect` | `file, surface` | `[]Match` | Run detection |
| `GetTraces` | - | `[]FlowTrace` | Get flow traces |
| `AddSink` | `SinkDef` | - | Add custom sink |
| `SetMinConfidence` | `Confidence` | - | Set threshold |
ExfiltrationDetector¶
| Method | Parameters | Returns | Description |
|---|---|---|---|
| `NewExfiltrationDetector` | - | `*ExfiltrationDetector` | Create detector |
| `Detect` | `file, surface` | `[]Match` | Run detection |
| `AddPattern` | `*regexp.Regexp` | - | Add custom pattern |
LLM Sink Functions¶
| Function | Description |
|---|---|
| `GetLLMSinks()` | Get all registered LLM sinks |
| `GetSinksByLanguage(lang)` | Get sinks for specific language |
| `GetSinksByProvider(provider)` | Get sinks for specific provider |
| `RegisterSink(sink)` | Register custom sink |