After a security audit, your organization received some sobering findings:

  • Vulnerable dependencies in container images
  • Hardcoded secrets in the codebase (API keys, passwords, tokens)
  • Missing security scanning in CI/CD pipelines
  • No automated compliance checks
  • Slow security review process (3-5 days per release)

The challenge? Implement a comprehensive DevSecOps pipeline that:

  • Maintains daily deployment frequency
  • Doesn’t add significant delays to the pipeline
  • Ensures regulatory compliance (PCI-DSS, GDPR, SOC 2)
  • Builds security into the culture, not just the tools

In this article, I’ll walk through building a complete DevSecOps pipeline on AWS that addresses all these concerns while maintaining developer velocity.

DevSecOps Pipeline Architecture

The Shift-Left Philosophy

Shift-Left Security means catching security issues as early as possible in the development lifecycle:

Developer  →  Pre-Commit  →  CI Pipeline  →  Pre-Deployment  →  Production
    ↓             ↓              ↓                ↓                 ↓
Local Scan    Git Hooks      SAST/DAST     Container Scan    Runtime Protection

The earlier we catch issues, the cheaper and faster they are to fix.

AWS DevSecOps Stack

Core AWS Services:

  • AWS CodePipeline – CI/CD orchestration
  • AWS CodeBuild – Build and security scanning
  • AWS CodeCommit/GitHub – Source code management
  • Amazon ECR – Container registry with scanning
  • AWS Secrets Manager – Secret management
  • AWS Security Hub – Centralized security findings
  • Amazon Inspector – Vulnerability assessment
  • AWS Config – Compliance monitoring
  • AWS IAM – Access control and least privilege

Phase 1: Secret Management Solution

The Problem: Hardcoded Secrets

Hardcoded secrets are a critical security risk. They can be:

  • Exposed in version control
  • Leaked in logs or error messages
  • Accessible to anyone with repository access
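The second risk above, secrets leaking into logs, can be reduced inside the application itself. The following is a minimal sketch using Python's standard `logging` module. The `SecretRedactingFilter` class, the `redact` helper, and the key names in the pattern are hypothetical and would need tuning for a real codebase.

```python
import logging
import re

# Hypothetical pattern: key names that usually carry a secret, followed by ':' or '='
_SECRET_PATTERN = re.compile(
    r"(?i)\b(password|api[_-]?key|token|secret)\b(\s*[:=]\s*)(\S+)"
)


def redact(text):
    """Replace the value part of key=value pairs that look like secrets."""
    return _SECRET_PATTERN.sub(r"\1\2[REDACTED]", text)


class SecretRedactingFilter(logging.Filter):
    """Logging filter that masks secret-looking values before they are emitted."""

    def filter(self, record):
        # Format the message first so %-style arguments are covered as well
        record.msg = redact(record.getMessage())
        record.args = None
        return True
```

Attaching the filter with `logger.addFilter(SecretRedactingFilter())` masks matching values for that logger. It is a safety net rather than a replacement for keeping secrets out of log calls in the first place.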

Solution: AWS Secrets Manager

Step 1: Migrate Existing Secrets

import re

import boto3

secrets_client = boto3.client('secretsmanager')


def migrate_secrets_from_codebase():
    """Scan codebase and migrate hardcoded secrets to Secrets Manager"""
    # Patterns to detect secrets: key name, optional quote, ':' or '=', quoted value
    secret_patterns = {
        'api_key': r'''api[_-]?key["']?\s*[:=]\s*["']([^"']+)["']''',
        'password': r'''password["']?\s*[:=]\s*["']([^"']+)["']''',
        'token': r'''token["']?\s*[:=]\s*["']([^"']+)["']''',
        'secret': r'''secret["']?\s*[:=]\s*["']([^"']+)["']''',
    }

    # Scan files (example)
    files_to_scan = ['config.py', 'settings.py', '.env.example']

    for file_path in files_to_scan:
        with open(file_path, 'r') as f:
            content = f.read()

        for secret_type, pattern in secret_patterns.items():
            matches = re.findall(pattern, content, re.IGNORECASE)
            for index, match in enumerate(matches):
                # Create secret in Secrets Manager; the index keeps names unique
                # when one file contains several matches of the same type
                secret_name = f"payment-app/{secret_type}/{file_path}-{index}"
                secrets_client.create_secret(
                    Name=secret_name,
                    SecretString=match,
                    Description=f"Migrated from {file_path}"
                )
                print(f"Created secret: {secret_name}")


# Run migration
migrate_secrets_from_codebase()

Step 2: Update Application Code

import json

import boto3
import psycopg2

secrets_client = boto3.client('secretsmanager')


def get_secret(secret_name):
    """Retrieve secret from AWS Secrets Manager"""
    try:
        response = secrets_client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])
    except Exception as e:
        print(f"Error retrieving secret: {e}")
        raise


# Usage in application
def connect_to_database():
    """Connect to database using secrets from Secrets Manager"""
    db_secrets = get_secret('payment-app/database/credentials')
    connection = psycopg2.connect(
        host=db_secrets['host'],
        database=db_secrets['database'],
        user=db_secrets['username'],
        password=db_secrets['password']
    )
    return connection

Step 3: Automatic Secret Rotation

import io
import zipfile

import boto3

lambda_client = boto3.client('lambda')


def create_rotation_lambda():
    """Create Lambda function for automatic secret rotation"""
    # Simplified handler. Secrets Manager invokes a rotation function once per
    # step (createSecret, setSecret, testSecret, finishSecret), passed in
    # event['Step']; a production handler should branch on that value.
    # generate_secure_password() and psycopg2 must be packaged with the function.
    rotation_code = '''
import boto3
import json
import psycopg2

def lambda_handler(event, context):
    """Rotate database password"""
    secrets_client = boto3.client('secretsmanager')

    # Get current secret
    secret_arn = event['SecretId']
    current_secret = json.loads(
        secrets_client.get_secret_value(SecretId=secret_arn)['SecretString']
    )

    # Generate new password
    new_password = generate_secure_password()

    # Update database
    conn = psycopg2.connect(
        host=current_secret['host'],
        database=current_secret['database'],
        user=current_secret['username'],
        password=current_secret['password']
    )
    # Update password in database
    # ... (implementation)

    # Update secret
    secrets_client.update_secret(
        SecretId=secret_arn,
        SecretString=json.dumps({
            **current_secret,
            'password': new_password
        })
    )
    return {'statusCode': 200}
'''

    # Lambda expects a zip archive, so package the source as index.py in memory
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w') as archive:
        info = zipfile.ZipInfo('index.py')
        info.external_attr = 0o644 << 16  # readable by the Lambda runtime user
        archive.writestr(info, rotation_code)

    # Create Lambda function
    lambda_client.create_function(
        FunctionName='rotate-db-secret',
        Runtime='python3.9',
        Role='arn:aws:iam::account:role/secret-rotation-role',
        Handler='index.lambda_handler',
        Code={'ZipFile': buffer.getvalue()}
    )


# Enable automatic rotation
secrets_client = boto3.client('secretsmanager')
secrets_client.rotate_secret(
    SecretId='payment-app/database/credentials',
    RotationLambdaARN='arn:aws:lambda:region:account:function:rotate-db-secret',
    RotationRules={
        'AutomaticallyAfterDays': 30
    }
)
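The rotation handler above calls `generate_secure_password()` without defining it. One way to fill that gap is sketched below with Python's standard `secrets` module. The default length of 32 and the set of excluded characters are assumptions, not values from the original pipeline.

```python
import secrets
import string

# Assumption: characters that commonly break connection strings are excluded
_EXCLUDED = set("\"'@/\\ ")


def generate_secure_password(length=32):
    """Return a random password with lower, upper, digit, and symbol characters."""
    if length < 4:
        raise ValueError("length must be at least 4 to include every character class")

    symbols = [c for c in string.punctuation if c not in _EXCLUDED]
    classes = [string.ascii_lowercase, string.ascii_uppercase, string.digits, symbols]
    alphabet = [c for group in classes for c in group]

    # One guaranteed character per class, the rest drawn from the full alphabet
    chars = [secrets.choice(group) for group in classes]
    chars += [secrets.choice(alphabet) for _ in range(length - len(classes))]

    # Shuffle with a CSPRNG so the guaranteed characters are not always first
    secrets.SystemRandom().shuffle(chars)
    return "".join(chars)
```

Secrets Manager also exposes a `GetRandomPassword` API (`get_random_password` in boto3), which may be preferable when the password policy should live in AWS rather than in application code.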

Step 4: Pre-Commit Hook to Detect Secrets

#!/usr/bin/env python3
# .git/hooks/pre-commit
import re
import subprocess
import sys


def detect_secrets():
    """Detect potential secrets before commit"""
    # Get staged files
    result = subprocess.run(
        ['git', 'diff', '--cached', '--name-only'],
        capture_output=True,
        text=True
    )
    staged_files = result.stdout.strip().split('\n')

    # Secret patterns
    patterns = [
        (r'''password\s*[:=]\s*["']([^"']+)["']''', 'Password detected'),
        (r'''api[_-]?key\s*[:=]\s*["']([^"']+)["']''', 'API key detected'),
        (r'''secret\s*[:=]\s*["']([^"']+)["']''', 'Secret detected'),
        (r'-----BEGIN (RSA |OPENSSH )?PRIVATE KEY-----', 'Private key detected'),
    ]

    violations = []
    for file_path in staged_files:
        if not file_path:
            continue
        try:
            with open(file_path, 'r') as f:
                content = f.read()
            for pattern, message in patterns:
                if re.search(pattern, content, re.IGNORECASE):
                    violations.append(f"{file_path}: {message}")
        except Exception:
            # Skip deleted, binary, or unreadable files
            continue

    if violations:
        print("❌ SECURITY VIOLATION: Potential secrets detected!")
        for violation in violations:
            print(f" - {violation}")
        print("\nPlease use AWS Secrets Manager instead.")
        print("See: https://docs.aws.amazon.com/secretsmanager/")
        sys.exit(1)

    print("✅ No secrets detected in staged files")
    return 0


if __name__ == '__main__':
    sys.exit(detect_secrets())

Make it executable:

chmod +x .git/hooks/pre-commit 
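Keyword patterns like the ones in the hook miss secrets assigned to unremarkable variable names. A common complementary heuristic is to flag long, random-looking strings by their Shannon entropy. The sketch below illustrates the idea. The function names and both thresholds (20 characters, 4.0 bits per character) are assumptions chosen for illustration, not tuned values.

```python
import math
import re
from collections import Counter

# Assumed thresholds; tune against your own codebase to balance noise and misses
MIN_LENGTH = 20
ENTROPY_THRESHOLD = 4.0

# Candidate tokens: long runs of base64/hex-like characters
_CANDIDATE = re.compile(r"[A-Za-z0-9+/=_\-]{%d,}" % MIN_LENGTH)


def shannon_entropy(text):
    """Shannon entropy of a string in bits per character."""
    if not text:
        return 0.0
    counts = Counter(text)
    total = len(text)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


def find_high_entropy_strings(line):
    """Return tokens in a line that are both long and random-looking."""
    return [
        token for token in _CANDIDATE.findall(line)
        if shannon_entropy(token) >= ENTROPY_THRESHOLD
    ]
```

A hook could call `find_high_entropy_strings` on each staged line and report any hits alongside the pattern matches. Expect false positives on hashes and generated identifiers, so an allowlist is usually needed.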

Phase 2: Security Scanning Strategy

SAST (Static Application Security Testing)

AWS CodeBuild with SAST Tools

# buildspec-sast.yml
version: 0.2

phases:
  pre_build:
    commands:
      - echo Installing SAST tools...
      - |
        # Install Semgrep for SAST
        pip install semgrep
      - |
        # Install Bandit for Python security scanning
        pip install bandit
  build:
    commands:
      - echo Running SAST scans...
      - |
        # Semgrep scan
        semgrep --config=auto \
          --json \
          --output=semgrep-results.json \
          .
      - |
        # Bandit scan (for Python)
        bandit -r . \
          -f json \
          -o bandit-results.json \
          || true
      - |
        # SonarQube scan (if using)
        sonar-scanner \
          -Dsonar.projectKey=payment-app \
          -Dsonar.sources=. \
          -Dsonar.host.url=$SONARQUBE_URL \
          -Dsonar.login=$SONARQUBE_TOKEN
  post_build:
    commands:
      - echo Uploading SAST results...
      - |
        # Upload to AWS Security Hub
        aws securityhub batch-import-findings \
          --findings file://convert-to-security-hub-format.json
      - |
        # Fail build if critical issues found
        python check-sast-results.py

SAST Results Checker:

# check-sast-results.py
import json
import sys


def check_sast_results():
    """Check SAST results and fail build if critical issues found"""
    with open('semgrep-results.json', 'r') as f:
        semgrep_results = json.load(f)

    critical_issues = []
    for result in semgrep_results.get('results', []):
        severity = result.get('extra', {}).get('severity', '')
        if severity in ['ERROR', 'WARNING']:
            critical_issues.append({
                'file': result.get('path', ''),
                'message': result.get('message', ''),
                'severity': severity
            })

    if critical_issues:
        print("Critical security issues found:")
        for issue in critical_issues:
            print(f" - {issue['file']}: {issue['message']} ({issue['severity']})")
        sys.exit(1)

    print("SAST scan passed")
    return 0


if __name__ == '__main__':
    sys.exit(check_sast_results())

DAST (Dynamic Application Security Testing)

AWS CodeBuild with OWASP ZAP:

# buildspec-dast.yml
version: 0.2

phases:
  pre_build:
    commands:
      - echo Installing DAST tools...
      - |
        # Install OWASP ZAP
        wget https://github.com/zaproxy/zaproxy/releases/download/v2.12.0/ZAP_2.12.0_Linux.tar.gz
        tar -xzf ZAP_2.12.0_Linux.tar.gz
  build:
    commands:
      - echo Starting application for DAST...
      - |
        # Start application (example)
        docker-compose up -d payment-app
        sleep 30  # Wait for app to be ready
      - |
        # Run OWASP ZAP baseline scan
        ./ZAP_2.12.0/zap-baseline.py \
          -t http://localhost:8080 \
          -J zap-report.json
      - |
        # Run OWASP ZAP full scan (more thorough)
        ./ZAP_2.12.0/zap-full-scan.py \
          -t http://localhost:8080 \
          -J zap-full-report.json \
          -I  # Ignore warnings
  post_build:
    commands:
      - echo Processing DAST results...
      - |
        # Convert and upload to Security Hub
        python process-dast-results.py zap-report.json
      - |
        # Fail if critical vulnerabilities found
        python check-dast-results.py zap-report.json

DAST Results Processor:

# process-dast-results.py
import json
import sys
from datetime import datetime, timezone

import boto3

securityhub = boto3.client('securityhub')


def process_dast_results(report_file):
    """Process DAST results and send to Security Hub"""
    with open(report_file, 'r') as f:
        zap_results = json.load(f)

    severity_map = {
        'High': 'HIGH',
        'Medium': 'MEDIUM',
        'Low': 'LOW',
        'Informational': 'INFORMATIONAL'
    }
    # Timestamp the findings with the import time instead of a fixed date
    now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    findings = []
    for site in zap_results.get('site', []):
        site_name = site.get('@name', 'unknown-site')
        for alert in site.get('alerts', []):
            # ZAP's JSON report typically carries the risk as 'riskdesc',
            # e.g. "Medium (High)"; fall back to 'risk' for other exports
            risk = alert.get('riskdesc', alert.get('risk', '')).split(' ')[0]
            severity = severity_map.get(risk, 'INFORMATIONAL')
            if severity in ['HIGH', 'MEDIUM']:
                finding = {
                    'SchemaVersion': '2018-10-08',
                    'Id': f"zap-{alert.get('pluginid', 'unknown')}-{site_name}",
                    'ProductArn': 'arn:aws:securityhub:region:account:product/account/default',
                    'GeneratorId': 'owasp-zap',
                    'AwsAccountId': 'account-id',
                    'Types': ['Software and Configuration Checks/Vulnerabilities'],
                    'CreatedAt': now,
                    'UpdatedAt': now,
                    'Severity': {
                        'Label': severity
                    },
                    'Title': alert.get('name', 'Unknown vulnerability'),
                    'Description': alert.get('desc', ''),
                    'Remediation': {
                        'Recommendation': {
                            'Text': alert.get('solution', '')
                        }
                    },
                    # Resources is a required field in the finding format
                    'Resources': [{
                        'Type': 'Other',
                        'Id': site_name
                    }]
                }
                findings.append(finding)

    if findings:
        # Batch import to Security Hub
        securityhub.batch_import_findings(Findings=findings)
        print(f"✅ Imported {len(findings)} findings to Security Hub")

    return findings


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: python process-dast-results.py <zap-report.json>")
        sys.exit(1)

    findings = process_dast_results(sys.argv[1])
    sys.exit(0 if not findings else 1)
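Security Hub's `BatchImportFindings` accepts at most 100 findings per call, so a large scan report has to be split before upload. The helper below is a minimal sketch of that batching. `chunked` and `import_in_batches` are hypothetical names, and `send` stands in for whatever function performs the upload.

```python
MAX_FINDINGS_PER_CALL = 100  # Security Hub limit for BatchImportFindings


def chunked(items, size=MAX_FINDINGS_PER_CALL):
    """Yield successive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def import_in_batches(findings, send):
    """Send findings in batches; `send` is any callable taking a list of findings.

    Returns the number of batches sent.
    """
    batches = 0
    for batch in chunked(findings):
        send(batch)
        batches += 1
    return batches
```

With boto3, `send` could be `lambda batch: securityhub.batch_import_findings(Findings=batch)`. Keeping the upload behind a callable also makes the batching testable without AWS credentials.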

Container Image Scanning

Amazon ECR Image Scanning:

# Enable automatic image scanning on push
aws ecr put-image-scanning-configuration \
  --repository-name payment-app \
  --image-scanning-configuration scanOnPush=true

# Scan existing images
aws ecr start-image-scan \
  --repository-name payment-app \
  --image-id imageTag=latest

# Get scan results
aws ecr describe-image-scan-findings \
  --repository-name payment-app \
  --image-id imageTag=latest

CodeBuild Integration:

# buildspec-container-scan.yml
version: 0.2

phases:
  pre_build:
    commands:
      - echo Logging in to Amazon ECR...
      - aws ecr get-login-password --region $AWS_DEFAULT_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com
      - REPOSITORY_URI=$AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME
      - COMMIT_HASH=$(echo $CODEBUILD_RESOLVED_SOURCE_VERSION | cut -c 1-7)
      - IMAGE_TAG=${COMMIT_HASH:=latest}
  build:
    commands:
      - echo Building Docker image...
      - docker build -t $IMAGE_REPO_NAME:$IMAGE_TAG .
      - docker tag $IMAGE_REPO_NAME:$IMAGE_TAG $REPOSITORY_URI:$IMAGE_TAG
  post_build:
    commands:
      - echo Pushing Docker image...
      - docker push $REPOSITORY_URI:$IMAGE_TAG
      - echo Waiting for image scan to complete...
      - |
        # Wait for scan to complete (ECR scans automatically on push).
        # The CLI waiter uses its own built-in polling interval and attempt limit.
        aws ecr wait image-scan-complete \
          --repository-name $IMAGE_REPO_NAME \
          --image-id imageTag=$IMAGE_TAG
      - echo Retrieving scan results...
      - |
        # Get scan findings
        SCAN_RESULTS=$(aws ecr describe-image-scan-findings \
          --repository-name $IMAGE_REPO_NAME \
          --image-id imageTag=$IMAGE_TAG \
          --query 'imageScanFindings' \
          --output json)
      - |
        # Check for critical vulnerabilities
        # (severity counts are returned under findingSeverityCounts)
        CRITICAL_COUNT=$(echo $SCAN_RESULTS | jq '.findingSeverityCounts.CRITICAL // 0')
        HIGH_COUNT=$(echo $SCAN_RESULTS | jq '.findingSeverityCounts.HIGH // 0')
        if [ "$CRITICAL_COUNT" -gt 0 ] || [ "$HIGH_COUNT" -gt 5 ]; then
          echo "❌ Critical vulnerabilities found in image!"
          echo "Critical: $CRITICAL_COUNT, High: $HIGH_COUNT"
          exit 1
        fi
        echo "Image scan passed (Critical: $CRITICAL_COUNT, High: $HIGH_COUNT)"
      - |
        # Export scan results to Security Hub
        python export-ecr-findings.py $IMAGE_REPO_NAME $IMAGE_TAG
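Threshold logic embedded in shell is awkward to unit test. The same gate rule from the buildspec (no CRITICAL findings, at most five HIGH) can be expressed as a small Python function, sketched here. The name `image_scan_passes` and its parameters are hypothetical; the input is assumed to be the per-severity counts returned by `describe-image-scan-findings`.

```python
def image_scan_passes(severity_counts, max_critical=0, max_high=5):
    """Apply the buildspec gate: no CRITICAL and at most five HIGH findings.

    `severity_counts` is a mapping such as {"CRITICAL": 0, "HIGH": 2};
    severities that are missing from the mapping count as zero.
    """
    critical = severity_counts.get("CRITICAL", 0)
    high = severity_counts.get("HIGH", 0)
    return critical <= max_critical and high <= max_high
```

Keeping the limits as parameters makes it easy to tighten them over time, for example lowering `max_high` as the backlog of known vulnerabilities shrinks.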

ECR Findings Exporter:

# export-ecr-findings.py
import sys
from datetime import datetime, timezone

import boto3


def export_ecr_findings(repo_name, image_tag):
    """Export ECR scan findings to Security Hub"""
    ecr = boto3.client('ecr')
    securityhub = boto3.client('securityhub')

    # Get scan findings
    response = ecr.describe_image_scan_findings(
        repositoryName=repo_name,
        imageId={'imageTag': image_tag}
    )
    findings = response.get('imageScanFindings', {})
    findings_list = findings.get('findings', [])

    # Basic scan findings carry no per-finding timestamps, so use the export time
    now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    severity_map = {
        'CRITICAL': 'CRITICAL',
        'HIGH': 'HIGH',
        'MEDIUM': 'MEDIUM',
        'LOW': 'LOW',
        'INFORMATIONAL': 'INFORMATIONAL'
    }

    security_hub_findings = []
    for finding in findings_list:
        severity = severity_map.get(finding.get('severity', ''), 'INFORMATIONAL')

        # Only export HIGH and CRITICAL
        if severity in ['CRITICAL', 'HIGH']:
            security_hub_finding = {
                'SchemaVersion': '2018-10-08',
                'Id': f"ecr-{repo_name}-{image_tag}-{finding.get('name', 'unknown')}",
                'ProductArn': 'arn:aws:securityhub:region:account:product/account/default',
                'GeneratorId': 'amazon-ecr-image-scan',
                'AwsAccountId': 'account-id',
                'Types': ['Software and Configuration Checks/Vulnerabilities'],
                'CreatedAt': now,
                'UpdatedAt': now,
                'Severity': {
                    'Label': severity
                },
                'Title': finding.get('name', 'Unknown vulnerability'),
                'Description': finding.get('description', ''),
                'Remediation': {
                    'Recommendation': {
                        'Text': finding.get('remediation', {}).get('recommendation', {}).get('text', '')
                    }
                },
                'Resources': [{
                    'Type': 'AwsEcrContainerImage',
                    'Id': f"{repo_name}:{image_tag}",
                    'Region': ecr.meta.region_name
                }]
            }
            security_hub_findings.append(security_hub_finding)

    if security_hub_findings:
        securityhub.batch_import_findings(Findings=security_hub_findings)
        print(f"✅ Exported {len(security_hub_findings)} findings to Security Hub")

    return security_hub_findings


if __name__ == '__main__':
    if len(sys.argv) < 3:
        print("Usage: python export-ecr-findings.py <repo-name> <image-tag>")
        sys.exit(1)

    findings = export_ecr_findings(sys.argv[1], sys.argv[2])
    sys.exit(0)

Dependency Scanning

Scanning Dependencies for Vulnerabilities:

# buildspec-dependency-scan.yml
version: 0.2

phases:
  pre_build:
    commands:
      - echo Installing dependency scanning tools...
      - |
        # For Python
        pip install safety
      - |
        # For Node.js
        npm install -g npm-audit-resolver
      - |
        # For Java (Maven)
        # Use OWASP Dependency-Check
        wget https://github.com/jeremylong/DependencyCheck/releases/download/v8.4.0/dependency-check-8.4.0-release.zip
        unzip dependency-check-8.4.0-release.zip
  build:
    commands:
      - echo Scanning dependencies...
      - |
        # Python safety check
        safety check --json --output safety-report.json || true
      - |
        # Node.js npm audit
        npm audit --json > npm-audit-report.json || true
      - |
        # Java/Maven dependency check
        ./dependency-check/bin/dependency-check.sh \
          --project payment-app \
          --scan . \
          --format JSON \
          --out dependency-check-report.json
  post_build:
    commands:
      - echo Processing dependency scan results...
      - |
        # Process and upload to Security Hub
        python process-dependency-results.py
      - |
        # Fail build if critical vulnerabilities found
        python check-dependency-results.py

Phase 3: Complete DevSecOps Pipeline

AWS CodePipeline with Security Gates

{
  "pipeline": {
    "name": "payment-app-devsecops-pipeline",
    "roleArn": "arn:aws:iam::account:role/CodePipelineServiceRole",
    "artifactStore": {
      "type": "S3",
      "location": "payment-app-artifacts"
    },
    "stages": [
      {
        "name": "Source",
        "actions": [{
          "name": "SourceAction",
          "actionTypeId": { "category": "Source", "owner": "AWS", "provider": "CodeCommit", "version": "1" },
          "outputArtifacts": [{"name": "SourceOutput"}],
          "configuration": {
            "RepositoryName": "payment-app",
            "BranchName": "main"
          }
        }]
      },
      {
        "name": "PreBuildSecurity",
        "actions": [{
          "name": "SecretScan",
          "actionTypeId": { "category": "Build", "owner": "AWS", "provider": "CodeBuild", "version": "1" },
          "inputArtifacts": [{"name": "SourceOutput"}],
          "outputArtifacts": [{"name": "SecretScanOutput"}],
          "configuration": { "ProjectName": "payment-app-secret-scan" }
        }]
      },
      {
        "name": "Build",
        "actions": [{
          "name": "BuildAndSAST",
          "actionTypeId": { "category": "Build", "owner": "AWS", "provider": "CodeBuild", "version": "1" },
          "inputArtifacts": [{"name": "SourceOutput"}],
          "outputArtifacts": [{"name": "BuildOutput"}],
          "configuration": { "ProjectName": "payment-app-build-sast" }
        }]
      },
      {
        "name": "SecurityGates",
        "actions": [
          {
            "name": "DependencyScan",
            "actionTypeId": { "category": "Build", "owner": "AWS", "provider": "CodeBuild", "version": "1" },
            "inputArtifacts": [{"name": "BuildOutput"}],
            "outputArtifacts": [{"name": "DependencyScanOutput"}],
            "configuration": { "ProjectName": "payment-app-dependency-scan" }
          },
          {
            "name": "ContainerBuildAndScan",
            "actionTypeId": { "category": "Build", "owner": "AWS", "provider": "CodeBuild", "version": "1" },
            "inputArtifacts": [{"name": "BuildOutput"}],
            "outputArtifacts": [{"name": "ContainerOutput"}],
            "configuration": { "ProjectName": "payment-app-container-build-scan" }
          }
        ]
      },
      {
        "name": "DAST",
        "actions": [{
          "name": "DynamicScan",
          "actionTypeId": { "category": "Test", "owner": "AWS", "provider": "CodeBuild", "version": "1" },
          "inputArtifacts": [{"name": "ContainerOutput"}],
          "outputArtifacts": [{"name": "DASTOutput"}],
          "configuration": { "ProjectName": "payment-app-dast" }
        }]
      },
      {
        "name": "DeployToStaging",
        "actions": [{
          "name": "DeployStaging",
          "actionTypeId": { "category": "Deploy", "owner": "AWS", "provider": "ECS", "version": "1" },
          "inputArtifacts": [{"name": "ContainerOutput"}],
          "configuration": {
            "ClusterName": "payment-staging",
            "ServiceName": "payment-service"
          }
        }]
      },
      {
        "name": "ComplianceCheck",
        "actions": [{
          "name": "ComplianceValidation",
          "actionTypeId": { "category": "Test", "owner": "AWS", "provider": "CodeBuild", "version": "1" },
          "inputArtifacts": [{"name": "DASTOutput"}],
          "configuration": { "ProjectName": "payment-app-compliance-check" }
        }]
      },
      {
        "name": "DeployToProduction",
        "actions": [{
          "name": "DeployProduction",
          "actionTypeId": { "category": "Deploy", "owner": "AWS", "provider": "ECS", "version": "1" },
          "inputArtifacts": [{"name": "ContainerOutput"}],
          "configuration": {
            "ClusterName": "payment-production",
            "ServiceName": "payment-service"
          }
        }]
      }
    ]
  }
}

Security Gates Implementation

Lambda Function for Security Gate Evaluation:

# security-gate-evaluator.py
import json

import boto3

securityhub = boto3.client('securityhub')
codebuild = boto3.client('codebuild')


def evaluate_security_gates(build_id):
    """Evaluate security gates before allowing deployment"""
    # Get Security Hub findings for this build
    findings = securityhub.get_findings(
        Filters={
            'ResourceId': [{'Value': build_id, 'Comparison': 'EQUALS'}],
            'SeverityLabel': [
                {'Value': 'CRITICAL', 'Comparison': 'EQUALS'},
                {'Value': 'HIGH', 'Comparison': 'EQUALS'}
            ]
        }
    )

    critical_findings = [
        f for f in findings.get('Findings', [])
        if f.get('Severity', {}).get('Label') == 'CRITICAL'
    ]
    high_findings = [
        f for f in findings.get('Findings', [])
        if f.get('Severity', {}).get('Label') == 'HIGH'
    ]

    # Gate rules
    gate_passed = True
    reasons = []

    # Rule 1: No CRITICAL findings allowed
    if critical_findings:
        gate_passed = False
        reasons.append(f"{len(critical_findings)} CRITICAL findings found")

    # Rule 2: Maximum 3 HIGH findings allowed
    if len(high_findings) > 3:
        gate_passed = False
        reasons.append(f"{len(high_findings)} HIGH findings found (max 3 allowed)")

    # Rule 3: No hardcoded secrets
    secret_findings = [
        f for f in findings.get('Findings', [])
        if 'secret' in f.get('Title', '').lower()
        or 'password' in f.get('Title', '').lower()
    ]
    if secret_findings:
        gate_passed = False
        reasons.append(f"{len(secret_findings)} secret-related findings found")

    result = {
        'gate_passed': gate_passed,
        'reasons': reasons,
        'critical_count': len(critical_findings),
        'high_count': len(high_findings)
    }
    return result


def lambda_handler(event, context):
    """Lambda handler for security gate evaluation"""
    build_id = event.get('build_id')
    result = evaluate_security_gates(build_id)

    if not result['gate_passed']:
        # Stop pipeline
        raise Exception(f"Security gate failed: {', '.join(result['reasons'])}")

    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }
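The evaluator above reads a single `GetFindings` response. Because that API is paginated, a gate that stops at the first page could undercount findings on a busy build. The loop below is a minimal sketch of collecting every page. `collect_findings` and the `fetch_page` callable are hypothetical names, introduced so the logic can be exercised without AWS access.

```python
def collect_findings(fetch_page):
    """Collect findings across all pages.

    `fetch_page(next_token)` must return a dict shaped like a GetFindings
    response: {"Findings": [...], "NextToken": "..."}, with NextToken absent
    or empty on the last page.
    """
    findings = []
    next_token = None
    while True:
        page = fetch_page(next_token)
        findings.extend(page.get("Findings", []))
        next_token = page.get("NextToken")
        if not next_token:
            return findings
```

With boto3, `fetch_page` would call `securityhub.get_findings(Filters=...)` and pass `NextToken` only when a token is present. boto3 also offers `securityhub.get_paginator('get_findings')`, which handles the same loop internally.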

Phase 4: Compliance Automation

AWS Config for Compliance Monitoring

Enable AWS Config:

# Enable Config
aws configservice put-configuration-recorder \
  --configuration-recorder name=default,roleArn=arn:aws:iam::account:role/ConfigRole

# Start recording
aws configservice start-configuration-recorder \
  --configuration-recorder-name default

PCI-DSS Compliance Rules:

{
  "ConfigRuleName": "pci-dss-encryption-check",
  "Description": "Check that EBS volumes are encrypted (PCI-DSS requirement)",
  "Scope": {
    "ComplianceResourceTypes": ["AWS::EC2::Volume"]
  },
  "Source": {
    "Owner": "AWS",
    "SourceIdentifier": "ENCRYPTED_VOLUMES"
  }
}

Custom Compliance Rule:

# custom-compliance-rule.py
import json

import boto3

config = boto3.client('config')


def evaluate_compliance(configuration_item):
    """Evaluate if resource is compliant"""
    compliance_status = 'COMPLIANT'
    annotation = ''

    # Example: Check if RDS instance has encryption enabled
    if configuration_item['resourceType'] == 'AWS::RDS::DBInstance':
        if not configuration_item.get('configuration', {}).get('storageEncrypted', False):
            compliance_status = 'NON_COMPLIANT'
            annotation = 'RDS instance must have encryption enabled for PCI-DSS compliance'

    # Example: Check if security groups allow unrestricted access
    if configuration_item['resourceType'] == 'AWS::EC2::SecurityGroup':
        ip_permissions = configuration_item.get('configuration', {}).get('ipPermissions', [])
        for perm in ip_permissions:
            for ip_range in perm.get('ipRanges', []):
                if ip_range.get('cidrIp') == '0.0.0.0/0':
                    compliance_status = 'NON_COMPLIANT'
                    annotation = 'Security group allows unrestricted access (0.0.0.0/0)'

    return {
        'compliance_type': compliance_status,
        'annotation': annotation
    }


def lambda_handler(event, context):
    """Lambda handler for Config custom rule"""
    configuration_item = json.loads(event['invokingEvent'])['configurationItem']
    evaluation = evaluate_compliance(configuration_item)

    result = {
        'ComplianceResourceType': configuration_item['resourceType'],
        'ComplianceResourceId': configuration_item['resourceId'],
        'ComplianceType': evaluation['compliance_type'],
        'OrderingTimestamp': configuration_item['configurationItemCaptureTime']
    }
    # Annotation must be non-empty when present, so only set it for violations
    if evaluation['annotation']:
        result['Annotation'] = evaluation['annotation']

    # ResultToken ties the evaluation to the rule invocation and is required
    config.put_evaluations(
        Evaluations=[result],
        ResultToken=event['resultToken']
    )
    return evaluation

Automated Compliance Reporting

# compliance-reporter.py
import json
from datetime import datetime

import boto3

config = boto3.client('config')
s3 = boto3.client('s3')


def generate_compliance_report():
    """Generate compliance report for audit"""
    # Get compliance summary (counts of compliant and non-compliant rules)
    summary = config.get_compliance_summary_by_config_rule().get('ComplianceSummary', {})

    report = {
        'timestamp': datetime.utcnow().isoformat(),
        'compliance_summary': summary,
        'overall_compliance': calculate_overall_compliance(summary)
    }

    # Generate detailed findings: one entry per non-compliant rule
    response = config.describe_compliance_by_config_rule(
        ComplianceTypes=['NON_COMPLIANT']
    )
    findings = []
    for rule in response.get('ComplianceByConfigRules', []):
        contributors = rule.get('Compliance', {}).get('ComplianceContributorCount', {})
        findings.append({
            'rule_name': rule.get('ConfigRuleName', ''),
            'non_compliant_count': contributors.get('CappedCount', 0)
        })
    report['findings'] = findings

    # Save to S3 for audit trail; default=str serializes the summary timestamp
    s3.put_object(
        Bucket='compliance-reports',
        Key=f"compliance-report-{datetime.utcnow().strftime('%Y-%m-%d')}.json",
        Body=json.dumps(report, indent=2, default=str)
    )
    return report


def calculate_overall_compliance(summary):
    """Calculate overall compliance percentage from the rule summary"""
    compliant = summary.get('CompliantResourceCount', {}).get('CappedCount', 0)
    non_compliant = summary.get('NonCompliantResourceCount', {}).get('CappedCount', 0)
    total = compliant + non_compliant
    if total == 0:
        return 100.0
    return (compliant / total) * 100


# Schedule with EventBridge
events = boto3.client('events')
events.put_rule(
    Name='daily-compliance-report',
    ScheduleExpression='cron(0 2 * * ? *)',  # Daily at 2 AM
    State='ENABLED'
)
events.put_targets(
    Rule='daily-compliance-report',
    Targets=[{
        'Id': '1',
        'Arn': 'arn:aws:lambda:region:account:function:compliance-reporter',
        'Input': json.dumps({})
    }]
)

Phase 5: Security Hub Integration

Centralized Security Findings

Enable AWS Security Hub:

# Enable Security Hub
aws securityhub enable-security-hub

# Enable security standards
aws securityhub batch-enable-standards \
  --standards-subscription-requests \
  StandardsArn=arn:aws:securityhub:region::standards/aws-foundational-security-best-practices/v/1.0.0 \
  StandardsArn=arn:aws:securityhub:region::standards/pci-dss/v/3.2.1

Aggregate Findings from All Sources:

# security-findings-aggregator.py
from datetime import datetime, timedelta

import boto3

securityhub = boto3.client('securityhub')


def aggregate_security_findings():
    """Aggregate security findings from all sources"""
    # Get findings from last 24 hours
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=1)

    findings = securityhub.get_findings(
        Filters={
            'CreatedAt': [{
                'Start': start_time.isoformat(),
                'End': end_time.isoformat()
            }],
            'SeverityLabel': [
                {'Value': 'CRITICAL', 'Comparison': 'EQUALS'},
                {'Value': 'HIGH', 'Comparison': 'EQUALS'}
            ]
        },
        MaxResults=100
    )

    # Group by source
    findings_by_source = {}
    for finding in findings.get('Findings', []):
        source = finding.get('ProductFields', {}).get('aws/securityhub/SourceIdentifier', 'unknown')
        if source not in findings_by_source:
            findings_by_source[source] = []
        findings_by_source[source].append(finding)

    # Generate summary
    summary = {
        'timestamp': datetime.utcnow().isoformat(),
        'total_findings': len(findings.get('Findings', [])),
        'findings_by_source': {
            source: len(finds)
            for source, finds in findings_by_source.items()
        },
        'critical_count': len([
            f for f in findings.get('Findings', [])
            if f.get('Severity', {}).get('Label') == 'CRITICAL'
        ]),
        'high_count': len([
            f for f in findings.get('Findings', [])
            if f.get('Severity', {}).get('Label') == 'HIGH'
        ])
    }
    return summary

Phase 6: Pipeline Performance Optimization

Parallel Execution

Run Security Scans in Parallel:

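# buildspec-parallel-security.yml
version: 0.2

phases:
  build:
    commands:
      - echo Running security scans in parallel...
      - |
        # Run SAST, dependency scan, and secret scan in parallel
        semgrep --config=auto . & SEMGREP_PID=$!
        safety check & SAFETY_PID=$!
        git-secrets --scan & SECRETS_PID=$!
        # A bare `wait` always returns 0, so wait on each PID to surface failures
        FAILED=0
        wait $SEMGREP_PID || FAILED=1
        wait $SAFETY_PID || FAILED=1
        wait $SECRETS_PID || FAILED=1
        [ "$FAILED" -eq 0 ]
      - echo All security scans completed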
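The same fan-out can be driven from Python when the build needs to know which scanner failed rather than only that one did. This is a minimal sketch using the standard library. `run_command` and `run_in_parallel` are hypothetical helper names, and the scanner commands themselves are supplied by the caller.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor


def run_command(command):
    """Run one command (a list of arguments) and return (command, exit_code)."""
    completed = subprocess.run(command, capture_output=True)
    return command, completed.returncode


def run_in_parallel(commands):
    """Run commands concurrently and return the list of commands that failed."""
    # Threads are sufficient here because each worker only waits on a subprocess
    with ThreadPoolExecutor(max_workers=len(commands) or 1) as pool:
        results = list(pool.map(run_command, commands))
    return [command for command, code in results if code != 0]
```

Calling `run_in_parallel([["semgrep", "--config=auto", "."], ["safety", "check"], ["git-secrets", "--scan"]])` would return the commands that exited non-zero, and the build script can exit with a failure status if that list is non-empty.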

Caching for Faster Builds

CodeBuild Cache Configuration:

{
  "cache": {
    "type": "LOCAL",
    "modes": [
      "LOCAL_DOCKER_LAYER_CACHE",
      "LOCAL_SOURCE_CACHE"
    ]
  }
}

Docker Layer Caching:

# Dockerfile with layer caching
FROM maven:3.8-openjdk-11 AS dependencies
# Build under /app so the final stage can copy from /app/target
WORKDIR /app
COPY pom.xml .
RUN mvn dependency:go-offline

FROM dependencies AS build
COPY src ./src
RUN mvn package -DskipTests

FROM openjdk:11-jre-slim
COPY --from=build /app/target/*.jar app.jar
ENTRYPOINT ["java", "-jar", "app.jar"]

Fail-Fast Strategy

Early Failure Detection:

# fail-fast-security-checks.py
import subprocess
import sys


def run_fast_security_checks():
    """Run quick security checks that fail fast"""
    checks = [
        ('Secret detection', 'git-secrets --scan'),
        ('Basic SAST', 'semgrep --config=auto --error'),
        ('Dependency vulnerabilities', 'safety check --short-report')
    ]

    failed_checks = []
    for check_name, command in checks:
        print(f"Running {check_name}...")
        try:
            result = subprocess.run(
                command.split(),
                capture_output=True,
                timeout=60  # Fail fast with timeout
            )
        except subprocess.TimeoutExpired:
            # A check that exceeds the timeout counts as a failure
            failed_checks.append(check_name)
            print(f"{check_name} timed out")
            continue

        if result.returncode != 0:
            failed_checks.append(check_name)
            print(f"{check_name} failed")
            print(result.stdout.decode())
        else:
            print(f"{check_name} passed")

    if failed_checks:
        print(f"\n❌ Failed checks: {', '.join(failed_checks)}")
        sys.exit(1)

    print("\n✅ All fast security checks passed")
    return 0


if __name__ == '__main__':
    sys.exit(run_fast_security_checks())

Phase 7: Team Training and Adoption

Developer Onboarding

Security Training Materials:

# Security Best Practices Guide

## Secret Management
- Never commit secrets to version control
- Use AWS Secrets Manager for all secrets
- Rotate secrets regularly

## Dependency Management
- Keep dependencies up to date
- Review security advisories
- Use dependency scanning tools

## Code Security
- Follow OWASP Top 10 guidelines
- Use SAST tools before committing
- Review security findings promptly

Automated Security Reminders

GitHub/GitLab Integration:

# security-reminder-bot.py
import os

import requests


def send_security_reminder(pr_number, findings_count):
    """Send reminder about security findings in PR"""
    message = f"""
Security Review Required

PR #{pr_number} has {findings_count} security findings that need attention.

Please review and address:
- Critical findings must be fixed before merge
- High findings should be addressed or justified
- Medium/Low findings can be tracked as technical debt

View findings: https://security-hub.aws.amazon.com/findings
"""
    # Send to Slack/Teams
    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')
    requests.post(webhook_url, json={'text': message})

Security Champions Program

Identify and Train Security Champions:

# security-champion-tracker.py
import boto3


def identify_security_champions():
    """Identify developers who actively address security issues"""
    codecommit = boto3.client('codecommit')

    # Analyze commit history for security-related commits
    # Track who fixes security issues
    # Recognize top contributors
    champions = [
        {
            'name': 'Developer Name',
            'security_fixes': 15,
            'areas': ['SAST fixes', 'Dependency updates', 'Secret management']
        }
    ]
    return champions

Metrics and Success Criteria

Key Performance Indicators

# devsecops-metrics.py
# Helpers such as get_recent_build_ids(), calculate_avg_scan_time(), and
# calculate_mttr() are not shown here and must be supplied by the caller.
from datetime import datetime, timedelta

import boto3


def calculate_devsecops_metrics():
    """Calculate DevSecOps pipeline metrics"""
    codebuild = boto3.client('codebuild')
    securityhub = boto3.client('securityhub')

    # Get pipeline metrics
    builds = codebuild.batch_get_builds(
        ids=get_recent_build_ids()
    )

    metrics = {
        'pipeline_duration': calculate_avg_pipeline_duration(builds),
        'security_scan_time': calculate_avg_scan_time(builds),
        'findings_detected': get_findings_count_last_30_days(),
        'findings_fixed': get_findings_fixed_count(),
        'deployment_frequency': get_deployment_frequency(),
        'mean_time_to_remediate': calculate_mttr()
    }
    return metrics


def calculate_avg_pipeline_duration(builds):
    """Calculate average pipeline duration in seconds"""
    durations = []
    for build in builds.get('builds', []):
        start = build.get('startTime')
        end = build.get('endTime')
        if start and end:
            duration = (end - start).total_seconds()
            durations.append(duration)
    return sum(durations) / len(durations) if durations else 0


# Target metrics
target_metrics = {
    'pipeline_duration_minutes': 15,     # Target: < 15 minutes
    'security_scan_time_minutes': 5,     # Target: < 5 minutes
    'findings_detected_per_week': 0,     # Target: Reduce over time
    'mean_time_to_remediate_hours': 24,  # Target: < 24 hours
    'deployment_frequency_per_day': 1    # Target: Maintain daily deployments
}
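The metrics code refers to `calculate_mttr()` without showing it. One plausible shape for that helper is sketched below. Unlike the call above, this version takes its findings as an argument so it can be tested in isolation, and the `created_at` and `resolved_at` field names are assumptions rather than Security Hub field names.

```python
from datetime import datetime


def calculate_mttr(findings):
    """Mean time to remediate, in hours, over findings that have been resolved.

    Each finding is a dict with `created_at` and `resolved_at` datetimes;
    unresolved findings (resolved_at is None or missing) are skipped.
    """
    durations = [
        (f["resolved_at"] - f["created_at"]).total_seconds() / 3600
        for f in findings
        if f.get("resolved_at") is not None
    ]
    return sum(durations) / len(durations) if durations else 0.0


# Hypothetical sample: resolved after 12 hours, after 36 hours, and still open
sample = [
    {"created_at": datetime(2024, 1, 1, 0, 0), "resolved_at": datetime(2024, 1, 1, 12, 0)},
    {"created_at": datetime(2024, 1, 2, 0, 0), "resolved_at": datetime(2024, 1, 3, 12, 0)},
    {"created_at": datetime(2024, 1, 3, 0, 0), "resolved_at": None},
]
print(calculate_mttr(sample))  # 24.0
```

The result can be compared directly against the 24-hour target in `target_metrics`. Skipping open findings keeps the average honest about what has been fixed, though a long-lived open finding should be tracked separately.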

Best Practices Summary

Do’s ✅

  1. Shift-Left: Run security checks as early as possible
  2. Automate Everything: Don’t rely on manual security reviews
  3. Fail Fast: Catch issues early, fail builds quickly
  4. Centralize Findings: Use Security Hub for unified view
  5. Educate Team: Security is everyone’s responsibility
  6. Measure Everything: Track metrics to improve

Don’ts ❌

  1. Don’t Block Developers: Balance security with velocity
  2. Don’t Ignore False Positives: Tune rules to reduce noise
  3. Don’t Skip Compliance: Automate compliance checks
  4. Don’t Forget Runtime: Security doesn’t end at deployment
  5. Don’t Set It and Forget It: Continuously improve the pipeline

Conclusion

Building a DevSecOps pipeline on AWS requires integrating security at every stage of the development lifecycle. Key takeaways:

  1. AWS Secrets Manager eliminates hardcoded secrets
  2. Multi-layered scanning (SAST, DAST, container, dependency) catches issues early
  3. Security Hub provides centralized visibility
  4. Automated compliance ensures regulatory requirements are met
  5. Security gates prevent vulnerable code from reaching production
  6. Performance optimization maintains daily deployment frequency

The result? A secure, compliant, and fast development pipeline that builds security into your culture, not just your tools.



Source: DEV Community.

