Concurrency and Thread Safety - mcp-scan¶
This guide explains the concurrency model of mcp-scan and how to use it safely in multi-threaded applications.
Table of Contents¶
Concurrency Model¶
Internal Architecture¶
mcp-scan uses a worker pool model for parallel processing:
+-----------------+
| Scan() |
+--------+--------+
|
+------------+------------+
| |
v v
+--------+ +--------+
| Disk | | Config |
| I/O | | Parse |
+--------+ +--------+
|
v
+----------------------------+
| Worker Pool |
| +---------+ +---------+ |
| | Worker1 | | Worker2 |...|
| +---------+ +---------+ |
+----------------------------+
|
v
+----------------------------+
| Parallel Analysis |
| +---------+ +---------+ |
| | Pattern | | Taint | |
| +---------+ +---------+ |
+----------------------------+
|
v
+----------------------------+
| Combined Results |
+----------------------------+
Processing Phases¶
| Phase | Parallelism | Description |
|---|---|---|
| Discovery | Sequential | Traverses the filesystem |
| Parsing | Parallel | Worker pool per file |
| Surface extraction | Sequential | Combines data from all files |
| Pattern analysis | Parallel | Worker pool per file |
| Taint analysis | Parallel | Worker pool per file |
| Normalization | Sequential | Deduplication and scoring |
| MSSS calculation | Sequential | Score calculation |
Thread Safety¶
Thread-Safe Types¶
The following types are safe for concurrent use:
// Scanner is thread-safe for Scan() calls
// BUT Config must not be modified during a scan
type Scanner struct {
config Config // immutable after construction
// ... internal fields
}
// Result is immutable once returned
type Result struct {
Findings []types.Finding
Summary *types.Summary
// ...
}
// Baseline is thread-safe for concurrent reads
// Modifications require external synchronization
type Baseline struct {
// ...
}
Safety Guarantees¶
- Scanner.Scan() - Thread-safe. Multiple goroutines can call Scan() on the same Scanner instance.
- Result - Returned results are immutable. Each call to Scan() returns a new Result.
- GenerateReport() - Thread-safe. Does not modify the Result.
- Baseline - Read operations are thread-safe. Writes require external synchronization.
Safe Concurrent Usage Example¶
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/mcphub/mcp-scan/pkg/scanner"
)

// main demonstrates safe concurrent use of a single Scanner instance:
// Scan() is documented thread-safe, so one scanner serves many goroutines.
func main() {
	// Single scanner instance for multiple scans
	cfg := scanner.DefaultConfig()
	s := scanner.New(cfg)
	paths := []string{
		"./project-a",
		"./project-b",
		"./project-c",
	}
	var wg sync.WaitGroup
	// Buffered to len(paths) so no sender can block on a slow consumer.
	results := make(chan *scanner.Result, len(paths))
	for _, path := range paths {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			// Each goroutine can safely call Scan()
			result, err := s.Scan(context.Background(), p)
			if err != nil {
				fmt.Printf("Error in %s: %v\n", p, err)
				return
			}
			results <- result
		}(path)
	}
	// Close results only after every worker has finished, so the
	// range loop below terminates.
	go func() {
		wg.Wait()
		close(results)
	}()
	for result := range results {
		fmt.Printf("Findings: %d\n", len(result.Findings))
	}
}
Incorrect Pattern (DO NOT DO)¶
// INCORRECT: Modifying config during scans
// The write to cfg.Mode below races with the scanner's reads of its
// configuration in the concurrently running Scan() — a data race that
// `go test -race` will report.
func badExample() {
	cfg := scanner.DefaultConfig()
	s := scanner.New(cfg)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		// DANGER: Modifying config while another scan is active
		cfg.Mode = scanner.ModeDeep // DO NOT DO THIS
		s.Scan(context.Background(), "./src1")
	}()
	go func() {
		defer wg.Done()
		s.Scan(context.Background(), "./src2")
	}()
	wg.Wait()
}
Concurrent Usage Patterns¶
Scanner Pool¶
For scans with different configurations, use a scanner pool:
package main

import (
	"context"
	"sync"

	"github.com/mcphub/mcp-scan/pkg/scanner"
)

// ScannerPool manages multiple scanner instances
type ScannerPool struct {
	// Buffered channel used as the pool: a scanner sitting in the
	// channel is free; receiving it claims it.
	scanners chan *scanner.Scanner
}

// NewScannerPool creates a pool with size pre-built scanners, all sharing cfg.
func NewScannerPool(size int, cfg scanner.Config) *ScannerPool {
	pool := &ScannerPool{
		scanners: make(chan *scanner.Scanner, size),
	}
	for i := 0; i < size; i++ {
		pool.scanners <- scanner.New(cfg)
	}
	return pool
}

// Acquire blocks until a scanner becomes available.
func (p *ScannerPool) Acquire() *scanner.Scanner {
	return <-p.scanners
}

// Release returns a scanner to the pool for reuse.
func (p *ScannerPool) Release(s *scanner.Scanner) {
	p.scanners <- s
}

// Scan borrows a scanner, runs the scan, and returns the scanner to the
// pool even if the scan fails (via defer).
func (p *ScannerPool) Scan(ctx context.Context, path string) (*scanner.Result, error) {
	s := p.Acquire()
	defer p.Release(s)
	return s.Scan(ctx, path)
}

func main() {
	cfg := scanner.DefaultConfig()
	pool := NewScannerPool(4, cfg)
	paths := []string{"./p1", "./p2", "./p3", "./p4", "./p5", "./p6"}
	var wg sync.WaitGroup
	for _, path := range paths {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			result, _ := pool.Scan(context.Background(), p)
			if result != nil {
				println(p, len(result.Findings))
			}
		}(path)
	}
	wg.Wait()
}
Baseline Synchronization¶
package main

import (
	"context"
	"sync"

	"github.com/mcphub/mcp-scan/pkg/scanner"
	"github.com/mcphub/mcp-scan/internal/types"
)

// ThreadSafeBaseline wraps a Baseline with synchronization
// (baseline reads are thread-safe, but writes require a lock).
type ThreadSafeBaseline struct {
	mu       sync.RWMutex
	baseline *scanner.Baseline
}

// NewThreadSafeBaseline returns a synchronized wrapper around a fresh baseline.
func NewThreadSafeBaseline() *ThreadSafeBaseline {
	return &ThreadSafeBaseline{
		baseline: scanner.NewBaseline(),
	}
}

// Contains reports whether the baseline holds the finding. Read lock only.
func (b *ThreadSafeBaseline) Contains(finding types.Finding) bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.baseline.Contains(finding)
}

// AddFinding records an accepted finding. Write lock: mutates the baseline.
func (b *ThreadSafeBaseline) AddFinding(finding types.Finding, reason, acceptedBy string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.baseline.AddFinding(finding, reason, acceptedBy)
}

// Filter removes baselined findings from the slice. Read lock only.
func (b *ThreadSafeBaseline) Filter(findings []types.Finding) ([]types.Finding, int) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.baseline.Filter(findings)
}

func main() {
	cfg := scanner.DefaultConfig()
	s := scanner.New(cfg)
	safeBaseline := NewThreadSafeBaseline()
	var wg sync.WaitGroup
	paths := []string{"./p1", "./p2", "./p3"}
	for _, path := range paths {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			result, err := s.Scan(context.Background(), p)
			if err != nil {
				return
			}
			// Filter with baseline in a thread-safe manner
			filtered, _ := safeBaseline.Filter(result.Findings)
			// Process filtered findings
			for _, f := range filtered {
				println(f.RuleID)
			}
		}(path)
	}
	wg.Wait()
}
Fan-Out/Fan-In¶
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/mcphub/mcp-scan/pkg/scanner"
)

// fanOutFanIn scans paths with a fixed number of worker goroutines
// (fan-out) and gathers their results on a single channel (fan-in).
// Failed scans are logged and skipped, so the returned slice may be
// shorter than paths.
func fanOutFanIn(paths []string, workers int) []*scanner.Result {
	cfg := scanner.DefaultConfig()
	// Input channel
	jobs := make(chan string, len(paths))
	// Output channel
	results := make(chan *scanner.Result, len(paths))
	// Workers
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			// Each worker has its own scanner
			s := scanner.New(cfg)
			for path := range jobs {
				result, err := s.Scan(context.Background(), path)
				if err != nil {
					fmt.Printf("Worker %d: error in %s: %v\n", workerID, path, err)
					continue
				}
				results <- result
			}
		}(i)
	}
	// Send jobs; closing jobs lets each worker's range loop exit.
	go func() {
		for _, path := range paths {
			jobs <- path
		}
		close(jobs)
	}()
	// Wait and close results so the collection loop below terminates.
	go func() {
		wg.Wait()
		close(results)
	}()
	// Collect results
	var allResults []*scanner.Result
	for result := range results {
		allResults = append(allResults, result)
	}
	return allResults
}

func main() {
	paths := []string{
		"./service-1",
		"./service-2",
		"./service-3",
		"./service-4",
		"./service-5",
	}
	results := fanOutFanIn(paths, 3)
	totalFindings := 0
	for _, r := range results {
		totalFindings += len(r.Findings)
	}
	fmt.Printf("Total findings: %d in %d projects\n",
		totalFindings, len(results))
}
Resource Control¶
Limiting Workers¶
package main

import (
	"context"
	"fmt"
	"runtime"

	"github.com/mcphub/mcp-scan/pkg/scanner"
)

// main shows how to bound scanner parallelism to the host's capacity.
func main() {
	cfg := scanner.DefaultConfig()
	// Limit workers based on system capacity
	maxWorkers := runtime.NumCPU()
	if maxWorkers > 8 {
		maxWorkers = 8 // Maximum cap
	}
	cfg.Workers = maxWorkers
	// Or use fixed value for constrained environments
	// cfg.Workers = 2 // For containers with limited resources
	s := scanner.New(cfg)
	// Fix: the original discarded Scan's error and then dereferenced a
	// nil result on failure; always check the error before using result.
	result, err := s.Scan(context.Background(), "./src")
	if err != nil {
		fmt.Println("scan failed:", err)
		return
	}
	println("Findings:", len(result.Findings))
}
Limiting Memory¶
To control memory usage in large scans:
package main

import (
	"context"
	"fmt"
	"runtime"
	"time"

	"github.com/mcphub/mcp-scan/pkg/scanner"
)

// scanWithMemoryLimit runs a scan while a background goroutine samples the
// Go heap once per second and prints a warning when allocation exceeds
// memoryLimitMB. The limit is advisory only — the scan is never aborted.
func scanWithMemoryLimit(path string, memoryLimitMB int) (*scanner.Result, error) {
	// Monitor memory usage; closing done stops the monitor goroutine
	// when this function returns.
	done := make(chan struct{})
	defer close(done)
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				var m runtime.MemStats
				runtime.ReadMemStats(&m)
				allocMB := m.Alloc / 1024 / 1024
				if int(allocMB) > memoryLimitMB {
					fmt.Printf("WARNING: High memory usage: %d MB\n", allocMB)
				}
			}
		}
	}()
	cfg := scanner.DefaultConfig()
	// Fewer workers = less memory
	cfg.Workers = 2
	s := scanner.New(cfg)
	return s.Scan(context.Background(), path)
}

func main() {
	result, err := scanWithMemoryLimit("./src", 500) // 500 MB limit
	if err != nil {
		panic(err)
	}
	fmt.Printf("Findings: %d\n", len(result.Findings))
}
Semaphore for Limiting Concurrency¶
package main

import (
	"context"
	"sync"

	"github.com/mcphub/mcp-scan/pkg/scanner"
	"golang.org/x/sync/semaphore"
)

// main bounds the number of simultaneous scans with a weighted semaphore
// while still launching one goroutine per path.
func main() {
	cfg := scanner.DefaultConfig()
	// Maximum 3 simultaneous scans
	sem := semaphore.NewWeighted(3)
	paths := []string{
		"./project-1", "./project-2", "./project-3",
		"./project-4", "./project-5", "./project-6",
	}
	var wg sync.WaitGroup
	// One slot per path; each goroutine writes only its own index,
	// so no additional locking is needed.
	results := make([]*scanner.Result, len(paths))
	for i, path := range paths {
		wg.Add(1)
		go func(idx int, p string) {
			defer wg.Done()
			// Acquire permit; blocks until one of the 3 slots is free.
			if err := sem.Acquire(context.Background(), 1); err != nil {
				return
			}
			defer sem.Release(1)
			s := scanner.New(cfg)
			result, err := s.Scan(context.Background(), p)
			if err != nil {
				return
			}
			results[idx] = result
		}(i, path)
	}
	wg.Wait()
	// Skip slots left nil by failed scans.
	for i, r := range results {
		if r != nil {
			println(paths[i], ":", len(r.Findings), "findings")
		}
	}
}
Best Practices¶
1. One Scanner Instance per Configuration¶
// CORRECT: Reuse scanner for same configuration
cfg := scanner.DefaultConfig()
s := scanner.New(cfg)
// Multiple scans with the same scanner
result1, _ := s.Scan(ctx, "./path1")
result2, _ := s.Scan(ctx, "./path2")
2. Do Not Modify Config After Creating Scanner¶
// CORRECT
cfg := scanner.DefaultConfig()
cfg.Mode = scanner.ModeDeep // Modify before
s := scanner.New(cfg)
// INCORRECT
s := scanner.New(cfg)
cfg.Mode = scanner.ModeDeep // DO NOT modify after
3. Use Contexts for Cancellation¶
// CORRECT: Context with timeout per scan
// scanWithTimeout aborts the scan when it exceeds the given timeout.
func scanWithTimeout(s *scanner.Scanner, path string, timeout time.Duration) (*scanner.Result, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	// cancel releases the context's timer even on the success path.
	defer cancel()
	return s.Scan(ctx, path)
}
4. Protect Shared Data¶
// CORRECT: Protect access to shared data
var (
mu sync.Mutex
allResults []*scanner.Result
)
// In goroutine
mu.Lock()
allResults = append(allResults, result)
mu.Unlock()
5. Limit Parallelism in Containers¶
// In environments with limited resources
// getWorkerCount returns the number of scan workers to use, honoring the
// host CPU count but never exceeding 4 — a conservative cap for
// containerized environments with limited resources.
func getWorkerCount() int {
	// Respect container CPU limits
	if n := runtime.NumCPU(); n <= 4 {
		return n
	}
	// Do not exceed 4 workers in containers
	return 4
}
6. Handle Errors Atomically¶
// CORRECT: Atomic error channel
errCh := make(chan error, 1)
go func() {
_, err := s.Scan(ctx, path)
if err != nil {
select {
case errCh <- err: // Only the first error
default:
}
}
}()
7. Avoid Data Races¶
// Use go test -race to detect data races
// go test -race ./...
// INCORRECT: Unsynchronized writes from multiple goroutines
var totalFindings int
for _, path := range paths {
    go func(p string) {
        result, _ := s.Scan(ctx, p)
        totalFindings += len(result.Findings) // Data race!
    }(path)
}
// CORRECT: Use atomic (or a mutex) for counters shared across goroutines
var totalFindings int64
for _, path := range paths {
    go func(p string) {
        result, _ := s.Scan(ctx, p)
        atomic.AddInt64(&totalFindings, int64(len(result.Findings)))
    }(path)
}
// Note: a single goroutine ranging over a results channel is NOT a data
// race; the race only appears when several goroutines update the counter.