Concurrencia y Seguridad de Hilos - mcp-scan¶
Esta guia explica el modelo de concurrencia de mcp-scan y como usarlo de manera segura en aplicaciones multi-hilo.
Indice¶
- Modelo de Concurrencia
- Seguridad de Hilos
- Patrones de Uso Concurrente
- Control de Recursos
- Mejores Practicas
Modelo de Concurrencia¶
Arquitectura Interna¶
mcp-scan utiliza un modelo de worker pool para el procesamiento paralelo:
+-----------------+
| Scan() |
+--------+--------+
|
+------------+------------+
| |
v v
+--------+ +--------+
| Disco | | Config |
| I/O | | Parse |
+--------+ +--------+
|
v
+----------------------------+
| Worker Pool |
| +---------+ +---------+ |
| | Worker1 | | Worker2 |...|
| +---------+ +---------+ |
+----------------------------+
|
v
+----------------------------+
| Analisis Paralelo |
| +---------+ +---------+ |
| | Pattern | | Taint | |
| +---------+ +---------+ |
+----------------------------+
|
v
+----------------------------+
| Resultados Combinados |
+----------------------------+
Fases de Procesamiento¶
| Fase | Paralelismo | Descripcion |
|---|---|---|
| Descubrimiento | Secuencial | Recorre el filesystem |
| Parsing | Paralelo | Worker pool por archivo |
| Extraccion de superficie | Secuencial | Combina datos de todos los archivos |
| Analisis de patrones | Paralelo | Worker pool por archivo |
| Analisis de taint | Paralelo | Worker pool por archivo |
| Normalizacion | Secuencial | Deduplicacion y scoring |
| Calculo MSSS | Secuencial | Calculo de puntuacion |
Seguridad de Hilos¶
Tipos Thread-Safe¶
Los siguientes tipos son seguros para uso concurrente:
// Scanner es thread-safe para llamadas a Scan()
// PERO no se debe modificar Config durante un escaneo
type Scanner struct {
config Config // inmutable despues de construccion
// ... campos internos
}
// Result es inmutable una vez retornado
type Result struct {
Findings []types.Finding
Summary *types.Summary
// ...
}
// Baseline es thread-safe para lecturas concurrentes
// Las modificaciones requieren sincronizacion externa
type Baseline struct {
// ...
}
Garantias de Seguridad¶
-
Scanner.Scan() - Thread-safe. Multiples goroutines pueden llamar a
Scan()en la misma instancia de Scanner. -
Result - Los resultados retornados son inmutables. Cada llamada a
Scan()retorna un nuevoResult. -
GenerateReport() - Thread-safe. No modifica el Result.
-
Baseline - Las operaciones de lectura son thread-safe. Las escrituras requieren sincronizacion.
Ejemplo de Uso Concurrente Seguro¶
package main
import (
"context"
"fmt"
"sync"
"github.com/mcphub/mcp-scan/pkg/scanner"
)
func main() {
// Una sola instancia de scanner para multiples escaneos
cfg := scanner.DefaultConfig()
s := scanner.New(cfg)
paths := []string{
"./project-a",
"./project-b",
"./project-c",
}
var wg sync.WaitGroup
results := make(chan *scanner.Result, len(paths))
for _, path := range paths {
wg.Add(1)
go func(p string) {
defer wg.Done()
// Cada goroutine puede llamar a Scan() de forma segura
result, err := s.Scan(context.Background(), p)
if err != nil {
fmt.Printf("Error en %s: %v\n", p, err)
return
}
results <- result
}(path)
}
go func() {
wg.Wait()
close(results)
}()
for result := range results {
fmt.Printf("Hallazgos: %d\n", len(result.Findings))
}
}
Patron Incorrecto (NO HACER)¶
// INCORRECTO: Modificar config durante escaneos
func badExample() {
cfg := scanner.DefaultConfig()
s := scanner.New(cfg)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
// PELIGRO: Modificar config mientras otro escaneo esta activo
cfg.Mode = scanner.ModeDeep // NO HACER ESTO
s.Scan(context.Background(), "./src1")
}()
go func() {
defer wg.Done()
s.Scan(context.Background(), "./src2")
}()
wg.Wait()
}
Patrones de Uso Concurrente¶
Pool de Scanners¶
Para escaneos con diferentes configuraciones, usa un pool de scanners:
package main
import (
"context"
"sync"
"github.com/mcphub/mcp-scan/pkg/scanner"
)
// ScannerPool gestiona multiples instancias de scanner
type ScannerPool struct {
scanners chan *scanner.Scanner
}
func NewScannerPool(size int, cfg scanner.Config) *ScannerPool {
pool := &ScannerPool{
scanners: make(chan *scanner.Scanner, size),
}
for i := 0; i < size; i++ {
pool.scanners <- scanner.New(cfg)
}
return pool
}
func (p *ScannerPool) Acquire() *scanner.Scanner {
return <-p.scanners
}
func (p *ScannerPool) Release(s *scanner.Scanner) {
p.scanners <- s
}
func (p *ScannerPool) Scan(ctx context.Context, path string) (*scanner.Result, error) {
s := p.Acquire()
defer p.Release(s)
return s.Scan(ctx, path)
}
func main() {
cfg := scanner.DefaultConfig()
pool := NewScannerPool(4, cfg)
paths := []string{"./p1", "./p2", "./p3", "./p4", "./p5", "./p6"}
var wg sync.WaitGroup
for _, path := range paths {
wg.Add(1)
go func(p string) {
defer wg.Done()
result, _ := pool.Scan(context.Background(), p)
if result != nil {
println(p, len(result.Findings))
}
}(path)
}
wg.Wait()
}
Sincronizacion de Baseline¶
package main
import (
"context"
"sync"
"github.com/mcphub/mcp-scan/pkg/scanner"
"github.com/mcphub/mcp-scan/internal/types"
)
// ThreadSafeBaseline envuelve un Baseline con sincronizacion
type ThreadSafeBaseline struct {
mu sync.RWMutex
baseline *scanner.Baseline
}
func NewThreadSafeBaseline() *ThreadSafeBaseline {
return &ThreadSafeBaseline{
baseline: scanner.NewBaseline(),
}
}
func (b *ThreadSafeBaseline) Contains(finding types.Finding) bool {
b.mu.RLock()
defer b.mu.RUnlock()
return b.baseline.Contains(finding)
}
func (b *ThreadSafeBaseline) AddFinding(finding types.Finding, reason, acceptedBy string) {
b.mu.Lock()
defer b.mu.Unlock()
b.baseline.AddFinding(finding, reason, acceptedBy)
}
func (b *ThreadSafeBaseline) Filter(findings []types.Finding) ([]types.Finding, int) {
b.mu.RLock()
defer b.mu.RUnlock()
return b.baseline.Filter(findings)
}
func main() {
cfg := scanner.DefaultConfig()
s := scanner.New(cfg)
safeBaseline := NewThreadSafeBaseline()
var wg sync.WaitGroup
paths := []string{"./p1", "./p2", "./p3"}
for _, path := range paths {
wg.Add(1)
go func(p string) {
defer wg.Done()
result, err := s.Scan(context.Background(), p)
if err != nil {
return
}
// Filtrar con baseline de forma thread-safe
filtered, _ := safeBaseline.Filter(result.Findings)
// Procesar hallazgos filtrados
for _, f := range filtered {
println(f.RuleID)
}
}(path)
}
wg.Wait()
}
Fan-Out/Fan-In¶
package main
import (
"context"
"fmt"
"sync"
"github.com/mcphub/mcp-scan/pkg/scanner"
)
func fanOutFanIn(paths []string, workers int) []*scanner.Result {
cfg := scanner.DefaultConfig()
// Canal de entrada
jobs := make(chan string, len(paths))
// Canal de salida
results := make(chan *scanner.Result, len(paths))
// Workers
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
// Cada worker tiene su propio scanner
s := scanner.New(cfg)
for path := range jobs {
result, err := s.Scan(context.Background(), path)
if err != nil {
fmt.Printf("Worker %d: error en %s: %v\n", workerID, path, err)
continue
}
results <- result
}
}(i)
}
// Enviar trabajos
go func() {
for _, path := range paths {
jobs <- path
}
close(jobs)
}()
// Esperar y cerrar resultados
go func() {
wg.Wait()
close(results)
}()
// Recolectar resultados
var allResults []*scanner.Result
for result := range results {
allResults = append(allResults, result)
}
return allResults
}
func main() {
paths := []string{
"./service-1",
"./service-2",
"./service-3",
"./service-4",
"./service-5",
}
results := fanOutFanIn(paths, 3)
totalFindings := 0
for _, r := range results {
totalFindings += len(r.Findings)
}
fmt.Printf("Total hallazgos: %d en %d proyectos\n",
totalFindings, len(results))
}
Control de Recursos¶
Limitar Workers¶
package main
import (
"context"
"runtime"
"github.com/mcphub/mcp-scan/pkg/scanner"
)
func main() {
cfg := scanner.DefaultConfig()
// Limitar workers segun la capacidad del sistema
maxWorkers := runtime.NumCPU()
if maxWorkers > 8 {
maxWorkers = 8 // Cap maximo
}
cfg.Workers = maxWorkers
// O usar valor fijo para entornos restringidos
// cfg.Workers = 2 // Para contenedores con recursos limitados
s := scanner.New(cfg)
result, _ := s.Scan(context.Background(), "./src")
println("Hallazgos:", len(result.Findings))
}
Limitar Memoria¶
Para controlar el uso de memoria en escaneos grandes:
package main
import (
"context"
"fmt"
"runtime"
"time"
"github.com/mcphub/mcp-scan/pkg/scanner"
)
func scanWithMemoryLimit(path string, memoryLimitMB int) (*scanner.Result, error) {
// Monitorear uso de memoria
done := make(chan struct{})
defer close(done)
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-done:
return
case <-ticker.C:
var m runtime.MemStats
runtime.ReadMemStats(&m)
allocMB := m.Alloc / 1024 / 1024
if int(allocMB) > memoryLimitMB {
fmt.Printf("ADVERTENCIA: Uso de memoria alto: %d MB\n", allocMB)
}
}
}
}()
cfg := scanner.DefaultConfig()
// Menos workers = menos memoria
cfg.Workers = 2
s := scanner.New(cfg)
return s.Scan(context.Background(), path)
}
func main() {
result, err := scanWithMemoryLimit("./src", 500) // 500 MB limite
if err != nil {
panic(err)
}
fmt.Printf("Hallazgos: %d\n", len(result.Findings))
}
Semaforo para Limitar Concurrencia¶
package main
import (
"context"
"sync"
"github.com/mcphub/mcp-scan/pkg/scanner"
"golang.org/x/sync/semaphore"
)
func main() {
cfg := scanner.DefaultConfig()
// Maximo 3 escaneos simultaneos
sem := semaphore.NewWeighted(3)
paths := []string{
"./project-1", "./project-2", "./project-3",
"./project-4", "./project-5", "./project-6",
}
var wg sync.WaitGroup
results := make([]*scanner.Result, len(paths))
for i, path := range paths {
wg.Add(1)
go func(idx int, p string) {
defer wg.Done()
// Adquirir permiso
if err := sem.Acquire(context.Background(), 1); err != nil {
return
}
defer sem.Release(1)
s := scanner.New(cfg)
result, err := s.Scan(context.Background(), p)
if err != nil {
return
}
results[idx] = result
}(i, path)
}
wg.Wait()
for i, r := range results {
if r != nil {
println(paths[i], ":", len(r.Findings), "hallazgos")
}
}
}
Mejores Practicas¶
1. Una Instancia de Scanner por Configuracion¶
// CORRECTO: Reutilizar scanner para misma configuracion
cfg := scanner.DefaultConfig()
s := scanner.New(cfg)
// Multiples escaneos con el mismo scanner
result1, _ := s.Scan(ctx, "./path1")
result2, _ := s.Scan(ctx, "./path2")
2. No Modificar Config Despues de Crear Scanner¶
// CORRECTO
cfg := scanner.DefaultConfig()
cfg.Mode = scanner.ModeDeep // Modificar antes
s := scanner.New(cfg)
// INCORRECTO
s := scanner.New(cfg)
cfg.Mode = scanner.ModeDeep // NO modificar despues
3. Usar Contextos para Cancelacion¶
// CORRECTO: Contexto con timeout por escaneo
func scanWithTimeout(s *scanner.Scanner, path string, timeout time.Duration) (*scanner.Result, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return s.Scan(ctx, path)
}
4. Proteger Datos Compartidos¶
// CORRECTO: Proteger acceso a datos compartidos
var (
mu sync.Mutex
allResults []*scanner.Result
)
// En goroutine
mu.Lock()
allResults = append(allResults, result)
mu.Unlock()
5. Limitar Paralelismo en Contenedores¶
// En entornos con recursos limitados
func getWorkerCount() int {
// Respetar limites de CPU del contenedor
cpuQuota := runtime.NumCPU()
// No sobrepasar 4 workers en contenedores
if cpuQuota > 4 {
return 4
}
return cpuQuota
}
6. Manejar Errores de Forma Atomica¶
// CORRECTO: Canal de errores atomico
errCh := make(chan error, 1)
go func() {
_, err := s.Scan(ctx, path)
if err != nil {
select {
case errCh <- err: // Solo el primer error
default:
}
}
}()
7. Evitar Data Races¶
// Usar go test -race para detectar data races
// go test -race ./...
// INCORRECTO: Acceso no sincronizado
var totalFindings int
for result := range results {
totalFindings += len(result.Findings) // Data race!
}
// CORRECTO: Usar atomic o mutex
var totalFindings int64
for result := range results {
atomic.AddInt64(&totalFindings, int64(len(result.Findings)))
}