From 71fe108e6001b331a1f6b4d2422448e2a4cf0516 Mon Sep 17 00:00:00 2001 From: simonredfern Date: Wed, 10 Jun 2026 08:23:14 +0200 Subject: [PATCH 1/3] Adding Adapter and OBP Bank Node glossary items --- .../main/scala/code/api/util/Glossary.scala | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/obp-api/src/main/scala/code/api/util/Glossary.scala b/obp-api/src/main/scala/code/api/util/Glossary.scala index 50c392b0cb..485286c95b 100644 --- a/obp-api/src/main/scala/code/api/util/Glossary.scala +++ b/obp-api/src/main/scala/code/api/util/Glossary.scala @@ -581,6 +581,59 @@ object Glossary extends MdcLoggable { |""" ) + glossaryItems += GlossaryItem( + title = "Adapter", + description = + s""" + |## Adapter + | + |In OBP, an Adapter is an out of process component that sits between OBP and a bank's systems of record (Core Banking System, Payment System, or database) and translates between them. + | + |An Adapter is paired with an Indirect [Connector](/glossary#Connector): the Connector inside OBP turns OBP function calls into messages and sends them over a transport (for example RabbitMQ, Akka, or a stored procedure call); the Adapter receives those messages, talks to the CBS, and returns a response in the agreed Outbound / Inbound message format. + | + |i.e. OBP API -> Connector -> Adapter -> CBS + | + |Key properties: + | + |* It runs outside OBP, in its own process, typically on the bank's side. + |* It can be written in any language, as long as it respects the message format published in the Message Docs for the relevant Connector. This is the main advantage over a Direct Connector, which must be written in a JVM language. + |* It usually contains bank specific integration code: the data access, field mappings, identifier translation, and quirks of that one bank's CBS. As a result, each bank typically has its own Adapter build. + |* The Adapter is responsible for emitting OBP shaped values where required (for example a UUID shaped ACCOUNT_ID mapped to the underlying core banking account number). + | + |For worked examples of writing an Adapter, see [Adapter.Akka.Intro](/glossary#Adapter.Akka.Intro) and [Adapter.Stored_Procedure.Intro](/glossary#Adapter.Stored_Procedure.Intro). + |""".stripMargin + ) + + glossaryItems += GlossaryItem( + title = "OBP Bank Node", + description = + s""" + |## OBP Bank Node + | + |An OBP Bank Node is a standardised software component designed to run at many banks inside their own network that connect to their Core Banking System (CBS) to an OBP API instance operated by a platform operator (for example TESOBE), without the bank having to run any OBP infrastructure itself. + | + |It is deployed as a single self contained service (typically a Docker container). All of its network connections are outbound from the bank's network (or controlled cloud) and no inbound ports are exposed to the public internet. To the bank's CBS it presents one small local interface (for example few REST endpoints); everything else (talking to the OBP API, and any systems it integrates) happens behind that interface. + | + |### How it relates to a Connector and an Adapter + | + |The OBP Bank Node is neither an OBP [Connector](/glossary#Connector) nor a traditional South Side Adapter, although it sits in similar territory. Two properties make the difference: + | + |* Direction of control. A South Side Adapter is called by an OBP Connector over a message bus: OBP is the caller and the Adapter responds to request messages with CBS data. The OBP Bank Node does the opposite on its north side: it acts as a client of the OBP API, calling OBP's REST interface itself. It both initiates calls to OBP and exposes a local interface to the bank's CBS, rather than only responding. + | + |* Code versus configuration. A South Side Adapter usually carries a significant amount of bank specific code: the translation logic for one bank's CBS (its data access, field mappings, and integration quirks) is written into the Adapter, so each bank effectively gets its own Adapter build. The OBP Bank Node carries no bank specific code; its per bank behaviour is entirely configuration. Any bank specific code in an integration lives on the bank's own side of the local interface (for example the CBS code that receives the Node's notifications), never inside the Node. + | + |In short: an OBP Connector is JVM code inside OBP that talks to systems of record; an Adapter is an out of process component that an Indirect Connector calls and that contains bank specific integration code; the OBP Bank Node is a bank side gateway that is configured rather than coded per bank and that acts as a client of the OBP API. + | + |### Open or closed source + | + |Because it integrates through the OBP API's published interfaces, vendors can build and run their own implementations. + | + |### Use cases + | The Node approach is suitable when implementing an OBP platform business that involves many banks in a common use case - or where the Platform utilises other interfaces e.g. to blockchains. + | + |""".stripMargin + ) + glossaryItems += GlossaryItem( title = "Connector.User.Authentication", description = From 3b4f9bca48557133b0fea5bc0d8dc724ad3de21e Mon Sep 17 00:00:00 2001 From: simonredfern Date: Wed, 10 Jun 2026 18:46:15 +0200 Subject: [PATCH 2/3] Tweaking defaults for metrics migration. Adding job-locks endpoints to list and delete --- .../resources/props/sample.props.template | 15 +- .../main/scala/bootstrap/liftweb/Boot.scala | 6 +- .../main/scala/code/api/util/ApiRole.scala | 6 + .../scala/code/api/v7_0_0/Http4s700.scala | 93 ++++++++- .../code/api/v7_0_0/JSONFactory7.0.0.scala | 70 ++++++- .../scala/code/scheduler/JobScheduler.scala | 18 ++ .../scheduler/MetricsArchiveScheduler.scala | 20 +- .../code/api/v7_0_0/Http4s700RoutesTest.scala | 182 +++++++++++++++++- .../MetricsArchiveSchedulerTest.scala | 8 +- 9 files changed, 398 insertions(+), 20 deletions(-) diff --git a/obp-api/src/main/resources/props/sample.props.template b/obp-api/src/main/resources/props/sample.props.template index af90c7ecc0..7872a7af26 100644 --- a/obp-api/src/main/resources/props/sample.props.template +++ b/obp-api/src/main/resources/props/sample.props.template @@ -1607,10 +1607,17 @@ enable_metrics_scheduler = true retain_archive_metrics_days = 1095 # Defines the number of days we keep rows in the table "Metric" former "MappedMetric" retain_metrics_days = 367 -# Defines the number of rows we can process at once -retain_metrics_move_limit = 50000 -# Defines the interval of the scheduler -retain_metrics_scheduler_interval_in_seconds = 3600 +# Defines the number of rows we can process at once. +# Tuned for smaller, more frequent runs: 10000 rows every 599s (below) = ~60k/hr +# ≈ 16.7 rows/sec max drain rate (keeps up with a sustained ~16 metric-generating +# calls/sec). Keep (move_limit x runs-per-hour) >= your peak metric ingest rate, or +# the archive backlog will grow. Smaller batches mean shorter runs and fewer stale +# scheduler locks from a JVM dying mid-run. +retain_metrics_move_limit = 10000 +# Defines the interval of the scheduler (seconds). 599 (a prime, ~10 min) rather +# than 600 so successive runs drift across the wall clock instead of phase-locking +# to every 10th minute and to other periodic schedulers. +retain_metrics_scheduler_interval_in_seconds = 599 # Defines endpoints we want to store responses at Metric table diff --git a/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala b/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala index 3abe66ef28..bc9440c78b 100644 --- a/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala +++ b/obp-api/src/main/scala/bootstrap/liftweb/Boot.scala @@ -547,8 +547,12 @@ class Boot extends MdcLoggable { APIUtil.getPropsAsBoolValue("enable_metrics_scheduler", true) match { case true => + // Default 599s (a prime, ~10 min) rather than a round 600: the odd interval + // makes successive runs drift across the wall clock instead of phase-locking + // to the top of every 10th minute (and to other periodic schedulers), so + // archive load is spread out rather than coinciding with other spikes. val interval = - APIUtil.getPropsAsIntValue("retain_metrics_scheduler_interval_in_seconds", 3600) + APIUtil.getPropsAsIntValue("retain_metrics_scheduler_interval_in_seconds", 599) MetricsArchiveScheduler.start(intervalInSeconds = interval) case false => // Do not start it } diff --git a/obp-api/src/main/scala/code/api/util/ApiRole.scala b/obp-api/src/main/scala/code/api/util/ApiRole.scala index 7a61c8aab7..3373664f18 100644 --- a/obp-api/src/main/scala/code/api/util/ApiRole.scala +++ b/obp-api/src/main/scala/code/api/util/ApiRole.scala @@ -454,6 +454,12 @@ object ApiRole extends MdcLoggable{ case class CanCreateMetricsArchiveRun(requiresBankId: Boolean = false) extends ApiRole lazy val canCreateMetricsArchiveRun = CanCreateMetricsArchiveRun() + case class CanGetSchedulerJobLocks(requiresBankId: Boolean = false) extends ApiRole + lazy val canGetSchedulerJobLocks = CanGetSchedulerJobLocks() + + case class CanDeleteSchedulerJobLock(requiresBankId: Boolean = false) extends ApiRole + lazy val canDeleteSchedulerJobLock = CanDeleteSchedulerJobLock() + case class CanGetConnectorHealth(requiresBankId: Boolean = false) extends ApiRole lazy val canGetConnectorHealth = CanGetConnectorHealth() diff --git a/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala b/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala index 2c122eaf76..47573f4f1c 100644 --- a/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala +++ b/obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala @@ -7,7 +7,7 @@ import code.api.Constant._ import code.api.ResourceDocs1_4_0.SwaggerDefinitionsJSON._ import code.api.util.APIUtil.{EmptyBody, _} import code.api.util.{APIUtil, ApiRole, CallContext, CustomJsonFormats, Glossary, NewStyle} -import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateEntitlementAtOneBank, canCreateMetricsArchiveRun, canCreateOrganisation, canCreateRoutingScheme, canCreateTestEmail, canDeleteEntitlementAtAnyBank, canDeleteOrganisation, canDeleteRoutingScheme, canGetAccountAccessTrace, canGetAnyOrganisation, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetConnectorHealth, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMetricsDiagnostics, canGetMigrations, canUpdateBankSupportedRoutingScheme, canUpdateOrganisation, canUpdateRoutingScheme, canUpdateSystemView} +import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateEntitlementAtOneBank, canCreateMetricsArchiveRun, canCreateOrganisation, canCreateRoutingScheme, canCreateTestEmail, canDeleteEntitlementAtAnyBank, canDeleteOrganisation, canDeleteRoutingScheme, canDeleteSchedulerJobLock, canGetAccountAccessTrace, canGetAnyOrganisation, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetConnectorHealth, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMetricsDiagnostics, canGetMigrations, canGetSchedulerJobLocks, canUpdateBankSupportedRoutingScheme, canUpdateOrganisation, canUpdateRoutingScheme, canUpdateSystemView} import code.api.util.CommonsEmailWrapper import code.model.dataAccess.AuthUser import code.api.util.ApiTag._ @@ -3353,6 +3353,10 @@ object Http4s700 { |* `message` — human-readable summary. |* `run` — the recorded run (run id, counts, duration, success, remark); | absent when skipped. + |* `in_progress` — present only when skipped: the lock that blocked the run + | (`job_id`, `api_instance_id`, `started_at`, `age_seconds`). A large + | `age_seconds` (much older than a normal run) indicates a stale lock left + | by a dead JVM — clear the matching `jobscheduler` row to unblock. | |Note: the run executes synchronously, so a large backlog may take a while. | @@ -3365,6 +3369,93 @@ object Http4s700 { http4sPartialFunction = Some(triggerMetricsArchiveRun) ) + // Route: GET /obp/v7.0.0/management/system/scheduler/job-locks + // + // List the `jobscheduler` lock rows (newest first, capped at 100). This table + // holds a row only while a scheduled job holds its lock — the row is deleted + // when the job finishes — so in healthy operation this is empty. Any row here + // is a currently-running job or a stale lock left by a dead JVM; `age_seconds` + // tells them apart, and the row can be cleared with the DELETE route below. + val getSchedulerJobs: HttpRoutes[IO] = HttpRoutes.of[IO] { + case req @ GET -> `prefixPath` / "management" / "system" / "scheduler" / "job-locks" => + EndpointHelpers.withUser(req) { (_, _) => + Future { + JSONFactory700.createSchedulerJobsJsonV700(code.scheduler.JobScheduler.mostRecent(100)) + } + } + } + + resourceDocs += ResourceDoc( + implementedInApiVersion, + nameOf(getSchedulerJobs), + "GET", + "/management/system/scheduler/job-locks", + "Get Scheduler Job Locks", + s"""List the scheduler lock rows from the `jobscheduler` table (most recent first, up to 100). + | + |**This is a lock table, not a job-history log.** A row exists only while a + |scheduled job (e.g. `MetricsArchiveScheduler`) holds its lock; it is deleted + |when the job finishes. So in healthy operation this list is **empty**. + | + |A row that is present is therefore one of: + |* a job genuinely running right now (small `age_seconds`), or + |* a **stale lock** left by a JVM that died mid-run (large `age_seconds`) — + | this blocks new runs of that job (e.g. "Trigger a Metrics Archive Run" + | returns `skipped_already_in_progress`). Clear it with + | `DELETE /management/system/scheduler/job-locks/JOB_ID`. + | + |Each row reports `job_id`, `name`, `api_instance_id`, `started_at` and + |`age_seconds` (seconds since the lock was taken). + | + |${userAuthenticationMessage(true)}""".stripMargin, + EmptyBody, + JSONFactory700.schedulerJobsJsonV700Example, + List($AuthenticatedUserIsRequired, UserHasMissingRoles, UnknownError), + apiTagSystem :: apiTagApi :: Nil, + Some(List(canGetSchedulerJobLocks)), + http4sPartialFunction = Some(getSchedulerJobs) + ) + + // Route: DELETE /obp/v7.0.0/management/system/scheduler/job-locks/JOB_ID + // + // Clear a scheduler lock row by its job id. Use this to release a stale lock + // left by a dead JVM so the job (e.g. metrics archiving) can run again. + // Idempotent — returns 204 even if the row is already gone. + val deleteSchedulerJob: HttpRoutes[IO] = HttpRoutes.of[IO] { + case req @ DELETE -> `prefixPath` / "management" / "system" / "scheduler" / "job-locks" / jobId => + EndpointHelpers.withUserDelete(req) { (_, _) => + Future { code.scheduler.JobScheduler.deleteByJobId(jobId); () } + } + } + + resourceDocs += ResourceDoc( + implementedInApiVersion, + nameOf(deleteSchedulerJob), + "DELETE", + "/management/system/scheduler/job-locks/JOB_ID", + "Delete Scheduler Job Lock", + s"""Clear a scheduler lock row from the `jobscheduler` table by its `JOB_ID`. + | + |Use this to release a **stale lock** left by a JVM that died mid-run, which + |would otherwise keep a scheduled job (e.g. `MetricsArchiveScheduler`) from + |starting — see "Get Scheduler Job Locks" to find the `job_id` and judge staleness + |from its `age_seconds`. + | + |**Caution:** if the job is genuinely still running on some node, deleting its + |lock lets a second run start concurrently. Only clear locks you have confirmed + |are stale (much older than a normal run). + | + |Idempotent — returns 204 even if no row with that `JOB_ID` exists. + | + |${userAuthenticationMessage(true)}""".stripMargin, + EmptyBody, + EmptyBody, + List($AuthenticatedUserIsRequired, UserHasMissingRoles, UnknownError), + apiTagSystem :: apiTagApi :: Nil, + Some(List(canDeleteSchedulerJobLock)), + http4sPartialFunction = Some(deleteSchedulerJob) + ) + // Enabled only in Lift test mode (Props.testMode == true, i.e. -Drun.mode=test). // Props.testMode is set from the JVM system property before any props file loads, // so it is reliably available at object-initialization time unlike file-based props. diff --git a/obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala b/obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala index e5a4e45219..87df2ce057 100644 --- a/obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala +++ b/obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala @@ -1138,13 +1138,26 @@ object JSONFactory700 extends MdcLoggable with code.api.util.CustomJsonFormats { remark = r.Remark.get ) + // The in-progress archive job whose lock blocked a new run. Surfaced so an + // operator can tell a genuinely-running job from a stale lock left by a dead + // JVM: an `age_seconds` of seconds is a real run; minutes/hours/days is almost + // certainly abandoned and the `jobscheduler` lock row can be cleared by hand. + case class InProgressArchiveJobJsonV700( + job_id: String, + api_instance_id: String, + started_at: Date, + age_seconds: Long + ) + // Result of manually triggering an archive run. `status` is one of // "completed" (a run executed — inspect `run.success`) or - // "skipped_already_in_progress" (a run was already running, so none was started). + // "skipped_already_in_progress" (a run was already running, so none was started; + // `in_progress` then describes the lock that blocked it). case class TriggerMetricsArchiveRunResponseJsonV700( status: String, message: String, - run: Option[MetricsArchiveRunJsonV700] + run: Option[MetricsArchiveRunJsonV700], + in_progress: Option[InProgressArchiveJobJsonV700] = None ) def createTriggerMetricsArchiveRunResponseJsonV700(outcome: code.scheduler.RunOutcome): TriggerMetricsArchiveRunResponseJsonV700 = @@ -1156,11 +1169,15 @@ object JSONFactory700 extends MdcLoggable with code.api.util.CustomJsonFormats { else s"Archive run completed with errors: ${r.Remark.get}" TriggerMetricsArchiveRunResponseJsonV700("completed", msg, Some(metricsArchiveRunToJson(r))) - case code.scheduler.RunSkippedAlreadyInProgress => + case code.scheduler.RunSkippedAlreadyInProgress(jobId, apiInstanceId, startedAt) => + val ageSeconds = (System.currentTimeMillis - startedAt.getTime) / 1000L TriggerMetricsArchiveRunResponseJsonV700( "skipped_already_in_progress", - "An archive run is already in progress; no new run was started.", - None) + s"An archive run started at $startedAt on api_instance_id '$apiInstanceId' is already in progress " + + s"(job $jobId, running for $ageSeconds seconds); no new run was started. " + + s"If this is much older than a normal run, the lock is likely stale and can be cleared.", + None, + Some(InProgressArchiveJobJsonV700(jobId, apiInstanceId, startedAt, ageSeconds))) } lazy val triggerMetricsArchiveRunResponseJsonV700Example = TriggerMetricsArchiveRunResponseJsonV700( @@ -1179,6 +1196,49 @@ object JSONFactory700 extends MdcLoggable with code.api.util.CustomJsonFormats { )) ) + // One row of the `jobscheduler` lock table. This table holds a row only while a + // job holds the scheduler lock (deleted when the job finishes), so a row here is + // a currently-running job or a stale lock left by a dead JVM — `age_seconds` + // tells them apart. + case class SchedulerJobJsonV700( + job_id: String, + name: String, + api_instance_id: String, + started_at: Date, + age_seconds: Long + ) + + case class SchedulerJobsJsonV700( + jobs: List[SchedulerJobJsonV700], + count: Int + ) + + def createSchedulerJobsJsonV700(rows: List[code.scheduler.JobScheduler]): SchedulerJobsJsonV700 = { + val now = System.currentTimeMillis + val jobs = rows.map { r => + val startedAt = r.createdAt.get + SchedulerJobJsonV700( + job_id = r.JobId.get, + name = r.Name.get, + api_instance_id = r.ApiInstanceId.get, + started_at = startedAt, + age_seconds = (now - startedAt.getTime) / 1000L + ) + } + SchedulerJobsJsonV700(jobs, jobs.size) + } + + lazy val schedulerJobsJsonV700Example = SchedulerJobsJsonV700( + jobs = List(SchedulerJobJsonV700( + job_id = "9f3c2b1a-7d4e-4c8a-9b2f-1e6d5a0c4b7e", + name = "MetricsArchiveScheduler", + api_instance_id = "obp", + started_at = new Date(1717200000000L), + age_seconds = 42L + )), + count = 1 + ) + private val metricsOneDayInMillis: Long = 86400000L private def metricsAgeInDays(d: Date, now: Date): Long = (now.getTime - d.getTime) / metricsOneDayInMillis diff --git a/obp-api/src/main/scala/code/scheduler/JobScheduler.scala b/obp-api/src/main/scala/code/scheduler/JobScheduler.scala index 38ceb1f9c5..9f1a68854c 100644 --- a/obp-api/src/main/scala/code/scheduler/JobScheduler.scala +++ b/obp-api/src/main/scala/code/scheduler/JobScheduler.scala @@ -20,6 +20,24 @@ class JobScheduler extends JobSchedulerTrait with LongKeyedMapper[JobScheduler] object JobScheduler extends JobScheduler with LongKeyedMetaMapper[JobScheduler] { override def dbIndexes: List[BaseIndex[JobScheduler]] = UniqueIndex(JobId) :: super.dbIndexes + + /** + * The most recent scheduler-lock rows, newest first, capped at `limit`. + * + * Note: `jobscheduler` is a lock table, not a job-history log — a row exists + * only while a job holds the lock and is deleted when the job finishes. In + * healthy operation this returns an empty list; any rows present are either + * currently running or stale locks left by a dead JVM. + */ + def mostRecent(limit: Int): List[JobScheduler] = + findAll(OrderBy(JobScheduler.createdAt, Descending), MaxRows(limit)) + + /** Delete the lock row with the given JobId; returns true if a row was removed. */ + def deleteByJobId(jobId: String): Boolean = + find(By(JobScheduler.JobId, jobId)) match { + case net.liftweb.common.Full(job) => delete_!(job) + case _ => false + } } diff --git a/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala b/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala index 754fe61f90..e647699f33 100644 --- a/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala +++ b/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala @@ -30,8 +30,13 @@ case class ArchiveMoveResult(moved: Int, failed: Int) sealed trait RunOutcome /** A run executed and was recorded (inspect `run.Success` for whether it errored). */ case class RunCompleted(run: MetricsArchiveRun) extends RunOutcome -/** No run started because one was already in progress (JobScheduler lock present). */ -case object RunSkippedAlreadyInProgress extends RunOutcome +/** + * No run started because one was already in progress (a `JobScheduler` lock is + * present). Carries the held lock's details so callers can tell a genuinely + * running job from a stale lock left by a dead JVM: a `startedAt` seconds ago is + * a real run; one minutes/hours/days old is almost certainly abandoned. + */ +case class RunSkippedAlreadyInProgress(jobId: String, apiInstanceId: String, startedAt: Date) extends RunOutcome object MetricsArchiveScheduler extends MdcLoggable { @@ -84,8 +89,8 @@ object MetricsArchiveScheduler extends MdcLoggable { def runOnce(): RunOutcome = { JobScheduler.find(By(JobScheduler.Name, jobName)) match { case Full(job) => // There is an ongoing/hanging job - logger.info(s"MetricsArchiveScheduler.runOnce skipped due to ongoing job. Job ID: ${job.JobId}") - RunSkippedAlreadyInProgress + logger.info(s"MetricsArchiveScheduler.runOnce skipped due to ongoing job. Job ID: ${job.JobId.get}, started at: ${job.createdAt.get}, api_instance_id: ${job.ApiInstanceId.get}") + RunSkippedAlreadyInProgress(job.JobId.get, job.ApiInstanceId.get, job.createdAt.get) case _ => // Start a new job val uniqueId = generateUUID() val job = JobScheduler.create @@ -152,7 +157,12 @@ object MetricsArchiveScheduler extends MdcLoggable { case _ => 60 } val someDaysAgo: Date = new Date(currentTime.getTime - (oneDayInMillis * days)) - val limit = APIUtil.getPropsAsIntValue("retain_metrics_move_limit", 50000) + // Default tuned for smaller, more frequent runs: 10k rows every ~10 min + // (see retain_metrics_scheduler_interval_in_seconds, default 599s) = ~60k rows/hr + // ≈ 16.7 rows/sec max drain rate, i.e. it keeps up with a sustained ~16 + // metric-generating calls/sec. Much shorter per-run passes than a single 50k + // batch — fewer mid-run failures (stale locks) and less memory/lock contention. + val limit = APIUtil.getPropsAsIntValue("retain_metrics_move_limit", 10000) // Query the live Metric table directly — oldest first, up to `limit` rows. // We deliberately bypass APIMetrics.getAllMetrics here: that path is wrapped in // a Redis cache (stable TTL for past-dated queries), and a "which rows should I diff --git a/obp-api/src/test/scala/code/api/v7_0_0/Http4s700RoutesTest.scala b/obp-api/src/test/scala/code/api/v7_0_0/Http4s700RoutesTest.scala index 8756b26b99..bafb710739 100644 --- a/obp-api/src/test/scala/code/api/v7_0_0/Http4s700RoutesTest.scala +++ b/obp-api/src/test/scala/code/api/v7_0_0/Http4s700RoutesTest.scala @@ -6,7 +6,9 @@ import code.api.util.http4s.Http4sStandardHeaders import code.api.Constant.SYSTEM_OWNER_VIEW_ID import code.api.ResponseHeader import code.api.util.APIUtil -import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateOrganisation, canCreateRoutingScheme, canDeleteEntitlementAtAnyBank, canDeleteOrganisation, canDeleteRoutingScheme, canUpdateSystemView, canGetAccountAccessTrace, canGetAnyOrganisation, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetCardsForBank, canGetConnectorHealth, canCreateMetricsArchiveRun, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMetricsDiagnostics, canGetMigrations, canReadResourceDoc, canUpdateBankSupportedRoutingScheme, canUpdateOrganisation, canUpdateRoutingScheme} +import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateOrganisation, canCreateRoutingScheme, canDeleteEntitlementAtAnyBank, canDeleteOrganisation, canDeleteRoutingScheme, canDeleteSchedulerJobLock, canUpdateSystemView, canGetAccountAccessTrace, canGetAnyOrganisation, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetCardsForBank, canGetConnectorHealth, canCreateMetricsArchiveRun, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMetricsDiagnostics, canGetMigrations, canGetSchedulerJobLocks, canReadResourceDoc, canUpdateBankSupportedRoutingScheme, canUpdateOrganisation, canUpdateRoutingScheme} +import code.scheduler.JobScheduler +import net.liftweb.mapper.By import code.api.util.ErrorMessages.{AuthenticatedUserIsRequired, BankNotFound, EntitlementAlreadyExists, InvalidOrganisationIdFormat, InvalidRoutingSchemeName, MobileWalletDestinationNotFound, MobileWalletInvalidMsisdn, OrganisationAlreadyExists, OrganisationNotFound, PayeeLookupAddressMismatch, PayeeLookupIdentifierTypeNotRegistered, PayeeNotFound, RoutingSchemeAlreadyExists, RoutingSchemeExampleAddressMismatch, RoutingSchemeNotFound, SystemViewNotFound, UserHasMissingRoles, UserNotFoundByUserId} import code.api.Constant.SYSTEM_AUDITOR_VIEW_ID import code.views.MapperViews @@ -26,7 +28,7 @@ import org.typelevel.ci.CIString import java.util.Date import code.setup.ServerSetupWithTestData import net.liftweb.json.JValue -import net.liftweb.json.JsonAST.{JArray, JBool, JField, JObject, JString} +import net.liftweb.json.JsonAST.{JArray, JBool, JField, JInt, JObject, JString} import net.liftweb.json.JsonParser.parse import org.scalatest.Tag @@ -342,6 +344,182 @@ class Http4s700RoutesTest extends ServerSetupWithTestData { } } + // ─── scheduler job-locks ────────────────────────────────────────────────────── + + /** Remove every jobscheduler lock row so a scenario starts from a clean table. */ + private def clearJobLocks(): Unit = + JobScheduler.findAll().foreach(JobScheduler.delete_!) + + /** Seed one jobscheduler lock row and return its job id. */ + private def seedJobLock(name: String = "MetricsArchiveScheduler", apiInstanceId: String = "test-node"): String = { + val jobId = APIUtil.generateUUID() + JobScheduler.create.JobId(jobId).Name(name).ApiInstanceId(apiInstanceId).saveMe() + jobId + } + + feature("Http4s700 getSchedulerJobLocks endpoint") { + + scenario("Reject unauthenticated GET to /management/system/scheduler/job-locks", Http4s700RoutesTag) { + Given("GET /obp/v7.0.0/management/system/scheduler/job-locks with no auth") + val (statusCode, json, _) = makeHttpRequest("/obp/v7.0.0/management/system/scheduler/job-locks") + + Then("Response is 401") + statusCode shouldBe 401 + json match { + case JObject(fields) => + toFieldMap(fields).get("message") match { + case Some(JString(msg)) => msg should include(AuthenticatedUserIsRequired) + case _ => fail("Expected message field") + } + case _ => fail("Expected JSON object") + } + } + + scenario("Return 403 when authenticated but missing canGetSchedulerJobLocks role", Http4s700RoutesTag) { + Given("DirectLogin without the required role") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (statusCode, json, _) = makeHttpRequest("/obp/v7.0.0/management/system/scheduler/job-locks", headers) + + Then("Response is 403 with UserHasMissingRoles message naming the role") + statusCode shouldBe 403 + json match { + case JObject(fields) => + toFieldMap(fields).get("message") match { + case Some(JString(msg)) => + msg should include(UserHasMissingRoles) + msg should include(canGetSchedulerJobLocks.toString) + case _ => fail("Expected message field") + } + case _ => fail("Expected JSON object") + } + } + + scenario("Return 200 with an empty list when no locks are held", Http4s700RoutesTag) { + Given("canGetSchedulerJobLocks granted and the lock table cleared") + addEntitlement("", resourceUser1.userId, canGetSchedulerJobLocks.toString) + clearJobLocks() + + When("GET /obp/v7.0.0/management/system/scheduler/job-locks with DirectLogin header") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (statusCode, json, _) = makeHttpRequest("/obp/v7.0.0/management/system/scheduler/job-locks", headers) + + Then("Response is 200 with jobs=[] and count=0") + statusCode shouldBe 200 + json match { + case JObject(fields) => + val map = toFieldMap(fields) + map.get("count") shouldBe Some(JInt(0)) + map.get("jobs") match { + case Some(JArray(items)) => items shouldBe empty + case _ => fail("Expected jobs array") + } + case _ => fail("Expected JSON object") + } + } + + scenario("Return 200 listing a held lock with its fields", Http4s700RoutesTag) { + Given("canGetSchedulerJobLocks granted, the table cleared, and one seeded lock") + addEntitlement("", resourceUser1.userId, canGetSchedulerJobLocks.toString) + clearJobLocks() + val seededJobId = seedJobLock(name = "MetricsArchiveScheduler", apiInstanceId = "test-node") + + When("GET /obp/v7.0.0/management/system/scheduler/job-locks with DirectLogin header") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (statusCode, json, _) = makeHttpRequest("/obp/v7.0.0/management/system/scheduler/job-locks", headers) + + Then("Response is 200 with count=1 and the seeded lock fully described") + statusCode shouldBe 200 + json match { + case JObject(fields) => + val map = toFieldMap(fields) + map.get("count") shouldBe Some(JInt(1)) + map.get("jobs") match { + case Some(JArray(List(JObject(jobFields)))) => + val jobMap = toFieldMap(jobFields) + jobMap.keys should contain allOf ("job_id", "name", "api_instance_id", "started_at", "age_seconds") + jobMap.get("job_id") shouldBe Some(JString(seededJobId)) + jobMap.get("name") shouldBe Some(JString("MetricsArchiveScheduler")) + jobMap.get("api_instance_id") shouldBe Some(JString("test-node")) + jobMap.get("age_seconds") match { + case Some(JInt(age)) => age.toLong should be >= 0L + case _ => fail("Expected numeric age_seconds") + } + case _ => fail("Expected a one-element jobs array of objects") + } + case _ => fail("Expected JSON object") + } + } + } + + feature("Http4s700 deleteSchedulerJobLock endpoint") { + + scenario("Reject unauthenticated DELETE to /management/system/scheduler/job-locks/JOB_ID", Http4s700RoutesTag) { + Given("DELETE /obp/v7.0.0/management/system/scheduler/job-locks/some-id with no auth") + val (statusCode, json, _) = makeHttpRequestWithMethod( + "DELETE", "/obp/v7.0.0/management/system/scheduler/job-locks/some-id") + + Then("Response is 401") + statusCode shouldBe 401 + json match { + case JObject(fields) => + toFieldMap(fields).get("message") match { + case Some(JString(msg)) => msg should include(AuthenticatedUserIsRequired) + case _ => fail("Expected message field") + } + case _ => fail("Expected JSON object") + } + } + + scenario("Return 403 when authenticated but missing canDeleteSchedulerJobLock role", Http4s700RoutesTag) { + Given("DELETE without the required role") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (statusCode, json, _) = makeHttpRequestWithMethod( + "DELETE", "/obp/v7.0.0/management/system/scheduler/job-locks/some-id", headers) + + Then("Response is 403 with UserHasMissingRoles message naming the role") + statusCode shouldBe 403 + json match { + case JObject(fields) => + toFieldMap(fields).get("message") match { + case Some(JString(msg)) => + msg should include(UserHasMissingRoles) + msg should include(canDeleteSchedulerJobLock.toString) + case _ => fail("Expected message field") + } + case _ => fail("Expected JSON object") + } + } + + scenario("Return 204 and clear the lock when authenticated with role and the lock exists", Http4s700RoutesTag) { + Given("canDeleteSchedulerJobLock granted and one seeded lock") + addEntitlement("", resourceUser1.userId, canDeleteSchedulerJobLock.toString) + val seededJobId = seedJobLock() + JobScheduler.find(By(JobScheduler.JobId, seededJobId)).isDefined shouldBe true + + When("DELETE /obp/v7.0.0/management/system/scheduler/job-locks/{jobId} with DirectLogin header") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (statusCode, _, _) = makeHttpRequestWithMethod( + "DELETE", s"/obp/v7.0.0/management/system/scheduler/job-locks/$seededJobId", headers) + + Then("Response is 204 and the lock row is gone") + statusCode shouldBe 204 + JobScheduler.find(By(JobScheduler.JobId, seededJobId)).isDefined shouldBe false + } + + scenario("Return 204 even when the job id does not exist (idempotent)", Http4s700RoutesTag) { + Given("canDeleteSchedulerJobLock role granted and a non-existent job id") + addEntitlement("", resourceUser1.userId, canDeleteSchedulerJobLock.toString) + + When("DELETE /obp/v7.0.0/management/system/scheduler/job-locks/non-existent with DirectLogin header") + val headers = Map("DirectLogin" -> s"token=${token1.value}") + val (statusCode, _, _) = makeHttpRequestWithMethod( + "DELETE", "/obp/v7.0.0/management/system/scheduler/job-locks/non-existent-job-id-xyz", headers) + + Then("Response is 204 — delete is idempotent") + statusCode shouldBe 204 + } + } + // ─── addEntitlement ─────────────────────────────────────────────────────────── feature("Http4s700 addEntitlement endpoint") { diff --git a/obp-api/src/test/scala/code/scheduler/MetricsArchiveSchedulerTest.scala b/obp-api/src/test/scala/code/scheduler/MetricsArchiveSchedulerTest.scala index cc53b9bcd2..15402e64d1 100644 --- a/obp-api/src/test/scala/code/scheduler/MetricsArchiveSchedulerTest.scala +++ b/obp-api/src/test/scala/code/scheduler/MetricsArchiveSchedulerTest.scala @@ -149,12 +149,16 @@ class MetricsArchiveSchedulerTest extends ServerSetup { scenario("runOnce is skipped (no work, no log row) when a job lock is already present") { seedMetric(daysAgo(800), validUuid()) // Simulate an in-progress run on this or another node. - JobScheduler.create.JobId(validUuid()).Name(jobName).ApiInstanceId("other-node").saveMe() + val lockJobId = validUuid() + JobScheduler.create.JobId(lockJobId).Name(jobName).ApiInstanceId("other-node").saveMe() val outcome = MetricsArchiveScheduler.runOnce() Then("the run is skipped and nothing changed") - outcome should equal(RunSkippedAlreadyInProgress) + outcome shouldBe a[RunSkippedAlreadyInProgress] + val skipped = outcome.asInstanceOf[RunSkippedAlreadyInProgress] + skipped.jobId should equal(lockJobId) + skipped.apiInstanceId should equal("other-node") MetricsArchiveRun.count should equal(0L) MappedMetric.count should equal(1L) } From 7549094c5d3daa0bcd26c5813e86eef68a036d4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Mili=C4=87?= Date: Tue, 9 Jun 2026 11:05:34 +0200 Subject: [PATCH 3/3] fix(scheduler): boot cleanup must match ApiInstanceId, not Name MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MetricsArchiveScheduler's startup cleanup used By(JobScheduler.Name, apiInstanceId), but lock rows are created with Name=jobName and ApiInstanceId=apiInstanceId — so it never matched and a redeploy could not clear an orphaned lock. Only the "older than 5 days" sweep would, leaving archiving stalled for up to 5 days (observed on a sandbox: a crash's lock survived the next-day deploy and blocked every tick for ~2 weeks). runOnce's try/finally already prevents an *exception* from orphaning the lock; this closes the residual gap for terminations that bypass finally (kill -9, OOM, container eviction, power loss). Keyed on this instance's own ApiInstanceId, so another node's genuinely-running job is left untouched. --- .../code/scheduler/MetricsArchiveScheduler.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala b/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala index e647699f33..df1b02d14a 100644 --- a/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala +++ b/obp-api/src/main/scala/code/scheduler/MetricsArchiveScheduler.scala @@ -53,8 +53,15 @@ object MetricsArchiveScheduler extends MdcLoggable { logger.info(s"--------- Clean up Jobs ---------") logger.info(s"Delete all Jobs created by api_instance_id=$apiInstanceId") - JobScheduler.findAll(By(JobScheduler.Name, apiInstanceId)).map { i => - println(s"Job name: ${i.name}, Date: ${i.createdAt}") + // On boot this instance cannot have a genuinely-running job, so clear any of its + // own leftover lock rows (e.g. orphaned by a kill -9 / OOM / container eviction + // that bypassed the finally in runOnce). Match on ApiInstanceId, NOT Name — lock + // rows store Name=jobName and ApiInstanceId=apiInstanceId, so the old + // `By(Name, apiInstanceId)` never matched and a redeploy could not self-heal + // (only the 5-day sweep below would, leaving archiving stalled up to 5 days). + // Keyed on this instance's own id, so another node's running job is untouched. + JobScheduler.findAll(By(JobScheduler.ApiInstanceId, apiInstanceId)).map { i => + logger.info(s"Deleting leftover Job name: ${i.name}, Date: ${i.createdAt}, api_instance_id: $apiInstanceId") i }.map(_.delete_!) logger.info(s"Delete all Jobs older than 5 days")