-
Notifications
You must be signed in to change notification settings - Fork 10
Sync from csv #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Sync from csv #18
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| # requires-python = ">=3.11" | ||
| # dependencies = [ | ||
| # "click>=8.1.7", | ||
| # "dateparser>=1.2.0", | ||
| # "garminconnect>=0.3.0", | ||
| # "httpx[http2,cli,brotli]>=0.28.1", | ||
| # "inquirer>=3.4.0", | ||
|
|
@@ -30,16 +31,20 @@ | |
| import os | ||
| import pathlib | ||
| import platform | ||
| import re | ||
| import tempfile | ||
| import unicodedata | ||
| from contextlib import contextmanager | ||
| from datetime import datetime, timedelta | ||
| from functools import cache, wraps | ||
|
|
||
| import click | ||
| import dateparser | ||
| import garminconnect as GC | ||
| import inquirer | ||
| import keyring | ||
| from dateutil import parser as dateutil_parser | ||
| from dateutil import tz | ||
| from httpx import HTTPStatusError | ||
|
|
||
| import omronconnect as OC | ||
|
|
@@ -1350,6 +1355,182 @@ def garmin_get_weighins(gc: GC.Garmin, startdate: str, enddate: str) -> T.Dict[s | |
| return gcWeighins | ||
|
|
||
|
|
||
| ######################################################################################################################## | ||
| def _normalize_csv_header(value: str) -> str: | ||
| normalized = unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii") | ||
| normalized = normalized.lower() | ||
| normalized = re.sub(r"\([^)]*\)", "", normalized) | ||
| normalized = re.sub(r"[^a-z0-9]+", " ", normalized) | ||
| return normalized.strip() | ||
|
|
||
|
|
||
| def _find_csv_column(fieldnames: T.Sequence[str], terms: T.List[str], required: bool = True) -> T.Optional[str]: | ||
| normalized_fields = {name: _normalize_csv_header(name) for name in fieldnames} | ||
|
|
||
| for original, normalized in normalized_fields.items(): | ||
| if all(term in normalized for term in terms): | ||
| return original | ||
|
|
||
| if required: | ||
| raise ValueError( | ||
| f"Unable to find expected CSV column containing terms {terms}. Found columns: {', '.join(fieldnames)}" | ||
| ) | ||
|
|
||
| return None | ||
|
|
||
|
|
||
| def _parse_omron_connect_date(date_str: str): | ||
| parsed = dateparser.parse( | ||
| date_str, | ||
| settings={ | ||
| "DATE_ORDER": "DMY", | ||
| "STRICT_PARSING": True, | ||
| }, | ||
| ) | ||
| if parsed is None: | ||
| raise ValueError(f"Unsupported date format: '{date_str}'") | ||
|
|
||
| return parsed.date() | ||
|
|
||
|
|
||
| def _parse_omron_connect_timezone(timezone_str: T.Optional[str], fallback_tz: T.Any) -> T.Any: | ||
| if timezone_str is None or not timezone_str.strip(): | ||
| return fallback_tz | ||
|
|
||
| parsed_tz = tz.gettz(timezone_str.strip()) | ||
| if parsed_tz is None: | ||
| raise ValueError(f"Unsupported time zone: '{timezone_str}'") | ||
|
|
||
| return parsed_tz | ||
|
|
||
|
|
||
| def _parse_omron_connect_datetime(date_str: str, time_str: str, measurement_tz: T.Any) -> datetime: | ||
| date_value = _parse_omron_connect_date(date_str) | ||
| time_value = None | ||
|
|
||
| for time_format in ("%H:%M", "%H:%M:%S"): | ||
| try: | ||
| time_value = datetime.strptime(time_str.strip(), time_format).time() | ||
| break | ||
|
|
||
| except ValueError: | ||
| continue | ||
|
|
||
| if time_value is None: | ||
| try: | ||
| time_value = dateutil_parser.parse(time_str).time() | ||
|
|
||
| except (ValueError, TypeError, dateutil_parser.ParserError) as e: | ||
| raise ValueError(f"Unsupported time format: '{time_str}'") from e | ||
|
|
||
| return datetime.combine(date_value, time_value).replace(tzinfo=measurement_tz) | ||
|
|
||
|
|
||
| def _parse_omron_connect_measurement_datetime(datetime_str: str, measurement_tz: T.Any) -> datetime: | ||
| parsed = dateparser.parse( | ||
| datetime_str, | ||
| settings={ | ||
| "DATE_ORDER": "DMY", | ||
| "STRICT_PARSING": True, | ||
| }, | ||
| ) | ||
| if parsed is None: | ||
| raise ValueError(f"Unsupported datetime format: '{datetime_str}'") | ||
|
|
||
| if parsed.tzinfo is not None: | ||
| return parsed.astimezone(measurement_tz) | ||
|
|
||
| return parsed.replace(tzinfo=measurement_tz) | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here better use https://dateutil.readthedocs.io/en/stable/index.html ? from dateutil import parser as dateutil_parser def _parse_omron_connect_datetime(date_str: str, timezone_str: str) -> datetime: |
||
|
|
||
| def load_omron_connect_bp_csv(csv_path: str) -> T.List[OC.BPMeasurement]: | ||
| """Load blood pressure measurements exported from OMRON Connect CSV.""" | ||
|
|
||
| fallback_tz = datetime.now().astimezone().tzinfo | ||
| if fallback_tz is None: | ||
| raise ValueError("Unable to determine local timezone for CSV import") | ||
|
|
||
| with open(csv_path, "r", encoding="utf-8-sig", newline="") as f: | ||
| sample = f.read(4096) | ||
| f.seek(0) | ||
|
|
||
| try: | ||
| dialect = csv.Sniffer().sniff(sample, delimiters=",;\t") | ||
|
|
||
| except csv.Error: | ||
| dialect = csv.excel | ||
|
|
||
| reader = csv.DictReader(f, dialect=dialect) | ||
| if not reader.fieldnames: | ||
| raise ValueError("CSV file has no header row") | ||
|
|
||
| datetime_col = _find_csv_column(reader.fieldnames, ["measurement", "date"], required=False) | ||
|
|
||
| date_col = _find_csv_column(reader.fieldnames, ["date"], required=False) | ||
| if date_col is None: | ||
| date_col = _find_csv_column(reader.fieldnames, ["datum"]) | ||
|
|
||
| time_col = None | ||
| if datetime_col is None: | ||
| time_col = _find_csv_column(reader.fieldnames, ["time"], required=False) | ||
| if time_col is None: | ||
| time_col = _find_csv_column(reader.fieldnames, ["tid"]) | ||
|
|
||
| timezone_col = _find_csv_column(reader.fieldnames, ["time", "zone"], required=False) | ||
|
|
||
| systolic_col = _find_csv_column(reader.fieldnames, ["systol"], required=False) | ||
| if systolic_col is None: | ||
| systolic_col = _find_csv_column(reader.fieldnames, ["sys"]) | ||
|
|
||
| diastolic_col = _find_csv_column(reader.fieldnames, ["diastol"], required=False) | ||
| if diastolic_col is None: | ||
| diastolic_col = _find_csv_column(reader.fieldnames, ["dia"]) | ||
|
|
||
| pulse_col = _find_csv_column(reader.fieldnames, ["puls"], required=False) | ||
| if pulse_col is None: | ||
| pulse_col = _find_csv_column(reader.fieldnames, ["pulse"]) | ||
|
|
||
| notes_col = _find_csv_column(reader.fieldnames, ["anm"], required=False) | ||
| if notes_col is None: | ||
| notes_col = _find_csv_column(reader.fieldnames, ["note"], required=False) | ||
|
|
||
| measurements: T.List[OC.BPMeasurement] = [] | ||
| for row_num, row in enumerate(reader, start=2): | ||
| if not any((value or "").strip() for value in row.values()): | ||
| continue | ||
|
|
||
| try: | ||
| measurement_tz = _parse_omron_connect_timezone(row.get(timezone_col), fallback_tz) | ||
| if datetime_col is not None: | ||
| measurement_dt = _parse_omron_connect_measurement_datetime(row[datetime_col], measurement_tz) | ||
|
|
||
| else: | ||
| measurement_dt = _parse_omron_connect_datetime(row[date_col], row[time_col], measurement_tz) | ||
|
|
||
| systolic = int(str(row[systolic_col]).strip()) | ||
| diastolic = int(str(row[diastolic_col]).strip()) | ||
| pulse = int(str(row[pulse_col]).strip()) | ||
| notes = str(row.get(notes_col, "")).strip() if notes_col else "" | ||
| if notes in ("-", "\u2013"): | ||
| notes = "" | ||
|
|
||
| except (TypeError, ValueError, KeyError) as e: | ||
| raise ValueError(f"Failed to parse CSV row {row_num}: {e}") from e | ||
|
|
||
| measurements.append( | ||
| OC.BPMeasurement( | ||
| systolic=systolic, | ||
| diastolic=diastolic, | ||
| pulse=pulse, | ||
| measurementDate=int(measurement_dt.timestamp() * 1000), | ||
| timeZone=measurement_tz, | ||
| notes=notes, | ||
| ) | ||
| ) | ||
|
|
||
| return sorted(measurements, key=lambda m: m.measurementDate) | ||
|
|
||
|
|
||
| ######################################################################################################################## | ||
| class DateRangeException(Exception): | ||
| pass | ||
|
|
@@ -1912,6 +2093,12 @@ def remove_device(ctx: click.Context, devname: str): | |
| show_default=True, | ||
| help="Do not write to Garmin Connect.", | ||
| ) | ||
| @click.option( | ||
| "--csv-file", | ||
| type=click.Path(exists=True, dir_okay=False, readable=True), | ||
| default=None, | ||
| help="Sync blood pressure data from an OMRON Connect exported CSV file instead of the OMRON API.", | ||
| ) | ||
| @click.pass_context | ||
| @requires_config | ||
| def sync_device( | ||
|
|
@@ -1923,6 +2110,7 @@ def sync_device( | |
| to_date: T.Optional[str], | ||
| overwrite: bool, | ||
| no_write: bool, | ||
| csv_file: T.Optional[str], | ||
| ): | ||
| """Sync DEVNAMES... to Garmin Connect. | ||
|
|
||
|
|
@@ -1960,6 +2148,9 @@ def sync_device( | |
| \b | ||
| # End date minus 7 days | ||
| omramin sync --to 20240131 --days 7 | ||
| \b | ||
| # Sync blood pressure data (incl. pulse) from OMRON Connect CSV export | ||
| omramin sync --csv-file "omron-export.csv" | ||
| """ | ||
|
|
||
| config_path = ctx.obj["config_path"] | ||
|
|
@@ -1968,6 +2159,71 @@ def sync_device( | |
| opts.overwrite = overwrite | ||
| opts.write_to_garmin = not no_write | ||
|
|
||
| if csv_file and device_category and device_category.upper() != OC.DeviceCategory.BPM.name: | ||
| L.error("CSV sync only supports blood pressure data. Use --category BPM or omit --category.") | ||
| return | ||
|
|
||
| if csv_file and devnames: | ||
| L.warning("DEVNAMES are ignored when --csv-file is used") | ||
|
|
||
| if csv_file: | ||
| try: | ||
| measurements = load_omron_connect_bp_csv(csv_file) | ||
|
|
||
| except ValueError as e: | ||
| L.error(f"CSV parsing error: {e}") | ||
| return | ||
|
|
||
| if not measurements: | ||
| L.info("No blood pressure measurements found in CSV file") | ||
| return | ||
|
|
||
| if days is not None or from_date is not None or to_date is not None: | ||
| try: | ||
| startLocal, endLocal = calculate_date_range_from_options( | ||
| from_date=from_date, | ||
| to_date=to_date, | ||
| days=days, | ||
| ) | ||
|
|
||
| except DateRangeException as e: | ||
| L.error(f"Invalid date range: {e}") | ||
| return | ||
|
|
||
| except ValueError as e: | ||
| L.error(f"Date parsing error: {e}") | ||
| return | ||
|
|
||
| min_ms = int(startLocal * 1000) | ||
| max_ms = int(endLocal * 1000) | ||
| measurements = [m for m in measurements if min_ms <= m.measurementDate <= max_ms] | ||
|
|
||
| if not measurements: | ||
| L.info("No CSV measurements matched the requested date range") | ||
| return | ||
|
|
||
| try: | ||
| gc = garmin_login(config_path) | ||
|
|
||
| except LoginError: | ||
| L.info("Failed to login to Garmin Connect.") | ||
| return | ||
|
|
||
| if not gc: | ||
| L.info("Failed to login to Garmin Connect.") | ||
| return | ||
|
|
||
| first_ts = measurements[0].measurementDate / 1000 | ||
| last_ts = measurements[-1].measurementDate / 1000 | ||
| startdateStr = datetime.fromtimestamp(first_ts).date().isoformat() | ||
| enddateStr = datetime.fromtimestamp(last_ts).date().isoformat() | ||
|
|
||
| L.info(f"Loaded {len(measurements)} blood pressure entries from CSV '{csv_file}'") | ||
| gcData = garmin_get_bp_measurements(gc, startdateStr, enddateStr) | ||
| sync_bp_measurements(gc, gcData, T.cast(T.List[OC.MeasurementTypes], measurements), opts) | ||
| L.info("CSV sync completed") | ||
| return | ||
|
|
||
| try: | ||
| config = U.json_load(config_path) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| garminconnect | ||
| click | ||
| dateparser | ||
| brotlicffi | ||
| httpx[http2,cli,brotli] | ||
| python-dateutil | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can that be done with https://dateparser.readthedocs.io/en/latest/ ?
this would remove the hardcoded strings map and remove rx parsing/normalization