diff --git a/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala b/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala index 30b508c139..3abe66ef28 100644 --- a/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala +++ b/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala @@ -103,7 +103,7 @@ import code.metadata.tags.MappedTag import code.metadata.transactionimages.MappedTransactionImage import code.metadata.wheretags.MappedWhereTag import code.methodrouting.MethodRouting -import code.metrics.{ConnectorTrace, MappedConnectorMetric, MappedMetric, MetricArchive} +import code.metrics.{ConnectorTrace, MappedConnectorMetric, MappedMetric, MetricArchive, MetricsArchiveRun} import code.migration.MigrationScriptLog import code.model._ import code.model.dataAccess._ @@ -994,6 +994,7 @@ object ToSchemify extends MdcLoggable { TransactionRequestAttribute, MappedMetric, MetricArchive, + MetricsArchiveRun, MapperAccountHolders, MappedEntitlement, MappedConnectorMetric, diff --git a/obp-api/src/main/scala/code/api/util/ApiRole.scala b/obp-api/src/main/scala/code/api/util/ApiRole.scala index d09f02849a..7a61c8aab7 100644 --- a/obp-api/src/main/scala/code/api/util/ApiRole.scala +++ b/obp-api/src/main/scala/code/api/util/ApiRole.scala @@ -448,6 +448,12 @@ object ApiRole extends MdcLoggable{ case class CanGetDatabasePoolInfo(requiresBankId: Boolean = false) extends ApiRole lazy val canGetDatabasePoolInfo = CanGetDatabasePoolInfo() + case class CanGetMetricsDiagnostics(requiresBankId: Boolean = false) extends ApiRole + lazy val canGetMetricsDiagnostics = CanGetMetricsDiagnostics() + + case class CanCreateMetricsArchiveRun(requiresBankId: Boolean = false) extends ApiRole + lazy val canCreateMetricsArchiveRun = CanCreateMetricsArchiveRun() + case class CanGetConnectorHealth(requiresBankId: Boolean = false) extends ApiRole lazy val canGetConnectorHealth = CanGetConnectorHealth() diff --git a/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala 
b/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala index 32e85eb914..7532313015 100644 --- a/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala +++ b/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala @@ -7,7 +7,7 @@ import code.api.Constant._ import code.api.ResourceDocs1_4_0.SwaggerDefinitionsJSON._ import code.api.util.APIUtil.{EmptyBody, _} import code.api.util.{APIUtil, ApiRole, CallContext, CustomJsonFormats, Glossary, NewStyle} -import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateEntitlementAtOneBank, canCreateOrganisation, canCreateRoutingScheme, canCreateTestEmail, canDeleteEntitlementAtAnyBank, canDeleteOrganisation, canDeleteRoutingScheme, canGetAccountAccessTrace, canGetAnyOrganisation, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetConnectorHealth, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMigrations, canUpdateBankSupportedRoutingScheme, canUpdateOrganisation, canUpdateRoutingScheme, canUpdateSystemView} +import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateEntitlementAtOneBank, canCreateMetricsArchiveRun, canCreateOrganisation, canCreateRoutingScheme, canCreateTestEmail, canDeleteEntitlementAtAnyBank, canDeleteOrganisation, canDeleteRoutingScheme, canGetAccountAccessTrace, canGetAnyOrganisation, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetConnectorHealth, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMetricsDiagnostics, canGetMigrations, canUpdateBankSupportedRoutingScheme, canUpdateOrganisation, canUpdateRoutingScheme, canUpdateSystemView} import code.api.util.CommonsEmailWrapper import code.model.dataAccess.AuthUser import code.api.util.ApiTag._ @@ -3240,6 +3240,126 @@ object Http4s700 { http4sPartialFunction = Some(factoryResetSystemView) ) + // Route: GET /obp/v7.0.0/management/system/diagnostics/metrics + // + // Operator diagnostic for the metrics-archiving pipeline. 
Reports the + // archiving props plus row counts and the oldest/newest record in both the + // `metric` and `metricarchive` tables, then runs integrity checks that + // surface whether MetricsArchiveScheduler is keeping each table inside its + // configured retention window. Intended for use from the API Manager. + val getMetricsDiagnostics: HttpRoutes[IO] = HttpRoutes.of[IO] { + case req @ GET -> `prefixPath` / "management" / "system" / "diagnostics" / "metrics" => + EndpointHelpers.withUser(req) { (_, _) => + Future { + JSONFactory700.createMetricsAndArchiveMetricsDiagnosticsJsonV700() + } + } + } + + resourceDocs += ResourceDoc( + implementedInApiVersion, + nameOf(getMetricsDiagnostics), + "GET", + "/management/system/diagnostics/metrics", + "Get Metrics and Archive Metrics Diagnostics", + s"""Diagnostic for the metrics-archiving pipeline (`MetricsArchiveScheduler`). + | + |Returns: + | + |* `config` — the relevant props as configured: `write_metrics`, + | `enable_metrics_scheduler`, `retain_metrics_scheduler_interval_in_seconds`, + | `retain_metrics_days`, `retain_archive_metrics_days`, `retain_metrics_move_limit`; + | plus `retain_metrics_days_effective` and `retain_archive_metrics_days_effective`, + | the values the scheduler actually uses after its floors are applied + | (`retain_metrics_days` floored to 60, `retain_archive_metrics_days` floored to 365). + |* `metric` — row count and oldest/newest record (date + age in days) of + | the live `metric` table. + |* `metric_archive` — the same for the `metricarchive` table. + |* `last_run` / `last_successful_run` — the most recent (and most recent + | successful) scheduler run from the `metricsarchiverun` audit log, including + | rows moved, rows deleted, duration and success. Absent if no run has been + | recorded yet.
+ |* `checks` — a list of integrity checks, each with a `status` of `OK`, `WARNING`, or `ERROR`: + | * `write_metrics_enabled` + | * `metrics_scheduler_enabled` + | * `metric_oldest_within_retention` — flags if the oldest live metric + | is older than the retention window (move job not keeping up / stopped). + | * `metric_unarchivable_rows` — warns if live metric rows older than the retention window have an empty correlation id and therefore cannot be archived. + | * `archive_oldest_within_retention` — flags if the oldest archived + | metric is older than the archive retention (cleanup not keeping up / stopped). + | * `archive_recently_updated` — flags if a backlog exists but the newest + | archived record is stale (move job stopped). + | * `archive_job_last_run` — reports the outcome of the most recent run + | from the run log (errors if the last run failed; warns if none is recorded while the scheduler is enabled). + |* `everything_as_expected` — `true` only when every check is `OK`. + | + |${userAuthenticationMessage(true)}""".stripMargin, + EmptyBody, + JSONFactory700.metricsAndArchiveMetricsDiagnosticsJsonV700Example, + List($AuthenticatedUserIsRequired, UserHasMissingRoles, UnknownError), + apiTagMetric :: apiTagSystem :: apiTagApi :: Nil, + Some(List(canGetMetricsDiagnostics)), + http4sPartialFunction = Some(getMetricsDiagnostics) + ) + + // Route: POST /obp/v7.0.0/management/system/diagnostics/metrics/run + // + // Manually trigger one metrics-archive run. This calls the exact same + // `MetricsArchiveScheduler.runOnce()` the timer uses, so it honours the same + // concurrency lock (won't start if a run is already in progress), the same + // retention props/floors, and records the run in the `metricsarchiverun` log. + // The run executes synchronously and may take a while for large backlogs + // (it moves up to `retain_metrics_move_limit` rows).
+ val triggerMetricsArchiveRun: HttpRoutes[IO] = HttpRoutes.of[IO] { + case req @ POST -> `prefixPath` / "management" / "system" / "diagnostics" / "metrics" / "run" => + EndpointHelpers.withUser(req) { (user, _) => + Future { + val outcome = code.scheduler.MetricsArchiveScheduler.runOnce() + logger.info(s"AUDIT triggerMetricsArchiveRun: user_id=${user.userId} outcome=${outcome.getClass.getSimpleName.stripSuffix("$")}") + JSONFactory700.createTriggerMetricsArchiveRunResponseJsonV700(outcome) + } + } + } + + resourceDocs += ResourceDoc( + implementedInApiVersion, + nameOf(triggerMetricsArchiveRun), + "POST", + "/management/system/diagnostics/metrics/run", + "Trigger a Metrics Archive Run", + s"""Manually run the metrics-archiving job once, on demand. + | + |This invokes the **same** code path as the scheduled + |`MetricsArchiveScheduler` run, so it respects all the same checks: + | + |* **Concurrency lock** — if an archive run is already in progress (the + | `JobScheduler` lock is held, on this or another node), no new run is + | started and the response `status` is `skipped_already_in_progress`. + |* **Retention rules** — moves `metric` rows older than the effective + | `retain_metrics_days` (floored to 60) to `metricarchive`, up to + | `retain_metrics_move_limit` rows; then deletes `metricarchive` rows + | older than the effective `retain_archive_metrics_days` (floored to 365). + |* **Run log** — the outcome is written to the `metricsarchiverun` audit + | log, exactly as a scheduled run would be. + | + |Response fields: + |* `status` — `completed` (a run executed — inspect `run.success`) or + | `skipped_already_in_progress`. + |* `message` — human-readable summary. + |* `run` — the recorded run (run id, counts, duration, success, remark); + | absent when skipped. + | + |Note: the run executes synchronously, so a large backlog may take a while.
+ | + |${userAuthenticationMessage(true)}""".stripMargin, + EmptyBody, + JSONFactory700.triggerMetricsArchiveRunResponseJsonV700Example, + List($AuthenticatedUserIsRequired, UserHasMissingRoles, UnknownError), + apiTagMetric :: apiTagSystem :: apiTagApi :: Nil, + Some(List(canCreateMetricsArchiveRun)), + http4sPartialFunction = Some(triggerMetricsArchiveRun) + ) + // Enabled only in Lift test mode (Props.testMode == true, i.e. -Drun.mode=test). // Props.testMode is set from the JVM system property before any props file loads, // so it is reliably available at object-initialization time unlike file-based props. diff --git a/obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala b/obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala index e0e0529ad3..dddbbf0286 100644 --- a/obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala +++ b/obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala @@ -7,11 +7,14 @@ import code.api.util.ErrorMessages.MandatoryPropertyIsNotSet import code.api.v4_0_0.{EnergySource400, HostedAt400, HostedBy400, PostSimpleCounterpartyJson400} import code.bankconnectors.Connector import code.customer.CustomerX +import code.metrics.{MappedMetric, MetricArchive, MetricsArchiveRun} import code.util.Helper.MdcLoggable import code.views.Views import com.openbankproject.commons.model.{AccountId, AccountRoutingJsonV121, AmountOfMoneyJsonV121, BankId, BankIdAccountId, CoreAccount, TransactionRequest, TransactionRequestCommonBodyJSON, User} import com.openbankproject.commons.util.ApiVersion +import java.util.Date import net.liftweb.common.Full +import net.liftweb.mapper.{Ascending, By, By_<=, Descending, MaxRows, OrderBy} import scala.concurrent.{ExecutionContext, Future} @@ -1064,4 +1067,348 @@ object JSONFactory700 extends MdcLoggable with code.api.util.CustomJsonFormats { lazy val validationEmailResponseJsonV700Example = ValidationEmailResponseJsonV700( message = "If an unvalidated account exists for this username and email, 
a validation email has been sent." ) + + // ── Metrics & Archive Metrics diagnostics ────────────────────────────────── + // + // Reports the metrics-archiving configuration plus row counts and the + // oldest/newest record in both the `metric` and `metricarchive` tables, and + // runs a set of integrity checks that flag whether MetricsArchiveScheduler is + // actually keeping the tables within their configured retention windows. + + case class MetricsTableStatsJsonV700( + table_name: String, + count: Long, + oldest_record_date: Option[Date], + newest_record_date: Option[Date], + oldest_record_age_days: Option[Long], + newest_record_age_days: Option[Long] + ) + + case class MetricsArchiveConfigJsonV700( + write_metrics: Boolean, + enable_metrics_scheduler: Boolean, + retain_metrics_scheduler_interval_in_seconds: Int, + retain_metrics_days: Long, + retain_metrics_days_effective: Long, + retain_archive_metrics_days: Long, + retain_archive_metrics_days_effective: Long, + retain_metrics_move_limit: Int + ) + + // status is one of "OK", "WARNING", "ERROR". + case class MetricsIntegrityCheckJsonV700( + name: String, + status: String, + message: String + ) + + // One row of the metricsarchiverun audit log (a completed scheduler run). 
+ case class MetricsArchiveRunJsonV700( + run_id: String, + api_instance_id: String, + started_at: Date, + ended_at: Date, + duration_ms: Long, + rows_moved_to_archive: Int, + rows_deleted_from_archive: Int, + success: Boolean, + remark: String + ) + + case class MetricsAndArchiveMetricsDiagnosticsJsonV700( + config: MetricsArchiveConfigJsonV700, + metric: MetricsTableStatsJsonV700, + metric_archive: MetricsTableStatsJsonV700, + last_run: Option[MetricsArchiveRunJsonV700], + last_successful_run: Option[MetricsArchiveRunJsonV700], + checks: List[MetricsIntegrityCheckJsonV700], + everything_as_expected: Boolean + ) + + private def metricsArchiveRunToJson(r: MetricsArchiveRun): MetricsArchiveRunJsonV700 = + MetricsArchiveRunJsonV700( + run_id = r.RunId.get, + api_instance_id = r.ApiInstanceId.get, + started_at = r.StartedAt.get, + ended_at = r.EndedAt.get, + duration_ms = r.DurationMs.get, + rows_moved_to_archive = r.RowsMovedToArchive.get, + rows_deleted_from_archive = r.RowsDeletedFromArchive.get, + success = r.Success.get, + remark = r.Remark.get + ) + + // Result of manually triggering an archive run. `status` is one of + // "completed" (a run executed — inspect `run.success`) or + // "skipped_already_in_progress" (a run was already running, so none was started). + case class TriggerMetricsArchiveRunResponseJsonV700( + status: String, + message: String, + run: Option[MetricsArchiveRunJsonV700] + ) + + def createTriggerMetricsArchiveRunResponseJsonV700(outcome: code.scheduler.RunOutcome): TriggerMetricsArchiveRunResponseJsonV700 = + outcome match { + case code.scheduler.RunCompleted(r) => + val msg = + if (r.Success.get) + s"Archive run completed: moved ${r.RowsMovedToArchive.get} rows to the archive, deleted ${r.RowsDeletedFromArchive.get} outdated archive rows." 
+ else + s"Archive run completed with errors: ${r.Remark.get}" + TriggerMetricsArchiveRunResponseJsonV700("completed", msg, Some(metricsArchiveRunToJson(r))) + case code.scheduler.RunSkippedAlreadyInProgress => + TriggerMetricsArchiveRunResponseJsonV700( + "skipped_already_in_progress", + "An archive run is already in progress; no new run was started.", + None) + } + + lazy val triggerMetricsArchiveRunResponseJsonV700Example = TriggerMetricsArchiveRunResponseJsonV700( + status = "completed", + message = "Archive run completed: moved 4000 rows to the archive, deleted 1500 outdated archive rows.", + run = Some(MetricsArchiveRunJsonV700( + run_id = "9f3c2b1a-7d4e-4c8a-9b2f-1e6d5a0c4b7e", + api_instance_id = "obp", + started_at = new Date(1717200000000L), + ended_at = new Date(1717200012000L), + duration_ms = 12000L, + rows_moved_to_archive = 4000, + rows_deleted_from_archive = 1500, + success = true, + remark = "" + )) + ) + + private val metricsOneDayInMillis: Long = 86400000L + private def metricsAgeInDays(d: Date, now: Date): Long = + (now.getTime - d.getTime) / metricsOneDayInMillis + + /** + * Inspect the `metric` and `metricarchive` tables together with the archiving + * props and report whether the MetricsArchiveScheduler is behaving as + * configured. The effective retention values mirror the floors applied in + * `code.scheduler.MetricsArchiveScheduler` (retain_metrics_days floored to 60, + * retain_archive_metrics_days floored to 365). + * + * Note: this issues blocking Mapper queries (count + a single-row ORDER BY on + * the indexed `date` column) — call it from a Future. 
+ */ + def createMetricsAndArchiveMetricsDiagnosticsJsonV700(): MetricsAndArchiveMetricsDiagnosticsJsonV700 = { + val now = new Date() + + val writeMetrics = APIUtil.getPropsAsBoolValue("write_metrics", false) + val schedulerEnabled = APIUtil.getPropsAsBoolValue("enable_metrics_scheduler", true) + val schedulerIntervalSeconds = APIUtil.getPropsAsIntValue("retain_metrics_scheduler_interval_in_seconds", 3600) + val retainMetricsDays = APIUtil.getPropsAsLongValue("retain_metrics_days", 367) + val retainMetricsDaysEffective = if (retainMetricsDays > 59) retainMetricsDays else 60L + val retainArchiveMetricsDays = APIUtil.getPropsAsLongValue("retain_archive_metrics_days", 365L * 3) + val retainArchiveMetricsDaysEffective = if (retainArchiveMetricsDays > 364) retainArchiveMetricsDays else 365L + val moveLimit = APIUtil.getPropsAsIntValue("retain_metrics_move_limit", 50000) + + val config = MetricsArchiveConfigJsonV700( + write_metrics = writeMetrics, + enable_metrics_scheduler = schedulerEnabled, + retain_metrics_scheduler_interval_in_seconds = schedulerIntervalSeconds, + retain_metrics_days = retainMetricsDays, + retain_metrics_days_effective = retainMetricsDaysEffective, + retain_archive_metrics_days = retainArchiveMetricsDays, + retain_archive_metrics_days_effective = retainArchiveMetricsDaysEffective, + retain_metrics_move_limit = moveLimit + ) + + def statsFor(tableName: String, count: Long, oldest: Option[Date], newest: Option[Date]) = + MetricsTableStatsJsonV700( + table_name = tableName, + count = count, + oldest_record_date = oldest, + newest_record_date = newest, + oldest_record_age_days = oldest.map(metricsAgeInDays(_, now)), + newest_record_age_days = newest.map(metricsAgeInDays(_, now)) + ) + + val metricOldest = MappedMetric.findAll(OrderBy(MappedMetric.date, Ascending), MaxRows(1)).headOption.map(_.getDate()) + val metricNewest = MappedMetric.findAll(OrderBy(MappedMetric.date, Descending), MaxRows(1)).headOption.map(_.getDate()) + val metricStats = 
statsFor("metric", MappedMetric.count, metricOldest, metricNewest) + + val archiveOldest = MetricArchive.findAll(OrderBy(MetricArchive.date, Ascending), MaxRows(1)).headOption.map(_.getDate()) + val archiveNewest = MetricArchive.findAll(OrderBy(MetricArchive.date, Descending), MaxRows(1)).headOption.map(_.getDate()) + val archiveStats = statsFor("metricarchive", MetricArchive.count, archiveOldest, archiveNewest) + + val graceDays = 7L + val checks = scala.collection.mutable.ListBuffer[MetricsIntegrityCheckJsonV700]() + + checks += (if (writeMetrics) + MetricsIntegrityCheckJsonV700("write_metrics_enabled", "OK", + "write_metrics=true: API calls are being recorded into the metric table.") + else + MetricsIntegrityCheckJsonV700("write_metrics_enabled", "WARNING", + "write_metrics=false: no new API metrics are being written, so the metric table count will not grow.")) + + checks += (if (schedulerEnabled) + MetricsIntegrityCheckJsonV700("metrics_scheduler_enabled", "OK", + "enable_metrics_scheduler=true: the archive/cleanup scheduler is active.") + else + MetricsIntegrityCheckJsonV700("metrics_scheduler_enabled", "ERROR", + "enable_metrics_scheduler=false: old metrics are never moved to the archive nor deleted; the metric table will grow without bound.")) + + metricOldest match { + case Some(d) => + val age = metricsAgeInDays(d, now) + if (age <= retainMetricsDaysEffective + graceDays) + checks += MetricsIntegrityCheckJsonV700("metric_oldest_within_retention", "OK", + s"Oldest metric is $age days old, within the effective retention of $retainMetricsDaysEffective days (+${graceDays}d grace).") + else + checks += MetricsIntegrityCheckJsonV700("metric_oldest_within_retention", "ERROR", + s"Oldest metric is $age days old but the effective retention is $retainMetricsDaysEffective days. 
Records older than this should have been moved to the archive — the archive move job is not keeping up or has stopped.") + case None => + checks += MetricsIntegrityCheckJsonV700("metric_oldest_within_retention", "OK", "The metric table is empty.") + } + + // Old metric rows with an empty correlation id can't be archived (the archive + // requires a UUID) and are excluded from the move job, so they sit in the metric + // table indefinitely. Surface their count so this is visible rather than looking + // like a stalled job in the oldest-within-retention check above. + val unarchivableOldMetricCount = MappedMetric.count( + By_<=(MappedMetric.date, new Date(now.getTime - retainMetricsDaysEffective * metricsOneDayInMillis)), + By(MappedMetric.correlationId, "") + ) + if (unarchivableOldMetricCount == 0) + checks += MetricsIntegrityCheckJsonV700("metric_unarchivable_rows", "OK", + "No metric rows older than the retention window are blocked from archiving.") + else + checks += MetricsIntegrityCheckJsonV700("metric_unarchivable_rows", "WARNING", + s"$unarchivableOldMetricCount metric row(s) older than the retention window have an empty correlation id and cannot be archived (the archive requires a UUID). They remain in the metric table and are excluded from the move job — typically legacy rows that predate correlation ids.") + + archiveOldest match { + case Some(d) => + val age = metricsAgeInDays(d, now) + if (age <= retainArchiveMetricsDaysEffective + graceDays) + checks += MetricsIntegrityCheckJsonV700("archive_oldest_within_retention", "OK", + s"Oldest archived metric is $age days old, within the effective archive retention of $retainArchiveMetricsDaysEffective days (+${graceDays}d grace).") + else + checks += MetricsIntegrityCheckJsonV700("archive_oldest_within_retention", "ERROR", + s"Oldest archived metric is $age days old but the effective archive retention is $retainArchiveMetricsDaysEffective days. 
Records older than this should have been deleted — the archive cleanup job is not keeping up or has stopped.") + case None => + checks += MetricsIntegrityCheckJsonV700("archive_oldest_within_retention", "OK", "The metricarchive table is empty.") + } + + // If a backlog of metrics older than the retention window exists, the move + // job must be running, so the newest archived record should itself be + // roughly retain_metrics_days old. A much older newest-archive value means + // the move job stopped. + (metricOldest, archiveNewest) match { + case (Some(mo), Some(an)) if metricsAgeInDays(mo, now) > retainMetricsDaysEffective + graceDays => + val newestArchiveAge = metricsAgeInDays(an, now) + if (newestArchiveAge <= retainMetricsDaysEffective + graceDays) + checks += MetricsIntegrityCheckJsonV700("archive_recently_updated", "OK", + s"Newest archived metric is $newestArchiveAge days old, consistent with an active move job.") + else + checks += MetricsIntegrityCheckJsonV700("archive_recently_updated", "ERROR", + s"There are metric rows older than the retention window, yet the newest archived record is $newestArchiveAge days old. The move job appears to have stopped roughly ${newestArchiveAge - retainMetricsDaysEffective} days ago.") + case _ => + checks += MetricsIntegrityCheckJsonV700("archive_recently_updated", "OK", + "No backlog of metrics older than the retention window — nothing to move right now.") + } + + // Run-log derived check: the durable record of scheduler runs (metricsarchiverun). 
+ val lastRun = MetricsArchiveRun.lastRun + val lastSuccessfulRun = MetricsArchiveRun.lastSuccessfulRun + lastRun match { + case Some(r) if r.Success.get => + val ageDays = metricsAgeInDays(r.StartedAt.get, now) + checks += MetricsIntegrityCheckJsonV700("archive_job_last_run", "OK", + s"Last archive run succeeded $ageDays days ago (moved ${r.RowsMovedToArchive.get} rows, deleted ${r.RowsDeletedFromArchive.get} outdated archive rows).") + case Some(r) => + val ageDays = metricsAgeInDays(r.StartedAt.get, now) + val lastOkNote = lastSuccessfulRun + .map(s => s" Last successful run was ${metricsAgeInDays(s.StartedAt.get, now)} days ago.") + .getOrElse(" No successful run has ever been recorded.") + checks += MetricsIntegrityCheckJsonV700("archive_job_last_run", "ERROR", + s"The most recent archive run ($ageDays days ago) failed: ${r.Remark.get}.$lastOkNote") + case None if schedulerEnabled => + checks += MetricsIntegrityCheckJsonV700("archive_job_last_run", "WARNING", + "No archive run has been recorded yet. 
The scheduler is enabled but may not have completed its first run since this table was introduced.") + case None => + checks += MetricsIntegrityCheckJsonV700("archive_job_last_run", "OK", + "No archive run recorded — the scheduler is disabled, so this is expected.") + } + + MetricsAndArchiveMetricsDiagnosticsJsonV700( + config = config, + metric = metricStats, + metric_archive = archiveStats, + last_run = lastRun.map(metricsArchiveRunToJson), + last_successful_run = lastSuccessfulRun.map(metricsArchiveRunToJson), + checks = checks.toList, + everything_as_expected = checks.forall(_.status == "OK") + ) + } + + lazy val metricsAndArchiveMetricsDiagnosticsJsonV700Example = MetricsAndArchiveMetricsDiagnosticsJsonV700( + config = MetricsArchiveConfigJsonV700( + write_metrics = true, + enable_metrics_scheduler = true, + retain_metrics_scheduler_interval_in_seconds = 3600, + retain_metrics_days = 90, + retain_metrics_days_effective = 90, + retain_archive_metrics_days = 730, + retain_archive_metrics_days_effective = 730, + retain_metrics_move_limit = 4000 + ), + metric = MetricsTableStatsJsonV700( + table_name = "metric", + count = 1240000L, + oldest_record_date = Some(new Date(1709251200000L)), + newest_record_date = Some(new Date(1717200000000L)), + oldest_record_age_days = Some(85L), + newest_record_age_days = Some(0L) + ), + metric_archive = MetricsTableStatsJsonV700( + table_name = "metricarchive", + count = 9800000L, + oldest_record_date = Some(new Date(1654041600000L)), + newest_record_date = Some(new Date(1701907200000L)), + oldest_record_age_days = Some(700L), + newest_record_age_days = Some(92L) + ), + last_run = Some(MetricsArchiveRunJsonV700( + run_id = "9f3c2b1a-7d4e-4c8a-9b2f-1e6d5a0c4b7e", + api_instance_id = "obp", + started_at = new Date(1717200000000L), + ended_at = new Date(1717200012000L), + duration_ms = 12000L, + rows_moved_to_archive = 4000, + rows_deleted_from_archive = 1500, + success = true, + remark = "" + )), + last_successful_run = 
Some(MetricsArchiveRunJsonV700( + run_id = "9f3c2b1a-7d4e-4c8a-9b2f-1e6d5a0c4b7e", + api_instance_id = "obp", + started_at = new Date(1717200000000L), + ended_at = new Date(1717200012000L), + duration_ms = 12000L, + rows_moved_to_archive = 4000, + rows_deleted_from_archive = 1500, + success = true, + remark = "" + )), + checks = List( + MetricsIntegrityCheckJsonV700("write_metrics_enabled", "OK", + "write_metrics=true: API calls are being recorded into the metric table."), + MetricsIntegrityCheckJsonV700("metrics_scheduler_enabled", "OK", + "enable_metrics_scheduler=true: the archive/cleanup scheduler is active."), + MetricsIntegrityCheckJsonV700("metric_oldest_within_retention", "OK", + "Oldest metric is 85 days old, within the effective retention of 90 days (+7d grace)."), + MetricsIntegrityCheckJsonV700("metric_unarchivable_rows", "OK", + "No metric rows older than the retention window are blocked from archiving."), + MetricsIntegrityCheckJsonV700("archive_oldest_within_retention", "OK", + "Oldest archived metric is 700 days old, within the effective archive retention of 730 days (+7d grace)."), + MetricsIntegrityCheckJsonV700("archive_recently_updated", "OK", + "Newest archived metric is 92 days old, consistent with an active move job."), + MetricsIntegrityCheckJsonV700("archive_job_last_run", "OK", + "Last archive run succeeded 0 days ago (moved 4000 rows, deleted 1500 outdated archive rows).") + ), + everything_as_expected = true + ) } diff --git a/obp-api/src/main/scala/code/metrics/APIMetrics.scala b/obp-api/src/main/scala/code/metrics/APIMetrics.scala index 3376aa161d..f252af2892 100644 --- a/obp-api/src/main/scala/code/metrics/APIMetrics.scala +++ b/obp-api/src/main/scala/code/metrics/APIMetrics.scala @@ -77,7 +77,7 @@ trait APIMetrics { targetIp: String, apiInstanceId: String, consentReferenceId: String - ): Unit + ): Boolean // //TODO: ordering of list? should this be by date? 
currently not enforced // def getAllGroupedByUrl() : Map[String, List[APIMetric]] diff --git a/obp-api/src/main/scala/code/metrics/ElasticsearchMetrics.scala b/obp-api/src/main/scala/code/metrics/ElasticsearchMetrics.scala index daf0d95163..db8e71c184 100644 --- a/obp-api/src/main/scala/code/metrics/ElasticsearchMetrics.scala +++ b/obp-api/src/main/scala/code/metrics/ElasticsearchMetrics.scala @@ -25,7 +25,7 @@ object ElasticsearchMetrics extends APIMetrics { sourceIp: String, targetIp: String, apiInstanceId: String, - consentReferenceId: String): Unit = ??? + consentReferenceId: String): Boolean = ??? // override def getAllGroupedByUserId(): Map[String, List[APIMetric]] = { // //TODO: replace the following with valid ES query diff --git a/obp-api/src/main/scala/code/metrics/MappedMetrics.scala b/obp-api/src/main/scala/code/metrics/MappedMetrics.scala index f52986de21..9fe6bac89c 100644 --- a/obp-api/src/main/scala/code/metrics/MappedMetrics.scala +++ b/obp-api/src/main/scala/code/metrics/MappedMetrics.scala @@ -114,6 +114,12 @@ object MappedMetrics extends APIMetrics with MdcLoggable{ override def saveMetric(userId: String, url: String, date: Date, duration: Long, userName: String, appName: String, developerEmail: String, consumerId: String, implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String, responseBody: String, sourceIp: String, targetIp: String, apiInstanceId: String, consentReferenceId: String): Unit = { + // A correlation id is expected on every metric. Rows without one cannot be moved + // to the archive later (its correlationId column requires a UUID), so flag it at + // write time where the source of the missing id can actually be traced. + if (correlationId == null || correlationId.trim.isEmpty) { + logger.warn(s"saveMetric: writing a Metric row with an empty correlation id (url=$url, verb=$verb, consumerId=$consumerId, implementedInVersion=$implementedInVersion). 
This row will not be archivable.") + } MetricBatchWriter.enqueue( MetricBatchWriter.MetricRow( userId = userId, @@ -143,8 +149,13 @@ object MappedMetrics extends APIMetrics with MdcLoggable{ implementedByPartialFunction: String, implementedInVersion: String, verb: String, httpCode: Option[Int], correlationId: String, responseBody: String, sourceIp: String, targetIp: String, - apiInstanceId: String, consentReferenceId: String): Unit = { - val metric = MetricArchive.find(By(MetricArchive.id, primaryKey)).getOrElse(MetricArchive.create) + apiInstanceId: String, consentReferenceId: String): Boolean = { + // Fix: dedup by the source metric's primary key stored in `metricId`, NOT by the + // archive's own auto-increment `id`. The two are unrelated id-spaces; matching on + // `id` overwrites an unrelated archived row once the archive's id sequence grows + // into the live metric id range. + val metric = MetricArchive.find(By(MetricArchive.metricId, primaryKey)).getOrElse(MetricArchive.create) + metric .metricId(primaryKey) .userId(userId) @@ -169,7 +180,14 @@ object MappedMetrics extends APIMetrics with MdcLoggable{ case Some(code) => metric.httpCode(code) case None => } - metric.save + // Fix: Lift's .save returns false (it does NOT throw) on a failed insert. + // Returning that result lets the caller skip the source-row delete and mark + // the run as failed instead of silently stalling. 
+ val saved = metric.save + if (!saved) { + logger.error(s"saveMetricsArchive: failed to persist MetricArchive row for metricId=$primaryKey (url=$url, date=$date)") + } + saved } private def trueOrFalse(condition: Boolean): String = if (condition) s"1=1" else s"0=1" diff --git a/obp-api/src/main/scala/code/metrics/MetricsArchiveRun.scala b/obp-api/src/main/scala/code/metrics/MetricsArchiveRun.scala new file mode 100644 index 0000000000..9dcde6eeea --- /dev/null +++ b/obp-api/src/main/scala/code/metrics/MetricsArchiveRun.scala @@ -0,0 +1,93 @@ +package code.metrics + +import java.util.Date + +import code.util.MappedUUID +import net.liftweb.mapper._ + +/** + * Append-only audit log of `MetricsArchiveScheduler` runs. + * + * One row is written at the end of every scheduler tick that actually did work + * (i.e. was not skipped because a previous run was still in progress). It records + * how many rows were moved `metric` -> `metricarchive`, how many outdated archive + * rows were deleted, the wall-clock duration, and whether the run succeeded. + * + * This is durable history. Contrast with the `jobscheduler` table, which holds + * only a transient lock row that exists during a run and is deleted on completion. + * + * The log is self-capped: only the most recent [[MetricsArchiveRun.maxRowsToKeep]] + * rows are retained. Each write prunes anything older, so the table stays small. + * + * Naming: this is a DB entity, so the class must not start with `Mapped` and the + * column objects must not start with `m` + uppercase (see `MappedClassNameTest`). 
/**
 * One row of the append-only audit log of metrics-archive runs.
 *
 * Each row stores the run id, the API instance that executed the run, start/end
 * timestamps, the duration in milliseconds, how many rows were moved to the
 * archive, how many outdated archive rows were deleted, a success flag and a
 * free-text remark.
 *
 * Naming: column objects must not start with `m` + uppercase
 * (see `MappedClassNameTest`).
 */
class MetricsArchiveRun extends LongKeyedMapper[MetricsArchiveRun] with IdPK {

  def getSingleton = MetricsArchiveRun

  object RunId extends MappedUUID(this)
  object ApiInstanceId extends MappedString(this, 100)
  object StartedAt extends MappedDateTime(this)
  object EndedAt extends MappedDateTime(this)
  object DurationMs extends MappedLong(this)
  object RowsMovedToArchive extends MappedInt(this)
  object RowsDeletedFromArchive extends MappedInt(this)
  object Success extends MappedBoolean(this)
  object Remark extends MappedText(this)
}

/** Meta-mapper for [[MetricsArchiveRun]]: indexes, the write path and simple look-ups. */
object MetricsArchiveRun extends MetricsArchiveRun with LongKeyedMetaMapper[MetricsArchiveRun] {

  // RunId is unique per run; StartedAt is indexed because lastRun / lastSuccessfulRun order by it.
  override def dbIndexes: List[BaseIndex[MetricsArchiveRun]] =
    UniqueIndex(RunId) :: Index(StartedAt) :: super.dbIndexes

  /** Keep only the most recent N runs; older rows are pruned on every write. */
  val maxRowsToKeep: Int = 1000

  /**
   * Persist one completed run, then prune the log back to the most recent
   * [[maxRowsToKeep]] rows.
   *
   * @param runId                  unique id of the run (stored in the uniquely indexed `RunId` column)
   * @param apiInstanceId          id of the API instance that executed the run
   * @param startedAt              when the run started
   * @param endedAt                when the run ended; `DurationMs` is derived as `endedAt - startedAt`
   * @param rowsMovedToArchive     number of rows moved to the archive
   * @param rowsDeletedFromArchive number of outdated archive rows deleted
   * @param success                whether the run completed without failures
   * @param remark                 optional free text; stored as an empty string when absent
   * @return the saved row
   */
  def recordRun(runId: String,
                apiInstanceId: String,
                startedAt: Date,
                endedAt: Date,
                rowsMovedToArchive: Int,
                rowsDeletedFromArchive: Int,
                success: Boolean,
                remark: Option[String]): MetricsArchiveRun = {
    val saved = MetricsArchiveRun.create
      .RunId(runId)
      .ApiInstanceId(apiInstanceId)
      .StartedAt(startedAt)
      .EndedAt(endedAt)
      .DurationMs(endedAt.getTime - startedAt.getTime)
      .RowsMovedToArchive(rowsMovedToArchive)
      .RowsDeletedFromArchive(rowsDeletedFromArchive)
      .Success(success)
      .Remark(remark.getOrElse(""))
      .saveMe()
    pruneToMostRecent(maxRowsToKeep)
    saved
  }

  /**
   * Delete all but the most recent `keep` rows, ordered by primary key.
   *
   * Only the single boundary row (the `keep`-th newest) is fetched, instead of
   * loading `keep` full rows just to read the last one's id. When the table
   * holds fewer than `keep` rows there is no boundary row and nothing is deleted.
   *
   * @param keep number of newest rows to retain; a non-positive value is treated
   *             as a no-op rather than being passed to the database as a row limit
   */
  def pruneToMostRecent(keep: Int): Unit =
    if (keep > 0) {
      MetricsArchiveRun
        .findAll(OrderBy(id, Descending), StartAt(keep - 1L), MaxRows(1))
        .headOption
        .foreach(oldestToKeep => MetricsArchiveRun.bulkDelete_!!(By_<(id, oldestToKeep.id.get)))
    }

  /** Most recent run by start time, if any. */
  def lastRun: Option[MetricsArchiveRun] =
    MetricsArchiveRun.findAll(OrderBy(StartedAt, Descending), MaxRows(1)).headOption

  /** Most recent successful run by start time, if any. */
  def lastSuccessfulRun: Option[MetricsArchiveRun] =
    MetricsArchiveRun.findAll(By(Success, true), OrderBy(StartedAt, Descending), MaxRows(1)).headOption
}
/**
 * Result of moving outdated rows from `Metric` to `MetricArchive`.
 *
 * @param moved  rows copied to the archive and deleted from `Metric`
 * @param failed rows whose archive copy genuinely failed (a real problem)
 *
 * Rows that have no correlation id are not counted here at all: they are
 * excluded by the candidate query (see `conditionalDeleteMetricsRow`), so the
 * job never repeatedly scans them.
 */
final case class ArchiveMoveResult(moved: Int, failed: Int)

/** Outcome of a single [[MetricsArchiveScheduler.runOnce]] invocation. */
sealed trait RunOutcome

/** A run executed and was recorded (inspect `run.Success` for whether it errored). */
final case class RunCompleted(run: MetricsArchiveRun) extends RunOutcome

/** No run started because one was already in progress (JobScheduler lock present). */
case object RunSkippedAlreadyInProgress extends RunOutcome
Job ID: ${job.JobId}") - case _ => // Start a new job - val uniqueId = generateUUID() - val job = JobScheduler.create - .JobId(uniqueId) - .Name(jobName) - .ApiInstanceId(apiInstanceId) - .saveMe() - logger.info(s"Starting Job ID: $uniqueId") - conditionalDeleteMetricsRow() - deleteOutdatedRowsFromMetricsArchive() - JobScheduler.delete_!(job) // Allow future jobs - logger.info(s"End of Job ID: $uniqueId") - } - } + def run(): Unit = { runOnce(); () } } ) logger.info("Bye from MetricsArchiveScheduler.start") } - def deleteOutdatedRowsFromMetricsArchive() = { + /** + * Perform a single metrics-archive run: move outdated `Metric` rows to + * `MetricArchive`, delete outdated `MetricArchive` rows, and record the outcome + * in the `metricsarchiverun` log. Honours the same concurrency guard as the + * scheduler — if a `JobScheduler` lock for this job is already present (a run is + * in progress on this or another node) it returns [[RunSkippedAlreadyInProgress]] + * without doing anything. + * + * This is the single source of truth for "do an archive run", called both by + * the timer (every tick) and by the manual trigger endpoint, so a manual run + * respects exactly the same checks and retention rules as a scheduled one. + */ + def runOnce(): RunOutcome = { + JobScheduler.find(By(JobScheduler.Name, jobName)) match { + case Full(job) => // There is an ongoing/hanging job + logger.info(s"MetricsArchiveScheduler.runOnce skipped due to ongoing job. 
Job ID: ${job.JobId}") + RunSkippedAlreadyInProgress + case _ => // Start a new job + val uniqueId = generateUUID() + val job = JobScheduler.create + .JobId(uniqueId) + .Name(jobName) + .ApiInstanceId(apiInstanceId) + .saveMe() + logger.info(s"Starting Job ID: $uniqueId") + val startedAt = new Date() + var rowsMoved = 0 + var rowsDeleted = 0 + try { + val moveResult = conditionalDeleteMetricsRow() + rowsMoved = moveResult.moved + rowsDeleted = deleteOutdatedRowsFromMetricsArchive() + // A genuine copy failure marks the run NOT successful, with a remark, so the + // run log and the diagnostics endpoint surface it instead of silently + // leaving rows behind. + val runSucceeded = moveResult.failed == 0 + val remark = + if (moveResult.failed > 0) Some(s"${moveResult.failed} metric row(s) failed to copy to the archive and were left in place.") + else None + val run = MetricsArchiveRun.recordRun(uniqueId, apiInstanceId, startedAt, new Date(), + rowsMoved, rowsDeleted, success = runSucceeded, remark) + RunCompleted(run) + } catch { + case e: Exception => + logger.error(s"MetricsArchiveScheduler Job ID: $uniqueId failed", e) + val run = MetricsArchiveRun.recordRun(uniqueId, apiInstanceId, startedAt, new Date(), + rowsMoved, rowsDeleted, success = false, Some(Option(e.getMessage).getOrElse(e.toString))) + RunCompleted(run) + } finally { + JobScheduler.delete_!(job) // Allow future jobs + logger.info(s"End of Job ID: $uniqueId (rows moved to archive: $rowsMoved, outdated archive rows deleted: $rowsDeleted)") + } + } + } + + // Returns the number of outdated rows deleted from the "MetricArchive" table. 
+ def deleteOutdatedRowsFromMetricsArchive(): Int = { logger.info("Hello from MetricsArchiveScheduler.deleteOutdatedRowsFromMetricsArchive") val currentTime = new Date() val defaultValue : Int = 365 * 3 @@ -75,12 +134,17 @@ object MetricsArchiveScheduler extends MdcLoggable { case _ => 365 } val someYearsAgo: Date = new Date(currentTime.getTime - (oneDayInMillis * days)) - // Delete the outdated rows from the table "MetricsArchive" + // Count before deleting so the run log records how many rows were removed. + val outdatedCount = MetricArchive.count(By_<=(MetricArchive.date, someYearsAgo)).toInt + // Delete the outdated rows from the table "MetricArchive" MetricArchive.bulkDelete_!!(By_<=(MetricArchive.date, someYearsAgo)) - logger.info("Bye from MetricsArchiveScheduler.deleteOutdatedRowsFromMetricsArchive") + logger.info(s"Bye from MetricsArchiveScheduler.deleteOutdatedRowsFromMetricsArchive (deleted $outdatedCount rows)") + outdatedCount } - def conditionalDeleteMetricsRow() = { + // Move "Metric" rows older than retain_metrics_days into "MetricArchive", oldest + // first, deleting each source row only after its archive copy is verified. 
+ def conditionalDeleteMetricsRow(): ArchiveMoveResult = { logger.info("Hello from MetricsArchiveScheduler.conditionalDeleteMetricsRow") val currentTime = new Date() val days = APIUtil.getPropsAsLongValue("retain_metrics_days", 367) match { @@ -89,32 +153,45 @@ object MetricsArchiveScheduler extends MdcLoggable { } val someDaysAgo: Date = new Date(currentTime.getTime - (oneDayInMillis * days)) val limit = APIUtil.getPropsAsIntValue("retain_metrics_move_limit", 50000) - // Get the data from the table "Metric" older than specified by retain_metrics_days - logger.info("MetricsArchiveScheduler.conditionalDeleteMetricsRow says before candidateMetricRowsToMove val") - val candidateMetricRowsToMove = APIMetrics.apiMetrics.vend.getAllMetrics(List(OBPToDate(someDaysAgo), OBPLimit(limit))) - logger.info("MetricsArchiveScheduler.conditionalDeleteMetricsRow says after candidateMetricRowsToMove val") - logger.info(s"Number of rows: ${candidateMetricRowsToMove.length}") - candidateMetricRowsToMove map { i => - // and copy it to the table "MetricArchive" - copyRowToMetricsArchive(i) - } - logger.info("MetricsArchiveScheduler.conditionalDeleteMetricsRow says after coping all rows") - logger.info("MetricsArchiveScheduler.conditionalDeleteMetricsRow says before maybeDeletedRows val") - val maybeDeletedRows: List[(Boolean, Long)] = candidateMetricRowsToMove map { i => - // and delete it after successful coping - MetricArchive.find(By(MetricArchive.metricId, i.getMetricId())) match { - case Full(_) => (MappedMetric.bulkDelete_!!(By(MappedMetric.id, i.getMetricId())), i.getMetricId()) - case _ => (false, i.getMetricId()) + // Query the live Metric table directly — oldest first, up to `limit` rows. + // We deliberately bypass APIMetrics.getAllMetrics here: that path is wrapped in + // a Redis cache (stable TTL for past-dated queries), and a "which rows should I + // move and delete" query must never be served from a stale snapshot. 
+ // + // We also exclude rows with an empty correlation id: the archive's correlationId + // column requires a UUID, so those rows can't be archived. Filtering them in the + // query (rather than skipping them in the loop) means the job never repeatedly + // re-scans the same un-archivable rows — which, being the oldest, would otherwise + // permanently occupy the oldest-first candidate window. Their count is surfaced + // by the /diagnostics/metrics endpoint instead. + val candidateMetricRowsToMove: List[MappedMetric] = MappedMetric.findAll( + By_<=(MappedMetric.date, someDaysAgo), + NotBy(MappedMetric.correlationId, ""), + OrderBy(MappedMetric.date, Ascending), + MaxRows(limit) + ) + logger.info(s"MetricsArchiveScheduler.conditionalDeleteMetricsRow: ${candidateMetricRowsToMove.length} candidate rows to move") + + var moved = 0 + var failed = 0 + candidateMetricRowsToMove.foreach { i => + // Copy first, then delete the source row only if the archive copy both saved + // and is verifiably readable back by metricId. 
+ val copied = copyRowToMetricsArchive(i) + if (copied && MetricArchive.find(By(MetricArchive.metricId, i.getMetricId())).isDefined) { + MappedMetric.bulkDelete_!!(By(MappedMetric.id, i.getMetricId())) + moved += 1 + } else { + failed += 1 + logger.warn(s"Row with primary key ${i.getMetricId()} of the table Metric was not successfully copied to the archive; leaving it in place.") } } - logger.info("MetricsArchiveScheduler.conditionalDeleteMetricsRow says after maybeDeletedRows val") - maybeDeletedRows.filter(_._1 == false).map { i => - logger.warn(s"Row with primary key ${i._2} of the table Metric is not successfully copied.") - } - logger.info("Bye from MetricsArchiveScheduler.conditionalDeleteMetricsRow") + logger.info(s"Bye from MetricsArchiveScheduler.conditionalDeleteMetricsRow (moved $moved, failed $failed)") + ArchiveMoveResult(moved, failed) } - private def copyRowToMetricsArchive(i: APIMetric): Unit = { + // Returns true when the archive copy was persisted, false otherwise. + private def copyRowToMetricsArchive(i: APIMetric): Boolean = { APIMetrics.apiMetrics.vend.saveMetricsArchive( i.getMetricId(), i.getUserId(), diff --git a/obp-api/src/test/scala/code/api/v7_0_0/Http4s700RoutesTest.scala b/obp-api/src/test/scala/code/api/v7_0_0/Http4s700RoutesTest.scala index b9047631eb..8756b26b99 100644 --- a/obp-api/src/test/scala/code/api/v7_0_0/Http4s700RoutesTest.scala +++ b/obp-api/src/test/scala/code/api/v7_0_0/Http4s700RoutesTest.scala @@ -6,7 +6,7 @@ import code.api.util.http4s.Http4sStandardHeaders import code.api.Constant.SYSTEM_OWNER_VIEW_ID import code.api.ResponseHeader import code.api.util.APIUtil -import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateOrganisation, canCreateRoutingScheme, canDeleteEntitlementAtAnyBank, canDeleteOrganisation, canDeleteRoutingScheme, canUpdateSystemView, canGetAccountAccessTrace, canGetAnyOrganisation, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetCardsForBank, 
canGetConnectorHealth, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMigrations, canReadResourceDoc, canUpdateBankSupportedRoutingScheme, canUpdateOrganisation, canUpdateRoutingScheme} +import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateOrganisation, canCreateRoutingScheme, canDeleteEntitlementAtAnyBank, canDeleteOrganisation, canDeleteRoutingScheme, canUpdateSystemView, canGetAccountAccessTrace, canGetAnyOrganisation, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetCardsForBank, canGetConnectorHealth, canCreateMetricsArchiveRun, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMetricsDiagnostics, canGetMigrations, canReadResourceDoc, canUpdateBankSupportedRoutingScheme, canUpdateOrganisation, canUpdateRoutingScheme} import code.api.util.ErrorMessages.{AuthenticatedUserIsRequired, BankNotFound, EntitlementAlreadyExists, InvalidOrganisationIdFormat, InvalidRoutingSchemeName, MobileWalletDestinationNotFound, MobileWalletInvalidMsisdn, OrganisationAlreadyExists, OrganisationNotFound, PayeeLookupAddressMismatch, PayeeLookupIdentifierTypeNotRegistered, PayeeNotFound, RoutingSchemeAlreadyExists, RoutingSchemeExampleAddressMismatch, RoutingSchemeNotFound, SystemViewNotFound, UserHasMissingRoles, UserNotFoundByUserId} import code.api.Constant.SYSTEM_AUDITOR_VIEW_ID import code.views.MapperViews @@ -1823,4 +1823,196 @@ class Http4s700RoutesTest extends ServerSetupWithTestData { } } + // ─── getMetricsDiagnostics ──────────────────────────────────────────────────── + + feature("Http4s700 getMetricsDiagnostics endpoint") { + + val diagnosticsPath = "/obp/v7.0.0/management/system/diagnostics/metrics" + + scenario("Reject unauthenticated access to the metrics diagnostics", Http4s700RoutesTag) { + Given("GET the diagnostics path with no auth headers") + val (statusCode, json, _) = makeHttpRequest(diagnosticsPath) + + Then("Response is 401 with AuthenticatedUserIsRequired message") + statusCode shouldBe 401 + 
json match { + case JObject(fields) => + toFieldMap(fields).get("message") match { + case Some(JString(msg)) => msg should include(AuthenticatedUserIsRequired) + case _ => fail("Expected message field") + } + case _ => fail("Expected JSON object") + } + } + + scenario("Return 403 when authenticated but missing canGetMetricsDiagnostics role", Http4s700RoutesTag) { + Given("GET the diagnostics path with DirectLogin header but no role") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (statusCode, json, _) = makeHttpRequest(diagnosticsPath, headers) + + Then("Response is 403 with UserHasMissingRoles and the role name") + statusCode shouldBe 403 + json match { + case JObject(fields) => + toFieldMap(fields).get("message") match { + case Some(JString(msg)) => + msg should include(UserHasMissingRoles) + msg should include(canGetMetricsDiagnostics.toString) + case _ => fail("Expected message field") + } + case _ => fail("Expected JSON object") + } + } + + scenario("Return 200 with diagnostics shape when authenticated with canGetMetricsDiagnostics role", Http4s700RoutesTag) { + Given("canGetMetricsDiagnostics role granted to resourceUser1") + addEntitlement("", resourceUser1.userId, canGetMetricsDiagnostics.toString) + + When("GET the diagnostics path with DirectLogin header") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (statusCode, json, _) = makeHttpRequest(diagnosticsPath, headers) + + Then("Response is 200 with config, metric, metric_archive, checks and everything_as_expected") + statusCode shouldBe 200 + json match { + case JObject(fields) => + val m = toFieldMap(fields) + m.keys should contain("config") + m.keys should contain("metric") + m.keys should contain("metric_archive") + m.keys should contain("everything_as_expected") + + And("config exposes the archiving props and their effective values") + m.get("config") match { + case Some(JObject(cfgFields)) => + val cfg = toFieldMap(cfgFields) + cfg.keys should 
contain("write_metrics") + cfg.keys should contain("enable_metrics_scheduler") + cfg.keys should contain("retain_metrics_scheduler_interval_in_seconds") + cfg.keys should contain("retain_metrics_days") + cfg.keys should contain("retain_metrics_days_effective") + cfg.keys should contain("retain_archive_metrics_days") + cfg.keys should contain("retain_archive_metrics_days_effective") + cfg.keys should contain("retain_metrics_move_limit") + case _ => fail("Expected config object") + } + + And("the metric table stats name the live metric table") + m.get("metric") match { + case Some(JObject(metricFields)) => + toFieldMap(metricFields).get("table_name") match { + case Some(JString(name)) => name shouldBe "metric" + case _ => fail("Expected table_name field") + } + case _ => fail("Expected metric object") + } + + And("the archive table stats name the metricarchive table") + m.get("metric_archive") match { + case Some(JObject(archiveFields)) => + toFieldMap(archiveFields).get("table_name") match { + case Some(JString(name)) => name shouldBe "metricarchive" + case _ => fail("Expected table_name field") + } + case _ => fail("Expected metric_archive object") + } + + And("checks is a non-empty array, each entry carrying name/status/message") + m.get("checks") match { + case Some(JArray(items)) => + items should not be empty + items.foreach { + case JObject(checkFields) => + val c = toFieldMap(checkFields) + c.keys should contain("name") + c.keys should contain("status") + c.keys should contain("message") + case _ => fail("Expected each check to be a JSON object") + } + case _ => fail("Expected checks array") + } + case _ => fail("Expected JSON object for getMetricsDiagnostics") + } + } + } + + // ─── triggerMetricsArchiveRun ───────────────────────────────────────────────── + + feature("Http4s700 triggerMetricsArchiveRun endpoint") { + + val triggerPath = "/obp/v7.0.0/management/system/diagnostics/metrics/run" + + scenario("Reject unauthenticated trigger of a metrics archive 
run", Http4s700RoutesTag) { + Given("POST the trigger path with no auth headers") + val (statusCode, json, _) = makeHttpRequestWithMethod("POST", triggerPath) + + Then("Response is 401 with AuthenticatedUserIsRequired message") + statusCode shouldBe 401 + json match { + case JObject(fields) => + toFieldMap(fields).get("message") match { + case Some(JString(msg)) => msg should include(AuthenticatedUserIsRequired) + case _ => fail("Expected message field") + } + case _ => fail("Expected JSON object") + } + } + + scenario("Return 403 when authenticated but missing canCreateMetricsArchiveRun role", Http4s700RoutesTag) { + Given("POST the trigger path with DirectLogin header but no role") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (statusCode, json, _) = makeHttpRequestWithMethod("POST", triggerPath, headers) + + Then("Response is 403 with UserHasMissingRoles and the role name") + statusCode shouldBe 403 + json match { + case JObject(fields) => + toFieldMap(fields).get("message") match { + case Some(JString(msg)) => + msg should include(UserHasMissingRoles) + msg should include(canCreateMetricsArchiveRun.toString) + case _ => fail("Expected message field") + } + case _ => fail("Expected JSON object") + } + } + + scenario("Return 200 and run the archive when authenticated with canCreateMetricsArchiveRun role", Http4s700RoutesTag) { + Given("canCreateMetricsArchiveRun role granted to resourceUser1") + addEntitlement("", resourceUser1.userId, canCreateMetricsArchiveRun.toString) + + When("POST the trigger path with DirectLogin header") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (statusCode, json, _) = makeHttpRequestWithMethod("POST", triggerPath, headers) + + Then("Response is 200 with status=completed and a recorded run") + statusCode shouldBe 200 + json match { + case JObject(fields) => + val m = toFieldMap(fields) + m.get("status") match { + case Some(JString(s)) => s shouldBe "completed" + case _ => fail("Expected 
status field") + } + m.keys should contain("message") + m.get("run") match { + case Some(JObject(runFields)) => + val r = toFieldMap(runFields) + r.keys should contain("run_id") + r.keys should contain("rows_moved_to_archive") + r.keys should contain("rows_deleted_from_archive") + r.get("success") match { + case Some(JBool(ok)) => ok shouldBe true + case _ => fail("Expected success field") + } + case _ => fail("Expected run object on a completed run") + } + case _ => fail("Expected JSON object for triggerMetricsArchiveRun") + } + + And("the run was recorded in the metricsarchiverun log") + code.metrics.MetricsArchiveRun.lastRun.isDefined shouldBe true + } + } + }