# Use Cases & Implementation Examples
Real-world scenarios and detailed implementation guides for RAKṢĀ integration across different organizational contexts.
## Use Case 1: Pre-commit Security Gate
### Scenario
A development team wants to catch security issues before code reaches the main branch, so that developers get immediate feedback on security problems in their local environment.
### Implementation
#### Setup Script
Create `scripts/setup-security-hooks.sh`:
```bash
#!/bin/bash
# Setup RAKṢĀ pre-commit security scanning
set -e
RAKSHA_URL="${RAKSHA_URL:-https://raksha-449012790678.asia-southeast1.run.app}"
echo "🛡️ Setting up RAKṢĀ security hooks..."

# Test connection to RAKṢĀ (-f makes curl fail on HTTP errors, not only on connection errors)
echo "Testing connection to RAKṢĀ..."
if ! curl -sf "$RAKSHA_URL/health" > /dev/null; then
    echo "❌ Cannot connect to RAKṢĀ at $RAKSHA_URL"
    exit 1
fi
echo "✅ Connection successful"

# Create pre-commit hook. The heredoc delimiter is quoted, so nothing below
# is expanded at install time; the hook reads its settings at commit time.
cat > .git/hooks/pre-commit << 'EOF'
#!/bin/bash
# RAKṢĀ pre-commit security scan with smart filtering
# Every setting can be overridden per commit from the environment,
# e.g. SCAN_STAGED_ONLY=false git commit   or   HIGH_WARN_THRESHOLD=5 git commit
RAKSHA_URL="${RAKSHA_URL:-https://raksha-449012790678.asia-southeast1.run.app}"
SCAN_STAGED_ONLY="${SCAN_STAGED_ONLY:-true}"
CRITICAL_BLOCK="${CRITICAL_BLOCK:-true}"
HIGH_WARN_THRESHOLD="${HIGH_WARN_THRESHOLD:-2}"

# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

echo -e "${BLUE}🛡️ RAKṢĀ Security Scan${NC}"

# Temporary files are removed on every exit path, including early failures
temp_dir=""
scan_file=""
cleanup() {
    [ -n "$temp_dir" ] && rm -rf "$temp_dir" 2>/dev/null || true
    rm -f /tmp/full-repo-scan.zip 2>/dev/null || true
}
trap cleanup EXIT

# Get list of staged files
if [ "$SCAN_STAGED_ONLY" = true ]; then
    staged_files=$(git diff --cached --name-only --diff-filter=ACM | grep -E '\.(py|js|ts|java|php|go|rb|cpp|c|cs)$' || true)
    if [ -z "$staged_files" ]; then
        echo "No relevant staged files to scan"
        exit 0
    fi
    echo "Scanning $(echo "$staged_files" | wc -l) staged files..."

    # Create temporary directory with staged files
    temp_dir=$(mktemp -d)
    mkdir -p "$temp_dir/code"
    echo "$staged_files" | while read -r file; do
        if [ -f "$file" ]; then
            mkdir -p "$temp_dir/code/$(dirname "$file")"
            cp "$file" "$temp_dir/code/$file"
        fi
    done

    # Create archive
    (cd "$temp_dir/code" && zip -r "$temp_dir/scan.zip" . -q)
    scan_file="$temp_dir/scan.zip"
else
    # Scan entire repository
    echo "Scanning entire repository..."
    zip -r /tmp/full-repo-scan.zip . \
        -x ".git/*" "node_modules/*" "__pycache__/*" \
           "*.pyc" "*.log" ".env*" "venv/*" \
        -q
    scan_file="/tmp/full-repo-scan.zip"
fi

# Run scan
response=$(curl -s -X POST "$RAKSHA_URL/scan/upload" \
    -F "file=@$scan_file")

# Fail closed: an empty or non-JSON response means the code was not scanned,
# and the numeric comparisons below would otherwise silently let the commit through.
if [ -z "$response" ] || ! echo "$response" | jq -e . > /dev/null 2>&1; then
    echo -e "${RED}❌ Scan request failed or returned an invalid response${NC}"
    exit 1
fi

# Parse results
scan_id=$(echo "$response" | jq -r '.scan_id // "unknown"')
critical=$(echo "$response" | jq -r '.findings_by_severity.critical // 0')
high=$(echo "$response" | jq -r '.findings_by_severity.high // 0')
medium=$(echo "$response" | jq -r '.findings_by_severity.medium // 0')
total=$(echo "$response" | jq -r '.total_findings // 0')

echo -e "\n📊 Results (Scan ID: $scan_id):"
echo -e "  🔴 Critical: $critical"
echo -e "  🟠 High: $high"
echo -e "  🟡 Medium: $medium"
echo -e "  📊 Total: $total"

# Show critical and high issues
if [ "$total" -gt 0 ]; then
    echo -e "\n🔍 Security Issues Found:"
    echo "$response" | jq -r '.findings[] | select(.severity=="critical" or .severity=="high") |
        "  \(if .severity=="critical" then "🔴" else "🟠" end) \(.title) in \(.file):\(.line)"'
fi

# Apply security gates
blocked=false
if [ "$CRITICAL_BLOCK" = true ] && [ "$critical" -gt 0 ]; then
    echo -e "\n${RED}❌ COMMIT BLOCKED: Critical security issues must be fixed${NC}"
    blocked=true
fi

if [ "$high" -gt "$HIGH_WARN_THRESHOLD" ]; then
    if [ "$blocked" = false ]; then
        echo -e "\n${YELLOW}⚠️ WARNING: $high high severity issues (threshold: $HIGH_WARN_THRESHOLD)${NC}"
        echo -e "Proceed anyway? [y/N] "
        # git runs hooks with stdin detached, so prompt on the terminal directly;
        # without a terminal (CI, editor integrations) the answer counts as "no".
        read -r confirm < /dev/tty || confirm="n"
        if [[ ! $confirm =~ ^[Yy]$ ]]; then
            blocked=true
        fi
    fi
fi

if [ "$blocked" = true ]; then
    echo -e "\n${RED}Use 'git commit --no-verify' to bypass (not recommended)${NC}"
    echo -e "Or view details at: $RAKSHA_URL/scan/$scan_id"
    exit 1
fi

echo -e "\n${GREEN}✅ Security scan passed${NC}"
exit 0
EOF

chmod +x .git/hooks/pre-commit
echo "✅ Pre-commit hook installed"
echo "Run 'git commit' to test the security scan"
```
Make the setup script executable and run it once from the repository root:
```bash
chmod +x scripts/setup-security-hooks.sh
# Run setup
./scripts/setup-security-hooks.sh
```
#### Developer Workflow Integration
Create `.vscode/settings.json` for VS Code users:
```json
{
  "git.enablePreCommitHook": true,
  "git.alwaysSignOff": false,
  "security-scan.enabled": true,
  "tasks.runWithDefault": [
    "RAKṢĀ Security Scan"
  ]
}
```
#### Team Guidelines Document
Create `SECURITY-WORKFLOW.md`:
````markdown
# Security Workflow Guidelines
## Pre-commit Security Scanning
Every commit is automatically scanned by RAKṢĀ for security vulnerabilities.
### What gets scanned?
- Only staged files (not entire repository)
- Code files: .py, .js, .ts, .java, .php, .go, .rb, .cpp, .c, .cs
- Excludes: tests, node_modules, build artifacts
### Security Gates
- **Critical issues**: Commit blocked automatically
- **High issues**: Warning shown, manual confirmation required if >2 issues
- **Medium/Low**: Informational only
### Bypass Options
```bash
# Skip security scan (emergency only)
git commit --no-verify -m "Emergency fix"
# Scan entire repo instead of staged files
SCAN_STAGED_ONLY=false git commit
# Adjust thresholds for specific commits
HIGH_WARN_THRESHOLD=5 git commit
```
### Best Practices
- Fix critical issues immediately
- Address high issues before pushing
- Review scan results link for detailed remediation
- Use `git add -p` to stage only secure changes
````
### Results
- **Developer Experience**: Immediate security feedback
- **Security Coverage**: 100% of commits scanned
- **Time Impact**: ~5-10 seconds per commit
- **False Positive Rate**: <5% with tuned rules
---
## Use Case 2: Pull Request Security Gate
### Scenario
The organization requires a security review of all code changes before they are merged to the main branch: PRs with security issues are blocked automatically, and reviewers get a clear report of what was found.
### Implementation
#### GitHub Actions Workflow
Create `.github/workflows/pr-security-gate.yml`:
```yaml
name: PR Security Gate
on:
  pull_request:
    branches: [main, develop]
    types: [opened, synchronize, reopened]
permissions:
  contents: read
  pull-requests: write
  security-events: write
  checks: write
jobs:
  security-analysis:
    runs-on: ubuntu-latest
    name: Security Analysis & Reporting
    env:
      # Override the scanner endpoint by setting the RAKSHA_URL repository secret.
      # Passing it through env (not interpolated into run: scripts) keeps the value out of the shell source.
      RAKSHA_URL: ${{ secrets.RAKSHA_URL || 'https://raksha-449012790678.asia-southeast1.run.app' }}
    outputs:
      scan-id: ${{ steps.scan.outputs.scan-id }}
      security-status: ${{ steps.gate.outputs.status }}
      critical-count: ${{ steps.scan.outputs.critical }}
      high-count: ${{ steps.scan.outputs.high }}
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          # Fetch full history for accurate diff analysis
          fetch-depth: 0
      - name: Get changed files
        id: changes
        run: |
          # Get files changed in this PR
          git diff --name-only origin/${{ github.base_ref }}..HEAD > changed-files.txt
          # Filter for security-relevant files
          grep -E '\.(py|js|ts|jsx|tsx|java|php|go|rb|cpp|c|cs|scala|kt|swift)$' changed-files.txt > security-relevant.txt || true
          if [ ! -s security-relevant.txt ]; then
            echo "No security-relevant files changed"
            echo "has-code-changes=false" >> "$GITHUB_OUTPUT"
          else
            echo "Security-relevant files:"
            cat security-relevant.txt
            echo "has-code-changes=true" >> "$GITHUB_OUTPUT"
          fi
      - name: Create analysis archive
        if: steps.changes.outputs.has-code-changes == 'true'
        run: |
          # Create focused archive of changed files plus context
          mkdir -p analysis-context
          # Include changed files
          while IFS= read -r file; do
            if [ -f "$file" ]; then
              mkdir -p "analysis-context/$(dirname "$file")"
              cp "$file" "analysis-context/$file"
            fi
          done < security-relevant.txt
          # Add configuration files for context
          for config in "requirements.txt" "package.json" "pom.xml" "go.mod" ".env.example" "Dockerfile"; do
            if [ -f "$config" ]; then
              cp "$config" analysis-context/
            fi
          done
          # Create archive
          cd analysis-context && zip -r ../pr-analysis.zip . -q
      - name: Run RAKṢĀ security scan
        if: steps.changes.outputs.has-code-changes == 'true'
        id: scan
        run: |
          # Run scan with PR context; the HTTP status is appended as the last line
          response=$(curl -s -w "\n%{http_code}" -X POST \
            "$RAKSHA_URL/scan/upload" \
            -F "file=@pr-analysis.zip")
          http_code=$(echo "$response" | tail -n1)
          json_response=$(echo "$response" | head -n -1)
          if [ "$http_code" -ne 200 ]; then
            echo "❌ Scan failed with HTTP $http_code"
            echo "$json_response"
            exit 1
          fi
          echo "Scan response: $json_response"
          # Parse results
          scan_id=$(echo "$json_response" | jq -r '.scan_id')
          critical=$(echo "$json_response" | jq -r '.findings_by_severity.critical // 0')
          high=$(echo "$json_response" | jq -r '.findings_by_severity.high // 0')
          medium=$(echo "$json_response" | jq -r '.findings_by_severity.medium // 0')
          low=$(echo "$json_response" | jq -r '.findings_by_severity.low // 0')
          total=$(echo "$json_response" | jq -r '.total_findings // 0')
          # Output for other steps
          echo "scan-id=$scan_id" >> "$GITHUB_OUTPUT"
          echo "critical=$critical" >> "$GITHUB_OUTPUT"
          echo "high=$high" >> "$GITHUB_OUTPUT"
          echo "medium=$medium" >> "$GITHUB_OUTPUT"
          echo "low=$low" >> "$GITHUB_OUTPUT"
          echo "total=$total" >> "$GITHUB_OUTPUT"
          # Store full results
          echo "$json_response" > scan-results.json
      - name: Security gate decision
        if: steps.changes.outputs.has-code-changes == 'true'
        id: gate
        run: |
          critical="${{ steps.scan.outputs.critical }}"
          high="${{ steps.scan.outputs.high }}"
          # Define thresholds
          CRITICAL_THRESHOLD=0  # Block any critical issues
          HIGH_THRESHOLD=1      # Block if more than 1 high issue
          status="passed"
          reason=""
          if [ "$critical" -gt "$CRITICAL_THRESHOLD" ]; then
            status="failed"
            reason="Critical security issues found: $critical"
          elif [ "$high" -gt "$HIGH_THRESHOLD" ]; then
            status="failed"
            reason="High security issues exceed threshold: $high (max: $HIGH_THRESHOLD)"
          fi
          echo "status=$status" >> "$GITHUB_OUTPUT"
          echo "reason=$reason" >> "$GITHUB_OUTPUT"
          if [ "$status" = "failed" ]; then
            echo "🚨 Security gate: FAILED"
            echo "Reason: $reason"
            exit 1
          else
            echo "✅ Security gate: PASSED"
          fi
      - name: Download detailed results
        # Runs even when the gate failed, but only if the scan itself produced an ID
        if: always() && steps.changes.outputs.has-code-changes == 'true' && steps.scan.outputs.scan-id != ''
        run: |
          curl -sf -o detailed-results.json \
            "$RAKSHA_URL/scan/${{ steps.scan.outputs.scan-id }}/export"
      - name: Generate security report
        if: always() && steps.changes.outputs.has-code-changes == 'true' && steps.scan.outputs.scan-id != ''
        run: |
          python3 << 'EOF'
          import json
          import os

          raksha_url = os.environ['RAKSHA_URL']

          # Load scan results
          with open('detailed-results.json') as f:
              results = json.load(f)

          counts = results.get('findings_by_severity', {})
          total = results.get('total_findings', 0)

          # Generate markdown report (template lines start at column 0 so the
          # markdown renders without stray indentation)
          report = f"""
          ## 🛡️ RAKṢĀ Security Analysis Report
          **Scan ID:** `{results.get('scan_id', 'N/A')}`
          **Files Analyzed:** {results.get('scanned_files', 0)} / {results.get('total_files', 0)}
          **Duration:** {results.get('duration_seconds', 0):.1f}s

          ### 📊 Summary
          | Severity | Count |
          |----------|-------|
          | 🔴 Critical | {counts.get('critical', 0)} |
          | 🟠 High | {counts.get('high', 0)} |
          | 🟡 Medium | {counts.get('medium', 0)} |
          | 🔵 Low | {counts.get('low', 0)} |
          | **Total** | **{total}** |
          """

          # Add findings details
          if total > 0:
              report += "\n### 🚨 Security Findings\n\n"
              # Group by severity
              by_severity = {}
              for finding in results.get('findings', []):
                  by_severity.setdefault(finding['severity'], []).append(finding)
              icons = {'critical': '🔴', 'high': '🟠', 'medium': '🟡', 'low': '🔵'}
              # Show critical and high issues first
              for severity in ['critical', 'high', 'medium', 'low']:
                  if severity not in by_severity:
                      continue
                  report += f"\n#### {icons[severity]} {severity.title()} Severity\n\n"
                  for finding in by_severity[severity][:5]:  # Limit to 5 per severity
                      report += f"""
          **{finding['title']}**
          📁 `{finding['file']}:{finding['line']}`
          🔍 {finding.get('scanner', 'N/A')} • {finding.get('cwe', 'N/A')}

          {finding.get('description', '')}
          """
                      if finding.get('remediation'):
                          report += f"💡 **Remediation:** {finding['remediation']}\n\n"
                      report += "---\n\n"
                  if len(by_severity[severity]) > 5:
                      remaining = len(by_severity[severity]) - 5
                      report += f"*... and {remaining} more {severity} issue(s)*\n\n"
          else:
              report += "\n✅ **No security issues found!**\n\n"

          # Add footer
          report += f"""
          ### 📋 Actions
          - 📊 [View detailed results]({raksha_url}/scan/{results.get('scan_id', '')})
          - 📥 [Download JSON report]({raksha_url}/scan/{results.get('scan_id', '')}/export)

          ---
          *Scanned by RAKṢĀ • [रक्षा](https://avyay.ai/raksha) - Ancient Wisdom, Modern Security*
          """

          # Save report
          with open('security-report.md', 'w') as f:
              f.write(report)
          print("Security report generated")
          EOF
      - name: Post security report comment
        if: always() && steps.changes.outputs.has-code-changes == 'true' && steps.scan.outputs.scan-id != ''
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            // Read security report
            const report = fs.readFileSync('security-report.md', 'utf8');
            // Find existing security comment so the PR gets one updated comment, not one per push
            const comments = await github.rest.issues.listComments({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.issue.number,
            });
            const existingComment = comments.data.find(comment =>
              comment.body.includes('🛡️ RAKṢĀ Security Analysis Report'));
            if (existingComment) {
              // Update existing comment
              await github.rest.issues.updateComment({
                owner: context.repo.owner,
                repo: context.repo.repo,
                comment_id: existingComment.id,
                body: report
              });
            } else {
              // Create new comment
              await github.rest.issues.createComment({
                owner: context.repo.owner,
                repo: context.repo.repo,
                issue_number: context.issue.number,
                body: report
              });
            }
      - name: Update check status
        if: always() && steps.changes.outputs.has-code-changes == 'true'
        uses: actions/github-script@v7
        with:
          script: |
            const status = '${{ steps.gate.outputs.status }}';
            const scanId = '${{ steps.scan.outputs.scan-id }}';
            const critical = '${{ steps.scan.outputs.critical }}';
            const high = '${{ steps.scan.outputs.high }}';
            // Anything other than an explicit pass (including a failed scan) is reported as failure
            const conclusion = status === 'passed' ? 'success' : 'failure';
            const title = status === 'passed' ?
              '✅ Security scan passed' :
              '❌ Security issues found';
            const summary = `Scan ID: ${scanId}\nCritical: ${critical}\nHigh: ${high}`;
            await github.rest.checks.create({
              owner: context.repo.owner,
              repo: context.repo.repo,
              name: 'RAKṢĀ Security Gate',
              head_sha: context.payload.pull_request.head.sha,
              status: 'completed',
              conclusion: conclusion,
              output: {
                title: title,
                summary: summary
              }
            });
      - name: Upload scan artifacts
        if: always() && steps.changes.outputs.has-code-changes == 'true'
        uses: actions/upload-artifact@v4
        with:
          name: security-scan-artifacts
          path: |
            detailed-results.json
            security-report.md
            changed-files.txt
          retention-days: 30
```
#### Branch Protection Rules
Configure via the GitHub API or the web interface. The required status-check context must match the check name the workflow creates (`RAKṢĀ Security Gate`), otherwise the protection rule never resolves:
```bash
# Set branch protection with security requirements
curl -X PUT \
  -H "Accept: application/vnd.github.v3+json" \
  -H "Authorization: token $GITHUB_TOKEN" \
  https://api.github.com/repos/OWNER/REPO/branches/main/protection \
  -d '{
    "required_status_checks": {
      "strict": true,
      "contexts": ["RAKṢĀ Security Gate"]
    },
    "enforce_admins": true,
    "required_pull_request_reviews": {
      "required_approving_review_count": 1,
      "dismiss_stale_reviews": true,
      "require_code_owner_reviews": true
    },
    "restrictions": null
  }'
```
### Results
- **Automated Blocking**: PRs with critical issues cannot be merged
- **Clear Reporting**: Detailed security findings in PR comments
- **Developer Guidance**: Inline remediation suggestions
- **Audit Trail**: Complete scan history and artifacts
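The pre-commit hook (Use Case 1) and the workflow gate step (Use Case 2) both apply the same policy to a scan response: block on any critical finding, and treat more than a threshold of high findings as either a confirmation prompt (hook) or a hard failure (workflow). The following is an added example, not code from the page or from the RAKṢĀ project, that expresses that policy as one testable function and also shows the edge case both scripts should handle: an empty or non-JSON response from the scanner means nothing was scanned, so the gate fails closed instead of comparing empty strings as numbers. The response shape (`scan_id`, `findings_by_severity`, `total_findings`, `findings[]`) is the one the page's `jq` filters read; the sample data is constructed.

```python
"""Gate policy over a RAKṢĀ-style scan response (added example; not project code)."""
import json
from dataclasses import dataclass, field

# Constructed sample response: one critical and three high findings.
SAMPLE_RESPONSE = json.dumps({
    "scan_id": "example-scan-0001",
    "total_findings": 5,
    "findings_by_severity": {"critical": 1, "high": 3, "medium": 1, "low": 0},
    "findings": [
        {"title": "SQL injection", "severity": "critical", "file": "app/db.py", "line": 42},
        {"title": "Hardcoded secret", "severity": "high", "file": "app/config.py", "line": 7},
        {"title": "Weak hash", "severity": "high", "file": "app/auth.py", "line": 19},
        {"title": "Insecure deserialization", "severity": "high", "file": "app/api.py", "line": 88},
        {"title": "Verbose error page", "severity": "medium", "file": "app/views.py", "line": 3},
    ],
})


@dataclass
class GateDecision:
    status: str          # "blocked", "confirm" or "passed"
    reason: str
    critical: int
    high: int
    flagged: list = field(default_factory=list)  # critical/high findings worth showing


def parse_scan_response(raw):
    """Fail closed: anything that is not a scan result means nothing was scanned."""
    if not raw or not raw.strip():
        raise ValueError("empty scan response")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"scan response is not JSON: {exc}") from exc
    if not isinstance(data, dict) or not isinstance(data.get("findings_by_severity"), dict):
        raise ValueError("scan response lacks findings_by_severity")
    return data


def evaluate_gate(scan, critical_block=True, high_threshold=2, high_blocks=False):
    """Apply the two gates from the page.

    critical_block: any critical finding blocks (the hook's CRITICAL_BLOCK).
    high_threshold: more than this many high findings trips the high gate
                    (HIGH_WARN_THRESHOLD in the hook, HIGH_THRESHOLD in the workflow).
    high_blocks:    False = ask for confirmation (hook), True = fail outright (workflow).
    """
    counts = scan["findings_by_severity"]
    critical = int(counts.get("critical") or 0)
    high = int(counts.get("high") or 0)
    flagged = [f for f in scan.get("findings", []) if f.get("severity") in ("critical", "high")]
    if critical_block and critical > 0:
        return GateDecision("blocked", f"Critical security issues found: {critical}",
                            critical, high, flagged)
    if high > high_threshold:
        status = "blocked" if high_blocks else "confirm"
        return GateDecision(status,
                            f"High security issues exceed threshold: {high} (max: {high_threshold})",
                            critical, high, flagged)
    return GateDecision("passed", "", critical, high, flagged)


def format_findings(decision):
    """Render flagged findings the way the hook's jq filter prints them."""
    return [f"{'🔴' if f['severity'] == 'critical' else '🟠'} {f['title']} in {f['file']}:{f['line']}"
            for f in decision.flagged]


def gate_from_raw(raw, **policy):
    """Parse and evaluate in one step; raises ValueError when the response is unusable."""
    return evaluate_gate(parse_scan_response(raw), **policy)


if __name__ == "__main__":
    decision = gate_from_raw(SAMPLE_RESPONSE)
    print(decision.status, decision.reason)
    print("\n".join(format_findings(decision)))
```

Running it with the sample gives `blocked` because of the critical finding; with `critical_block=False` the three high findings exceed the default threshold of 2 and the decision becomes `confirm` (hook behaviour) or `blocked` when `high_blocks=True` (workflow behaviour).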
## Use Case 3: Organization-wide Security Audit
### Scenario
The security team needs to perform periodic audits across all repositories in the organization: automated scanning of many repositories with consolidated reporting and trend analysis.
### Implementation
#### Bulk Repository Scanner
Create `scripts/org-security-audit.py`:
```python
#!/usr/bin/env python3
"""
Organization-wide Security Audit Script
Scans all repositories in a GitHub organization using RAKṢĀ
and generates consolidated security reports.
"""
import os
import json
import time
import requests
import argparse
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed


class OrgSecurityAudit:
    def __init__(self, github_token, raksha_url, org_name):
        self.github_token = github_token
        self.raksha_url = raksha_url.rstrip('/')
        self.org_name = org_name
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'token {github_token}',
            'Accept': 'application/vnd.github.v3+json'
        })
        self.results = []

    def get_org_repositories(self, include_private=True, include_forks=False):
        """Get all repositories in the organization (paginated)"""
        repos = []
        page = 1
        while True:
            url = f"https://api.github.com/orgs/{self.org_name}/repos"
            params = {
                'page': page,
                'per_page': 100,
                'type': 'all' if include_private else 'public',
                'sort': 'updated'
            }
            response = self.session.get(url, params=params)
            response.raise_for_status()
            page_repos = response.json()
            if not page_repos:
                break
            for repo in page_repos:
                # Skip forks if not included
                if repo['fork'] and not include_forks:
                    continue
                # Skip archived repositories
                if repo['archived']:
                    continue
                repos.append({
                    'name': repo['name'],
                    'full_name': repo['full_name'],
                    'clone_url': repo['clone_url'],
                    'html_url': repo['html_url'],
                    'language': repo['language'],
                    'size': repo['size'],
                    'updated_at': repo['updated_at']
                })
            page += 1
        return repos

    def scan_repository(self, repo):
        """Scan a single repository using RAKṢĀ"""
        print(f"🔍 Scanning {repo['full_name']}...")
        start_time = time.time()
        try:
            # Scan via GitHub URL
            scan_data = {
                'url': f"https://github.com/{repo['full_name']}"
            }
            response = requests.post(
                f"{self.raksha_url}/scan/github",
                json=scan_data,
                timeout=600  # 10 minute timeout
            )
            if response.status_code == 200:
                result = response.json()
                result['repository'] = repo
                result['scan_duration'] = time.time() - start_time
                result['status'] = 'success'
                print(f"✅ {repo['full_name']}: {result['total_findings']} findings")
                return result
            else:
                error_result = {
                    'repository': repo,
                    'status': 'error',
                    'error_code': response.status_code,
                    'error_message': response.text[:500],
                    'scan_duration': time.time() - start_time
                }
                print(f"❌ {repo['full_name']}: HTTP {response.status_code}")
                return error_result
        except Exception as e:
            error_result = {
                'repository': repo,
                'status': 'error',
                'error_message': str(e)[:500],
                'scan_duration': time.time() - start_time
            }
            print(f"❌ {repo['full_name']}: {str(e)}")
            return error_result

    def run_audit(self, max_workers=5, repo_filter=None):
        """Run security audit on all repositories"""
        print(f"🛡️ Starting security audit for organization: {self.org_name}")
        # Get repositories
        print("📋 Fetching repository list...")
        repos = self.get_org_repositories()
        # Apply filter if provided
        if repo_filter:
            repos = [r for r in repos if repo_filter.lower() in r['name'].lower()]
        print(f"📊 Found {len(repos)} repositories to scan")
        # Scan repositories in parallel
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_repo = {executor.submit(self.scan_repository, repo): repo
                              for repo in repos}
            for future in as_completed(future_to_repo):
                result = future.result()
                self.results.append(result)
        return self.results

    def generate_report(self, output_dir="audit-reports"):
        """Generate comprehensive audit report"""
        os.makedirs(output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")

        # Summary statistics
        total_repos = len(self.results)
        successful_scans = len([r for r in self.results if r['status'] == 'success'])
        failed_scans = total_repos - successful_scans
        total_findings = sum(r.get('total_findings', 0) for r in self.results)

        # Severity breakdown
        severity_totals = {
            'critical': 0, 'high': 0, 'medium': 0, 'low': 0, 'info': 0
        }
        for result in self.results:
            if result['status'] == 'success':
                findings_by_severity = result.get('findings_by_severity', {})
                for severity in severity_totals:
                    severity_totals[severity] += findings_by_severity.get(severity, 0)

        # Generate markdown report (template lines start at column 0 so the
        # markdown renders without stray indentation)
        report_md = f"""
# Security Audit Report - {self.org_name}
**Generated:** {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
**Repositories Scanned:** {successful_scans} / {total_repos}
**Total Security Findings:** {total_findings}

## 📊 Executive Summary
### Overall Security Posture
| Severity | Count | Percentage |
|----------|-------|------------|
| 🔴 Critical | {severity_totals['critical']} | {(severity_totals['critical']/max(total_findings,1)*100):.1f}% |
| 🟠 High | {severity_totals['high']} | {(severity_totals['high']/max(total_findings,1)*100):.1f}% |
| 🟡 Medium | {severity_totals['medium']} | {(severity_totals['medium']/max(total_findings,1)*100):.1f}% |
| 🔵 Low | {severity_totals['low']} | {(severity_totals['low']/max(total_findings,1)*100):.1f}% |
| ℹ️ Info | {severity_totals['info']} | {(severity_totals['info']/max(total_findings,1)*100):.1f}% |

### Scan Results
- ✅ **Successful scans:** {successful_scans}
- ❌ **Failed scans:** {failed_scans}
- 📊 **Total findings:** {total_findings}
- 🎯 **Average findings per repo:** {(total_findings/max(successful_scans,1)):.1f}

## 🚨 High-Risk Repositories
"""
        # Find repositories with critical or high issues
        high_risk_repos = []
        for result in self.results:
            if result['status'] == 'success':
                findings = result.get('findings_by_severity', {})
                critical = findings.get('critical', 0)
                high = findings.get('high', 0)
                if critical > 0 or high > 2:
                    risk_score = critical * 10 + high * 3
                    high_risk_repos.append((result, risk_score))
        # Sort by risk score
        high_risk_repos.sort(key=lambda x: x[1], reverse=True)

        if high_risk_repos:
            report_md += "\n| Repository | Critical | High | Medium | Risk Score |\n"
            report_md += "|------------|----------|------|--------|-----------|\n"
            for result, risk_score in high_risk_repos[:10]:  # Top 10
                repo = result['repository']
                findings = result.get('findings_by_severity', {})
                report_md += (f"| [{repo['name']}]({repo['html_url']}) | {findings.get('critical', 0)} "
                              f"| {findings.get('high', 0)} | {findings.get('medium', 0)} | {risk_score} |\n")
        else:
            report_md += "\n✅ No high-risk repositories identified!\n"

        # Repository details
        report_md += "\n## 📋 Repository Details\n\n"
        # Sort by total findings (descending)
        successful_results = [r for r in self.results if r['status'] == 'success']
        successful_results.sort(key=lambda x: x.get('total_findings', 0), reverse=True)
        for result in successful_results[:20]:  # Top 20 most findings
            repo = result['repository']
            findings = result.get('findings_by_severity', {})
            report_md += f"""
### {repo['name']}
**Repository:** [{repo['full_name']}]({repo['html_url']})
**Language:** {repo.get('language', 'Unknown')}
**Size:** {repo['size']} KB
**Last Updated:** {repo['updated_at'][:10]}

**Security Findings:**
- 🔴 Critical: {findings.get('critical', 0)}
- 🟠 High: {findings.get('high', 0)}
- 🟡 Medium: {findings.get('medium', 0)}
- 🔵 Low: {findings.get('low', 0)}
- **Total:** {result.get('total_findings', 0)}

**Scan ID:** `{result.get('scan_id', 'N/A')}`

---
"""
        # Failed scans
        failed_results = [r for r in self.results if r['status'] == 'error']
        if failed_results:
            report_md += "\n## ❌ Failed Scans\n\n"
            for result in failed_results:
                repo = result['repository']
                report_md += f"- **{repo['name']}**: {result.get('error_message', 'Unknown error')}\n"

        # Recommendations
        report_md += f"""
## 📋 Recommendations
### Immediate Actions Required
1. **Address Critical Issues**: {severity_totals['critical']} critical vulnerabilities need immediate attention
2. **Review High-Risk Repositories**: Focus on top 10 repositories listed above
3. **Implement Security Gates**: Add RAKṢĀ scanning to CI/CD pipelines

### Medium-Term Improvements
1. **Security Training**: Provide developer training on secure coding practices
2. **Custom Rules**: Implement organization-specific security rules
3. **Regular Audits**: Schedule monthly organization-wide security scans

### Long-Term Strategy
1. **Security Culture**: Integrate security scanning into development workflow
2. **Metrics Tracking**: Monitor security improvement trends over time
3. **Policy Enforcement**: Establish security policies and enforcement mechanisms

---
*Generated by RAKṢĀ • [रक्षा](https://avyay.ai/raksha) - Ancient Wisdom, Modern Security*
"""
        # Save reports
        report_file = f"{output_dir}/security-audit-{self.org_name}-{timestamp}.md"
        with open(report_file, 'w') as f:
            f.write(report_md)
        # Save raw data as JSON
        json_file = f"{output_dir}/security-audit-{self.org_name}-{timestamp}.json"
        with open(json_file, 'w') as f:
            json.dump(self.results, f, indent=2, default=str)
        print(f"📄 Report saved: {report_file}")
        print(f"📊 Raw data saved: {json_file}")
        return report_file, json_file


def main():
    parser = argparse.ArgumentParser(description='Organization Security Audit')
    parser.add_argument('org_name', help='GitHub organization name')
    parser.add_argument('--github-token', default=os.environ.get('GITHUB_TOKEN'),
                        help='GitHub personal access token (defaults to $GITHUB_TOKEN; '
                             'prefer the variable so the token is not visible in the process list)')
    parser.add_argument('--raksha-url',
                        default='https://raksha-449012790678.asia-southeast1.run.app',
                        help='RAKṢĀ scanner URL')
    parser.add_argument('--max-workers', type=int, default=5,
                        help='Maximum concurrent scans')
    parser.add_argument('--filter', help='Filter repositories by name')
    parser.add_argument('--output-dir', default='audit-reports',
                        help='Output directory for reports')
    args = parser.parse_args()
    if not args.github_token:
        parser.error('a GitHub token is required (--github-token or the GITHUB_TOKEN variable)')

    # Run audit
    audit = OrgSecurityAudit(args.github_token, args.raksha_url, args.org_name)
    results = audit.run_audit(max_workers=args.max_workers, repo_filter=args.filter)
    # Generate report
    report_file, json_file = audit.generate_report(args.output_dir)
    print("\n🎉 Audit completed!")
    print(f"📊 Scanned {len(results)} repositories")
    print(f"📄 Report: {report_file}")


if __name__ == '__main__':
    main()
```
#### Automated Audit Scheduling
Create `scripts/weekly-audit.sh`:
```bash
#!/bin/bash
# Weekly automated security audit
set -e
set -o pipefail  # without this a failing python3 would be masked by the tee pipeline
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
DATE=$(date +%Y-%m-%d)
LOG_FILE="/var/log/raksha/weekly-audit-$DATE.log"
mkdir -p "$(dirname "$LOG_FILE")"
echo "🛡️ Starting weekly security audit - $DATE" | tee -a "$LOG_FILE"

# Load environment variables: GITHUB_ORG, GITHUB_TOKEN, RAKSHA_URL and the
# optional AWS_S3_BUCKET and SLACK_WEBHOOK
# shellcheck disable=SC1090
source ~/.env.security-audit

# Run organization audit
python3 "$SCRIPT_DIR/org-security-audit.py" \
    "$GITHUB_ORG" \
    --github-token "$GITHUB_TOKEN" \
    --raksha-url "$RAKSHA_URL" \
    --max-workers 3 \
    --output-dir "/opt/security-reports" \
    2>&1 | tee -a "$LOG_FILE"

# Upload to S3 (optional)
if [ -n "$AWS_S3_BUCKET" ]; then
    aws s3 sync /opt/security-reports "s3://$AWS_S3_BUCKET/security-audits/" \
        --exclude "*" --include "security-audit-*" \
        2>&1 | tee -a "$LOG_FILE"
fi

# Send notification
if [ -n "$SLACK_WEBHOOK" ]; then
    report_file=$(ls -t /opt/security-reports/security-audit-*.md | head -1)
    # The first "🔴 Critical" line in the report is the summary table row; its first number is the count
    critical_count=$(grep "🔴 Critical" "$report_file" | grep -o "[0-9]\+" | head -1)
    critical_count="${critical_count:-0}"
    curl -X POST -H 'Content-type: application/json' \
        --data "{
            \"text\": \"📊 Weekly Security Audit Complete\",
            \"attachments\": [{
                \"color\": \"$([ "$critical_count" -gt 0 ] && echo "danger" || echo "good")\",
                \"fields\": [{
                    \"title\": \"Critical Issues\",
                    \"value\": \"$critical_count\",
                    \"short\": true
                }, {
                    \"title\": \"Report\",
                    \"value\": \"<file://$report_file|View Report>\",
                    \"short\": true
                }]
            }]
        }" \
        "$SLACK_WEBHOOK"
fi

echo "✅ Weekly audit completed" | tee -a "$LOG_FILE"
```
#### Cron Job Setup
Cron runs with a minimal environment, so `~/.env.security-audit` must exist in the home directory of the user whose crontab this is:
```bash
# Add to crontab for weekly execution (Mondays at 02:00)
0 2 * * 1 /opt/scripts/weekly-audit.sh
# Monthly comprehensive audit (1st of the month at 03:00)
0 3 1 * * /opt/scripts/monthly-comprehensive-audit.sh
```
### Results
- **Comprehensive Coverage**: All organization repositories scanned
- **Risk Prioritization**: High-risk repositories identified and prioritized
- **Trend Tracking**: Historical data for security improvement measurement
- **Executive Reporting**: Clear summaries for management decision-making
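The scenario calls for trend analysis, and the weekly script keeps every run's JSON, but the page does not show how two runs are compared. The following is an added example, not code from the page or from the RAKṢĀ project, that reads two result lists in the shape `generate_report()` writes (`repository`, `status`, `findings_by_severity`) and reports which repositories regressed, improved, appeared or went unscanned, using the same `critical * 10 + high * 3` weighting the audit report uses for its high-risk table. The two audit snapshots are constructed; the repository names are placeholders. One detail worth copying: a repository whose scan failed this run is listed as unscanned rather than counted as "fixed", so a scanner outage cannot look like an improvement.

```python
"""Trend comparison between two audit runs (added example; not project code)."""
import json


def _result(full_name, critical=0, high=0, medium=0, status="success"):
    """Build one entry in the shape security-audit-<org>-<timestamp>.json stores (constructed)."""
    repo = {"name": full_name.split("/")[1], "full_name": full_name}
    if status != "success":
        return {"repository": repo, "status": "error", "error_message": "HTTP 504"}
    return {"repository": repo, "status": "success",
            "findings_by_severity": {"critical": critical, "high": high, "medium": medium, "low": 0},
            "total_findings": critical + high + medium}


# Constructed snapshots: last week's audit and this week's audit.
PREVIOUS_AUDIT = json.dumps([
    _result("acme/payments", critical=2, high=4),
    _result("acme/web", high=1),
    _result("acme/legacy-api", critical=1),
    _result("acme/docs"),
])
CURRENT_AUDIT = json.dumps([
    _result("acme/payments", high=2),              # critical findings fixed
    _result("acme/web", critical=1, high=1),       # new critical finding
    _result("acme/docs", status="error"),          # scan failed this week
    _result("acme/new-service", high=3),           # repository created since last run
])


def risk_score(counts):
    """Same weighting the audit report uses for its high-risk repository table."""
    return counts.get("critical", 0) * 10 + counts.get("high", 0) * 3


def index_successful(raw):
    """Map full_name -> findings_by_severity, keeping only successful scans."""
    return {r["repository"]["full_name"]: r["findings_by_severity"]
            for r in json.loads(raw) if r.get("status") == "success"}


def compare_audits(previous_raw, current_raw):
    """Classify every repository seen in either run by its change in risk score."""
    prev = index_successful(previous_raw)
    cur = index_successful(current_raw)
    report = {"regressed": [], "improved": [], "unchanged": [], "new": [], "unscanned": [],
              "totals": {"previous": sum(risk_score(c) for c in prev.values()),
                         "current": sum(risk_score(c) for c in cur.values())}}
    for name in sorted(prev.keys() | cur.keys()):
        if name not in prev:
            report["new"].append(name)
        elif name not in cur:
            # Failed or deleted this run: no comparison possible, and not a "fix"
            report["unscanned"].append(name)
        else:
            delta = risk_score(cur[name]) - risk_score(prev[name])
            bucket = "regressed" if delta > 0 else "improved" if delta < 0 else "unchanged"
            report[bucket].append((name, delta))
    report["regressed"].sort(key=lambda item: -item[1])  # worst regression first
    return report


def trend_lines(report):
    """Markdown lines suitable for appending to the weekly report."""
    lines = [f"- Organization risk score: {report['totals']['previous']} -> {report['totals']['current']}"]
    lines += [f"- 📈 Regressed: {name} (+{delta})" for name, delta in report["regressed"]]
    lines += [f"- 📉 Improved: {name} ({delta})" for name, delta in report["improved"]]
    lines += [f"- 🆕 New repository: {name}" for name in report["new"]]
    lines += [f"- ⚠️ Not scanned this run: {name}" for name in report["unscanned"]]
    return lines


if __name__ == "__main__":
    print("\n".join(trend_lines(compare_audits(PREVIOUS_AUDIT, CURRENT_AUDIT))))
```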
## Use Case 4: AI-Assisted Code Review Integration
### Scenario
The development team wants to enhance the code review process with AI-powered security analysis, integrated with the existing review tools so that reviewers get intelligent suggestions and security insights inline.
### Implementation
#### GitHub App for Enhanced Reviews
Create `github-review-app.py`:
```python
#!/usr/bin/env python3
"""
RAKṢĀ AI-Assisted Code Review GitHub App
Provides intelligent security insights during code reviews
with AI-powered remediation suggestions.
"""
import os
import re
import hmac
import base64
import hashlib
import json
import time
import tempfile
import zipfile
from flask import Flask, request, jsonify
import requests
import jwt  # PyJWT with RSA support: pip install "pyjwt[crypto]"
from openai import OpenAI

app = Flask(__name__)

# Configuration
GITHUB_APP_ID = os.getenv('GITHUB_APP_ID')
GITHUB_PRIVATE_KEY = os.getenv('GITHUB_PRIVATE_KEY')
GITHUB_WEBHOOK_SECRET = os.getenv('GITHUB_WEBHOOK_SECRET')
RAKSHA_URL = os.getenv('RAKSHA_URL', 'https://raksha-449012790678.asia-southeast1.run.app')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
GITHUB_API = 'https://api.github.com'
openai_client = OpenAI(api_key=OPENAI_API_KEY)


def verify_signature(payload_body, secret_token, signature_header):
    """Verify the GitHub webhook signature (X-Hub-Signature-256) over the raw request body"""
    if not secret_token or not signature_header:
        return False
    hash_object = hmac.new(
        secret_token.encode('utf-8'),
        payload_body,
        hashlib.sha256
    )
    expected_signature = f"sha256={hash_object.hexdigest()}"
    return hmac.compare_digest(expected_signature, signature_header)


def get_installation_token(installation_id):
    """Exchange the App credentials for a short-lived installation token.

    Implemented here (the page leaves this as a stub): sign a JWT with the
    App's private key, then call the installation access-token endpoint.
    """
    now = int(time.time())
    app_jwt = jwt.encode(
        {'iat': now - 60, 'exp': now + 540, 'iss': GITHUB_APP_ID},
        GITHUB_PRIVATE_KEY,
        algorithm='RS256'
    )
    response = requests.post(
        f"{GITHUB_API}/app/installations/{installation_id}/access_tokens",
        headers={'Authorization': f'Bearer {app_jwt}',
                 'Accept': 'application/vnd.github.v3+json'}
    )
    response.raise_for_status()
    return response.json()['token']


def analyze_code_with_ai(code_snippet, finding):
    """Use AI to provide detailed analysis and remediation"""
    prompt = f"""
You are a senior security engineer reviewing code. A security scanner found the following issue:

**Security Finding:**
- Title: {finding['title']}
- Severity: {finding['severity']}
- Description: {finding['description']}
- File: {finding['file']}:{finding['line']}

**Code Context:**
```
{code_snippet}
```

Provide:
1. **Root Cause Analysis**: Why is this a security issue?
2. **Exploit Scenario**: How could an attacker exploit this?
3. **Remediation Steps**: Specific, actionable fix instructions
4. **Secure Code Example**: Show the corrected code
5. **Prevention**: How to avoid this pattern in future

Keep your response concise but thorough. Use markdown formatting.
"""
    try:
        response = openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are an expert security engineer providing code review feedback."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=1500,
            temperature=0.3
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"AI analysis unavailable: {str(e)}"


def get_file_content(repo_full_name, file_path, ref, installation_token):
    """Get file content from GitHub at a given ref"""
    url = f"{GITHUB_API}/repos/{repo_full_name}/contents/{file_path}"
    headers = {
        'Authorization': f'token {installation_token}',
        'Accept': 'application/vnd.github.v3+json'
    }
    params = {'ref': ref}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        content = response.json()
        return base64.b64decode(content['content']).decode('utf-8')
    return None


def create_archive_from_changed_files(repo_full_name, ref, changed_files, installation_token):
    """Download each changed file at the PR head and pack it into a zip for scanning.

    Implemented here; the page leaves this helper undefined. Deleted files are skipped.
    """
    archive = tempfile.NamedTemporaryFile(suffix='.zip', delete=False)
    archive.close()
    with zipfile.ZipFile(archive.name, 'w', zipfile.ZIP_DEFLATED) as zf:
        for changed in changed_files:
            if changed.get('status') == 'removed':
                continue
            content = get_file_content(repo_full_name, changed['filename'], ref, installation_token)
            if content is not None:
                zf.writestr(changed['filename'], content)
    return archive.name


HUNK_HEADER = re.compile(r'^@@ -\d+(?:,\d+)? \+(\d+)(?:,\d+)? @@')


def added_lines_from_patch(patch):
    """Return the set of new-file line numbers that a unified-diff patch adds or modifies."""
    added = set()
    new_line = 0
    for raw in patch.splitlines():
        header = HUNK_HEADER.match(raw)
        if header:
            new_line = int(header.group(1))
        elif raw.startswith('+'):
            added.add(new_line)
            new_line += 1
        elif raw.startswith('-') or raw.startswith('\\'):
            continue  # removed line, or "\ No newline at end of file"
        else:
            new_line += 1  # context line
    return added


def filter_findings_for_changes(findings, changed_files):
    """Keep only findings that sit on lines this PR added or modified.

    Implemented here using the 'patch' field GitHub returns for each changed
    file; the page leaves this helper undefined.
    """
    added_by_file = {f['filename']: added_lines_from_patch(f.get('patch', ''))
                     for f in changed_files}
    return [finding for finding in findings
            if finding.get('file') in added_by_file
            and finding.get('line') in added_by_file[finding['file']]]


def create_review_comment(repo_full_name, pr_number, commit_sha, file_path, line, body, installation_token):
    """Create a review comment on a specific line of a GitHub PR"""
    url = f"{GITHUB_API}/repos/{repo_full_name}/pulls/{pr_number}/comments"
    headers = {
        'Authorization': f'token {installation_token}',
        'Accept': 'application/vnd.github.v3+json'
    }
    data = {
        'commit_id': commit_sha,
        'path': file_path,
        'line': line,
        'body': body
    }
    response = requests.post(url, headers=headers, json=data)
    return response.status_code == 201


@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Handle GitHub webhook events"""
    # Verify signature before touching the payload
    signature = request.headers.get('X-Hub-Signature-256')
    if not verify_signature(request.data, GITHUB_WEBHOOK_SECRET, signature):
        return jsonify({'error': 'Invalid signature'}), 403
    event_type = request.headers.get('X-GitHub-Event')
    payload = request.json
    # Handle pull request events
    if event_type == 'pull_request':
        action = payload['action']
        if action in ['opened', 'synchronize', 'reopened']:
            return handle_pr_review(payload)
    return jsonify({'status': 'ignored'}), 200


def handle_pr_review(payload):
    """Handle PR review security analysis"""
    pr = payload['pull_request']
    repo = payload['repository']
    installation = payload['installation']
    installation_token = get_installation_token(installation['id'])

    # Get changed files
    changed_files_url = pr['url'] + '/files'
    headers = {'Authorization': f'token {installation_token}'}
    response = requests.get(changed_files_url, headers=headers)
    if response.status_code != 200:
        return jsonify({'error': 'Failed to get changed files'}), 500
    changed_files = response.json()

    # Filter security-relevant files
    security_files = [f for f in changed_files
                      if f['filename'].endswith(('.py', '.js', '.ts', '.java', '.php', '.go'))]
    if not security_files:
        return jsonify({'status': 'no security-relevant files'}), 200

    # Create archive of changed files and run the RAKṢĀ scan
    archive_path = create_archive_from_changed_files(
        repo['full_name'], pr['head']['sha'], security_files, installation_token
    )
    try:
        with open(archive_path, 'rb') as f:
            scan_response = requests.post(
                f"{RAKSHA_URL}/scan/upload",
                files={'file': f}
            )
    finally:
        os.remove(archive_path)
    if scan_response.status_code != 200:
        return jsonify({'error': 'Scan failed'}), 500
    scan_result = scan_response.json()
    findings = scan_result.get('findings', [])

    # Filter findings for changed lines
    relevant_findings = filter_findings_for_changes(findings, changed_files)

    # Create AI-enhanced review comments
    for finding in relevant_findings:
        if finding['severity'] not in ['critical', 'high']:
            continue
        # Get code context around the finding
        file_content = get_file_content(
            repo['full_name'],
            finding['file'],
            pr['head']['sha'],
            installation_token
        )
        if not file_content:
            continue
        # Extract code snippet around the issue
        lines = file_content.split('\n')
        start_line = max(0, finding['line'] - 5)
        end_line = min(len(lines), finding['line'] + 5)
        code_snippet = '\n'.join(lines[start_line:end_line])
        # Get AI analysis
        ai_analysis = analyze_code_with_ai(code_snippet, finding)
        # Create enhanced comment (built line by line so no leading indentation
        # leaks into the markdown)
        comment_body = "\n".join([
            "## 🛡️ Security Issue Detected",
            f"**{finding['title']}** ({finding['severity']} severity)",
            "",
            finding['description'],
            "",
            "### 🤖 AI Security Analysis",
            ai_analysis,
            "",
            "### 📊 Scan Details",
            f"- **Scanner**: {finding.get('scanner', 'N/A')}",
            f"- **CWE**: {finding.get('cwe', 'N/A')}",
            f"- **Scan ID**: `{scan_result.get('scan_id', 'N/A')}`",
            "",
            "---",
            "*Powered by RAKṢĀ AI Security Review*",
        ])
        # Post comment on the specific line
        create_review_comment(
            repo['full_name'],
            pr['number'],
            pr['head']['sha'],
            finding['file'],
            finding['line'],
            comment_body,
            installation_token
        )
    return jsonify({'status': 'completed', 'findings': len(relevant_findings)}), 200
```
def filter_findings_for_changes(findings, changed_files):
"""Filter findings to only include issues in changed lines"""
relevant_findings = []
# Create mapping of files to changed line ranges
changed_lines = {}
for file_info in changed_files:
filename = file_info['filename']
changed_lines[filename] = []
# Parse patch to get changed line numbers
patch = file_info.get('patch', '')
current_line = 0
for line in patch.split('\n'):
if line.startswith('@@'):
# Extract line number from hunk header
import re
match = re.search(r'\+(\d+)', line)
if match:
current_line = int(match.group(1))
elif line.startswith('+') and not line.startswith('+++'):
changed_lines[filename].append(current_line)
current_line += 1
elif not line.startswith('-'):
current_line += 1
# Filter findings
for finding in findings:
file_path = finding['file']
line_number = finding['line']
if file_path in changed_lines:
# Check if the finding is in a changed line (or nearby)
for changed_line in changed_lines[file_path]:
if abs(line_number - changed_line) <= 2: # Within 2 lines
relevant_findings.append(finding)
break
return relevant_findings
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)Intelligent Review Dashboard
Create review-dashboard.html:
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>RAKṢĀ Security Review Dashboard</title>
  <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
  <style>
    body { font-family: Arial, sans-serif; margin: 20px; }
    .dashboard { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; }
    .card { border: 1px solid #ddd; padding: 20px; border-radius: 8px; }
    .metric { font-size: 2em; font-weight: bold; }
    .critical { color: #dc3545; }
    .high { color: #fd7e14; }
    .medium { color: #ffc107; }
    .low { color: #28a745; }
    .finding { border-left: 4px solid #007bff; padding: 10px; margin: 10px 0; }
  </style>
</head>
<body>
  <h1>🛡️ RAKṢĀ Security Review Dashboard</h1>
  <div class="dashboard">
    <div class="card">
      <h3>📊 Active Pull Requests</h3>
      <div id="activePRs">Loading...</div>
    </div>
    <div class="card">
      <h3>🚨 Critical Issues</h3>
      <div class="metric critical" id="criticalCount">0</div>
    </div>
    <div class="card">
      <h3>📈 Security Trends</h3>
      <canvas id="trendsChart"></canvas>
    </div>
    <div class="card">
      <h3>🔍 Recent Findings</h3>
      <div id="recentFindings">Loading...</div>
    </div>
  </div>
  <script>
    // Dashboard JavaScript implementation

    // PR titles, file paths and repository names come from the scanned
    // repositories, so they are untrusted input: escape them before they are
    // interpolated into HTML, otherwise a crafted PR title could inject markup
    // or script into the dashboard (DOM XSS).
    function escapeHtml(value) {
      return String(value)
        .replace(/&/g, '&amp;')
        .replace(/</g, '&lt;')
        .replace(/>/g, '&gt;')
        .replace(/"/g, '&quot;')
        .replace(/'/g, '&#39;');
    }

    // Only link to http(s) URLs so that no other URL scheme ends up in an href
    function safeUrl(value) {
      return /^https?:\/\//i.test(String(value)) ? escapeHtml(value) : '#';
    }

    class SecurityDashboard {
      constructor() {
        this.trendsChart = null;
        this.loadDashboardData();
        setInterval(() => this.loadDashboardData(), 30000); // Refresh every 30s
      }

      async loadDashboardData() {
        try {
          const response = await fetch('/api/dashboard-data');
          const data = await response.json();
          this.updateMetrics(data.metrics);
          this.updateActivePRs(data.activePRs);
          this.updateRecentFindings(data.recentFindings);
          this.updateTrendsChart(data.trends);
        } catch (error) {
          console.error('Failed to load dashboard data:', error);
        }
      }

      updateMetrics(metrics) {
        document.getElementById('criticalCount').textContent = metrics.critical;
      }

      updateActivePRs(prs) {
        const container = document.getElementById('activePRs');
        container.innerHTML = prs.map(pr => `
          <div class="finding">
            <strong><a href="${safeUrl(pr.html_url)}" target="_blank" rel="noopener">#${escapeHtml(pr.number)}</a></strong><br>
            ${escapeHtml(pr.title)}<br>
            <small>Critical: ${escapeHtml(pr.critical)}, High: ${escapeHtml(pr.high)}</small>
          </div>
        `).join('');
      }

      updateRecentFindings(findings) {
        const container = document.getElementById('recentFindings');
        container.innerHTML = findings.map(finding => `
          <div class="finding">
            <span class="${escapeHtml(finding.severity)}">${escapeHtml(String(finding.severity).toUpperCase())}</span>
            <strong>${escapeHtml(finding.title)}</strong><br>
            ${escapeHtml(finding.file)}:${escapeHtml(finding.line)}<br>
            <small>${escapeHtml(finding.repository)}</small>
          </div>
        `).join('');
      }

      updateTrendsChart(trends) {
        const ctx = document.getElementById('trendsChart').getContext('2d');
        // Chart.js refuses to draw on a canvas that already has a chart bound
        // to it, so destroy the previous instance before each refresh
        if (this.trendsChart) {
          this.trendsChart.destroy();
        }
        this.trendsChart = new Chart(ctx, {
          type: 'line',
          data: {
            labels: trends.dates,
            datasets: [
              {
                label: 'Critical',
                data: trends.critical,
                borderColor: '#dc3545',
                fill: false
              },
              {
                label: 'High',
                data: trends.high,
                borderColor: '#fd7e14',
                fill: false
              }
            ]
          },
          options: {
            responsive: true,
            scales: {
              y: { beginAtZero: true }
            }
          }
        });
      }
    }

    // Initialize dashboard
    document.addEventListener('DOMContentLoaded', () => {
      new SecurityDashboard();
    });
  </script>
</body>
</html>

Results
- Real-time Security Insights: Security analysis integrated into code review workflow
- AI-Powered Recommendations: Intelligent remediation suggestions for security issues
- Developer Education: In-context learning about secure coding practices
- Reduced False Positives: Context-aware filtering of security findings
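The two pieces of the GitHub App above that are easiest to get subtly wrong are the patch parsing in filter_findings_for_changes (line numbers must follow the new-file numbering, deleted lines must not advance the counter, and the "\ No newline at end of file" marker is not content) and the line passed to create_review_comment (the pull-request review-comment endpoint rejects a line that is not part of the diff). The following is an added, self-contained example, not code from the RAKṢĀ project, that parses a GitHub `patch` field into added and in-diff line sets, applies the same two-line window as the page, and picks the nearest in-diff line as the comment anchor; the patch and findings are constructed for the demonstration.

```python
import re
# Hunk header of a unified diff: "@@ -old[,count] +new[,count] @@"
HUNK_RE = re.compile(r'^@@ -\d+(?:,\d+)? \+(\d+)(?:,\d+)? @@')

def diff_lines(patch):
    """Return (added, in_diff): new-file lines that were added, and every new-file line the hunks cover."""
    added, in_diff = set(), set()
    new_line = 0
    for raw in patch.splitlines():
        match = HUNK_RE.match(raw)
        if match:
            new_line = int(match.group(1))  # numbering restarts at each hunk
        elif raw.startswith('\\'):          # "\ No newline at end of file" marker, not content
            continue
        elif raw.startswith('+'):
            added.add(new_line)
            in_diff.add(new_line)
            new_line += 1
        elif not raw.startswith('-'):       # context line ('-' lines no longer exist in the new file)
            in_diff.add(new_line)
            new_line += 1
    return added, in_diff

def relevant_findings(findings, changed_files, window=2):
    """Keep findings within `window` lines of an added line; anchor each to the nearest line in the diff."""
    per_file = {f['filename']: diff_lines(f.get('patch', '')) for f in changed_files}
    kept = []
    for finding in findings:
        if finding['file'] not in per_file:
            continue
        added, in_diff = per_file[finding['file']]
        line = finding['line']
        if not any(abs(line - a) <= window for a in added):
            continue
        anchor = line if line in in_diff else min(in_diff, key=lambda n: abs(n - line))
        kept.append({**finding, 'comment_line': anchor})
    return kept

# Constructed sample: two hunks, one deletion and a missing-newline marker
SAMPLE_PATCH = (
    "@@ -1,3 +1,4 @@\n import os\n+import subprocess\n def run(cmd):\n"
    "-    return os.system(cmd)\n+    return subprocess.run(cmd, shell=True)\n"
    "@@ -10,2 +11,3 @@\n def main():\n+    run(input())\n \n\\ No newline at end of file\n"
)
SAMPLE_FINDINGS = [
    {'file': 'app.py', 'line': 4, 'title': 'on an added line'},
    {'file': 'app.py', 'line': 6, 'title': 'near a change but outside the diff'},
    {'file': 'app.py', 'line': 8, 'title': 'pre-existing, too far from any change'},
    {'file': 'other.py', 'line': 1, 'title': 'file not touched by the PR'},
    {'file': 'app.py', 'line': 13, 'title': 'on an unchanged context line'},
]
```

Running relevant_findings(SAMPLE_FINDINGS, [{'filename': 'app.py', 'patch': SAMPLE_PATCH}]) keeps the findings on lines 4, 6 and 13 and anchors them to lines 4, 4 and 13; the finding on line 6 is reported, but its comment is moved to line 4 because line 6 is not in the diff.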
Summary
These use cases demonstrate RAKṢĀ’s versatility across different organizational needs:
- Pre-commit Gates: Early detection with minimal developer friction
- PR Security Gates: Automated blocking with detailed reporting
- Organizational Audits: Comprehensive security posture assessment
- AI-Enhanced Reviews: Intelligent code review integration
Each implementation can be customized based on:
- Organization size and structure
- Security requirements and risk tolerance
- Development workflow and toolchain
- Integration capabilities and constraints
Next: Monitoring Guide for setting up Datadog APM and tracking security metrics.