Skip to content

Concurrency and Thread Safety - mcp-scan

This guide explains the concurrency model of mcp-scan and how to use it safely in multi-threaded applications.


Table of Contents


Concurrency Model

Internal Architecture

mcp-scan uses a worker pool model for parallel processing:

        +-----------------+
        |    Scan()       |
        +--------+--------+
                 |
    +------------+------------+
    |                         |
    v                         v
+--------+              +--------+
| Disk   |              | Config |
| I/O    |              | Parse  |
+--------+              +--------+
    |
    v
+----------------------------+
|     Worker Pool            |
|  +---------+ +---------+   |
|  | Worker1 | | Worker2 |...|
|  +---------+ +---------+   |
+----------------------------+
    |
    v
+----------------------------+
|    Parallel Analysis       |
|  +---------+ +---------+   |
|  | Pattern | | Taint   |   |
|  +---------+ +---------+   |
+----------------------------+
    |
    v
+----------------------------+
|    Combined Results        |
+----------------------------+

Processing Phases

Phase Parallelism Description
Discovery Sequential Traverses the filesystem
Parsing Parallel Worker pool per file
Surface extraction Sequential Combines data from all files
Pattern analysis Parallel Worker pool per file
Taint analysis Parallel Worker pool per file
Normalization Sequential Deduplication and scoring
MSSS calculation Sequential Score calculation

Thread Safety

Thread-Safe Types

The following types are safe for concurrent use:

// Scanner is thread-safe for Scan() calls
// BUT Config must not be modified during a scan
type Scanner struct {
    config Config // immutable after construction
    // ... internal fields
}

// Result is immutable once returned
// (each Scan() call produces a fresh Result, so no synchronization is needed to read it)
type Result struct {
    Findings   []types.Finding
    Summary    *types.Summary
    // ...
}

// Baseline is thread-safe for concurrent reads
// Modifications require external synchronization
// (see the ThreadSafeBaseline wrapper pattern below)
type Baseline struct {
    // ...
}

Safety Guarantees

  1. Scanner.Scan() - Thread-safe. Multiple goroutines can call Scan() on the same Scanner instance.

  2. Result - Returned results are immutable. Each call to Scan() returns a new Result.

  3. GenerateReport() - Thread-safe. Does not modify the Result.

  4. Baseline - Read operations are thread-safe. Writes require synchronization.

Safe Concurrent Usage Example

package main

import (
    "context"
    "fmt"
    "sync"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    // One Scanner instance is shared by every goroutine below:
    // Scanner.Scan() is documented as safe for concurrent use.
    cfg := scanner.DefaultConfig()
    s := scanner.New(cfg)

    paths := []string{
        "./project-a",
        "./project-b",
        "./project-c",
    }

    results := make(chan *scanner.Result, len(paths))
    var wg sync.WaitGroup

    for _, path := range paths {
        wg.Add(1)
        go func(p string) {
            defer wg.Done()
            // Concurrent Scan() calls on the shared scanner are safe.
            res, err := s.Scan(context.Background(), p)
            if err != nil {
                fmt.Printf("Error in %s: %v\n", p, err)
                return
            }
            results <- res
        }(path)
    }

    // Close the channel only after every scan goroutine has finished,
    // so the range loop below terminates.
    go func() {
        wg.Wait()
        close(results)
    }()

    for res := range results {
        fmt.Printf("Findings: %d\n", len(res.Findings))
    }
}

Incorrect Pattern (DO NOT DO)

// INCORRECT: Modifying config during scans
// badExample demonstrates the anti-pattern of mutating a Config value
// while scans that were built from it may still be running.
// Keep this example as-is: it is intentionally wrong.
func badExample() {
    cfg := scanner.DefaultConfig()
    s := scanner.New(cfg)

    var wg sync.WaitGroup

    wg.Add(2)
    go func() {
        defer wg.Done()
        // DANGER: Modifying config while another scan is active
        cfg.Mode = scanner.ModeDeep  // DO NOT DO THIS
        s.Scan(context.Background(), "./src1")
    }()

    go func() {
        defer wg.Done()
        s.Scan(context.Background(), "./src2")
    }()

    wg.Wait()
}

Concurrent Usage Patterns

Scanner Pool

For scans with different configurations, use a scanner pool:

package main

import (
    "context"
    "sync"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

// ScannerPool manages multiple scanner instances
// A buffered channel serves as the free list: Acquire receives from it,
// Release sends back to it, so blocking semantics come for free.
type ScannerPool struct {
    scanners chan *scanner.Scanner
}

// NewScannerPool builds a pool of size scanner instances, each created
// from the same configuration.
func NewScannerPool(size int, cfg scanner.Config) *ScannerPool {
    p := &ScannerPool{scanners: make(chan *scanner.Scanner, size)}

    // Pre-fill the free list so the first size Acquire calls never block.
    for n := 0; n < size; n++ {
        p.scanners <- scanner.New(cfg)
    }

    return p
}

// Acquire takes a scanner from the pool, blocking until one is available.
func (p *ScannerPool) Acquire() *scanner.Scanner {
    return <-p.scanners
}

// Release returns a scanner to the pool. Only pass scanners obtained
// from Acquire; anything else would grow the pool past its size.
func (p *ScannerPool) Release(s *scanner.Scanner) {
    p.scanners <- s
}

// Scan borrows a scanner for the duration of one scan and returns it
// to the pool afterwards, even if the scan fails.
func (p *ScannerPool) Scan(ctx context.Context, path string) (*scanner.Result, error) {
    s := p.Acquire()
    defer p.Release(s)
    return s.Scan(ctx, path)
}

func main() {
    cfg := scanner.DefaultConfig()
    pool := NewScannerPool(4, cfg)

    paths := []string{"./p1", "./p2", "./p3", "./p4", "./p5", "./p6"}

    var wg sync.WaitGroup
    for _, path := range paths {
        wg.Add(1)
        go func(p string) {
            defer wg.Done()
            result, _ := pool.Scan(context.Background(), p)
            if result != nil {
                println(p, len(result.Findings))
            }
        }(path)
    }

    wg.Wait()
}

Baseline Synchronization

package main

import (
    "context"
    "sync"

    "github.com/mcphub/mcp-scan/pkg/scanner"
    "github.com/mcphub/mcp-scan/internal/types"
)

// ThreadSafeBaseline wraps a Baseline with synchronization
// An RWMutex lets many readers (Contains, Filter) proceed in parallel
// while writers (AddFinding) get exclusive access.
type ThreadSafeBaseline struct {
    mu       sync.RWMutex
    baseline *scanner.Baseline
}

// NewThreadSafeBaseline returns a wrapper around a fresh, empty Baseline.
func NewThreadSafeBaseline() *ThreadSafeBaseline {
    return &ThreadSafeBaseline{
        baseline: scanner.NewBaseline(),
    }
}

// Contains reports whether the finding is in the baseline.
// Takes a read lock, so concurrent Contains/Filter calls do not block each other.
func (b *ThreadSafeBaseline) Contains(finding types.Finding) bool {
    b.mu.RLock()
    defer b.mu.RUnlock()
    return b.baseline.Contains(finding)
}

// AddFinding records a finding in the baseline under an exclusive write lock.
func (b *ThreadSafeBaseline) AddFinding(finding types.Finding, reason, acceptedBy string) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.baseline.AddFinding(finding, reason, acceptedBy)
}

// Filter removes baselined findings from the slice under a read lock.
// Returns the remaining findings and (per the underlying API) a count.
func (b *ThreadSafeBaseline) Filter(findings []types.Finding) ([]types.Finding, int) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    return b.baseline.Filter(findings)
}

func main() {
    s := scanner.New(scanner.DefaultConfig())

    // Shared across all goroutines; the wrapper makes Filter safe.
    safeBaseline := NewThreadSafeBaseline()

    targets := []string{"./p1", "./p2", "./p3"}

    var wg sync.WaitGroup
    wg.Add(len(targets))

    for _, path := range targets {
        go func(p string) {
            defer wg.Done()

            res, err := s.Scan(context.Background(), p)
            if err != nil {
                return
            }

            // Filter with baseline in a thread-safe manner
            kept, _ := safeBaseline.Filter(res.Findings)

            // Process filtered findings
            for _, f := range kept {
                println(f.RuleID)
            }
        }(path)
    }

    wg.Wait()
}

Fan-Out/Fan-In

package main

import (
    "context"
    "fmt"
    "sync"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

// fanOutFanIn scans every path using a fixed number of worker goroutines
// and collects the successful results. Each worker owns a private scanner,
// so no scanner is ever shared between goroutines.
func fanOutFanIn(paths []string, workers int) []*scanner.Result {
    cfg := scanner.DefaultConfig()

    // Buffered so producers and workers never block on channel capacity.
    jobs := make(chan string, len(paths))
    results := make(chan *scanner.Result, len(paths))

    var wg sync.WaitGroup
    wg.Add(workers)
    for id := 0; id < workers; id++ {
        go func(workerID int) {
            defer wg.Done()
            // One scanner per worker.
            s := scanner.New(cfg)

            for path := range jobs {
                res, err := s.Scan(context.Background(), path)
                if err != nil {
                    fmt.Printf("Worker %d: error in %s: %v\n", workerID, path, err)
                    continue
                }
                results <- res
            }
        }(id)
    }

    // Feed the job queue, then close it so workers' range loops end.
    go func() {
        for _, p := range paths {
            jobs <- p
        }
        close(jobs)
    }()

    // Close results once every worker has exited (only then is it safe:
    // no sender remains).
    go func() {
        wg.Wait()
        close(results)
    }()

    // Fan-in: drain until the closer goroutine closes the channel.
    var collected []*scanner.Result
    for res := range results {
        collected = append(collected, res)
    }

    return collected
}

func main() {
    projects := []string{
        "./service-1",
        "./service-2",
        "./service-3",
        "./service-4",
        "./service-5",
    }

    // Five projects, three workers: workers pick up jobs as they free up.
    scanned := fanOutFanIn(projects, 3)

    total := 0
    for _, r := range scanned {
        total += len(r.Findings)
    }

    fmt.Printf("Total findings: %d in %d projects\n",
        total, len(scanned))
}

Resource Control

Limiting Workers

package main

import (
    "context"
    "runtime"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    cfg := scanner.DefaultConfig()

    // Limit workers based on system capacity
    maxWorkers := runtime.NumCPU()
    if maxWorkers > 8 {
        maxWorkers = 8 // Maximum cap
    }
    cfg.Workers = maxWorkers

    // Or use fixed value for constrained environments
    // cfg.Workers = 2  // For containers with limited resources

    s := scanner.New(cfg)
    result, err := s.Scan(context.Background(), "./src")
    if err != nil {
        // Without this check a failed scan leaves result nil and the
        // println below would panic with a nil-pointer dereference.
        panic(err)
    }

    println("Findings:", len(result.Findings))
}

Limiting Memory

To control memory usage in large scans:

package main

import (
    "context"
    "fmt"
    "runtime"
    "time"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

// scanWithMemoryLimit runs a scan while a background goroutine samples the
// Go heap once per second and prints a warning whenever allocated memory
// exceeds memoryLimitMB. The monitor stops when the scan returns.
func scanWithMemoryLimit(path string, memoryLimitMB int) (*scanner.Result, error) {
    // Closed by the deferred call when this function returns,
    // signalling the monitor goroutine to exit.
    stop := make(chan struct{})
    defer close(stop)

    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-stop:
                return
            case <-ticker.C:
                var stats runtime.MemStats
                runtime.ReadMemStats(&stats)

                usedMB := stats.Alloc / 1024 / 1024
                if int(usedMB) > memoryLimitMB {
                    fmt.Printf("WARNING: High memory usage: %d MB\n", usedMB)
                }
            }
        }
    }()

    cfg := scanner.DefaultConfig()
    // Fewer workers keep peak memory lower.
    cfg.Workers = 2

    s := scanner.New(cfg)
    return s.Scan(context.Background(), path)
}

func main() {
    res, err := scanWithMemoryLimit("./src", 500)  // 500 MB limit
    if err != nil {
        panic(err)
    }

    fmt.Printf("Findings: %d\n", len(res.Findings))
}

Semaphore for Limiting Concurrency

package main

import (
    "context"
    "sync"

    "github.com/mcphub/mcp-scan/pkg/scanner"
    "golang.org/x/sync/semaphore"
)

func main() {
    cfg := scanner.DefaultConfig()

    // Maximum 3 simultaneous scans
    sem := semaphore.NewWeighted(3)

    projects := []string{
        "./project-1", "./project-2", "./project-3",
        "./project-4", "./project-5", "./project-6",
    }

    // Indexed by goroutine, so no mutex is needed: each goroutine
    // writes only its own slot.
    scanned := make([]*scanner.Result, len(projects))

    var wg sync.WaitGroup
    wg.Add(len(projects))
    for i, path := range projects {
        // i and path are passed as arguments to avoid loop-variable capture.
        go func(idx int, p string) {
            defer wg.Done()

            // Block until one of the 3 permits is free.
            if err := sem.Acquire(context.Background(), 1); err != nil {
                return
            }
            defer sem.Release(1)

            s := scanner.New(cfg)
            res, err := s.Scan(context.Background(), p)
            if err != nil {
                return
            }

            scanned[idx] = res
        }(i, path)
    }

    wg.Wait()

    for i, r := range scanned {
        if r != nil {
            println(projects[i], ":", len(r.Findings), "findings")
        }
    }
}

Best Practices

1. One Scanner Instance per Configuration

// CORRECT: Reuse scanner for same configuration
cfg := scanner.DefaultConfig()
s := scanner.New(cfg)

// Multiple scans with the same scanner
result1, _ := s.Scan(ctx, "./path1")
result2, _ := s.Scan(ctx, "./path2")

2. Do Not Modify Config After Creating Scanner

// CORRECT
cfg := scanner.DefaultConfig()
cfg.Mode = scanner.ModeDeep  // Modify before
s := scanner.New(cfg)

// INCORRECT
s := scanner.New(cfg)
cfg.Mode = scanner.ModeDeep  // DO NOT modify after

3. Use Contexts for Cancellation

// CORRECT: Context with timeout per scan
// scanWithTimeout runs one scan bounded by timeout; the context is
// cancelled on return, releasing the timer even when the scan finishes early.
func scanWithTimeout(s *scanner.Scanner, path string, timeout time.Duration) (*scanner.Result, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    return s.Scan(ctx, path)
}

4. Protect Shared Data

// CORRECT: Protect access to shared data
var (
    mu         sync.Mutex
    allResults []*scanner.Result
)

// In goroutine
mu.Lock()
allResults = append(allResults, result)
mu.Unlock()

5. Limit Parallelism in Containers

// In environments with limited resources
func getWorkerCount() int {
    // Respect container CPU limits
    cpuQuota := runtime.NumCPU()

    // Do not exceed 4 workers in containers
    if cpuQuota > 4 {
        return 4
    }
    return cpuQuota
}

6. Handle Errors Atomically

// CORRECT: Atomic error channel
errCh := make(chan error, 1)

go func() {
    _, err := s.Scan(ctx, path)
    if err != nil {
        select {
        case errCh <- err:  // Only the first error
        default:
        }
    }
}()

7. Avoid Data Races

// Use go test -race to detect data races
// go test -race ./...

// INCORRECT: Unsynchronized writes from multiple goroutines
var totalFindings int
for result := range results {
    go func(r *scanner.Result) {
        totalFindings += len(r.Findings)  // Data race!
    }(result)
}

// CORRECT: Use atomic or mutex when several goroutines update the counter
// (note: a single goroutine draining a channel needs no synchronization)
var totalFindings int64
for result := range results {
    atomic.AddInt64(&totalFindings, int64(len(result.Findings)))
}