Skip to content

Error Handling - mcp-scan

This guide describes the types of errors that mcp-scan can generate and how to handle them correctly.


Table of Contents


Error Types

mcp-scan can produce different categories of errors:

Category Description Recoverable
Discovery Errors when searching for files Partially
Parsing Errors when parsing source code Yes (file ignored)
Analysis Errors during analysis Yes (file ignored)
Context Timeout or cancellation No
Baseline Errors loading/saving baseline Yes
Report Errors generating reports Yes
Configuration Invalid configuration No

Scanner Errors

Discovery Error

Occurs when the specified directory or file cannot be accessed.

package main

import (
    "context"
    "errors"
    "fmt"
    "os"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    s := scanner.New(scanner.DefaultConfig())

    result, err := s.Scan(context.Background(), "/nonexistent/path")
    if err != nil {
        // Check if it's a file not found error
        if os.IsNotExist(errors.Unwrap(err)) {
            fmt.Println("Error: The specified path does not exist")
            os.Exit(2)
        }

        // Check if it's a permissions error
        if os.IsPermission(errors.Unwrap(err)) {
            fmt.Println("Error: Insufficient permissions to access the path")
            os.Exit(3)
        }

        // Other discovery error
        fmt.Printf("Discovery error: %v\n", err)
        os.Exit(1)
    }

    fmt.Printf("Files scanned: %d\n", result.Manifest.TotalFiles)
}

Parsing Error

Parsing errors do not stop the scan. Files that cannot be parsed are silently ignored, but you can detect the situation:

package main

import (
    "context"
    "fmt"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    cfg := scanner.DefaultConfig()
    cfg.Include = []string{"**/*.py"}

    s := scanner.New(cfg)
    result, err := s.Scan(context.Background(), "./src")
    if err != nil {
        // Parsing errors don't reach here
        // Only fatal errors
        panic(err)
    }

    // Check if there are files that weren't parsed
    // Compare discovered files vs files in manifest
    fmt.Printf("Files in manifest: %d\n", result.Manifest.TotalFiles)

    // Files that failed parsing won't be in the manifest
    // but were initially discovered
}

Handling Partial Errors

Some errors allow continuing with partial results:

package main

import (
    "context"
    "fmt"
    "log"
    "strings"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func scanWithPartialResults(path string) (*scanner.Result, []string) {
    s := scanner.New(scanner.DefaultConfig())
    var warnings []string

    result, err := s.Scan(context.Background(), path)
    if err != nil {
        // Analyze the error
        errMsg := err.Error()

        if strings.Contains(errMsg, "discovery failed") {
            // Fatal discovery error
            log.Fatalf("Cannot scan: %v", err)
        }

        if strings.Contains(errMsg, "parsing failed") {
            // Some files failed but others may be OK
            warnings = append(warnings, fmt.Sprintf("Partial parsing: %v", err))
        }

        if strings.Contains(errMsg, "surface extraction failed") {
            // Surface extraction failed
            warnings = append(warnings, fmt.Sprintf("Incomplete MCP surface: %v", err))
        }
    }

    return result, warnings
}

func main() {
    result, warnings := scanWithPartialResults("./src")

    if len(warnings) > 0 {
        fmt.Println("Warnings:")
        for _, w := range warnings {
            fmt.Printf("  - %s\n", w)
        }
    }

    if result != nil {
        fmt.Printf("Findings: %d\n", len(result.Findings))
    }
}

Context Errors

Timeout

package main

import (
    "context"
    "errors"
    "fmt"
    "os"
    "time"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    cfg := scanner.DefaultConfig()
    cfg.Timeout = 30 * time.Second  // Short timeout for example

    s := scanner.New(cfg)

    ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)
    defer cancel()

    result, err := s.Scan(ctx, "./large-project")
    if err != nil {
        if errors.Is(err, context.DeadlineExceeded) {
            fmt.Printf("Error: Scan exceeded timeout of %v\n", cfg.Timeout)
            fmt.Println("Suggestions:")
            fmt.Println("  - Increase timeout with --timeout")
            fmt.Println("  - Use fast mode instead of deep")
            fmt.Println("  - Exclude non-relevant directories")
            os.Exit(124) // Standard code for timeout
        }
        fmt.Printf("Error: %v\n", err)
        os.Exit(1)
    }

    fmt.Printf("Completed in %v\n", result.ScanDuration)
}

Cancellation

package main

import (
    "context"
    "errors"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    s := scanner.New(scanner.DefaultConfig())

    ctx, cancel := context.WithCancel(context.Background())

    // Handle Ctrl+C
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        sig := <-sigChan
        fmt.Printf("\nReceived signal %v, canceling scan...\n", sig)
        cancel()
    }()

    result, err := s.Scan(ctx, "./src")
    if err != nil {
        if errors.Is(err, context.Canceled) {
            fmt.Println("Scan canceled by user")
            // Not an error, it's a user action
            os.Exit(130) // 128 + SIGINT(2)
        }
        fmt.Printf("Error: %v\n", err)
        os.Exit(1)
    }

    fmt.Printf("Findings: %d\n", len(result.Findings))
}

Custom Timeout per Phase

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

// ScanWithPhasedTimeout executes a scan with granular timeout control
func ScanWithPhasedTimeout(path string, discoveryTimeout, analysisTimeout time.Duration) (*scanner.Result, error) {
    cfg := scanner.DefaultConfig()

    // Total timeout is the sum
    cfg.Timeout = discoveryTimeout + analysisTimeout

    s := scanner.New(cfg)

    ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)
    defer cancel()

    // Start scan
    start := time.Now()
    result, err := s.Scan(ctx, path)
    elapsed := time.Since(start)

    if err != nil {
        return nil, fmt.Errorf("scan failed after %v: %w", elapsed, err)
    }

    return result, nil
}

func main() {
    result, err := ScanWithPhasedTimeout(
        "./src",
        30*time.Second,   // Timeout for discovery
        5*time.Minute,    // Timeout for analysis
    )
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        return
    }

    fmt.Printf("Completed: %d findings in %v\n",
        len(result.Findings), result.ScanDuration)
}

Baseline Errors

Error Loading Baseline

package main

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "os"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    baselinePath := ".mcp-scan-baseline.json"

    // Try to load baseline
    baseline, err := scanner.LoadBaseline(baselinePath)
    if err != nil {
        // Check error type
        if os.IsNotExist(err) {
            fmt.Printf("Warning: Baseline '%s' does not exist, scanning without filter\n", baselinePath)
            baseline = nil // Continue without baseline
        } else if errors.Is(err, json.SyntaxError{}) || isSyntaxError(err) {
            fmt.Printf("Error: Baseline '%s' has invalid JSON format\n", baselinePath)
            fmt.Println("Suggestions:")
            fmt.Println("  - Verify that the file is valid JSON")
            fmt.Println("  - Regenerate the baseline with 'mcp-scan baseline generate'")
            os.Exit(1)
        } else {
            fmt.Printf("Error loading baseline: %v\n", err)
            os.Exit(1)
        }
    }

    // Continue with scan
    cfg := scanner.DefaultConfig()
    s := scanner.New(cfg)

    result, err := s.Scan(context.Background(), "./src")
    if err != nil {
        fmt.Printf("Scan error: %v\n", err)
        os.Exit(1)
    }

    // Apply baseline if it exists
    if baseline != nil {
        filtered := s.ApplyBaseline(result, baseline)
        fmt.Printf("Filtered %d findings by baseline\n", filtered)
    }

    fmt.Printf("Findings: %d\n", len(result.Findings))
}

func isSyntaxError(err error) bool {
    var syntaxErr *json.SyntaxError
    return errors.As(err, &syntaxErr)
}

Error Saving Baseline

package main

import (
    "context"
    "fmt"
    "os"
    "path/filepath"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    s := scanner.New(scanner.DefaultConfig())

    result, err := s.Scan(context.Background(), "./src")
    if err != nil {
        fmt.Printf("Error: %v\n", err)
        os.Exit(1)
    }

    baseline := s.GenerateBaseline(result, "Initial baseline", "user@example.com")

    baselinePath := ".mcp-scan-baseline.json"

    // Verify we can write
    dir := filepath.Dir(baselinePath)
    if dir != "" && dir != "." {
        if err := os.MkdirAll(dir, 0755); err != nil {
            fmt.Printf("Error: Cannot create directory '%s': %v\n", dir, err)
            os.Exit(1)
        }
    }

    // Try to save
    if err := scanner.SaveBaseline(baseline, baselinePath); err != nil {
        if os.IsPermission(err) {
            fmt.Printf("Error: No permissions to write '%s'\n", baselinePath)
            os.Exit(1)
        }
        fmt.Printf("Error saving baseline: %v\n", err)
        os.Exit(1)
    }

    fmt.Printf("Baseline saved to %s (%d entries)\n",
        baselinePath, baseline.Len())
}

Report Errors

Error Generating Report

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    s := scanner.New(scanner.DefaultConfig())

    result, err := s.Scan(context.Background(), "./src")
    if err != nil {
        fmt.Printf("Scan error: %v\n", err)
        os.Exit(1)
    }

    // Try to generate report in different formats
    formats := []string{"json", "sarif", "evidence"}

    for _, format := range formats {
        report, err := s.GenerateReport(result, format)
        if err != nil {
            fmt.Printf("Error generating %s report: %v\n", format, err)
            continue // Try next format
        }

        outputFile := fmt.Sprintf("scan-results.%s", formatExtension(format))
        if err := os.WriteFile(outputFile, report, 0644); err != nil {
            fmt.Printf("Error writing %s: %v\n", outputFile, err)
            continue
        }

        fmt.Printf("Report %s saved to %s\n", format, outputFile)
    }
}

func formatExtension(format string) string {
    switch format {
    case "sarif":
        return "sarif"
    default:
        return "json"
    }
}

Handling Patterns

Wrapper with Retries

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "time"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

type ScanError struct {
    Attempts int
    LastErr  error
}

func (e *ScanError) Error() string {
    return fmt.Sprintf("scan failed after %d attempts: %v", e.Attempts, e.LastErr)
}

func scanWithRetry(path string, maxAttempts int, backoff time.Duration) (*scanner.Result, error) {
    cfg := scanner.DefaultConfig()
    s := scanner.New(cfg)

    var lastErr error

    for attempt := 1; attempt <= maxAttempts; attempt++ {
        ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)

        result, err := s.Scan(ctx, path)
        cancel()

        if err == nil {
            return result, nil
        }

        lastErr = err

        // Don't retry on non-recoverable errors
        if errors.Is(err, context.Canceled) {
            return nil, err
        }
        if isPathError(err) {
            return nil, err
        }

        // Wait before retrying
        if attempt < maxAttempts {
            wait := backoff * time.Duration(attempt)
            log.Printf("Attempt %d failed, retrying in %v...", attempt, wait)
            time.Sleep(wait)
        }
    }

    return nil, &ScanError{
        Attempts: maxAttempts,
        LastErr:  lastErr,
    }
}

func isPathError(err error) bool {
    return err != nil && (errors.Is(err, errors.New("no such file")) ||
        errors.Is(err, errors.New("permission denied")))
}

func main() {
    result, err := scanWithRetry("./src", 3, 5*time.Second)
    if err != nil {
        var scanErr *ScanError
        if errors.As(err, &scanErr) {
            fmt.Printf("Persistent failure: %v\n", scanErr)
        } else {
            fmt.Printf("Error: %v\n", err)
        }
        return
    }

    fmt.Printf("Findings: %d\n", len(result.Findings))
}

Structured Error Handling

package main

import (
    "context"
    "errors"
    "fmt"
    "os"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

// ExitCode represents standardized exit codes
type ExitCode int

const (
    ExitSuccess         ExitCode = 0
    ExitScanError       ExitCode = 1
    ExitFindingsFound   ExitCode = 2
    ExitPathNotFound    ExitCode = 3
    ExitPermissionError ExitCode = 4
    ExitTimeout         ExitCode = 124
    ExitCanceled        ExitCode = 130
)

type ScanOutcome struct {
    Result   *scanner.Result
    Error    error
    ExitCode ExitCode
    Message  string
}

func runScan(path string) ScanOutcome {
    s := scanner.New(scanner.DefaultConfig())

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    result, err := s.Scan(ctx, path)
    if err != nil {
        return classifyError(err)
    }

    // Check if there are findings
    if result.ExitCode != 0 {
        return ScanOutcome{
            Result:   result,
            ExitCode: ExitFindingsFound,
            Message:  fmt.Sprintf("Found %d findings", len(result.Findings)),
        }
    }

    return ScanOutcome{
        Result:   result,
        ExitCode: ExitSuccess,
        Message:  "Scan completed without blocking findings",
    }
}

func classifyError(err error) ScanOutcome {
    switch {
    case errors.Is(err, context.DeadlineExceeded):
        return ScanOutcome{
            Error:    err,
            ExitCode: ExitTimeout,
            Message:  "Scan exceeded time limit",
        }

    case errors.Is(err, context.Canceled):
        return ScanOutcome{
            Error:    err,
            ExitCode: ExitCanceled,
            Message:  "Scan canceled",
        }

    case os.IsNotExist(errors.Unwrap(err)):
        return ScanOutcome{
            Error:    err,
            ExitCode: ExitPathNotFound,
            Message:  "Path not found",
        }

    case os.IsPermission(errors.Unwrap(err)):
        return ScanOutcome{
            Error:    err,
            ExitCode: ExitPermissionError,
            Message:  "Insufficient permissions",
        }

    default:
        return ScanOutcome{
            Error:    err,
            ExitCode: ExitScanError,
            Message:  "Scan error",
        }
    }
}

func main() {
    outcome := runScan("./src")

    // Logging
    if outcome.Error != nil {
        fmt.Fprintf(os.Stderr, "ERROR: %s\n", outcome.Message)
        fmt.Fprintf(os.Stderr, "Details: %v\n", outcome.Error)
    } else {
        fmt.Println(outcome.Message)
        if outcome.Result != nil {
            fmt.Printf("Files: %d\n", outcome.Result.Manifest.TotalFiles)
            fmt.Printf("Findings: %d\n", len(outcome.Result.Findings))
            fmt.Printf("MSSS: %.1f\n", outcome.Result.MSSSScore.Total)
        }
    }

    os.Exit(int(outcome.ExitCode))
}

Logging and Diagnostics

Structured Logger

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"
    "os"
    "time"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    // Configure structured logger
    logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
        Level: slog.LevelDebug,
    }))

    logger.Info("Starting scan",
        slog.String("path", "./src"),
        slog.String("mode", "fast"))

    cfg := scanner.DefaultConfig()
    s := scanner.New(cfg)

    start := time.Now()
    ctx := context.Background()

    result, err := s.Scan(ctx, "./src")
    duration := time.Since(start)

    if err != nil {
        logger.Error("Scan failed",
            slog.String("error", err.Error()),
            slog.Duration("duration", duration))
        os.Exit(1)
    }

    logger.Info("Scan completed",
        slog.Duration("duration", duration),
        slog.Int("files", result.Manifest.TotalFiles),
        slog.Int("findings", len(result.Findings)),
        slog.Float64("msss_score", result.MSSSScore.Total),
        slog.Int("msss_level", result.MSSSScore.Level))

    // Detailed logging for critical findings
    for _, f := range result.Findings {
        if f.Severity == "critical" {
            logger.Warn("Critical finding detected",
                slog.String("rule_id", f.RuleID),
                slog.String("file", f.Location.File),
                slog.Int("line", f.Location.StartLine),
                slog.String("title", f.Title))
        }
    }
}

Diagnostic Report

package main

import (
    "context"
    "fmt"
    "os"
    "runtime"
    "time"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

type DiagnosticReport struct {
    Timestamp  time.Time         `json:"timestamp"`
    System     SystemInfo        `json:"system"`
    Config     scanner.Config    `json:"config"`
    ScanResult *ScanDiagnostic   `json:"scan_result,omitempty"`
    Error      *ErrorDiagnostic  `json:"error,omitempty"`
}

type SystemInfo struct {
    OS        string `json:"os"`
    Arch      string `json:"arch"`
    GoVersion string `json:"go_version"`
    NumCPU    int    `json:"num_cpu"`
}

type ScanDiagnostic struct {
    Duration    string `json:"duration"`
    Files       int    `json:"files"`
    Findings    int    `json:"findings"`
    MSSSScore   float64 `json:"msss_score"`
    MemoryUsed  uint64 `json:"memory_used_bytes"`
}

type ErrorDiagnostic struct {
    Type    string `json:"type"`
    Message string `json:"message"`
}

func runWithDiagnostics(path string) *DiagnosticReport {
    report := &DiagnosticReport{
        Timestamp: time.Now().UTC(),
        System: SystemInfo{
            OS:        runtime.GOOS,
            Arch:      runtime.GOARCH,
            GoVersion: runtime.Version(),
            NumCPU:    runtime.NumCPU(),
        },
    }

    cfg := scanner.DefaultConfig()
    report.Config = cfg

    s := scanner.New(cfg)

    ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)
    defer cancel()

    result, err := s.Scan(ctx, path)

    if err != nil {
        report.Error = &ErrorDiagnostic{
            Type:    fmt.Sprintf("%T", err),
            Message: err.Error(),
        }
        return report
    }

    var m runtime.MemStats
    runtime.ReadMemStats(&m)

    report.ScanResult = &ScanDiagnostic{
        Duration:   result.ScanDuration.String(),
        Files:      result.Manifest.TotalFiles,
        Findings:   len(result.Findings),
        MSSSScore:  result.MSSSScore.Total,
        MemoryUsed: m.Alloc,
    }

    return report
}

func main() {
    path := "./src"
    if len(os.Args) > 1 {
        path = os.Args[1]
    }

    report := runWithDiagnostics(path)

    // Print diagnostic
    fmt.Println("=== Scan Diagnostic ===")
    fmt.Printf("Date: %v\n", report.Timestamp)
    fmt.Printf("System: %s/%s (%d CPUs)\n",
        report.System.OS, report.System.Arch, report.System.NumCPU)

    if report.Error != nil {
        fmt.Printf("\nError: %s\n", report.Error.Message)
        fmt.Printf("Type: %s\n", report.Error.Type)
    }

    if report.ScanResult != nil {
        fmt.Printf("\nResults:\n")
        fmt.Printf("  Duration: %s\n", report.ScanResult.Duration)
        fmt.Printf("  Files: %d\n", report.ScanResult.Files)
        fmt.Printf("  Findings: %d\n", report.ScanResult.Findings)
        fmt.Printf("  MSSS Score: %.1f\n", report.ScanResult.MSSSScore)
        fmt.Printf("  Memory used: %.2f MB\n",
            float64(report.ScanResult.MemoryUsed)/1024/1024)
    }
}