Skip to content

Pattern Detection Engine

Technical document for security analysts


1. Introduction

The Pattern Engine is the rule-based detection engine that searches for specific patterns in source code. Unlike taint analysis that tracks data flow, the pattern engine detects code constructs known to be dangerous through regular expressions and AST analysis.


2. Pattern Engine Architecture

2.1 Components

+------------------+
|  Pattern Engine  |
+------------------+
        |
        v
+------------------+
|     Rules        |
| +------------+   |
| | Rule 1     |   |
| | Rule 2     |   |
| | ...        |   |
| +------------+   |
+------------------+
        |
        v
+------------------+     +------------------+
|    Detectors     |     | AST + Surface    |
| - Regex-based    |<--->|   (input)        |
| - AST-based      |     |                  |
| - Hybrid         |     |                  |
+------------------+     +------------------+
        |
        v
+------------------+
|     Matches      |
+------------------+

2.2 Base Code

Location: internal/pattern/engine.go

type Engine struct {
    rules             []*Rule
    severityOverrides map[string]types.Severity
    disabledRules     map[string]bool
}

3. Rule Structure

3.1 Rule Definition

type Rule struct {
    ID          string              // Unique identifier (MCP-X001)
    Class       types.VulnClass     // Vulnerability class (A-N)
    Language    []types.Language    // Applicable languages
    Severity    types.Severity      // critical/high/medium/low/info
    Confidence  types.Confidence    // high/medium/low
    Description string              // Problem description
    Remediation string              // How to fix it
    Detector    Detector            // Detection logic
}

3.2 ID Convention

The ID format follows the pattern MCP-X###:

Component Meaning
MCP- Project prefix
X Class letter (A-N)
### Sequential number (001-999)

Examples: - MCP-A003: Third rule of class A (RCE) - MCP-B002: Second rule of class B (Filesystem) - MCP-G001: First rule of class G (Tool Poisoning)


4. Detector Interface

4.1 Definition

type Detector interface {
    Detect(file *ast.File, surface *surface.MCPSurface) []Match
}

Every detector must implement this interface. It receives: - file: Parsed AST of the file - surface: Extracted MCP surface (tools, resources, etc.)

Returns list of found Match.

4.2 Match Structure

type Match struct {
    Location    types.Location      // Position in file
    Snippet     string              // Code fragment
    Context     string              // Additional context
    Confidence  types.Confidence    // Can override rule confidence
    RuleID      string              // Can override rule ID
    Title       string              // Can override title
    Description string              // Can override description
    Severity    types.Severity      // Can override severity
    Class       types.VulnClass     // Can override class
    Remediation string              // Can override remediation
    Evidence    Evidence            // Extended evidence
}

5. Types of Detectors

5.1 RegexDetector (Regex-Based)

The simplest detector, searches for patterns with regular expressions:

type RegexDetector struct {
    Pattern *regexp.Regexp
}

func (d *RegexDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
    var matches []Match

    if file.RawContent != "" {
        lines := strings.Split(file.RawContent, "\n")
        for lineNum, line := range lines {
            if d.Pattern.MatchString(line) {
                matches = append(matches, Match{
                    Location: types.Location{
                        StartLine: lineNum + 1,
                    },
                    Snippet: strings.TrimSpace(line),
                })
            }
        }
    }
    return matches
}

Usage:

engine.AddCustomRule("CUSTOM-001", `os\.system\(`, types.SeverityCritical, ...)

5.2 AST-Based Detectors

Detectors that analyze AST structure:

type DangerousFunctionDetector struct{}

var dangerousFunctions = map[string]bool{
    "eval":    true,
    "exec":    true,
    "compile": true,
}

func (d *DangerousFunctionDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
    var matches []Match

    for _, fn := range file.Functions {
        for _, stmt := range fn.Body {
            if exprStmt, ok := stmt.(*ast.ExpressionStatement); ok {
                if call, ok := exprStmt.Expression.(*ast.Call); ok {
                    if dangerousFunctions[call.Function] {
                        matches = append(matches, Match{
                            Location: call.Location,
                            Snippet:  call.Function,
                        })
                    }
                }
            }
        }
    }
    return matches
}

5.3 Surface-Aware Detectors

Detectors that use the MCP surface:

type PromptInjectionDetector struct{}

var injectionMarkers = []string{
    "ignore previous",
    "disregard",
    "you are now",
    "act as",
}

func (d *PromptInjectionDetector) Detect(file *ast.File, surf *surface.MCPSurface) []Match {
    var matches []Match

    if surf != nil {
        for _, tool := range surf.Tools {
            descLower := strings.ToLower(tool.Description)
            for _, marker := range injectionMarkers {
                if strings.Contains(descLower, marker) {
                    matches = append(matches, Match{
                        Location: tool.Location,
                        Snippet:  tool.Description,
                        Context:  "Tool: " + tool.Name,
                    })
                    break
                }
            }
        }
    }
    return matches
}

5.4 Hybrid Detectors (Regex + AST)

Detectors that combine multiple techniques:

type DirectShellDetector struct{}

var shellPatterns = []*regexp.Regexp{
    regexp.MustCompile(`(?i)os\.system\s*\(`),
    regexp.MustCompile(`(?i)subprocess\.call\s*\([^,]*,\s*shell\s*=\s*True`),
    regexp.MustCompile(`(?i)child_process\.exec\s*\(`),
}

func (d *DirectShellDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
    var matches []Match

    // First try regex on raw content (more precise)
    if file.RawContent != "" {
        lines := strings.Split(file.RawContent, "\n")
        for lineNum, line := range lines {
            for _, pattern := range shellPatterns {
                if pattern.MatchString(line) {
                    matches = append(matches, Match{
                        Location: types.Location{StartLine: lineNum + 1},
                        Snippet:  strings.TrimSpace(line),
                    })
                    break
                }
            }
        }
        return matches
    }

    // Fallback to AST if no raw content
    for _, fn := range file.Functions {
        // ... AST analysis
    }
    return matches
}

6. Implemented Rules

6.1 Class A - RCE

Rule ID Detector Description
MCP-A003 DirectShellDetector Direct shell execution
MCP-A004 DangerousFunctionDetector eval/exec/compile

MCP-A003 Patterns:

(?i)os\.system\s*\(
(?i)subprocess\.call\s*\([^,]*,\s*shell\s*=\s*True
(?i)subprocess\.run\s*\([^,]*,\s*shell\s*=\s*True
(?i)subprocess\.Popen\s*\([^,]*,\s*shell\s*=\s*True
(?i)child_process\.exec\s*\(
(?i)child_process\.execSync\s*\(
(?i)execSync\s*\(

MCP-A004 Patterns:

\beval\s*\(
\bexec\s*\(
\bcompile\s*\(
\b__import__\s*\(
\bnew\s+Function\s*\(

6.2 Class B - Filesystem

Rule ID Detector Description
MCP-B002 PathTraversalPatternDetector Path traversal pattern

Patterns:

\.\.\/
\.\.\\
%2e%2e%2f
%2e%2e/
\.\.%2f

6.3 Class C - SSRF

Rule ID Detector Description
MCP-C002 UnvalidatedURLDetector Unvalidated URL

Monitored functions: - requests.get, requests.post, requests.put, requests.delete - fetch - axios.get, axios.post - http.get - urllib.request.urlopen

6.4 Class D - SQLi

Rule ID Detector Description
MCP-D002 SQLConcatDetector SQL concatenation

Patterns:

(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*\+.*
(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*%s
(?i)(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER).*\$\{
(?i)f["'].*SELECT.*\{
(?i)execute\s*\(\s*["'].*\+

6.5 Class E - Secrets

Rule ID Detector Description
MCP-E001 HardcodedSecretDetector Hardcoded secrets
MCP-E002 SecretVariableDetector Variables with suspicious names
MCP-E005 SecretLoggingDetector Secret logging

MCP-E001 Patterns:

(?i)(api[_-]?key|apikey)\s*[:=]\s*["']([A-Za-z0-9_\-]{20,})["']
(?i)(secret|password|passwd|pwd)\s*[:=]\s*["']([^"']{8,})["']
(?i)(token|auth[_-]?token)\s*[:=]\s*["']([A-Za-z0-9_\-\.]{20,})["']
(?i)(private[_-]?key)\s*[:=]\s*["']([^"']+)["']
(?i)bearer\s+[A-Za-z0-9_\-\.]{20,}
(?i)ghp_[A-Za-z0-9]{36}           # GitHub token
(?i)sk-[A-Za-z0-9]{48}            # OpenAI key
(?i)AKIA[A-Z0-9]{16}              # AWS access key
(?i)-----BEGIN (RSA |EC |DSA |OPENSSH )?PRIVATE KEY

MCP-E002 Pattern:

(?i)(api[_-]?key|secret|password|token|credential|auth)

6.6 Class F - Auth/OAuth

Rule ID Detector Description
MCP-F001 InsecureCookieDetector Cookie without Secure
MCP-F002 WeakJWTDetector Weak JWT
MCP-F003 OAuthStateDetector OAuth without state

MCP-F002 Patterns:

(?i)algorithms?\s*[=:]\s*\[?\s*["']none["']
(?i)verify\s*[=:]\s*False
(?i)verify\s*[=:]\s*false
(?i)options\s*:\s*\{[^}]*verify\s*:\s*false
(?i)ignoreExpiration\s*:\s*true

6.7 Class G - Tool Poisoning

Rule ID Detector Description
MCP-G001 PromptInjectionDetector Injection in description
MCP-G002 UnicodeDetector Suspicious unicode
MCP-G003 ToolShadowingDetector Tool shadowing

Injection markers (MCP-G001):

ignore previous
ignore all instructions
disregard
system prompt
you are now
act as
pretend to be
forget your instructions
new instructions
override
<important>
</important>
<hidden>
</hidden>
<system>
</system>
<instruction>
</instruction>
do not mention
do not reveal
secretly
must first
you must
include it in your response

Suspicious Unicode characters (MCP-G002):

\u202E  - RTL override
\u202D  - LTR override
\u202C  - POP directional formatting
\u200B  - Zero-width space
\u200C  - Zero-width non-joiner
\u200D  - Zero-width joiner
\uFEFF  - BOM / Zero-width no-break space

Shadowed tools (MCP-G003):

shell, exec, run, execute, terminal
bash, sh, cmd, system, eval
python, node, npm, pip
curl, wget, sudo, admin, root

6.8 Class N - Supply Chain

Rule ID Detector Description
MCP-N001 LockfileDetector No lockfile
MCP-N002 UntrustedDependencyDetector Untrusted dependency
MCP-N003 SuspiciousSetupDetector Suspicious setup

MCP-N002 Patterns:

(?i)git\+https?://
(?i)git\+ssh://
(?i)github\.com/[^/]+/[^/]+\.git
(?i)file://
(?i)http://   # non-HTTPS

MCP-N003 Patterns:

(?i)curl.*\|.*sh
(?i)wget.*\|.*sh
(?i)curl.*\|.*bash
(?i)wget.*\|.*bash
(?i)base64.*decode
(?i)reverse.*shell
(?i)nc\s+-e
(?i)netcat.*-e


7. Extended Rules

7.1 Rule Loading

The engine loads multiple rule sets:

func (e *Engine) loadRules() {
    e.LoadLifecycleRules()          // Class L
    e.LoadHiddenNetworkRules()      // Class M
    e.LoadExtendedInjectionRules()  // Extended Class G
    e.LoadPromptFlowRules()         // Class H
    e.LoadMLRules()                 // ML-based (Class G)
    // ... core rules
}

7.2 ML-Based Rules

Integrate the ML classifier for tool poisoning:

type MLDetector struct {
    classifier ml.Classifier
    threshold  float64
}

func (d *MLDetector) Detect(file *ast.File, surf *surface.MCPSurface) []Match {
    var matches []Match

    if surf != nil {
        for _, tool := range surf.Tools {
            result := d.classifier.Classify(tool.Description)
            if result.IsInjection && result.Probability >= d.threshold {
                matches = append(matches, Match{
                    Location:   tool.Location,
                    Snippet:    tool.Description,
                    Confidence: mapConfidence(result.Confidence),
                    Evidence: Evidence{
                        LLMAnalysis:   result.Reason,
                        LLMConfidence: result.Probability,
                        LLMCategory:   result.Category,
                    },
                })
            }
        }
    }
    return matches
}

7.3 LLM-Based Rules

Use LLM for semantic analysis:

type LLMDetector struct {
    detector *llm.Detector
}

func (d *LLMDetector) Detect(file *ast.File, surf *surface.MCPSurface) []Match {
    // Analyzes tool descriptions with LLM
    // Returns matches with Evidence.LLMAnalysis populated
}

7.4 CodeQL-Based Rules

Use CodeQL for secondary confirmation:

type CodeQLDetector struct {
    client *codeql.Client
}

func (d *CodeQLDetector) Detect(file *ast.File, _ *surface.MCPSurface) []Match {
    // Executes CodeQL queries
    // Returns matches with Evidence.CodeQLConfirmed = true
}

8. Rule Configuration

8.1 Disable Rules

# .mcp-scan.yaml
rules:
  disabled:
    - MCP-E001  # Don't look for hardcoded secrets
    - MCP-F001  # Don't verify cookies

Implementation:

func (e *Engine) SetDisabledRule(ruleID string) {
    e.disabledRules[ruleID] = true
}

func (e *Engine) IsRuleDisabled(ruleID string) bool {
    return e.disabledRules[ruleID]
}

8.2 Severity Override

rules:
  severity_overrides:
    MCP-A003: critical  # Elevate to critical
    MCP-E002: info      # Lower to informational

Implementation:

func (e *Engine) SetSeverityOverride(ruleID string, severity types.Severity) {
    e.severityOverrides[ruleID] = severity
}

func (e *Engine) GetEffectiveSeverity(ruleID string, defaultSeverity types.Severity) types.Severity {
    if override, ok := e.severityOverrides[ruleID]; ok {
        return override
    }
    return defaultSeverity
}

8.3 Custom Rules

rules:
  custom:
    - id: "CUSTOM-001"
      pattern: "dangerous_function\\("
      severity: high
      confidence: medium
      class: A
      description: "Use of dangerous function"
      remediation: "Use safe_function instead"
      languages:
        - python
        - javascript

Implementation:

func (e *Engine) AddCustomRule(
    id, pattern string,
    severity types.Severity,
    confidence types.Confidence,
    description, remediation string,
    languages []types.Language,
    class types.VulnClass,
) error {
    compiledPattern, err := regexp.Compile(pattern)
    if err != nil {
        return fmt.Errorf("invalid regex: %w", err)
    }

    rule := &Rule{
        ID:          id,
        Class:       class,
        Language:    languages,
        Severity:    severity,
        Confidence:  confidence,
        Description: description,
        Remediation: remediation,
        Detector:    &RegexDetector{Pattern: compiledPattern},
    }
    e.rules = append(e.rules, rule)
    return nil
}


9. Language Filtering

9.1 Language-Specific Rules

e.rules = append(e.rules, &Rule{
    ID:       "MCP-A003",
    Language: []types.Language{types.Python},  // Python only
    Detector: &DirectShellDetector{},
})

9.2 Filtering Logic

func (e *Engine) AnalyzeFile(file *ast.File, surf *surface.MCPSurface) []types.Finding {
    var findings []types.Finding

    for _, rule := range e.rules {
        // Skip disabled rules
        if e.IsRuleDisabled(rule.ID) {
            continue
        }

        // Verify language filter
        if len(rule.Language) > 0 {
            var langMatch bool
            for _, lang := range rule.Language {
                if lang == file.Language {
                    langMatch = true
                    break
                }
            }
            if !langMatch {
                continue  // Doesn't apply to this language
            }
        }

        // Execute detector
        matches := rule.Detector.Detect(file, surf)
        // ... process matches
    }
    return findings
}

10. Finding Generation

10.1 From Match to Finding

for _, match := range matches {
    match.Location.File = file.Path

    // Use match overrides if they exist, otherwise use rule defaults
    ruleID := rule.ID
    if match.RuleID != "" {
        ruleID = match.RuleID
    }

    severity := e.GetEffectiveSeverity(ruleID, rule.Severity)
    if match.Severity != "" {
        severity = match.Severity
    }

    confidence := match.Confidence
    if confidence == "" {
        confidence = rule.Confidence
    }

    finding := types.Finding{
        RuleID:      ruleID,
        Severity:    severity,
        Confidence:  confidence,
        Class:       rule.Class,
        Language:    file.Language,
        Location:    match.Location,
        Evidence:    convertEvidence(match.Evidence),
        Description: rule.Description,
        Remediation: rule.Remediation,
    }
    finding.ID = finding.GenerateID()
    findings = append(findings, finding)
}

10.2 Unique ID Generation

func (f *Finding) GenerateID() string {
    // ID components
    data := fmt.Sprintf("%s|%s|%d|%s",
        f.RuleID,
        f.Location.File,
        f.Location.StartLine,
        f.Evidence.Snippet,
    )

    // SHA-256 truncated to 16 hex characters
    hash := sha256.Sum256([]byte(data))
    return hex.EncodeToString(hash[:])[:16]
}

11. Rule Execution Order

11.1 Priority

  1. Lifecycle Rules (L) - Loaded first
  2. Hidden Network Rules (M) - Second
  3. Extended Injection (G) - Third
  4. Prompt Flow (H) - Fourth
  5. ML Rules (G) - Fifth
  6. Core Rules (A-G, N) - Last

11.2 Deduplication

If multiple rules detect the same problem, it's deduplicated by ID:

func NormalizeFindings(findings []Finding) []Finding {
    seen := make(map[string]bool)
    var unique []Finding

    for _, f := range findings {
        if !seen[f.ID] {
            seen[f.ID] = true
            unique = append(unique, f)
        }
    }

    return unique
}

12. Pattern Engine Limitations

12.1 False Positives

  1. Too broad regex: Can match benign code
  2. Commented code: Regex doesn't distinguish comments
  3. Strings: Pattern in string literal != actual use
  4. Dead code: Code never executed

12.2 False Negatives

  1. Obfuscation: ev + al() evades eval\(
  2. Indirection: getattr(module, "system")(cmd)
  3. Encoding: Base64/hex encoding of code
  4. Alias: from os import system as s

12.3 Recommendations

  1. Manually verify critical findings
  2. Combine with taint analysis for better precision
  3. Use baseline for known/accepted findings
  4. Adjust confidence of rules based on context

Next document: ml-classifier.md