Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions obp-api/src/main/resources/props/sample.props.template
Original file line number Diff line number Diff line change
Expand Up @@ -1607,10 +1607,17 @@ enable_metrics_scheduler = true
retain_archive_metrics_days = 1095
# Defines the number of days we keep rows in the table "Metric" former "MappedMetric"
retain_metrics_days = 367
# Defines the number of rows we can process at once
retain_metrics_move_limit = 50000
# Defines the interval of the scheduler
retain_metrics_scheduler_interval_in_seconds = 3600
# Defines the number of rows we can process at once.
# Tuned for smaller, more frequent runs: 10000 rows every 599s (below) = ~60k/hr
# ≈ 16.7 rows/sec max drain rate (keeps up with a sustained ~16 metric-generating
# calls/sec). Keep (move_limit x runs-per-hour) >= your peak metric ingest rate, or
# the archive backlog will grow. Smaller batches mean shorter runs and fewer stale
# scheduler locks from a JVM dying mid-run.
retain_metrics_move_limit = 10000
# Defines the interval of the scheduler (seconds). 599 (a prime, ~10 min) rather
# than 600 so successive runs drift across the wall clock instead of phase-locking
# to every 10th minute and to other periodic schedulers.
retain_metrics_scheduler_interval_in_seconds = 599


# Defines endpoints we want to store responses at Metric table
Expand Down
6 changes: 5 additions & 1 deletion obp-api/src/main/scala/bootstrap/liftweb/Boot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,12 @@ class Boot extends MdcLoggable {

APIUtil.getPropsAsBoolValue("enable_metrics_scheduler", true) match {
case true =>
// Default 599s (a prime, ~10 min) rather than a round 600: the odd interval
// makes successive runs drift across the wall clock instead of phase-locking
// to the top of every 10th minute (and to other periodic schedulers), so
// archive load is spread out rather than coinciding with other spikes.
val interval =
APIUtil.getPropsAsIntValue("retain_metrics_scheduler_interval_in_seconds", 3600)
APIUtil.getPropsAsIntValue("retain_metrics_scheduler_interval_in_seconds", 599)
MetricsArchiveScheduler.start(intervalInSeconds = interval)
case false => // Do not start it
}
Expand Down
6 changes: 6 additions & 0 deletions obp-api/src/main/scala/code/api/util/ApiRole.scala
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,12 @@ object ApiRole extends MdcLoggable{
case class CanCreateMetricsArchiveRun(requiresBankId: Boolean = false) extends ApiRole
lazy val canCreateMetricsArchiveRun = CanCreateMetricsArchiveRun()

case class CanGetSchedulerJobLocks(requiresBankId: Boolean = false) extends ApiRole
lazy val canGetSchedulerJobLocks = CanGetSchedulerJobLocks()

case class CanDeleteSchedulerJobLock(requiresBankId: Boolean = false) extends ApiRole
lazy val canDeleteSchedulerJobLock = CanDeleteSchedulerJobLock()

case class CanGetConnectorHealth(requiresBankId: Boolean = false) extends ApiRole
lazy val canGetConnectorHealth = CanGetConnectorHealth()

Expand Down
53 changes: 53 additions & 0 deletions obp-api/src/main/scala/code/api/util/Glossary.scala
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,59 @@ object Glossary extends MdcLoggable {
|"""
)

glossaryItems += GlossaryItem(
title = "Adapter",
description =
s"""
|## Adapter
|
|In OBP, an Adapter is an out of process component that sits between OBP and a bank's systems of record (Core Banking System, Payment System, or database) and translates between them.
|
|An Adapter is paired with an Indirect [Connector](/glossary#Connector): the Connector inside OBP turns OBP function calls into messages and sends them over a transport (for example RabbitMQ, Akka, or a stored procedure call); the Adapter receives those messages, talks to the CBS, and returns a response in the agreed Outbound / Inbound message format.
|
|i.e. OBP API -> Connector -> Adapter -> CBS
|
|Key properties:
|
|* It runs outside OBP, in its own process, typically on the bank's side.
|* It can be written in any language, as long as it respects the message format published in the Message Docs for the relevant Connector. This is the main advantage over a Direct Connector, which must be written in a JVM language.
|* It usually contains bank specific integration code: the data access, field mappings, identifier translation, and quirks of that one bank's CBS. As a result, each bank typically has its own Adapter build.
|* The Adapter is responsible for emitting OBP shaped values where required (for example a UUID shaped ACCOUNT_ID mapped to the underlying core banking account number).
|
|For worked examples of writing an Adapter, see [Adapter.Akka.Intro](/glossary#Adapter.Akka.Intro) and [Adapter.Stored_Procedure.Intro](/glossary#Adapter.Stored_Procedure.Intro).
|""".stripMargin
)

glossaryItems += GlossaryItem(
title = "OBP Bank Node",
description =
s"""
|## OBP Bank Node
|
|An OBP Bank Node is a standardised software component designed to run at many banks inside their own network that connect to their Core Banking System (CBS) to an OBP API instance operated by a platform operator (for example TESOBE), without the bank having to run any OBP infrastructure itself.
|
|It is deployed as a single self contained service (typically a Docker container). All of its network connections are outbound from the bank's network (or controlled cloud) and no inbound ports are exposed to the public internet. To the bank's CBS it presents one small local interface (for example few REST endpoints); everything else (talking to the OBP API, and any systems it integrates) happens behind that interface.
|
|### How it relates to a Connector and an Adapter
|
|The OBP Bank Node is neither an OBP [Connector](/glossary#Connector) nor a traditional South Side Adapter, although it sits in similar territory. Two properties make the difference:
|
|* Direction of control. A South Side Adapter is called by an OBP Connector over a message bus: OBP is the caller and the Adapter responds to request messages with CBS data. The OBP Bank Node does the opposite on its north side: it acts as a client of the OBP API, calling OBP's REST interface itself. It both initiates calls to OBP and exposes a local interface to the bank's CBS, rather than only responding.
|
|* Code versus configuration. A South Side Adapter usually carries a significant amount of bank specific code: the translation logic for one bank's CBS (its data access, field mappings, and integration quirks) is written into the Adapter, so each bank effectively gets its own Adapter build. The OBP Bank Node carries no bank specific code; its per bank behaviour is entirely configuration. Any bank specific code in an integration lives on the bank's own side of the local interface (for example the CBS code that receives the Node's notifications), never inside the Node.
|
|In short: an OBP Connector is JVM code inside OBP that talks to systems of record; an Adapter is an out of process component that an Indirect Connector calls and that contains bank specific integration code; the OBP Bank Node is a bank side gateway that is configured rather than coded per bank and that acts as a client of the OBP API.
|
|### Open or closed source
|
|Because it integrates through the OBP API's published interfaces, vendors can build and run their own implementations.
|
|### Use cases
| The Node approach is suitable when implementing an OBP platform business that involves many banks in a common use case - or where the Platform utilises other interfaces e.g. to blockchains.
|
|""".stripMargin
)

glossaryItems += GlossaryItem(
title = "Connector.User.Authentication",
description =
Expand Down
93 changes: 92 additions & 1 deletion obp-api/src/main/scala/code/api/v7_0_0/Http4s700.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import code.api.Constant._
import code.api.ResourceDocs1_4_0.SwaggerDefinitionsJSON._
import code.api.util.APIUtil.{EmptyBody, _}
import code.api.util.{APIUtil, ApiRole, CallContext, CustomJsonFormats, Glossary, NewStyle}
import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateEntitlementAtOneBank, canCreateMetricsArchiveRun, canCreateOrganisation, canCreateRoutingScheme, canCreateTestEmail, canDeleteEntitlementAtAnyBank, canDeleteOrganisation, canDeleteRoutingScheme, canGetAccountAccessTrace, canGetAnyOrganisation, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetConnectorHealth, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMetricsDiagnostics, canGetMigrations, canUpdateBankSupportedRoutingScheme, canUpdateOrganisation, canUpdateRoutingScheme, canUpdateSystemView}
import code.api.util.ApiRole.{canCreateEntitlementAtAnyBank, canCreateEntitlementAtOneBank, canCreateMetricsArchiveRun, canCreateOrganisation, canCreateRoutingScheme, canCreateTestEmail, canDeleteEntitlementAtAnyBank, canDeleteOrganisation, canDeleteRoutingScheme, canDeleteSchedulerJobLock, canGetAccountAccessTrace, canGetAnyOrganisation, canGetAnyUser, canGetCacheConfig, canGetCacheInfo, canGetCacheNamespaces, canGetConnectorHealth, canGetCustomersAtOneBank, canGetDatabasePoolInfo, canGetMetricsDiagnostics, canGetMigrations, canGetSchedulerJobLocks, canUpdateBankSupportedRoutingScheme, canUpdateOrganisation, canUpdateRoutingScheme, canUpdateSystemView}
import code.api.util.CommonsEmailWrapper
import code.model.dataAccess.AuthUser
import code.api.util.ApiTag._
Expand Down Expand Up @@ -3353,6 +3353,10 @@ object Http4s700 {
|* `message` — human-readable summary.
|* `run` — the recorded run (run id, counts, duration, success, remark);
| absent when skipped.
|* `in_progress` — present only when skipped: the lock that blocked the run
| (`job_id`, `api_instance_id`, `started_at`, `age_seconds`). A large
| `age_seconds` (much older than a normal run) indicates a stale lock left
| by a dead JVM — clear the matching `jobscheduler` row to unblock.
|
|Note: the run executes synchronously, so a large backlog may take a while.
|
Expand All @@ -3365,6 +3369,93 @@ object Http4s700 {
http4sPartialFunction = Some(triggerMetricsArchiveRun)
)

// Route: GET /obp/v7.0.0/management/system/scheduler/job-locks
//
// List the `jobscheduler` lock rows (newest first, capped at 100). This table
// holds a row only while a scheduled job holds its lock — the row is deleted
// when the job finishes — so in healthy operation this is empty. Any row here
// is a currently-running job or a stale lock left by a dead JVM; `age_seconds`
// tells them apart, and the row can be cleared with the DELETE route below.
val getSchedulerJobs: HttpRoutes[IO] = HttpRoutes.of[IO] {
case req @ GET -> `prefixPath` / "management" / "system" / "scheduler" / "job-locks" =>
EndpointHelpers.withUser(req) { (_, _) =>
Future {
JSONFactory700.createSchedulerJobsJsonV700(code.scheduler.JobScheduler.mostRecent(100))
}
}
}

resourceDocs += ResourceDoc(
implementedInApiVersion,
nameOf(getSchedulerJobs),
"GET",
"/management/system/scheduler/job-locks",
"Get Scheduler Job Locks",
s"""List the scheduler lock rows from the `jobscheduler` table (most recent first, up to 100).
|
|**This is a lock table, not a job-history log.** A row exists only while a
|scheduled job (e.g. `MetricsArchiveScheduler`) holds its lock; it is deleted
|when the job finishes. So in healthy operation this list is **empty**.
|
|A row that is present is therefore one of:
|* a job genuinely running right now (small `age_seconds`), or
|* a **stale lock** left by a JVM that died mid-run (large `age_seconds`) —
| this blocks new runs of that job (e.g. "Trigger a Metrics Archive Run"
| returns `skipped_already_in_progress`). Clear it with
| `DELETE /management/system/scheduler/job-locks/JOB_ID`.
|
|Each row reports `job_id`, `name`, `api_instance_id`, `started_at` and
|`age_seconds` (seconds since the lock was taken).
|
|${userAuthenticationMessage(true)}""".stripMargin,
EmptyBody,
JSONFactory700.schedulerJobsJsonV700Example,
List($AuthenticatedUserIsRequired, UserHasMissingRoles, UnknownError),
apiTagSystem :: apiTagApi :: Nil,
Some(List(canGetSchedulerJobLocks)),
http4sPartialFunction = Some(getSchedulerJobs)
)

// Route: DELETE /obp/v7.0.0/management/system/scheduler/job-locks/JOB_ID
//
// Clear a scheduler lock row by its job id. Use this to release a stale lock
// left by a dead JVM so the job (e.g. metrics archiving) can run again.
// Idempotent — returns 204 even if the row is already gone.
val deleteSchedulerJob: HttpRoutes[IO] = HttpRoutes.of[IO] {
case req @ DELETE -> `prefixPath` / "management" / "system" / "scheduler" / "job-locks" / jobId =>
EndpointHelpers.withUserDelete(req) { (_, _) =>
Future { code.scheduler.JobScheduler.deleteByJobId(jobId); () }
}
}

resourceDocs += ResourceDoc(
implementedInApiVersion,
nameOf(deleteSchedulerJob),
"DELETE",
"/management/system/scheduler/job-locks/JOB_ID",
"Delete Scheduler Job Lock",
s"""Clear a scheduler lock row from the `jobscheduler` table by its `JOB_ID`.
|
|Use this to release a **stale lock** left by a JVM that died mid-run, which
|would otherwise keep a scheduled job (e.g. `MetricsArchiveScheduler`) from
|starting — see "Get Scheduler Job Locks" to find the `job_id` and judge staleness
|from its `age_seconds`.
|
|**Caution:** if the job is genuinely still running on some node, deleting its
|lock lets a second run start concurrently. Only clear locks you have confirmed
|are stale (much older than a normal run).
|
|Idempotent — returns 204 even if no row with that `JOB_ID` exists.
|
|${userAuthenticationMessage(true)}""".stripMargin,
EmptyBody,
EmptyBody,
List($AuthenticatedUserIsRequired, UserHasMissingRoles, UnknownError),
apiTagSystem :: apiTagApi :: Nil,
Some(List(canDeleteSchedulerJobLock)),
http4sPartialFunction = Some(deleteSchedulerJob)
)

// Enabled only in Lift test mode (Props.testMode == true, i.e. -Drun.mode=test).
// Props.testMode is set from the JVM system property before any props file loads,
// so it is reliably available at object-initialization time unlike file-based props.
Expand Down
70 changes: 65 additions & 5 deletions obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1138,13 +1138,26 @@
remark = r.Remark.get
)

// The in-progress archive job whose lock blocked a new run. Surfaced so an
// operator can tell a genuinely-running job from a stale lock left by a dead
// JVM: an `age_seconds` of seconds is a real run; minutes/hours/days is almost
// certainly abandoned and the `jobscheduler` lock row can be cleared by hand.
case class InProgressArchiveJobJsonV700(
job_id: String,

Check warning on line 1146 in obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this parameter to match the regular expression "^[_a-zA-Z][a-zA-Z0-9]*$".

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ6ykfObaWMTRv-FViv2&open=AZ6ykfObaWMTRv-FViv2&pullRequest=2835
api_instance_id: String,

Check warning on line 1147 in obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this parameter to match the regular expression "^[_a-zA-Z][a-zA-Z0-9]*$".

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ6ykfObaWMTRv-FViv3&open=AZ6ykfObaWMTRv-FViv3&pullRequest=2835
started_at: Date,

Check warning on line 1148 in obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this parameter to match the regular expression "^[_a-zA-Z][a-zA-Z0-9]*$".

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ6ykfObaWMTRv-FViv4&open=AZ6ykfObaWMTRv-FViv4&pullRequest=2835
age_seconds: Long

Check warning on line 1149 in obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this parameter to match the regular expression "^[_a-zA-Z][a-zA-Z0-9]*$".

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ6ykfObaWMTRv-FViv5&open=AZ6ykfObaWMTRv-FViv5&pullRequest=2835
)

// Result of manually triggering an archive run. `status` is one of
// "completed" (a run executed — inspect `run.success`) or
// "skipped_already_in_progress" (a run was already running, so none was started).
// "skipped_already_in_progress" (a run was already running, so none was started;
// `in_progress` then describes the lock that blocked it).
case class TriggerMetricsArchiveRunResponseJsonV700(
status: String,
message: String,
run: Option[MetricsArchiveRunJsonV700]
run: Option[MetricsArchiveRunJsonV700],
in_progress: Option[InProgressArchiveJobJsonV700] = None

Check warning on line 1160 in obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this parameter to match the regular expression "^[_a-zA-Z][a-zA-Z0-9]*$".

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ6ykfObaWMTRv-FViv6&open=AZ6ykfObaWMTRv-FViv6&pullRequest=2835
)

def createTriggerMetricsArchiveRunResponseJsonV700(outcome: code.scheduler.RunOutcome): TriggerMetricsArchiveRunResponseJsonV700 =
Expand All @@ -1156,11 +1169,15 @@
else
s"Archive run completed with errors: ${r.Remark.get}"
TriggerMetricsArchiveRunResponseJsonV700("completed", msg, Some(metricsArchiveRunToJson(r)))
case code.scheduler.RunSkippedAlreadyInProgress =>
case code.scheduler.RunSkippedAlreadyInProgress(jobId, apiInstanceId, startedAt) =>
val ageSeconds = (System.currentTimeMillis - startedAt.getTime) / 1000L
TriggerMetricsArchiveRunResponseJsonV700(
"skipped_already_in_progress",
"An archive run is already in progress; no new run was started.",
None)
s"An archive run started at $startedAt on api_instance_id '$apiInstanceId' is already in progress " +
s"(job $jobId, running for $ageSeconds seconds); no new run was started. " +
s"If this is much older than a normal run, the lock is likely stale and can be cleared.",
None,
Some(InProgressArchiveJobJsonV700(jobId, apiInstanceId, startedAt, ageSeconds)))
}

lazy val triggerMetricsArchiveRunResponseJsonV700Example = TriggerMetricsArchiveRunResponseJsonV700(
Expand All @@ -1179,6 +1196,49 @@
))
)

// One row of the `jobscheduler` lock table. This table holds a row only while a
// job holds the scheduler lock (deleted when the job finishes), so a row here is
// a currently-running job or a stale lock left by a dead JVM — `age_seconds`
// tells them apart.
case class SchedulerJobJsonV700(
job_id: String,

Check warning on line 1204 in obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this parameter to match the regular expression "^[_a-zA-Z][a-zA-Z0-9]*$".

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ6ykfObaWMTRv-FViv7&open=AZ6ykfObaWMTRv-FViv7&pullRequest=2835
name: String,
api_instance_id: String,

Check warning on line 1206 in obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this parameter to match the regular expression "^[_a-zA-Z][a-zA-Z0-9]*$".

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ6ykfObaWMTRv-FViv8&open=AZ6ykfObaWMTRv-FViv8&pullRequest=2835
started_at: Date,

Check warning on line 1207 in obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this parameter to match the regular expression "^[_a-zA-Z][a-zA-Z0-9]*$".

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ6ykfObaWMTRv-FViv9&open=AZ6ykfObaWMTRv-FViv9&pullRequest=2835
age_seconds: Long

Check warning on line 1208 in obp-api/src/main/scala/code/api/v7_0_0/JSONFactory7.0.0.scala

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this parameter to match the regular expression "^[_a-zA-Z][a-zA-Z0-9]*$".

See more on https://sonarcloud.io/project/issues?id=OpenBankProject_OBP-API&issues=AZ6ykfObaWMTRv-FViv-&open=AZ6ykfObaWMTRv-FViv-&pullRequest=2835
)

case class SchedulerJobsJsonV700(
jobs: List[SchedulerJobJsonV700],
count: Int
)

def createSchedulerJobsJsonV700(rows: List[code.scheduler.JobScheduler]): SchedulerJobsJsonV700 = {
val now = System.currentTimeMillis
val jobs = rows.map { r =>
val startedAt = r.createdAt.get
SchedulerJobJsonV700(
job_id = r.JobId.get,
name = r.Name.get,
api_instance_id = r.ApiInstanceId.get,
started_at = startedAt,
age_seconds = (now - startedAt.getTime) / 1000L
)
}
SchedulerJobsJsonV700(jobs, jobs.size)
}

lazy val schedulerJobsJsonV700Example = SchedulerJobsJsonV700(
jobs = List(SchedulerJobJsonV700(
job_id = "9f3c2b1a-7d4e-4c8a-9b2f-1e6d5a0c4b7e",
name = "MetricsArchiveScheduler",
api_instance_id = "obp",
started_at = new Date(1717200000000L),
age_seconds = 42L
)),
count = 1
)

private val metricsOneDayInMillis: Long = 86400000L
private def metricsAgeInDays(d: Date, now: Date): Long =
(now.getTime - d.getTime) / metricsOneDayInMillis
Expand Down
18 changes: 18 additions & 0 deletions obp-api/src/main/scala/code/scheduler/JobScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@ class JobScheduler extends JobSchedulerTrait with LongKeyedMapper[JobScheduler]

object JobScheduler extends JobScheduler with LongKeyedMetaMapper[JobScheduler] {
override def dbIndexes: List[BaseIndex[JobScheduler]] = UniqueIndex(JobId) :: super.dbIndexes

/**
* The most recent scheduler-lock rows, newest first, capped at `limit`.
*
* Note: `jobscheduler` is a lock table, not a job-history log — a row exists
* only while a job holds the lock and is deleted when the job finishes. In
* healthy operation this returns an empty list; any rows present are either
* currently running or stale locks left by a dead JVM.
*/
def mostRecent(limit: Int): List[JobScheduler] =
findAll(OrderBy(JobScheduler.createdAt, Descending), MaxRows(limit))

/** Delete the lock row with the given JobId; returns true if a row was removed. */
def deleteByJobId(jobId: String): Boolean =
find(By(JobScheduler.JobId, jobId)) match {
case net.liftweb.common.Full(job) => delete_!(job)
case _ => false
}
}


Expand Down
Loading
Loading