-
Notifications
You must be signed in to change notification settings - Fork 93
Expand file tree
/
Copy pathobject_classes.py
More file actions
2420 lines (1886 loc) · 77.8 KB
/
object_classes.py
File metadata and controls
2420 lines (1886 loc) · 77.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
# Copyright (c) 2020 - 2025 Ricardo Bartels. All rights reserved.
#
# netbox-sync.py
#
# This work is licensed under the terms of the MIT license.
# For a copy, see file LICENSE.txt included in this
# repository or visit: <https://opensource.org/licenses/MIT>.
import json
from ipaddress import ip_network, IPv4Network, IPv6Network
# noinspection PyUnresolvedReferences
from packaging import version
from module.common.misc import grab
from module.common.logging import get_logger
from module.netbox.manufacturer_mapping import sanitize_manufacturer_name
log = get_logger()
class NetBoxInterfaceType:
    """
    class to handle and determine NetBox interface types

    Parameters
    ----------
    data: int, str
        int: set interface speed
        str: try to parse the adapter model and determine speed and interface type
    """

    # type returned whenever no better match could be determined
    fallback_type = "other"

    # valid types which can be used within netbox-sync
    # (numeric values use the same speed unit as 'detected_speed', i.e. 1_000 for "1000base-t")
    valid_types = {
        "virtual": "Virtual",
        "other": "Other",
        "100base-tx": 100,
        "1000base-t": 1_000,
        "1000base-x-sfp": 1_000,
        "2.5gbase-t": 2_500,
        "5gbase-t": 5_000,
        "10gbase-t": 10_000,
        "10gbase-x-sfpp": 10_000,
        "10gbase-x-xfp": 10_000,
        "25gbase-x-sfp28": 25_000,
        "40gbase-x-qsfpp": 40_000,
        "50gbase-x-sfp28": 50_000,
        "50gbase-x-sfp56": 50_000,
        "100gbase-x-cfp": 100_000,
        "100gbase-x-cfp2": 100_000,
        "100gbase-x-cfp4": 100_000,
        "100gbase-x-cpak": 100_000,
        "100gbase-x-qsfp28": 100_000,
        "200gbase-x-qsfp56": 200_000,
        "200gbase-x-cfp2": 200_000,
        "400gbase-x-qsfpdd": 400_000,
        "400gbase-x-osfp": 400_000
    }

    # assign common types for an interface speed value
    common_types = {
        100: "100base-tx",
        1_000: "1000base-t",
        2_500: "2.5gbase-t",
        5_000: "5gbase-t",
        10_000: "10gbase-x-sfpp",
        25_000: "25gbase-x-sfp28",
        40_000: "40gbase-x-qsfpp",
        50_000: "50gbase-x-sfp28",
        100_000: "100gbase-x-qsfp28",
        200_000: "200gbase-x-qsfp56",
        400_000: "400gbase-x-qsfpdd"
    }

    # defaults used until a speed/connector type has been detected
    detected_speed = 0
    detected_type = None

    def __init__(self, data=None):
        """
        Parameters
        ----------
        data: int, str
            int: set interface speed
            str: try to parse the adapter model and determine speed and interface type
        """

        try:
            self.detected_speed = int(data)
        except (TypeError, ValueError):
            # not a number (or None): treat it as an adapter name/description
            self.parse_data_from_adapter_name(data)

    def get_netbox_type_list(self) -> list:
        """
        get a list of valid interface types

        Returns
        -------
        valid_interface_types: list
            a list with valid types
        """

        return list(self.valid_types.keys())

    def get_common_type(self) -> str:
        """
        return an interface type from the common type list

        Returns
        -------
        common_type: str
            NetBox interface type, 'fallback_type' if the detected speed has no common type
        """

        return self.common_types.get(self.detected_speed, self.fallback_type)

    def parse_data_from_adapter_name(self, adapter_name: str = None) -> None:
        """
        parses a provided adapter name and tries to determine speed and interface connector type

        Anything which is not a string is ignored and leaves the detected values untouched.

        Parameters
        ----------
        adapter_name: str
            the adapter name/description
        """

        if not isinstance(adapter_name, str):
            return

        adapter_name_lower = adapter_name.lower()

        # "2.5" has to be tested before "5": the needle "5gb" is a substring of "2.5gb" and
        # would otherwise turn every 2.5GbE adapter into a 5GbE one.
        # The needle "<speed>gb" also covers names using "<speed>gbe".
        detected_speed = 0
        for nic_speed in ["400", "200", "100", "50", "40", "25", "10", "2.5", "5", "1"]:
            if f"{nic_speed}gb" in adapter_name_lower:
                detected_speed = nic_speed
                break

        if detected_speed == "2.5":
            self.detected_speed = 2500
        else:
            self.detected_speed = int(detected_speed) * 1000

        # more specific connector names are listed before the names they contain (QSFP28 before QSFP)
        for nic_type in ["Base-T", "QSFP-DD", "QSFP28", "QSFP56", "SFP28", "QSFP+", "QSFP", "SFP+", "SFP", "XFP"]:
            if nic_type.lower() not in adapter_name_lower:
                continue

            # translate into the spelling used in the keys of 'valid_types'
            if nic_type == "QSFP-DD":
                nic_type = nic_type.replace("-", "")
            elif "+" in nic_type:
                nic_type = nic_type.replace("+", "p")

            self.detected_type = nic_type.lower()
            break

    def get_speed_human(self) -> str:
        """
        return a human representation of the detected interface speed

        Returns
        -------
        human_speed: str
            human-readable string of interface speed, 'fallback_type' if no speed was detected
        """

        if self.detected_speed == 0:
            return self.fallback_type

        if self.detected_speed < 1000:
            return f"{self.detected_speed}MbE"

        if self.detected_speed == 2500:
            speed_to_return = "2.5"
        else:
            speed_to_return = int(self.detected_speed / 1000)

        return f"{speed_to_return}GbE"

    def get_this_netbox_type(self) -> str:
        """
        returns a NetBox interface type based on the detected parameters

        Returns
        -------
        interface_type: str
            NetBox interface type
        """

        if self.detected_speed == 0:
            return self.fallback_type

        if self.detected_type is None:
            return self.get_common_type()

        # get possible speed types:
        possible_speed_types = [
            nic_type for nic_type, nic_speed in self.valid_types.items() if nic_speed == self.detected_speed
        ]

        # only one possible nic type
        if len(possible_speed_types) == 1:
            return self.get_common_type()

        # first type of matching speed which contains the detected connector type wins
        for possible_speed_type in possible_speed_types:
            if self.detected_type in possible_speed_type:
                return possible_speed_type

        return self.get_common_type()
class NetBoxMappings:
    """
    Singleton which maps between a NetBox object class and the "object_type" attribute of that class.
    The mapping works in both directions (class -> object_type and object_type -> class) and
    is used for object scopes and IP address and MAC address objects.
    """

    mapping = dict()

    def __new__(cls):
        """
        return the one existing instance of this class, create and initialize it on first use
        """

        instance = cls.__dict__.get("__it__")
        if instance is None:
            # remember the instance on the class before initializing it
            instance = object.__new__(cls)
            cls.__it__ = instance
            instance.init()

        return instance

    def init(self):
        """
        fill the mapping with every NetBoxObject subclass which defines an object_type
        """

        for class_definition in NetBoxObject.__subclasses__():
            if class_definition.object_type is None:
                continue

            self.mapping[class_definition] = class_definition.object_type
            self.mapping[class_definition.object_type] = class_definition

    def get(self, needle):
        """
        look up the counterpart of 'needle' in the mapping

        Parameters
        ----------
        needle: NetBoxObject instance, class or str
            an instance is looked up by its class, everything else is used as key directly

        Returns
        -------
        the mapped value or None if nothing was found
        """

        lookup_key = type(needle) if isinstance(needle, NetBoxObject) else needle

        return self.mapping.get(lookup_key)

    def scopes_object_types(self, scopes_list) -> list:
        """
        translate a list of scopes into a list of their mapping counterparts

        Parameters
        ----------
        scopes_list: list
            items to look up using get()

        Returns
        -------
        list: mapped value (or None) for each item of scopes_list
        """

        if not isinstance(scopes_list, list):
            raise ValueError("value for 'scopes_list' must be a list")

        return [self.get(scope) for scope in scopes_list]
class NetBoxObject:
"""
Base class for all NetBox object types. Implements all methods used on a NetBox object.
subclasses need to have the following attributes:
name: string
name of the object type (i.e. "virtual machine")
api_path: string
NetBox api path of object type (i.e: "virtualization/virtual-machines")
object_type: string
NetBox object type (i.e: "virtualization.virtualmachine") to handle scopes for this NetBox object
primary_key: string
name of the data model key which represents the primary key of this object besides id (i.e: "name")
data_model: dict
dict of permitted data keys and possible values (see description below)
prune: bool
defines if this object type will be pruned by netbox-sync
optional attributes
secondary_key: string
name of the data model key which represents the secondary key of this object besides id
enforce_secondary_key: bool
if secondary key of an object shall be added to name when get_display_name() method is called
min_netbox_version: string
defines since which NetBox version this object is available
read_only: bool
defines if this is a read only object class and can't be changed within netbox-sync
The data_model attribute needs to be a dict describing the data model in NetBox.
Key must be string.
Value can be following types:
int (instance):
value of this attribute must be a string and will be truncated if string exceeds max length of "int"
int (class):
value must be an integer
str (class):
can be a string with an undefined length
bool (class):
attribute must be True or False
NetBoxObject subclass:
value of this key is a reference to another NetBoxObject of exact defined type
list (instance):
value can be one of the predefined values in that list.
list of NetBoxObject subclasses:
value must be an instance of predefined netBoxObject classes in list
NBObjectList subclass:
value must be the defined subclass of NBObjectList
"""
name = ""
api_path = ""
primary_key = ""
data_model = {}
object_type = None
min_netbox_version = "0.0"
read_only = False
# _mandatory_attrs must be set at subclasses
_mandatory_attrs = ("name", "api_path", "primary_key", "data_model", "object_type")
# just skip this object if a mandatory attribute is missing
skip_object_if_mandatory_attr_is_missing = False
# keep handle to inventory instance to append objects on demand
inventory = None
def __init__(self, data=None, read_from_netbox=False, inventory=None, source=None):
    """
    Set up the instance defaults and pass the initial data on to update().

    Parameters
    ----------
    data: dict
        initial data of this object, handed over to update()
    read_from_netbox: bool
        handed over to update()
    inventory:
        handle to the inventory instance, kept to look up and append objects on demand
    source:
        source handler, handed over to update()

    Raises
    ------
    ValueError: if one of the mandatory class attributes is unset or 'prune' is not defined
    """

    mandatory_attrs_set = all(getattr(self, attr) for attr in self._mandatory_attrs)
    if not mandatory_attrs_set or hasattr(self, "prune") is False:
        raise ValueError(
            f"FATAL: not all mandatory attributes {self._mandatory_attrs} "
            f"are set in {self.__class__.__name__}."
        )

    # references handed in by the caller
    self.inventory = inventory
    self.source = source

    # state defaults
    self.nb_id = 0
    self.is_new = True
    self.deleted = False

    # data containers and change tracking
    self.data = dict()
    self._original_data = dict()
    self.updated_items = list()
    self.unset_items = list()

    # every attribute defined as an object list starts out as an empty list of that type
    object_list_types = NBObjectList.__subclasses__()
    for key, data_type in self.data_model.items():
        if data_type in object_list_types:
            self.data[key] = data_type()

    # add data to this object
    self.update(data=data, read_from_netbox=read_from_netbox, source=source)
def __repr__(self):
return "<%s instance '%s' at %s>" % (self.__class__.__name__, self.get_display_name(), id(self))
def to_dict(self):
"""
returns this object as a dictionary
Returns
-------
dict: dictionary of all relevant items of this object instance
"""
out = dict()
for key in dir(self):
value = getattr(self, key)
if "__" in key:
continue
if callable(value) is True:
continue
if key in ["inventory", "default_attributes", "data_model_relation", "mapping", "scopes"]:
continue
if key == "source":
value = getattr(value, "name", None)
if key == "data_model":
data_model = dict()
for data_key, data_value in value.items():
if isinstance(data_value, list):
new_data_value = list()
for possible_option in data_value:
if type(possible_option) == type:
new_data_value.append(str(possible_option))
else:
new_data_value.append(possible_option)
data_value = new_data_value
# if value is class name then print class name
if type(data_value) == type:
data_value = str(data_value)
data_model[data_key] = data_value
value = data_model
if key == "data":
data = dict()
for data_key, data_value in value.items():
# if value is class name then print class representation
if isinstance(data_value, (NetBoxObject, IPv4Network, IPv6Network)):
data_value = repr(data_value)
elif isinstance(data_value, NBObjectList):
data_value = [repr(x) for x in data_value]
data[data_key] = data_value
value = data
out[key] = value
return out
def __str__(self):
"""
formats this object as a dict in JSON format
Returns
-------
str: object dict as JSON
"""
return json.dumps(self.to_dict(), sort_keys=True, indent=4)
@staticmethod
def format_slug(text=None, max_len=50):
"""
Format string to comply to NetBox slug acceptable pattern and max length.
Parameters
----------
text: str
name to format into a NetBox slug
max_len: int
maximum possible length of slug
Returns
-------
str: input name formatted as slug und truncated if necessary
"""
if text is None or len(text) == 0:
raise AttributeError("Argument 'text' can't be None or empty!")
permitted_chars = (
"abcdefghijklmnopqrstuvwxyz" # alphabet
"0123456789" # numbers
"_-" # symbols
)
# Replace separators with dash
for sep in [" ", ",", "."]:
text = text.replace(sep, "-")
# Strip unacceptable characters
text = "".join([c for c in text.lower() if c in permitted_chars])
# Enforce max length
return text[0:max_len]
def get_uniq_slug(self, text=None, max_len=50) -> str:
    """
    return an uniq slug. If the default slug is already used try to
    append a number until a slug is found which has not been used.

    If appending the number would exceed max_len, the base slug is shortened
    so that the numbered slug still fits.

    Parameters
    ----------
    text: str
        name to format into a NetBox slug
    max_len: int
        maximum possible length of slug

    Returns
    -------
    (str): return the slug

    Raises
    ------
    ValueError: if none of the tried slugs is unused
    """

    slug = self.format_slug(text=text, max_len=max_len)

    if self.inventory.slug_used(self.__class__, slug) is False:
        return slug

    for x in range(1, 20):
        suffix = f"-{x}"

        # a slug which already uses the full length has no room left for the suffix,
        # cut the base slug instead of rejecting every candidate
        base_len = max_len - len(suffix)
        if base_len <= 0:
            continue

        new_slug = f"{slug[:base_len]}{suffix}"
        if self.inventory.slug_used(self.__class__, new_slug) is False:
            log.info(f"Slug '{slug}' for {self.name} '{text}' has been used. "
                     f"Assigning slug '{new_slug}'")
            return new_slug

    raise ValueError(f"Unable to find uniq slug for {self.name} '{text}'")
# noinspection PyAttributeOutsideInit
def update(self, data=None, read_from_netbox=False, source=None):
    """
    parse data dictionary and validate input. Add data to object if valid.

    Data read from NetBox is stored as-is and resets the lists of pending changes.
    Any other data is validated key by key against the data model; every changed
    value is stored in self.data and its key is appended to self.updated_items.

    Parameters
    ----------
    data: dict
        dictionary with data to add/update
    read_from_netbox: bool
        True if data was gathered from NetBox via request
    source: source handler
        object handler of source

    Returns
    -------
    None

    Raises
    ------
    AttributeError: if data is not a dict
    ValueError: if this is a read only object class and the data was not read from NetBox
    """

    if data is None:
        return

    if not isinstance(data, dict):
        raise AttributeError("Argument 'data' needs to be a dict!")

    if data.get("id") is not None:
        self.nb_id = data.get("id")

    # skip item as it's missing its primary key
    # (for data not read from NetBox an already known primary key is sufficient)
    if data.get(self.primary_key) is None and \
            (read_from_netbox is True or self.data.get(self.primary_key) is None):

        if self.skip_object_if_mandatory_attr_is_missing is True:
            device_url = data.get("url") or self.data.get('url')
            log.debug2(f"This '{self.name}' ({self.nb_id}) data structure does not contain "
                       f"the primary key '{self.primary_key}'. Skipping. Link: {device_url}")
        else:
            log.error(f"This '{self.name}' data structure does not contain "
                      f"the primary key '{self.primary_key}' got: {data}")
        return None

    # data from NetBox is taken over without validation and marks the object as existing
    if read_from_netbox is True:
        self.is_new = False
        self.data = data
        self.updated_items = list()
        self.unset_items = list()

        return

    if self.read_only is True:
        raise ValueError(f"Adding {self.name} by this program is currently not implemented.")

    self.set_source(source)

    # prefer the name from the new data, fall back to the name of the current data
    display_name = self.get_display_name(data)

    if display_name is None:
        display_name = self.get_display_name()

    log.debug2(f"Parsing '{self.name}' data structure: {display_name}")

    # first pass: validate/convert each item, valid items are collected in parsed_data
    parsed_data = dict()
    for key, value in data.items():

        if key not in self.data_model.keys():
            log.error(f"Found undefined data model key '{key}' for object '{self.__class__.__name__}'")
            continue

        # skip unset values
        if value is None:
            log.info(f"Found unset key '{key}' while parsing {display_name}. Skipping This key")
            continue

        # check data model to see how we have to parse the value
        defined_value_type = self.data_model.get(key)

        # value must be a string with a certain max length
        if isinstance(defined_value_type, int):
            if not isinstance(value, str):
                log.error(f"Invalid data type for '{self.__class__.__name__}.{key}' (must be str), got: "
                          f"{value} ({type(value)})")
                continue

            # truncate to the max length defined in the data model
            value = value[0:defined_value_type]

            if key == "slug":
                value = self.get_uniq_slug(text=value, max_len=defined_value_type)

        if isinstance(defined_value_type, list):

            # an object needs to be an instance of one of the listed classes
            if isinstance(value, NetBoxObject):

                if type(value) not in defined_value_type:
                    log.error(f"Invalid data type for '{key}' (must be one of {defined_value_type}), "
                              f"got: '{type(value)}'")
                    continue

            # check if value is in defined list
            elif value not in defined_value_type:
                log.error(f"Invalid data type for '{key}' (must be one of {defined_value_type}), got: '{value}'")
                continue

        # just check the type of the value
        type_check_failed = False
        for valid_type in [bool, str, int, list]:

            if defined_value_type == valid_type and not isinstance(value, valid_type):
                log.error(f"Invalid data type for '{key}' (must be {valid_type.__name__}), got: '{value}'")
                type_check_failed = True
                break

        if type_check_failed is True:
            continue

        # tags need to be treated as list of dictionaries, tags are only added
        if defined_value_type == NBTagList:
            # noinspection PyTypeChecker
            value = self.compile_tags(value)

        # VLANs will overwrite the whole list of current VLANs
        if defined_value_type == NBVLANList:
            value = self.compile_vlans(value)

        if defined_value_type == NBManufacturer:
            value = self.sanitize_manufacturer_name(value)

        # custom fields need to be a dict and each field has to be present in the inventory
        if defined_value_type == NBCustomField:
            if not isinstance(value, dict):
                log.error(f"Invalid data type for '{key}' (must be 'dict'), got: '{value}'")
                continue
            for field_name in value.keys():
                if self.inventory.get_by_data(NBCustomField, data={"name": field_name}) is None:
                    log.error(f"{NBCustomField.name} '{field_name}' not found in inventory. "
                              "Needs to be created first!")
                    type_check_failed = True

        if type_check_failed is True:
            continue

        # allows an empty site for netbox objects where a site is not mandatory
        # required for clusters and sub-objects without site reference
        if (isinstance(self, (NBCluster, NBVM, NBVLAN)) and
                key == "site" and
                grab(value, "name") is None):
            parsed_data[key] = None
            continue

        # this is meant to be reference to a different object
        if defined_value_type in NetBoxObject.__subclasses__() and defined_value_type != NBCustomField:

            if not isinstance(value, NetBoxObject):
                # try to find object.
                value = self.inventory.add_update_object(defined_value_type, data=value, source=source)

            # add source if currently undefined (read from NetBox)
            value.set_source(source)

        # add to parsed data dict
        parsed_data[key] = value

    # add/update slug
    # if data model contains a slug we need to handle it
    # (only if no slug was passed and the object does not have one yet)
    if "slug" in self.data_model.keys() and \
            parsed_data.get("slug") is None and \
            parsed_data.get(self.primary_key) is not None and \
            self.data.get("slug") in [None, ""]:

        parsed_data["slug"] = self.get_uniq_slug(text=parsed_data.get(self.primary_key),
                                                 max_len=self.data_model.get("slug"))

    # update all data items
    # second pass: compare each parsed value with the current one and store what changed
    data_updated = False
    for key, new_value in parsed_data.items():

        # nothing changed, continue with next key
        current_value = self.data.get(key)
        if current_value == new_value:
            continue

        # get current value str
        if isinstance(current_value, (NetBoxObject, NBObjectList)):
            current_value_str = str(current_value.get_display_name())

        # if data model is a list then we need to read the NetBox data value
        elif isinstance(self.data_model.get(key), list) and isinstance(current_value, dict):
            current_value_str = str(current_value.get("value"))

        else:
            current_value_str = str(current_value).replace("\r", "")

        # get new value str
        if self.data_model.get(key) == NBCustomField:
            if current_value is None:
                current_value = dict()

            # Fix for object/multi-object custom fields
            # When patching, we only need the IDs, not the full object representation
            # (iterate over a copy as new_value is changed within the loop)
            new_value_copy = new_value.copy()
            for field_name, field_value in new_value_copy.items():
                # Check for custom field type
                custom_field = self.inventory.get_by_data(NBCustomField, data={"name": field_name})
                if custom_field is not None:
                    field_type = grab(custom_field, "data.type")

                    # Handle object type custom fields - need only ID
                    if field_type == "object" and isinstance(field_value, dict) and field_value.get('id') is not None:
                        new_value[field_name] = field_value.get('id')

                    # Handle multi-object type custom fields - need list of IDs
                    elif field_type == "multi-object" and isinstance(field_value, list):
                        ids = []
                        for item in field_value:
                            if isinstance(item, dict) and item.get('id') is not None:
                                ids.append(item.get('id'))
                        if ids:
                            new_value[field_name] = ids

            # new custom field values are merged into the current ones
            new_value = {**current_value, **new_value}
            new_value_str = str(new_value)
        elif isinstance(new_value, (NetBoxObject, NBObjectList)):
            new_value_str = str(new_value.get_display_name())
        else:
            new_value_str = str(new_value).replace("\r", "")

        # support NetBox 2.11+ vcpus float value
        # noinspection PyTypeChecker
        if current_value is not None and \
                self.data_model.get(key) in [int, float] and \
                isinstance(new_value, (int, float)) and \
                float(current_value) == float(new_value):

            continue

        # two objects are only treated as unchanged if they are the very same instance
        if isinstance(current_value, NetBoxObject) and isinstance(new_value, NetBoxObject):
            if current_value is new_value:
                continue

        # just check again if values might match now
        elif current_value_str == new_value_str:
            continue

        # skip update if just the letter case changed for the primary key
        if key == self.primary_key and current_value_str.lower() == new_value_str.lower():
            continue

        if self.is_new is False:
            # the value is set back to what was stored before the first change,
            # so the key does not need to be updated anymore
            if self._original_data.get(key) == new_value_str and key in self.updated_items:
                self.data[key] = new_value
                self.updated_items.remove(key)
                log.debug(f"{self.name.capitalize()} '{display_name}' attribute '{key}' was set back to "
                          f"original NetBox value '{current_value_str}'")
                continue

            # save original NetBox value for future use to detect updates which sets it back to the same value
            # which is already saved in NetBox
            elif self._original_data.get(key) is None:
                self._original_data[key] = current_value_str

            # keep the log message on one line
            new_value_str = new_value_str.replace("\n", " ")
            log.info(f"{self.name.capitalize()} '{display_name}' attribute '{key}' changed from "
                     f"'{current_value_str}' to '{new_value_str}'")

        self.data[key] = new_value
        self.updated_items.append(key)
        data_updated = True

    self.resolve_relations()

    if data_updated is True and self.is_new is False:
        log.debug("Updated %s object: %s" % (self.name, self.get_display_name()))
def set_source(self, source=None):
"""
updates the source attribute, Only update if undefined
"""
if source is not None and self.source is None:
self.source = source
def get_display_name(self, data=None, including_second_key=False):
"""
return a name as string of this object based on primary/secondary key
Parameters
----------
data: dict
optional data dictionary to format name from if object is not initialized
including_second_key: bool
if True adds second key if object has one
Returns
-------
str: name of object
"""
this_data_set = data
if data is None:
this_data_set = self.data
if this_data_set is None:
return None
my_name = this_data_set.get(self.primary_key)
secondary_key = getattr(self, "secondary_key", None)
enforce_secondary_key = getattr(self, "enforce_secondary_key", False)
include_secondary_key_if_present = getattr(self, "include_secondary_key_if_present", False)
if secondary_key is None:
return my_name
if my_name is not None and True in \
[enforce_secondary_key, including_second_key, include_secondary_key_if_present]:
secondary_key_value = this_data_set.get(secondary_key)
org_secondary_key_value = str(secondary_key_value)
read_from_netbox = False
if isinstance(secondary_key_value, NetBoxObject):
read_from_netbox = True if secondary_key_value.nb_id != 0 else False
secondary_key_value = secondary_key_value.get_display_name()
elif isinstance(secondary_key_value, dict):
read_from_netbox = True if secondary_key_value.get("id", 0) != 0 else False
secondary_key_value = self.get_display_name(data=secondary_key_value)
if secondary_key_value is None and read_from_netbox is False and include_secondary_key_if_present is False:
log.warning(f"Unable to determine second key '{secondary_key}' for {self.name} '{my_name}', "
f"got: {org_secondary_key_value}")
log.warning("This could cause serious errors and lead to wrongly assigned object relations!!!")
my_name = f"{my_name} ({secondary_key_value})"
return my_name
def resolve_relations(self):
"""
Resolve object relations for this object. Substitute a dict of data with an id with the instantiated
reference of this object
"""
for key, data_type in self.data_model.items():
if self.data.get(key) is None:
continue
# continue if data_type is not an NetBox object
# noinspection PyTypeChecker
if data_type not in NetBoxObject.__subclasses__() + NBObjectList.__subclasses__():
continue
# NBCustomField are special
if data_type == NBCustomField:
continue
data_value = self.data.get(key)
if data_type in NBObjectList.__subclasses__():
resolved_object_list = data_type()
assert isinstance(resolved_object_list, list)
for item in data_value:
if isinstance(item, data_type.member_type):
item_object = item
else:
item_object = self.inventory.get_by_data(data_type.member_type, data=item)
if item_object is not None:
resolved_object_list.append(item_object)
resolved_data = resolved_object_list
else:
if data_value is None:
continue
if isinstance(data_value, NetBoxObject):
resolved_data = data_value
else:
data_to_find = None
if isinstance(data_value, int):
data_to_find = {"id": data_value}
elif isinstance(data_value, dict):
data_to_find = data_value
resolved_data = self.inventory.get_by_data(data_type, data=data_to_find)
if resolved_data is not None:
self.data[key] = resolved_data
else:
log.error(f"Problems resolving relation '{key}' for object '{self.get_display_name()}' and "
f"value '{data_value}'")
def resolve_scoped_relations(self, id_attr, type_attr):
o_id = self.data.get(id_attr)
o_type = self.data.get(type_attr)
if hasattr(self, "mapping") is True:
mapping = getattr(self, "mapping")
else:
return
if isinstance(o_id, int) and o_type is not None and mapping.get(o_type) is not None:
self.data[id_attr] = self.inventory.get_by_id(mapping.get(o_type), nb_id=o_id)
elif o_id is not None and not isinstance(o_id, NetBoxObject):
o_id_name = grab(self, f"data.{id_attr}.name")
log.debug(f"{self.name} '{self.data.get('name')}' {type_attr} '{o_type}' for "
f"'{o_id_name}' is currently not supported")
self.data[type_attr] = ""
def get_dependencies(self):
"""
returns a list of NetBoxObject subclasses this object depends on
Returns
-------
list: of NetBoxObject subclasses
"""
r = [x for x in self.data_model.values() if x in NetBoxObject.__subclasses__()]
r.extend([x.member_type for x in self.data_model.values() if x in NBObjectList.__subclasses__()])
return r
def get_tags(self) -> list:
"""
returns a list of strings of tag names
Returns
-------
list: of strings of tag names
"""
tag_list = list()
if "tags" not in self.data_model.keys():
return tag_list
for tag in self.data.get("tags", list()):
if isinstance(tag, NetBoxObject):
tag_name = tag.get_display_name()
if tag_name not in tag_list:
tag_list.append(tag_name)
else:
log.error(f"This tag is not an NetBox object: {tag}")
log.error(f"Please report this here: https://github.com/bb-Ricardo/netbox-sync/issues/120")
return tag_list
@classmethod
def extract_tag_name(cls, this_tag):
    """
    Determine the name of a tag passed in one of the supported forms.

    Parameters
    ----------
    this_tag: NBTag, str, dict
        NBTag: the display name of the tag is used
        str: used as tag name as it is
        dict: the value of the key "name" is used

    Returns
    -------
    str: name of the tag, None if no name could be determined
    """

    if isinstance(this_tag, NBTag):
        return this_tag.get_display_name()

    if isinstance(this_tag, str):
        return this_tag

    if isinstance(this_tag, dict):
        tag_name = this_tag.get("name")
        if tag_name is not None:
            return tag_name

    return None
def compile_tags(self, tags, remove=False):
"""
Parameters
----------
tags: (str, list, dict, NBTag)
tags to parse and add/remove to/from current list of object tags
remove: bool
True if tags shall be removed, otherwise they will be added
Returns
-------
NBTagList: with added/removed tags
"""
if tags is None or NBTagList not in self.data_model.values():
return
# list of parsed tag strings
sanitized_tag_strings = list()
"""
disable logging
log.debug2(f"Compiling TAG list")
"""
new_tag_list = NBTagList()
if isinstance(tags, list):
for tag in tags:
sanitized_tag_strings.append(self.extract_tag_name(tag))
else:
# noinspection PyTypeChecker
sanitized_tag_strings.append(self.extract_tag_name(tags))
# current list of tag strings
current_tag_strings = self.get_tags()
new_tags = list()
removed_tags = list()
for tag_name in sanitized_tag_strings:
if tag_name is None:
continue