smashbox.tools.tools

Created on Tue Jul 15 16:40:32 2025

@author: maxime

  1#!/usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3"""
  4Created on Tue Jul 15 16:40:32 2025
  5
  6@author: maxime
  7"""
  8
  9import os
 10import glob
 11import numpy as np
 12import pandas as pd
 13import numbers
 14from tqdm import tqdm
 15
 16from rich import print
 17from rich.tree import Tree
 18import inspect
 19
 20from typing import get_args
 21
 22
 23class Dict2Struct:
 24    def __init__(self, **entries):
 25        self.__dict__.update(entries)
 26
 27
 28def with_reticulate():
 29    # Exemple : RETICULATE_PYTHON env variable peut être définie
 30    return "R_SESSION_INITIALIZED" in os.environ or "RETICULATE_PYTHON" in os.environ
 31
 32
 33def build_object_tree(obj, name="root"):
 34
 35    tree = Tree(f"[bold cyan]Object {name}[/] ({type(obj).__name__})")
 36
 37    for attr in dir(obj):
 38
 39        if attr.startswith("_") or attr.startswith("__") and attr.endswith("__"):
 40            continue
 41        try:
 42            val = getattr(obj, attr)
 43
 44        except Exception:
 45            continue
 46
 47        if inspect.ismethod(val) or inspect.isfunction(val):
 48            tree.add(f"[green]Method:[/] {attr}()")
 49
 50        elif inspect.isclass(val):
 51            tree.add(f"[blue]Class:[/] {attr}")
 52            subtree = build_object_tree(val, name=attr)
 53
 54        elif isinstance(val, dict):
 55            tree.add(f"[red]Attribut:[/]  {attr} = {val.keys()}")
 56        elif isinstance(
 57            val,
 58            (
 59                int,
 60                float,
 61                str,
 62                bool,
 63                type(None),
 64                list,
 65                set,
 66                tuple,
 67            ),
 68        ):
 69            tree.add(f"[red]Attribute:[/] {attr} = {repr(val)}")
 70        elif isinstance(val, (pd.DataFrame, np.ndarray)):
 71            tree.add(f"[red]Attribute:[/] {attr} = {type(val)}")
 72        elif "<class 'smashbox" in str(type(val)):
 73            subtree = build_object_tree(val, name=attr)
 74            tree.add(subtree)
 75        else:
 76            tree.add(f"[red]Unknown attribute type:[/] {attr} = {type(val)}")
 77
 78    return tree
 79
 80
 81def autocast_args(func):
 82    """
 83    Decrorator function. Usgae @autocast_args previous the function definition
 84    The goal of this decorator is to test input args type and auto cast them if possible.
 85    If the type of the arg is not good and cannot be auto-casted, the function will throw
 86    an execption.
 87    :param func: Function on which to apply the decorator
 88    :type func: Any function
 89    """
 90
 91    sig = inspect.signature(func)
 92    annotations = func.__annotations__
 93
 94    def wrapper(*args, **kwargs):
 95
 96        bound = sig.bind(*args, **kwargs)
 97        bound.apply_defaults()
 98
 99        for name, value in bound.arguments.items():
100            if name in annotations:
101
102                target_type = annotations[name]
103
104                args_ = get_args(target_type)
105
106                if target_type is None and len(args_) == 0:
107                    args_ = (type(None),)
108                    target_type = type(None)
109
110                if not type(value) in args_:
111
112                    if len(args_) > 1 and type(None) in args_:
113
114                        converted = False
115                        for t in args_:
116
117                            if t is not type(None):
118
119                                if value is not None:
120                                    try:
121                                        print(
122                                            f"</> Warning: Arg '{name}' of type {type(value)} is being"
123                                            f" converted to {t}"
124                                        )
125                                        bound.arguments[name] = t(value)
126                                        converted = True
127                                    except:
128                                        pass
129
130                                if converted:
131                                    break
132
133                        if not converted:
134                            raise TypeError(
135                                f"</> Error: Arg '{name}' must be a type of "
136                                f" {args_}, got {value}"
137                                f" ({type(value).__name__})"
138                            )
139
140                    else:
141                        if not isinstance(value, target_type):
142                            try:
143                                print(
144                                    f"</> Warning: Arg '{name}' of type {type(value)} is being"
145                                    f" converted to {target_type}"
146                                )
147                                bound.arguments[name] = target_type(value)
148                            except Exception:
149                                raise TypeError(
150                                    f"</> Error: Arg '{name}' must be a type of "
151                                    f" {target_type.__name__}, got {value}"
152                                    f" ({type(value).__name__})"
153                                )
154
155        return func(*bound.args, **bound.kwargs)
156
157    return wrapper
158
159
def infograffas_bbox_extractor(info_graffas):
    """
    Derive a bounding box from an `info_graffas` dictionary by widening the
    domain limits by half a cell on every side.

    Parameter:
    ----------
    info_graffas: Dictionary holding at least a `domain` dictionary with the keys
     'left', 'bottom', 'right' and 'top', and a key 'resolution_sim' with the
     resolution of the domain.

     Return
     ------

     bbox, a dictionary with the keys 'left', 'bottom', 'right' and 'top'.
    """

    # -1 moves an edge outward on the left/bottom side, +1 on the right/top side.
    directions = (("left", -1), ("bottom", -1), ("right", 1), ("top", 1))

    bbox = {}
    for edge, sign in directions:
        bbox[edge] = info_graffas["domain"][edge] + sign * (
            info_graffas["resolution_sim"] / 2
        )

    return bbox
184
185
def GraffasVector2SmashArray(vdata, coordinates, resolution):
    """
    Scatter a (point, time) matrix onto a regular (x, y, time) grid.

    The grid indices of each point are obtained by subtracting the minimum X
    (resp. Y) coordinate, dividing by `resolution` and truncating to an integer.
    Grid cells that receive no point keep the value 0.

    The assignment is done in one vectorised operation instead of a Python loop
    over every point and time step, so no progress bar is displayed.

    Parameters
    ----------
    vdata : array-like, indexed as ``vdata[point, time]``
    coordinates : mapping with the entries "X" and "Y", one value per point
    resolution : number, grid spacing in the same unit as the coordinates

    Return
    ------
    graffas_prcp : np.ndarray of shape (max index x + 1, max index y + 1, nb time steps)
    """

    # Plain arrays make the indexing positional whatever the input container is.
    x = np.asarray(coordinates["X"])
    y = np.asarray(coordinates["Y"])

    coord_x = ((x - x.min()) / resolution).astype(int)
    coord_y = ((y - y.min()) / resolution).astype(int)

    values = np.asarray(vdata)

    graffas_prcp = np.zeros(
        shape=(coord_x.max() + 1, coord_y.max() + 1, values.shape[1])
    )

    # Row j of `values` goes to cell (coord_x[j], coord_y[j]) for all time steps.
    # NOTE(review): if several points fall in the same cell, NumPy does not
    # guarantee which one is kept (the former loop kept the last one).
    graffas_prcp[coord_x, coord_y, :] = values[: coord_x.shape[0], :]

    return graffas_prcp
200
201
202def array_isin(arr1: np.ndarray = None, arr2: np.ndarray = None):
203
204    pos = []
205
206    for i in range(len(arr2)):
207
208        if not np.any(np.isin(arr1, arr2[i]) == 1):
209            print(f"</> Outlet name `{arr2[i]}` does not exist in the mesh.")
210
211    if arr1 is not None and arr2 is not None:
212        pos = list(np.where(np.isin(arr1, arr2))[0])
213
214    if len(pos) == 0:
215        raise ValueError("Invalid outlets name.")
216
217    return pos
218
219
220def check_asset_path(asset_dir: str = "", path: None | os.PathLike = None):
221
222    if path is None:
223        return path
224
225    if not os.path.exists(path):
226
227        mypath = os.path.join(asset_dir)
228        matched_file = sorted(glob.glob(f"{mypath}/*{path}*"))
229
230        if len(matched_file) == 0:
231            raise ValueError(
232                f"'{path}' is not a valid {asset_dir} filename."
233                f"Choice are: {os.listdir(mypath)}"
234            )
235        else:
236            path = matched_file[0]
237
238    return path
239
240
def print_tree(root_path, prefix=""):
    """
    Print the content of a directory as a tree, descending into sub-directories.
    :param root_path: The directory to display
    :type root_path: str | os.PathLike
    :param prefix: Text printed in front of every line (used by the recursion to
        draw the branches of the parent levels), defaults to ""
    :type prefix: str, optional
    """

    entries = sorted(os.listdir(root_path))
    last_position = len(entries) - 1

    for position, entry in enumerate(entries):

        is_last = position == last_position

        # The last entry of a level closes the branch.
        branch = "└── " if is_last else "├── "
        print(prefix + branch + entry)

        child = os.path.join(root_path, entry)
        if not os.path.isdir(child):
            continue

        # Below the last entry no vertical line must be continued.
        padding = "    " if is_last else "│   "
        print_tree(child, prefix + padding)
255
256
@autocast_args
def time_resample_prcp_array(
    array: np.ndarray | None,
    input_dt: int | float = 3600.0,
    output_dt: float = 3600.0,
    t_axis: int = 2,
):
    """
    Resample an array of prcp from one time-step to another.

    - ``input_dt < output_dt``: consecutive time-steps are summed by groups of
      ``output_dt / input_dt``; trailing time-steps that do not fill a complete
      group are dropped.
    - ``input_dt > output_dt``: every time-step is repeated
      ``input_dt / output_dt`` times and divided by that number, so the sum over
      the time axis is unchanged.
    - ``input_dt == output_dt`` or ``array is None``: the input is returned as is.
    :param array: the matrix containing the prcp, e.g. with shape (nbx, nby, nbts)
    :type array: np.ndarray | None
    :param input_dt: The time-step of the input array (seconds), defaults to 3600
    :type input_dt: int | float, optional
    :param output_dt: The time-step of the returned array (seconds), defaults to 3600
    :type output_dt: float, optional
    :param t_axis: The array axis direction of the time-step, defaults to 2
    :type t_axis: int, optional
    :return: The resampled array
    :rtype: np.ndarray
    :raises ValueError: if one time-step is not a whole multiple of the other

    """

    if array is None or input_dt == output_dt:
        return array

    # Aggregation towards a coarser time-step
    if input_dt < output_dt:

        if not (output_dt % input_dt == 0):
            raise ValueError("output_dt must be a multiple of input_dt.")

        print(
            f"</> Resampling array with time-step `{input_dt}s`"
            f" to time-step `{output_dt}s`"
        )
        chunk_size = int(output_dt / input_dt)

        # Time axis first, so that groups of time-steps are contiguous rows
        time_first = np.moveaxis(array, t_axis, 0)
        nb_chunk = time_first.shape[0] // chunk_size

        grouped = time_first[: nb_chunk * chunk_size].reshape(
            (nb_chunk, chunk_size) + time_first.shape[1:]
        )

        return np.moveaxis(grouped.sum(axis=1), 0, t_axis)

    # Disaggregation towards a finer time-step
    if not (input_dt % output_dt == 0):
        raise ValueError("output_dt must be a factor of input_dt.")

    chunk_size = int(input_dt / output_dt)

    # The time axis is handled at the last position whatever the number of
    # dimensions of the array (it is not necessarily axis 2).
    time_last = np.moveaxis(array, t_axis, -1)
    refined = np.repeat(time_last, chunk_size, axis=-1) / chunk_size

    return np.moveaxis(refined, -1, t_axis)
336
337
@autocast_args
def read_hourly_qobs(qobs_directory, code):
    """
    Read observed discharge files and stack them in one (catchment, time) array.

    The time axis is hard-coded: it starts on 1900-01-01 00:00, ends today
    (rounded to 24h) and uses a time-step of 3600 seconds. For each entry of
    `code`, the first ``*{code}*.csv`` file found recursively under
    `qobs_directory` is read. Its first column header is parsed as the date of
    the first record and the values of that column are copied at the matching
    positions. Positions without data keep the value -99.
    :param qobs_directory: The directory searched recursively for the csv files
    :param code: The catchment codes, one row of the result per code
    :return: Array of shape (len(code), number of time-steps)
    :rtype: np.ndarray
    :raises ValueError: if the first column header of a file is not a valid date
    """
    dt = 3600.0
    start_time = pd.Timestamp("1900-01-01 00:00")
    end_time = pd.Timestamp.today().round(freq="24h")

    npdt = int((end_time - start_time).total_seconds() / dt) + 1

    # Every cell starts at -99 and is only overwritten where a file has data.
    qobs = np.full((len(code), npdt), -99.0)

    miss = []

    for i, c in enumerate(code):
        found = glob.glob(f"{qobs_directory}/**/*{c}*.csv", recursive=True)

        if not found:
            miss.append(c)
            continue

        dat = pd.read_csv(found[0])
        try:
            file_start_time = pd.Timestamp(dat.columns[0])
        except Exception:
            raise ValueError(
                f"Column header '{dat.columns[0]}' in the observed discharge file for catchment '{c}' "
                f"is not a valid date"
            ) from None

        last_row = dat.index.max()
        file_end_time = file_start_time + pd.Timedelta(seconds=dt * (len(dat) - 1))

        # Offsets (in time-steps) of the period bounds relative to the file start
        start_diff = int((start_time - file_start_time).total_seconds() / dt) + 1
        end_diff = int((end_time - file_start_time).total_seconds() / dt) + 1

        # Check if observed discharge file contains data for the period
        if start_diff > last_row or end_diff < 0:
            print(
                f"</> The provided observed discharge file for catchment '{c}' does not contain data for the "
                f"selected simulation period ['{start_time}', '{end_time}']. The file covers the period "
                f"['{file_start_time}', '{file_end_time}']",
            )
            continue

        # Rows of the file to copy ...
        ind_start_dat = max(0, start_diff)
        ind_end_dat = min(last_row, end_diff)
        # ... and the columns of the array receiving them
        ind_start_arr = max(0, -start_diff)
        ind_end_arr = ind_start_arr + ind_end_dat - ind_start_dat

        qobs[i, ind_start_arr:ind_end_arr] = dat.iloc[ind_start_dat:ind_end_dat, 0]

    if miss:
        print(f"</> Missing {len(miss)} observed discharge file(s): {miss}")

    return qobs
390
391
@autocast_args
def read_object_as_dict(instance, recursion_counter: int = 0):
    """

    create a dictionary from a custom python object

    Every public attribute (name not starting with an underscore and different
    from "from_handle" and "copy") is stored as follows: lists and tuples are
    converted to arrays of strings, arrays of objects or strings are converted
    to arrays of strings, other arrays, dictionaries, numbers and strings are
    stored as they are, methods and functions are skipped. Any other object is
    converted recursively and stored if the resulting dictionary is not empty.

    Parameters
    ----------

    instance : object
        a custom python object

    recursion_counter : int
        the current nesting depth; nothing is read deeper than 100 levels

    Return
    ------

    key_data: dict
        a dictionary containing all keys and attributes of the object

    """
    key_data = {}

    # The counter is a real depth guard: it grows by one per nesting level.
    if recursion_counter > 100:
        print("recursion counter exceed the limit of 100... return")
        return key_data

    for attr in dir(instance):

        if attr.startswith("_") or attr in ("from_handle", "copy"):
            continue

        try:
            value = getattr(instance, attr)

            if isinstance(value, (list, tuple)):
                key_data[attr] = np.array(value).astype("U")

            elif isinstance(value, np.ndarray):
                if value.dtype == "object" or value.dtype.char == "U":
                    value = value.astype("U")
                key_data[attr] = value

            elif isinstance(value, (dict, numbers.Number, str)):
                key_data[attr] = value

            elif inspect.isroutine(value):
                # Methods and functions carry no data
                continue

            else:
                depp_key_data = read_object_as_dict(
                    value, recursion_counter=recursion_counter + 1
                )

                if len(depp_key_data) > 0:
                    key_data[attr] = depp_key_data

        except Exception:
            # Best effort: an attribute that cannot be read or converted is
            # reported and skipped.
            print(f"unknown type for attribute : {attr}")

    return key_data
class Dict2Struct:
24class Dict2Struct:
25    def __init__(self, **entries):
26        self.__dict__.update(entries)
Dict2Struct(**entries)
25    def __init__(self, **entries):
26        self.__dict__.update(entries)
def with_reticulate():
29def with_reticulate():
30    # Exemple : RETICULATE_PYTHON env variable peut être définie
31    return "R_SESSION_INITIALIZED" in os.environ or "RETICULATE_PYTHON" in os.environ
def build_object_tree(obj, name='root'):
34def build_object_tree(obj, name="root"):
35
36    tree = Tree(f"[bold cyan]Object {name}[/] ({type(obj).__name__})")
37
38    for attr in dir(obj):
39
40        if attr.startswith("_") or attr.startswith("__") and attr.endswith("__"):
41            continue
42        try:
43            val = getattr(obj, attr)
44
45        except Exception:
46            continue
47
48        if inspect.ismethod(val) or inspect.isfunction(val):
49            tree.add(f"[green]Method:[/] {attr}()")
50
51        elif inspect.isclass(val):
52            tree.add(f"[blue]Class:[/] {attr}")
53            subtree = build_object_tree(val, name=attr)
54
55        elif isinstance(val, dict):
56            tree.add(f"[red]Attribut:[/]  {attr} = {val.keys()}")
57        elif isinstance(
58            val,
59            (
60                int,
61                float,
62                str,
63                bool,
64                type(None),
65                list,
66                set,
67                tuple,
68            ),
69        ):
70            tree.add(f"[red]Attribute:[/] {attr} = {repr(val)}")
71        elif isinstance(val, (pd.DataFrame, np.ndarray)):
72            tree.add(f"[red]Attribute:[/] {attr} = {type(val)}")
73        elif "<class 'smashbox" in str(type(val)):
74            subtree = build_object_tree(val, name=attr)
75            tree.add(subtree)
76        else:
77            tree.add(f"[red]Unknown attribute type:[/] {attr} = {type(val)}")
78
79    return tree
def autocast_args(func):
 82def autocast_args(func):
 83    """
 84    Decrorator function. Usgae @autocast_args previous the function definition
 85    The goal of this decorator is to test input args type and auto cast them if possible.
 86    If the type of the arg is not good and cannot be auto-casted, the function will throw
 87    an execption.
 88    :param func: Function on which to apply the decorator
 89    :type func: Any function
 90    """
 91
 92    sig = inspect.signature(func)
 93    annotations = func.__annotations__
 94
 95    def wrapper(*args, **kwargs):
 96
 97        bound = sig.bind(*args, **kwargs)
 98        bound.apply_defaults()
 99
100        for name, value in bound.arguments.items():
101            if name in annotations:
102
103                target_type = annotations[name]
104
105                args_ = get_args(target_type)
106
107                if target_type is None and len(args_) == 0:
108                    args_ = (type(None),)
109                    target_type = type(None)
110
111                if not type(value) in args_:
112
113                    if len(args_) > 1 and type(None) in args_:
114
115                        converted = False
116                        for t in args_:
117
118                            if t is not type(None):
119
120                                if value is not None:
121                                    try:
122                                        print(
123                                            f"</> Warning: Arg '{name}' of type {type(value)} is being"
124                                            f" converted to {t}"
125                                        )
126                                        bound.arguments[name] = t(value)
127                                        converted = True
128                                    except:
129                                        pass
130
131                                if converted:
132                                    break
133
134                        if not converted:
135                            raise TypeError(
136                                f"</> Error: Arg '{name}' must be a type of "
137                                f" {args_}, got {value}"
138                                f" ({type(value).__name__})"
139                            )
140
141                    else:
142                        if not isinstance(value, target_type):
143                            try:
144                                print(
145                                    f"</> Warning: Arg '{name}' of type {type(value)} is being"
146                                    f" converted to {target_type}"
147                                )
148                                bound.arguments[name] = target_type(value)
149                            except Exception:
150                                raise TypeError(
151                                    f"</> Error: Arg '{name}' must be a type of "
152                                    f" {target_type.__name__}, got {value}"
153                                    f" ({type(value).__name__})"
154                                )
155
156        return func(*bound.args, **bound.kwargs)
157
158    return wrapper

Decorator function. Usage: place @autocast_args just before the function definition. The goal of this decorator is to check the type of the input arguments and to auto-cast them if possible. If the type of an argument is wrong and cannot be auto-casted, the function will throw an exception.

Parameters
  • func: Function on which to apply the decorator
def infograffas_bbox_extractor(info_graffas):
161def infograffas_bbox_extractor(info_graffas):
162    """
163    A function to extract the bbox from info_graffas object and adapt it to the Smash
164    convention
165
166    Parameter:
167    ----------
168    info_graffas: Dictionary containing information about the Graffas domain. At least: a dictionary `domain` containing keys 'left,bottom,right,top' associated to each
169     bouning coordinates and a key 'resolution_sim' containing a real with the resolution of the domain in meter.
170
171     Return
172     ------
173
174     bbox, a dictionary of the bbox adapted to Smash.
175    """
176
177    bbox = {
178        "left": info_graffas["domain"]["left"] - info_graffas["resolution_sim"] / 2,
179        "bottom": info_graffas["domain"]["bottom"] - info_graffas["resolution_sim"] / 2,
180        "right": info_graffas["domain"]["right"] + info_graffas["resolution_sim"] / 2,
181        "top": info_graffas["domain"]["top"] + info_graffas["resolution_sim"] / 2,
182    }
183
184    return bbox

A function to extract the bbox from info_graffas object and adapt it to the Smash convention

Parameter:

info_graffas: Dictionary containing information about the Graffas domain. At least: a dictionary domain containing the keys 'left', 'bottom', 'right' and 'top', each associated with the corresponding bounding coordinate, and a key 'resolution_sim' containing a real number with the resolution of the domain in meters.

Return


bbox, a dictionary of the bbox adapted to Smash.

def GraffasVector2SmashArray(vdata, coordinates, resolution):
187def GraffasVector2SmashArray(vdata, coordinates, resolution):
188
189    coord_x = ((coordinates["X"] - min(coordinates["X"])) / resolution).astype(int)
190    coord_y = ((coordinates["Y"] - min(coordinates["Y"])) / resolution).astype(int)
191
192    graffas_prcp = np.zeros(shape=(max(coord_x) + 1, max(coord_y) + 1, vdata.shape[1]))
193
194    for t in tqdm(range(graffas_prcp.shape[2])):
195        for j in range(len(coord_x)):
196            c_x = coord_x[j]
197            c_y = coord_y[j]
198            graffas_prcp[c_x, c_y, t] = vdata[j, t]
199
200    return graffas_prcp
def array_isin(arr1: numpy.ndarray = None, arr2: numpy.ndarray = None):
203def array_isin(arr1: np.ndarray = None, arr2: np.ndarray = None):
204
205    pos = []
206
207    for i in range(len(arr2)):
208
209        if not np.any(np.isin(arr1, arr2[i]) == 1):
210            print(f"</> Outlet name `{arr2[i]}` does not exist in the mesh.")
211
212    if arr1 is not None and arr2 is not None:
213        pos = list(np.where(np.isin(arr1, arr2))[0])
214
215    if len(pos) == 0:
216        raise ValueError("Invalid outlets name.")
217
218    return pos
def check_asset_path(asset_dir: str = '', path: None | os.PathLike = None):
221def check_asset_path(asset_dir: str = "", path: None | os.PathLike = None):
222
223    if path is None:
224        return path
225
226    if not os.path.exists(path):
227
228        mypath = os.path.join(asset_dir)
229        matched_file = sorted(glob.glob(f"{mypath}/*{path}*"))
230
231        if len(matched_file) == 0:
232            raise ValueError(
233                f"'{path}' is not a valid {asset_dir} filename."
234                f"Choice are: {os.listdir(mypath)}"
235            )
236        else:
237            path = matched_file[0]
238
239    return path
def time_resample_prcp_array(*args, **kwargs):
 95    def wrapper(*args, **kwargs):
 96
 97        bound = sig.bind(*args, **kwargs)
 98        bound.apply_defaults()
 99
100        for name, value in bound.arguments.items():
101            if name in annotations:
102
103                target_type = annotations[name]
104
105                args_ = get_args(target_type)
106
107                if target_type is None and len(args_) == 0:
108                    args_ = (type(None),)
109                    target_type = type(None)
110
111                if not type(value) in args_:
112
113                    if len(args_) > 1 and type(None) in args_:
114
115                        converted = False
116                        for t in args_:
117
118                            if t is not type(None):
119
120                                if value is not None:
121                                    try:
122                                        print(
123                                            f"</> Warning: Arg '{name}' of type {type(value)} is being"
124                                            f" converted to {t}"
125                                        )
126                                        bound.arguments[name] = t(value)
127                                        converted = True
128                                    except:
129                                        pass
130
131                                if converted:
132                                    break
133
134                        if not converted:
135                            raise TypeError(
136                                f"</> Error: Arg '{name}' must be a type of "
137                                f" {args_}, got {value}"
138                                f" ({type(value).__name__})"
139                            )
140
141                    else:
142                        if not isinstance(value, target_type):
143                            try:
144                                print(
145                                    f"</> Warning: Arg '{name}' of type {type(value)} is being"
146                                    f" converted to {target_type}"
147                                )
148                                bound.arguments[name] = target_type(value)
149                            except Exception:
150                                raise TypeError(
151                                    f"</> Error: Arg '{name}' must be a type of "
152                                    f" {target_type.__name__}, got {value}"
153                                    f" ({type(value).__name__})"
154                                )
155
156        return func(*bound.args, **bound.kwargs)

Resample an array of prcp for a given time-step.

Parameters
  • array: the matrix containing the prcp with shape (nbx, nby, nbts)
  • input_dt: The time-step of the input array (seconds), defaults to 3600
  • output_dt: The time-step of the resampled array (seconds), defaults to 3600
  • t_axis: The array axis direction of the time-step, defaults to 2
Returns

The resampled array

def read_hourly_qobs(*args, **kwargs):
 95    def wrapper(*args, **kwargs):
 96
 97        bound = sig.bind(*args, **kwargs)
 98        bound.apply_defaults()
 99
100        for name, value in bound.arguments.items():
101            if name in annotations:
102
103                target_type = annotations[name]
104
105                args_ = get_args(target_type)
106
107                if target_type is None and len(args_) == 0:
108                    args_ = (type(None),)
109                    target_type = type(None)
110
111                if not type(value) in args_:
112
113                    if len(args_) > 1 and type(None) in args_:
114
115                        converted = False
116                        for t in args_:
117
118                            if t is not type(None):
119
120                                if value is not None:
121                                    try:
122                                        print(
123                                            f"</> Warning: Arg '{name}' of type {type(value)} is being"
124                                            f" converted to {t}"
125                                        )
126                                        bound.arguments[name] = t(value)
127                                        converted = True
128                                    except:
129                                        pass
130
131                                if converted:
132                                    break
133
134                        if not converted:
135                            raise TypeError(
136                                f"</> Error: Arg '{name}' must be a type of "
137                                f" {args_}, got {value}"
138                                f" ({type(value).__name__})"
139                            )
140
141                    else:
142                        if not isinstance(value, target_type):
143                            try:
144                                print(
145                                    f"</> Warning: Arg '{name}' of type {type(value)} is being"
146                                    f" converted to {target_type}"
147                                )
148                                bound.arguments[name] = target_type(value)
149                            except Exception:
150                                raise TypeError(
151                                    f"</> Error: Arg '{name}' must be a type of "
152                                    f" {target_type.__name__}, got {value}"
153                                    f" ({type(value).__name__})"
154                                )
155
156        return func(*bound.args, **bound.kwargs)
def read_object_as_dict(*args, **kwargs):
 95    def wrapper(*args, **kwargs):
 96
 97        bound = sig.bind(*args, **kwargs)
 98        bound.apply_defaults()
 99
100        for name, value in bound.arguments.items():
101            if name in annotations:
102
103                target_type = annotations[name]
104
105                args_ = get_args(target_type)
106
107                if target_type is None and len(args_) == 0:
108                    args_ = (type(None),)
109                    target_type = type(None)
110
111                if not type(value) in args_:
112
113                    if len(args_) > 1 and type(None) in args_:
114
115                        converted = False
116                        for t in args_:
117
118                            if t is not type(None):
119
120                                if value is not None:
121                                    try:
122                                        print(
123                                            f"</> Warning: Arg '{name}' of type {type(value)} is being"
124                                            f" converted to {t}"
125                                        )
126                                        bound.arguments[name] = t(value)
127                                        converted = True
128                                    except:
129                                        pass
130
131                                if converted:
132                                    break
133
134                        if not converted:
135                            raise TypeError(
136                                f"</> Error: Arg '{name}' must be a type of "
137                                f" {args_}, got {value}"
138                                f" ({type(value).__name__})"
139                            )
140
141                    else:
142                        if not isinstance(value, target_type):
143                            try:
144                                print(
145                                    f"</> Warning: Arg '{name}' of type {type(value)} is being"
146                                    f" converted to {target_type}"
147                                )
148                                bound.arguments[name] = target_type(value)
149                            except Exception:
150                                raise TypeError(
151                                    f"</> Error: Arg '{name}' must be a type of "
152                                    f" {target_type.__name__}, got {value}"
153                                    f" ({type(value).__name__})"
154                                )
155
156        return func(*bound.args, **bound.kwargs)

create a dictionary from a custom python object

Parameters

instance : object a custom python object

Return

key_data: dict a dictionary containing all keys and attributes of the object