# ----------------------------------------------------------------------------
# Copyright (c) 2017 Massachusetts Institute of Technology (MIT)
# All rights reserved.
#
# Distributed under the terms of the BSD 3-clause license.
#
# The full license is in the LICENSE file, distributed with this software.
# ----------------------------------------------------------------------------
"""Read and write data/metadata in Digital Metadata HDF5 format.
It uses h5py to read and write to HDF5 files.
Reading/writing functionality is available from two classes:
DigitalMetadataReader and DigitalMetadataWriter.
"""
from __future__ import absolute_import, division, print_function
import collections
import copy
import datetime
import fractions
import glob
import itertools
import os
import re
import time
import traceback
import warnings
from collections import defaultdict
# third party imports
import h5py
import numpy as np
import packaging.version
import six
from six.moves import urllib, zip
# local imports
from . import list_drf, util
from ._version import __version__, __version_tuple__
try:
import pandas
except ImportError:
pass
__all__ = ("DigitalMetadataReader", "DigitalMetadataWriter")
# disable file locking in HDF5 >= 1.10 (not present in earlier versions)
# through only way possible: setting an environment variable
# this allows reading and writing metadata using the same file, which should be
# safe since we don't allow multiple or partial writes to the same sample index
# and is something we've allowed in practice with HDF5 1.8 and earlier
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
def _recursive_items(d, prefix="", visited=None):
"""Generate (key, value) pairs for a dict, recursing into sub-dicts.
Sub-dictionary (key, value) pairs will have '[parent_key]/' prepended
to their key name.
Parameters
----------
d : dict
The dictionary to iterate over.
prefix : string
The starting prefix to be added to keys to produce the returned name.
visited : set | None
Set of already visited dictionary ids, to flag infinite recursion.
If None, the empty set is used.
Yields
------
(key, val) : tuple
Key name (with prefix and parent dictionary key added) and value pairs
for an entry in the dictionary or a sub-dictionary.
"""
if visited is None:
visited = set()
visited.add(id(d))
for k, v in six.iteritems(d):
name = prefix + k
if isinstance(v, dict):
if id(v) not in visited:
for subk, subv in _recursive_items(v, name + "/", visited):
yield subk, subv
else:
errstr = "Infinite loop in data - dict <%s> passed in twice."
raise ValueError(errstr % str(v)[0:500])
else:
yield name, v
class DigitalMetadataWriter(object):
    """Write data in Digital Metadata HDF5 format."""

    # oldest metadata format version this writer may append to
    _min_version = packaging.version.parse("2.5")
    _max_major_version = packaging.version.parse(__version__).major
    # increment to package version when format changes are made
    _writer_version = packaging.version.parse("2.5")

    def __init__(
        self,
        metadata_dir,
        subdir_cadence_secs,
        file_cadence_secs,
        sample_rate_numerator,
        sample_rate_denominator,
        file_name,
    ):
        """Initialize writer to channel directory with given parameters.

        Parameters
        ----------
        metadata_dir : string
            The directory where this channel is to be written. It must already
            exist and be writable.

        subdir_cadence_secs : int
            The number of seconds of metadata to store in each subdirectory.
            The timestamp of any subdirectory will be an integer multiple of
            this value.

        file_cadence_secs : int
            The number of seconds of metadata to store in each file. Note that
            an integer number of files must exactly span a subdirectory,
            implying::

                (subdir_cadence_secs % file_cadence_secs) == 0

            This class enforces the rule that file name timestamps will be in
            the list::

                range(subdirectory_timestamp,
                      subdirectory_timestamp + subdir_cadence_secs,
                      file_cadence_secs)

        sample_rate_numerator : int
            Numerator of sample rate in Hz.

        sample_rate_denominator : int
            Denominator of sample rate in Hz.

        file_name : string
            Prefix for metadata file names. Files in each subdirectory will be
            named: "`file_name`@<timestamp>.h5".

        Raises
        ------
        IOError
            If `metadata_dir` does not exist or is not writable.
        ValueError
            If any cadence/sample-rate argument is not a positive integer, the
            cadences are inconsistent, or `file_name` is not a string.

        """
        # verify all input arguments
        if not os.access(metadata_dir, os.W_OK):
            errstr = "metadata_dir %s does not exist or is not writable"
            raise IOError(errstr % metadata_dir)
        self._metadata_dir = metadata_dir

        if subdir_cadence_secs != int(subdir_cadence_secs) or subdir_cadence_secs < 1:
            errstr = "subdir_cadence_secs must be positive integer, not %s"
            raise ValueError(errstr % str(subdir_cadence_secs))
        self._subdir_cadence_secs = int(subdir_cadence_secs)

        if file_cadence_secs != int(file_cadence_secs) or file_cadence_secs < 1:
            errstr = "file_cadence_secs must be positive integer, not %s"
            raise ValueError(errstr % str(file_cadence_secs))
        self._file_cadence_secs = int(file_cadence_secs)
        # an integer number of files must exactly span a subdirectory
        if (self._subdir_cadence_secs % self._file_cadence_secs) != 0:
            raise ValueError("(subdir_cadence_secs % file_cadence_secs) != 0")

        if not isinstance(file_name, six.string_types):
            errstr = "file_name must be a string, not type %s"
            raise ValueError(errstr % str(type(file_name)))
        self._file_name = file_name

        if (
            sample_rate_numerator != int(sample_rate_numerator)
            or sample_rate_numerator < 1
        ):
            errstr = "sample_rate_numerator must be positive integer, not %s"
            raise ValueError(errstr % str(sample_rate_numerator))
        self._sample_rate_numerator = int(sample_rate_numerator)

        if (
            sample_rate_denominator != int(sample_rate_denominator)
            or sample_rate_denominator < 1
        ):
            errstr = "sample_rate_denominator must be positive integer, not %s"
            raise ValueError(errstr % str(sample_rate_denominator))
        self._sample_rate_denominator = int(sample_rate_denominator)

        # exact sample rate as a fractions.Fraction
        self._sample_rate = util.get_samplerate_frac(
            sample_rate_numerator, sample_rate_denominator
        )
        # have to go to uint64 before longdouble to ensure correct conversion
        # from int
        self._samples_per_second = np.longdouble(
            np.uint64(self._sample_rate_numerator)
        ) / np.longdouble(np.uint64(self._sample_rate_denominator))

        # if a properties file already exists (current or legacy name), this
        # is an existing channel: validate our parameters against it instead
        # of writing new properties
        if os.access(
            os.path.join(self._metadata_dir, "dmd_properties.h5"), os.R_OK
        ) or os.access(os.path.join(self._metadata_dir, "metadata.h5"), os.R_OK):
            self._parse_properties()
        else:
            self._digital_metadata_version = self._writer_version.base_version
            self._fields = None  # No data written yet
            self._write_properties()

    def get_sample_rate(self):
        """Return the sample rate in Hz as a fractions.Fraction."""
        return self._sample_rate

    def get_samples_per_second(self):
        """Return the sample rate in Hz as a np.longdouble."""
        return self._samples_per_second

    def write(self, samples, data):
        """Write new metadata to the Digital Metadata channel.

        Parameters
        ----------
        samples : list | 1-D array | int | float
            A single sample index or an list of sample indices, given in
            the number of samples since the epoch (t_since_epoch*sample_rate),
            for the data to be written.

        data : list of dicts | dict
            If a list of dicts, each dictionary provides the metadata to be
            written for each corresponding sample (`data` must have the same
            length as `samples`). The dictionary keys give the field names,
            while the values must be HDF5-compatible (numpy) objects or sub-
            dictionaries meeting the same requirement.

            If a dict, the keys give the field names and each value must be
            one of the following:

            - a 1-D numpy array or list/tuple of numpy objects with length
              equal to ``len(samples)`` giving the metadata corresponding
              to each sample index in `samples`
            - a single value or numpy array of shape different than
              ``(len(samples),)``, giving the metadata for *all* of the
              samples indices in `samples`
            - another dictionary with keys that are valid Group name
              strings and leaf values that are one of the above

            The fields should always be the same each time the write method is
            called to ensure that the fields are consistently present when
            reading.

        Raises
        ------
        ValueError
            If `samples` is empty or not convertible to uint64, or `data` is
            neither a dict nor a list matching `samples` in length.

        """
        try:
            samples = np.atleast_1d(np.asarray(samples, dtype=np.uint64))
        except (TypeError, ValueError):
            raise ValueError("Values in `samples` must be convertible to uint64")
        N = len(samples)
        if N == 0:
            raise ValueError("`samples` must not be empty")

        if isinstance(data, dict):
            # field names are fixed on first write
            if self._fields is None:
                self._set_fields(list(data.keys()))
            # build one (key, value)-per-sample iterator for each flattened
            # field, then zip them so each sample gets all of its pairs
            keyval_iterators = []
            for key, val in _recursive_items(data):
                if not isinstance(val, six.string_types):
                    try:
                        if len(val) == N:
                            # sequence of per-sample values
                            it = zip(itertools.repeat(key, N), val)
                            keyval_iterators.append(it)
                            continue
                    except TypeError:
                        pass
                # val is a string, doesn't have a length, or len(val) != N
                it = itertools.repeat((key, val), N)
                keyval_iterators.append(it)
            keyvals = zip(*keyval_iterators)
        elif len(data) == N:
            # list of one metadata dict per sample
            if self._fields is None:
                self._set_fields(list(data[0].keys()))
            keyvals = (_recursive_items(d) for d in data)
        else:
            errstr = (
                "`data` must be a dict or list of dicts with length equal to"
                " the length of `samples`."
            )
            raise ValueError(errstr)

        return self._write(samples, keyvals)

    def _write(self, samples, keyvals):
        """Write new metadata to the Digital Metadata channel.

        This function does no input checking, see `write` for that.

        Parameters
        ----------
        samples : 1-D numpy array of type uint64 sorted in ascending order
            An array of sample indices, given in the number of samples since
            the epoch (time_since_epoch*sample_rate).

        keyvals : iterable of iterables same length as `samples`
            Each element of this iterable corresponds to a sample in `samples`
            and should be another iterable that produces (key, value) pairs to
            write for that sample.

        """
        grp_iter = self._sample_group_generator(samples)
        for grp, keyval in zip(grp_iter, keyvals):
            for key, val in keyval:
                if val is not None:
                    grp.create_dataset(key, data=val)
                else:
                    # treat None as the empty string so there will always
                    # be a dataset written when it is passed to write
                    grp.create_dataset(key, data="")

    def _sample_group_generator(self, samples):
        """Yield HDF5 group for each sample in `samples`.

        Parameters
        ----------
        samples : 1-D numpy array of type uint64 sorted in ascending order
            An array of sample indices, given in the number of samples since
            the epoch (time_since_epoch*sample_rate).

        Yields
        ------
        grp : h5py.Group
            HDF5 group for the sample. The group is located in the appropriate
            Digital Metadata file and takes its name from the sample index.

        """
        samples_per_file = self._file_cadence_secs * self._sample_rate
        # group consecutive samples by the file they belong to so each file
        # is opened exactly once
        for file_idx, sample_group in itertools.groupby(
            samples, lambda s: np.uint64(s / samples_per_file)
        ):
            file_ts = file_idx * self._file_cadence_secs
            file_basename = "%s@%i.h5" % (self._file_name, file_ts)
            # subdirectory timestamp is file timestamp rounded down to the
            # subdirectory cadence
            start_sub_ts = (
                file_ts // self._subdir_cadence_secs
            ) * self._subdir_cadence_secs
            sub_dt = datetime.datetime.fromtimestamp(
                start_sub_ts, tz=datetime.timezone.utc
            )
            subdir = os.path.join(
                self._metadata_dir, sub_dt.strftime("%Y-%m-%dT%H-%M-%S")
            )
            if not os.path.exists(subdir):
                os.makedirs(subdir)
            this_file = os.path.join(subdir, file_basename)
            # use HDF5 "core" driver to hold file completely in memory until
            # closed, at which point it is flushed to disk (combine little
            # writes)
            with h5py.File(this_file, "a", driver="core", backing_store=True) as f:
                for sample in sample_group:
                    try:
                        grp = f.create_group(str(sample))
                    except ValueError:
                        # group already exists for this sample index
                        errstr = "Sample %i already in data: no overwriting allowed"
                        raise IOError(errstr % sample)
                    yield grp

    def _set_fields(self, field_names):
        """Set the field names used in this metadata channel.

        This method sets both the `_fields` attribute and writes the field
        names to the channels top-level 'dmd_properties.h5' file.

        Parameters
        ----------
        field_names : list
            List of field names used in this metadata channel.

        """
        # build recarray and self._fields
        recarray = np.recarray((len(field_names),), dtype=[("column", "|S128")])
        self._fields = field_names
        self._fields.sort()  # for reproducability, use alphabetic order
        for i, key in enumerate(self._fields):
            recarray[i] = (key,)
        # write recarray to metadata
        properties_file_path = os.path.join(self._metadata_dir, "dmd_properties.h5")
        with h5py.File(properties_file_path, "a") as f:
            f.create_dataset("fields", data=recarray)

    def _parse_properties(self):
        """Check writer properties against existing ones for the channel.

        When a metadata channel already exists on disk, call this method when
        creating a new DigitalMetadataWriter to check its parameters against
        the existing ones. The `fields` attribute and metadata version of the
        current writer will be set according to parameters found on disk.

        Raises
        ------
        ValueError
            If the DigitalMetadataWriter parameters do not match those on disk.

        IOError
            If the Digital Metadata version of the existing metadata on disk
            is not compatible with this software version.

        """
        # reader will raise IOError if existing properties can't be read
        # because of version incompatibilities
        org_obj = DigitalMetadataReader(self._metadata_dir, accept_empty=True)
        # but we also have to check if the metadata is the current major
        # version so we know we can continue writing to it
        self._digital_metadata_version = org_obj._digital_metadata_version
        self._check_compatible_version()
        attr_list = (
            "_subdir_cadence_secs",
            "_file_cadence_secs",
            "_sample_rate_numerator",
            "_sample_rate_denominator",
            "_file_name",
        )
        for attr in attr_list:
            if getattr(self, attr) != getattr(org_obj, attr):
                errstr = "Mismatched %s: %s versus %s"
                raise ValueError(
                    errstr % (attr, getattr(self, attr), getattr(org_obj, attr))
                )
        self._fields = org_obj._fields

    def _check_compatible_version(self):
        """Raise IOError if the on-disk metadata version is not writable here."""
        version = packaging.version.parse(self._digital_metadata_version)
        # dev releases bypass the range check (version ordering is unreliable
        # for them relative to the writer's own version)
        if (
            (version >= self._min_version)
            and (version.major <= self._max_major_version)
        ) or version.is_devrelease:
            pass
        else:
            errstr = (
                "This existing Digital Metadata files are version %s, which is"
                " not in the range required (%s to %s)."
            )
            raise IOError(
                errstr
                % (
                    version.base_version,
                    self._min_version.base_version,
                    self._max_major_version,
                )
            )

    def _write_properties(self):
        """Write Digital Metadata properties to dmd_properties.h5 file."""
        properties_file_path = os.path.join(self._metadata_dir, "dmd_properties.h5")
        with h5py.File(properties_file_path, "w") as f:
            f.attrs["subdir_cadence_secs"] = self._subdir_cadence_secs
            f.attrs["file_cadence_secs"] = self._file_cadence_secs
            f.attrs["sample_rate_numerator"] = self._sample_rate_numerator
            f.attrs["sample_rate_denominator"] = self._sample_rate_denominator
            # use np.bytes_ to store as fixed-length ascii strings
            f.attrs["file_name"] = np.bytes_(self._file_name)
            f.attrs["digital_metadata_version"] = np.bytes_(
                self._digital_metadata_version
            )

    def __str__(self):
        """String summary of the DigitalMetadataWriter's parameters."""
        ret_str = ""
        attr_list = (
            "_subdir_cadence_secs",
            "_file_cadence_secs",
            "_sample_rate",
            "_file_name",
        )
        for attr in attr_list:
            ret_str += "%s: %s\n" % (attr, str(getattr(self, attr)))
        if self._fields is None:
            ret_str += "_fields: None\n"
        else:
            ret_str += "_fields:\n"
            for key in self._fields:
                ret_str += "\t%s\n" % (key)
        return ret_str
class DigitalMetadataReader(object):
    """Read data in Digital Metadata HDF5 format."""

    # oldest on-disk metadata format version this reader understands
    _min_version = packaging.version.parse("2.0")
    # newest accepted version: the base version of this package itself
    _max_version = packaging.version.parse(
        packaging.version.parse(__version__).base_version
    )
def __init__(self, metadata_dir, accept_empty=True):
"""Initialize reader to metadata channel directory.
Channel parameters are read from the attributes of the top-level file
'dmd_properties.h5' in the `metadata_dir`.
Parameters
----------
metadata_dir : string
Path to metadata channel directory, which contains a
'dmd_properties.h5' file and timestamped subdirectories containing
data.
accept_empty : bool, optional
If True, do not raise an IOError if the 'dmd_properties.h5' file is
empty. If False, raise an IOError in that case and delete the
empty 'dmd_properties.h5' file.
Raises
------
IOError
If 'dmd_properties.h5' file is not found in `metadata_dir` or if
`accept_empty` is False and the 'dmd_properties.h5' file is empty.
"""
self._metadata_dir = metadata_dir
if self._metadata_dir.find("http://") != -1:
self._local = False
# put properties file in /tmp/dmd_properties_%i.h5 % (pid)
url = os.path.join(self._metadata_dir, "dmd_properties.h5")
try:
f = urllib.request.urlopen(url)
except (urllib.error.URLError, urllib.error.HTTPError):
url = os.path.join(self._metadata_dir, "metadata.h5")
f = urllib.request.urlopen(url)
tmp_file = os.path.join("/tmp", "dmd_properties_%i.h5" % (os.getpid()))
fo = open(tmp_file, "w")
fo.write(f.read())
f.close()
fo.close()
else:
self._local = True
# list and match first properties file
tmp_file = next(
(
f
for f in sorted(
glob.glob(os.path.join(metadata_dir, list_drf.GLOB_DMDPROPFILE))
)
if re.match(list_drf.RE_DMDPROP, f)
),
None,
)
if tmp_file is None:
raise IOError("dmd_properties.h5 not found")
with h5py.File(tmp_file, "r") as f:
try:
subdir_cadence = f.attrs["subdir_cadence_secs"].item()
file_cadence = f.attrs["file_cadence_secs"].item()
except KeyError:
# maybe an older version with subdirectory_cadence_seconds
# and file_cadence_seconds
subdir_cadence = f.attrs["subdirectory_cadence_seconds"].item()
file_cadence = f.attrs["file_cadence_seconds"].item()
self._subdir_cadence_secs = subdir_cadence
self._file_cadence_secs = file_cadence
try:
try:
spsn = f.attrs["sample_rate_numerator"].item()
spsd = f.attrs["sample_rate_denominator"].item()
except KeyError:
# maybe an older version with samples_per_second_*
spsn = f.attrs["samples_per_second_numerator"].item()
spsd = f.attrs["samples_per_second_denominator"].item()
except KeyError:
# must have an older version with samples_per_second attribute
sps = f.attrs["samples_per_second"].item()
spsfrac = fractions.Fraction(sps).limit_denominator()
self._samples_per_second = np.longdouble(sps)
self._sample_rate_numerator = int(spsfrac.numerator)
self._sample_rate_denominator = int(spsfrac.denominator)
else:
self._sample_rate_numerator = spsn
self._sample_rate_denominator = spsd
# have to go to uint64 before longdouble to ensure correct
# conversion from int
self._samples_per_second = np.longdouble(
np.uint64(self._sample_rate_numerator)
) / np.longdouble(np.uint64(self._sample_rate_denominator))
self._sample_rate = util.get_samplerate_frac(
self._sample_rate_numerator, self._sample_rate_denominator
)
fname = f.attrs["file_name"]
if isinstance(fname, bytes):
# for convenience and forward-compatibility with h5py>=2.9
fname = fname.decode("ascii")
self._file_name = fname
try:
version = f.attrs["digital_metadata_version"]
except KeyError:
# version is before 2.3 when attribute was added
version = "2.0"
else:
if isinstance(version, bytes):
# for convenience and forward-compatibility with h5py>=2.9
version = version.decode("ascii")
self._digital_metadata_version = version
self._check_compatible_version()
try:
fields_dataset = f["fields"]
except KeyError:
if not accept_empty:
os.remove(tmp_file)
errstr = (
"No metadata yet written to %s, removing empty"
' "dmd_properties.h5"'
)
raise IOError(errstr % self._metadata_dir)
else:
self._fields = None
return
self._fields = []
for i in range(len(fields_dataset)):
field = fields_dataset[i]["column"]
if isinstance(field, bytes):
# for convenience and forward-compatibility with h5py>=2.9
field = field.decode("ascii")
self._fields.append(field)
if not self._local:
os.remove(tmp_file)
def get_bounds(self):
"""Get indices of first- and last-known sample as a tuple.
Returns
-------
first_sample_index : int
Index of the first sample, given in the number of samples since the
epoch (time_since_epoch*sample_rate).
last_sample_index : int
Index of the last sample, given in the number of samples since the
epoch (time_since_epoch*sample_rate).
Raises
------
IOError
If no data or first and last sample could not be determined.
"""
# loop through files in order to get first sample
first_sample = None
for path in list_drf.ilsdrf(
self._metadata_dir,
recursive=False,
reverse=False,
include_dmd=True,
include_drf=False,
include_dmd_properties=False,
):
try:
with h5py.File(path, "r") as f:
groups = list(f.keys())
groups.sort()
first_sample = int(groups[0])
except IOError:
# can't open file (e.g. doesn't exist anymore)
continue
except IndexError:
errstr = (
"Corrupt or empty file %s found and ignored."
" Deleting it will speed up get_bounds()."
)
print(errstr % path)
continue
else:
break
if first_sample is None:
raise IOError("All attempts to read first sample failed")
# loop through files in reverse order to get last sample
last_sample = None
for path in list_drf.ilsdrf(
self._metadata_dir,
recursive=False,
reverse=True,
include_dmd=True,
include_drf=False,
include_dmd_properties=False,
):
try:
with h5py.File(path, "r") as f:
groups = list(f.keys())
groups.sort()
last_sample = int(groups[-1])
except IOError:
# can't open file (e.g. doesn't exist anymore)
continue
except IndexError:
errstr = (
"Corrupt or empty file %s found and ignored."
" Deleting it will speed up get_bounds()."
)
print(errstr % path)
continue
else:
break
if last_sample is None:
raise IOError("All attempts to read last sample failed")
return (first_sample, last_sample)
def get_fields(self):
"""Return list of the field names in this metadata."""
# _fields is an internal data structure, so make a copy for the user
return copy.deepcopy(self._fields)
def get_sample_rate(self):
"""Return the sample rate in Hz as a fractions.Fraction."""
return self._sample_rate
def get_sample_rate_numerator(self):
"""Return the numerator of the sample rate in Hz."""
return self._sample_rate_numerator
def get_sample_rate_denominator(self):
"""Return the denominator of the sample rate in Hz."""
return self._sample_rate_denominator
def get_samples_per_second(self):
"""Return the sample rate in Hz as a np.longdouble."""
return self._samples_per_second
def get_subdir_cadence_secs(self):
"""Return the number of seconds of data stored in each subdirectory."""
return self._subdir_cadence_secs
def get_file_cadence_secs(self):
"""Return the number of seconds of data stored in each file."""
return self._file_cadence_secs
def get_file_name_prefix(self):
"""Return the metadata file name prefix."""
return self._file_name
    def read(self, start_sample=None, end_sample=None, columns=None, method=None):
        """Read metadata between start and end samples.

        Parameters
        ----------
        start_sample : None | int
            Sample index for start of read, given in the number of samples
            since the epoch (time_since_epoch*sample_rate). If None,
            `get_bounds` is called and the last sample is used.

        end_sample : None | int
            Sample index for end of read (inclusive), given in the number of
            samples since the epoch (time_since_epoch*sample_rate). If None,
            use `end_sample` equal to `start_sample`.

        columns : None | string | list of strings
            A string or list of strings giving the field/column name of
            metadata to return. If None, all available columns will be read.
            Using a string results in a different returned object than a one-
            element list containing the string, see below.

        method : None | 'pad'/'ffill'
            If None, return only samples within the given range. If 'pad' or
            'ffill', the first sample no later than `start_sample` (if any)
            will also be included so that values are forward filled into the
            desired range.

        Returns
        -------
        OrderedDict
            The dictionary's keys are the sample index for each sample of
            metadata found between `start_sample` and `end_sample` (inclusive).
            Each value is a metadata sample, given as either the column value
            (if `columns` is a string) or a dictionary with column names as
            keys and numpy objects as leaf values (if `columns` is None or
            a list).

        See Also
        --------
        read_dataframe : Read metadata into a DataFrame.
        read_flatdict : Read metadata into a flat dictionary, keyed by field.

        """
        if start_sample is None:
            _, start_sample = self.get_bounds()
        if end_sample is None:
            end_sample = start_sample
        elif start_sample > end_sample:
            errstr = "Start sample %i more than end sample %i"
            raise ValueError(errstr % (start_sample, end_sample))
        ret_dict = collections.OrderedDict()
        if method in ("pad", "ffill"):
            # simple forward fill until something better is needed:
            # get start bound of data and search for metadata within
            # [start_bound, start_sample] until last sample is found
            ffill_dict = collections.OrderedDict()
            start_bound, end_bound = self.get_bounds()
            file_list = self._get_file_list(start_bound, start_sample)
            # go through files in reverse to break at last found sample
            for this_file in reversed(file_list):
                self._add_metadata(
                    ffill_dict,
                    this_file,
                    columns,
                    start_bound,
                    start_sample,
                    is_edge=False,
                )
                if ffill_dict:
                    # get last entry of ffill_dict which will be latest found
                    # sample in the file
                    key = next(reversed(ffill_dict))
                    ret_dict[key] = ffill_dict[key]
                    break
            # increment start sample so we don't re-add any data at that sample
            start_sample += 1
        file_list = self._get_file_list(start_sample, end_sample)
        for this_file in file_list:
            # edge files may contain samples outside the requested range and
            # so need per-sample bounds checking; interior files do not
            if this_file in (file_list[0], file_list[-1]):
                is_edge = True
            else:
                is_edge = False
            self._add_metadata(
                ret_dict, this_file, columns, start_sample, end_sample, is_edge
            )
        return ret_dict
def read_dataframe(
self, start_sample=None, end_sample=None, columns=None, method=None
):
"""Read metadata between start and end samples into a pandas DataFrame.
Parameters
----------
start_sample : int
Sample index for start of read, given in the number of samples
since the epoch (time_since_epoch*sample_rate). If None,
`get_bounds` is called and the last sample is used.
end_sample : None | int
Sample index for end of read (inclusive), given in the number of
samples since the epoch (time_since_epoch*sample_rate). If None,
use `end_sample` equal to `start_sample`.
columns : None | string | list of strings
A string or list of strings giving the field/column name of
metadata to return. If None, all available columns will be read.
method : None | 'pad'/'ffill'
If None, return only samples within the given range. If 'pad' or
'ffill', the first sample no later than `start_sample` (if any)
will also be included so that values are forward filled into the
desired range.
Returns
-------
DataFrame
Pandas DataFrame with rows corresponding to the sample index and
columns corresponding to the metadata key.
See Also
--------
read : Read metadata into an OrderedDict, keyed by sample index.
read_flatdict : Read metadata into a flat dictionary, keyed by field.
"""
if isinstance(columns, six.string_types):
# preserve column name in returned dictionary so it appears in DF
columns = [columns]
res = self.read(
start_sample=start_sample,
end_sample=end_sample,
columns=columns,
method=method,
)
data = list(dict(_recursive_items(d)) for d in res.values())
index = list(res.keys())
return pandas.DataFrame(data, index=index)
    def read_flatdict(
        self,
        start_sample=None,
        end_sample=None,
        columns=None,
        method=None,
        squeeze=True,
    ):
        """Read metadata between start and end samples into a flat dictionary.

        Parameters
        ----------
        start_sample : int
            Sample index for start of read, given in the number of samples
            since the epoch (time_since_epoch*sample_rate). If None,
            `get_bounds` is called and the last sample is used.

        end_sample : None | int
            Sample index for end of read (inclusive), given in the number of
            samples since the epoch (time_since_epoch*sample_rate). If None,
            use `end_sample` equal to `start_sample`.

        columns : None | string | list of strings
            A string or list of strings giving the field/column name of
            metadata to return. If None, all available columns will be read.

        method : None | 'pad'/'ffill'
            If None, return only samples within the given range. If 'pad' or
            'ffill', the first sample no later than `start_sample` (if any)
            will also be included so that values are forward filled into the
            desired range.

        squeeze : bool
            If True and end_sample is None (returning a single sample), return
            the column values for the sample directly instead of as arrays with
            a first dimension of one. Additionally, if only a single column
            name is given and the result contains no subfields, return the
            value of that column instead of a dictionary.

        Returns
        -------
        dict or object
            Dictionary with keys corresponding to the fields/columns of the
            requested metadata. The values are arrays with length equal to
            the number of samples. The dictionary also has an 'index' entry
            containing an array of the sample indices. If `squeeze` is True
            and a dictionary with a single sample and non-index column would be
            returned, the non-index value is returned instead.

        See Also
        --------
        read : Read metadata into an OrderedDict, keyed by sample index.
        read_dataframe : Read metadata into a DataFrame.

        """
        if isinstance(columns, six.string_types):
            # preserve column name in returned dictionary so it appears in DF
            columns = [columns]
        res = self.read(
            start_sample=start_sample,
            end_sample=end_sample,
            columns=columns,
            method=method,
        )
        # default each column to a NaN-filled list so samples missing a field
        # still produce entries of the full length
        dict_of_lists = defaultdict(lambda: [np.nan] * len(res))
        dict_of_lists["index"] = list(res.keys())
        for k, sample_dict in enumerate(res.values()):
            for key, val in _recursive_items(sample_dict):
                dict_of_lists[key][k] = val
        if squeeze and (end_sample is None):
            # single-sample request: drop the length-one array dimension
            flatdict = {k: v[0] for k, v in dict_of_lists.items()}
            if len(flatdict) == 2:
                # only 'index' and one column remain: return the bare value
                del flatdict["index"]
                return flatdict.popitem()[1]
            else:
                return flatdict
        else:
            return {k: np.array(v) for k, v in dict_of_lists.items()}
def read_latest(self, columns=None):
"""Read the most recent metadata sample.
This method calls `get_bounds` to find the last sample index and `read`
to read the latest metadata at or before the last sample.
Parameters
----------
columns : None | string | list of strings
A string or list of strings giving the field/column name of
metadata to return. If None, all available columns will be read.
Using a string results in a different returned object than a one-
element list containing the string, see below.
Returns
-------