Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@
USE_POSTGRES_FOR_ANALYTICS = env.bool("USE_POSTGRES_FOR_ANALYTICS", default=False)
USE_CACHE_FOR_USAGE_DATA = env.bool("USE_CACHE_FOR_USAGE_DATA", default=True)

# Base URL of the Control Plane that licensed Instances report usage snapshots to.
CONTROL_PLANE_URL = env.str("CONTROL_PLANE_URL", default=None)

API_USAGE_CACHE_SECONDS = env.int("API_USAGE_CACHE_SECONDS", default=0)

if not API_USAGE_CACHE_SECONDS:
Expand Down
106 changes: 89 additions & 17 deletions api/app_analytics/analytics_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from common.core.utils import is_saas, using_database_replica
from dateutil.relativedelta import relativedelta
from django.conf import settings
from django.db.models import Q, Sum
from django.db.models import Q, QuerySet, Sum
from django.utils import timezone
from rest_framework.exceptions import NotFound

Expand Down Expand Up @@ -78,19 +78,12 @@ def get_usage_data(
return []


def get_usage_data_from_local_db(
def _get_api_usage_bucket_qs(
organisation: Organisation,
environment_id: int | None = None,
project_id: int | None = None,
date_start: datetime | None = None,
date_stop: datetime | None = None,
labels_filter: Labels | None = None,
) -> list[UsageData]:
if date_start is None:
date_start = timezone.now() - timedelta(days=30)
if date_stop is None:
date_stop = timezone.now()

) -> QuerySet[APIUsageBucket]:
qs = APIUsageBucket.objects.filter(
environment_id__in=_get_environment_ids_for_org(organisation),
bucket_size=constants.ANALYTICS_READ_BUCKET_SIZE,
Expand All @@ -110,17 +103,96 @@ def get_usage_data_from_local_db(
if labels_filter:
qs = qs.filter(labels__contains=labels_filter)

qs = (
qs.filter( # type: ignore[assignment]
created_at__date__lte=date_stop,
created_at__date__gt=date_start,
)
.order_by("created_at__date")
return qs


def _aggregate_buckets(qs: QuerySet[APIUsageBucket]) -> list[UsageData]:
annotated = (
qs.order_by("created_at__date")
.values("created_at__date", "resource", "labels")
.annotate(count=Sum("total_count"))
)
return map_annotated_api_usage_buckets_to_usage_data(annotated) # type: ignore[arg-type]


def get_usage_data_from_local_db(
organisation: Organisation,
environment_id: int | None = None,
project_id: int | None = None,
date_start: datetime | None = None,
date_stop: datetime | None = None,
labels_filter: Labels | None = None,
) -> list[UsageData]:
if date_start is None:
date_start = timezone.now() - timedelta(days=30)
if date_stop is None:
date_stop = timezone.now()

qs = _get_api_usage_bucket_qs(
organisation,
environment_id=environment_id,
project_id=project_id,
labels_filter=labels_filter,
).filter(
created_at__date__lte=date_stop,
created_at__date__gt=date_start,
)
return _aggregate_buckets(qs)


def get_usage_data_from_local_db_for_window(
organisation: Organisation,
date_start: datetime,
date_stop: datetime,
project_id: int | None = None,
) -> list[UsageData]:
# Filters on the full timestamp (not ``created_at__date``) so that sub-day
# windows such as a single hour are honoured. The daily ``get_usage_data``
# path keeps its date-truncated filter to leave the billing endpoint
# unchanged.
qs = _get_api_usage_bucket_qs(
organisation,
project_id=project_id,
).filter(
created_at__gte=date_start,
created_at__lt=date_stop,
)
return _aggregate_buckets(qs)


return map_annotated_api_usage_buckets_to_usage_data(qs)
def get_usage_data_for_window(
organisation: Organisation,
date_start: datetime,
date_stop: datetime,
project_id: int | None = None,
) -> list[UsageData]:
"""
Return per-resource API-call counts for an explicit datetime window.

Mirrors ``get_usage_data`` but at full datetime
resolution, so callers can request a window finer than a day.
"""
if settings.USE_POSTGRES_FOR_ANALYTICS:
return get_usage_data_from_local_db_for_window(
organisation,
date_start=date_start,
date_stop=date_stop,
project_id=project_id,
)

if settings.INFLUXDB_TOKEN:
return get_usage_data_from_influxdb(
organisation_id=organisation.id,
project_id=project_id,
date_start=date_start,
date_stop=date_stop,
)

logger.warning(
"no-analytics-database-configured",
details=constants.NO_ANALYTICS_DATABASE_CONFIGURED_WARNING,
)
return []


def get_top_organisations_from_local_db(
Expand Down
6 changes: 6 additions & 0 deletions api/organisations/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
Subscription,
)
from organisations.subscriptions.constants import FREE_PLAN_ID
from organisations.usage_reporting.services import push_usage_snapshots
from users.models import FFAdminUser

from .constants import (
Expand Down Expand Up @@ -87,6 +88,11 @@ def update_organisation_subscription_information_cache_recurring() -> None:
update_organisation_subscription_information_cache()


@register_recurring_task(run_every=timedelta(hours=1))
def push_usage_to_control_plane() -> None:
push_usage_snapshots()


@register_task_handler()
def update_organisation_subscription_information_api_usage_cache() -> None:
subscription_info_cache.update_caches(SubscriptionCacheEntity.API_USAGE)
Expand Down
Empty file.
27 changes: 27 additions & 0 deletions api/organisations/usage_reporting/dataclasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class ProjectUsage:
project_id: int
api_call_count: int


@dataclass(frozen=True)
class ApiCallBreakdown:
flags: int
identities: int
traits: int
environment_documents: int


@dataclass(frozen=True)
class UsageSnapshot:
timestamp: datetime
seat_count: int
api_call_total: int
api_call_breakdown: ApiCallBreakdown
project_count: int
instance_version: str
project_usage: list[ProjectUsage]
95 changes: 95 additions & 0 deletions api/organisations/usage_reporting/mappers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from datetime import datetime, timedelta

from common.core.utils import get_version
from django.db.models import QuerySet
from django.utils import timezone

from app_analytics.analytics_db_service import get_usage_data_for_window
from app_analytics.dataclasses import UsageData
from organisations.models import Organisation
from organisations.usage_reporting.dataclasses import (
ApiCallBreakdown,
ProjectUsage,
UsageSnapshot,
)
from projects.models import Project

# The Control Plane rejects payloads with more than this many project_usage rows.
MAX_PROJECT_USAGE_ROWS = 5_000


def _complete_hour_window() -> tuple[datetime, datetime]:
hour_end = timezone.now().replace(minute=0, second=0, microsecond=0)
hour_start = hour_end - timedelta(hours=1)
return hour_start, hour_end


def _total_api_calls(usage_data: list[UsageData]) -> int:
return sum(
data.flags + data.identities + data.traits + data.environment_document
for data in usage_data
)


def _aggregate(usage_data: list[UsageData]) -> tuple[int, ApiCallBreakdown]:
breakdown = ApiCallBreakdown(
flags=sum(data.flags for data in usage_data),
identities=sum(data.identities for data in usage_data),
traits=sum(data.traits for data in usage_data),
environment_documents=sum(data.environment_document for data in usage_data),
)
api_call_total = (
breakdown.flags
+ breakdown.identities
+ breakdown.traits
+ breakdown.environment_documents
)
return api_call_total, breakdown


def _project_usage(
*,
organisation: Organisation,
projects: QuerySet[Project],
hour_start: datetime,
hour_end: datetime,
) -> list[ProjectUsage]:
rows = [
ProjectUsage(
project_id=project.id,
api_call_count=_total_api_calls(
get_usage_data_for_window(
organisation,
hour_start,
hour_end,
project_id=project.id,
)
),
)
for project in projects
]
# Highest usage first
rows.sort(key=lambda row: row.api_call_count, reverse=True)
return rows[:MAX_PROJECT_USAGE_ROWS]


def map_organisation_to_usage_snapshot(organisation: Organisation) -> UsageSnapshot:
hour_start, hour_end = _complete_hour_window()
api_call_total, api_call_breakdown = _aggregate(
get_usage_data_for_window(organisation, hour_start, hour_end)
)
projects = organisation.projects.all()
return UsageSnapshot(
timestamp=hour_start,
seat_count=organisation.num_seats,
api_call_total=api_call_total,
api_call_breakdown=api_call_breakdown,
project_count=projects.count(),
instance_version=get_version(),
project_usage=_project_usage(
organisation=organisation,
projects=projects,
hour_start=hour_start,
hour_end=hour_end,
),
)
79 changes: 79 additions & 0 deletions api/organisations/usage_reporting/services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import base64
import dataclasses
import json

import requests
import structlog
from django.conf import settings
from django.core.serializers.json import DjangoJSONEncoder

from organisations.models import Organisation
from organisations.usage_reporting.dataclasses import UsageSnapshot
from organisations.usage_reporting.mappers import (
map_organisation_to_usage_snapshot,
)

logger = structlog.get_logger("usage_reporting")

USAGE_ENDPOINT_PATH = "/v1/usage"
REQUEST_TIMEOUT_SECONDS = 30


def _build_auth_token(signature: str) -> str:
return (
base64.urlsafe_b64encode(signature.encode("utf-8")).decode("ascii").rstrip("=")
)


def get_licensed_organisations() -> list[Organisation]:
if not settings.LICENSING_INSTALLED:
return []
return list( # pragma: no cover - requires the optional licensing package
Organisation.objects.filter(licence__isnull=False).select_related("licence")
)


def push_snapshot(
*,
base_url: str,
snapshot: UsageSnapshot,
signature: str,
) -> None:
url = f"{base_url.rstrip('/')}{USAGE_ENDPOINT_PATH}"
headers = {
"Authorization": f"Bearer {_build_auth_token(signature)}",
"Content-Type": "application/json",
}
body = json.dumps(dataclasses.asdict(snapshot), cls=DjangoJSONEncoder)

response = requests.post(
url,
data=body,
headers=headers,
timeout=REQUEST_TIMEOUT_SECONDS,
)

if response.ok:
logger.info("snapshot.pushed", status_code=response.status_code)
else:
logger.warning("snapshot.push_failed", status_code=response.status_code)


def push_usage_snapshots() -> None:
if not (base_url := settings.CONTROL_PLANE_URL):
logger.debug("run.skipped", reason="control_plane_url_unset")
return
if not (organisations := get_licensed_organisations()):
logger.debug("run.skipped", reason="no_licensed_organisations")
return

for organisation in organisations:
with structlog.contextvars.bound_contextvars(organisation__id=organisation.id):
try:
push_snapshot(
base_url=base_url,
snapshot=map_organisation_to_usage_snapshot(organisation),
signature=organisation.licence.signature,
)
except Exception:
logger.exception("snapshot.errored")
Loading
Loading