After a security audit, your organization received some sobering findings:
- Vulnerable dependencies in container images
- Hardcoded secrets in the codebase (API keys, passwords, tokens)
- Missing security scanning in CI/CD pipelines
- No automated compliance checks
- Slow security review process (3-5 days per release)
The challenge? Implement a comprehensive DevSecOps pipeline that:
- Maintains daily deployment frequency
- Doesn’t add significant delays to the pipeline
- Ensures regulatory compliance (PCI-DSS, GDPR, SOC 2)
- Builds security into the culture, not just the tools
In this article, I’ll walk through building a complete DevSecOps pipeline on AWS that addresses all these concerns while maintaining developer velocity.
DevSecOps Pipeline Architecture
The Shift-Left Philosophy
Shift-Left Security means catching security issues as early as possible in the development lifecycle:
Developer → Pre-Commit → CI Pipeline → Pre-Deployment → Production ↓ ↓ ↓ ↓ ↓ Local Scan Git Hooks SAST/DAST Container Scan Runtime Protection
The earlier we catch issues, the cheaper and faster they are to fix.
AWS DevSecOps Stack
Core AWS Services:
- AWS CodePipeline – CI/CD orchestration
- AWS CodeBuild – Build and security scanning
- AWS CodeCommit/GitHub – Source code management
- Amazon ECR – Container registry with scanning
- AWS Secrets Manager – Secret management
- AWS Security Hub – Centralized security findings
- Amazon Inspector – Vulnerability assessment
- AWS Config – Compliance monitoring
- AWS IAM – Access control and least privilege
Phase 1: Secret Management Solution
The Problem: Hardcoded Secrets
Hardcoded secrets are a critical security risk. They can be:
- Exposed in version control
- Leaked in logs or error messages
- Accessible to anyone with repository access
Solution: AWS Secrets Manager
Step 1: Migrate Existing Secrets
import boto3 import json import re secrets_client = boto3.client('secretsmanager') def migrate_secrets_from_codebase(): """Scan codebase and migrate hardcoded secrets to Secrets Manager""" # Patterns to detect secrets secret_patterns = { 'api_key': r'api[_-]?key["']?s*[:=]s*["']([^"']+)["']', 'password': r'password["']?s*[:=]s*["']([^"']+)["']', 'token': r'token["']?s*[:=]s*["']([^"']+)["']', 'secret': r'secret["']?s*[:=]s*["']([^"']+)["']' } # Scan files (example) files_to_scan = ['config.py', 'settings.py', '.env.example'] for file_path in files_to_scan: with open(file_path, 'r') as f: content = f.read() for secret_type, pattern in secret_patterns.items(): matches = re.findall(pattern, content, re.IGNORECASE) for match in matches: # Create secret in Secrets Manager secret_name = f"payment-app/{secret_type}/{file_path}" secrets_client.create_secret( Name=secret_name, SecretString=match, Description=f"Migrated from {file_path}" ) print(f"Created secret: {secret_name}") # Run migration migrate_secrets_from_codebase()
Step 2: Update Application Code
import boto3 import json secrets_client = boto3.client('secretsmanager') def get_secret(secret_name): """Retrieve secret from AWS Secrets Manager""" try: response = secrets_client.get_secret_value(SecretId=secret_name) return json.loads(response['SecretString']) except Exception as e: print(f"Error retrieving secret: {e}") raise # Usage in application def connect_to_database(): """Connect to database using secrets from Secrets Manager""" db_secrets = get_secret('payment-app/database/credentials') connection = psycopg2.connect( host=db_secrets['host'], database=db_secrets['database'], user=db_secrets['username'], password=db_secrets['password'] ) return connection
Step 3: Automatic Secret Rotation
import boto3 lambda_client = boto3.client('lambda') def create_rotation_lambda(): """Create Lambda function for automatic secret rotation""" rotation_code = ''' import boto3 import json import psycopg2 def lambda_handler(event, context): """Rotate database password""" secrets_client = boto3.client('secretsmanager') # Get current secret secret_arn = event['SecretId'] current_secret = json.loads( secrets_client.get_secret_value(SecretId=secret_arn)['SecretString'] ) # Generate new password new_password = generate_secure_password() # Update database conn = psycopg2.connect( host=current_secret['host'], database=current_secret['database'], user=current_secret['username'], password=current_secret['password'] ) # Update password in database # ... (implementation) # Update secret secrets_client.update_secret( SecretId=secret_arn, SecretString=json.dumps({ **current_secret, 'password': new_password }) ) return {'statusCode': 200} ''' # Create Lambda function lambda_client.create_function( FunctionName='rotate-db-secret', Runtime='python3.9', Role='arn:aws:iam::account:role/secret-rotation-role', Handler='index.lambda_handler', Code={'ZipFile': rotation_code.encode()} ) # Enable automatic rotation secrets_client = boto3.client('secretsmanager') secrets_client.rotate_secret( SecretId='payment-app/database/credentials', RotationLambdaARN='arn:aws:lambda:region:account:function:rotate-db-secret', RotationRules={ 'AutomaticallyAfterDays': 30 } )
Step 4: Pre-Commit Hook to Detect Secrets
#!/usr/bin/env python3 # .git/hooks/pre-commit import subprocess import re import sys def detect_secrets(): """Detect potential secrets before commit""" # Get staged files result = subprocess.run( ['git', 'diff', '--cached', '--name-only'], capture_output=True, text=True ) staged_files = result.stdout.strip().split('n') # Secret patterns patterns = [ (r'passwords*[:=]s*["']([^"']+)["']', 'Password detected'), (r'api[_-]?keys*[:=]s*["']([^"']+)["']', 'API key detected'), (r'secrets*[:=]s*["']([^"']+)["']', 'Secret detected'), (r'-----BEGIN (RSA |OPENSSH )?PRIVATE KEY-----', 'Private key detected'), ] violations = [] for file_path in staged_files: if not file_path: continue try: with open(file_path, 'r') as f: content = f.read() for pattern, message in patterns: if re.search(pattern, content, re.IGNORECASE): violations.append(f"{file_path}: {message}") except Exception: continue if violations: print("❌ SECURITY VIOLATION: Potential secrets detected!") for violation in violations: print(f" - {violation}") print("nPlease use AWS Secrets Manager instead.") print("See: https://docs.aws.amazon.com/secretsmanager/") sys.exit(1) print("✅ No secrets detected in staged files") return 0 if __name__ == '__main__': sys.exit(detect_secrets())
Make it executable:
chmod +x .git/hooks/pre-commit
Phase 2: Security Scanning Strategy
SAST (Static Application Security Testing)
AWS CodeBuild with SAST Tools
# buildspec-sast.yml version: 0.2 phases: pre_build: commands: - echo Installing SAST tools... - | # Install Semgrep for SAST pip install semgrep - | # Install Bandit for Python security scanning pip install bandit build: commands: - echo Running SAST scans... - | # Semgrep scan semgrep --config=auto --json --output=semgrep-results.json . - | # Bandit scan (for Python) bandit -r . -f json -o bandit-results.json || true - | # SonarQube scan (if using) sonar-scanner -Dsonar.projectKey=payment-app -Dsonar.sources=. -Dsonar.host.url=$SONARQUBE_URL -Dsonar.login=$SONARQUBE_TOKEN post_build: commands: - echo Uploading SAST results... - | # Upload to AWS Security Hub aws securityhub batch-import-findings --findings file://convert-to-security-hub-format.json - | # Fail build if critical issues found python check-sast-results.py
SAST Results Checker:
# check-sast-results.py import json import sys def check_sast_results(): """Check SAST results and fail build if critical issues found""" with open('semgrep-results.json', 'r') as f: semgrep_results = json.load(f) critical_issues = [] for result in semgrep_results.get('results', []): severity = result.get('extra', {}).get('severity', '') if severity in ['ERROR', 'WARNING']: critical_issues.append({ 'file': result.get('path', ''), 'message': result.get('message', ''), 'severity': severity }) if critical_issues: print("Critical security issues found:") for issue in critical_issues: print(f" - {issue['file']}: {issue['message']} ({issue['severity']})") sys.exit(1) print("SAST scan passed") return 0 if __name__ == '__main__': sys.exit(check_sast_results())
DAST (Dynamic Application Security Testing)
AWS CodeBuild with OWASP ZAP:
# buildspec-dast.yml version: 0.2 phases: pre_build: commands: - echo Installing DAST tools... - | # Install OWASP ZAP wget https://github.com/zaproxy/zaproxy/releases/download/v2.12.0/ZAP_2.12.0_Linux.tar.gz tar -xzf ZAP_2.12.0_Linux.tar.gz build: commands: - echo Starting application for DAST... - | # Start application (example) docker-compose up -d payment-app sleep 30 # Wait for app to be ready - | # Run OWASP ZAP baseline scan ./ZAP_2.12.0/zap-baseline.py -t http://localhost:8080 -J zap-report.json - | # Run OWASP ZAP full scan (more thorough) ./ZAP_2.12.0/zap-full-scan.py -t http://localhost:8080 -J zap-full-report.json -I # Ignore warnings post_build: commands: - echo Processing DAST results... - | # Convert and upload to Security Hub python process-dast-results.py zap-report.json - | # Fail if critical vulnerabilities found python check-dast-results.py zap-report.json
DAST Results Processor:
# process-dast-results.py import json import sys import boto3 securityhub = boto3.client('securityhub') def process_dast_results(report_file): """Process DAST results and send to Security Hub""" with open(report_file, 'r') as f: zap_results = json.load(f) findings = [] for site in zap_results.get('site', []): for alert in site.get('alerts', []): severity_map = { 'High': 'HIGH', 'Medium': 'MEDIUM', 'Low': 'LOW', 'Informational': 'INFORMATIONAL' } severity = severity_map.get(alert.get('risk', ''), 'INFORMATIONAL') if severity in ['HIGH', 'MEDIUM']: finding = { 'SchemaVersion': '2018-10-08', 'Id': f"zap-{alert.get('pluginid', 'unknown')}", 'ProductArn': 'arn:aws:securityhub:region:account:product/account/default', 'GeneratorId': 'owasp-zap', 'AwsAccountId': 'account-id', 'Types': ['Security Findings'], 'CreatedAt': '2024-01-01T00:00:00Z', 'UpdatedAt': '2024-01-01T00:00:00Z', 'Severity': { 'Label': severity }, 'Title': alert.get('name', 'Unknown vulnerability'), 'Description': alert.get('desc', ''), 'Remediation': { 'Recommendation': { 'Text': alert.get('solution', '') } } } findings.append(finding) if findings: # Batch import to Security Hub securityhub.batch_import_findings(Findings=findings) print(f"✅ Imported {len(findings)} findings to Security Hub") return findings if __name__ == '__main__': if len(sys.argv) < 2: print("Usage: python process-dast-results.py <zap-report.json>") sys.exit(1) findings = process_dast_results(sys.argv[1]) sys.exit(0 if not findings else 1)
Container Image Scanning
Amazon ECR Image Scanning:
# Enable automatic image scanning on push aws ecr put-image-scanning-configuration --repository-name payment-app --image-scanning-configuration scanOnPush=true # Scan existing images aws ecr start-image-scan --repository-name payment-app --image-id imageTag=latest # Get scan results aws ecr describe-image-scan-findings --repository-name payment-app --image-id imageTag=latest
CodeBuild Integration:
# buildspec-container-scan.yml version: 0.2 phases: pre_build: commands: - echo Logging in to Amazon ECR... - aws ecr get-login-password --region $AWS_DEFAULT_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com - REPOSITORY_URI=$AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME - COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION | cut -c 1-7) - IMAGE_TAG=${COMMIT_HASH:=latest} build: commands: - echo Building Docker image... - docker build -t $IMAGE_REPO_NAME:$IMAGE_TAG . - docker tag $IMAGE_REPO_NAME:$IMAGE_TAG $REPOSITORY_URI:$IMAGE_TAG post_build: commands: - echo Pushing Docker image... - docker push $REPOSITORY_URI:$IMAGE_TAG - echo Waiting for image scan to complete... - | # Wait for scan to complete (ECR scans automatically on push) aws ecr wait image-scan-complete --repository-name $IMAGE_REPO_NAME --image-id imageTag=$IMAGE_TAG --max-attempts 30 --delay 10 - echo Retrieving scan results... - | # Get scan findings SCAN_RESULTS=$(aws ecr describe-image-scan-findings --repository-name $IMAGE_REPO_NAME --image-id imageTag=$IMAGE_TAG --query 'imageScanFindings' --output json) - | # Check for critical vulnerabilities CRITICAL_COUNT=$(echo $SCAN_RESULTS | jq '.findingCounts.CRITICAL // 0') HIGH_COUNT=$(echo $SCAN_RESULTS | jq '.findingCounts.HIGH // 0') if [ "$CRITICAL_COUNT" -gt 0 ] || [ "$HIGH_COUNT" -gt 5 ]; then echo "❌ Critical vulnerabilities found in image!" echo "Critical: $CRITICAL_COUNT, High: $HIGH_COUNT" exit 1 fi echo "Image scan passed (Critical: $CRITICAL_COUNT, High: $HIGH_COUNT)" - | # Export scan results to Security Hub python export-ecr-findings.py $IMAGE_REPO_NAME $IMAGE_TAG
ECR Findings Exporter:
# export-ecr-findings.py import boto3 import json import sys def export_ecr_findings(repo_name, image_tag): """Export ECR scan findings to Security Hub""" ecr = boto3.client('ecr') securityhub = boto3.client('securityhub') # Get scan findings response = ecr.describe_image_scan_findings( repositoryName=repo_name, imageId={'imageTag': image_tag} ) findings = response.get('imageScanFindings', {}) findings_list = findings.get('findings', []) security_hub_findings = [] for finding in findings_list: severity_map = { 'CRITICAL': 'CRITICAL', 'HIGH': 'HIGH', 'MEDIUM': 'MEDIUM', 'LOW': 'LOW', 'INFORMATIONAL': 'INFORMATIONAL' } severity = severity_map.get(finding.get('severity', ''), 'INFORMATIONAL') # Only export HIGH and CRITICAL if severity in ['CRITICAL', 'HIGH']: security_hub_finding = { 'SchemaVersion': '2018-10-08', 'Id': f"ecr-{repo_name}-{image_tag}-{finding.get('name', 'unknown')}", 'ProductArn': f'arn:aws:securityhub:region:account:product/account/default', 'GeneratorId': 'amazon-ecr-image-scan', 'AwsAccountId': 'account-id', 'Types': ['Vulnerabilities'], 'CreatedAt': finding.get('firstObservedAt', ''), 'UpdatedAt': finding.get('lastObservedAt', ''), 'Severity': { 'Label': severity }, 'Title': finding.get('name', 'Unknown vulnerability'), 'Description': finding.get('description', ''), 'Remediation': { 'Recommendation': { 'Text': finding.get('remediation', {}).get('recommendation', {}).get('text', '') } }, 'Resources': [{ 'Type': 'AwsEcrContainerImage', 'Id': f"{repo_name}:{image_tag}", 'Region': 'us-east-1' }] } security_hub_findings.append(security_hub_finding) if security_hub_findings: securityhub.batch_import_findings(Findings=security_hub_findings) print(f"✅ Exported {len(security_hub_findings)} findings to Security Hub") return security_hub_findings if __name__ == '__main__': if len(sys.argv) < 3: print("Usage: python export-ecr-findings.py <repo-name> <image-tag>") sys.exit(1) findings = export_ecr_findings(sys.argv[1], sys.argv[2]) sys.exit(0)
Dependency Scanning
Scanning Dependencies for Vulnerabilities:
# buildspec-dependency-scan.yml version: 0.2 phases: pre_build: commands: - echo Installing dependency scanning tools... - | # For Python pip install safety - | # For Node.js npm install -g npm-audit-resolver - | # For Java (Maven) # Use OWASP Dependency-Check wget https://github.com/jeremylong/DependencyCheck/releases/download/v8.4.0/dependency-check-8.4.0-release.zip unzip dependency-check-8.4.0-release.zip build: commands: - echo Scanning dependencies... - | # Python safety check safety check --json --output safety-report.json || true - | # Node.js npm audit npm audit --json > npm-audit-report.json || true - | # Java/Maven dependency check ./dependency-check/bin/dependency-check.sh --project payment-app --scan . --format JSON --out dependency-check-report.json post_build: commands: - echo Processing dependency scan results... - | # Process and upload to Security Hub python process-dependency-results.py - | # Fail build if critical vulnerabilities found python check-dependency-results.py
Phase 3: Complete DevSecOps Pipeline
AWS CodePipeline with Security Gates
{ "pipeline": { "name": "payment-app-devsecops-pipeline", "roleArn": "arn:aws:iam::account:role/CodePipelineServiceRole", "artifactStore": { "type": "S3", "location": "payment-app-artifacts" }, "stages": [ { "name": "Source", "actions": [{ "name": "SourceAction", "actionTypeId": { "category": "Source", "owner": "AWS", "provider": "CodeCommit", "version": "1" }, "outputArtifacts": [{"name": "SourceOutput"}], "configuration": { "RepositoryName": "payment-app", "BranchName": "main" } }] }, { "name": "PreBuildSecurity", "actions": [{ "name": "SecretScan", "actionTypeId": { "category": "Build", "owner": "AWS", "provider": "CodeBuild", "version": "1" }, "inputArtifacts": [{"name": "SourceOutput"}], "outputArtifacts": [{"name": "SecretScanOutput"}], "configuration": { "ProjectName": "payment-app-secret-scan" } }] }, { "name": "Build", "actions": [{ "name": "BuildAndSAST", "actionTypeId": { "category": "Build", "owner": "AWS", "provider": "CodeBuild", "version": "1" }, "inputArtifacts": [{"name": "SourceOutput"}], "outputArtifacts": [{"name": "BuildOutput"}], "configuration": { "ProjectName": "payment-app-build-sast" } }] }, { "name": "SecurityGates", "actions": [ { "name": "DependencyScan", "actionTypeId": { "category": "Build", "owner": "AWS", "provider": "CodeBuild", "version": "1" }, "inputArtifacts": [{"name": "BuildOutput"}], "outputArtifacts": [{"name": "DependencyScanOutput"}], "configuration": { "ProjectName": "payment-app-dependency-scan" } }, { "name": "ContainerBuildAndScan", "actionTypeId": { "category": "Build", "owner": "AWS", "provider": "CodeBuild", "version": "1" }, "inputArtifacts": [{"name": "BuildOutput"}], "outputArtifacts": [{"name": "ContainerOutput"}], "configuration": { "ProjectName": "payment-app-container-build-scan" } } ] }, { "name": "DAST", "actions": [{ "name": "DynamicScan", "actionTypeId": { "category": "Test", "owner": "AWS", "provider": "CodeBuild", "version": "1" }, "inputArtifacts": [{"name": "ContainerOutput"}], "outputArtifacts": [{"name": "DASTOutput"}], "configuration": { "ProjectName": "payment-app-dast" } }] }, { "name": "DeployToStaging", "actions": [{ "name": "DeployStaging", "actionTypeId": { "category": "Deploy", "owner": "AWS", "provider": "ECS", "version": "1" }, "inputArtifacts": [{"name": "ContainerOutput"}], "configuration": { "ClusterName": "payment-staging", "ServiceName": "payment-service" } }] }, { "name": "ComplianceCheck", "actions": [{ "name": "ComplianceValidation", "actionTypeId": { "category": "Test", "owner": "AWS", "provider": "CodeBuild", "version": "1" }, "inputArtifacts": [{"name": "DASTOutput"}], "configuration": { "ProjectName": "payment-app-compliance-check" } }] }, { "name": "DeployToProduction", "actions": [{ "name": "DeployProduction", "actionTypeId": { "category": "Deploy", "owner": "AWS", "provider": "ECS", "version": "1" }, "inputArtifacts": [{"name": "ContainerOutput"}], "configuration": { "ClusterName": "payment-production", "ServiceName": "payment-service" } }] } ] } }
Security Gates Implementation
Lambda Function for Security Gate Evaluation:
# security-gate-evaluator.py import boto3 import json securityhub = boto3.client('securityhub') codebuild = boto3.client('codebuild') def evaluate_security_gates(build_id): """Evaluate security gates before allowing deployment""" # Get Security Hub findings for this build findings = securityhub.get_findings( Filters={ 'ResourceId': [{'Value': build_id, 'Comparison': 'EQUALS'}], 'SeverityLabel': [ {'Value': 'CRITICAL', 'Comparison': 'EQUALS'}, {'Value': 'HIGH', 'Comparison': 'EQUALS'} ] } ) critical_findings = [ f for f in findings.get('Findings', []) if f.get('Severity', {}).get('Label') == 'CRITICAL' ] high_findings = [ f for f in findings.get('Findings', []) if f.get('Severity', {}).get('Label') == 'HIGH' ] # Gate rules gate_passed = True reasons = [] # Rule 1: No CRITICAL findings allowed if critical_findings: gate_passed = False reasons.append(f"{len(critical_findings)} CRITICAL findings found") # Rule 2: Maximum 3 HIGH findings allowed if len(high_findings) > 3: gate_passed = False reasons.append(f"{len(high_findings)} HIGH findings found (max 3 allowed)") # Rule 3: No hardcoded secrets secret_findings = [ f for f in findings.get('Findings', []) if 'secret' in f.get('Title', '').lower() or 'password' in f.get('Title', '').lower() ] if secret_findings: gate_passed = False reasons.append(f"{len(secret_findings)} secret-related findings found") result = { 'gate_passed': gate_passed, 'reasons': reasons, 'critical_count': len(critical_findings), 'high_count': len(high_findings) } return result def lambda_handler(event, context): """Lambda handler for security gate evaluation""" build_id = event.get('build_id') result = evaluate_security_gates(build_id) if not result['gate_passed']: # Stop pipeline raise Exception(f"Security gate failed: {', '.join(result['reasons'])}") return { 'statusCode': 200, 'body': json.dumps(result) }
Phase 4: Compliance Automation
AWS Config for Compliance Monitoring
Enable AWS Config:
# Enable Config aws configservice put-configuration-recorder --configuration-recorder name=default,roleArn=arn:aws:iam::account:role/ConfigRole # Start recording aws configservice start-configuration-recorder --configuration-recorder-name default
PCI-DSS Compliance Rules:
{ "ConfigRuleName": "pci-dss-encryption-check", "Description": "Check that EBS volumes are encrypted (PCI-DSS requirement)", "Scope": { "ComplianceResourceTypes": ["AWS::EC2::Volume"] }, "Source": { "Owner": "AWS", "SourceIdentifier": "ENCRYPTED_VOLUMES" } }
Custom Compliance Rule:
# custom-compliance-rule.py import boto3 import json config = boto3.client('config') def evaluate_compliance(configuration_item): """Evaluate if resource is compliant""" compliance_status = 'COMPLIANT' annotation = '' # Example: Check if RDS instance has encryption enabled if configuration_item['resourceType'] == 'AWS::RDS::DBInstance': if not configuration_item.get('configuration', {}).get('storageEncrypted', False): compliance_status = 'NON_COMPLIANT' annotation = 'RDS instance must have encryption enabled for PCI-DSS compliance' # Example: Check if security groups allow unrestricted access if configuration_item['resourceType'] == 'AWS::EC2::SecurityGroup': ip_permissions = configuration_item.get('configuration', {}).get('ipPermissions', []) for perm in ip_permissions: for ip_range in perm.get('ipRanges', []): if ip_range.get('cidrIp') == '0.0.0.0/0': compliance_status = 'NON_COMPLIANT' annotation = 'Security group allows unrestricted access (0.0.0.0/0)' return { 'compliance_type': compliance_status, 'annotation': annotation } def lambda_handler(event, context): """Lambda handler for Config custom rule""" configuration_item = json.loads(event['invokingEvent'])['configurationItem'] evaluation = evaluate_compliance(configuration_item) config.put_evaluations( Evaluations=[{ 'ComplianceResourceType': configuration_item['resourceType'], 'ComplianceResourceId': configuration_item['resourceId'], 'ComplianceType': evaluation['compliance_type'], 'Annotation': evaluation['annotation'], 'OrderingTimestamp': configuration_item['configurationItemCaptureTime'] }] ) return evaluation
Automated Compliance Reporting
# compliance-reporter.py import boto3 from datetime import datetime config = boto3.client('config') s3 = boto3.client('s3') def generate_compliance_report(): """Generate compliance report for audit""" # Get compliance summary response = config.get_compliance_summary_by_config_rule() report = { 'timestamp': datetime.utcnow().isoformat(), 'compliance_summary': response.get('ComplianceSummariesByConfigRule', []), 'overall_compliance': calculate_overall_compliance(response) } # Generate detailed findings findings = [] for rule_summary in response.get('ComplianceSummariesByConfigRule', []): if rule_summary.get('ComplianceSummary', {}).get('NonCompliantResourceCount', {}).get('CappedCount', 0) > 0: findings.append({ 'rule_name': rule_summary.get('ConfigRuleName', ''), 'non_compliant_count': rule_summary.get('ComplianceSummary', {}).get('NonCompliantResourceCount', {}).get('CappedCount', 0) }) report['findings'] = findings # Save to S3 for audit trail s3.put_object( Bucket='compliance-reports', Key=f"compliance-report-{datetime.utcnow().strftime('%Y-%m-%d')}.json", Body=json.dumps(report, indent=2) ) return report def calculate_overall_compliance(response): """Calculate overall compliance percentage""" total_resources = 0 compliant_resources = 0 for rule_summary in response.get('ComplianceSummariesByConfigRule', []): summary = rule_summary.get('ComplianceSummary', {}) total_resources += summary.get('ComplianceResourceCount', {}).get('CappedCount', 0) compliant_resources += summary.get('CompliantResourceCount', {}).get('CappedCount', 0) if total_resources == 0: return 100.0 return (compliant_resources / total_resources) * 100 # Schedule with EventBridge import boto3 events = boto3.client('events') events.put_rule( Name='daily-compliance-report', ScheduleExpression='cron(0 2 * * ? *)', # Daily at 2 AM State='ENABLED' ) events.put_targets( Rule='daily-compliance-report', Targets=[{ 'Id': '1', 'Arn': 'arn:aws:lambda:region:account:function:compliance-reporter', 'Input': json.dumps({}) }] )
Phase 5: Security Hub Integration
Centralized Security Findings
Enable AWS Security Hub:
# Enable Security Hub aws securityhub enable-security-hub # Enable security standards aws securityhub batch-enable-standards --standards-subscription-requests StandardsArn=arn:aws:securityhub:region::standards/aws-foundational-security-best-practices/v/1.0.0 StandardsArn=arn:aws:securityhub:region::standards/pci-dss/v/3.2.1
Aggregate Findings from All Sources:
# security-findings-aggregator.py import boto3 from datetime import datetime, timedelta securityhub = boto3.client('securityhub') def aggregate_security_findings(): """Aggregate security findings from all sources""" # Get findings from last 24 hours end_time = datetime.utcnow() start_time = end_time - timedelta(days=1) findings = securityhub.get_findings( Filters={ 'CreatedAt': [{ 'Start': start_time.isoformat(), 'End': end_time.isoformat() }], 'SeverityLabel': [ {'Value': 'CRITICAL', 'Comparison': 'EQUALS'}, {'Value': 'HIGH', 'Comparison': 'EQUALS'} ] }, MaxResults=100 ) # Group by source findings_by_source = {} for finding in findings.get('Findings', []): source = finding.get('ProductFields', {}).get('aws/securityhub/SourceIdentifier', 'unknown') if source not in findings_by_source: findings_by_source[source] = [] findings_by_source[source].append(finding) # Generate summary summary = { 'timestamp': datetime.utcnow().isoformat(), 'total_findings': len(findings.get('Findings', [])), 'findings_by_source': { source: len(finds) for source, finds in findings_by_source.items() }, 'critical_count': len([f for f in findings.get('Findings', []) if f.get('Severity', {}).get('Label') == 'CRITICAL']), 'high_count': len([f for f in findings.get('Findings', []) if f.get('Severity', {}).get('Label') == 'HIGH']) } return summary
Phase 6: Pipeline Performance Optimization
Parallel Execution
Run Security Scans in Parallel:
# buildspec-parallel-security.yml version: 0.2 phases: build: commands: - echo Running security scans in parallel... - | # Run SAST, dependency scan, and secret scan in parallel semgrep --config=auto . & safety check & git-secrets --scan & wait # Wait for all to complete - echo All security scans completed
Caching for Faster Builds
CodeBuild Cache Configuration:
{ "cache": { "type": "LOCAL", "modes": [ "LOCAL_DOCKER_LAYER_CACHE", "LOCAL_SOURCE_CACHE" ] } }
Docker Layer Caching:
# Dockerfile with layer caching FROM maven:3.8-openjdk-11 AS dependencies COPY pom.xml . RUN mvn dependency:go-offline FROM dependencies AS build COPY src ./src RUN mvn package -DskipTests FROM openjdk:11-jre-slim COPY --from=build /app/target/*.jar app.jar ENTRYPOINT ["java", "-jar", "app.jar"]
Fail-Fast Strategy
Early Failure Detection:
# fail-fast-security-checks.py import sys import subprocess def run_fast_security_checks(): """Run quick security checks that fail fast""" checks = [ ('Secret detection', 'git-secrets --scan'), ('Basic SAST', 'semgrep --config=auto --error'), ('Dependency vulnerabilities', 'safety check --short-report') ] failed_checks = [] for check_name, command in checks: print(f"Running {check_name}...") result = subprocess.run( command.split(), capture_output=True, timeout=60 # Fail fast with timeout ) if result.returncode != 0: failed_checks.append(check_name) print(f"❌ {check_name} failed") print(result.stdout.decode()) else: print(f"✅ {check_name} passed") if failed_checks: print(f"n❌ Failed checks: {', '.join(failed_checks)}") sys.exit(1) print("n✅ All fast security checks passed") return 0 if __name__ == '__main__': sys.exit(run_fast_security_checks())
Phase 7: Team Training and Adoption
Developer Onboarding
Security Training Materials:
# Security Best Practices Guide ## Secret Management - Never commit secrets to version control - Use AWS Secrets Manager for all secrets - Rotate secrets regularly ## Dependency Management - Keep dependencies up to date - Review security advisories - Use dependency scanning tools ## Code Security - Follow OWASP Top 10 guidelines - Use SAST tools before committing - Review security findings promptly
Automated Security Reminders
GitHub/GitLab Integration:
# security-reminder-bot.py import boto3 import requests def send_security_reminder(pr_number, findings_count): """Send reminder about security findings in PR""" message = f""" Security Review Required PR #{pr_number} has {findings_count} security findings that need attention. Please review and address: - Critical findings must be fixed before merge - High findings should be addressed or justified - Medium/Low findings can be tracked as technical debt View findings: https://security-hub.aws.amazon.com/findings """ # Send to Slack/Teams webhook_url = os.environ.get('SLACK_WEBHOOK_URL') requests.post(webhook_url, json={'text': message})
Security Champions Program
Identify and Train Security Champions:
# security-champion-tracker.py import boto3 def identify_security_champions(): """Identify developers who actively address security issues""" codecommit = boto3.client('codecommit') # Analyze commit history for security-related commits # Track who fixes security issues # Recognize top contributors champions = [ { 'name': 'Developer Name', 'security_fixes': 15, 'areas': ['SAST fixes', 'Dependency updates', 'Secret management'] } ] return champions
Metrics and Success Criteria
Key Performance Indicators
# devsecops-metrics.py import boto3 from datetime import datetime, timedelta def calculate_devsecops_metrics(): """Calculate DevSecOps pipeline metrics""" codebuild = boto3.client('codebuild') securityhub = boto3.client('securityhub') # Get pipeline metrics builds = codebuild.batch_get_builds( ids=get_recent_build_ids() ) metrics = { 'pipeline_duration': calculate_avg_pipeline_duration(builds), 'security_scan_time': calculate_avg_scan_time(builds), 'findings_detected': get_findings_count_last_30_days(), 'findings_fixed': get_findings_fixed_count(), 'deployment_frequency': get_deployment_frequency(), 'mean_time_to_remediate': calculate_mttr() } return metrics def calculate_avg_pipeline_duration(builds): """Calculate average pipeline duration""" durations = [] for build in builds.get('builds', []): start = build.get('startTime') end = build.get('endTime') if start and end: duration = (end - start).total_seconds() durations.append(duration) return sum(durations) / len(durations) if durations else 0 # Target metrics target_metrics = { 'pipeline_duration_minutes': 15, # Target: < 15 minutes 'security_scan_time_minutes': 5, # Target: < 5 minutes 'findings_detected_per_week': 0, # Target: Reduce over time 'mean_time_to_remediate_hours': 24, # Target: < 24 hours 'deployment_frequency_per_day': 1 # Target: Maintain daily deployments }
Best Practices Summary
Do’s ✅
- Shift-Left: Run security checks as early as possible
- Automate Everything: Don’t rely on manual security reviews
- Fail Fast: Catch issues early, fail builds quickly
- Centralize Findings: Use Security Hub for unified view
- Educate Team: Security is everyone’s responsibility
- Measure Everything: Track metrics to improve
Don’ts ❌
- Don’t Block Developers: Balance security with velocity
- Don’t Ignore False Positives: Tune rules to reduce noise
- Don’t Skip Compliance: Automate compliance checks
- Don’t Forget Runtime: Security doesn’t end at deployment
- Don’t Set It and Forget It: Continuously improve the pipeline
Conclusion
Building a DevSecOps pipeline on AWS requires integrating security at every stage of the development lifecycle. Key takeaways:
- AWS Secrets Manager eliminates hardcoded secrets
- Multi-layered scanning (SAST, DAST, container, dependency) catches issues early
- Security Hub provides centralized visibility
- Automated compliance ensures regulatory requirements are met
- Security gates prevent vulnerable code from reaching production
- Performance optimization maintains daily deployment frequency
The result? A secure, compliant, and fast development pipeline that builds security into your culture, not just your tools.
Additional Resources
- AWS Security Hub User Guide
- AWS Secrets Manager Best Practices
- OWASP Top 10
- AWS Well-Architected Security Pillar
- PCI-DSS Compliance on AWS
Source: DEV Community.

Leave a Reply