Pattern Engine System¶
Overview¶
The pattern engine (internal/pattern/) provides rule-based vulnerability detection using AST analysis, regex patterns, and surface inspection. It's the primary detection mechanism for security issues that can be identified through static code patterns.
Architecture¶
┌─────────────────────────────────────────────────────────────┐
│ Pattern Engine │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────┐ ┌───────────────┐ ┌─────────────┐ │
│ │ Rules │ │ Detectors │ │ Matches │ │
│ │ (Registry) │───▶│ (Interface) │───▶│ (Results) │ │
│ └───────────────┘ └───────────────┘ └─────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ ┌─────────────┐ │
│ │ - ID │ │ AST File │ │ - Location │ │
│ │ - Severity │ │ MCP Surface │ │ - Snippet │ │
│ │ - Confidence │ │ Raw Content │ │ - Context │ │
│ │ - Description │ │ │ │ - Confidence│ │
│ └───────────────┘ └───────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Core Types¶
Rule¶
Represents a detection rule:
type Rule struct {
ID string // Unique identifier (e.g., "MCP-A003")
Class types.VulnClass // Vulnerability class (A-N)
Language []types.Language // Language filter (empty = all)
Severity types.Severity // Critical, High, Medium, Low
Confidence types.Confidence // High, Medium, Low
Description string // Human-readable description
Remediation string // How to fix the issue
Detector Detector // Implementation
}
Detector Interface¶
All detectors implement this interface:
Match¶
Result of a detection:
type Match struct {
Location types.Location // File path, line numbers
Snippet string // Code snippet
Context string // Additional context
Confidence types.Confidence // Detection confidence
}
Built-in Detectors¶
Class A - Remote Code Execution¶
DirectShellDetector (MCP-A003)¶
Detects direct shell command execution:
# Detected patterns:
os.system("command")
subprocess.call(..., shell=True)
subprocess.run(..., shell=True)
subprocess.Popen(..., shell=True)
child_process.exec(...)
child_process.execSync(...)
Regex patterns:
var shellPatterns = []*regexp.Regexp{
regexp.MustCompile(`(?i)os\.system\s*\(`),
regexp.MustCompile(`(?i)subprocess\.call\s*\([^,]*,\s*shell\s*=\s*True`),
regexp.MustCompile(`(?i)subprocess\.run\s*\([^,]*,\s*shell\s*=\s*True`),
regexp.MustCompile(`(?i)subprocess\.Popen\s*\([^,]*,\s*shell\s*=\s*True`),
regexp.MustCompile(`(?i)child_process\.exec\s*\(`),
regexp.MustCompile(`(?i)child_process\.execSync\s*\(`),
regexp.MustCompile(`(?i)execSync\s*\(`),
}
DangerousFunctionDetector (MCP-A004)¶
Detects use of eval, exec, compile:
# Detected functions:
eval(...)
exec(...)
compile(...)
__import__(...)
new Function(...) # JavaScript
Class B - Filesystem¶
PathTraversalPatternDetector (MCP-B002)¶
Detects path traversal patterns:
Class C - SSRF¶
UnvalidatedURLDetector (MCP-C002)¶
Detects URL construction without validation:
# Detected patterns:
requests.get(url_variable)
requests.post(host_variable)
fetch(endpoint)
axios.get(user_url)
urllib.request.urlopen(url)
Class D - SQL Injection¶
SQLConcatDetector (MCP-D002)¶
Detects SQL string concatenation:
# Detected patterns:
"SELECT * FROM users WHERE id=" + user_id
f"SELECT * FROM users WHERE id={user_id}"
`SELECT * FROM users WHERE id=${userId}`
execute("SELECT * FROM users WHERE id=" + id)
Regex patterns:
var sqlPatterns = []*regexp.Regexp{
regexp.MustCompile(`(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*\+.*`),
regexp.MustCompile(`(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*%s`),
regexp.MustCompile(`(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*\$\{`),
regexp.MustCompile(`(?i)f["'].*SELECT.*\{`),
regexp.MustCompile(`(?i)execute\s*\(\s*["'].*\+`),
}
Class E - Secrets¶
HardcodedSecretDetector (MCP-E001)¶
Detects hardcoded secrets:
# Detected patterns:
api_key = "sk_live_..."
password = "secret123"
token = "ghp_xxxx..."
AKIA... (AWS access keys)
-----BEGIN PRIVATE KEY-----
Patterns:
- API keys: api_key|apikey with 20+ char value
- Secrets: secret|password|passwd|pwd with 8+ char value
- Tokens: token|auth_token with 20+ char value
- GitHub tokens: ghp_[A-Za-z0-9]{36}
- OpenAI keys: sk-[A-Za-z0-9]{48}
- AWS keys: AKIA[A-Z0-9]{16}
- Private keys: -----BEGIN (RSA |EC |DSA |OPENSSH )?PRIVATE KEY
SecretVariableDetector (MCP-E002)¶
Detects variables that might contain secrets:
# Detected variable names:
api_key = ...
secret = ...
password = ...
token = ...
credential = ...
auth = ...
SecretLoggingDetector (MCP-E005)¶
Detects secrets being logged:
Class F - Auth/OAuth¶
InsecureCookieDetector (MCP-F001)¶
Detects insecure cookie settings:
WeakJWTDetector (MCP-F002)¶
Detects weak JWT configurations:
OAuthStateDetector (MCP-F003)¶
Detects missing OAuth state parameter:
Class G - Tool Poisoning¶
PromptInjectionDetector (MCP-G001)¶
Detects prompt injection markers in tool descriptions:
Injection markers:
var injectionMarkers = []string{
"ignore previous",
"ignore all instructions",
"disregard",
"system prompt",
"you are now",
"act as",
"pretend to be",
"forget your instructions",
"new instructions",
"override",
"<important>", "</important>",
"<hidden>", "</hidden>",
"<system>", "</system>",
"<instruction>", "</instruction>",
"<secret>", "</secret>",
"[system]", "[hidden]", "[important]",
"do not mention",
"do not reveal",
"without telling",
"secretly",
"must first",
"you must",
"include it in your response",
"access the resource",
}
UnicodeDetector (MCP-G002)¶
Detects suspicious Unicode characters:
// Detected characters:
'\u202E' // RTL override
'\u202D' // LTR override
'\u202C' // Pop directional formatting
'\u200B' // Zero-width space
'\u200C' // Zero-width non-joiner
'\u200D' // Zero-width joiner
'\uFEFF' // Byte order mark
// Unicode combining marks
ToolShadowingDetector (MCP-G003)¶
Detects tools that shadow system commands:
var shadowedTools = map[string]bool{
"shell": true,
"exec": true,
"run": true,
"execute": true,
"terminal": true,
"bash": true,
"sh": true,
"cmd": true,
"system": true,
"eval": true,
"python": true,
"node": true,
"npm": true,
"pip": true,
"curl": true,
"wget": true,
"sudo": true,
"admin": true,
"root": true,
}
Class N - Supply Chain¶
LockfileDetector (MCP-N001)¶
Checks for presence of lockfiles:
package-lock.jsonyarn.lockrequirements.txt.lockPipfile.lock
UntrustedDependencyDetector (MCP-N002)¶
Detects dependencies from untrusted sources:
# Detected patterns:
git+https://...
git+ssh://...
github.com/.../....git
file://...
http://... (non-HTTPS)
SuspiciousSetupDetector (MCP-N003)¶
Detects suspicious commands in setup scripts:
# Detected patterns:
curl ... | sh
wget ... | sh
curl ... | bash
wget ... | bash
base64 ... decode
nc -e ...
netcat ... -e
Engine API¶
Creating the Engine¶
Adding Custom Rules¶
// Add programmatic rule
engine.AddRule(&pattern.Rule{
ID: "CUSTOM-001",
Class: types.ClassA,
Severity: types.SeverityHigh,
Confidence: types.ConfidenceHigh,
Description: "Custom vulnerability detected",
Remediation: "Fix the issue",
Detector: &CustomDetector{},
})
// Add regex-based rule
engine.AddCustomRule(
"CUSTOM-002", // ID
`dangerous_function\s*\(`, // Pattern
types.SeverityHigh, // Severity
types.ConfidenceMedium, // Confidence
"Dangerous function call", // Description
"Use safer alternative", // Remediation
[]types.Language{types.Python}, // Languages
types.ClassA, // Vulnerability class
)
Configuring Rules¶
// Override severity
engine.SetSeverityOverride("MCP-A003", types.SeverityCritical)
// Disable rule
engine.SetDisabledRule("MCP-E002")
// Check if disabled
if engine.IsRuleDisabled("MCP-E002") {
// Rule is disabled
}
Running Analysis¶
// Analyze multiple files
findings := engine.Analyze(files, surface)
// Analyze single file
findings := engine.AnalyzeFile(file, surface)
Detection Process¶
Per-File Analysis¶
- Language filter: Skip rules that don't apply to file's language
- Disabled check: Skip disabled rules
- Detector execution: Run detector's
Detect()method - Location enrichment: Add file path to match locations
- Severity override: Apply any configured overrides
- Finding generation: Create typed findings with unique IDs
Detection Methods¶
Detectors use two primary methods:
1. Raw Content Scanning¶
func (d *MyDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
if file.RawContent != "" {
lines := strings.Split(file.RawContent, "\n")
for lineNum, line := range lines {
if pattern.MatchString(line) {
matches = append(matches, Match{
Location: types.Location{
File: file.Path,
StartLine: lineNum + 1,
EndLine: lineNum + 1,
},
Snippet: line,
Confidence: types.ConfidenceHigh,
})
}
}
}
return matches
}
2. AST-Based Detection¶
func (d *MyDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
for _, fn := range file.Functions {
for _, stmt := range fn.Body {
if call, ok := stmt.(*ast.ExpressionStatement); ok {
if callExpr, ok := call.Expression.(*ast.Call); ok {
if isDangerous(callExpr.Function) {
matches = append(matches, Match{
Location: callExpr.Location,
Snippet: callExpr.Function,
Confidence: types.ConfidenceHigh,
})
}
}
}
}
}
return matches
}
3. Surface-Based Detection¶
func (d *MyDetector) Detect(file *ast.File, surf *surface.MCPSurface) []Match {
if surf != nil {
for _, tool := range surf.Tools {
if isSuspicious(tool.Description) {
matches = append(matches, Match{
Location: tool.Location,
Snippet: tool.Description,
Context: "Tool: " + tool.Name,
Confidence: types.ConfidenceHigh,
})
}
}
}
return matches
}
Rule Categories by Load Function¶
LoadLifecycleRules (Class L)¶
Plugin lifecycle issues: - Insecure initialization - Missing cleanup - Resource leaks
LoadHiddenNetworkRules (Class M)¶
Hidden network activity: - Undocumented connections - Covert channels - Data exfiltration
LoadExtendedInjectionRules (Class G)¶
Extended prompt injection detection: - More injection patterns - Encoding evasion - Multi-language attacks
LoadPromptFlowRules (Class H)¶
Prompt flow analysis: - User input to LLM prompt - Declaration vs behavior mismatch
LoadMLRules (Class G)¶
ML-based detection:
- Uses ml.Classifier for classification
- See ML Classifier Documentation
Creating Custom Detectors¶
Regex-Based Detector¶
type RegexDetector struct {
Pattern *regexp.Regexp
}
func (d *RegexDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
var matches []Match
if file.RawContent != "" {
lines := strings.Split(file.RawContent, "\n")
for lineNum, line := range lines {
if d.Pattern.MatchString(line) {
matches = append(matches, Match{
Location: types.Location{
File: file.Path,
StartLine: lineNum + 1,
EndLine: lineNum + 1,
},
Snippet: strings.TrimSpace(line),
Confidence: types.ConfidenceMedium,
})
}
}
}
return matches
}
Surface-Aware Detector¶
type MyToolDetector struct{}
func (d *MyToolDetector) Detect(file *ast.File, surf *surface.MCPSurface) []Match {
var matches []Match
if surf == nil {
return matches
}
for _, tool := range surf.Tools {
// Check tool properties
if tool.Name == "" {
matches = append(matches, Match{
Location: tool.Location,
Snippet: "unnamed tool",
Context: "Tool must have a name",
Confidence: types.ConfidenceHigh,
})
}
// Check tool parameters
for _, param := range tool.Parameters {
if param.Type == "" {
matches = append(matches, Match{
Location: param.Location,
Snippet: param.Name,
Context: "Parameter missing type",
Confidence: types.ConfidenceMedium,
})
}
}
}
return matches
}
Configuration¶
# mcp-scan.yaml
rules:
# Disable specific rules
disabled:
- MCP-E002 # Secret variable names (too noisy)
- MCP-N001 # Lockfile check
# Override severities
severity_overrides:
MCP-A003: critical # Promote shell execution
MCP-E001: high # Demote hardcoded secrets
# Custom rules
custom:
- id: CUSTOM-001
pattern: "dangerous_pattern"
severity: high
confidence: medium
description: "Custom dangerous pattern"
remediation: "Remove dangerous pattern"
languages: [python, javascript]
class: A
Performance¶
Detection Complexity¶
| Detector | Complexity | Notes |
|---|---|---|
| Regex-based | O(n*p) | n=lines, p=patterns |
| AST-based | O(n) | n=AST nodes |
| Surface-based | O(t) | t=tools |
Optimization Tips¶
- Use raw content scanning for simple patterns - faster than AST
- Limit regex complexity - avoid backtracking
- Short-circuit - return early when possible
- Cache compiled regex - compile once at init
Thread Safety¶
The engine is thread-safe for analysis:
// Safe: concurrent file analysis
var wg sync.WaitGroup
for _, file := range files {
wg.Add(1)
go func(f *ast.File) {
defer wg.Done()
findings := engine.AnalyzeFile(f, surface)
// Process findings
}(file)
}
wg.Wait()
Related Documentation¶
- ML Classifier - ML-based detection
- Taint Analysis - Data flow analysis
- Vulnerability Classes - Class definitions
- MCP Surface - Surface extraction