From b16f35527c46e11159eba0c2a6c1e6cd324d2994 Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Fri, 12 Jun 2026 15:35:12 -0400 Subject: [PATCH 1/8] fix: migrate_osfmetrics_fix_6to8 make sure all items get a recent monthly usage report with cumulative counts up to this point -- previous data migration incorrectly assumed monthly usage reports were either complete or completely missing - restore some previously-removed elastic6 config and dsl types in osf.metrics.es6_metrics - change django-elasticsearch-metrics dependency to a version with elastic6 support added back in - add migrate_osfmetrics_fix_6to8 management command - shows diagnostic counts from es6 and es8 (unless --no-counts) - starts a task for each item with any past usage to add a usage report for 2026-05 with accurate cumulative view/download counts - allow running via osf-admin web ui --- .docker-compose.env | 1 + admin/management/urls.py | 3 +- admin/management/views.py | 16 + admin/templates/management/commands.html | 16 + api/base/settings/defaults.py | 7 + api/metrics/views.py | 5 +- docker-compose.yml | 18 + .../commands/migrate_osfmetrics_fix_6to8.py | 495 ++++++++++++++++++ .../commands/monthly_reporters_go.py | 2 +- osf/metrics/__init__.py | 2 + osf/metrics/es6_metrics.py | 143 +++++ osf/metrics/reporters/public_item_usage.py | 52 +- osf/metrics/utils.py | 36 ++ poetry.lock | 46 +- pyproject.toml | 4 +- website/settings/defaults.py | 3 + 16 files changed, 805 insertions(+), 44 deletions(-) create mode 100644 osf/management/commands/migrate_osfmetrics_fix_6to8.py create mode 100644 osf/metrics/es6_metrics.py diff --git a/.docker-compose.env b/.docker-compose.env index a712f7ac23c..80eebc8707b 100644 --- a/.docker-compose.env +++ b/.docker-compose.env @@ -6,6 +6,7 @@ DOMAIN=http://localhost:5000/ INTERNAL_DOMAIN=http://192.168.168.167:5000/ API_DOMAIN=http://localhost:8000/ ELASTIC_URI=192.168.168.167:9200 +ELASTIC6_URI=192.168.168.167:9201 ELASTIC8_URI=http://192.168.168.167:9202 ELASTIC8_USERNAME=elastic OSF_DB_HOST=192.168.168.167 diff --git a/admin/management/urls.py b/admin/management/urls.py index 3d29a259483..851227efafa 100644 --- a/admin/management/urls.py +++ b/admin/management/urls.py @@ -1,4 +1,4 @@ -from django.urls import re_path +from django.urls import re_path, path from admin.management import views @@ -22,4 +22,5 @@ name='sync_notification_templates'), re_path(r'^remove_orcid_from_user_social', views.RemoveOrcidFromUserSocial.as_view(), name='remove_orcid_from_user_social'), + path('migrate_osfmetrics_fix_6to8', views.MigrateOsfmetricsFix6to8.as_view(), name='migrate_osfmetrics_fix_6to8'), ] diff --git a/admin/management/views.py b/admin/management/views.py index 4b6e6b4c080..038b29a56a8 100644 --- a/admin/management/views.py +++ b/admin/management/views.py @@ -1,3 +1,5 @@ +from io import StringIO + from dateutil.parser import isoparse from django.views.generic import TemplateView, View from django.contrib import messages @@ -203,3 +205,17 @@ def post(self, request): remove_orcid_from_user_social() messages.success(request, 'Orcid from user social have been successfully removed.') return redirect(reverse('management:commands')) + + +class MigrateOsfmetricsFix6to8(ManagementCommandPermissionView): + def post(self, request): + _command_kwargs = { + 'no_color': True, + 'no_counts': request.POST.get('no_counts'), + 'start': request.POST.get('start'), + } + _out_io = StringIO() + call_command('migrate_osfmetrics_fix_6to8', **_command_kwargs, stdout=_out_io) + for _line in _out_io.getvalue().split('\n'): + messages.info(request, _line) + return redirect(reverse('management:commands')) diff --git a/admin/templates/management/commands.html b/admin/templates/management/commands.html index fd9ceec9c1b..4293fe66194 100644 --- a/admin/templates/management/commands.html +++ b/admin/templates/management/commands.html @@ -190,6 +190,22 @@

Remove existing orcid info from user social

+
+

migrate osf-metrics (fix) 6to8

+

+ view progress of the osf-metrics migration (fixup) from elastic6 to elastic8 (or start it) +

+
+ {% csrf_token %} + + + +
+
{% endblock %} diff --git a/api/base/settings/defaults.py b/api/base/settings/defaults.py index 215a2552dfe..014862adbcf 100644 --- a/api/base/settings/defaults.py +++ b/api/base/settings/defaults.py @@ -321,6 +321,12 @@ # django-elasticsearch-metrics DJELME_BACKENDS = { + 'osfmetrics_es6': { + 'elasticsearch_metrics.imps.elastic6': { + 'hosts': osf_settings.ELASTIC6_URI, + 'retry_on_timeout': True, + }, + }, 'osfmetrics_es8': { 'elasticsearch_metrics.imps.elastic8': { # passthru kwargs to elasticsearch8 connection constructor @@ -338,6 +344,7 @@ }, } OSF_USAGEEVENT_EXPIRATION_DAYS = 90 +ELASTICSEARCH_METRICS_DATE_FORMAT = '%Y' WAFFLE_CACHE_NAME = 'waffle_cache' STORAGE_USAGE_CACHE_NAME = 'storage_usage' diff --git a/api/metrics/views.py b/api/metrics/views.py index 618f04d6d17..607e79cab3d 100644 --- a/api/metrics/views.py +++ b/api/metrics/views.py @@ -136,8 +136,9 @@ def _do_es_request(self, django_request, djelme_backend_name, method, path): {'Content-Type': _content_type, 'Accept': 'application/json'} if _content_type else None ) + _perform_fn = getattr(_client, 'perform_request', None) or _client.transport.perform_request try: - _response = _client.perform_request( + _response = _perform_fn( method, f'/{path}', params=django_request.GET.dict(), @@ -150,7 +151,7 @@ def _do_es_request(self, django_request, djelme_backend_name, method, path): content_type='text/plain; charset=utf-8', status=_api_error.status_code, ) - return JsonResponse(_response.body) + return JsonResponse(_response if isinstance(_response, dict) else _response.body) def _get_es_client(self, djelme_backend_name): try: diff --git a/docker-compose.yml b/docker-compose.yml index c62541d6596..02ac3a78ce6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,8 @@ volumes: external: false elasticsearch8_data_vol: external: false + elasticsearch6_data_vol: + external: false rabbitmq_vol: external: false preprints_dist_vol: @@ -83,6 +85,22 @@ services: retries: 30 stdin_open: true + # Temporary: Remove when done with es6 + elasticsearch6: + image: docker.elastic.co/elasticsearch/elasticsearch:6.3.1 + environment: + - ES_JAVA_OPTS=-Xms512m -Xmx512m # reduce memory usage + ports: + - 9201:9200 + volumes: + - elasticsearch6_data_vol:/usr/share/elasticsearch/data + healthcheck: + start_period: 15s + test: curl -s http://localhost:9200/_cluster/health | grep -vq '"status":"red"' + interval: 10s + retries: 30 + stdin_open: true + postgres: image: postgres:15.4 command: diff --git a/osf/management/commands/migrate_osfmetrics_fix_6to8.py b/osf/management/commands/migrate_osfmetrics_fix_6to8.py new file mode 100644 index 00000000000..6644f4e9200 --- /dev/null +++ b/osf/management/commands/migrate_osfmetrics_fix_6to8.py @@ -0,0 +1,495 @@ +import collections +import heapq +import itertools +import logging + +from django.core.management.base import BaseCommand +from django.db import OperationalError as DjangoOperationalError +from elasticsearch6.exceptions import ConnectionError as Elastic6ConnectionError +from elasticsearch6 import helpers as es6_helpers +from elasticsearch6_dsl.connections import connections as es6_connections +from elasticsearch8.exceptions import TransportError as Elastic8TransportError +from elasticsearch8.helpers import BulkIndexError as Elastic8BulkIndexError +from psycopg2 import OperationalError as PostgresOperationalError + +from framework.celery_tasks import app as celery_app +from framework.sentry import log_exception +from osf.metrics import es6_metrics +from osf.metrics.monthly_reports import MonthlyPublicItemUsageReport +from osf.metrics.utils import ( + YearMonth, + get_database_iri, + get_item_type, + iter_composite_bucket_keys, +) +from osf import models as osfdb +from osf.models.base import osfid_iri +from website import settings as website_settings + + +### +# constants + +_FIX_YEARMONTH = YearMonth(2026, 5) + +_MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control + +_COMPOSITE_CHUNK_SIZE = 500 + +_RETRY_ERRORS = ( + DjangoOperationalError, + Elastic6ConnectionError, + Elastic8TransportError, + PostgresOperationalError, +) +_TASK_KWARGS = dict( + autoretry_for=_RETRY_ERRORS, + retry_backoff=True, # exponential backoff, with jitter + max_retries=20, +) + + +### +# fix usage report migration + +@celery_app.task(**_TASK_KWARGS) +def schedule_fix_usage_reports(after_osfid: str | None = None): + _until_when = _FIX_YEARMONTH.month_end() + _last_osfid = None + try: + for _osfid in _merge_sorted_osfids( + _each_countedusage_osfid(_until_when, after_osfid), + _each_preprintview_osfid(_until_when, after_osfid), + _each_preprintdownload_osfid(_until_when, after_osfid), + ): + add_fixed_usage_report.delay(_osfid) + _last_osfid = _osfid + except _RETRY_ERRORS as _error: + if _last_osfid is None: + raise # didn't even get started + # schedule another task to continue scheduling + schedule_fix_usage_reports.delay( + after_osfid=( # avoid infinite loop from _merge_sorted_osfids removing "_v1" + f'{_last_osfid}_v1' + if '_' not in _last_osfid + else _last_osfid + ), + ) + # let this task succeed but log the error + log_exception(_error) + + +@celery_app.task(**_TASK_KWARGS) +def add_fixed_usage_report(osfid: str): + # from PublicItemUsageReport to MonthlyPublicItemUsageReportEs8 + _osfobj, _ = osfdb.Guid.load_referent(osfid) + if _osfobj: + _usage_report = _make_usage_report(_osfobj, _FIX_YEARMONTH) + _usage_report.save() + + +### +# various helper functions + +def _es6_connection(): + return es6_connections.get_connection('osfmetrics_es6') + + +def _es8_bulk_save(es8_recordtype, each_new_record): + try: + es8_recordtype.bulk(each_new_record, stats_only=True) + except Elastic8BulkIndexError as _bulk_error: + # so actual errors show in celery task result + raise Exception(_bulk_error.errors) from _bulk_error + + +def _es6_scan_range( + es6_recordtype, + *, + from_when: str = '', + until_when: str, + addl_filter=None, +): + _timestamp_range = {'lt': until_when} + if from_when: + _timestamp_range['gte'] = from_when + _filters = [ + {'range': {'timestamp': _timestamp_range}}, + ] + if addl_filter: + _filters.append(addl_filter) + _query_body = {'query': {'bool': {'filter': _filters}}} + return es6_helpers.scan( + _es6_connection(), + index=es6_recordtype._template_pattern, + query=_query_body, + ) + + +def _semverish_from_yearmonth(given_yearmonth): + _ym = YearMonth.from_any(given_yearmonth) + return f'{_ym.year}.{_ym.month}' + + +def _make_usage_report(osf_obj, yearmonth: YearMonth): + # add a "last month" report with cumulative counts up to that point + _is_preprint = isinstance(osf_obj, osfdb.Preprint) + # total counts + _c_views, _c_view_sess, _c_downloads, _c_download_sess = _get_cumulative_usage( + osfid=osf_obj._id, + until_when=yearmonth.month_end().isoformat(), + is_preprint=_is_preprint, + ) + # counts for last month only + _views, _view_sess, _downloads, _download_sess = _get_cumulative_usage( + osfid=osf_obj._id, + until_when=yearmonth.month_end().isoformat(), + from_when=yearmonth.month_start().isoformat(), + is_preprint=_is_preprint, + ) + return MonthlyPublicItemUsageReport( + cycle_coverage=_semverish_from_yearmonth(yearmonth), + item_iri=osfid_iri(osf_obj._id), + item_osfids=[osf_obj._id], + item_types=[get_item_type(osf_obj)], + provider_ids=[_get_provider_id(osf_obj)], + database_iris=[get_database_iri(osf_obj)], + platform_iris=[website_settings.DOMAIN], + view_count=_views, + view_session_count=_view_sess, + cumulative_view_count=_c_views, + cumulative_view_session_count=_c_view_sess, + download_count=_downloads, + download_session_count=_download_sess, + cumulative_download_count=_c_downloads, + cumulative_download_session_count=_c_download_sess, + ) + + +def _get_provider_id(osfid_referent): + _provider = getattr(osfid_referent, 'provider', None) + if _provider is None: + return 'osf' # quacks like Node, Comment, WikiPage + elif isinstance(_provider, str): + return _provider # quacks like BaseFileNode + else: + return _provider._id # quacks like Registration, Preprint, Collection + + +def _get_cumulative_usage(osfid: str, *, until_when, from_when='', is_preprint: bool): + if is_preprint: + _views = _cumulative_preprint_count( + es6_metrics.PreprintView, + osfid, + until_when=until_when, + from_when=from_when, + ) + _downloads = _cumulative_preprint_count( + es6_metrics.PreprintDownload, + osfid, + until_when=until_when, + from_when=from_when, + ) + _view_sess, _download_sess = _views, _downloads # no session info on preprints (yet) + else: + _views, _view_sess = _cumulative_countedusage_views( + osfid, + until_when=until_when, + from_when=from_when, + ) + _downloads, _download_sess = _cumulative_countedusage_downloads( + osfid, + until_when=until_when, + from_when=from_when, + ) + return (_views, _view_sess, _downloads, _download_sess) + + +def _cumulative_countedusage_views( + osfid: str, *, until_when: str, from_when: str = '' +) -> tuple[int, int]: + # copied/adapted from osf.metrics.reporters.public_item_usage + _search = ( + es6_metrics.CountedAuthUsage.search() + .filter('term', item_public=True) + .filter('range', timestamp={'lt': until_when}) + .filter('term', action_labels='view') + .filter( + 'bool', + should=[ + {'term': {'item_guid': osfid}}, + {'term': {'surrounding_guids': osfid}}, + ], + minimum_should_match=1, + ) + .extra(size=0) # only aggregations, no hits + ) + if from_when: + _search = _search.filter('range', timestamp={'gte': from_when}) + _search.aggs.metric( + 'agg_session_count', + 'cardinality', + field='session_id', + precision_threshold=_MAX_CARDINALITY_PRECISION, + ) + _response = _search.execute() + _view_count = _response.hits.total + _view_session_count = ( + _response.aggregations.agg_session_count.value + if 'agg_session_count' in _response.aggregations + else 0 + ) + return (_view_count, _view_session_count) + + +def _cumulative_countedusage_downloads(osfid, *, until_when, from_when) -> tuple[int, int]: + '''aggregate downloads on each osfid (not including components/files)''' + # copied/adapted from osf.metrics.reporters.public_item_usage + _search = ( + es6_metrics.CountedAuthUsage.search() + .filter('term', item_public=True) + .filter('range', timestamp={'lt': until_when}) + .filter('term', action_labels='download') + .filter('term', item_guid=osfid) + ) + if from_when: + _search = _search.filter('range', timestamp={'gte': from_when}) + _search.aggs.metric( + 'agg_session_count', + 'cardinality', + field='session_id', + precision_threshold=_MAX_CARDINALITY_PRECISION, + ) + _response = _search.execute() + _download_count = _response.hits.total + _download_session_count = ( + _response.aggregations.agg_session_count.value + if 'agg_session_count' in _response.aggregations + else 0 + ) + return (_download_count, _download_session_count) + + +def _cumulative_preprint_count( + preprint_metric_cls, osfid: str, *, until_when: str, from_when: str = '' +) -> int: + '''aggregate counts on given preprint''' + # copied/adapted from osf.metrics.preprint_metrics + _search = ( + preprint_metric_cls.search() + .filter('terms', preprint_id=_synonymous_osfids(osfid)) + .filter('range', timestamp={'lt': until_when}) + .extra(size=0) # no hits; only aggs + ) + if from_when: + _search = _search.filter('range', timestamp={'gte': from_when}) + _search.aggs.metric('agg_count', 'sum', field='count') + _response = _search.execute() + return ( + int(_response.aggregations.agg_count.value) + if hasattr(_response.aggregations, 'agg_count') + else 0 + ) + + +def _synonymous_osfids(osfid: str) -> list[str]: + _synonyms = [osfid] + if osfid.endswith('_v1'): + # include pre-versioned-guid counts for v1 + _synonyms.append(osfid.removesuffix('_v1')) + elif '_' not in osfid: + # include v1 (if it exists) with unversioned guid + _synonyms.append(f'{osfid}_v1') + return _synonyms + + +def _each_countedusage_osfid(until_when, after_osfid=None) -> collections.abc.Iterator[str]: + _search = ( + es6_metrics.CountedAuthUsage.search() + .filter('term', item_public=True) + .filter('terms', action_labels=['view', 'download']) + .filter('range', timestamp={'lt': until_when}) + .extra(size=0) # only aggregations, no hits + ) + _search.aggs.bucket( + 'agg_osfid', + 'composite', + sources=[{'osfid': {'terms': {'field': 'item_guid'}}}], + size=_COMPOSITE_CHUNK_SIZE, + ) + return iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid) + + +def _each_preprintview_osfid(until_when, after_osfid=None) -> collections.abc.Iterator[str]: + _search = ( + es6_metrics.PreprintView.search() + .filter('range', timestamp={'lt': until_when}) + .extra(size=0) # only aggregations, no hits + ) + _search.aggs.bucket( + 'agg_osfid', + 'composite', + sources=[{'osfid': {'terms': {'field': 'preprint_id'}}}], + size=_COMPOSITE_CHUNK_SIZE, + ) + return iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid) + + +def _each_preprintdownload_osfid(until_when, after_osfid=None) -> collections.abc.Iterator[str]: + _search = ( + es6_metrics.PreprintDownload.search() + .filter('range', timestamp={'lt': until_when}) + .extra(size=0) # only aggregations, no hits + ) + _search.aggs.bucket( + 'agg_osfid', + 'composite', + sources=[{'osfid': {'terms': {'field': 'preprint_id'}}}], + size=_COMPOSITE_CHUNK_SIZE, + ) + return iter_composite_bucket_keys(_search, 'agg_osfid', 'osfid', after=after_osfid) + + +def _merge_sorted_osfids(*osfid_iterables): + def _osfids_group_key(osfid: str): + return ( # v1 same as unversioned + osfid.removesuffix('_v1') + if osfid.endswith('_v1') + else osfid + ) + for _k, _g in itertools.groupby( + heapq.merge(*osfid_iterables), + key=_osfids_group_key, + ): + yield _k + + +def _es8_usage_report_count(yearmonth: YearMonth) -> int: + _search = ( + MonthlyPublicItemUsageReport.search() + .filter('term', cycle_coverage=_semverish_from_yearmonth(yearmonth)) + .extra(track_total_hits=True) + ) + _response = _search.execute() + _total_count = _response.hits.total.value + return _total_count + + +def _es8_usage_report_osfid_count() -> int: + _search = ( + MonthlyPublicItemUsageReport.search() + .extra(size=0) # only aggs, no hits + ) + _search.aggs.metric( + 'agg_osfid_count', + 'cardinality', + field='item_osfids', + precision_threshold=_MAX_CARDINALITY_PRECISION, + ) + _response = _search.execute() + return ( + _response.aggregations.agg_osfid_count.value + if 'agg_osfid_count' in _response.aggregations + else 0 + ) + + +def _es6_preprint_osfid_count(preprint_metric_cls) -> int: + _search = ( + preprint_metric_cls.search() + .filter('range', timestamp={'lt': _FIX_YEARMONTH.month_end()}) + .extra(size=0) # only aggregations, no hits + ) + _search.aggs.metric( + 'agg_osfid_count', + 'cardinality', + field='preprint_id', + precision_threshold=_MAX_CARDINALITY_PRECISION, + ) + _response = _search.execute() + return ( + _response.aggregations.agg_osfid_count.value + if 'agg_osfid_count' in _response.aggregations + else 0 + ) + + +def _es6_cu_osfid_count() -> int: + _search = ( + es6_metrics.CountedAuthUsage.search() + .filter('range', timestamp={'lt': _FIX_YEARMONTH.month_end()}) + .extra(size=0) # only aggregations, no hits + ) + _search.aggs.metric( + 'agg_osfid_count', + 'cardinality', + field='item_guid', + precision_threshold=_MAX_CARDINALITY_PRECISION, + ) + _response = _search.execute() + return ( + _response.aggregations.agg_osfid_count.value + if 'agg_osfid_count' in _response.aggregations + else 0 + ) + + +### +# the command itself + +class Command(BaseCommand): + def add_arguments(self, parser): + parser.add_argument( + '--no-counts', + action='store_true', + ) + parser.add_argument( + '--start', + action='store_true', + ) + + def handle(self, *, start, no_counts, **kwargs): + self._quiet_chatty_loggers() + if not no_counts: + # display counts of reports and distinct items + _prior_ym = _FIX_YEARMONTH.prior() + self.stdout.write( + f'total osfids with preprint views thru {_FIX_YEARMONTH} in es6' + f': {_es6_preprint_osfid_count(es6_metrics.PreprintView)}' + ) + self.stdout.write( + f'total osfids with preprint downloads thru {_FIX_YEARMONTH} in es6' + f': {_es6_preprint_osfid_count(es6_metrics.PreprintDownload)}' + ) + self.stdout.write( + f'total osfids with with counted usage thru {_FIX_YEARMONTH} in es6' + f': {_es6_cu_osfid_count()}' + ) + self.stdout.write( + f'total osfids with usage reports in es8' + f': {_es8_usage_report_osfid_count()}\t<== compare this' + ) + self.stdout.write( + f'total usage reports for {_prior_ym} in es8' + f': {_es8_usage_report_count(_prior_ym)}' + ) + self.stdout.write( + f'total usage reports for {_FIX_YEARMONTH} in es8' + f': {_es8_usage_report_count(_FIX_YEARMONTH)}\t<== to this' + ) + # (if --start) schedule task per item (by composite agg on es6 usage reports and events) + # each item-task iter thru reports oldest to newest, adding cumulative counts + if start: + self.stdout.write( + f'starting per-item tasks to add a corrected usage report for {_FIX_YEARMONTH}' + ) + schedule_fix_usage_reports.delay() + + def _quiet_chatty_loggers(self): + _chatty_loggers = [ + 'elasticsearch', + 'elastic_transport', + 'elasticsearch_metrics', + ] + for logger_name in _chatty_loggers: + logging.getLogger(logger_name).setLevel(logging.ERROR) diff --git a/osf/management/commands/monthly_reporters_go.py b/osf/management/commands/monthly_reporters_go.py index 9f6d57bc5db..3f54d0208d1 100644 --- a/osf/management/commands/monthly_reporters_go.py +++ b/osf/management/commands/monthly_reporters_go.py @@ -71,7 +71,7 @@ def schedule_monthly_reporter( @celery_app.task( name='management.commands.monthly_reporter_do', autoretry_for=_CONTINUE_AFTER_ERRORS, - max_retries=5, + max_retries=15, retry_backoff=True, ) def monthly_reporter_do(reporter_key: str, yearmonth: str, report_kwargs: dict): diff --git a/osf/metrics/__init__.py b/osf/metrics/__init__.py index 7d124c501b7..08c3f938950 100644 --- a/osf/metrics/__init__.py +++ b/osf/metrics/__init__.py @@ -2,6 +2,7 @@ events, daily_reports, monthly_reports, + es6_metrics, ) @@ -9,4 +10,5 @@ 'events', 'daily_reports', 'monthly_reports', + 'es6_metrics', ) diff --git a/osf/metrics/es6_metrics.py b/osf/metrics/es6_metrics.py new file mode 100644 index 00000000000..4b29d80a01a --- /dev/null +++ b/osf/metrics/es6_metrics.py @@ -0,0 +1,143 @@ +import enum +import logging + +from elasticsearch6.exceptions import NotFoundError +from elasticsearch6_dsl import InnerDoc, analyzer, tokenizer +import elasticsearch_metrics.imps.elastic6 as metrics + + +logger = logging.getLogger(__name__) + +### +# preprint views and downloads + +class BasePreprintMetric(metrics.Metric): + count = metrics.Integer(doc_values=True, index=True, required=True) + provider_id = metrics.Keyword(index=True, doc_values=True, required=True) + user_id = metrics.Keyword(index=True, doc_values=True, required=False) + preprint_id = metrics.Keyword(index=True, doc_values=True, required=True) + version = metrics.Keyword(index=True, doc_values=True) + path = metrics.Text(index=True) + + # TODO: locale + + class Index: + settings = { + 'number_of_shards': 1, + 'number_of_replicas': 1, + 'refresh_interval': '1s', + } + + class Meta: + abstract = True + source = metrics.MetaField(enabled=True) + + @classmethod + def get_count_for_preprint(cls, preprint, after=None, before=None, index=None) -> int: + if preprint.version == 1: + search = cls.search(index=index).filter('terms', preprint_id=[preprint.get_guid()._id, preprint._id]) + else: + search = cls.search(index=index).filter('term', preprint_id=preprint._id) + timestamp = {} + if after: + timestamp['gte'] = after + if before: + timestamp['lt'] = before + if timestamp: + search = search.filter('range', timestamp=timestamp) + search.aggs.metric('sum_count', 'sum', field='count') + # Optimization: set size to 0 so that hits aren't returned (we only care about the aggregation) + search = search.extra(size=0) + try: + response = search.execute() + except NotFoundError: + # _get_relevant_indices returned 1 or more indices + # that doesn't exist. Fall back to unoptimized query + search = search.index().index(cls._default_index()) + response = search.execute() + # No indexed data + if not hasattr(response.aggregations, 'sum_count'): + return 0 + return int(response.aggregations.sum_count.value) + + def save(self, *args, **kwargs): + raise RuntimeError('no saving to es6') + + +class PreprintView(BasePreprintMetric): + pass + + +class PreprintDownload(BasePreprintMetric): + pass + + +### +# counted-usage + +route_prefix_analyzer = analyzer( + 'route_prefix_analyzer', + tokenizer=tokenizer('route_prefix_tokenizer', 'path_hierarchy', delimiter='.'), +) + +class PageviewInfo(InnerDoc): + """PageviewInfo + + for CountedAuthUsage generated by viewing a web page + """ + # fields that should be provided + referer_url = metrics.Keyword() + page_url = metrics.Keyword() + page_title = metrics.Keyword() + route_name = metrics.Keyword( + fields={ + 'by_prefix': metrics.Text(analyzer=route_prefix_analyzer), + }, + ) + + # fields autofilled from the above (see `_autofill_fields`) + page_path = metrics.Keyword() + referer_domain = metrics.Keyword() + hour_of_day = metrics.Integer() + + +class CountedAuthUsage(metrics.Metric): + """CountedAuthUsage + + Something was used! Let's quickly take note of that and + move on, then come back later to query/analyze/investigate. + + Aim to support a COUNTER-style reporting api + (see https://cop5.projectcounter.org/en/5.0.2/) + """ + + # where noted, fields correspond to defined terms from COUNTER + # https://cop5.projectcounter.org/en/5.0.2/appendices/a-glossary-of-terms.html + platform_iri = metrics.Keyword() # counter:Platform + provider_id = metrics.Keyword() # counter:Database(?) + session_id = metrics.Keyword() # counter:Session + item_guid = metrics.Keyword() # counter:Item + item_type = metrics.Keyword() # counter:Data-Type + surrounding_guids = metrics.Keyword(multi=True) # counter:Title + item_public = metrics.Boolean() # counter:Access-Type(?) + user_is_authenticated = metrics.Boolean() + + action_labels = metrics.Keyword(multi=True) + class ActionLabel(enum.Enum): + SEARCH = 'search' # counter:Search + VIEW = 'view' # counter:Investigation + DOWNLOAD = 'download' # counter:Request + WEB = 'web' # counter:Regular (aka "pageview") + API = 'api' # counter:TDM (aka "non-web api usage") + # TODO: count api usage, distinguish between web and non-web api requests + + # pageviews get additional info to support the "node analytics" view + # (see `api.metrics.views.NodeAnalyticsQuery`) + pageview_info = metrics.Object(PageviewInfo) + + class Meta: + dynamic = metrics.MetaField('strict') + source = metrics.MetaField(enabled=True) + + def save(self, *args, **kwargs): + raise RuntimeError('no saving to es6') diff --git a/osf/metrics/reporters/public_item_usage.py b/osf/metrics/reporters/public_item_usage.py index 1dfa5deba56..bd79070e67b 100644 --- a/osf/metrics/reporters/public_item_usage.py +++ b/osf/metrics/reporters/public_item_usage.py @@ -7,7 +7,11 @@ from osf.metadata.osf_gathering import OsfmapPartition from osf.metrics.monthly_reports import MonthlyPublicItemUsageReport from osf.metrics.events import OsfCountedUsageEvent -from osf.metrics.utils import YearMonth, cycle_coverage_yearmonth +from osf.metrics.utils import ( + YearMonth, + cycle_coverage_yearmonth, + iter_composite_bucket_keys, +) from ._base import MonthlyReporter @@ -15,6 +19,8 @@ _MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control +_NOT_BEFORE = YearMonth(2025, 6) + class _SkipItem(Exception): pass @@ -25,6 +31,14 @@ class PublicItemUsageReporter(MonthlyReporter): includes projects, project components, registrations, registration components, and preprints ''' + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if self.yearmonth < _NOT_BEFORE: + raise RuntimeError( + f'{self.__class__.__name__} cannot see before {_NOT_BEFORE} ' + '(when we started letting old event data expire)' + ) + def iter_report_kwargs(self, continue_after: dict | None = None): _after_item_iri = continue_after['item_iri'] if continue_after else None for _item_iri in self._each_item_iri(_after_item_iri): @@ -58,7 +72,7 @@ def _each_item_iri(self, after_item_iri: str | None) -> typing.Iterator[str]: sources=[{'item_iri': {'terms': {'field': 'item_iri'}}}], size=_CHUNK_SIZE, ) - return _iter_composite_bucket_keys(_search, 'agg_item_iri', 'item_iri', after=after_item_iri) + return iter_composite_bucket_keys(_search, 'agg_item_iri', 'item_iri', after=after_item_iri) def _build_report(self, item_iri) -> MonthlyPublicItemUsageReport: # get usage metrics from OsfCountedUsageEvent: @@ -160,37 +174,3 @@ def _prior_usage_report(self, item_iri): def _bucket_keys(buckets): return [_bucket['key'] for _bucket in buckets] - - -def _iter_composite_bucket_keys( - search: esdsl.Search, - composite_agg_name: str, - composite_source_name: str, - after: str | None = None, -) -> typing.Iterator[str]: - '''iterate thru *all* buckets of a composite aggregation, requesting new pages as needed - - assumes the given search has a composite aggregation of the given name - with a single value source of the given name - - updates the search in-place for subsequent pages - ''' - if after is not None: - search.aggs[composite_agg_name].after = {composite_source_name: after} - while True: - _page_response = search.execute(ignore_cache=True) # reused search object has the previous page cached - try: - _agg_result = _page_response.aggregations[composite_agg_name] - except KeyError: - return # no data; all done - for _bucket in _agg_result.buckets: - _key = _bucket.key.to_dict() - assert set(_key.keys()) == {composite_source_name}, f'expected only one key ("{composite_source_name}") in {_bucket.key}' - yield _key[composite_source_name] - # update the search for the next page - try: - _next_after = _agg_result.after_key - except AttributeError: - return # all done - else: - search.aggs[composite_agg_name].after = _next_after diff --git a/osf/metrics/utils.py b/osf/metrics/utils.py index 95ed8efdb2d..1ce923de86e 100644 --- a/osf/metrics/utils.py +++ b/osf/metrics/utils.py @@ -1,4 +1,5 @@ from __future__ import annotations +import collections.abc as cabc import calendar import dataclasses import re @@ -6,6 +7,7 @@ from hashlib import sha256 from typing import ClassVar +from elasticsearch8 import dsl as es8dsl from elasticsearch_metrics.util.timeparts import serialize_timeparts from osf.metadata.osfmap_utils import ( @@ -172,3 +174,37 @@ def month_start(self) -> datetime.datetime: def month_end(self) -> datetime.datetime: """get a datetime (in UTC timezone) when this YearMonth ends (the start of next month)""" return self.next().month_start() + + +def iter_composite_bucket_keys( + search: es8dsl.Search, + composite_agg_name: str, + composite_source_name: str, + after: str | None = None, +) -> cabc.Iterator[str]: + '''iterate thru *all* buckets of a composite aggregation, requesting new pages as needed + + assumes the given search has a composite aggregation of the given name + with a single value source of the given name + + updates the search in-place for subsequent pages + ''' + if after is not None: + search.aggs[composite_agg_name].after = {composite_source_name: after} + while True: + _page_response = search.execute(ignore_cache=True) # reused search object has the previous page cached + try: + _agg_result = _page_response.aggregations[composite_agg_name] + except KeyError: + return # no data; all done + for _bucket in _agg_result.buckets: + _key = _bucket.key.to_dict() + assert set(_key.keys()) == {composite_source_name}, f'expected only one key ("{composite_source_name}") in {_bucket.key}' + yield _key[composite_source_name] + # update the search for the next page + try: + _next_after = _agg_result.after_key + except AttributeError: + return # all done + else: + search.aggs[composite_agg_name].after = _next_after diff --git a/poetry.lock b/poetry.lock index cdedd8fe11e..67311478796 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1095,13 +1095,14 @@ develop = false [package.extras] anydjango = ["django"] +elastic6 = ["elasticsearch6-dsl (>=6.3.0,<7.0.0)"] elastic8 = ["elasticsearch8 (>=8.0.0,<9.0.0)"] [package.source] type = "git" url = "https://github.com/CenterForOpenScience/django-elasticsearch-metrics.git" -reference = "22cea2531783d3f06da3f4407624aae0bbb50c02" -resolved_reference = "22cea2531783d3f06da3f4407624aae0bbb50c02" +reference = "b5026b0878db738c7c62205819b72cdfbe1c2da6" +resolved_reference = "b5026b0878db738c7c62205819b72cdfbe1c2da6" [[package]] name = "django-extensions" @@ -1389,6 +1390,45 @@ files = [ [package.dependencies] urllib3 = ">=1.8,<2.0" +[[package]] +name = "elasticsearch6" +version = "6.8.2" +description = "Python client for Elasticsearch" +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*, <4" +groups = ["main"] +files = [ + {file = "elasticsearch6-6.8.2-py2.py3-none-any.whl", hash = "sha256:4edf2d61f854f642185d5af915b23c57e70d9f2b54f558b62ae55fa720583f5e"}, + {file = "elasticsearch6-6.8.2.tar.gz", hash = "sha256:7c215910b6bc18928d24d6c1d0b09b0684c824af609906d5e007a9a268109678"}, +] + +[package.dependencies] +urllib3 = ">=1.21.1" + +[package.extras] +develop = ["coverage", "mock", "nose", "nosexcover", "numpy", "pandas", "pyyaml", "requests (>=2.0.0,<3.0.0)", "sphinx (<1.7)", "sphinx-rtd-theme"] +requests = ["requests (>=2.4.0,<3.0.0)"] + +[[package]] +name = "elasticsearch6-dsl" +version = "6.4.0" +description = "Python client for Elasticsearch" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "elasticsearch6-dsl-6.4.0.tar.gz", hash = "sha256:4bbc60919b73484d028eca31f749f0eea80d8b0bfe0a9a33b54eb0afca1d9b5f"}, + {file = "elasticsearch6_dsl-6.4.0-py2.py3-none-any.whl", hash = "sha256:a5767ef65c50f7c8af7ba6c176bd8df2c1fb501c644bc196cbd675f15c0f2be1"}, +] + +[package.dependencies] +elasticsearch6 = ">=6.0.0,<7.0.0" +python-dateutil = "*" +six = "*" + +[package.extras] +develop = ["coverage (<5.0.0)", "mock", "pytest (>=3.0.0)", "pytest-cov", "pytz", "sphinx", "sphinx-rtd-theme"] + [[package]] name = "elasticsearch8" version = "8.19.3" @@ -4687,4 +4727,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "287ebe4055cbb4338c81ad9f5658650e41042447548c151f49b6ef20f9cf57db" +content-hash = "9b2f29b9b4958b47e4c428f4225d04bbd12f239c02f9afc186989a49547b83ed" diff --git a/pyproject.toml b/pyproject.toml index 83f60beaf05..69cfdb8c0fc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,8 @@ Markupsafe = "2.1.5" blinker = "1.7.0" furl = "2.1.3" elasticsearch2 = "2.5.1" +elasticsearch6= "6.8.2" +elasticsearch6-dsl = "6.4.0" elasticsearch8 = "8.19.3" elastic-transport = "8.17.1" google-api-python-client = "2.123.0" @@ -94,7 +96,7 @@ datacite = "1.1.3" rdflib = "7.0.0" colorlog = "6.8.2" # Metrics -django-elasticsearch-metrics = {git ="https://github.com/CenterForOpenScience/django-elasticsearch-metrics.git", rev = "22cea2531783d3f06da3f4407624aae0bbb50c02"} +django-elasticsearch-metrics = {git ="https://github.com/CenterForOpenScience/django-elasticsearch-metrics.git", rev = "b5026b0878db738c7c62205819b72cdfbe1c2da6"} # Impact Metrics CSV Export djangorestframework-csv = "3.0.2" gevent = "24.2.1" diff --git a/website/settings/defaults.py b/website/settings/defaults.py index da607c2afb3..dfc78bb07ab 100644 --- a/website/settings/defaults.py +++ b/website/settings/defaults.py @@ -109,6 +109,7 @@ def parent_dir(path): SEARCH_ENGINE = 'elastic' # Can be 'elastic', or None ELASTIC_URI = '127.0.0.1:9200' +ELASTIC6_URI = os.environ.get('ELASTIC6_URI', '127.0.0.1:9201') ELASTIC8_URI = os.environ.get('ELASTIC8_URI') ELASTIC8_CERT_PATH = os.environ.get('ELASTIC8_CERT_PATH') ELASTIC8_ASSERT_HOSTNAME = os.environ.get('ELASTIC8_ASSERT_HOSTNAME') @@ -485,6 +486,7 @@ class CeleryConfig: } background_migration_modules = { + 'osf.management.commands.migrate_osfmetrics_fix_6to8', } try: @@ -601,6 +603,7 @@ class CeleryConfig: 'scripts.remove_after_use.merge_notification_subscription_provider_ct', 'scripts.disable_removed_beat_tasks', 'osf.management.commands.delete_withdrawn_or_failed_registration_files', + 'osf.management.commands.migrate_osfmetrics_fix_6to8', ) # Modules that need metrics and release requirements From 1ddf0bb6ce9cf319c742287afe208c0a842eb453 Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Mon, 15 Jun 2026 08:03:46 -0400 Subject: [PATCH 2/8] add es6 back to github actions --- .github/workflows/test-build.yml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/.github/workflows/test-build.yml b/.github/workflows/test-build.yml index ead75f5e21e..7315f7a4fc0 100644 --- a/.github/workflows/test-build.yml +++ b/.github/workflows/test-build.yml @@ -48,6 +48,15 @@ jobs: --health-interval 10s --health-timeout 30s --health-retries 5 + elasticsearch6: &ES6_SERVICE + image: elasticsearch:6.8.23 + ports: + - 9201:9200 + options: >- + --health-cmd "curl -sf http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=30s" + --health-interval 10s + --health-timeout 30s + --health-retries 5 postgres: &POSTGRES_SERVICE image: postgres env: @@ -67,6 +76,7 @@ jobs: run: poetry run python3 -m invoke test-ci-addons --junit env: ELASTIC8_URI: http://localhost:9202 + ELASTIC6_URI: http://localhost:9201 - name: Upload report if: (success() || failure()) # run this step even if previous step failed uses: ./.github/actions/gen-report @@ -94,6 +104,7 @@ jobs: checks: write services: elasticsearch8: *ES8_SERVICE + elasticsearch6: *ES6_SERVICE postgres: *POSTGRES_SERVICE steps: - uses: actions/checkout@v6 @@ -104,6 +115,7 @@ jobs: run: poetry run python3 -m invoke test-ci-api1-and-js --junit env: ELASTIC8_URI: http://localhost:9202 + ELASTIC6_URI: http://localhost:9201 - name: Upload report if: (success() || failure()) # run this step even if previous step failed uses: ./.github/actions/gen-report @@ -115,6 +127,7 @@ jobs: checks: write services: elasticsearch8: *ES8_SERVICE + elasticsearch6: *ES6_SERVICE postgres: *POSTGRES_SERVICE steps: - uses: actions/checkout@v6 @@ -123,6 +136,7 @@ jobs: run: poetry run python3 -m invoke test-ci-api2 --junit env: ELASTIC8_URI: http://localhost:9202 + ELASTIC6_URI: http://localhost:9201 - name: Upload report if: (success() || failure()) # run this step even if previous step failed uses: ./.github/actions/gen-report @@ -141,6 +155,7 @@ jobs: run: poetry run python3 -m invoke test-ci-api3-and-osf --junit env: ELASTIC8_URI: http://localhost:9202 + ELASTIC6_URI: http://localhost:9201 - name: Upload report if: (success() || failure()) # run this step even if previous step failed uses: ./.github/actions/gen-report From ba22adfefbb4fe5f00d2ee2064c2f4947c0396b9 Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Mon, 15 Jun 2026 13:05:40 -0400 Subject: [PATCH 3/8] fix: orderable YearMonth --- osf/metrics/utils.py | 5 +++++ osf_tests/metrics/test_yearmonth.txt | 24 ++++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/osf/metrics/utils.py b/osf/metrics/utils.py index 1ce923de86e..5fe7f4043cc 100644 --- a/osf/metrics/utils.py +++ b/osf/metrics/utils.py @@ -2,6 +2,7 @@ import collections.abc as cabc import calendar import dataclasses +import functools import re import datetime from hashlib import sha256 @@ -107,6 +108,7 @@ def get_immediate_wrapper(osfid_referent): ) +@functools.total_ordering @dataclasses.dataclass(frozen=True) class YearMonth: """YearMonth: represents a specific month in a specific year""" @@ -151,6 +153,9 @@ def __str__(self): """convert to string of "YYYY-MM" format""" return f'{self.year}-{self.month:0>2}' + def __le__(self, other): + return (self.year <= other.year) and (self.month <= other.month) + def next(self) -> YearMonth: """get a new YearMonth for the month after this one""" return ( diff --git a/osf_tests/metrics/test_yearmonth.txt b/osf_tests/metrics/test_yearmonth.txt index e078b709b6a..72753e54035 100644 --- a/osf_tests/metrics/test_yearmonth.txt +++ b/osf_tests/metrics/test_yearmonth.txt @@ -71,3 +71,27 @@ datetime.datetime(3333, 3, 1, 0, 0, tzinfo=datetime.timezone.utc) datetime.datetime(3333, 4, 1, 0, 0, tzinfo=datetime.timezone.utc) >>> YearMonth(1999, 12).month_end().isoformat() '2000-01-01T00:00:00+00:00' + +comparisons: +>>> YearMonth(1000, 3) < YearMonth(1000, 5) +True +>>> YearMonth(1000, 3) > YearMonth(1000, 5) +False +>>> YearMonth(2000, 3) < YearMonth(1000, 5) +False +>>> YearMonth(2000, 3) > YearMonth(1000, 5) +True +>>> YearMonth(1000, 3) < YearMonth(1000, 3) +False +>>> YearMonth(1000, 3) > YearMonth(1000, 3) +False +>>> YearMonth(1000, 3) < YearMonth(1000, 5) +True +>>> YearMonth(1000, 3) <= YearMonth(1000, 5) +True +>>> YearMonth(1000, 3) >= YearMonth(1000, 5) +False +>>> YearMonth(1000, 3) <= YearMonth(1000, 3) +True +>>> YearMonth(1000, 3) >= YearMonth(1000, 3) +True From d9042b0a3169a357cfad9d49624b7cfe70a89e5b Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Mon, 15 Jun 2026 15:14:21 -0400 Subject: [PATCH 4/8] more correct counts --- osf/management/commands/migrate_osfmetrics_fix_6to8.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/osf/management/commands/migrate_osfmetrics_fix_6to8.py b/osf/management/commands/migrate_osfmetrics_fix_6to8.py index 6644f4e9200..54144e5d747 100644 --- a/osf/management/commands/migrate_osfmetrics_fix_6to8.py +++ b/osf/management/commands/migrate_osfmetrics_fix_6to8.py @@ -417,6 +417,8 @@ def _es6_preprint_osfid_count(preprint_metric_cls) -> int: def _es6_cu_osfid_count() -> int: _search = ( es6_metrics.CountedAuthUsage.search() + .filter('term', item_public=True) + .filter('terms', action_labels=['view', 'download']) .filter('range', timestamp={'lt': _FIX_YEARMONTH.month_end()}) .extra(size=0) # only aggregations, no hits ) From 9bb50dffeabff53d64d3998683bef3c1737d1fd1 Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Tue, 16 Jun 2026 07:40:47 -0400 Subject: [PATCH 5/8] loggable error on non-existant osfid --- osf/management/commands/migrate_osfmetrics_fix_6to8.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/osf/management/commands/migrate_osfmetrics_fix_6to8.py b/osf/management/commands/migrate_osfmetrics_fix_6to8.py index 54144e5d747..5d3c812b24d 100644 --- a/osf/management/commands/migrate_osfmetrics_fix_6to8.py +++ b/osf/management/commands/migrate_osfmetrics_fix_6to8.py @@ -86,6 +86,8 @@ def add_fixed_usage_report(osfid: str): if _osfobj: _usage_report = _make_usage_report(_osfobj, _FIX_YEARMONTH) _usage_report.save() + else: + raise RuntimeError('osfid does not exist! skipping...', osfid) ### From a019d4c0a9249021a9b62b8cb7eefd0a0d1ccf6c Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Tue, 16 Jun 2026 09:41:00 -0400 Subject: [PATCH 6/8] more accurate comparison count --- osf/management/commands/migrate_osfmetrics_fix_6to8.py | 1 + 1 file changed, 1 insertion(+) diff --git a/osf/management/commands/migrate_osfmetrics_fix_6to8.py b/osf/management/commands/migrate_osfmetrics_fix_6to8.py index 5d3c812b24d..6f6aaa2ef16 100644 --- a/osf/management/commands/migrate_osfmetrics_fix_6to8.py +++ b/osf/management/commands/migrate_osfmetrics_fix_6to8.py @@ -380,6 +380,7 @@ def _es8_usage_report_count(yearmonth: YearMonth) -> int: def _es8_usage_report_osfid_count() -> int: _search = ( MonthlyPublicItemUsageReport.search() + .filter('range', cycle_coverage={'lte': _semverish_from_yearmonth(_FIX_YEARMONTH)}) .extra(size=0) # only aggs, no hits ) _search.aggs.metric( From 4583f0d79792a8d3bc6df919c4ed6d7a3973a4ef Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Tue, 16 Jun 2026 09:48:28 -0400 Subject: [PATCH 7/8] move usage report epoch to setting --- api/base/settings/defaults.py | 1 + .../commands/migrate_osfmetrics_fix_6to8.py | 27 ++++++++++--------- osf/metrics/reporters/public_item_usage.py | 9 +++---- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/api/base/settings/defaults.py b/api/base/settings/defaults.py index 014862adbcf..ab4c3523f08 100644 --- a/api/base/settings/defaults.py +++ b/api/base/settings/defaults.py @@ -345,6 +345,7 @@ } OSF_USAGEEVENT_EXPIRATION_DAYS = 90 ELASTICSEARCH_METRICS_DATE_FORMAT = '%Y' +MONTHLY_USAGE_REPORT_EPOCH = '2026-05' # cannot create monthly usage reports before this point WAFFLE_CACHE_NAME = 'waffle_cache' STORAGE_USAGE_CACHE_NAME = 'storage_usage' diff --git a/osf/management/commands/migrate_osfmetrics_fix_6to8.py b/osf/management/commands/migrate_osfmetrics_fix_6to8.py index 6f6aaa2ef16..2c5a3b22dfd 100644 --- a/osf/management/commands/migrate_osfmetrics_fix_6to8.py +++ b/osf/management/commands/migrate_osfmetrics_fix_6to8.py @@ -3,6 +3,7 @@ import itertools import logging +from django.conf import settings as api_settings from django.core.management.base import BaseCommand from django.db import OperationalError as DjangoOperationalError from elasticsearch6.exceptions import ConnectionError as Elastic6ConnectionError @@ -30,7 +31,7 @@ ### # constants -_FIX_YEARMONTH = YearMonth(2026, 5) +_EPOCH_YEARMONTH = YearMonth.from_str(api_settings.MONTHLY_USAGE_REPORT_EPOCH) _MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control @@ -54,7 +55,7 @@ @celery_app.task(**_TASK_KWARGS) def schedule_fix_usage_reports(after_osfid: str | None = None): - _until_when = _FIX_YEARMONTH.month_end() + _until_when = _EPOCH_YEARMONTH.month_end() _last_osfid = None try: for _osfid in _merge_sorted_osfids( @@ -84,7 +85,7 @@ def add_fixed_usage_report(osfid: str): # from PublicItemUsageReport to MonthlyPublicItemUsageReportEs8 _osfobj, _ = osfdb.Guid.load_referent(osfid) if _osfobj: - _usage_report = _make_usage_report(_osfobj, _FIX_YEARMONTH) + _usage_report = _make_usage_report(_osfobj, _EPOCH_YEARMONTH) _usage_report.save() else: raise RuntimeError('osfid does not exist! skipping...', osfid) @@ -380,7 +381,7 @@ def _es8_usage_report_count(yearmonth: YearMonth) -> int: def _es8_usage_report_osfid_count() -> int: _search = ( MonthlyPublicItemUsageReport.search() - .filter('range', cycle_coverage={'lte': _semverish_from_yearmonth(_FIX_YEARMONTH)}) + .filter('range', cycle_coverage={'lte': _semverish_from_yearmonth(_EPOCH_YEARMONTH)}) .extra(size=0) # only aggs, no hits ) _search.aggs.metric( @@ -400,7 +401,7 @@ def _es8_usage_report_osfid_count() -> int: def _es6_preprint_osfid_count(preprint_metric_cls) -> int: _search = ( preprint_metric_cls.search() - .filter('range', timestamp={'lt': _FIX_YEARMONTH.month_end()}) + .filter('range', timestamp={'lt': _EPOCH_YEARMONTH.month_end()}) .extra(size=0) # only aggregations, no hits ) _search.aggs.metric( @@ -422,7 +423,7 @@ def _es6_cu_osfid_count() -> int: es6_metrics.CountedAuthUsage.search() .filter('term', item_public=True) .filter('terms', action_labels=['view', 'download']) - .filter('range', timestamp={'lt': _FIX_YEARMONTH.month_end()}) + .filter('range', timestamp={'lt': _EPOCH_YEARMONTH.month_end()}) .extra(size=0) # only aggregations, no hits ) _search.aggs.metric( @@ -457,17 +458,17 @@ def handle(self, *, start, no_counts, **kwargs): self._quiet_chatty_loggers() if not no_counts: # display counts of reports and distinct items - _prior_ym = _FIX_YEARMONTH.prior() + _prior_ym = _EPOCH_YEARMONTH.prior() self.stdout.write( - f'total osfids with preprint views thru {_FIX_YEARMONTH} in es6' + f'total osfids with preprint views thru {_EPOCH_YEARMONTH} in es6' f': {_es6_preprint_osfid_count(es6_metrics.PreprintView)}' ) self.stdout.write( - f'total osfids with preprint downloads thru {_FIX_YEARMONTH} in es6' + f'total osfids with preprint downloads thru {_EPOCH_YEARMONTH} in es6' f': {_es6_preprint_osfid_count(es6_metrics.PreprintDownload)}' ) self.stdout.write( - f'total osfids with with counted usage thru {_FIX_YEARMONTH} in es6' + f'total osfids with with counted usage thru {_EPOCH_YEARMONTH} in es6' f': {_es6_cu_osfid_count()}' ) self.stdout.write( @@ -479,14 +480,14 @@ def handle(self, *, start, no_counts, **kwargs): f': {_es8_usage_report_count(_prior_ym)}' ) self.stdout.write( - f'total usage reports for {_FIX_YEARMONTH} in es8' - f': {_es8_usage_report_count(_FIX_YEARMONTH)}\t<== to this' + f'total usage reports for {_EPOCH_YEARMONTH} in es8' + f': {_es8_usage_report_count(_EPOCH_YEARMONTH)}\t<== to this' ) # (if --start) schedule task per item (by composite agg on es6 usage reports and events) # each item-task iter thru reports oldest to newest, adding cumulative counts if start: self.stdout.write( - f'starting per-item tasks to add a corrected usage report for {_FIX_YEARMONTH}' + f'starting per-item tasks to add a corrected usage report for {_EPOCH_YEARMONTH}' ) schedule_fix_usage_reports.delay() diff --git a/osf/metrics/reporters/public_item_usage.py b/osf/metrics/reporters/public_item_usage.py index bd79070e67b..e7bccb21bf9 100644 --- a/osf/metrics/reporters/public_item_usage.py +++ b/osf/metrics/reporters/public_item_usage.py @@ -2,6 +2,7 @@ import datetime import typing +from django.conf import settings from elasticsearch8 import dsl as esdsl from osf.metadata.osf_gathering import OsfmapPartition @@ -19,8 +20,6 @@ _MAX_CARDINALITY_PRECISION = 40000 # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-cardinality-aggregation.html#_precision_control -_NOT_BEFORE = YearMonth(2025, 6) - class _SkipItem(Exception): pass @@ -33,10 +32,10 @@ class PublicItemUsageReporter(MonthlyReporter): ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - if self.yearmonth < _NOT_BEFORE: + if self.yearmonth <= YearMonth.from_str(settings.MONTHLY_USAGE_REPORT_EPOCH): raise RuntimeError( - f'{self.__class__.__name__} cannot see before {_NOT_BEFORE} ' - '(when we started letting old event data expire)' + f'{self.__class__.__name__} cannot see before {settings.MONTHLY_USAGE_REPORT_EPOCH}' + ' (when we started letting old event data expire)' ) def iter_report_kwargs(self, continue_after: dict | None = None): From ae30973e1ff8d175f7823aaf12ec79290307f294 Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Tue, 16 Jun 2026 11:10:30 -0400 Subject: [PATCH 8/8] finding anomalies... --- admin/management/views.py | 1 + admin/templates/management/commands.html | 1 + .../commands/migrate_osfmetrics_fix_6to8.py | 36 ++++++++++++++++++- 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/admin/management/views.py b/admin/management/views.py index 038b29a56a8..a0d0da7d437 100644 --- a/admin/management/views.py +++ b/admin/management/views.py @@ -212,6 +212,7 @@ def post(self, request): _command_kwargs = { 'no_color': True, 'no_counts': request.POST.get('no_counts'), + 'find_anomaly': request.POST.get('find_anomaly'), 'start': request.POST.get('start'), } _out_io = StringIO() diff --git a/admin/templates/management/commands.html b/admin/templates/management/commands.html index 4293fe66194..b037be6e3ed 100644 --- a/admin/templates/management/commands.html +++ b/admin/templates/management/commands.html @@ -200,6 +200,7 @@

migrate osf-metrics (fix) 6to8

style="display: flex; flex-direction: column;"> {% csrf_token %} +