Call Graph System¶
Overview¶
The call graph system (internal/callgraph/) builds and maintains a graph of function calls across the codebase. This enables inter-procedural analysis, incremental updates, and efficient deep-mode scanning.
Architecture¶
┌────────────────────────────────────────────────────────────────┐
│ Call Graph System │
├────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────┐ ┌────────────┐ ┌────────────────────┐ │
│ │ Builder │───▶│ CallGraph │───▶│ Persistence │ │
│ │ (construct)│ │ (Nodes + │ │ (gob/JSON save) │ │
│ │ │ │ Edges) │ │ │ │
│ └────────────┘ └────────────┘ └────────────────────┘ │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌────────────┐ ┌────────────────────┐ │
│ │ Import │ │ Function │ │ Incremental │ │
│ │ Resolver │ │ Summaries │ │ Updates │ │
│ └────────────┘ └────────────┘ └────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────┘
Data Structures¶
CallGraph¶
The main graph structure:
type CallGraph struct {
Nodes map[string]*Node // funcID → Node
Edges map[string][]*Edge // funcID → outgoing edges
Files map[string][]string // filepath → funcIDs in file
Metadata *Metadata // Creation time, file hashes
}
Node¶
Represents a function:
type Node struct {
ID string // Unique identifier: "filepath:class.funcname"
Name string // Function name
File string // Source file path
Class string // Class name (empty for standalone functions)
Line int // Line number
Signature string // Parameter signature
IsPublic bool // Exported/public
IsTool bool // MCP tool handler
IsAsync bool // Async function
Summary *FunctionSummary // For inter-procedural analysis
}
Edge¶
Represents a call relationship:
type Edge struct {
From string // Caller function ID
To string // Callee function ID
CallSite types.Location // Where the call occurs
Type EdgeType // direct, method, callback, dynamic
ArgFlow []int // Which arguments flow to callee
}
EdgeType¶
Types of call edges:
const (
EdgeDirect EdgeType = "direct" // foo()
EdgeMethod EdgeType = "method" // obj.foo()
EdgeCallback EdgeType = "callback" // map(foo), addEventListener(foo)
EdgeDynamic EdgeType = "dynamic" // getattr(obj, name)()
)
FunctionSummary¶
Summary for inter-procedural analysis:
type FunctionSummary struct {
TaintedParams []int // Parameters that propagate taint
ReturnsTaint bool // Whether return can be tainted
TaintSources []int // Parameters that become sources
SanitizedParams []int // Parameters that are sanitized
HasSink bool // Contains a dangerous sink
SinkTypes []SinkCategory // Types of sinks present
}
Function ID Format¶
Function IDs uniquely identify functions:
Examples:
- /project/main.py:handler
- /project/models/user.py:User.save
- /project/utils.ts:formatDate
Building the Graph¶
Basic Usage¶
// Create builder
builder := callgraph.NewBuilder("/path/to/project")
// Parse files
files := []*ast.File{...}
surface := surface.Extract(files)
// Build graph
graph := builder.Build(files, surface)
// Access nodes and edges
for _, node := range graph.Nodes {
fmt.Printf("Function: %s at %s:%d\n", node.Name, node.File, node.Line)
for _, edge := range graph.Edges[node.ID] {
fmt.Printf(" Calls: %s\n", edge.To)
}
}
Build Process¶
- Index files for import resolution
- First pass: Add all function nodes
- Second pass: Add call edges
- Tool detection: Mark MCP tool handlers
Graph Operations¶
Query Functions¶
// Get a specific node
node := graph.GetNode("main.py:handler")
// Get all functions in a file
funcs := graph.GetFunctionsInFile("main.py")
// Get callees (functions this calls)
callees := graph.GetCallees("main.py:handler")
// Get callers (functions that call this)
callers := graph.GetCallers("main.py:helper")
// Get reachable functions (transitive callees)
reachable := graph.GetReachableFunctions("main.py:handler", 5)
// Get all MCP tool handlers
handlers := graph.GetToolHandlers()
Statistics¶
stats := graph.Stats()
fmt.Printf("Nodes: %d\n", stats["nodes"])
fmt.Printf("Edges: %d\n", stats["edges"])
fmt.Printf("Files: %d\n", stats["files"])
fmt.Printf("Tool handlers: %d\n", stats["tool_handlers"])
Persistence¶
Saving¶
// Save to .mcp-scan/callgraph.gob
err := graph.Save("/path/to/project")
// Save as JSON (for debugging)
err := graph.SaveJSON("/path/to/project")
Loading¶
// Load from cache
graph, err := callgraph.Load("/path/to/project")
if graph == nil {
// No cache exists, build fresh
graph = builder.Build(files, surface)
}
Validation¶
// Check if cache is valid
currentHashes := computeFileHashes(files)
if graph.IsValid(currentHashes) {
// Use cached graph
} else {
// Rebuild needed
invalidFiles := graph.InvalidatedFiles(currentHashes)
// Rebuild only changed files
}
Incremental Updates¶
For large projects, incremental updates are more efficient:
// Load existing graph
graph, _ := callgraph.Load(rootPath)
// Detect changed files
changedPaths := callgraph.GetChangedFiles(graph, currentHashes)
// Parse only changed files
var changedFiles []*ast.File
for _, path := range changedPaths {
file := parser.ParseFile(path, detectLanguage(path))
changedFiles = append(changedFiles, file)
}
// Incremental update
incBuilder := callgraph.NewIncrementalBuilder(rootPath, graph)
updatedGraph := incBuilder.UpdateFiles(changedFiles, allFiles, surface)
// Save updated graph
updatedGraph.Save(rootPath)
Update Process¶
- Remove old nodes/edges for changed files
- Re-add nodes from new AST
- Re-add edges from new AST
- Recompute affected function summaries
Use in Taint Analysis¶
Inter-Procedural Taint Flow¶
// Example: taint flows from tool handler through helper function
// main.py:tool_handler
// -> calls helper(user_input)
//
// utils.py:helper
// -> calls os.system(input) // SINK!
// Without call graph: Only finds sink in helper, no trace to source
// With call graph: Traces tool_handler -> helper -> os.system
Building Function Summaries¶
// Process files in dependency order
sortedPaths := graph.DependencyOrder()
summaries := make(map[string]*FunctionSummary)
for _, path := range sortedPaths {
file := files[path]
for _, fn := range file.Functions {
nodeID := callgraph.MakeFunctionID(path, "", fn.Name)
// Compute summary using already-computed callee summaries
summary := computeSummary(fn, graph, summaries)
summaries[nodeID] = summary
}
}
Reachability Analysis¶
// Check if tool input can reach a dangerous sink
for _, handler := range graph.GetToolHandlers() {
reachable := graph.GetReachableFunctions(handler.ID, 10)
for _, fn := range reachable {
if fn.Summary != nil && fn.Summary.HasSink {
// Potential vulnerability: tool input can reach sink
reportPotentialPath(handler, fn)
}
}
}
Configuration¶
# mcp-scan.yaml
analysis:
call_graph:
enabled: true
cache: true # Enable persistence
cache_dir: .mcp-scan # Cache directory
max_depth: 10 # Max call chain depth
resolve_dynamic: false # Try to resolve dynamic calls
File Format¶
Gob Format (Default)¶
Binary format using Go's gob encoding: - Fast serialization/deserialization - Type-safe - Compact
JSON Format (Debug)¶
Human-readable JSON:
{
"Nodes": {
"main.py:handler": {
"ID": "main.py:handler",
"Name": "handler",
"File": "main.py",
"Line": 10,
"IsTool": true
}
},
"Edges": {
"main.py:handler": [
{"From": "main.py:handler", "To": "utils.py:helper", "Type": "direct"}
]
},
"Metadata": {
"CreatedAt": "2024-01-20T10:00:00Z",
"FileHashes": {
"main.py": "abc123...",
"utils.py": "def456..."
}
}
}
Limitations¶
- Dynamic Calls:
getattr(),eval(), reflection not resolved - Virtual Methods: Inheritance not fully tracked
- Callbacks: Some callback patterns may be missed
- External Libraries: Calls to external packages marked as "external:*"
Edge Cases¶
Recursive Functions¶
Handled naturally - cycles in the graph are allowed:
reachable := graph.GetReachableFunctions("main.py:recursive", 5)
// Uses visited set to prevent infinite loops
Multiple Files with Same Name¶
Uses full paths as keys:
Anonymous Functions (Lambdas)¶
Assigned synthetic IDs:
Examples¶
Finding All Paths to a Sink¶
func findPathsToSink(graph *CallGraph, sinkID string) [][]string {
var paths [][]string
for _, handler := range graph.GetToolHandlers() {
path := findPath(graph, handler.ID, sinkID, nil)
if path != nil {
paths = append(paths, path)
}
}
return paths
}
func findPath(g *CallGraph, from, to string, visited []string) []string {
if from == to {
return []string{to}
}
if contains(visited, from) {
return nil // Cycle
}
visited = append(visited, from)
for _, edge := range g.Edges[from] {
path := findPath(g, edge.To, to, visited)
if path != nil {
return append([]string{from}, path...)
}
}
return nil
}
Visualizing the Graph¶
// Generate DOT format for Graphviz
func toDOT(graph *CallGraph) string {
var b strings.Builder
b.WriteString("digraph callgraph {\n")
for id, node := range graph.Nodes {
label := node.Name
if node.IsTool {
label = "[TOOL] " + label
}
b.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\"];\n", id, label))
}
for from, edges := range graph.Edges {
for _, edge := range edges {
b.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\";\n", from, edge.To))
}
}
b.WriteString("}\n")
return b.String()
}