From a584aeab17fedbe9355c7092fb47fcdb4bea550d Mon Sep 17 00:00:00 2001 From: Steve Jennen Date: Mon, 15 Jun 2026 10:47:23 -0500 Subject: [PATCH] Add Omniscan3D support and fix message definition lookup --- brping/__init__.py | 3 +- brping/pingmessage.py | 48 +-- ci/deploy-whitelist | 1 + examples/omniscan3dExample.py | 224 ++++++++++++++ generate/generate-python.py | 9 +- generate/templates/device.py.in | 33 ++- generate/templates/omniscan3d.py.in | 419 +++++++++++++++++++++++++++ generate/templates/omniscan450.py.in | 2 +- generate/templates/s500.py.in | 2 +- generate/templates/surveyor240.py.in | 14 +- 10 files changed, 713 insertions(+), 42 deletions(-) create mode 100644 examples/omniscan3dExample.py create mode 100644 generate/templates/omniscan3d.py.in diff --git a/brping/__init__.py b/brping/__init__.py index 88804040..d0fa4f37 100644 --- a/brping/__init__.py +++ b/brping/__init__.py @@ -6,4 +6,5 @@ from brping.ping360 import Ping360 from brping.surveyor240 import Surveyor240 from brping.s500 import S500 -from brping.omniscan450 import Omniscan450 \ No newline at end of file +from brping.omniscan450 import Omniscan450 +from brping.omniscan3d import Omniscan3D \ No newline at end of file diff --git a/brping/pingmessage.py b/brping/pingmessage.py index ce701c0d..6fbee970 100644 --- a/brping/pingmessage.py +++ b/brping/pingmessage.py @@ -14,7 +14,9 @@ definitions.SURVEYOR240_ATOF_POINT_DATA, definitions.SURVEYOR240_YZ_POINT_DATA, definitions.S500_PROFILE6_T, - definitions.OMNISCAN450_OS_MONO_PROFILE + definitions.OMNISCAN450_OS_MONO_PROFILE, + definitions.OMNISCAN3D_JSON_WRAPPER, + definitions.OMNISCAN3D_OS3D_POINT_SET ] @@ -75,7 +77,12 @@ class PingMessage(object): # start_mm = m.start_mm # length_mm = m.length_mm # @endcode - def __init__(self, msg_id=0, msg_data=None): + def __init__(self, msg_id=0, msg_data=None, payload_dict=None): + if payload_dict is None: + payload_dict = definitions.payload_dict_all + + self.payload_dict = payload_dict + ## The message id self.message_id = msg_id @@ -106,10 +113,10 @@ def __init__(self, msg_id=0, msg_data=None): try: ## The name of this message - self.name = payload_dict[self.message_id]["name"] + self.name = self.payload_dict[self.message_id]["name"] ## The field names of this message - self.payload_field_names = payload_dict[self.message_id]["field_names"] + self.payload_field_names = self.payload_dict[self.message_id]["field_names"] # initialize payload field members for attr in self.payload_field_names: @@ -141,7 +148,7 @@ def pack_msg_data(self): msg_format = PingMessage.endianess + PingMessage.header_format + self.get_payload_format() # Prepare complete list of field names (header + payload) - attrs = PingMessage.header_field_names + payload_dict[self.message_id]["field_names"] + attrs = PingMessage.header_field_names + self.payload_dict[self.message_id]["field_names"] # Prepare iterable ordered list of values to pack values = [] @@ -173,13 +180,13 @@ def unpack_msg_data(self, msg_data): ## The name of this message try: - self.name = payload_dict[self.message_id]["name"] + self.name = self.payload_dict[self.message_id]["name"] except KeyError: print("Unknown message: ", self.message_id) return False ## The field names of this message - self.payload_field_names = payload_dict[self.message_id]["field_names"] + self.payload_field_names = self.payload_dict[self.message_id]["field_names"] if self.payload_length > 0: ## The struct formatting string for the message payload @@ -225,22 +232,22 @@ def verify_checksum(self): def update_payload_length(self): if self.message_id in variable_msgs or self.message_id in asciiMsgs: # The last field self.payload_field_names[-1] is always the single dynamic-length field - self.payload_length = payload_dict[self.message_id]["payload_length"] + len(getattr(self, self.payload_field_names[-1])) + self.payload_length = self.payload_dict[self.message_id]["payload_length"] + len(getattr(self, self.payload_field_names[-1])) else: - self.payload_length = payload_dict[self.message_id]["payload_length"] + self.payload_length = self.payload_dict[self.message_id]["payload_length"] ## Get the python struct formatting string for the message payload # @return the payload struct format string def get_payload_format(self): # messages with variable length fields if self.message_id in variable_msgs or self.message_id in asciiMsgs: - var_length = self.payload_length - payload_dict[self.message_id]["payload_length"] # Subtract static length portion from payload length + var_length = self.payload_length - self.payload_dict[self.message_id]["payload_length"] # Subtract static length portion from payload length if var_length <= 0: - return payload_dict[self.message_id]["format"] # variable data portion is empty + return self.payload_dict[self.message_id]["format"] # variable data portion is empty - return payload_dict[self.message_id]["format"] + str(var_length) + "s" + return self.payload_dict[self.message_id]["format"] + str(var_length) + "s" else: # messages with a static (constant) length - return payload_dict[self.message_id]["format"] + return self.payload_dict[self.message_id]["format"] ## Dump object into string representation # @return string representation of the object @@ -258,17 +265,17 @@ def __repr__(self): if self.message_id in variable_msgs: # static fields are handled as usual - for attr in payload_dict[self.message_id]["field_names"][:-1]: + for attr in self.payload_dict[self.message_id]["field_names"][:-1]: payload_string += "\n - " + attr + ": " + str(getattr(self, attr)) # the variable length field is always the last field - attr = payload_dict[self.message_id]["field_names"][-1:][0] + attr = self.payload_dict[self.message_id]["field_names"][-1:][0] # format this field as a list of hex values (rather than a string if we did not perform this handling) payload_string += "\n - " + attr + ": " + str([hex(item) for item in getattr(self, attr)]) else: # handling of static length messages and text messages - for attr in payload_dict[self.message_id]["field_names"]: + for attr in self.payload_dict[self.message_id]["field_names"]: payload_string += "\n - " + attr + ": " + str(getattr(self, attr)) representation = ( @@ -293,6 +300,7 @@ class PingParser(object): "errors", "parsed", "rx_msg", + "payload_dict", ) NEW_MESSAGE = 0 # Just got a complete checksum-verified message @@ -309,7 +317,8 @@ class PingParser(object): WAIT_CHECKSUM_H = 11 # Waiting for the checksum high byte ERROR = 12 # Checksum didn't check out - def __init__(self): + def __init__(self, payload_dict=None): + self.payload_dict = payload_dict or definitions.payload_dict_all self.buf = bytearray() self.state = self.WAIT_START self.payload_length = 0 # remaining for the message currently being parsed @@ -377,7 +386,10 @@ def wait_checksum_h(self, msg_byte): self.message_id = 0 self.buf.append(msg_byte) - self.rx_msg = PingMessage(msg_data=self.buf) + self.rx_msg = PingMessage( + msg_data=self.buf, + payload_dict=self.payload_dict + ) if self.rx_msg.verify_checksum(): self.parsed += 1 diff --git a/ci/deploy-whitelist b/ci/deploy-whitelist index 67495669..a36e5215 100644 --- a/ci/deploy-whitelist +++ b/ci/deploy-whitelist @@ -6,6 +6,7 @@ brping/ping360.py brping/surveyor240.py brping/s500.py brping/omniscan450.py +brping/omniscan3d.py brping/pingmessage.py examples tools diff --git a/examples/omniscan3dExample.py b/examples/omniscan3dExample.py new file mode 100644 index 00000000..4a6422ed --- /dev/null +++ b/examples/omniscan3dExample.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python + +#omniscan3dExample.py +from brping import definitions +from brping import Omniscan3D +from brping import PingMessage +import time +import argparse + +from builtins import input + +import signal +import sys +import math +from datetime import datetime +from pathlib import Path + +##Parse Command line options +############################ + +parser = argparse.ArgumentParser(description="Ping python library example.") +parser.add_argument('--device', action="store", required=False, type=str, help="Ping device port. E.g: /dev/ttyUSB0") +parser.add_argument('--baudrate', action="store", type=int, default=115200, help="Ping device baudrate. E.g: 115200") +parser.add_argument('--tcp', action="store", required=False, type=str, help="Omniscan3D IP:Port. E.g: 192.168.2.86:62312") +parser.add_argument('--range', action="store", required=False, type=str, help="Set range. E.g: 5000 or 0:5000") +parser.add_argument('--log', action="store", nargs='?', const=True, type=str, help="Log filename and/or directory path. Will create new log if blank or directory is specified. Will replay if file is specified and exists.") +args = parser.parse_args() +if args.device is None and args.tcp is None and args.log is None: + parser.print_help() + exit(1) + +# Signal handler to stop pinging on the Omniscan3D +def signal_handler(sig, frame): + print("Stopping pinging on Omniscan3D...") + myOmniscan3D.control_os3d_set_ping_params(ping_enable = False) + # Close socket if open + if myOmniscan3D.iodev: + try: + myOmniscan3D.iodev.close() + except Exception as e: + print(f"Failed to close socket: {e}") + sys.exit(0) + +signal.signal(signal.SIGINT, signal_handler) + +# Check for log argument and make new Omniscan3D +# If no .svlog is specified, create one using default directory +# If directory specified, .svlog be created in specified directory +# If a .svlog is specified, existing log will be opened +new_log = False +log_path = "" +replay_path = None +default_dir = Path("logs/omniscan3d").resolve() +if args.log is not None: + if args.log is True: + # Logging to default directory + default_dir.mkdir(parents=True, exist_ok=True) + myOmniscan3D = Omniscan3D(logging=True, log_directory=default_dir) + new_log = True + elif isinstance(args.log, str): + log_path = Path(args.log).expanduser() + + if log_path.suffix == ".svlog" and log_path.parent == Path("."): + log_path = default_dir / log_path.name + + log_path = log_path.resolve() + + if log_path.suffix == ".svlog": + if log_path.exists() and log_path.is_file(): + # File exists, replaying + new_log = False + myOmniscan3D = Omniscan3D(logging=False) + replay_path = log_path + print(f"Replaying from: {replay_path}") + else: + raise FileNotFoundError(f"Log file not found: {log_path}") + + elif log_path.is_dir() or log_path.suffix == "": + # Path is directory, logging to that directory + myOmniscan3D = Omniscan3D(logging=True, log_directory=log_path) + new_log = True + + else: + raise ValueError(f"Invalid log argument: {args.log}") +else: + myOmniscan3D = Omniscan3D() + +if args.log is None or new_log: + if args.device is not None: + myOmniscan3D.connect_serial(args.device, args.baudrate) + elif args.tcp is not None: + (host, port) = args.tcp.split(':') + try: + myOmniscan3D.connect_tcp(host, int(port)) + except Exception as e: + print(f"Could not connect to Omniscan3D at {host}:{port}") + print(f"Reason: {e}") + sys.exit(1) + + if myOmniscan3D.initialize() is False: + print("Failed to initialize Omniscan3D!") + exit(1) + +print("------------------------------------") +print("Starting Omniscan3D..") +print("Press CTRL+C to exit") +print("------------------------------------") + +input("Press Enter to continue...") + +# Running omniscan3d.py from existing log file +if args.log is not None and not new_log: + with open(log_path, 'rb') as f: + while True: + data = Omniscan3D.read_packet(f) + + if data is None: + break # EOF or bad packet + + # print(f"ID: {data.message_id}\tName: {data.name}") + + if data.message_id == definitions.OMNISCAN3D_ATTITUDE_REPORT: + # Print pitch and roll data + vector = (data.up_vec_x, data.up_vec_y, data.up_vec_z) + pitch = math.asin(vector[0]) + roll = math.atan2(vector[1], vector[2]) + print(f"Pitch: {pitch}\tRoll: {roll}") + elif data.message_id == definitions.OMNISCAN3D_OS3D_POINT_SET: + # print out data + print(f"ping_number : {data.ping_number}") + print(f"sos_mps : {data.sos_mps}") + print(f"num_points : {data.num_points}") + print(f"utc_msec : {data.utc_msec}") + print(f"pwr_up_msec : {data.pwr_up_msec}") + print(f"version : {data.version}") + print(f"device_number : {data.device_number}") + print(f"pwr_threshold_high : {data.pwr_threshold_high}") + print(f"pwr_threshold_med : {data.pwr_threshold_med}") + print(f"pwr_threshold_low : {data.pwr_threshold_low}") + elif data.message_id == definitions.OMNISCAN3D_END_PING_INFO: + # print out data + print("got OS3D_END_PING_INFO packet") + +# Connected to physical Omniscan3D +else: + if args.range is not None: + parts = args.range.split(':') + + if len(parts) == 2: + myOmniscan3D.control_os3d_set_ping_params( + start_m=int(parts[0]), + end_m=int(parts[1]), + ping_enable=True, + enable_atof_data=True + ) + elif len(parts) == 1: + myOmniscan3D.control_os3d_set_ping_params( + start_m=0, + end_m=int(parts[0]), + ping_enable=True, + enable_atof_data=True + ) + else: + print("Invalid range input, using default range") + myOmniscan3D.control_os3d_set_ping_params( + ping_enable=True, + enable_atof_data=True + ) + else: + print("start_m=0, end_m-=7, msec_per_ping=100, sos_mps=1500, ping_enable = True, enable_atof_data = True") + myOmniscan3D.control_os3d_set_ping_params( + start_m=0, + end_m=7, + msec_per_ping=200, + diagnostic_injected_signal = 1, + sos_mps=1500, + ping_enable = True, + enable_atof_data = True + ) + + if new_log: + print("Logging...\nCTRL+C to stop logging") + else: + print("CTRL-C to end program...") + try: + while True: + # Set multiple packets to listen for + data = myOmniscan3D.wait_message([definitions.OMNISCAN3D_ATTITUDE_REPORT, + definitions.OMNISCAN3D_OS3D_POINT_SET, + definitions.OMNISCAN3D_END_PING_INFO]) + + if data: + ## To watch pitch and roll data in real time while recording, uncomment this block + if data.message_id == definitions.OMNISCAN3D_ATTITUDE_REPORT: + # Print pitch and roll data + vector = (data.up_vec_x, data.up_vec_y, data.up_vec_z) + pitch = math.asin(vector[0]) + roll = math.atan2(vector[1], vector[2]) + print(f"Pitch: {pitch}\tRoll: {roll}") + elif data.message_id == definitions.OMNISCAN3D_OS3D_POINT_SET: + # print out data + print(f"ping_number : {data.ping_number}") + print(f"sos_mps : {data.sos_mps}") + print(f"num_points : {data.num_points}") + print(f"utc_msec : {data.utc_msec}") + print(f"pwr_up_msec : {data.pwr_up_msec}") + print(f"version : {data.version}") + print(f"device_number : {data.device_number}") + print(f"pwr_threshold_high : {data.pwr_threshold_high}") + print(f"pwr_threshold_med : {data.pwr_threshold_med}") + print(f"pwr_threshold_low : {data.pwr_threshold_low}") + + except KeyboardInterrupt: + if new_log: + print("Stopping logging...") + + + # Stop pinging from Omniscan3D + myOmniscan3D.control_os3d_set_ping_params(ping_enable = False) + if myOmniscan3D.iodev: + try: + myOmniscan3D.iodev.close() + except Exception as e: + print(f"Failed to close socket: {e}") diff --git a/generate/generate-python.py b/generate/generate-python.py index a5eacdc2..133afd5f 100755 --- a/generate/generate-python.py +++ b/generate/generate-python.py @@ -31,7 +31,8 @@ "ping360", "surveyor240", "s500", - "omniscan450"] + "omniscan450", + "omniscan3d"] struct_token = {"u8": "B", "u16": "H", @@ -106,4 +107,10 @@ templateFile = "%s/omniscan450.py.in" % templatePath f = open("%s/omniscan450.py" % args.output_directory, "w") f.write(g.generate(definitionFile, templateFile, {"structToken": struct_token})) +f.close() + +definitionFile = "%s/omniscan3d.json" % definitionPath +templateFile = "%s/omniscan3d.py.in" % templatePath +f = open("%s/omniscan3d.py" % args.output_directory, "w") +f.write(g.generate(definitionFile, templateFile, {"structToken": struct_token})) f.close() \ No newline at end of file diff --git a/generate/templates/device.py.in b/generate/templates/device.py.in index 4126d4cb..5a143d59 100644 --- a/generate/templates/device.py.in +++ b/generate/templates/device.py.in @@ -21,9 +21,16 @@ class PingDevice(object): {% endfor%} _input_buffer = deque() - def __init__(self): + def __init__(self, payload_dict=None): + my_payload_dict = {} + if payload_dict is not None: + my_payload_dict = definitions.payload_dict_common.copy() + my_payload_dict.update(payload_dict) + else: + my_payload_dict = definitions.payload_dict_all + ## A helper class to take care of decoding the input stream - self.parser = pingmessage.PingParser() + self.parser = pingmessage.PingParser(payload_dict=my_payload_dict) ## device id of this Ping1D object, used for dst_device_id in outgoing messages self.my_id = 255 @@ -187,18 +194,18 @@ class PingDevice(object): # @return True if the PingMessage was handled successfully def handle_message(self, msg): # TODO is this message for us? - setattr(self, "_src_device_id", msg.src_device_id) - setattr(self, "_dst_device_id", msg.dst_device_id) + self._src_device_id = msg.src_device_id + self._dst_device_id = msg.dst_device_id - if msg.message_id in pingmessage.payload_dict: - try: - for attr in pingmessage.payload_dict[msg.message_id]["field_names"]: - setattr(self, "_" + attr, getattr(msg, attr)) - except AttributeError as e: - print("attribute error while handling msg %d (%s): %s" % (msg.message_id, msg.name, msg.msg_data)) - return False - else: - print("Unrecognized message: %d", msg) + if not hasattr(msg, "payload_field_names"): + print(f"Unrecognized message: {msg.message_id}") + return False + + try: + for attr in msg.payload_field_names: + setattr(self, "_" + attr, getattr(msg, attr)) + except AttributeError as e: + print("attribute error while handling msg %d (%s): %s" % (msg.message_id, msg.name, msg.msg_data)) return False return True diff --git a/generate/templates/omniscan3d.py.in b/generate/templates/omniscan3d.py.in new file mode 100644 index 00000000..f9590e9f --- /dev/null +++ b/generate/templates/omniscan3d.py.in @@ -0,0 +1,419 @@ +#!/usr/bin/env python3 + +# omniscan3d.py +# A device API for the Cerulean Sonar Omniscan3D scanning sonar + +# This is a source template used to generate omniscan3d.py. +# Edit this file, not the generated omniscan3d.py. + +from brping import definitions +from brping import PingDevice +from brping import pingmessage +import math +import time +import struct +import socket +from datetime import datetime, timezone +from pathlib import Path + +# Imports for svlog header +import json +import os +import sys +import platform + +MAX_LOG_SIZE_MB = 500 + +class Omniscan3D(PingDevice): + def __init__(self, logging = False, log_directory = None): + super().__init__(payload_dict=definitions.payload_dict_omniscan3d) + + self.logging = logging + self.log_directory = log_directory + self.bytes_written = None + self.current_log = None + + def initialize(self): + if (self.readDeviceInformation() is None): + return False + if self.logging: + self.new_log(self.log_directory) + return True + +{% for msg in messages["get"]|sort %} + ## + # @brief Get a {{msg|replace("get_", "")}} message from the device\n + # Message description:\n + # {{messages["get"][msg].description}} + # + # @return None if there is no reply from the device, otherwise a dictionary with the following keys:\n +{% for field in messages["get"][msg].payload %} + # {{field.name}}: {% if field.units %}Units: {{field.units}}; {% endif %}{{field.description}}\n +{% endfor%} + def get_{{msg}}(self): + if self.request(definitions.OMNISCAN3D_{{msg|upper}}) is None: + return None + data = ({ +{% for field in messages["get"][msg].payload %} + "{{field.name}}": self._{{field.name}}, # {% if field.units %}Units: {{field.units}}; {% endif %}{{field.description}} +{% endfor %} + }) + return data + +{% endfor %} +{% for msg in messages["set"]|sort %} + ## + # @brief Send a {{msg}} message to the device\n + # Message description:\n + # {{messages["set"][msg].description}}\n + # Send the message to write the device parameters, then read the values back from the device\n + # +{% for field in messages["set"][msg].payload %} + # @param {{field.name}} - {% if field.units %}Units: {{field.units}}; {% endif %}{{field.description}} +{% endfor %} + # + # @return If verify is False, True on successful communication with the device. If verify is True, True if the new device parameters are verified to have been written correctly. False otherwise (failure to read values back or on verification failure) + def {{msg}}(self{% for field in messages["set"][msg].payload %}, {{field.name}}{% endfor %}, verify=True): + m = pingmessage.PingMessage(definitions.OMNISCAN3D_{{msg|upper}},payload_dict=definitions.payload_dict_omniscan3d) +{% for field in messages["set"][msg].payload %} + m.{{field.name}} = {{field.name}} +{% endfor %} + m.pack_msg_data() + self.write(m.msg_data) + if self.request(definitions.OMNISCAN3D_{{msg|replace("set_", "")|upper}}) is None: + return False + # Read back the data and check that changes have been applied + if (verify +{% if messages["set"][msg].payload %} + and ({% for field in messages["set"][msg].payload %}self._{{field.name}} != {{field.name}}{{ " or " if not loop.last }}{% endfor %})): +{% endif %} + return False + return True # success{% for field in messages["set"][msg].payload %} + m.{{field.name}} = {{field.name}} +{% endfor %} + m.pack_msg_data() + self.write(m.msg_data) + +{% endfor %} + +{% for msg in messages["control"]|sort %} + def control_{{msg}}(self{% for field in messages["control"][msg].payload %}, {{field.name}}{% if field.default is defined %}={{field.default}}{% endif %}{% endfor %}): + m = pingmessage.PingMessage(definitions.OMNISCAN3D_{{msg|upper}},payload_dict=definitions.payload_dict_omniscan3d) +{% for field in messages["control"][msg].payload %} + m.{{field.name}} = {{field.name}} +{% endfor %} + m.pack_msg_data() + self.write(m.msg_data) + +{% endfor %} + + def readDeviceInformation(self): + return self.request(definitions.COMMON_DEVICE_INFORMATION) + + # Calculate the milliseconds per ping from a ping rate + @staticmethod + def calc_msec_per_ping(ping_rate): + return math.floor(1000.0 / ping_rate) + + def get_utc_time(self): + clock_offset = 0 + round_trip_delay = 5000 + + local_now = datetime.now(timezone.utc) + corrected_time = local_now.timestamp() * 1000 + clock_offset / 2 + accuracy = round_trip_delay / 2 + + utc_msec_u64 = int(corrected_time) & 0xFFFFFFFFFFFFFFFF + accuracy_msec = int(accuracy) & 0xFFFFFFFF + + return utc_msec_u64, accuracy_msec + + # Reads a single packet from a file + @staticmethod + def read_packet(file): + sync = file.read(2) + if sync != b'BR': + return None + + payload_len_bytes = file.read(2) + if len(payload_len_bytes) < 2: + return None + payload_len = int.from_bytes(payload_len_bytes, 'little') + + msg_id = file.read(2) + if len(msg_id) < 2: + return None + + remaining = 2 + payload_len + 2 + rest = file.read(remaining) + if len(rest) < remaining: + return None + + msg_bytes = sync + payload_len_bytes + msg_id + rest + return pingmessage.PingMessage(msg_data=msg_bytes,payload_dict=definitions.payload_dict_omniscan3d) + + # Builds the packet containing metadata for the beginning of .svlog + def build_metadata_packet(self): + protocol = "tcp" # default fallback + if self.iodev: + if self.iodev.type == socket.SOCK_STREAM: + protocol = "tcp" + elif self.iodev.type == socket.SOCK_DGRAM: + protocol = "udp" + + if self.server_address: + url = f"{protocol}://{self.server_address[0]}:{self.server_address[1]}" + else: + url = f"{protocol}://unknown" + + content = { + "session_id": 1, + "session_uptime": 0.0, + "session_devices": [ + { + "url": url, + "product_id": "os3d45016" + } + ], + "session_platform": None, + "session_clients": [], + "session_plan_name": None, + + "is_recording": True, + "sonarlink_version": "", + "os_hostname": platform.node(), + "os_uptime": None, + "os_version": platform.version(), + "os_platform": platform.system().lower(), + "os_release": platform.release(), + + "process_path": sys.executable, + "process_version": f"v{platform.python_version()}", + "process_uptime": time.process_time(), + "process_arch": platform.machine(), + + "timestamp": datetime.now(timezone.utc).isoformat(), + "timestamp_timezone_offset": datetime.now().astimezone().utcoffset().total_seconds() // 60 + } + + json_bytes = json.dumps(content, indent=2).encode("utf-8") + + m = pingmessage.PingMessage(definitions.OMNISCAN3D_JSON_WRAPPER,payload_dict=definitions.payload_dict_omniscan3d) + m.payload = json_bytes + m.payload_length = len(json_bytes) + + msg_data = bytearray() + msg_data += b"BR" + msg_data += m.payload_length.to_bytes(2, "little") + msg_data += m.message_id.to_bytes(2, "little") + msg_data += m.dst_device_id.to_bytes(1, "little") + msg_data += m.src_device_id.to_bytes(1, "little") + msg_data += m.payload + + checksum = sum(msg_data) & 0xFFFF + msg_data += bytearray(struct.pack(pingmessage.PingMessage.endianess + pingmessage.PingMessage.checksum_format, checksum)) + + m.msg_data = msg_data + m.checksum = checksum + + return m + + # Enable logging + def start_logging(self, new_log = False, log_directory = None): + if self.logging: + return + + self.logging = True + + if self.current_log is None or new_log: + self.new_log(log_directory) + + def stop_logging(self): + self.logging = False + + # Creates a new log file + def new_log(self, log_directory=None): + dt = datetime.now() + save_name = dt.strftime("%Y-%m-%d-%H-%M") + + if log_directory is None: + project_root = Path.cwd().parent + self.log_directory = project_root / "logs/omniscan3d" + else: + self.log_directory = Path(log_directory) + + self.log_directory.mkdir(parents=True, exist_ok=True) + + log_path = self.log_directory / f"{save_name}.svlog" + + if log_path.exists(): + log_path.unlink() # delete existing file (program was restarted quickly) + + self.current_log = log_path + self.logging = True + self.bytes_written = 0 + + print(f"Logging to {self.current_log}") + + self.write_data(self.build_metadata_packet()) + + # Write data to .svlog file + def write_data(self, msg): + if not self.logging or not self.current_log: + return + + try: + if self.bytes_written > MAX_LOG_SIZE_MB * 1000000: + self.new_log(log_directory=self.log_directory) + + with open(self.current_log, 'ab') as f: + f.write(msg.msg_data) + self.bytes_written += len(msg.msg_data) + + except (OSError, IOError) as e: + print(f"[LOGGING ERROR] Failed to write to log file {self.current_log}: {e}") + self.stop_logging() + + except Exception as e: + print(f"[LOGGING ERROR] Unexpected error: {e}") + self.stop_logging() + + # Override wait_message to also handle point set requests from Omniscan3D and for creating atof_t data + def wait_message(self, message_ids, timeout=0.5): + tstart = time.time() + while time.time() < tstart + timeout: + msg = self.read() + if msg is not None: + if msg.message_id == definitions.OMNISCAN3D_OS3D_POINT_SET: + atof_byte_array = bytearray(msg.atof_point_data) + formatted_atof_array = struct.unpack('<' + 'I' * (4*int(msg.num_points)), atof_byte_array) + msg.atof_point_data = formatted_atof_array + + if msg.message_id in message_ids: + if self.logging: + self.write_data(msg) + return msg + time.sleep(0.005) + return None + + ## + # @brief Do the connection via an TCP link + # + # @param host: TCP server address (IPV4) or name + # @param port: port used to connect with server + # + def connect_tcp(self, host: str = None, port: int = 12345, timeout: float = 5.0): + if host is None: + host = '0.0.0.0' + + self.server_address = (host, port) + try: + print("Opening %s:%d" % self.server_address) + self.iodev = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.iodev.settimeout(timeout) + self.iodev.connect(self.server_address) + self.iodev.setblocking(0) + + except socket.timeout: + print("Unable to connect to device") + raise Exception("Connection timed out after {0} seconds".format(timeout)) + except Exception as exception: + raise Exception("Failed to open the given TCP port: {0}".format(exception)) + + ## + # @brief Read available data from the io device + def read_io(self): + if self.iodev == None: + raise Exception("IO device is null, please configure a connection before using the class.") + elif type(self.iodev).__name__ == 'Serial': + bytes = self.iodev.read(self.iodev.in_waiting) + self._input_buffer.extendleft(bytes) + else: # Socket + buffer_size = 4096 + while True: + try: # Check if we are reading before closing a connection + bytes = self.iodev.recv(buffer_size) + + if not bytes: + # if recv() returns empty, connection is closed (TCP) + if self.iodev.type == socket.SOCK_STREAM: + raise ConnectionError("TCP connection closed by peer.") + + self._input_buffer.extendleft(bytes) + + if len(bytes) < buffer_size: + break + + except BlockingIOError as exception: + pass # Ignore exceptions related to read before connection, a result of UDP nature + + except ConnectionResetError as e: + raise ConnectionError("Socket connection was reset: %s" % str(e)) + +# Class to represent the OmniScan3D atof_t struct +class atof_t: + def __init__(self, angle=0.0, tof=0.0, pwr=0.0, pt_type=0, reserved=(0, 0, 0)): + self.angle = angle + self.tof = tof + self.pwr = pwr + self.pt_type = pt_type + self.reserved = reserved + + def __repr__(self): + return ( + f"angle: {self.angle}, " + f"tof: {self.tof}, " + f"pwr: {self.pwr}, " + f"pt_type: {self.pt_type}" + ) + +# Creates atof_point_t[] from the dynamic byte payload +@staticmethod +def create_atof_list(msg): + raw_bytes = msg.atof_point_data + + point_size = struct.calcsize('