# Usage Patterns - mcp-scan
This guide describes common patterns for integrating mcp-scan into different development and operations scenarios.
## Table of Contents
- CI/CD Integration
- Pre-commit Hooks
- Custom Reporters
- Slack Integration
- Teams Integration
- Metrics Dashboard
- Certification Pipeline
## CI/CD Integration
### GitHub Actions
# .github/workflows/security-scan.yml
# Runs the scan tool on pushes and pull requests and uploads the SARIF report
# to GitHub code scanning.
name: Security Scan

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

# upload-sarif needs write access to security events. The default GITHUB_TOKEN
# is read-only in many repositories, so the permission is requested explicitly.
permissions:
  contents: read
  security-events: write

jobs:
  mcp-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Go
        uses: actions/setup-go@v5
        with:
          go-version: '1.24'

      # The step fails when the scan tool exits non-zero.
      - name: Run MCP Scan
        run: |
          go run ./tools/scan/main.go \
            --fail-on high \
            --output sarif \
            --output-file results.sarif \
            ./src

      # always() keeps the upload running even when the scan step failed.
      - name: Upload SARIF
        if: always()
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: results.sarif
Scanning tool for CI:
// tools/scan/main.go
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"time"
"github.com/mcphub/mcp-scan/pkg/scanner"
"github.com/mcphub/mcp-scan/internal/types"
)
// main parses the command-line flags, scans the path given as the first
// positional argument, emits the report (to --output-file when set, otherwise
// to stdout), prints a summary to stderr and exits with the result's exit code.
func main() {
	var (
		failOn     = flag.String("fail-on", "", "Minimum severity to fail")
		output     = flag.String("output", "json", "Output format (json, sarif)")
		outputFile = flag.String("output-file", "", "Output file")
		baseline   = flag.String("baseline", "", "Baseline file")
	)
	flag.Parse()

	if flag.NArg() < 1 {
		log.Fatal("Usage: scan [options] <path>")
	}
	target := flag.Arg(0)

	// Start from the defaults and override only what the flags ask for.
	cfg := scanner.DefaultConfig()
	cfg.Mode = scanner.ModeFast
	cfg.Timeout = 10 * time.Minute
	if sev := *failOn; sev != "" {
		cfg.FailOn = types.SeverityFromString(sev)
	}
	if file := *baseline; file != "" {
		cfg.Baseline = file
	}

	s := scanner.New(cfg)
	ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)
	defer cancel()

	result, err := s.Scan(ctx, target)
	if err != nil {
		log.Fatalf("Scan error: %v", err)
	}

	report, err := s.GenerateReport(result, *output)
	if err != nil {
		log.Fatalf("Report generation error: %v", err)
	}

	switch dest := *outputFile; dest {
	case "":
		fmt.Println(string(report))
	default:
		if werr := os.WriteFile(dest, report, 0644); werr != nil {
			log.Fatalf("Error writing file: %v", werr)
		}
		fmt.Printf("Report saved to %s\n", dest)
	}

	// Summary goes to stderr so it never mixes with a report printed on stdout.
	summary := os.Stderr
	fmt.Fprintf(summary, "\n=== Summary ===\n")
	fmt.Fprintf(summary, "Files: %d\n", result.Manifest.TotalFiles)
	fmt.Fprintf(summary, "Findings: %d\n", len(result.Findings))
	fmt.Fprintf(summary, "MSSS: %.1f (Level %d)\n",
		result.MSSSScore.Total, result.MSSSScore.Level)
	if n := result.BaselinedCount; n > 0 {
		fmt.Fprintf(summary, "Baselined: %d\n", n)
	}

	os.Exit(result.ExitCode)
}
### GitLab CI
# .gitlab-ci.yml
# Runs the scan tool on merge requests and on the main branch and keeps the
# SARIF report as a job artifact.
stages:
  - security

mcp-scan:
  stage: security
  image: golang:1.24
  script:
    - go run ./tools/scan/main.go
      --fail-on high
      --output sarif
      --output-file mcp-scan.sarif
      ./src
  artifacts:
    # The job fails when the scan tool exits non-zero; keep the report anyway.
    when: always
    # SARIF is published as a plain artifact. `artifacts:reports:sast` expects
    # GitLab's own SAST report schema, so a SARIF file must be converted before
    # it can be declared there.
    paths:
      - mcp-scan.sarif
  rules:
    - if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
    - if: '$CI_COMMIT_BRANCH == "main"'
### Jenkins Pipeline
// Jenkinsfile
// Declarative pipeline with a single stage that runs the scan tool and
// archives its JSON report.
pipeline {
agent any
stages {
stage('Security Scan') {
steps {
// Runs the scan tool against ./src and writes scan-results.json.
sh '''
go run ./tools/scan/main.go \
--fail-on high \
--output json \
--output-file scan-results.json \
./src
'''
}
post {
// `always` archives the report whether or not the scan step succeeded.
always {
archiveArtifacts artifacts: 'scan-results.json'
}
}
}
}
}
## Pre-commit Hooks
### Configuration with pre-commit
# .pre-commit-config.yaml
repos:
  - repo: local
    hooks:
      - id: mcp-scan
        name: MCP Security Scan
        entry: go run ./tools/scan/main.go --fail-on high ./
        language: system
        # `types` tags are ANDed, so a list of several languages matches no
        # file and the hook never runs. `types_or` matches a file carrying any
        # of the tags; the identify tag for TypeScript is `ts`.
        types_or: [python, javascript, ts]
        # The entry scans the whole tree, so the matched file names are not
        # appended to the command line.
        pass_filenames: false
### Native Git Hook
#!/bin/bash
# .git/hooks/pre-commit
#
# Copies the staged version of every .py/.ts/.js file into a temporary
# directory, runs the scan tool on that directory and blocks the commit when
# the tool exits non-zero.

echo "Running mcp-scan..."

# Create temporary directory. The trap body is single-quoted so the variable is
# expanded (and quoted) when the trap fires, which keeps paths with spaces safe.
TMP_DIR=$(mktemp -d) || exit 1
trap 'rm -rf "$TMP_DIR"' EXIT

# Copy staged files. The list is read NUL-delimited so names containing spaces,
# glob characters or newlines are handled as single entries.
FOUND=0
while IFS= read -r -d '' file; do
    case "$file" in
        *.py|*.ts|*.js) ;;
        *) continue ;;
    esac
    FOUND=1
    mkdir -p "$TMP_DIR/$(dirname -- "$file")"
    # ":<path>" names the staged blob, not the working-tree file.
    git show ":$file" > "$TMP_DIR/$file"
done < <(git diff --cached --name-only --diff-filter=ACM -z)

if [ "$FOUND" -eq 0 ]; then
    echo "No relevant files to scan"
    exit 0
fi

# Run scan
go run ./tools/scan/main.go --fail-on high "$TMP_DIR"
EXIT_CODE=$?

if [ "$EXIT_CODE" -ne 0 ]; then
    echo ""
    echo "BLOCKED: High severity findings found"
    echo "Review the findings before committing"
    exit 1
fi

echo "mcp-scan: OK"
exit 0
### Pre-commit Tool in Go
// tools/precommit/main.go
package main
import (
"context"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/mcphub/mcp-scan/pkg/scanner"
"github.com/mcphub/mcp-scan/internal/types"
)
// getStagedFiles returns the staged (added, copied or modified) files whose
// extension is .py, .ts or .js, using the paths exactly as git reports them.
//
// The list is requested NUL-delimited (-z). In the default newline-delimited
// mode git wraps paths containing non-ASCII or special characters in quotes
// and escapes them, so the extension check saw `.py"` and silently dropped
// those files. With -z every path is emitted verbatim.
//
// It returns the error from running git unchanged.
func getStagedFiles() ([]string, error) {
	cmd := exec.Command("git", "diff", "--cached", "--name-only", "--diff-filter=ACM", "-z")
	output, err := cmd.Output()
	if err != nil {
		return nil, err
	}

	var files []string
	for _, name := range strings.Split(string(output), "\x00") {
		// The output ends with a NUL, which yields one empty trailing entry.
		if name == "" {
			continue
		}
		switch filepath.Ext(name) {
		case ".py", ".ts", ".js":
			files = append(files, name)
		}
	}
	return files, nil
}
// main scans the current directory, keeps only the findings located in staged
// files, prints them, and exits 1 when any of them is high or critical.
// It exits 0 when nothing is staged, nothing is found, or only lower
// severities are present.
func main() {
	staged, err := getStagedFiles()
	if err != nil {
		log.Fatalf("Error getting files: %v", err)
	}
	if len(staged) == 0 {
		fmt.Println("mcp-scan: No files to scan")
		os.Exit(0)
	}
	fmt.Printf("mcp-scan: Scanning %d files...\n", len(staged))

	cfg := scanner.Config{
		Include: []string{"**/*.py", "**/*.ts", "**/*.js"},
		Exclude: []string{"**/node_modules/**", "**/venv/**"},
		Mode:    scanner.ModeFast,
		Timeout: 1 * time.Minute,
		FailOn:  types.SeverityHigh,
	}
	s := scanner.New(cfg)

	ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)
	defer cancel()

	result, err := s.Scan(ctx, ".")
	if err != nil {
		log.Fatalf("Scan error: %v", err)
	}

	// Index staged paths by absolute path so they compare equal to the
	// locations reported by the scanner.
	isStaged := make(map[string]bool)
	for i := range staged {
		abs, _ := filepath.Abs(staged[i])
		isStaged[abs] = true
	}

	// Keep findings in staged files and note whether any of them blocks.
	var (
		relevant []types.Finding
		blocking bool
	)
	for _, finding := range result.Findings {
		abs, _ := filepath.Abs(finding.Location.File)
		if !isStaged[abs] {
			continue
		}
		relevant = append(relevant, finding)
		if finding.Severity == types.SeverityHigh || finding.Severity == types.SeverityCritical {
			blocking = true
		}
	}

	if len(relevant) == 0 {
		fmt.Println("mcp-scan: OK - No findings found")
		os.Exit(0)
	}

	fmt.Printf("\nmcp-scan: %d findings found:\n", len(relevant))
	for _, finding := range relevant {
		fmt.Printf(" [%s] %s - %s:%d\n",
			finding.Severity, finding.RuleID, finding.Location.File, finding.Location.StartLine)
	}

	if !blocking {
		fmt.Println("\nWARNING: There are lower severity findings")
		os.Exit(0)
	}
	fmt.Println("\nBLOCKED: Resolve high/critical findings before committing")
	os.Exit(1)
}
## Custom Reporters
### Custom JSON Reporter
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"time"
"github.com/mcphub/mcp-scan/pkg/scanner"
"github.com/mcphub/mcp-scan/internal/types"
)
// Types that make up the custom JSON report. Field order is the order in which
// encoding/json emits the keys.
type (
	// CustomReport is the top-level document of the custom report format.
	CustomReport struct {
		Timestamp  time.Time        `json:"timestamp"`
		Project    string           `json:"project"`
		Branch     string           `json:"branch"`
		Commit     string           `json:"commit"`
		Summary    ReportSummary    `json:"summary"`
		Findings   []ReportFinding  `json:"findings"`
		Compliance ComplianceStatus `json:"compliance"`
	}

	// ReportSummary holds the aggregate numbers of one scan.
	ReportSummary struct {
		TotalFiles    int            `json:"total_files"`
		TotalFindings int            `json:"total_findings"`
		BySeverity    map[string]int `json:"by_severity"`
		Duration      string         `json:"duration"`
	}

	// ReportFinding is one finding as it appears in the report. Tool is
	// omitted from the JSON when empty.
	ReportFinding struct {
		ID          string `json:"id"`
		Rule        string `json:"rule"`
		Severity    string `json:"severity"`
		File        string `json:"file"`
		Line        int    `json:"line"`
		Title       string `json:"title"`
		Description string `json:"description"`
		Tool        string `json:"tool,omitempty"`
	}

	// ComplianceStatus carries the score, level, level name and compliance
	// flag reported for the scan.
	ComplianceStatus struct {
		Score     float64 `json:"score"`
		Level     int     `json:"level"`
		LevelName string  `json:"level_name"`
		Compliant bool    `json:"compliant"`
	}
)
// generateCustomReport converts a scan result into a CustomReport stamped with
// the current UTC time and the given project, branch and commit.
//
// The level name is looked up in a fixed table; a level outside that table is
// reported as "Unknown (<level>)" instead of panicking on the slice index.
// Findings is always a non-nil slice, so a scan without findings serialises as
// an empty JSON array rather than null.
func generateCustomReport(result *scanner.Result, project, branch, commit string) *CustomReport {
	levelNames := []string{"Non-Compliant", "Basic", "Enterprise", "Certified"}

	level := result.MSSSScore.Level
	levelName := fmt.Sprintf("Unknown (%d)", level)
	if level >= 0 && level < len(levelNames) {
		levelName = levelNames[level]
	}

	report := &CustomReport{
		Timestamp: time.Now().UTC(),
		Project:   project,
		Branch:    branch,
		Commit:    commit,
		Summary: ReportSummary{
			TotalFiles:    result.Manifest.TotalFiles,
			TotalFindings: len(result.Findings),
			BySeverity:    result.Summary.BySeverity,
			Duration:      result.ScanDuration.String(),
		},
		Findings: make([]ReportFinding, 0, len(result.Findings)),
		Compliance: ComplianceStatus{
			Score:     result.MSSSScore.Total,
			Level:     level,
			LevelName: levelName,
			Compliant: result.MSSSScore.Compliant,
		},
	}

	for _, f := range result.Findings {
		rf := ReportFinding{
			ID:          f.ID,
			Rule:        f.RuleID,
			Severity:    string(f.Severity),
			File:        f.Location.File,
			Line:        f.Location.StartLine,
			Title:       f.Title,
			Description: f.Description,
		}
		// The tool name is only available when the finding has MCP context.
		if f.MCPContext != nil {
			rf.Tool = f.MCPContext.ToolName
		}
		report.Findings = append(report.Findings, rf)
	}
	return report
}
// main scans ./src with the default configuration and prints the custom JSON
// report to stdout. Project metadata is read from the PROJECT_NAME, GIT_BRANCH
// and GIT_COMMIT environment variables.
func main() {
	s := scanner.New(scanner.DefaultConfig())
	result, err := s.Scan(context.Background(), "./src")
	if err != nil {
		log.Fatal(err)
	}

	// Get project metadata
	project := os.Getenv("PROJECT_NAME")
	branch := os.Getenv("GIT_BRANCH")
	commit := os.Getenv("GIT_COMMIT")

	report := generateCustomReport(result, project, branch, commit)

	// A failed encoding must not be printed as an empty report.
	output, err := json.MarshalIndent(report, "", " ")
	if err != nil {
		log.Fatalf("Error encoding report: %v", err)
	}
	fmt.Println(string(output))
}
### HTML Reporter
package main
import (
"context"
"html/template"
"log"
"os"
"github.com/mcphub/mcp-scan/pkg/scanner"
)
// htmlTemplate is the html/template source of the report page. It is executed
// against the scan result and reads .Manifest.TotalFiles, .Findings,
// .ScanDuration and .MSSSScore from it; each finding contributes its Severity,
// RuleID, Title, Location, Description and, when set, Remediation.
// The severity value doubles as the CSS class of the finding box, and the
// MSSS level selects the level-N colour class.
const htmlTemplate = `<!DOCTYPE html>
<html>
<head>
<title>MCP Security Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.summary { background: #f5f5f5; padding: 20px; margin-bottom: 20px; }
.finding { border: 1px solid #ddd; padding: 15px; margin: 10px 0; }
.critical { border-left: 4px solid #d32f2f; }
.high { border-left: 4px solid #f57c00; }
.medium { border-left: 4px solid #fbc02d; }
.low { border-left: 4px solid #388e3c; }
.severity { font-weight: bold; text-transform: uppercase; }
.score { font-size: 48px; font-weight: bold; }
.level-0 { color: #d32f2f; }
.level-1 { color: #f57c00; }
.level-2 { color: #1976d2; }
.level-3 { color: #388e3c; }
</style>
</head>
<body>
<h1>MCP Security Report</h1>
<div class="summary">
<h2>Summary</h2>
<p>Files scanned: {{.Manifest.TotalFiles}}</p>
<p>Total findings: {{len .Findings}}</p>
<p>Duration: {{.ScanDuration}}</p>
<h3>MSSS Compliance</h3>
<p class="score level-{{.MSSSScore.Level}}">{{printf "%.1f" .MSSSScore.Total}}/100</p>
<p>Level: {{.MSSSScore.Level}}</p>
</div>
<h2>Findings</h2>
{{range .Findings}}
<div class="finding {{.Severity}}">
<span class="severity">{{.Severity}}</span> - {{.RuleID}}
<h3>{{.Title}}</h3>
<p><strong>File:</strong> {{.Location.File}}:{{.Location.StartLine}}</p>
<p>{{.Description}}</p>
{{if .Remediation}}
<p><strong>Remediation:</strong> {{.Remediation}}</p>
{{end}}
</div>
{{end}}
</body>
</html>`
// main scans ./src with the default configuration and renders the result
// through htmlTemplate into report.html.
func main() {
	s := scanner.New(scanner.DefaultConfig())
	result, err := s.Scan(context.Background(), "./src")
	if err != nil {
		log.Fatal(err)
	}

	tmpl := template.Must(template.New("report").Parse(htmlTemplate))

	f, err := os.Create("report.html")
	if err != nil {
		log.Fatal(err)
	}

	// The file is closed explicitly rather than deferred: log.Fatal exits
	// without running deferred calls, and the Close error matters for a file
	// that was just written.
	if err := tmpl.Execute(f, result); err != nil {
		f.Close()
		log.Fatal(err)
	}
	if err := f.Close(); err != nil {
		log.Fatal(err)
	}

	log.Println("HTML report generated at report.html")
}
## Slack Integration
### Slack Notifications
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"time"
"github.com/mcphub/mcp-scan/pkg/scanner"
"github.com/mcphub/mcp-scan/internal/types"
)
// JSON payload types for a Slack incoming-webhook message.
type (
	// SlackMessage is the webhook request body. Empty parts are omitted.
	SlackMessage struct {
		Text        string       `json:"text,omitempty"`
		Blocks      []SlackBlock `json:"blocks,omitempty"`
		Attachments []Attachment `json:"attachments,omitempty"`
	}

	// SlackBlock is one layout block; it carries either Text or Fields.
	SlackBlock struct {
		Type   string      `json:"type"`
		Text   *BlockText  `json:"text,omitempty"`
		Fields []BlockText `json:"fields,omitempty"`
	}

	// BlockText is a text object inside a block.
	BlockText struct {
		Type string `json:"type"`
		Text string `json:"text"`
	}

	// Attachment is a coloured attachment appended to the message.
	Attachment struct {
		Color string `json:"color"`
		Text  string `json:"text"`
	}
)
// sendSlackNotification posts a summary of result to the Slack webhook at
// webhookURL. The message has a header with projectName and a section with the
// file count, finding count, MSSS score and level; when critical or high
// findings exist an alert attachment is added, coloured by MSSS level.
//
// It returns an error when the payload cannot be encoded, the request fails or
// times out, or Slack answers with a status other than 200 OK.
func sendSlackNotification(webhookURL string, result *scanner.Result, projectName string) error {
	// Determine color based on level. A level outside the table falls back to
	// the first (red) entry instead of panicking on the slice index.
	colors := []string{"#d32f2f", "#f57c00", "#1976d2", "#388e3c"}
	color := colors[0]
	if level := result.MSSSScore.Level; level >= 0 && level < len(colors) {
		color = colors[level]
	}

	// Count by severity
	criticals := result.Summary.BySeverity["critical"]
	highs := result.Summary.BySeverity["high"]

	// Build message
	msg := SlackMessage{
		Blocks: []SlackBlock{
			{
				Type: "header",
				Text: &BlockText{
					Type: "plain_text",
					Text: fmt.Sprintf("Security Scan: %s", projectName),
				},
			},
			{
				Type: "section",
				Fields: []BlockText{
					{Type: "mrkdwn", Text: fmt.Sprintf("*Files:*\n%d", result.Manifest.TotalFiles)},
					{Type: "mrkdwn", Text: fmt.Sprintf("*Findings:*\n%d", len(result.Findings))},
					{Type: "mrkdwn", Text: fmt.Sprintf("*MSSS Score:*\n%.1f", result.MSSSScore.Total)},
					{Type: "mrkdwn", Text: fmt.Sprintf("*Level:*\n%d", result.MSSSScore.Level)},
				},
			},
		},
	}

	// Add alert if there are criticals/highs
	if criticals > 0 || highs > 0 {
		msg.Attachments = append(msg.Attachments, Attachment{
			Color: color,
			Text:  fmt.Sprintf("ALERT: %d critical, %d high", criticals, highs),
		})
	}

	payload, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("encoding slack message: %w", err)
	}

	// http.Post uses the default client, which has no timeout; a stalled
	// webhook would block the caller indefinitely.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Post(webhookURL, "application/json", bytes.NewReader(payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("slack webhook error: %s", resp.Status)
	}
	return nil
}
// main scans ./src with a five-minute deadline and sends the summary to the
// webhook named by SLACK_WEBHOOK_URL. PROJECT_NAME labels the message and
// defaults to "MCP Server".
func main() {
	hook, name := os.Getenv("SLACK_WEBHOOK_URL"), os.Getenv("PROJECT_NAME")
	if hook == "" {
		log.Fatal("SLACK_WEBHOOK_URL not configured")
	}
	if name == "" {
		name = "MCP Server"
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	s := scanner.New(scanner.DefaultConfig())
	result, err := s.Scan(ctx, "./src")
	if err != nil {
		log.Fatal(err)
	}

	err = sendSlackNotification(hook, result, name)
	if err != nil {
		log.Fatalf("Error sending to Slack: %v", err)
	}
	log.Println("Notification sent to Slack")
}
## Teams Integration
### Microsoft Teams Notifications
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"time"
"github.com/mcphub/mcp-scan/pkg/scanner"
)
// JSON payload types for a Microsoft Teams MessageCard webhook message.
type (
	// TeamsMessage is the card sent to the webhook.
	TeamsMessage struct {
		Type       string         `json:"@type"`
		Context    string         `json:"@context"`
		Summary    string         `json:"summary"`
		ThemeColor string         `json:"themeColor"`
		Title      string         `json:"title"`
		Sections   []TeamsSection `json:"sections"`
	}

	// TeamsSection is one section of the card; empty parts are omitted.
	TeamsSection struct {
		ActivityTitle string      `json:"activityTitle,omitempty"`
		Facts         []TeamsFact `json:"facts,omitempty"`
		Text          string      `json:"text,omitempty"`
	}

	// TeamsFact is a name/value pair shown inside a section.
	TeamsFact struct {
		Name  string `json:"name"`
		Value string `json:"value"`
	}
)
// sendTeamsNotification posts a MessageCard summarising result to the Teams
// webhook at webhookURL. The card lists the file count, finding count, MSSS
// score, level name and scan duration; when critical or high findings exist a
// second section with their counts is added.
//
// It returns an error when the payload cannot be encoded, the request fails or
// times out, or the webhook answers with a status other than 200 OK.
func sendTeamsNotification(webhookURL string, result *scanner.Result, projectName string) error {
	// Color and name based on level. A level outside the tables falls back to
	// the first (red) colour and an "Unknown" name instead of panicking on the
	// slice index.
	colors := []string{"d32f2f", "f57c00", "1976d2", "388e3c"}
	levelNames := []string{"Non-Compliant", "Basic", "Enterprise", "Certified"}

	level := result.MSSSScore.Level
	themeColor := colors[0]
	levelName := fmt.Sprintf("Unknown (%d)", level)
	if level >= 0 && level < len(colors) && level < len(levelNames) {
		themeColor = colors[level]
		levelName = levelNames[level]
	}

	msg := TeamsMessage{
		Type:       "MessageCard",
		Context:    "http://schema.org/extensions",
		Summary:    fmt.Sprintf("Security scan: %s", projectName),
		ThemeColor: themeColor,
		Title:      fmt.Sprintf("Scan Results: %s", projectName),
		Sections: []TeamsSection{
			{
				Facts: []TeamsFact{
					{Name: "Files", Value: fmt.Sprintf("%d", result.Manifest.TotalFiles)},
					{Name: "Findings", Value: fmt.Sprintf("%d", len(result.Findings))},
					{Name: "MSSS Score", Value: fmt.Sprintf("%.1f/100", result.MSSSScore.Total)},
					{Name: "Level", Value: levelName},
					{Name: "Duration", Value: result.ScanDuration.String()},
				},
			},
		},
	}

	// Add section for critical findings
	criticals := result.Summary.BySeverity["critical"]
	highs := result.Summary.BySeverity["high"]
	if criticals > 0 || highs > 0 {
		msg.Sections = append(msg.Sections, TeamsSection{
			ActivityTitle: "High Severity Findings",
			Text: fmt.Sprintf("**Critical:** %d | **High:** %d",
				criticals, highs),
		})
	}

	payload, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("encoding teams message: %w", err)
	}

	// http.Post uses the default client, which has no timeout; a stalled
	// webhook would block the caller indefinitely.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Post(webhookURL, "application/json", bytes.NewReader(payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("teams webhook error: %s", resp.Status)
	}
	return nil
}
// main scans ./src with a five-minute deadline and sends the summary to the
// webhook named by TEAMS_WEBHOOK_URL. PROJECT_NAME labels the card and
// defaults to "MCP Server".
func main() {
	hook, name := os.Getenv("TEAMS_WEBHOOK_URL"), os.Getenv("PROJECT_NAME")
	if hook == "" {
		log.Fatal("TEAMS_WEBHOOK_URL not configured")
	}
	if name == "" {
		name = "MCP Server"
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	s := scanner.New(scanner.DefaultConfig())
	result, err := s.Scan(ctx, "./src")
	if err != nil {
		log.Fatal(err)
	}

	err = sendTeamsNotification(hook, result, name)
	if err != nil {
		log.Fatalf("Error sending to Teams: %v", err)
	}
	log.Println("Notification sent to Teams")
}
## Metrics Dashboard
### Export Metrics to Prometheus
package main
import (
"context"
"log"
"net/http"
"time"
"github.com/mcphub/mcp-scan/pkg/scanner"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// newGauge builds a gauge vector with the given metric name, help text and
// label names.
func newGauge(name, help string, labels ...string) *prometheus.GaugeVec {
	return prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: name, Help: help}, labels)
}

// Gauges exported for each scan. All are labelled by project; findingsTotal is
// additionally labelled by severity.
var (
	scanDuration  = newGauge("mcp_scan_duration_seconds", "Scan duration in seconds", "project")
	findingsTotal = newGauge("mcp_scan_findings_total", "Total number of findings", "project", "severity")
	msssScore     = newGauge("mcp_scan_msss_score", "MSSS score", "project")
	msssLevel     = newGauge("mcp_scan_msss_level", "MSSS compliance level", "project")
	filesScanned  = newGauge("mcp_scan_files_total", "Number of files scanned", "project")
)
// init registers every gauge with the default Prometheus registry.
// MustRegister panics if a registration fails.
func init() {
	prometheus.MustRegister(
		scanDuration,
		findingsTotal,
		msssScore,
		msssLevel,
		filesScanned,
	)
}
// runScanAndExport scans path with the default configuration under a
// ten-minute deadline and copies the result into the gauges labelled with
// projectName. It returns the scan error, if any, without touching the gauges.
func runScanAndExport(projectName, path string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	result, err := scanner.New(scanner.DefaultConfig()).Scan(ctx, path)
	if err != nil {
		return err
	}

	// Export metrics
	score := result.MSSSScore
	scanDuration.WithLabelValues(projectName).Set(result.ScanDuration.Seconds())
	msssScore.WithLabelValues(projectName).Set(score.Total)
	msssLevel.WithLabelValues(projectName).Set(float64(score.Level))
	filesScanned.WithLabelValues(projectName).Set(float64(result.Manifest.TotalFiles))

	// Findings by severity. Every severity is set, including those with no
	// findings, which read as zero from the map.
	for _, severity := range [...]string{"info", "low", "medium", "high", "critical"} {
		n := result.Summary.BySeverity[severity]
		findingsTotal.WithLabelValues(projectName, severity).Set(float64(n))
	}
	return nil
}
// main runs one scan of ./src, then serves the gauges on :9090/metrics.
// A failed scan is logged and does not stop the server from starting.
func main() {
	// Initial scan
	err := runScanAndExport("my-mcp-server", "./src")
	if err != nil {
		log.Printf("Scan error: %v", err)
	}

	// Metrics endpoint
	http.Handle("/metrics", promhttp.Handler())
	log.Println("Metrics server at :9090/metrics")

	// ListenAndServe only returns on failure.
	serveErr := http.ListenAndServe(":9090", nil)
	log.Fatal(serveErr)
}
### Store in InfluxDB
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/mcphub/mcp-scan/pkg/scanner"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)
// main scans ./src with the default configuration and writes one
// "scan_results" point, tagged with project and branch, to the "mcp-scan"
// bucket of organisation "my-org" on the InfluxDB server at localhost:8086.
func main() {
	// Configure InfluxDB client
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	defer client.Close()
	writeAPI := client.WriteAPIBlocking("my-org", "mcp-scan")

	// Execute scan
	ctx := context.Background()
	s := scanner.New(scanner.DefaultConfig())
	result, err := s.Scan(ctx, "./src")
	if err != nil {
		log.Fatal(err)
	}

	// Create data point
	bySeverity := result.Summary.BySeverity
	point := influxdb2.NewPoint(
		"scan_results",
		map[string]string{
			"project": "my-mcp-server",
			"branch":  "main",
		},
		map[string]any{
			"files":       result.Manifest.TotalFiles,
			"findings":    len(result.Findings),
			"msss_score":  result.MSSSScore.Total,
			"msss_level":  result.MSSSScore.Level,
			"duration_ms": result.ScanDuration.Milliseconds(),
			"critical":    bySeverity["critical"],
			"high":        bySeverity["high"],
			"medium":      bySeverity["medium"],
			"low":         bySeverity["low"],
		},
		time.Now(),
	)

	if werr := writeAPI.WritePoint(ctx, point); werr != nil {
		log.Fatalf("Error writing to InfluxDB: %v", werr)
	}
	fmt.Println("Metrics exported to InfluxDB")
}
## Certification Pipeline
### Complete Certification Pipeline
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"time"
"github.com/mcphub/mcp-scan/pkg/scanner"
"github.com/mcphub/mcp-scan/internal/types"
)
// Result types of the certification pipeline.
type (
	// CertificationResult is the outcome of one certification run: the
	// verdict, the MSSS level and score, the individual checks and the
	// finding counts.
	CertificationResult struct {
		Timestamp     time.Time `json:"timestamp"`
		Project       string    `json:"project"`
		Version       string    `json:"version"`
		Passed        bool      `json:"passed"`
		Level         int       `json:"level"`
		Score         float64   `json:"score"`
		Criteria      []Check   `json:"criteria"`
		Findings      int       `json:"total_findings"`
		BlockingCount int       `json:"blocking_findings"`
	}

	// Check is a single certification criterion and its outcome.
	Check struct {
		Name    string `json:"name"`
		Passed  bool   `json:"passed"`
		Details string `json:"details"`
	}
)
// runCertificationPipeline scans path in deep mode and evaluates the
// certification checks for project/version against requiredLevel.
//
// The checks are: scan completed, no critical findings, MSSS level at least
// requiredLevel, high findings within the limit for requiredLevel (3 for
// level 1, otherwise 0), and at least one MCP tool detected. Passed is true
// only when every recorded check passed. When the scan itself fails, the
// result holds a single failed "Scan completed" check and Passed stays false.
func runCertificationPipeline(path, project, version string, requiredLevel int) *CertificationResult {
	result := &CertificationResult{
		Timestamp: time.Now().UTC(),
		Project:   project,
		Version:   version,
		Criteria:  []Check{},
	}
	// record appends one check to the result.
	record := func(name string, passed bool, details string) {
		result.Criteria = append(result.Criteria, Check{Name: name, Passed: passed, Details: details})
	}

	// Configuration for certification (deep mode)
	cfg := scanner.Config{
		Include: []string{"**/*.py", "**/*.ts", "**/*.js"},
		Exclude: []string{
			"**/node_modules/**",
			"**/venv/**",
			"**/test/**",
			"**/tests/**",
		},
		Mode:    scanner.ModeDeep,
		Timeout: 30 * time.Minute,
	}
	s := scanner.New(cfg)

	ctx, cancel := context.WithTimeout(context.Background(), cfg.Timeout)
	defer cancel()

	scanResult, err := s.Scan(ctx, path)
	if err != nil {
		record("Scan completed", false, fmt.Sprintf("Error: %v", err))
		return result
	}

	score := scanResult.MSSSScore
	surface := scanResult.MCPSurface
	result.Score = score.Total
	result.Level = score.Level
	result.Findings = len(scanResult.Findings)

	// Check 1: Scan completed
	record("Scan completed", true,
		fmt.Sprintf("%d files in %v", scanResult.Manifest.TotalFiles, scanResult.ScanDuration))

	// Check 2: No critical findings
	criticalCount := scanResult.Summary.BySeverity["critical"]
	record("No critical findings", criticalCount == 0,
		fmt.Sprintf("%d critical findings", criticalCount))

	// Check 3: Required MSSS level
	record(fmt.Sprintf("MSSS Level >= %d", requiredLevel), score.Level >= requiredLevel,
		fmt.Sprintf("Current level: %d, Score: %.1f", score.Level, score.Total))

	// Check 4: Maximum high findings by level. Only level 1 tolerates high
	// findings; every other level allows none.
	highCount := scanResult.Summary.BySeverity["high"]
	maxHigh := 0
	if requiredLevel == 1 {
		maxHigh = 3
	}
	record(fmt.Sprintf("High findings <= %d", maxHigh), highCount <= maxHigh,
		fmt.Sprintf("%d high findings", highCount))

	// Check 5: MCP surface detected (at least one tool)
	record("MCP surface detected", len(surface.Tools) > 0,
		fmt.Sprintf("%d tools, %d resources, %d prompts",
			len(surface.Tools), len(surface.Resources), len(surface.Prompts)))

	// Count blocking findings
	for _, finding := range scanResult.Findings {
		if finding.Severity == types.SeverityCritical || finding.Severity == types.SeverityHigh {
			result.BlockingCount++
		}
	}

	// Determine if passed: a single failed check fails the certification.
	passed := true
	for i := range result.Criteria {
		if !result.Criteria[i].Passed {
			passed = false
			break
		}
	}
	result.Passed = passed
	return result
}
// main runs the certification pipeline for level 2 against SCAN_PATH (default
// "./src"), prints the JSON result to stdout, saves it to
// certification-result.json and exits 0 when the certification passed and 1
// otherwise. PROJECT_NAME and PROJECT_VERSION label the result.
func main() {
	path := os.Getenv("SCAN_PATH")
	if path == "" {
		path = "./src"
	}
	project := os.Getenv("PROJECT_NAME")
	version := os.Getenv("PROJECT_VERSION")
	requiredLevel := 2 // Enterprise

	result := runCertificationPipeline(path, project, version, requiredLevel)

	// Output JSON. An encoding failure must not be reported as an empty result.
	output, err := json.MarshalIndent(result, "", " ")
	if err != nil {
		log.Fatalf("Error encoding certification result: %v", err)
	}
	fmt.Println(string(output))

	// Save result. A failed write is reported but does not change the verdict,
	// which has already been printed.
	if err := os.WriteFile("certification-result.json", output, 0644); err != nil {
		log.Printf("Error writing certification-result.json: %v", err)
	}

	// Exit code
	if result.Passed {
		fmt.Fprintf(os.Stderr, "\nCERTIFICATION PASSED: Level %d\n", result.Level)
		os.Exit(0)
	}

	fmt.Fprintf(os.Stderr, "\nCERTIFICATION FAILED\n")
	for _, check := range result.Criteria {
		status := "OK"
		if !check.Passed {
			status = "FAILED"
		}
		fmt.Fprintf(os.Stderr, " [%s] %s: %s\n", status, check.Name, check.Details)
	}
	os.Exit(1)
}