Every anomaly detected by the pipeline in Post 3 carries a timestamp: the date the resource’s spend deviated from its baseline. Every deployment carries a timestamp too: when a new version of a service was pushed to production. If a deployment landed at 14:23 and an anomaly on that service’s resources began at 14:31, the temporal proximity is a strong signal — not proof, but a candidate worth investigating.
This post builds a pipeline that crosses that boundary, connecting the observability layer (what changed in cost) to the engineering layer (what changed in code). But it starts with an honest acknowledgement that most engineers skip: deployment correlation is one investigation path, not the only one. Before the pipeline looks for a deployment, it needs to ask whether a deployment is even the plausible cause.
Observability tells you that something changed. Code history tells you what changed. The gap between them is where cost incidents go uninvestigated — but only when code actually caused them.
The statistical control chart in Post 3 flags any resource whose spend deviates meaningfully from its rolling baseline. That definition is deliberately broad. In practice, anomalies arrive from a much wider range of causes than deployment errors:
| Cause | Signal | Deployment involved? |
|---|---|---|
| Bug introduced in a release | Sharp spike shortly after deploy | ✓ |
| Infrastructure change (console, Terraform) | Spike with no code deployment | ✗ |
| Reserved Instance or Savings Plan expiry | Overnight jump to on-demand rates | ✗ |
| Traffic growth | Gradual rise correlated with user volume | ✗ |
| Scheduled batch job | Recurring spike on known schedule | ✗ |
| Data volume crossing a tier threshold | Step change in storage or egress cost | ✗ |
| Third-party API pricing change | Appears as external service line item increase | ✗ |
| Security incident | Unexpected egress, unusual regions or services | ✗ |
| Latent code change | Bug introduced weeks ago, manifesting now | Possibly |
Attempting deployment correlation on the majority of these produces noise at best and a misleading false positive at worst — a deployment that happened to occur near an unrelated anomaly gets blamed, a spurious PR is raised, and an engineer wastes time investigating the wrong commit.
The first step is therefore a classifier, not a correlator.
Before searching for deployments, the pipeline classifies each anomaly by its most likely cause category. This routing decision determines which investigation path to take.
```python
from dataclasses import dataclass
from enum import Enum

from .db import fetch_anomaly, fetch_recent_deployments, fetch_traffic_metrics


class InvestigationPath(str, Enum):
    DEPLOYMENT_CORRELATION = "deployment_correlation"  # recent deploy in window
    INFRASTRUCTURE_DRIFT = "infrastructure_drift"      # no deploy; check console / IaC
    TRAFFIC_GROWTH = "traffic_growth"                  # spend tracking request volume
    SCHEDULED_JOB = "scheduled_job"                    # recurring pattern on same weekday/date
    RI_EXPIRY = "ri_expiry"                            # sudden jump, no prior gradual change
    UNCLASSIFIED = "unclassified"                      # route to manual triage


@dataclass
class RoutingDecision:
    path: InvestigationPath
    confidence: str  # 'high' | 'medium' | 'low'
    rationale: str
    deployment_id: str | None = None


def route_anomaly(anomaly_id: str, window_hours: int = 6) -> RoutingDecision:
    anomaly = fetch_anomaly(anomaly_id)

    # 1. Is the anomaly a gradual drift or a sharp spike?
    is_sharp_spike = anomaly.sigma > 3.0 and anomaly.days_elevated <= 2

    # 2. Is there a recent deployment for this team?
    deployments = fetch_recent_deployments(
        department=anomaly.department,
        team=anomaly.team,
        before_ts=anomaly.detected_at,
        window_hours=window_hours,
        environment='production',
    )
    if deployments and is_sharp_spike:
        best = min(deployments, key=lambda d: d.minutes_before_anomaly)
        return RoutingDecision(
            path=InvestigationPath.DEPLOYMENT_CORRELATION,
            confidence='high' if best.minutes_before_anomaly <= 60 else 'medium',
            rationale=(
                f"Sharp spike {anomaly.sigma:.1f}σ; deployment {best.commit_sha[:8]} "
                f"landed {best.minutes_before_anomaly}m before onset."
            ),
            deployment_id=best.id,
        )

    # 3. Is spend tracking traffic volume?
    traffic = fetch_traffic_metrics(anomaly.team, anomaly.anomaly_date)
    if traffic and abs(traffic.pct_change - anomaly.cost_pct_change) < 15:
        return RoutingDecision(
            path=InvestigationPath.TRAFFIC_GROWTH,
            confidence='medium',
            rationale=(
                f"Cost change ({anomaly.cost_pct_change:+.0f}%) closely tracks request volume "
                f"change ({traffic.pct_change:+.0f}%). Likely legitimate traffic growth."
            ),
        )

    # 4. Does the anomaly recur on the same day of week or month?
    if anomaly.recurrence_pattern in ('weekly', 'monthly'):
        return RoutingDecision(
            path=InvestigationPath.SCHEDULED_JOB,
            confidence='high',
            rationale=(
                f"Anomaly pattern matches {anomaly.recurrence_pattern} recurrence. "
                "Likely a known scheduled job — review if expected."
            ),
        )

    # 5. Was there a sudden jump from a stable baseline (RI/SP expiry signature)?
    if anomaly.prior_days_stable >= 30 and anomaly.cost_pct_change > 40 and not is_sharp_spike:
        return RoutingDecision(
            path=InvestigationPath.RI_EXPIRY,
            confidence='medium',
            rationale=(
                "Stable baseline for 30+ days followed by a sustained step-up. "
                "Check for Reserved Instance or Savings Plan expiry."
            ),
        )

    return RoutingDecision(
        path=InvestigationPath.UNCLASSIFIED,
        confidence='low',
        rationale="No clear causal pattern identified. Route to manual triage.",
    )
```

The traffic growth check above requires request volume data from your application monitoring layer (Datadog, CloudWatch metrics, Prometheus). If this isn’t available, skip that branch: a false negative (routing to UNCLASSIFIED instead of TRAFFIC_GROWTH) is safe. A false positive on deployment correlation is not.
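The traffic branch can also be read in isolation as a comparison of two percentage changes. The following is a minimal sketch, assuming the 15-point tolerance used in the router; `tracks_traffic` is a hypothetical helper name, and passing `None` stands in for "no monitoring data available".

```python
from typing import Optional


def tracks_traffic(cost_pct_change: float,
                   traffic_pct_change: Optional[float],
                   tolerance_pts: float = 15.0) -> bool:
    """Return True when cost moved within tolerance_pts of request volume."""
    if traffic_pct_change is None:
        # No request-volume data: decline to classify rather than guess.
        return False
    return abs(traffic_pct_change - cost_pct_change) < tolerance_pts


print(tracks_traffic(42.0, 38.0))  # cost +42%, traffic +38%
print(tracks_traffic(42.0, 3.0))   # cost +42%, traffic flat
print(tracks_traffic(42.0, None))  # no traffic data
```

Returning `False` when data is missing keeps the failure mode on the safe side described above: the anomaly falls through to later checks instead of being explained away.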
Before writing any code, three pieces of infrastructure must be in place. Each is a meaningful trust boundary that should be established deliberately, not assumed.
| Requirement | Access needed | Notes |
|---|---|---|
| Deployment event stream | Read | GitHub Actions webhooks, ArgoCD event notifications, CircleCI webhooks, or equivalent. Stores deployment timestamps, service names, commit SHAs, and environment targets. |
| Tag-to-repository map | Read | A table mapping department/team pairs to the GitHub/GitLab repository or repositories that own those resources. The connective tissue that makes deployment correlation possible. |
| Repository code access | Read (code) + Write (PRs) | A GitHub App or GitLab Application with read access to code, diff retrieval, and write access to create draft pull requests. Never commit access. Scoped to specific repositories only — not org-wide. |
Most organisations do not have an explicit, maintained mapping from cost attribution tags (department/team) to code repositories. Building this map is a prerequisite for this pipeline and is itself a valuable artefact — it makes the ownership model explicit and auditable. Start by asking each team to register their repositories in a central table. This is a social and organisational task, not a technical one.
Deployment events from CI/CD systems are ingested via webhooks or polling and stored in a local table. This local store is the join target for anomaly correlation — pulling live from CI/CD APIs during correlation would be too slow and fragile.
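As an illustration of the ingestion step, the sketch below maps one webhook payload onto the columns of the `deployment_events` table defined below. It assumes a GitHub `deployment_status` event and the field names as I understand that payload (`deployment_status.state`, `deployment.sha`, `repository.full_name`, and so on); verify them against your CI system's documentation before relying on them. The function name `deployment_row_from_github` and the one-service-per-repository mapping are hypothetical.

```python
from datetime import datetime
from typing import Optional


def deployment_row_from_github(payload: dict) -> Optional[dict]:
    """Convert a deployment_status webhook payload into a deployment_events row."""
    status = payload.get("deployment_status", {})
    deployment = payload.get("deployment", {})
    repo = payload.get("repository", {})

    # Only successful deployments are candidates for correlation.
    if status.get("state") != "success":
        return None

    return {
        # GitHub timestamps end in 'Z'; make the offset explicit for fromisoformat.
        "deployed_at": datetime.fromisoformat(status["created_at"].replace("Z", "+00:00")),
        "service_name": repo.get("name"),  # assumption: one service per repository
        "environment": deployment.get("environment"),
        "commit_sha": deployment.get("sha"),
        "branch": deployment.get("ref"),
        "repository": repo.get("full_name"),
        "ci_system": "github_actions",
        "triggered_by": (deployment.get("creator") or {}).get("login"),
        "pipeline_url": status.get("target_url"),
    }


sample = {
    "deployment_status": {"state": "success", "created_at": "2026-05-01T14:23:00Z"},
    "deployment": {"sha": "a3f8c1d", "ref": "main", "environment": "production",
                   "creator": {"login": "ci-bot"}},
    "repository": {"name": "data-processor", "full_name": "org/data-processor"},
}
row = deployment_row_from_github(sample)
print(row["repository"], row["deployed_at"].isoformat())
```

Storing a timezone-aware `deployed_at` matters here, because the later join compares it directly against the anomaly timestamp.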
```sql
CREATE TABLE deployment_events (
    id              uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    deployed_at     timestamptz NOT NULL,
    service_name    varchar NOT NULL,
    environment     varchar NOT NULL,   -- 'production', 'staging' etc
    commit_sha      varchar NOT NULL,
    commit_message  text,
    branch          varchar,
    repository      varchar NOT NULL,   -- 'org/repo-name'
    ci_system       varchar,            -- 'github_actions', 'argocd', 'circleci'
    triggered_by    varchar,            -- author or service account
    pipeline_url    varchar,
    ingested_at     timestamptz DEFAULT now()
);

CREATE INDEX ON deployment_events (service_name, deployed_at);
CREATE INDEX ON deployment_events (repository, deployed_at);
CREATE INDEX ON deployment_events (deployed_at);  -- for temporal join with anomalies

-- Tag-to-repository mapping table
CREATE TABLE team_repositories (
    department     varchar NOT NULL,
    team           varchar NOT NULL,
    repository     varchar NOT NULL,    -- 'org/repo-name'
    primary_repo   boolean DEFAULT true,
    registered_by  varchar,
    registered_at  timestamptz DEFAULT now(),
    PRIMARY KEY (department, team, repository)
);
```

When the router returns DEPLOYMENT_CORRELATION, the pipeline performs a temporal join: find deployments in the affected team’s repositories that occurred within a configurable window before the anomaly onset. A shorter window (2–4 hours) produces fewer, higher-confidence candidates. A longer window (24 hours) casts a wider net but introduces more noise.
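The window trade-off is easy to see on a toy dataset. This is a minimal sketch over in-memory dictionaries rather than the database; `deployments_in_window` and the three example events are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def deployments_in_window(deployments, anomaly_ts, window_hours=6):
    """Keep deployments in [anomaly_ts - window, anomaly_ts], most recent first."""
    window_start = anomaly_ts - timedelta(hours=window_hours)
    hits = [d for d in deployments if window_start <= d["deployed_at"] <= anomaly_ts]
    return sorted(hits, key=lambda d: d["deployed_at"], reverse=True)


onset = datetime(2026, 5, 1, 14, 31, tzinfo=timezone.utc)
events = [
    {"commit_sha": "a3f8c1d", "deployed_at": onset - timedelta(minutes=8)},
    {"commit_sha": "9e2b7f0", "deployed_at": onset - timedelta(hours=5)},
    {"commit_sha": "c41d2aa", "deployed_at": onset - timedelta(hours=20)},
]

print([d["commit_sha"] for d in deployments_in_window(events, onset, 6)])
# → ['a3f8c1d', '9e2b7f0']
print(len(deployments_in_window(events, onset, 24)))
# → 3
```

This sketch returns a bare list for brevity. The pipeline's own correlation function, shown next, wraps the same filter in explicit outcome handling.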
The function returns a typed CorrelationOutcome rather than a bare list, so downstream code is forced to handle both the found and not-found cases explicitly.
```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum

from .db import fetch_anomaly, fetch_team_repos, fetch_deployments_in_window


class CorrelationPath(str, Enum):
    DEPLOYMENT_FOUND = "deployment_found"
    NO_DEPLOYMENT = "no_deployment"    # searched window, found nothing
    NO_REPO_MAP = "no_repository_map"  # team not registered in team_repositories


@dataclass
class CorrelatedDeployment:
    anomaly_id: str
    deployment_id: str
    repository: str
    commit_sha: str
    commit_message: str
    deployed_at: datetime
    minutes_before_anomaly: int
    confidence: str  # 'high' | 'medium' | 'low'


@dataclass
class CorrelationOutcome:
    path: CorrelationPath
    deployments: list[CorrelatedDeployment] = field(default_factory=list)
    message: str = ""


def correlate_anomaly_with_deployments(
    anomaly_id: str,
    window_hours: int = 6,
    high_conf_mins: int = 60,
    medium_conf_mins: int = 240,
) -> CorrelationOutcome:
    anomaly = fetch_anomaly(anomaly_id)
    anomaly_ts = anomaly.detected_at
    window_start = anomaly_ts - timedelta(hours=window_hours)

    repos = fetch_team_repos(anomaly.department, anomaly.team)
    if not repos:
        return CorrelationOutcome(
            path=CorrelationPath.NO_REPO_MAP,
            message=f"No repositories registered for {anomaly.department}/{anomaly.team}. "
                    f"Add entries to team_repositories to enable deployment correlation.",
        )

    deployments = fetch_deployments_in_window(
        repositories=[r.repository for r in repos],
        from_ts=window_start,
        to_ts=anomaly_ts,
        environment='production',
    )
    if not deployments:
        return CorrelationOutcome(
            path=CorrelationPath.NO_DEPLOYMENT,
            message=f"No production deployments in the {window_hours}h window. "
                    f"Repositories searched: {[r.repository for r in repos]}.",
        )

    results = []
    for d in deployments:
        delta_mins = int((anomaly_ts - d.deployed_at).total_seconds() / 60)
        confidence = (
            'high' if delta_mins <= high_conf_mins else
            'medium' if delta_mins <= medium_conf_mins else
            'low'
        )
        results.append(CorrelatedDeployment(
            anomaly_id=anomaly_id,
            deployment_id=d.id,
            repository=d.repository,
            commit_sha=d.commit_sha,
            commit_message=d.commit_message,
            deployed_at=d.deployed_at,
            minutes_before_anomaly=delta_mins,
            confidence=confidence,
        ))

    # Closest deployment first
    return CorrelationOutcome(
        path=CorrelationPath.DEPLOYMENT_FOUND,
        deployments=sorted(results, key=lambda x: x.minutes_before_anomaly),
    )
```

A NO_DEPLOYMENT outcome is not a failure; it is a signal. It tells you the cause is almost certainly not a recent code change, and a different investigation checklist applies. Routing this to the same alert template as a deployment-correlated anomaly is actively unhelpful: the on-call engineer needs to know what to look at, not just that something spiked.
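One way to keep the cases apart is to branch on the outcome's path before any alert is built. The sketch below re-declares the enum so it runs on its own; the step names in `NEXT_STEP` are hypothetical labels, not functions defined elsewhere in this post.

```python
from enum import Enum


class CorrelationPath(str, Enum):
    DEPLOYMENT_FOUND = "deployment_found"
    NO_DEPLOYMENT = "no_deployment"
    NO_REPO_MAP = "no_repository_map"


# Hypothetical step names: each outcome gets its own follow-up, never a shared template.
NEXT_STEP = {
    CorrelationPath.DEPLOYMENT_FOUND: "analyse_suspect_diff",
    CorrelationPath.NO_DEPLOYMENT: "send_checklist_alert",
    CorrelationPath.NO_REPO_MAP: "request_repository_registration",
}


def next_step(path: CorrelationPath) -> str:
    """Return the follow-up step for an outcome; fail loudly on an unhandled path."""
    try:
        return NEXT_STEP[path]
    except KeyError:
        raise ValueError(f"Unhandled correlation path: {path!r}") from None


for p in CorrelationPath:
    print(p.value, "->", next_step(p))
```

Raising on an unknown path means a newly added enum member cannot silently fall into the wrong alert template.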
```python
from .investigation_router import InvestigationPath

INVESTIGATION_CHECKLISTS = {
    InvestigationPath.INFRASTRUCTURE_DRIFT: [
        "Check cloud console for manual instance type, storage class, or config changes in the past 48h.",
        "Review recent Terraform plan/apply runs in the IaC audit log.",
        "Look for auto-scaling events that may have provisioned unexpected capacity.",
    ],
    InvestigationPath.RI_EXPIRY: [
        "Check Reserved Instance and Savings Plan coverage report for expirations in the past 7 days.",
        "Review Compute Optimizer or Azure Advisor for coverage gap recommendations.",
        "If an RI expired, decide whether to renew or convert to a Savings Plan.",
    ],
    InvestigationPath.TRAFFIC_GROWTH: [
        "Confirm request volume increase in APM or load balancer metrics.",
        "Determine whether growth is expected (campaign, launch) or anomalous.",
        "If expected, update the team's budget ceiling to reflect the new baseline.",
    ],
    InvestigationPath.SCHEDULED_JOB: [
        "Verify the job ran as expected and completed successfully.",
        "Check whether job duration or data volume has grown since the baseline was set.",
        "If cost is expected at this level, suppress future alerts for this pattern.",
    ],
    InvestigationPath.UNCLASSIFIED: [
        "Review all resource-level cost changes for this team in the anomaly window.",
        "Check for security events: unusual regions, services, or egress volumes.",
        "Check for third-party service invoices or marketplace charges.",
        "Escalate to the team's tech lead for manual investigation if no cause found within 24h.",
    ],
}


def build_unattributed_alert(anomaly, routing) -> dict:
    # Fall back to the generic checklist for any path without its own entry
    checklist = INVESTIGATION_CHECKLISTS.get(
        routing.path, INVESTIGATION_CHECKLISTS[InvestigationPath.UNCLASSIFIED]
    )
    return {
        "type": "unattributed_anomaly",
        "anomaly_id": anomaly.id,
        "team": f"{anomaly.department}/{anomaly.team}",
        "resource": anomaly.resource_id,
        "excess_cost": anomaly.excess_cost,
        "sigma": anomaly.sigma,
        "likely_cause": routing.path.value,
        "rationale": routing.rationale,
        "checklist": checklist,
        "dashboard_url": f"/cost/anomalies/{anomaly.id}",
    }
```

The most difficult scenario is a code change deployed days or weeks before the anomaly manifests: a memory leak, a connection pool exhausting slowly, a query whose cost scales with data volume that only becomes visible once the table grows large enough. These arrive as NO_DEPLOYMENT results because the default window won’t reach far enough back. If the structured checklist rules out all other causes and the anomaly persists, widen the correlation window to 7 days and re-run before escalating to manual investigation.
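The widen-and-retry step can be sketched as a small wrapper. This is an illustration only: `correlate_with_widening` is a hypothetical helper, the `correlate` callable stands in for the correlation function above, and the 6h/24h/168h ladder is an assumption built from the windows mentioned in this post. It is meant to run only after the checklist has ruled out the other causes.

```python
from typing import Callable, Iterable, Optional, Tuple


def correlate_with_widening(
    correlate: Callable[[str, int], list],
    anomaly_id: str,
    windows_hours: Iterable[int] = (6, 24, 168),
) -> Tuple[Optional[int], list]:
    """Try each window in turn; return (window_hours, deployments) for the first hit."""
    for hours in windows_hours:
        found = correlate(anomaly_id, hours)
        if found:
            return hours, found
    return None, []


def fake_correlate(anomaly_id: str, window_hours: int) -> list:
    # Stand-in for the real lookup: the only deployment is 5 days (120h) old.
    return ["c41d2aa"] if window_hours >= 120 else []


print(correlate_with_widening(fake_correlate, "anom-001"))
# → (168, ['c41d2aa'])
```

Returning the window that produced the hit lets the caller lower the confidence of any candidate found only at the 7-day setting, since wider windows admit more coincidental deployments.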
Once the suspect deployment is identified, the next step is retrieving its diff and reasoning about which changes in that diff could plausibly cause the observed cost pattern. This is where LLM-assisted analysis becomes useful — a diff can be hundreds of lines across dozens of files, and the causal change may be a single line in an unexpected location.
Some cost anomalies have code causes that are reliably findable in a diff. Others do not. Knowing the difference prevents the analysis pipeline from producing low-confidence noise.
Tractable patterns:

- A loop instantiating a cloud SDK client on every iteration rather than once at startup, directly visible as a client constructor call inside a for/while loop.
- A missing database index causing full table scans, visible as a new query without a corresponding migration adding an index.
- A model selection change in an AI inference call: a changed string constant in a model_id parameter.
- A removed cache layer: deletion of a caching wrapper or the addition of a cache=False flag.
- A resource limit removed from an auto-scaler configuration: a deletion or modification in Kubernetes or Terraform HCL.

Intractable patterns:

- Emergent cost from the interaction of two independently safe changes.
- Infrastructure drift that preceded the deployment.
- Cost increases caused by traffic growth that happened to coincide.

These should produce a “no clear code cause found” result rather than a speculative fix proposal.
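To make "directly visible" concrete, the first tractable pattern can be detected mechanically in Python source. The sketch below is a hypothetical pre-check, not part of the pipeline described in this post: it uses the standard `ast` module (with `ast.unparse`, available from Python 3.9) to find calls to named constructors that sit lexically inside a loop. The constructor list is an assumption.

```python
import ast


def clients_built_in_loops(source: str,
                           constructors=("boto3.client", "boto3.resource")):
    """Return sorted (line, dotted_name) pairs for constructor calls inside loops."""
    tree = ast.parse(source)
    hits = set()
    for loop in ast.walk(tree):
        if not isinstance(loop, (ast.For, ast.AsyncFor, ast.While)):
            continue
        # Walk everything nested under the loop, including inner loops.
        for node in ast.walk(loop):
            if isinstance(node, ast.Call):
                name = ast.unparse(node.func)
                if name in constructors:
                    hits.add((node.lineno, name))
    return sorted(hits)


sample = '''
import boto3
def handle(records):
    for r in records:
        s3 = boto3.client("s3")
        s3.put_object(Bucket="b", Key=r["key"], Body=r["data"])
'''
print(clients_built_in_loops(sample))
# → [(5, 'boto3.client')]
```

A lexical check like this misses a constructor inside a function that is itself called once per request, which is one reason the diff is still handed to a reviewer rather than judged by pattern matching alone.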
```python
import json
from dataclasses import dataclass

import anthropic

from .github_client import fetch_commit_diff

client = anthropic.Anthropic()


@dataclass
class DiffAnalysisResult:
    anomaly_id: str
    commit_sha: str
    repository: str
    cause_found: bool
    cause_description: str
    offending_file: str | None
    offending_lines: str | None
    fix_description: str | None
    fix_patch: str | None
    confidence: str  # 'high' | 'medium' | 'low' | 'none'
    reasoning: str


ANALYSIS_PROMPT = """You are a cost-aware code reviewer. A cloud cost anomaly has been detected on resources owned by the {team} team.

ANOMALY DETAILS:
- Resource: {resource_id} ({service_name})
- Anomaly date: {anomaly_date}
- Actual cost: ${actual_cost:.2f} vs expected ${expected_cost:.2f} ({sigma:.1f}σ above baseline)
- Excess cost: ${excess_cost:.2f}

SUSPECT DEPLOYMENT:
- Repository: {repository}
- Commit: {commit_sha}
- Message: {commit_message}
- Deployed: {minutes_before} minutes before anomaly onset

CODE DIFF:
{diff}

Analyse this diff for changes that could plausibly explain the cost anomaly described above.
Respond in JSON with these fields:
- cause_found: boolean
- cause_description: string (1-2 sentences explaining the likely cause, or "No clear code cause identified")
- offending_file: string or null (file path if a specific file is responsible)
- offending_lines: string or null (the specific lines in unified diff format)
- fix_description: string or null (what change would fix this)
- fix_patch: string or null (a unified diff patch that implements the fix, ONLY if high confidence)
- confidence: "high" | "medium" | "low" | "none"
- reasoning: string (your analytical reasoning, 2-4 sentences)

IMPORTANT: Only set fix_patch if you have HIGH confidence and the fix is a targeted, bounded change.
Do not propose architecture changes as patches. If uncertain, set confidence to "low" or "none"."""


def analyse_diff(
    anomaly,
    deployment: object,
) -> DiffAnalysisResult:
    diff = fetch_commit_diff(
        repository=deployment.repository,
        commit_sha=deployment.commit_sha,
        max_chars=40_000,  # stay within context window comfortably
    )
    prompt = ANALYSIS_PROMPT.format(
        team=anomaly.team,
        resource_id=anomaly.resource_id,
        service_name=anomaly.service_name,
        anomaly_date=anomaly.anomaly_date,
        actual_cost=anomaly.actual_cost,
        expected_cost=anomaly.baseline_mean,
        sigma=anomaly.sigma,
        excess_cost=anomaly.excess_cost,
        repository=deployment.repository,
        commit_sha=deployment.commit_sha[:8],
        commit_message=deployment.commit_message,
        minutes_before=deployment.minutes_before_anomaly,
        diff=diff,
    )
    response = client.messages.create(
        model='claude-opus-4-20250514',
        max_tokens=2000,
        messages=[{'role': 'user', 'content': prompt}],
    )
    # Raises json.JSONDecodeError if the model returns anything other than bare JSON
    result = json.loads(response.content[0].text)
    return DiffAnalysisResult(
        anomaly_id=anomaly.id,
        commit_sha=deployment.commit_sha,
        repository=deployment.repository,
        cause_found=result['cause_found'],
        cause_description=result['cause_description'],
        offending_file=result.get('offending_file'),
        offending_lines=result.get('offending_lines'),
        fix_description=result.get('fix_description'),
        fix_patch=result.get('fix_patch'),
        confidence=result['confidence'],
        reasoning=result['reasoning'],
    )
```

When the analysis produces a high-confidence result with a specific fix patch, the pipeline raises a draft pull request in the affected repository. The PR is never raised automatically: it goes through a human review gate where the on-call engineer confirms that the proposed change is safe to open before the API call is made.
Here is what such a PR looks like in practice:
Title: fix(cost): move SDK client initialisation outside request loop in data-processor
Anomaly context: Resource i-0a4b2c8d (Amazon EC2) showed a 6.7σ cost spike on 2026-05-01, excess $238 above expected baseline. Deployment a3f8c1d landed 31 minutes before anomaly onset.
Root cause analysis: In src/processor/handler.py, commit a3f8c1d moved a boto3.client('s3') initialisation call inside the per-request processing loop. Each request now creates a new SDK client, generating an additional authentication round-trip and connection setup per call. At this service’s request volume (~8,000 req/hr), this produced approximately 200× the expected API call volume against dependent S3 buckets, driving compute and network cost up sharply.
Proposed fix:
```diff
--- a/src/processor/handler.py
+++ b/src/processor/handler.py
@@ -8,4 +8,6 @@
 from config import settings
+
+# Initialise once at module load, not per request
+s3_client = boto3.client('s3', region_name=settings.AWS_REGION)
 def process_record(record: dict) -> None:
-    s3 = boto3.client('s3', region_name=settings.AWS_REGION)
-    s3.put_object(Bucket=settings.BUCKET, Key=record['key'], Body=record['data'])
+    s3_client.put_object(Bucket=settings.BUCKET, Key=record['key'], Body=record['data'])
```
```python
from .github_client import get_github_app_client
from .audit import log_pr_action

# apply_patch_to_branch and _build_pr_body are helpers assumed to be defined
# elsewhere in this module; they are not shown in this post.


def create_draft_pr(
    anomaly,
    deployment,
    analysis,
    approved_by: str,  # human reviewer who approved the PR creation
) -> str:              # returns PR URL
    # Require explicit human approval before any API call
    if not approved_by:
        raise ValueError("PR creation requires explicit human approval. Set approved_by.")
    if not analysis.fix_patch:
        raise ValueError("No fix patch available — cannot create PR.")

    gh = get_github_app_client(repository=deployment.repository)
    repo = gh.get_repo(deployment.repository)

    # Create branch from main for the fix
    base = repo.get_branch('main')
    branch = f"finops/cost-fix-{anomaly.id[:8]}"
    repo.create_git_ref(f"refs/heads/{branch}", base.commit.sha)

    # Apply patch to the offending file
    apply_patch_to_branch(repo, branch, analysis.offending_file, analysis.fix_patch)

    pr_body = _build_pr_body(anomaly, deployment, analysis, approved_by)
    pr = repo.create_pull(
        title=f"fix(cost): {analysis.cause_description[:80]}",
        body=pr_body,
        head=branch,
        base='main',
        draft=True,  # Always draft — human must promote to ready
    )

    # Full audit log — who approved, what was created, when
    log_pr_action(
        anomaly_id=anomaly.id,
        deployment_id=deployment.deployment_id,
        pr_url=pr.html_url,
        approved_by=approved_by,
        confidence=analysis.confidence,
    )
    return pr.html_url
```

The fix proposal pipeline reaches into production codebases and proposes changes. That makes it a powerful tool and a meaningful risk surface. These guardrails are not optional; they are what makes the capability safe to operate.
- The create_draft_pr function requires an explicit approved_by parameter: the identity of the human reviewer who reviewed the proposal and authorised PR creation. This cannot be bypassed programmatically.
- Repository access is limited to repositories registered in the team_repositories table. New repositories require explicit opt-in. Org-wide access is never granted.
- Sensitive files are excluded: .env, *.pem, *secret*, *credential*, Terraform state files, or any file matching the repository’s .gitignore secret exclusion patterns.

The analysis is LLM-assisted and operates on a bounded view of the codebase: one commit’s diff, without full repository context. It will produce incorrect root cause analyses. It will occasionally propose fixes that are technically valid but wrong for the specific codebase. Perhaps the “fix” pattern conflicts with a project convention, or the optimisation was intentionally removed for a reason not visible in the diff. Human review before merging is not a formality; it is the essential safety gate the entire system depends on.
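The sensitive-file exclusion among the guardrails above can be sketched with the standard library's `fnmatch`. This is a minimal illustration, not the pipeline's actual filter: the helper names are hypothetical, the `*.tfstate` patterns are my assumption for "Terraform state files", and the per-repository .gitignore patterns are left out.

```python
from fnmatch import fnmatch

# Patterns taken from the guardrail list; the tfstate globs are an assumption.
SENSITIVE_PATTERNS = (
    ".env", "*.pem", "*secret*", "*credential*", "*.tfstate", "*.tfstate.backup",
)


def is_sensitive(path: str) -> bool:
    """True if the file name or the full path matches any sensitive pattern."""
    lowered = path.lower()
    name = lowered.rsplit("/", 1)[-1]
    return any(fnmatch(name, p) or fnmatch(lowered, p) for p in SENSITIVE_PATTERNS)


def filter_diff_files(paths):
    """Drop sensitive files before a diff is sent for analysis."""
    return [p for p in paths if not is_sensitive(p)]


changed = [
    "src/processor/handler.py",
    ".env",
    "deploy/tls/server.pem",
    "config/aws_credentials.json",
    "infra/terraform.tfstate",
]
print(filter_diff_files(changed))
# → ['src/processor/handler.py']
```

Matching against the full path as well as the file name means a directory such as `secrets/` excludes everything beneath it, which errs on the side of sending less to the model.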
Across this series, we’ve built a complete FinOps platform from first principles. The daily rollup built in Post 1 is still the query surface used here in Post 5 — every subsequent part extended the platform without replacing what came before. Here is the full capability set:
Post 1 — Foundation: Ingest billing data from any cloud via FOCUS 1.3, extract attribution tags, produce daily cost rollups by team and department. Enforce tagging policy, operate the nightly pipeline reliably, and maintain the platform over time.
Post 2 — Forecasting: Forecast end-of-period spend per budget, alert teams before overages occur, deliver enriched alerts with top cost drivers.
Post 3 — Attribution and Anomaly Detection: Filter singleton tag groupings, infer ownership of untagged resources via temporal cost correlation, manage attribution overrides. Detect resource-level cost anomalies via statistical control chart, roll up to enriched team alerts the same day, with untagged anomalies immediately routed through the correlation engine.
Post 4 — Optimisation: Ingest utilisation metrics from CloudWatch, Azure Monitor, and GCP, join with billing, classify resources into cost/util quadrants. Generate rightsizing recommendations with quantified savings, detect structurally misattributed resources via clustering. Model cost efficiency for AI inference (per token), network transfer (per GB), storage (multi-dimensional) via generalised unit cost registry.
Post 5 — Closing the Loop: Correlate anomalies with deployments, analyse suspect diffs for code-level causes, propose targeted fixes as draft PRs — with human review gates and a full audit trail.
The platform began as an answer to “how much did each team spend?” and ended as a system that can answer “which pull request caused that cost spike, why, and what should the fix look like?”
That is the distance from FinOps as reporting to FinOps as engineering discipline — where cost is a first-class observable, treated with the same rigour as latency, error rate, and throughput. Where teams don’t wait for the monthly bill review to learn they overspent. Where the discovery of a cost anomaly triggers the same investigation workflow as a production incident, because in this framing, it is one.
The technical components — FOCUS ingestion, the rollup, the forecast model, the statistical control chart, the clustering algorithms, the unit cost registry, the deployment correlation engine — are each individually achievable in a few weeks of focused engineering. What takes longer is the organisational infrastructure: tagging policies that stick, budget owners who engage with alerts, teams who treat rightsizing recommendations as legitimate engineering work rather than finance nagging.
The platform gives you the levers. Building the culture to pull them is the other half of the project — and arguably the more important one.
If you’re working on a FinOps platform and want to discuss the architecture, the tooling choices, or the organisational model that makes it stick — get in touch. We build these systems and help engineering teams get the most out of them.