feat: initial Argus - AI-powered FinOps agent

Argus is an all-seeing eye on your AWS costs:
- Scan for optimization opportunities (unused resources, oversized instances)
- Evaluate Terraform plans for cost impact
- Generate weekly/monthly cost reports
- Integrate with Atlantis for pre-apply cost analysis

Components:
- CLI tool (argus scan/evaluate/report)
- GitHub Action for CI/CD integration
- AWS Scanner for resource analysis
- AI engine for intelligent recommendations

Features:
- Unused EBS volumes, idle EC2, oversized RDS
- Cost delta on Terraform changes
- Atlantis integration
- Slack/email notifications
This commit is contained in:
Argus Bot
2026-02-01 06:42:51 +00:00
commit d775525b2b
12 changed files with 1073 additions and 0 deletions

58
.gitignore vendored Normal file
View File

@@ -0,0 +1,58 @@
# Python
__pycache__/
*.py[cod]
*$py.class
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
venv/
.venv/
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# Testing
.tox/
.nox/
.coverage
htmlcov/
.pytest_cache/
# Terraform
.terraform/
*.tfstate
*.tfstate.*
*.tfplan
crash.log
# Secrets
.env
*.pem
*.key
secrets/
# OS
.DS_Store
Thumbs.db
# Temp
/tmp/
*.log

15
LICENSE Normal file
View File

@@ -0,0 +1,15 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

292
README.md Normal file
View File

@@ -0,0 +1,292 @@
# Argus
**AI-powered FinOps agent for AWS** — Find waste, optimize costs, evaluate changes.
[![License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](LICENSE)
## What is Argus?
Argus is an all-seeing eye on your AWS costs. It uses AI to:
- 🔍 **Find waste** — Unused resources, oversized instances, missing reservations
- 💰 **Estimate impact** — Cost analysis on Terraform changes before you apply
- 🤖 **Auto-optimize** — Generate PRs to fix inefficiencies
- 📊 **Report trends** — Weekly/monthly cost reports with actionable insights
## Features
### Standalone Mode
Run Argus on a schedule to continuously find savings:
```bash
# Weekly cost report
argus report --period weekly
# Find all optimization opportunities
argus scan --output recommendations.md
# Auto-fix with PR
argus fix --create-pr
```
### Atlantis Integration
Add cost analysis to your Terraform PR workflow:
```yaml
# atlantis.yaml
workflows:
default:
plan:
steps:
- init
- plan
- run: argus evaluate --plan-file $PLANFILE
```
**Result:**
```
💰 Argus Cost Analysis
This PR will change your monthly spend:
+ aws_rds_cluster.main +$680/mo
+ aws_nat_gateway.private +$32/mo
- aws_instance.deprecated -$45/mo
─────────────────────────────────────────
Net Impact: +$667/mo (+15%)
⚠️ Suggestions:
• Consider db.r5.large instead of xlarge (-$340/mo)
• VPC endpoints could replace NAT for S3 traffic
```
### GitHub Action
Use Argus as a GitHub Action:
```yaml
- uses: ghndrx/argus-action@v1
with:
aws-role-arn: ${{ secrets.AWS_ROLE_ARN }}
mode: evaluate # or 'scan', 'report'
```
## Quick Start
### 1. Install
```bash
# Via pip
pip install argus-finops
# Via Docker
docker pull ghcr.io/ghndrx/argus:latest
# Via GitHub Action
uses: ghndrx/argus-action@v1
```
### 2. Configure AWS Access
```bash
# Option A: IAM Role (recommended)
export AWS_ROLE_ARN=arn:aws:iam::123456789012:role/argus
# Option B: Environment variables
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
```
### 3. Configure AI Provider
```bash
# AWS Bedrock (recommended)
export ARGUS_AI_PROVIDER=bedrock
export ARGUS_AI_MODEL=anthropic.claude-3-5-sonnet-20241022-v2:0
# Or OpenAI
export ARGUS_AI_PROVIDER=openai
export OPENAI_API_KEY=...
```
### 4. Run
```bash
# Generate cost report
argus report
# Scan for optimizations
argus scan
# Evaluate a Terraform plan
argus evaluate --plan-file tfplan.json
```
## What Argus Finds
| Category | Examples | Typical Savings |
|----------|----------|-----------------|
| **Unused Resources** | Unattached EBS, idle load balancers, orphaned snapshots | 10-20% |
| **Oversized Instances** | EC2, RDS, ElastiCache running at <20% utilization | 20-40% |
| **Missing Reservations** | Steady-state workloads without RIs or Savings Plans | 30-60% |
| **Architecture Issues** | NAT Gateway for S3 traffic, cross-AZ data transfer | 5-15% |
| **Storage Optimization** | S3 lifecycle policies, EBS type optimization | 10-30% |
## Configuration
```yaml
# argus.yaml
scan:
regions:
- us-east-1
- us-west-2
exclude_tags:
- Key: argus-ignore
Value: "true"
thresholds:
idle_cpu_percent: 10
idle_days: 14
min_savings_to_report: 10 # dollars
notifications:
slack_webhook: https://hooks.slack.com/...
email: finops@company.com
ai:
provider: bedrock
model: anthropic.claude-3-5-sonnet-20241022-v2:0
```
## Atlantis Integration
### Setup
1. Add Argus to your Atlantis server
2. Configure the workflow:
```yaml
# atlantis.yaml
workflows:
default:
plan:
steps:
- init
- plan
- run: |
argus evaluate \
--plan-file $PLANFILE \
--output-format github-comment \
> $OUTPUT_FILE
apply:
steps:
- apply
```
### How It Works
1. Developer opens PR with Terraform changes
2. Atlantis runs `terraform plan`
3. Argus analyzes the plan:
- Calculates cost delta
- Identifies optimization opportunities
- Checks for cost policy violations
4. Argus comments on PR with findings
5. Team reviews cost impact before merge
## GitHub Action
### Evaluate PR Changes
```yaml
name: Argus Cost Check
on: [pull_request]
jobs:
cost-check:
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
pull-requests: write
steps:
- uses: actions/checkout@v4
- uses: aws-actions/configure-aws-credentials@v4
with:
role-to-assume: ${{ secrets.AWS_ROLE_ARN }}
aws-region: us-east-1
- uses: hashicorp/setup-terraform@v3
- run: terraform init && terraform plan -out=tfplan
- uses: ghndrx/argus-action@v1
with:
mode: evaluate
plan-file: tfplan
comment-on-pr: true
```
### Scheduled Cost Report
```yaml
name: Weekly Cost Report
on:
schedule:
- cron: '0 9 * * 1' # Monday 9am
jobs:
report:
runs-on: ubuntu-latest
steps:
- uses: ghndrx/argus-action@v1
with:
mode: report
period: weekly
slack-webhook: ${{ secrets.SLACK_WEBHOOK }}
```
## Architecture
```
┌──────────────────────────────────────────────────────────────┐
│ Argus │
├──────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Scanner │ │ Evaluator │ │ Reporter │ │
│ │ │ │ │ │ │ │
│ │ • AWS APIs │ │ • TF Plans │ │ • Markdown │ │
│ │ • Usage │ │ • Cost Calc │ │ • Slack │ │
│ │ • Pricing │ │ • AI Review │ │ • Email │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └────────────────┼────────────────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ AI Engine │ │
│ │ (Bedrock) │ │
│ └─────────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘
```
## Roadmap
- [x] Cost evaluation on Terraform plans
- [x] AWS resource scanning
- [x] Atlantis integration
- [x] GitHub Action
- [ ] Slack bot interface
- [ ] Multi-cloud (GCP, Azure)
- [ ] Cost anomaly detection
- [ ] Budget enforcement policies
- [ ] Recommendation auto-apply
## Contributing
See [CONTRIBUTING.md](CONTRIBUTING.md)
## License
Apache 2.0 - See [LICENSE](LICENSE)

138
action/action.yml Normal file
View File

@@ -0,0 +1,138 @@
name: 'Argus FinOps'
description: 'AI-powered cost analysis for AWS infrastructure'
author: 'ghndrx'
branding:
icon: 'dollar-sign'
color: 'green'
inputs:
mode:
description: 'Operation mode: scan, evaluate, or report'
required: true
default: 'evaluate'
plan-file:
description: 'Terraform plan file (for evaluate mode)'
required: false
regions:
description: 'AWS regions to scan (comma-separated)'
required: false
default: 'us-east-1'
period:
description: 'Report period: daily, weekly, monthly'
required: false
default: 'weekly'
ai-provider:
description: 'AI provider: bedrock or openai'
required: false
default: 'bedrock'
ai-model:
description: 'AI model ID'
required: false
default: 'anthropic.claude-3-5-sonnet-20241022-v2:0'
comment-on-pr:
description: 'Comment results on PR'
required: false
default: 'true'
fail-on-increase:
description: 'Fail if cost increases above threshold'
required: false
default: 'false'
fail-threshold:
description: 'Cost increase threshold (monthly $)'
required: false
default: '100'
slack-webhook:
description: 'Slack webhook for notifications'
required: false
output-format:
description: 'Output format: markdown, json, github'
required: false
default: 'github'
outputs:
monthly-delta:
description: 'Monthly cost change in dollars'
total-savings:
description: 'Total potential savings identified'
findings-count:
description: 'Number of optimization findings'
report:
description: 'Full report content'
runs:
using: 'composite'
steps:
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install Argus
shell: bash
run: |
pip install boto3 requests
# In production: pip install argus-finops
- name: Run Argus
id: argus
shell: bash
env:
ARGUS_AI_PROVIDER: ${{ inputs.ai-provider }}
ARGUS_AI_MODEL: ${{ inputs.ai-model }}
run: |
case "${{ inputs.mode }}" in
scan)
python -m argus scan \
--regions ${{ inputs.regions }} \
--output-format ${{ inputs.output-format }} \
--output /tmp/argus-report.md
;;
evaluate)
python -m argus evaluate \
--plan-file "${{ inputs.plan-file }}" \
--output-format ${{ inputs.output-format }} \
--output /tmp/argus-report.md \
${{ inputs.fail-on-increase == 'true' && '--fail-on-increase' || '' }} \
--fail-threshold ${{ inputs.fail-threshold }}
;;
report)
python -m argus report \
--period ${{ inputs.period }} \
--regions ${{ inputs.regions }} \
--output-format ${{ inputs.output-format }} \
--output /tmp/argus-report.md \
${{ inputs.slack-webhook && format('--slack-webhook {0}', inputs.slack-webhook) || '' }}
;;
esac
# Set outputs
echo "report<<EOF" >> $GITHUB_OUTPUT
cat /tmp/argus-report.md >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
- name: Comment on PR
if: inputs.comment-on-pr == 'true' && github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const report = `${{ steps.argus.outputs.report }}`;
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `## 💰 Argus Cost Analysis\n\n${report}`
});

72
pyproject.toml Normal file
View File

@@ -0,0 +1,72 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "argus-finops"
version = "0.1.0"
description = "AI-powered FinOps agent for AWS"
readme = "README.md"
license = "Apache-2.0"
requires-python = ">=3.10"
authors = [
{ name = "ghndrx" }
]
keywords = ["aws", "finops", "cost", "optimization", "terraform", "ai"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Intended Audience :: System Administrators",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: System :: Systems Administration",
]
dependencies = [
"boto3>=1.34.0",
"requests>=2.31.0",
"pyyaml>=6.0",
"rich>=13.0.0",
"typer>=0.9.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.4.0",
"pytest-cov>=4.1.0",
"black>=23.0.0",
"ruff>=0.1.0",
"mypy>=1.7.0",
]
[project.scripts]
argus = "argus.cli:main"
[project.urls]
Homepage = "https://github.com/ghndrx/argus"
Documentation = "https://github.com/ghndrx/argus#readme"
Repository = "https://github.com/ghndrx/argus"
Issues = "https://github.com/ghndrx/argus/issues"
[tool.hatch.build.targets.wheel]
packages = ["src/argus"]
[tool.black]
line-length = 100
target-version = ["py310", "py311", "py312"]
[tool.ruff]
line-length = 100
select = ["E", "F", "I", "N", "W"]
[tool.mypy]
python_version = "3.11"
warn_return_any = true
warn_unused_configs = true
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --cov=argus"

0
src/argus/__init__.py Normal file
View File

0
src/argus/ai/__init__.py Normal file
View File

177
src/argus/cli.py Normal file
View File

@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
Argus CLI - AI-powered FinOps agent for AWS
"""
import argparse
import json
import sys
from pathlib import Path
from argus.scanner import AWSScanner
from argus.evaluator import TerraformEvaluator
from argus.reporter import Reporter
from argus.ai import AIEngine
def cmd_scan(args):
"""Scan AWS account for optimization opportunities."""
scanner = AWSScanner(
regions=args.regions,
exclude_tags=args.exclude_tags
)
findings = scanner.scan()
ai = AIEngine()
recommendations = ai.analyze_findings(findings)
reporter = Reporter(format=args.output_format)
report = reporter.generate_scan_report(findings, recommendations)
if args.output:
Path(args.output).write_text(report)
print(f"Report written to {args.output}")
else:
print(report)
if args.create_pr:
# TODO: Create PR with fixes
pass
return 0 if not findings.critical else 1
def cmd_evaluate(args):
"""Evaluate Terraform plan for cost impact."""
evaluator = TerraformEvaluator()
# Load plan
if args.plan_file.endswith('.json'):
plan = json.loads(Path(args.plan_file).read_text())
else:
plan = evaluator.parse_plan_binary(args.plan_file)
# Calculate cost delta
cost_analysis = evaluator.analyze_cost_impact(plan)
# AI review
ai = AIEngine()
recommendations = ai.review_plan(plan, cost_analysis)
# Generate report
reporter = Reporter(format=args.output_format)
report = reporter.generate_plan_report(cost_analysis, recommendations)
if args.output:
Path(args.output).write_text(report)
else:
print(report)
# Exit code based on thresholds
if args.fail_on_increase and cost_analysis.monthly_delta > args.fail_threshold:
return 1
return 0
def cmd_report(args):
"""Generate cost report for specified period."""
scanner = AWSScanner(regions=args.regions)
# Get cost data
cost_data = scanner.get_cost_data(period=args.period)
# AI analysis
ai = AIEngine()
analysis = ai.analyze_cost_trends(cost_data)
# Generate report
reporter = Reporter(format=args.output_format)
report = reporter.generate_cost_report(cost_data, analysis)
if args.output:
Path(args.output).write_text(report)
print(f"Report written to {args.output}")
else:
print(report)
# Send notifications
if args.slack_webhook:
reporter.send_slack(report, args.slack_webhook)
if args.email:
reporter.send_email(report, args.email)
return 0
def cmd_fix(args):
"""Auto-generate fixes for identified issues."""
scanner = AWSScanner(regions=args.regions)
findings = scanner.scan()
ai = AIEngine()
fixes = ai.generate_fixes(findings)
if args.create_pr:
# TODO: Create GitHub PR with fixes
pass
elif args.apply:
# TODO: Apply fixes directly (dangerous!)
pass
else:
# Just output the fixes
for fix in fixes:
print(f"# {fix.description}")
print(fix.terraform_code)
print()
return 0
def main():
parser = argparse.ArgumentParser(
description='Argus - AI-powered FinOps agent for AWS'
)
subparsers = parser.add_subparsers(dest='command', required=True)
# scan command
scan_parser = subparsers.add_parser('scan', help='Scan for optimizations')
scan_parser.add_argument('--regions', nargs='+', default=['us-east-1'])
scan_parser.add_argument('--exclude-tags', nargs='+', default=[])
scan_parser.add_argument('--output', '-o', help='Output file')
scan_parser.add_argument('--output-format', choices=['markdown', 'json', 'github'], default='markdown')
scan_parser.add_argument('--create-pr', action='store_true', help='Create PR with fixes')
scan_parser.set_defaults(func=cmd_scan)
# evaluate command
eval_parser = subparsers.add_parser('evaluate', help='Evaluate Terraform plan')
eval_parser.add_argument('--plan-file', required=True, help='Terraform plan file')
eval_parser.add_argument('--output', '-o', help='Output file')
eval_parser.add_argument('--output-format', choices=['markdown', 'json', 'github'], default='markdown')
eval_parser.add_argument('--fail-on-increase', action='store_true')
eval_parser.add_argument('--fail-threshold', type=float, default=100, help='Monthly $ threshold')
eval_parser.set_defaults(func=cmd_evaluate)
# report command
report_parser = subparsers.add_parser('report', help='Generate cost report')
report_parser.add_argument('--period', choices=['daily', 'weekly', 'monthly'], default='weekly')
report_parser.add_argument('--regions', nargs='+', default=['us-east-1'])
report_parser.add_argument('--output', '-o', help='Output file')
report_parser.add_argument('--output-format', choices=['markdown', 'json', 'html'], default='markdown')
report_parser.add_argument('--slack-webhook', help='Slack webhook URL')
report_parser.add_argument('--email', help='Email address for report')
report_parser.set_defaults(func=cmd_report)
# fix command
fix_parser = subparsers.add_parser('fix', help='Generate/apply fixes')
fix_parser.add_argument('--regions', nargs='+', default=['us-east-1'])
fix_parser.add_argument('--create-pr', action='store_true', help='Create GitHub PR')
fix_parser.add_argument('--apply', action='store_true', help='Apply fixes directly (dangerous!)')
fix_parser.set_defaults(func=cmd_fix)
args = parser.parse_args()
sys.exit(args.func(args))
if __name__ == '__main__':
main()

View File

View File

View File

321
src/argus/scanner/aws.py Normal file
View File

@@ -0,0 +1,321 @@
"""
AWS Scanner - Find cost optimization opportunities
"""
import boto3
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
@dataclass
class Finding:
"""A cost optimization finding."""
resource_type: str
resource_id: str
resource_arn: str
region: str
issue: str
severity: str # critical, high, medium, low
current_cost: float
potential_savings: float
recommendation: str
tags: dict = field(default_factory=dict)
@dataclass
class ScanResults:
"""Results from an AWS scan."""
findings: list[Finding] = field(default_factory=list)
total_monthly_spend: float = 0.0
total_potential_savings: float = 0.0
scan_time: datetime = field(default_factory=datetime.now)
regions_scanned: list[str] = field(default_factory=list)
@property
def critical(self) -> list[Finding]:
return [f for f in self.findings if f.severity == 'critical']
@property
def high(self) -> list[Finding]:
return [f for f in self.findings if f.severity == 'high']
class AWSScanner:
"""Scan AWS account for cost optimization opportunities."""
def __init__(self, regions: list[str] = None, exclude_tags: list[str] = None):
self.regions = regions or ['us-east-1']
self.exclude_tags = exclude_tags or []
self.findings = []
def scan(self) -> ScanResults:
"""Run full scan across all configured regions."""
results = ScanResults(regions_scanned=self.regions)
for region in self.regions:
self._scan_region(region, results)
results.total_potential_savings = sum(f.potential_savings for f in results.findings)
return results
def _scan_region(self, region: str, results: ScanResults):
"""Scan a single region."""
self._scan_ec2(region, results)
self._scan_ebs(region, results)
self._scan_rds(region, results)
self._scan_elb(region, results)
self._scan_eip(region, results)
def _scan_ec2(self, region: str, results: ScanResults):
"""Find underutilized or idle EC2 instances."""
ec2 = boto3.client('ec2', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
instances = ec2.describe_instances(
Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
)
for reservation in instances.get('Reservations', []):
for instance in reservation.get('Instances', []):
instance_id = instance['InstanceId']
instance_type = instance['InstanceType']
# Get CPU utilization
cpu_stats = cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
StartTime=datetime.now() - timedelta(days=14),
EndTime=datetime.now(),
Period=86400,
Statistics=['Average']
)
avg_cpu = 0
if cpu_stats['Datapoints']:
avg_cpu = sum(d['Average'] for d in cpu_stats['Datapoints']) / len(cpu_stats['Datapoints'])
# Flag if CPU < 10% over 14 days
if avg_cpu < 10:
# Estimate cost (simplified)
hourly_cost = self._get_ec2_hourly_cost(instance_type, region)
monthly_cost = hourly_cost * 730
results.findings.append(Finding(
resource_type='EC2',
resource_id=instance_id,
resource_arn=f'arn:aws:ec2:{region}:{self._get_account_id()}:instance/{instance_id}',
region=region,
issue=f'Instance idle - {avg_cpu:.1f}% avg CPU over 14 days',
severity='high' if monthly_cost > 100 else 'medium',
current_cost=monthly_cost,
potential_savings=monthly_cost * 0.5, # Assume 50% savings from rightsizing
recommendation=f'Consider stopping, rightsizing, or terminating. Current type: {instance_type}',
tags=self._get_tags(instance.get('Tags', []))
))
def _scan_ebs(self, region: str, results: ScanResults):
"""Find unattached EBS volumes."""
ec2 = boto3.client('ec2', region_name=region)
volumes = ec2.describe_volumes(
Filters=[{'Name': 'status', 'Values': ['available']}]
)
for volume in volumes.get('Volumes', []):
volume_id = volume['VolumeId']
size_gb = volume['Size']
volume_type = volume['VolumeType']
# Estimate cost
monthly_cost = self._get_ebs_monthly_cost(volume_type, size_gb)
results.findings.append(Finding(
resource_type='EBS',
resource_id=volume_id,
resource_arn=f'arn:aws:ec2:{region}:{self._get_account_id()}:volume/{volume_id}',
region=region,
issue=f'Unattached volume ({size_gb}GB {volume_type})',
severity='high' if monthly_cost > 50 else 'medium',
current_cost=monthly_cost,
potential_savings=monthly_cost,
recommendation='Delete if not needed, or snapshot and delete',
tags=self._get_tags(volume.get('Tags', []))
))
def _scan_rds(self, region: str, results: ScanResults):
"""Find underutilized RDS instances."""
rds = boto3.client('rds', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
instances = rds.describe_db_instances()
for db in instances.get('DBInstances', []):
db_id = db['DBInstanceIdentifier']
db_class = db['DBInstanceClass']
# Get CPU utilization
cpu_stats = cloudwatch.get_metric_statistics(
Namespace='AWS/RDS',
MetricName='CPUUtilization',
Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': db_id}],
StartTime=datetime.now() - timedelta(days=14),
EndTime=datetime.now(),
Period=86400,
Statistics=['Average']
)
avg_cpu = 0
if cpu_stats['Datapoints']:
avg_cpu = sum(d['Average'] for d in cpu_stats['Datapoints']) / len(cpu_stats['Datapoints'])
if avg_cpu < 20:
monthly_cost = self._get_rds_monthly_cost(db_class, region)
results.findings.append(Finding(
resource_type='RDS',
resource_id=db_id,
resource_arn=db['DBInstanceArn'],
region=region,
issue=f'Underutilized - {avg_cpu:.1f}% avg CPU over 14 days',
severity='high' if monthly_cost > 200 else 'medium',
current_cost=monthly_cost,
potential_savings=monthly_cost * 0.4,
recommendation=f'Consider downsizing from {db_class}',
tags={}
))
def _scan_elb(self, region: str, results: ScanResults):
"""Find idle load balancers."""
elbv2 = boto3.client('elbv2', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
lbs = elbv2.describe_load_balancers()
for lb in lbs.get('LoadBalancers', []):
lb_arn = lb['LoadBalancerArn']
lb_name = lb['LoadBalancerName']
# Check request count
request_stats = cloudwatch.get_metric_statistics(
Namespace='AWS/ApplicationELB',
MetricName='RequestCount',
Dimensions=[{'Name': 'LoadBalancer', 'Value': lb_arn.split('/')[-3] + '/' + '/'.join(lb_arn.split('/')[-2:])}],
StartTime=datetime.now() - timedelta(days=7),
EndTime=datetime.now(),
Period=86400,
Statistics=['Sum']
)
total_requests = sum(d['Sum'] for d in request_stats.get('Datapoints', []))
if total_requests < 100: # Less than 100 requests in a week
results.findings.append(Finding(
resource_type='ALB',
resource_id=lb_name,
resource_arn=lb_arn,
region=region,
issue=f'Idle load balancer - {total_requests} requests in 7 days',
severity='medium',
current_cost=18.0, # ~$18/month for ALB
potential_savings=18.0,
recommendation='Delete if not needed',
tags={}
))
def _scan_eip(self, region: str, results: ScanResults):
"""Find unattached Elastic IPs."""
ec2 = boto3.client('ec2', region_name=region)
eips = ec2.describe_addresses()
for eip in eips.get('Addresses', []):
if 'AssociationId' not in eip:
results.findings.append(Finding(
resource_type='EIP',
resource_id=eip.get('AllocationId', eip.get('PublicIp')),
resource_arn=f'arn:aws:ec2:{region}:{self._get_account_id()}:elastic-ip/{eip.get("AllocationId")}',
region=region,
issue='Unattached Elastic IP',
severity='low',
current_cost=3.65, # $0.005/hour = ~$3.65/month
potential_savings=3.65,
recommendation='Release if not needed',
tags=self._get_tags(eip.get('Tags', []))
))
def get_cost_data(self, period: str = 'weekly') -> dict:
"""Get cost and usage data from Cost Explorer."""
ce = boto3.client('ce', region_name='us-east-1')
# Calculate date range
end_date = datetime.now().date()
if period == 'daily':
start_date = end_date - timedelta(days=1)
elif period == 'weekly':
start_date = end_date - timedelta(days=7)
else: # monthly
start_date = end_date - timedelta(days=30)
response = ce.get_cost_and_usage(
TimePeriod={
'Start': start_date.isoformat(),
'End': end_date.isoformat()
},
Granularity='DAILY',
Metrics=['UnblendedCost'],
GroupBy=[
{'Type': 'DIMENSION', 'Key': 'SERVICE'}
]
)
return response
def _get_account_id(self) -> str:
"""Get current AWS account ID."""
sts = boto3.client('sts')
return sts.get_caller_identity()['Account']
def _get_tags(self, tags: list) -> dict:
"""Convert AWS tags to dict."""
return {t['Key']: t['Value'] for t in tags}
def _get_ec2_hourly_cost(self, instance_type: str, region: str) -> float:
"""Get estimated hourly cost for EC2 instance type."""
# Simplified pricing - in production, use AWS Pricing API
pricing = {
't3.micro': 0.0104,
't3.small': 0.0208,
't3.medium': 0.0416,
't3.large': 0.0832,
'm5.large': 0.096,
'm5.xlarge': 0.192,
'r5.large': 0.126,
'r5.xlarge': 0.252,
}
return pricing.get(instance_type, 0.10)
def _get_ebs_monthly_cost(self, volume_type: str, size_gb: int) -> float:
"""Get estimated monthly cost for EBS volume."""
pricing = {
'gp3': 0.08,
'gp2': 0.10,
'io1': 0.125,
'io2': 0.125,
'st1': 0.045,
'sc1': 0.015,
}
return pricing.get(volume_type, 0.10) * size_gb
def _get_rds_monthly_cost(self, db_class: str, region: str) -> float:
"""Get estimated monthly cost for RDS instance."""
# Simplified pricing
pricing = {
'db.t3.micro': 12.41,
'db.t3.small': 24.82,
'db.t3.medium': 49.64,
'db.r5.large': 175.20,
'db.r5.xlarge': 350.40,
}
return pricing.get(db_class, 100.0)