Skip to content

Concurrencia y Seguridad de Hilos - mcp-scan

Esta guia explica el modelo de concurrencia de mcp-scan y como usarlo de manera segura en aplicaciones multi-hilo.


Indice


Modelo de Concurrencia

Arquitectura Interna

mcp-scan utiliza un modelo de worker pool para el procesamiento paralelo:

        +-----------------+
        |    Scan()       |
        +--------+--------+
                 |
    +------------+------------+
    |                         |
    v                         v
+--------+              +--------+
| Disco  |              | Config |
| I/O    |              | Parse  |
+--------+              +--------+
    |
    v
+----------------------------+
|     Worker Pool            |
|  +---------+ +---------+   |
|  | Worker1 | | Worker2 |...|
|  +---------+ +---------+   |
+----------------------------+
    |
    v
+----------------------------+
|    Analisis Paralelo       |
|  +---------+ +---------+   |
|  | Pattern | | Taint   |   |
|  +---------+ +---------+   |
+----------------------------+
    |
    v
+----------------------------+
|    Resultados Combinados   |
+----------------------------+

Fases de Procesamiento

Fase Paralelismo Descripcion
Descubrimiento Secuencial Recorre el filesystem
Parsing Paralelo Worker pool por archivo
Extraccion de superficie Secuencial Combina datos de todos los archivos
Analisis de patrones Paralelo Worker pool por archivo
Analisis de taint Paralelo Worker pool por archivo
Normalizacion Secuencial Deduplicacion y scoring
Calculo MSSS Secuencial Calculo de puntuacion

Seguridad de Hilos

Tipos Thread-Safe

Los siguientes tipos son seguros para uso concurrente:

// Scanner es thread-safe para llamadas a Scan()
// PERO no se debe modificar Config durante un escaneo
type Scanner struct {
    config Config // inmutable despues de construccion
    // ... campos internos
}

// Result es inmutable una vez retornado
type Result struct {
    Findings   []types.Finding
    Summary    *types.Summary
    // ...
}

// Baseline es thread-safe para lecturas concurrentes
// Las modificaciones requieren sincronizacion externa
type Baseline struct {
    // ...
}

Garantias de Seguridad

  1. Scanner.Scan() - Thread-safe. Multiples goroutines pueden llamar a Scan() en la misma instancia de Scanner.

  2. Result - Los resultados retornados son inmutables. Cada llamada a Scan() retorna un nuevo Result.

  3. GenerateReport() - Thread-safe. No modifica el Result.

  4. Baseline - Las operaciones de lectura son thread-safe. Las escrituras requieren sincronizacion.

Ejemplo de Uso Concurrente Seguro

package main

import (
    "context"
    "fmt"
    "sync"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    // Una sola instancia de scanner para multiples escaneos
    cfg := scanner.DefaultConfig()
    s := scanner.New(cfg)

    paths := []string{
        "./project-a",
        "./project-b",
        "./project-c",
    }

    var wg sync.WaitGroup
    results := make(chan *scanner.Result, len(paths))

    for _, path := range paths {
        wg.Add(1)
        go func(p string) {
            defer wg.Done()
            // Cada goroutine puede llamar a Scan() de forma segura
            result, err := s.Scan(context.Background(), p)
            if err != nil {
                fmt.Printf("Error en %s: %v\n", p, err)
                return
            }
            results <- result
        }(path)
    }

    go func() {
        wg.Wait()
        close(results)
    }()

    for result := range results {
        fmt.Printf("Hallazgos: %d\n", len(result.Findings))
    }
}

Patron Incorrecto (NO HACER)

// INCORRECTO: Modificar config durante escaneos
func badExample() {
    cfg := scanner.DefaultConfig()
    s := scanner.New(cfg)

    var wg sync.WaitGroup

    wg.Add(2)
    go func() {
        defer wg.Done()
        // PELIGRO: Modificar config mientras otro escaneo esta activo
        cfg.Mode = scanner.ModeDeep  // NO HACER ESTO
        s.Scan(context.Background(), "./src1")
    }()

    go func() {
        defer wg.Done()
        s.Scan(context.Background(), "./src2")
    }()

    wg.Wait()
}

Patrones de Uso Concurrente

Pool de Scanners

Para escaneos con diferentes configuraciones, usa un pool de scanners:

package main

import (
    "context"
    "sync"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

// ScannerPool gestiona multiples instancias de scanner
type ScannerPool struct {
    scanners chan *scanner.Scanner
}

func NewScannerPool(size int, cfg scanner.Config) *ScannerPool {
    pool := &ScannerPool{
        scanners: make(chan *scanner.Scanner, size),
    }

    for i := 0; i < size; i++ {
        pool.scanners <- scanner.New(cfg)
    }

    return pool
}

func (p *ScannerPool) Acquire() *scanner.Scanner {
    return <-p.scanners
}

func (p *ScannerPool) Release(s *scanner.Scanner) {
    p.scanners <- s
}

func (p *ScannerPool) Scan(ctx context.Context, path string) (*scanner.Result, error) {
    s := p.Acquire()
    defer p.Release(s)
    return s.Scan(ctx, path)
}

func main() {
    cfg := scanner.DefaultConfig()
    pool := NewScannerPool(4, cfg)

    paths := []string{"./p1", "./p2", "./p3", "./p4", "./p5", "./p6"}

    var wg sync.WaitGroup
    for _, path := range paths {
        wg.Add(1)
        go func(p string) {
            defer wg.Done()
            result, _ := pool.Scan(context.Background(), p)
            if result != nil {
                println(p, len(result.Findings))
            }
        }(path)
    }

    wg.Wait()
}

Sincronizacion de Baseline

package main

import (
    "context"
    "sync"

    "github.com/mcphub/mcp-scan/pkg/scanner"
    "github.com/mcphub/mcp-scan/internal/types"
)

// ThreadSafeBaseline envuelve un Baseline con sincronizacion
type ThreadSafeBaseline struct {
    mu       sync.RWMutex
    baseline *scanner.Baseline
}

func NewThreadSafeBaseline() *ThreadSafeBaseline {
    return &ThreadSafeBaseline{
        baseline: scanner.NewBaseline(),
    }
}

func (b *ThreadSafeBaseline) Contains(finding types.Finding) bool {
    b.mu.RLock()
    defer b.mu.RUnlock()
    return b.baseline.Contains(finding)
}

func (b *ThreadSafeBaseline) AddFinding(finding types.Finding, reason, acceptedBy string) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.baseline.AddFinding(finding, reason, acceptedBy)
}

func (b *ThreadSafeBaseline) Filter(findings []types.Finding) ([]types.Finding, int) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    return b.baseline.Filter(findings)
}

func main() {
    cfg := scanner.DefaultConfig()
    s := scanner.New(cfg)

    safeBaseline := NewThreadSafeBaseline()

    var wg sync.WaitGroup
    paths := []string{"./p1", "./p2", "./p3"}

    for _, path := range paths {
        wg.Add(1)
        go func(p string) {
            defer wg.Done()

            result, err := s.Scan(context.Background(), p)
            if err != nil {
                return
            }

            // Filtrar con baseline de forma thread-safe
            filtered, _ := safeBaseline.Filter(result.Findings)

            // Procesar hallazgos filtrados
            for _, f := range filtered {
                println(f.RuleID)
            }
        }(path)
    }

    wg.Wait()
}

Fan-Out/Fan-In

package main

import (
    "context"
    "fmt"
    "sync"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func fanOutFanIn(paths []string, workers int) []*scanner.Result {
    cfg := scanner.DefaultConfig()

    // Canal de entrada
    jobs := make(chan string, len(paths))

    // Canal de salida
    results := make(chan *scanner.Result, len(paths))

    // Workers
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            // Cada worker tiene su propio scanner
            s := scanner.New(cfg)

            for path := range jobs {
                result, err := s.Scan(context.Background(), path)
                if err != nil {
                    fmt.Printf("Worker %d: error en %s: %v\n", workerID, path, err)
                    continue
                }
                results <- result
            }
        }(i)
    }

    // Enviar trabajos
    go func() {
        for _, path := range paths {
            jobs <- path
        }
        close(jobs)
    }()

    // Esperar y cerrar resultados
    go func() {
        wg.Wait()
        close(results)
    }()

    // Recolectar resultados
    var allResults []*scanner.Result
    for result := range results {
        allResults = append(allResults, result)
    }

    return allResults
}

func main() {
    paths := []string{
        "./service-1",
        "./service-2",
        "./service-3",
        "./service-4",
        "./service-5",
    }

    results := fanOutFanIn(paths, 3)

    totalFindings := 0
    for _, r := range results {
        totalFindings += len(r.Findings)
    }

    fmt.Printf("Total hallazgos: %d en %d proyectos\n",
        totalFindings, len(results))
}

Control de Recursos

Limitar Workers

package main

import (
    "context"
    "runtime"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func main() {
    cfg := scanner.DefaultConfig()

    // Limitar workers segun la capacidad del sistema
    maxWorkers := runtime.NumCPU()
    if maxWorkers > 8 {
        maxWorkers = 8  // Cap maximo
    }
    cfg.Workers = maxWorkers

    // O usar valor fijo para entornos restringidos
    // cfg.Workers = 2  // Para contenedores con recursos limitados

    s := scanner.New(cfg)
    result, _ := s.Scan(context.Background(), "./src")

    println("Hallazgos:", len(result.Findings))
}

Limitar Memoria

Para controlar el uso de memoria en escaneos grandes:

package main

import (
    "context"
    "fmt"
    "runtime"
    "time"

    "github.com/mcphub/mcp-scan/pkg/scanner"
)

func scanWithMemoryLimit(path string, memoryLimitMB int) (*scanner.Result, error) {
    // Monitorear uso de memoria
    done := make(chan struct{})
    defer close(done)

    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-done:
                return
            case <-ticker.C:
                var m runtime.MemStats
                runtime.ReadMemStats(&m)

                allocMB := m.Alloc / 1024 / 1024
                if int(allocMB) > memoryLimitMB {
                    fmt.Printf("ADVERTENCIA: Uso de memoria alto: %d MB\n", allocMB)
                }
            }
        }
    }()

    cfg := scanner.DefaultConfig()
    // Menos workers = menos memoria
    cfg.Workers = 2

    s := scanner.New(cfg)
    return s.Scan(context.Background(), path)
}

func main() {
    result, err := scanWithMemoryLimit("./src", 500)  // 500 MB limite
    if err != nil {
        panic(err)
    }

    fmt.Printf("Hallazgos: %d\n", len(result.Findings))
}

Semaforo para Limitar Concurrencia

package main

import (
    "context"
    "sync"

    "github.com/mcphub/mcp-scan/pkg/scanner"
    "golang.org/x/sync/semaphore"
)

func main() {
    cfg := scanner.DefaultConfig()

    // Maximo 3 escaneos simultaneos
    sem := semaphore.NewWeighted(3)

    paths := []string{
        "./project-1", "./project-2", "./project-3",
        "./project-4", "./project-5", "./project-6",
    }

    var wg sync.WaitGroup
    results := make([]*scanner.Result, len(paths))

    for i, path := range paths {
        wg.Add(1)
        go func(idx int, p string) {
            defer wg.Done()

            // Adquirir permiso
            if err := sem.Acquire(context.Background(), 1); err != nil {
                return
            }
            defer sem.Release(1)

            s := scanner.New(cfg)
            result, err := s.Scan(context.Background(), p)
            if err != nil {
                return
            }

            results[idx] = result
        }(i, path)
    }

    wg.Wait()

    for i, r := range results {
        if r != nil {
            println(paths[i], ":", len(r.Findings), "hallazgos")
        }
    }
}

Mejores Practicas

1. Una Instancia de Scanner por Configuracion

// CORRECTO: Reutilizar scanner para misma configuracion
cfg := scanner.DefaultConfig()
s := scanner.New(cfg)

// Multiples escaneos con el mismo scanner
result1, _ := s.Scan(ctx, "./path1")
result2, _ := s.Scan(ctx, "./path2")

2. No Modificar Config Despues de Crear Scanner

// CORRECTO
cfg := scanner.DefaultConfig()
cfg.Mode = scanner.ModeDeep  // Modificar antes
s := scanner.New(cfg)

// INCORRECTO
s := scanner.New(cfg)
cfg.Mode = scanner.ModeDeep  // NO modificar despues

3. Usar Contextos para Cancelacion

// CORRECTO: Contexto con timeout por escaneo
func scanWithTimeout(s *scanner.Scanner, path string, timeout time.Duration) (*scanner.Result, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    return s.Scan(ctx, path)
}

4. Proteger Datos Compartidos

// CORRECTO: Proteger acceso a datos compartidos
var (
    mu         sync.Mutex
    allResults []*scanner.Result
)

// En goroutine
mu.Lock()
allResults = append(allResults, result)
mu.Unlock()

5. Limitar Paralelismo en Contenedores

// En entornos con recursos limitados
func getWorkerCount() int {
    // Respetar limites de CPU del contenedor
    cpuQuota := runtime.NumCPU()

    // No sobrepasar 4 workers en contenedores
    if cpuQuota > 4 {
        return 4
    }
    return cpuQuota
}

6. Manejar Errores de Forma Atomica

// CORRECTO: Canal de errores atomico
errCh := make(chan error, 1)

go func() {
    _, err := s.Scan(ctx, path)
    if err != nil {
        select {
        case errCh <- err:  // Solo el primer error
        default:
        }
    }
}()

7. Evitar Data Races

// Usar go test -race para detectar data races
// go test -race ./...

// INCORRECTO: Acceso no sincronizado
var totalFindings int
for result := range results {
    totalFindings += len(result.Findings)  // Data race!
}

// CORRECTO: Usar atomic o mutex
var totalFindings int64
for result := range results {
    atomic.AddInt64(&totalFindings, int64(len(result.Findings)))
}