Pattern Detection Engine¶
Technical document for security analysts
1. Introduction¶
The Pattern Engine is the rule-based detection engine that searches for specific patterns in source code. Unlike taint analysis that tracks data flow, the pattern engine detects code constructs known to be dangerous through regular expressions and AST analysis.
2. Pattern Engine Architecture¶
2.1 Components¶
+------------------+
| Pattern Engine |
+------------------+
|
v
+------------------+
| Rules |
| +------------+ |
| | Rule 1 | |
| | Rule 2 | |
| | ... | |
| +------------+ |
+------------------+
|
v
+------------------+ +------------------+
| Detectors | | AST + Surface |
| - Regex-based |<--->| (input) |
| - AST-based | | |
| - Hybrid | | |
+------------------+ +------------------+
|
v
+------------------+
| Matches |
+------------------+
2.2 Base Code¶
Location: internal/pattern/engine.go
type Engine struct {
rules []*Rule
severityOverrides map[string]types.Severity
disabledRules map[string]bool
}
3. Rule Structure¶
3.1 Rule Definition¶
type Rule struct {
ID string // Unique identifier (MCP-X001)
Class types.VulnClass // Vulnerability class (A-N)
Language []types.Language // Applicable languages
Severity types.Severity // critical/high/medium/low/info
Confidence types.Confidence // high/medium/low
Description string // Problem description
Remediation string // How to fix it
Detector Detector // Detection logic
}
3.2 ID Convention¶
The ID format follows the pattern MCP-X###:
| Component | Meaning |
|---|---|
MCP- |
Project prefix |
X |
Class letter (A-N) |
### |
Sequential number (001-999) |
Examples:
- MCP-A003: Third rule of class A (RCE)
- MCP-B002: Second rule of class B (Filesystem)
- MCP-G001: First rule of class G (Tool Poisoning)
4. Detector Interface¶
4.1 Definition¶
Every detector must implement this interface. It receives:
- file: Parsed AST of the file
- surface: Extracted MCP surface (tools, resources, etc.)
Returns list of found Match.
4.2 Match Structure¶
type Match struct {
Location types.Location // Position in file
Snippet string // Code fragment
Context string // Additional context
Confidence types.Confidence // Can override rule confidence
RuleID string // Can override rule ID
Title string // Can override title
Description string // Can override description
Severity types.Severity // Can override severity
Class types.VulnClass // Can override class
Remediation string // Can override remediation
Evidence Evidence // Extended evidence
}
5. Types of Detectors¶
5.1 RegexDetector (Regex-Based)¶
The simplest detector, searches for patterns with regular expressions:
type RegexDetector struct {
Pattern *regexp.Regexp
}
func (d *RegexDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
var matches []Match
if file.RawContent != "" {
lines := strings.Split(file.RawContent, "\n")
for lineNum, line := range lines {
if d.Pattern.MatchString(line) {
matches = append(matches, Match{
Location: types.Location{
StartLine: lineNum + 1,
},
Snippet: strings.TrimSpace(line),
})
}
}
}
return matches
}
Usage:
5.2 AST-Based Detectors¶
Detectors that analyze AST structure:
type DangerousFunctionDetector struct{}
var dangerousFunctions = map[string]bool{
"eval": true,
"exec": true,
"compile": true,
}
func (d *DangerousFunctionDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
var matches []Match
for _, fn := range file.Functions {
for _, stmt := range fn.Body {
if exprStmt, ok := stmt.(*ast.ExpressionStatement); ok {
if call, ok := exprStmt.Expression.(*ast.Call); ok {
if dangerousFunctions[call.Function] {
matches = append(matches, Match{
Location: call.Location,
Snippet: call.Function,
})
}
}
}
}
}
return matches
}
5.3 Surface-Aware Detectors¶
Detectors that use the MCP surface:
type PromptInjectionDetector struct{}
var injectionMarkers = []string{
"ignore previous",
"disregard",
"you are now",
"act as",
}
func (d *PromptInjectionDetector) Detect(file *ast.File, surf *surface.MCPSurface) []Match {
var matches []Match
if surf != nil {
for _, tool := range surf.Tools {
descLower := strings.ToLower(tool.Description)
for _, marker := range injectionMarkers {
if strings.Contains(descLower, marker) {
matches = append(matches, Match{
Location: tool.Location,
Snippet: tool.Description,
Context: "Tool: " + tool.Name,
})
break
}
}
}
}
return matches
}
5.4 Hybrid Detectors (Regex + AST)¶
Detectors that combine multiple techniques:
type DirectShellDetector struct{}
var shellPatterns = []*regexp.Regexp{
regexp.MustCompile(`(?i)os\.system\s*\(`),
regexp.MustCompile(`(?i)subprocess\.call\s*\([^,]*,\s*shell\s*=\s*True`),
regexp.MustCompile(`(?i)child_process\.exec\s*\(`),
}
func (d *DirectShellDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
var matches []Match
// First try regex on raw content (more precise)
if file.RawContent != "" {
lines := strings.Split(file.RawContent, "\n")
for lineNum, line := range lines {
for _, pattern := range shellPatterns {
if pattern.MatchString(line) {
matches = append(matches, Match{
Location: types.Location{StartLine: lineNum + 1},
Snippet: strings.TrimSpace(line),
})
break
}
}
}
return matches
}
// Fallback to AST if no raw content
for _, fn := range file.Functions {
// ... AST analysis
}
return matches
}
6. Implemented Rules¶
6.1 Class A - RCE¶
| Rule ID | Detector | Description |
|---|---|---|
| MCP-A003 | DirectShellDetector | Direct shell execution |
| MCP-A004 | DangerousFunctionDetector | eval/exec/compile |
MCP-A003 Patterns:
(?i)os\.system\s*\(
(?i)subprocess\.call\s*\([^,]*,\s*shell\s*=\s*True
(?i)subprocess\.run\s*\([^,]*,\s*shell\s*=\s*True
(?i)subprocess\.Popen\s*\([^,]*,\s*shell\s*=\s*True
(?i)child_process\.exec\s*\(
(?i)child_process\.execSync\s*\(
(?i)execSync\s*\(
MCP-A004 Patterns:
6.2 Class B - Filesystem¶
| Rule ID | Detector | Description |
|---|---|---|
| MCP-B002 | PathTraversalPatternDetector | Path traversal pattern |
Patterns:
6.3 Class C - SSRF¶
| Rule ID | Detector | Description |
|---|---|---|
| MCP-C002 | UnvalidatedURLDetector | Unvalidated URL |
Monitored functions:
- requests.get, requests.post, requests.put, requests.delete
- fetch
- axios.get, axios.post
- http.get
- urllib.request.urlopen
6.4 Class D - SQLi¶
| Rule ID | Detector | Description |
|---|---|---|
| MCP-D002 | SQLConcatDetector | SQL concatenation |
Patterns:
(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*\+.*
(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*%s
(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*\$\{
(?i)f["'].*SELECT.*\{
(?i)execute\s*\(\s*["'].*\+
6.5 Class E - Secrets¶
| Rule ID | Detector | Description |
|---|---|---|
| MCP-E001 | HardcodedSecretDetector | Hardcoded secrets |
| MCP-E002 | SecretVariableDetector | Variables with suspicious names |
| MCP-E005 | SecretLoggingDetector | Secret logging |
MCP-E001 Patterns:
(?i)(api[_-]?key|apikey)\s*[:=]\s*["']([A-Za-z0-9_\-]{20,})["']
(?i)(secret|password|passwd|pwd)\s*[:=]\s*["']([^"']{8,})["']
(?i)(token|auth[_-]?token)\s*[:=]\s*["']([A-Za-z0-9_\-\.]{20,})["']
(?i)(private[_-]?key)\s*[:=]\s*["']([^"']+)["']
(?i)bearer\s+[A-Za-z0-9_\-\.]{20,}
(?i)ghp_[A-Za-z0-9]{36} # GitHub token
(?i)sk-[A-Za-z0-9]{48} # OpenAI key
(?i)AKIA[A-Z0-9]{16} # AWS access key
(?i)-----BEGIN (RSA |EC |DSA |OPENSSH )?PRIVATE KEY
MCP-E002 Pattern:
6.6 Class F - Auth/OAuth¶
| Rule ID | Detector | Description |
|---|---|---|
| MCP-F001 | InsecureCookieDetector | Cookie without Secure |
| MCP-F002 | WeakJWTDetector | Weak JWT |
| MCP-F003 | OAuthStateDetector | OAuth without state |
MCP-F002 Patterns:
(?i)algorithms?\s*[=:]\s*\[?\s*["']none["']
(?i)verify\s*[=:]\s*False
(?i)verify\s*[=:]\s*false
(?i)options\s*:\s*\{[^}]*verify\s*:\s*false
(?i)ignoreExpiration\s*:\s*true
6.7 Class G - Tool Poisoning¶
| Rule ID | Detector | Description |
|---|---|---|
| MCP-G001 | PromptInjectionDetector | Injection in description |
| MCP-G002 | UnicodeDetector | Suspicious unicode |
| MCP-G003 | ToolShadowingDetector | Tool shadowing |
Injection markers (MCP-G001):
ignore previous
ignore all instructions
disregard
system prompt
you are now
act as
pretend to be
forget your instructions
new instructions
override
<important>
</important>
<hidden>
</hidden>
<system>
</system>
<instruction>
</instruction>
do not mention
do not reveal
secretly
must first
you must
include it in your response
Suspicious Unicode characters (MCP-G002):
\u202E - RTL override
\u202D - LTR override
\u202C - POP directional formatting
\u200B - Zero-width space
\u200C - Zero-width non-joiner
\u200D - Zero-width joiner
\uFEFF - BOM / Zero-width no-break space
Shadowed tools (MCP-G003):
shell, exec, run, execute, terminal
bash, sh, cmd, system, eval
python, node, npm, pip
curl, wget, sudo, admin, root
6.8 Class N - Supply Chain¶
| Rule ID | Detector | Description |
|---|---|---|
| MCP-N001 | LockfileDetector | No lockfile |
| MCP-N002 | UntrustedDependencyDetector | Untrusted dependency |
| MCP-N003 | SuspiciousSetupDetector | Suspicious setup |
MCP-N002 Patterns:
(?i)git\+https?://
(?i)git\+ssh://
(?i)github\.com/[^/]+/[^/]+\.git
(?i)file://
(?i)http:// # non-HTTPS
MCP-N003 Patterns:
(?i)curl.*\|.*sh
(?i)wget.*\|.*sh
(?i)curl.*\|.*bash
(?i)wget.*\|.*bash
(?i)base64.*decode
(?i)reverse.*shell
(?i)nc\s+-e
(?i)netcat.*-e
7. Extended Rules¶
7.1 Rule Loading¶
The engine loads multiple rule sets:
func (e *Engine) loadRules() {
e.LoadLifecycleRules() // Class L
e.LoadHiddenNetworkRules() // Class M
e.LoadExtendedInjectionRules() // Extended Class G
e.LoadPromptFlowRules() // Class H
e.LoadMLRules() // ML-based (Class G)
// ... core rules
}
7.2 ML-Based Rules¶
Integrate the ML classifier for tool poisoning:
type MLDetector struct {
classifier ml.Classifier
threshold float64
}
func (d *MLDetector) Detect(file *ast.File, surf *surface.MCPSurface) []Match {
var matches []Match
if surf != nil {
for _, tool := range surf.Tools {
result := d.classifier.Classify(tool.Description)
if result.IsInjection && result.Probability >= d.threshold {
matches = append(matches, Match{
Location: tool.Location,
Snippet: tool.Description,
Confidence: mapConfidence(result.Confidence),
Evidence: Evidence{
LLMAnalysis: result.Reason,
LLMConfidence: result.Probability,
LLMCategory: result.Category,
},
})
}
}
}
return matches
}
7.3 LLM-Based Rules¶
Use LLM for semantic analysis:
type LLMDetector struct {
detector *llm.Detector
}
func (d *LLMDetector) Detect(file *ast.File, surf *surface.MCPSurface) []Match {
// Analyzes tool descriptions with LLM
// Returns matches with Evidence.LLMAnalysis populated
}
7.4 CodeQL-Based Rules¶
Use CodeQL for secondary confirmation:
type CodeQLDetector struct {
client *codeql.Client
}
func (d *CodeQLDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
// Executes CodeQL queries
// Returns matches with Evidence.CodeQLConfirmed = true
}
8. Rule Configuration¶
8.1 Disable Rules¶
# .mcp-scan.yaml
rules:
disabled:
- MCP-E001 # Don't look for hardcoded secrets
- MCP-F001 # Don't verify cookies
Implementation:
func (e *Engine) SetDisabledRule(ruleID string) {
e.disabledRules[ruleID] = true
}
func (e *Engine) IsRuleDisabled(ruleID string) bool {
return e.disabledRules[ruleID]
}
8.2 Severity Override¶
rules:
severity_overrides:
MCP-A003: critical # Elevate to critical
MCP-E002: info # Lower to informational
Implementation:
func (e *Engine) SetSeverityOverride(ruleID string, severity types.Severity) {
e.severityOverrides[ruleID] = severity
}
func (e *Engine) GetEffectiveSeverity(ruleID string, defaultSeverity types.Severity) types.Severity {
if override, ok := e.severityOverrides[ruleID]; ok {
return override
}
return defaultSeverity
}
8.3 Custom Rules¶
rules:
custom:
- id: "CUSTOM-001"
pattern: "dangerous_function\\("
severity: high
confidence: medium
class: A
description: "Use of dangerous function"
remediation: "Use safe_function instead"
languages:
- python
- javascript
Implementation:
func (e *Engine) AddCustomRule(
id, pattern string,
severity types.Severity,
confidence types.Confidence,
description, remediation string,
languages []types.Language,
class types.VulnClass,
) error {
compiledPattern, err := regexp.Compile(pattern)
if err != nil {
return fmt.Errorf("invalid regex: %w", err)
}
rule := &Rule{
ID: id,
Class: class,
Language: languages,
Severity: severity,
Confidence: confidence,
Description: description,
Remediation: remediation,
Detector: &RegexDetector{Pattern: compiledPattern},
}
e.rules = append(e.rules, rule)
return nil
}
9. Language Filtering¶
9.1 Language-Specific Rules¶
e.rules = append(e.rules, &Rule{
ID: "MCP-A003",
Language: []types.Language{types.Python}, // Python only
Detector: &DirectShellDetector{},
})
9.2 Filtering Logic¶
func (e *Engine) AnalyzeFile(file *ast.File, surf *surface.MCPSurface) []types.Finding {
var findings []types.Finding
for _, rule := range e.rules {
// Skip disabled rules
if e.IsRuleDisabled(rule.ID) {
continue
}
// Verify language filter
if len(rule.Language) > 0 {
var langMatch bool
for _, lang := range rule.Language {
if lang == file.Language {
langMatch = true
break
}
}
if !langMatch {
continue // Doesn't apply to this language
}
}
// Execute detector
matches := rule.Detector.Detect(file, surf)
// ... process matches
}
return findings
}
10. Finding Generation¶
10.1 From Match to Finding¶
for _, match := range matches {
match.Location.File = file.Path
// Use match overrides if they exist, otherwise use rule defaults
ruleID := rule.ID
if match.RuleID != "" {
ruleID = match.RuleID
}
severity := e.GetEffectiveSeverity(ruleID, rule.Severity)
if match.Severity != "" {
severity = match.Severity
}
confidence := match.Confidence
if confidence == "" {
confidence = rule.Confidence
}
finding := types.Finding{
RuleID: ruleID,
Severity: severity,
Confidence: confidence,
Class: rule.Class,
Language: file.Language,
Location: match.Location,
Evidence: convertEvidence(match.Evidence),
Description: rule.Description,
Remediation: rule.Remediation,
}
finding.ID = finding.GenerateID()
findings = append(findings, finding)
}
10.2 Unique ID Generation¶
func (f *Finding) GenerateID() string {
// ID components
data := fmt.Sprintf("%s|%s|%d|%s",
f.RuleID,
f.Location.File,
f.Location.StartLine,
f.Evidence.Snippet,
)
// SHA-256 truncated to 16 hex characters
hash := sha256.Sum256([]byte(data))
return hex.EncodeToString(hash[:])[:16]
}
11. Rule Execution Order¶
11.1 Priority¶
- Lifecycle Rules (L) - Loaded first
- Hidden Network Rules (M) - Second
- Extended Injection (G) - Third
- Prompt Flow (H) - Fourth
- ML Rules (G) - Fifth
- Core Rules (A-G, N) - Last
11.2 Deduplication¶
If multiple rules detect the same problem, it's deduplicated by ID:
func NormalizeFindings(findings []Finding) []Finding {
seen := make(map[string]bool)
var unique []Finding
for _, f := range findings {
if !seen[f.ID] {
seen[f.ID] = true
unique = append(unique, f)
}
}
return unique
}
12. Pattern Engine Limitations¶
12.1 False Positives¶
- Too broad regex: Can match benign code
- Commented code: Regex doesn't distinguish comments
- Strings: Pattern in string literal != actual use
- Dead code: Code never executed
12.2 False Negatives¶
- Obfuscation:
ev+al()evadeseval\( - Indirection:
getattr(module, "system")(cmd) - Encoding: Base64/hex encoding of code
- Alias:
from os import system as s
12.3 Recommendations¶
- Manually verify critical findings
- Combine with taint analysis for better precision
- Use baseline for known/accepted findings
- Adjust confidence of rules based on context
Next document: ml-classifier.md