-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathintake_catalog.py
More file actions
133 lines (110 loc) · 5.08 KB
/
intake_catalog.py
File metadata and controls
133 lines (110 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# coding: utf-8
"""
STACKIT Intake API
This API provides endpoints for managing Intakes.
The version of the OpenAPI document: 1beta.3.5
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import json
import pprint
from typing import Any, ClassVar, Dict, List, Optional, Set
from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import Annotated, Self
from stackit.intake.models.catalog_auth import CatalogAuth
from stackit.intake.models.partitioning_type import PartitioningType
class IntakeCatalog(BaseModel):
    """
    The Iceberg catalog configuration
    """  # noqa: E501

    # Optional nested auth model; serialised via its own to_dict() in to_dict().
    auth: Optional[CatalogAuth] = None
    # Defaults to "intake" when not supplied.
    namespace: Optional[Annotated[str, Field(strict=True, max_length=1024)]] = Field(
        default="intake",
        description="The namespace to which data shall be written. It will be automatically created, if it does not exist.",
    )
    # Nullable: an explicitly assigned None is preserved by to_dict().
    partition_by: Optional[List[Annotated[str, Field(strict=True, max_length=1024)]]] = Field(
        default=None,
        description="List of Iceberg partitioning expressions to use when creating the target table. This setting can only be used when `partitioning` is set to `manual`. Partitioning configuration of an Intake can be changed after creation, but will only take effect once the table has been changed as well. In case of not updating the table to a new one, the old partitioning configuration will remain. See the [Apache Iceberg spec](https://iceberg.apache.org/spec/#partitioning) for more details. ",
        alias="partitionBy",
    )
    partitioning: Optional[PartitioningType] = PartitioningType.NONE
    table_name: Optional[Annotated[str, Field(min_length=1, strict=True, max_length=1024)]] = Field(
        default=None,
        description="The table name is a short name chosen by the user to identify the table in Iceberg.",
        alias="tableName",
    )
    # Required field (no default).
    uri: Annotated[str, Field(strict=True, max_length=1024)] = Field(
        description="The URI to the Iceberg catalog endpoint"
    )
    # Required field (no default).
    warehouse: Annotated[str, Field(strict=True, max_length=1024)] = Field(
        description="The Iceberg warehouse to connect to, required when the catalog has no default warehouse configured."
    )

    # Wire-format (alias) names of every field, in declaration order.
    __properties: ClassVar[List[str]] = [
        "auth",
        "namespace",
        "partitionBy",
        "partitioning",
        "tableName",
        "uri",
        "warehouse",
    ]

    model_config = ConfigDict(
        populate_by_name=True,
        validate_assignment=True,
        protected_namespaces=(),
    )

    def to_str(self) -> str:
        """Return a pretty-printed string of the alias-keyed model dump."""
        aliased_dump = self.model_dump(by_alias=True)
        return pprint.pformat(aliased_dump)

    def to_json(self) -> str:
        """Return the model as a JSON string, built from `to_dict()`."""
        # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
        as_mapping = self.to_dict()
        return json.dumps(as_mapping)

    @classmethod
    def from_json(cls, json_str: str) -> Optional[Self]:
        """Parse `json_str` and build an IntakeCatalog through `from_dict()`."""
        decoded = json.loads(json_str)
        return cls.from_dict(decoded)

    def to_dict(self) -> Dict[str, Any]:
        """Return the alias-keyed dictionary form of this model.

        Compared with pydantic's `self.model_dump(by_alias=True)`:

        * fields whose value is `None` are dropped from the result, except
          that the nullable `partition_by` field is emitted as
          `"partitionBy": None` when it is present in `model_fields_set`;
        * a truthy `auth` is replaced by the result of its own `to_dict()`.
        """
        skipped: Set[str] = set()
        payload = self.model_dump(
            by_alias=True,
            exclude=skipped,
            exclude_none=True,
        )

        # Delegate the nested model to its own serialiser.
        if self.auth:
            payload["auth"] = self.auth.to_dict()

        # exclude_none removed the key above; put it back as an explicit None
        # when the field was actually set rather than left at its default.
        was_set = "partition_by" in self.model_fields_set
        if was_set and self.partition_by is None:
            payload["partitionBy"] = None

        return payload

    @classmethod
    def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
        """Build an IntakeCatalog from an alias-keyed dict.

        Returns `None` when `obj` is `None`. A non-dict `obj` is passed
        straight to `model_validate`. For a dict, a missing or `None`
        `namespace` becomes `"intake"` and a missing or `None`
        `partitioning` becomes `PartitioningType.NONE`.
        """
        if obj is None:
            return None
        if not isinstance(obj, dict):
            return cls.model_validate(obj)

        raw_auth = obj.get("auth")
        auth_value = None if raw_auth is None else CatalogAuth.from_dict(raw_auth)

        namespace_value = obj.get("namespace")
        if namespace_value is None:
            namespace_value = "intake"

        partitioning_value = obj.get("partitioning")
        if partitioning_value is None:
            partitioning_value = PartitioningType.NONE

        return cls.model_validate(
            {
                "auth": auth_value,
                "namespace": namespace_value,
                "partitionBy": obj.get("partitionBy"),
                "partitioning": partitioning_value,
                "tableName": obj.get("tableName"),
                "uri": obj.get("uri"),
                "warehouse": obj.get("warehouse"),
            }
        )