Architecture¶
Overview of MCP-Scan's internal architecture and design decisions.
High-Level Architecture¶
┌─────────────────────────────────────────────────────────────────────────┐
│ MCP-Scan │
├─────────────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ CLI (cmd/mcp-scan) │ │
│ │ scan | version | init | surface | baseline │ │
│ └────────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────▼────────────────────────────────────┐ │
│ │ Scanner (pkg/scanner) │ │
│ │ Public API: Scan(), GenerateReport() │ │
│ └────────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────▼────────────────────────────────────┐ │
│ │ Pipeline │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────────┐│ │
│ │ │Discovery │→ │ Parser │→ │ Surface │→ │ Type Inference ││ │
│ │ │ │ │(tree- │ │Extractor │ │ (typeinfo/) ││ │
│ │ │ │ │ sitter) │ │ │ │ ││ │
│ │ └──────────┘ └──────────┘ └──────────┘ └────────┬─────────┘│ │
│ │ │ │ │ │ │ │
│ │ │ │ │ ┌───────────▼────────┐│ │
│ │ │ │ │ │ Import Resolver ││ │
│ │ │ │ │ │ (imports/) ││ │
│ │ │ │ │ └───────────┬────────┘│ │
│ │ │ │ │ │ │ │
│ │ │ │ │ ┌───────────▼────────┐│ │
│ │ │ │ │ │ Call Graph ││ │
│ │ │ │ │ │ (callgraph/) ││ │
│ │ │ │ │ └───────────┬────────┘│ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ ┌──────────────────────────────────────────────────────────┐ │ │
│ │ │ Taint Engine │ │ │
│ │ │ Sources → Propagation → Sinks (type-aware) │ │ │
│ │ └──────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌──────────────────────────────────────────────────────────┐ │ │
│ │ │ Pattern Engine │ │ │
│ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────┐ │ │ │
│ │ │ │ AST Rules │ │Regex Rules │ │ ML Classifier │ │ │ │
│ │ │ │ (Classes │ │ (Patterns) │ │ (ml/) │ │ │ │
│ │ │ │ A-N) │ │ │ │ 29 features │ │ │ │
│ │ │ └─────────────┘ └─────────────┘ └─────────────────┘ │ │ │
│ │ └──────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌──────────┐ ┌────────▼───┐ ┌──────────┐ │ │
│ │ │ Baseline │ │ MSSS │ │ Reporter │ │ │
│ │ │ Filter │ │ Scorer │ │JSON/SARIF│ │ │
│ │ └──────────┘ └────────────┘ └──────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
New Components (v2.0):
- Type Inference (internal/typeinfo/): Infers variable types for type-aware taint propagation
- Import Resolver (internal/imports/): Resolves imports for cross-file analysis
- Call Graph (internal/callgraph/): Persistent call graph for incremental inter-procedural analysis
- ML Classifier (internal/ml/): Machine learning-based prompt injection detection
Package Structure¶
mcp-scan/
├── cmd/mcp-scan/ # CLI entry point
│ └── main.go # Command definitions (Cobra)
│
├── pkg/scanner/ # Public Go API
│ └── scanner.go # Scanner type, Config, Result
│
└── internal/ # Internal packages
├── ast/ # Normalized AST structures
├── baseline/ # Baseline management
├── callgraph/ # Persistent call graph (Deep mode)
├── catalog/ # Sources, sinks, sanitizers
├── config/ # YAML configuration
├── discovery/ # File discovery
├── imports/ # Cross-file import resolution
├── ml/ # ML-based prompt injection detection
├── msss/ # MSSS scoring
├── parser/ # Tree-sitter parsing
├── pattern/ # Pattern engine and rules
├── reporter/ # Output formatters
├── scoring/ # Severity/confidence
├── surface/ # MCP surface extraction
├── taint/ # Taint analysis engine
├── typeinfo/ # Type inference engine
└── types/ # Shared types
New Advanced Components¶
The analyzer includes several advanced components for enhanced detection capabilities:
| Component | Package | Purpose | Documentation |
|---|---|---|---|
| Type Inference | internal/typeinfo/ | Infer variable types for type-aware taint | type-inference.md |
| Import Resolver | internal/imports/ | Cross-file import resolution | import-resolver.md |
| Call Graph | internal/callgraph/ | Persistent call graph for inter-procedural analysis | call-graph.md |
| ML Classifier | internal/ml/ | ML-based prompt injection detection | ml-classifier.md |
| Surface Extractor | internal/surface/ | MCP tool, resource, and prompt detection | surface-extraction.md |
| Taint Engine | internal/taint/ | Data flow analysis (source → sink) | taint-analysis.md |
| Pattern Engine | internal/pattern/ | Rule-based vulnerability detection | pattern-engine.md |
Component Details¶
Discovery (internal/discovery)¶
Purpose: Find files to scan based on include/exclude patterns.
type Discoverer struct {
include []string // Glob patterns to include
exclude []string // Glob patterns to exclude
}
func (d *Discoverer) Discover(root string) ([]string, error)
Features:
- Glob pattern matching (doublestar library)
- Language detection from file extension
- Symlink handling
- Optional .gitignore support
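The include/exclude matching can be sketched with standard-library globbing. The real implementation uses the doublestar library, which additionally supports `**` recursive patterns; `matchAny` and `shouldScan` below are illustrative helpers, not the actual API:

```go
package main

import (
	"fmt"
	"path/filepath"
)

// matchAny reports whether path matches any of the glob patterns,
// checking both the full path and the base name. Illustrative only:
// the real scanner uses doublestar, which adds "**" support.
func matchAny(patterns []string, path string) bool {
	for _, p := range patterns {
		if ok, _ := filepath.Match(p, filepath.Base(path)); ok {
			return true
		}
		if ok, _ := filepath.Match(p, path); ok {
			return true
		}
	}
	return false
}

// shouldScan applies include patterns first, then exclusions.
func shouldScan(include, exclude []string, path string) bool {
	if len(include) > 0 && !matchAny(include, path) {
		return false
	}
	return !matchAny(exclude, path)
}

func main() {
	include := []string{"*.py", "*.go"}
	exclude := []string{"*_test.go"}
	for _, p := range []string{"server.py", "scanner_test.go", "README.md"} {
		fmt.Printf("%s -> %v\n", p, shouldScan(include, exclude, p))
	}
}
```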
Parser (internal/parser)¶
Purpose: Parse source code into normalized AST using tree-sitter.
type Parser struct {
tsParser *TreeSitterParser
}
func (p *Parser) ParseFile(path string, lang types.Language) (*ast.File, error)
Supported Languages:
- Python
- TypeScript
- JavaScript
- Go

Tree-sitter Integration:
- Uses smacker/go-tree-sitter bindings
- Extracts functions, classes, imports, and statements
- Preserves location information
AST (internal/ast)¶
Purpose: Language-agnostic AST representation.
type File struct {
Path string
Language types.Language
RawContent string
Functions []Function
Classes []Class
Imports []Import
}
type Function struct {
Name string
Parameters []Parameter
Body []Statement
Location types.Location
IsMethod bool
ClassName string
Decorators []string
}
Surface Extractor (internal/surface)¶
Purpose: Identify MCP-specific elements.
type MCPSurface struct {
Transport string
Tools []Tool
Resources []Resource
AuthSignals []AuthSignal
}
func Extract(files []*ast.File) *MCPSurface
Detection:
- MCP SDK decorators (@tool, @resource)
- Handler registrations
- Transport initialization
- Auth patterns (JWT, OAuth)
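Decorator-based detection can be sketched as below, assuming the trimmed `Function` shape from the normalized AST. The decorator list and `isToolHandler` helper are illustrative; the real extractor is SDK-aware and also covers handler registrations:

```go
package main

import (
	"fmt"
	"strings"
)

// Function mirrors the normalized-AST shape (fields trimmed).
type Function struct {
	Name       string
	Decorators []string
}

// toolDecorators is an illustrative subset of decorator prefixes
// treated as MCP tool registrations.
var toolDecorators = []string{"@tool", "@mcp.tool", "@server.tool"}

// isToolHandler reports whether any decorator marks fn as an MCP tool,
// matching both bare ("@tool") and call ("@mcp.tool()") forms.
func isToolHandler(fn Function) bool {
	for _, d := range fn.Decorators {
		for _, want := range toolDecorators {
			if d == want || strings.HasPrefix(d, want+"(") {
				return true
			}
		}
	}
	return false
}

func main() {
	fns := []Function{
		{Name: "fetch_url", Decorators: []string{"@mcp.tool()"}},
		{Name: "helper"},
	}
	for _, fn := range fns {
		fmt.Printf("%s tool=%v\n", fn.Name, isToolHandler(fn))
	}
}
```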
Taint Engine (internal/taint)¶
Purpose: Track data flow from sources to sinks.
type Engine struct {
catalog *catalog.Catalog
mode Mode
config *Config
}
func (e *Engine) Analyze(file *ast.File, surface *surface.MCPSurface) []Finding
Modes:
- Fast: intra-procedural only
- Deep: inter-procedural with function summaries
Taint State:
type TaintState struct {
Variables map[string]*TaintInfo
Properties map[string]map[string]*TaintInfo
Returns *TaintInfo
Parent *TaintState
}
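The `Parent` pointer gives scope-chained lookup for closures and nested scopes. A minimal sketch, with `TaintInfo` trimmed to one field:

```go
package main

import "fmt"

// TaintInfo is trimmed to the one field the lookup needs.
type TaintInfo struct {
	SourceType string
}

// TaintState scopes variable taint; Parent links enclosing scopes.
type TaintState struct {
	Variables map[string]*TaintInfo
	Parent    *TaintState
}

func NewTaintState(parent *TaintState) *TaintState {
	return &TaintState{Variables: map[string]*TaintInfo{}, Parent: parent}
}

func (s *TaintState) SetTaint(name string, t *TaintInfo) { s.Variables[name] = t }

// GetTaint walks outward through enclosing scopes, the way a closure
// resolves a captured variable.
func (s *TaintState) GetTaint(name string) *TaintInfo {
	for st := s; st != nil; st = st.Parent {
		if t, ok := st.Variables[name]; ok {
			return t
		}
	}
	return nil
}

func main() {
	outer := NewTaintState(nil)
	outer.SetTaint("user_input", &TaintInfo{SourceType: "tool_input"})
	inner := NewTaintState(outer)
	fmt.Println(inner.GetTaint("user_input").SourceType) // resolved via Parent
}
```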
Catalog (internal/catalog)¶
Purpose: Define sources, sinks, and sanitizers.
type Catalog struct {
Sources []SourceDef
Sinks []SinkDef
Sanitizers []SanitizerDef
}
type SourceDef struct {
ID string
Language types.Language
Pattern string
Category types.SourceCategory
}
Pattern Engine (internal/pattern)¶
Purpose: Apply detection rules to files.
type Engine struct {
rules []*Rule
}
type Rule struct {
ID string
Class types.VulnClass
Language []types.Language
Severity types.Severity
Confidence types.Confidence
Description string
Remediation string
Detector Detector
}
type Detector interface {
Detect(file *ast.File, surface *surface.MCPSurface) []Match
}
Rule Organization:
- deep_rules.go: Classes H, I, J, K (Deep mode only)
- lifecycle_rules.go: Class L
- hidden_network_rules.go: Class M
MSSS Scorer (internal/msss)¶
Purpose: Calculate security score and compliance level.
type Score struct {
Total float64
Level int
Compliant bool
Version string
Categories map[string]*CategoryScore
}
func Calculate(findings []types.Finding, mode string) *Score
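One common shape for such a score is deduction-based: start from 100 and subtract a per-finding penalty by severity, clamping at zero. The weights below are illustrative assumptions only; the actual MSSS formula lives in internal/msss and differs:

```go
package main

import "fmt"

// deductions holds illustrative per-severity penalties; these are
// NOT the real MSSS weights.
var deductions = map[string]float64{
	"critical": 25,
	"high":     10,
	"medium":   4,
	"low":      1,
}

// score subtracts a deduction per finding from 100, clamping at zero.
// A sketch of the idea, not the actual MSSS calculation.
func score(severities []string) float64 {
	total := 100.0
	for _, s := range severities {
		total -= deductions[s]
	}
	if total < 0 {
		return 0
	}
	return total
}

func main() {
	fmt.Println(score([]string{"critical", "high", "low"}))
}
```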
Reporter (internal/reporter)¶
Purpose: Generate output in various formats.
type Reporter interface {
Generate(result *Result) ([]byte, error)
}
// Implementations
type JSONReporter struct{}
type SARIFReporter struct{}
type EvidenceReporter struct{}
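A JSON reporter satisfying this interface is essentially a marshal call. The sketch below trims `Result` and `Finding` to stand-in shapes; only the `Generate` signature comes from the interface above:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Result is a trimmed stand-in for the scanner's result type.
type Result struct {
	Findings []Finding `json:"findings"`
	Score    float64   `json:"score"`
}

type Finding struct {
	ID       string `json:"id"`
	Severity string `json:"severity"`
}

// JSONReporter implements the Reporter interface shown above.
type JSONReporter struct{}

// Generate serializes the result as indented JSON.
func (JSONReporter) Generate(r *Result) ([]byte, error) {
	return json.MarshalIndent(r, "", "  ")
}

func main() {
	out, err := JSONReporter{}.Generate(&Result{
		Findings: []Finding{{ID: "MCP-A-001", Severity: "critical"}},
		Score:    72.5,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```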
Baseline (internal/baseline)¶
Purpose: Manage accepted findings.
type Baseline struct {
Version string
Generated time.Time
Findings []BaselinedFinding
}
func (b *Baseline) Filter(findings []types.Finding) []types.Finding
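Because finding IDs are deterministic, filtering reduces to a set lookup. A minimal sketch with simplified stand-in fields (the real `Baseline` stores `BaselinedFinding` records, not a bare set):

```go
package main

import "fmt"

// Finding and Baseline are simplified stand-ins for this sketch.
type Finding struct{ ID string }

type Baseline struct{ Accepted map[string]bool }

// Filter drops findings whose deterministic IDs have already been
// accepted into the baseline, returning only new findings.
func (b *Baseline) Filter(findings []Finding) []Finding {
	var out []Finding
	for _, f := range findings {
		if !b.Accepted[f.ID] {
			out = append(out, f)
		}
	}
	return out
}

func main() {
	b := &Baseline{Accepted: map[string]bool{"MCP-A-001": true}}
	kept := b.Filter([]Finding{{ID: "MCP-A-001"}, {ID: "MCP-B-002"}})
	fmt.Println(len(kept), kept[0].ID)
}
```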
Data Flow¶
Scan Pipeline¶
1. Discovery
Input: root path, include/exclude patterns
Output: []string (file paths)
2. Parsing
Input: file paths
Output: []*ast.File
3. Surface Extraction
Input: []*ast.File
Output: *MCPSurface
4. Taint Analysis
Input: *ast.File, *MCPSurface, *Catalog
Output: []TaintFinding
5. Pattern Matching
Input: *ast.File, *MCPSurface, []Rule
Output: []Match
6. Finding Aggregation
Input: []TaintFinding, []Match
Output: []Finding
7. Baseline Filtering
Input: []Finding, *Baseline
Output: []Finding (filtered)
8. MSSS Scoring
Input: []Finding, Mode
Output: *Score
9. Report Generation
Input: *Result, format
Output: []byte
Finding Generation¶
Source Detection
│
▼
Taint Propagation
│
▼
Sink Detection ────► No sink reached → No finding
│
│ Sink reached
▼
Sanitizer Check ───► Sanitized → No finding
│
│ Not sanitized
▼
Generate Finding
│
▼
Add Evidence (snippet, trace)
│
▼
Assign ID (deterministic hash)
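The deterministic ID in the last step can be sketched as a hash over stable finding attributes, so re-scans assign the same ID to the same finding. The exact fields MCP-Scan hashes are not specified here; rule ID, path, and line are illustrative inputs:

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// findingID derives a stable identifier from rule ID, file path, and
// line number. Re-running the scan on unchanged code reproduces the
// same ID, which is what makes baseline filtering work. The real
// implementation may hash different or additional fields.
func findingID(ruleID, path string, line int) string {
	h := sha256.Sum256([]byte(fmt.Sprintf("%s|%s|%d", ruleID, path, line)))
	return fmt.Sprintf("%x", h[:8]) // 16 hex chars is plenty for uniqueness here
}

func main() {
	a := findingID("MCP-A-001", "server.py", 42)
	b := findingID("MCP-A-001", "server.py", 42)
	fmt.Println(a, a == b)
}
```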
Design Decisions¶
Why Tree-Sitter?¶
- Language Support: Native grammars for all target languages
- Accuracy: Full parsing vs regex approximation
- Performance: Incremental parsing capability
- Consistency: Same behavior across languages
Why Normalized AST?¶
- Simplicity: One representation for all languages
- Maintainability: Rules work across languages
- Testability: Easier to test with uniform structures
Why Separate Taint and Pattern Engines?¶
- Separation of Concerns: Taint tracks data flow, patterns match syntax
- Flexibility: Some rules need taint, others just patterns
- Performance: Can run pattern-only rules in fast mode
Why MSSS Scoring?¶
- Quantifiable: Numeric score for comparison
- Actionable: Clear levels with requirements
- MCP-Specific: Weights based on MCP threat model
Extension Points¶
Adding a New Language¶
- Add a language constant to types/types.go
- Implement tree-sitter extraction in parser/treesitter.go
- Add sources/sinks to catalog/catalog.go
- Update discovery patterns in discovery/discovery.go
Adding a New Rule¶
- Choose a vulnerability class (A-N)
- Implement the Detector interface
- Register the rule in the appropriate rules file
- Add test fixtures in testdata/fixtures/
Adding a New Output Format¶
- Implement the Reporter interface
- Register it in reporter/reporter.go
- Add a CLI option in cmd/mcp-scan/main.go
Performance Considerations¶
Memory¶
- Files parsed one at a time in fast mode
- AST nodes freed after processing
- Taint state scoped to function
CPU¶
- Parallel file processing (configurable workers)
- Pattern matching uses compiled regex
- Tree-sitter is highly optimized
Timeouts¶
- Configurable scan timeout
- Per-file timeout for parsing
- Context cancellation throughout
Deep Dive: Tree-Sitter Integration¶
Why Tree-Sitter?¶
Tree-sitter provides:
- Full syntax trees: real ASTs, not regex approximations
- Error recovery: continues parsing despite syntax errors
- Incremental parsing: fast re-parsing (not currently used)
- Language grammars: native support for Python, TypeScript, JavaScript, and Go
Go Bindings¶
// Dependencies in go.mod
github.com/smacker/go-tree-sitter // Core bindings
github.com/smacker/go-tree-sitter/python // Python grammar
github.com/smacker/go-tree-sitter/typescript/typescript // TS grammar
github.com/smacker/go-tree-sitter/javascript // JS grammar
github.com/smacker/go-tree-sitter/golang // Go grammar
Thread Safety¶
Tree-sitter parsers are NOT thread-safe. MCP-Scan creates a new parser instance per file:
func (p *TreeSitterParser) ParseContent(content []byte, lang Language) (*ast.File, error) {
    // Create a fresh parser per call: tree-sitter parsers are not thread-safe
    parser := sitter.NewParser()
    defer parser.Close()
    parser.SetLanguage(getLanguage(lang))
    ctx := context.Background() // in practice, a scan-scoped context is threaded in
    tree, err := parser.ParseCtx(ctx, nil, content)
    if err != nil {
        return nil, err
    }
    // ... extraction logic
}
Field-Based Extraction¶
Uses tree-sitter's field API for precise extraction:
// Example: extracting Python function
func (p *TreeSitterParser) extractPythonFunction(node *sitter.Node, content []byte) *ast.Function {
fn := &ast.Function{}
// Field-based extraction (precise)
if nameNode := node.ChildByFieldName("name"); nameNode != nil {
fn.Name = p.nodeText(nameNode, content)
}
if paramsNode := node.ChildByFieldName("parameters"); paramsNode != nil {
fn.Parameters = p.extractPythonParameters(paramsNode, content)
}
if bodyNode := node.ChildByFieldName("body"); bodyNode != nil {
fn.Body = p.extractPythonStatements(bodyNode, content)
}
return fn
}
Node Types by Language¶
Python:
- function_definition → Function
- class_definition → Class
- import_statement, import_from_statement → Import
- decorated_definition → Function with decorators
TypeScript/JavaScript:
- function_declaration → Function
- arrow_function → Function (anonymous)
- class_declaration → Class
- import_statement → Import
- export_statement → Export wrapper
Deep Dive: Taint Analysis Engine¶
Core Algorithm (Intra-Procedural)¶
ALGORITHM: AnalyzeFunction(fn)
─────────────────────────────
INPUT: fn = Function AST node
OUTPUT: []Finding
1. state ← new TaintState()
2. FOR EACH param IN fn.Parameters:
IF param.IsTool AND param.IsInput:
state.SetTaint(param.Name, TaintInfo{
Source: param.Location,
SourceType: ToolInput,
Confidence: High
})
3. FOR EACH stmt IN fn.Body:
PROCESS(stmt, state)
4. RETURN state.Findings
─────────────────────────────
PROCEDURE: PROCESS(stmt, state)
─────────────────────────────
CASE stmt IS Assignment(target, value):
taint ← EVAL(value, state)
IF taint ≠ nil:
state.SetTaint(target, taint.AddStep("assign", target))
ELSE:
state.ClearTaint(target)
CASE stmt IS Call(func, args):
IF IsSink(func):
FOR EACH arg, idx IN args:
IF SinkAcceptsArg(func, idx):
taint ← state.GetTaint(arg)
IF taint ≠ nil AND TaintMatchesSink(taint, func):
IF NOT IsSanitized(taint, func.Category):
EMIT Finding(taint, func)
CASE stmt IS Return(value):
taint ← EVAL(value, state)
state.SetReturnTaint(taint)
CASE stmt IS IfStatement(cond, body, else):
bodyState ← state.Clone()
elseState ← state.Clone()
PROCESS_ALL(body, bodyState)
PROCESS_ALL(else, elseState)
state.Merge(bodyState, elseState) // Join point
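The join at the end of the IfStatement case is a conservative may-taint merge: a variable stays tainted if it is tainted in either branch. A minimal Go sketch with the `TaintState` fields trimmed:

```go
package main

import "fmt"

type TaintInfo struct{ SourceType string }

// TaintState is trimmed to the Variables map for this sketch.
type TaintState struct{ Variables map[string]*TaintInfo }

// Merge implements the join point from the IfStatement case above:
// a variable is tainted after the branch if it is tainted in EITHER
// arm (conservative may-analysis), keeping the first trace seen.
func (s *TaintState) Merge(a, b *TaintState) {
	s.Variables = map[string]*TaintInfo{}
	for name, t := range a.Variables {
		s.Variables[name] = t
	}
	for name, t := range b.Variables {
		if _, seen := s.Variables[name]; !seen {
			s.Variables[name] = t
		}
	}
}

func main() {
	then := &TaintState{Variables: map[string]*TaintInfo{"x": {SourceType: "tool_input"}}}
	els := &TaintState{Variables: map[string]*TaintInfo{}} // x sanitized in else arm
	var joined TaintState
	joined.Merge(then, els)
	fmt.Println(joined.Variables["x"] != nil) // still tainted: may-analysis
}
```

Erring toward "tainted" at join points trades false positives for soundness, which is the usual choice for a security scanner.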
Taint Evaluation¶
FUNCTION: EVAL(expr, state) → TaintInfo | nil
─────────────────────────────────────────────
CASE expr IS Identifier(name):
RETURN state.GetTaint(name)
CASE expr IS BinaryOp(left, op, right):
leftTaint ← EVAL(left, state)
rightTaint ← EVAL(right, state)
IF leftTaint ≠ nil OR rightTaint ≠ nil:
RETURN Merge(leftTaint, rightTaint).AddStep("binary_op", op)
RETURN nil
CASE expr IS Call(func, args):
IF IsSource(func):
RETURN TaintInfo{Source: expr.Location, SourceType: GetSourceType(func)}
IF IsSanitizer(func):
RETURN nil // Taint cleared
// Conservative: propagate from args
FOR EACH arg IN args:
taint ← EVAL(arg, state)
IF taint ≠ nil:
RETURN taint.AddStep("call_through", func.Name)
RETURN nil
CASE expr IS MemberAccess(obj, prop):
objTaint ← EVAL(obj, state)
IF objTaint ≠ nil:
RETURN objTaint.AddStep("member_access", prop)
RETURN state.GetPropertyTaint(obj, prop)
CASE expr IS StringLiteral:
RETURN nil // Literals not tainted
DEFAULT:
RETURN nil
Inter-Procedural Analysis (Deep Mode)¶
PHASE 1: Build Call Graph
─────────────────────────
functionMap = {} // {file:funcName} → Function
callEdges = [] // [(caller, callee)]
FOR EACH file IN files:
FOR EACH fn IN file.Functions:
functionMap[key(fn)] = fn
FOR EACH call IN fn.Body:
callEdges.append((fn, resolve(call.target)))
PHASE 2: Compute Function Summaries
───────────────────────────────────
summaries = {}
FOR EACH fn IN TopologicalSort(callGraph, reverse=true):
summary = {
TaintedParams: [], // Indices that propagate taint
ReturnsTaint: false, // Whether return is tainted
SinksReached: [] // Sinks reachable from params
}
FOR idx, param IN enumerate(fn.Parameters):
state = TaintState with param[idx] tainted
AnalyzeFunction(fn, state)
IF state.ReturnTaint ≠ nil:
summary.TaintedParams.append(idx)
summary.ReturnsTaint = true
summary.SinksReached.extend(state.Findings)
summaries[fn] = summary
PHASE 3: Context-Sensitive Analysis
───────────────────────────────────
FOR EACH entryPoint IN mcpSurface.Tools:
AnalyzeWithContext(entryPoint.Handler, summaries, depth=0)
FUNCTION AnalyzeWithContext(fn, summaries, depth):
IF depth > MAX_DEPTH: RETURN
FOR EACH call IN fn.Body:
callee = resolve(call.target)
IF callee IN summaries:
calleeSummary = summaries[callee]
// Check if tainted args reach callee sinks
FOR idx, arg IN enumerate(call.Arguments):
IF state.GetTaint(arg) ≠ nil:
IF idx IN calleeSummary.TaintedParams:
// Taint propagates through call
IF calleeSummary.SinksReached:
EMIT Finding with cross-function trace
// Recurse if needed
AnalyzeWithContext(callee, summaries, depth + 1)
Taint State Data Structures¶
// Main taint state
type TaintState struct {
// Variable taints: {"user_input": TaintInfo, ...}
Variables map[string]*TaintInfo
// Property taints: {"request": {"body": TaintInfo, ...}, ...}
Properties map[string]map[string]*TaintInfo
// For closures and nested scopes
Parent *TaintState
// Return value taint (for function summaries)
ReturnTaint *TaintInfo
// Accumulated findings
Findings []*Finding
}
// Information about a tainted value
type TaintInfo struct {
// Where the taint originated
Source Location
// Category of the source
SourceType SourceCategory // tool_input, env_var, http_request, etc.
// Propagation trace
Via []TraceStep
// Confidence level
Confidence Confidence // High, Medium, Low
// Which sink categories this taint has been sanitized for
SanitizedFor []SinkCategory
}
// A step in the propagation trace
type TraceStep struct {
Location Location
Action string // "assign", "concat", "call_return", "member_access"
Variable string
Context string // Additional context
}
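A trace like this can be rendered for evidence output by walking the steps in order. The formatting below is illustrative; the exact reporter format is not specified here:

```go
package main

import (
	"fmt"
	"strings"
)

type Location struct {
	File string
	Line int
}

// TraceStep is trimmed to the fields the renderer uses.
type TraceStep struct {
	Location Location
	Action   string
	Variable string
}

// renderTrace formats a propagation trace as "file:line action(var)"
// hops joined with arrows, one way evidence output might present it.
func renderTrace(steps []TraceStep) string {
	parts := make([]string, len(steps))
	for i, s := range steps {
		parts[i] = fmt.Sprintf("%s:%d %s(%s)", s.Location.File, s.Location.Line, s.Action, s.Variable)
	}
	return strings.Join(parts, " -> ")
}

func main() {
	trace := []TraceStep{
		{Location{"server.py", 10}, "assign", "cmd"},
		{Location{"server.py", 12}, "concat", "cmd"},
	}
	fmt.Println(renderTrace(trace))
}
```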
Deep Dive: Catalog System¶
Source Definition¶
type SourceDef struct {
ID string // Unique identifier: "py-os-environ"
Language Language // Python, TypeScript, JavaScript, Go
Receiver string // Object: "os", "request", ""
Function string // Method/function: "getenv", "get"
Property string // For property access: "environ", "body"
Category SourceCategory // tool_input, env_var, http_request, etc.
}
// Matching logic
func (s *SourceDef) Matches(call *ast.Call) bool {
if s.Receiver != "" && !matchesReceiver(call, s.Receiver) {
return false
}
if s.Function != "" && !matchesFunction(call, s.Function) {
return false
}
return true
}
Sink Definition¶
type SinkDef struct {
ID string
Language Language
Receiver string
Function string
Category SinkCategory // exec, eval, filesystem, network, database
ArgIndex int // Which argument is dangerous (-1 = all)
Severity Severity // Critical, High, Medium, Low
}
// Example definitions
var PythonSinks = []SinkDef{
{ID: "py-os-system", Receiver: "os", Function: "system",
Category: SinkExec, ArgIndex: 0, Severity: Critical},
{ID: "py-subprocess-run", Receiver: "subprocess", Function: "run",
Category: SinkExec, ArgIndex: 0, Severity: Critical},
{ID: "py-eval", Function: "eval",
Category: SinkEval, ArgIndex: 0, Severity: Critical},
{ID: "py-open", Function: "open",
Category: SinkFilesystem, ArgIndex: 0, Severity: High},
}
Sanitizer Definition¶
type SanitizerDef struct {
ID string
Language Language
Receiver string
Function string
Sanitizes []SinkCategory // What this sanitizes
}
// Example
var PythonSanitizers = []SanitizerDef{
{ID: "py-shlex-quote", Receiver: "shlex", Function: "quote",
Sanitizes: []SinkCategory{SinkExec}},
{ID: "py-html-escape", Receiver: "html", Function: "escape",
Sanitizes: []SinkCategory{SinkResponse}},
{ID: "py-int", Function: "int",
Sanitizes: []SinkCategory{SinkExec, SinkDatabase, SinkFilesystem}},
}
Adding to Catalog¶
// In internal/catalog/catalog.go
func NewCatalog() *Catalog {
    c := &Catalog{}
    // Python sources (Sources is []SourceDef, so append values, not pointers)
    c.Sources = append(c.Sources,
        SourceDef{ID: "py-tool-param", Language: Python, Category: SourceToolInput},
        SourceDef{ID: "py-os-environ", Language: Python, Receiver: "os", Property: "environ",
            Category: SourceEnvVar},
        // ... more
    )
    // Python sinks
    c.Sinks = append(c.Sinks,
        SinkDef{ID: "py-os-system", Language: Python, Receiver: "os", Function: "system",
            Category: SinkExec, Severity: Critical},
        // ... more
    )
    // TypeScript sources
    c.Sources = append(c.Sources,
        SourceDef{ID: "ts-process-env", Receiver: "process", Property: "env",
            Language: TypeScript, Category: SourceEnvVar},
        // ... more
    )
    return c
}
Deep Dive: Pattern Engine¶
Rule Structure¶
type Rule struct {
ID string
Class VulnClass // A, B, C, ..., N
Severity Severity
Confidence Confidence
Languages []Language
Description string
Remediation string
Detector Detector // The detection strategy
Enabled bool
}
type Detector interface {
Detect(file *ast.File, surface *MCPSurface) []Match
}
type Match struct {
Location Location
Snippet string
Context map[string]string // Additional context
}
Built-in Detector Types¶
FunctionCallDetector: Matches specific function calls
type FunctionCallDetector struct {
Patterns []FunctionPattern
}
func (d *FunctionCallDetector) Detect(file *ast.File, surface *MCPSurface) []Match {
var matches []Match
for _, fn := range file.Functions {
ast.Walk(fn.Body, func(node ast.Node) {
if call, ok := node.(*ast.Call); ok {
for _, pattern := range d.Patterns {
if pattern.Matches(call) {
matches = append(matches, Match{
Location: call.Location,
Snippet: extractSnippet(file, call.Location),
})
}
}
}
})
}
return matches
}
RegexDetector: Pattern matching on raw content
type RegexDetector struct {
Pattern *regexp.Regexp
}
func (d *RegexDetector) Detect(file *ast.File, surface *MCPSurface) []Match {
var matches []Match
for _, m := range d.Pattern.FindAllStringIndex(file.RawContent, -1) {
loc := computeLocation(file, m[0], m[1])
matches = append(matches, Match{
Location: loc,
Snippet: file.RawContent[m[0]:m[1]],
})
}
return matches
}
CompositeDetector: Combines multiple detectors
type CompositeDetector struct {
Detectors []Detector
Mode CompositeMode // AND, OR
}
func (d *CompositeDetector) Detect(file *ast.File, surface *MCPSurface) []Match {
if d.Mode == AND {
// All detectors must match
var commonMatches []Match
for i, det := range d.Detectors {
matches := det.Detect(file, surface)
if i == 0 {
commonMatches = matches
} else {
commonMatches = intersect(commonMatches, matches)
}
}
return commonMatches
}
// OR mode: any detector can match
var allMatches []Match
for _, det := range d.Detectors {
allMatches = append(allMatches, det.Detect(file, surface)...)
}
return deduplicate(allMatches)
}
Creating Custom Rules¶
// 1. Define the detector
type DeprecatedAPIDetector struct {
DeprecatedAPIs []string
}
func (d *DeprecatedAPIDetector) Detect(file *ast.File, surface *MCPSurface) []Match {
var matches []Match
for _, fn := range file.Functions {
for _, call := range extractCalls(fn) {
for _, api := range d.DeprecatedAPIs {
if call.FunctionName() == api {
matches = append(matches, Match{
Location: call.Location,
Snippet: extractSnippet(file, call.Location),
Context: map[string]string{"api": api},
})
}
}
}
}
return matches
}
// 2. Register the rule
func init() {
RegisterRule(&Rule{
ID: "MCP-CUSTOM-001",
Class: ClassN, // Supply chain
Severity: Medium,
Confidence: High,
Languages: []Language{Python, TypeScript},
Description: "Use of deprecated MCP API",
Remediation: "Upgrade to current API version",
Detector: &DeprecatedAPIDetector{
DeprecatedAPIs: []string{"old_register_tool", "legacy_handler"},
},
})
}
Extensibility Guide¶
Adding a New Language¶
1. Add a language constant in types/types.go
2. Add the tree-sitter grammar as a go.mod dependency
3. Implement the extractor:

// internal/parser/treesitter.go
func (p *TreeSitterParser) extractRust(node *sitter.Node, content []byte, file *ast.File) {
    for i := 0; i < int(node.ChildCount()); i++ {
        child := node.Child(i)
        switch child.Type() {
        case "function_item":
            file.Functions = append(file.Functions, p.extractRustFunction(child, content))
        case "impl_item":
            file.Classes = append(file.Classes, p.extractRustImpl(child, content))
        case "use_declaration":
            file.Imports = append(file.Imports, p.extractRustUse(child, content))
        }
    }
}

4. Add sources/sinks in catalog/catalog.go
5. Update discovery patterns in discovery/discovery.go
Adding New Sources/Sinks¶
// internal/catalog/custom.go
// Add a new source category
const SourceMessageQueue SourceCategory = "message_queue"
// Add sources
customSources := []SourceDef{
{ID: "py-kafka-poll", Receiver: "consumer", Function: "poll",
Language: Python, Category: SourceMessageQueue},
{ID: "py-rabbitmq-consume", Receiver: "channel", Function: "basic_get",
Language: Python, Category: SourceMessageQueue},
}
// Add corresponding sinks
customSinks := []SinkDef{
{ID: "py-kafka-send", Receiver: "producer", Function: "send",
Language: Python, Category: SinkMessageQueue, Severity: Medium},
}
// Register
func init() {
catalog.RegisterSources(customSources)
catalog.RegisterSinks(customSinks)
}
Adding MCP SDK Support¶
// internal/surface/sdk.go
const SDKNewMCP SDKType = "new_mcp_sdk"
func detectSDK(file *ast.File) SDKType {
for _, imp := range file.Imports {
switch {
case strings.HasPrefix(imp.Module, "mcp"):
return SDKPythonOfficial
case imp.Module == "fastmcp":
return SDKPythonFastMCP
case strings.Contains(imp.Module, "@modelcontextprotocol"):
return SDKTypeScriptOfficial
case imp.Module == "new_mcp_sdk": // NEW
return SDKNewMCP
}
}
return SDKUnknown
}
// Add decorator patterns for new SDK
var newSDKToolDecorators = []string{
"@new_sdk.tool",
"@new_sdk.register_tool",
}
Related Documentation¶
For detailed documentation on each component, see:
Core Analysis Components¶
- Taint Analysis - Data flow tracking from sources to sinks
- Pattern Engine - Rule-based vulnerability detection (20+ detectors)
- Surface Extraction - MCP tool/resource/prompt detection
- ML Classifier - Machine learning prompt injection detection
Advanced Analysis Components (Deep Mode)¶
- Type Inference - Variable type inference for type-aware taint
- Import Resolver - Cross-file import resolution
- Call Graph - Persistent call graph for inter-procedural analysis
Analysis Flow Diagram (Complete)¶
┌─────────────────────────────────────────────────────────────────────────┐
│ MCP-Scan Full Pipeline │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ Discovery │──► Find files matching patterns │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Parser │──► Tree-sitter → Normalized AST │
│ │(tree-sitter)│ │
│ └──────┬──────┘ │
│ │ │
│ ├──────────────────────────────────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Surface │ │ Type │ │
│ │ Extractor │──► Tools, Resources, │ Inference │──► Variable │
│ │ │ Prompts, Transport │ │ types │
│ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │
│ │ │ │
│ │ ┌───────────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────┐ ┌─────────────┐ │
│ │ Import Resolver │────►│ Call Graph │──► Persistent graph │
│ │ (cross-file analysis) │ │ (Deep mode)│ │
│ └───────────┬─────────────┘ └──────┬──────┘ │
│ │ │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ TAINT ENGINE │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ Sources │ │ Propagation │ │ Sinks │ │ │
│ │ │ (tool_input │──│ (assign, │──│ (exec, eval, fs, │ │ │
│ │ │ env, http) │ │ call, etc) │ │ network, database) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ Type-Aware Propagation (reduced FPs with typeinfo) │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ PATTERN ENGINE │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ │
│ │ │ AST │ │ Regex │ │ ML Classifier │ │ │
│ │ │ Detectors │ │ Detectors │ │ (prompt injection) │ │ │
│ │ │ (20+ rules) │ │ (patterns) │ │ (29 features) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────────────┘ │ │
│ └─────────────────────────┬────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Finding Aggregation │ │
│ │ (Taint findings + Pattern findings → Unified findings) │ │
│ └─────────────────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌──────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Baseline │ │ MSSS │ │ Reporter │ │
│ │ Filter │ │ Scorer │ │ JSON/SARIF │ │
│ │ │ │ (0-100/0-3) │ │ Evidence │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Configuration Reference¶
All components can be configured via mcp-scan.yaml:
# Analysis modes
analysis:
mode: deep # fast | deep
max_depth: 10 # Call graph depth
timeout: 300 # Analysis timeout (seconds)
# Type inference (new)
type_inference:
enabled: true
strict: false # Require explicit types
infer_from_usage: true # Infer from variable usage
# Import resolution (new)
imports:
resolve_external: false # Resolve site-packages
cache: true # Cache resolved imports
# Call graph (new)
call_graph:
enabled: true
cache: true # Persist call graph
cache_dir: .mcp-scan # Cache directory
incremental: true # Incremental updates
# ML detection (new)
ml_detection:
enabled: true
threshold: 0.3 # Classification threshold
classifier: rule_based # rule_based | weighted | ensemble
model_path: "" # Custom model path
# Surface extraction
surface:
heuristic_detection: true # Detect by naming patterns
tool_decorators: # Custom decorators
- "custom.tool"
# Taint analysis
taint:
track_properties: true # Track object properties
track_returns: true # Track function returns
max_trace_length: 50 # Max trace steps
# Pattern engine
patterns:
enabled_classes: # Enable specific classes
- A # RCE
- B # Filesystem
- C # SSRF
- G # Tool poisoning
custom_rules: [] # Custom rule paths
# Output
output:
format: json # json | sarif | evidence
include_trace: true # Include propagation trace
include_snippet: true # Include code snippets