smashbox.tools.tools
Created on Tue Jul 15 16:40:32 2025
@author: maxime
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 15 16:40:32 2025

@author: maxime
"""

import functools
import os
import glob
import types
import typing
import numpy as np
import pandas as pd
import numbers
from tqdm import tqdm

from rich import print
from rich.tree import Tree
import inspect

from typing import get_args


class Dict2Struct:
    """Expose the entries of a dictionary as attributes of an object."""

    def __init__(self, **entries):
        # Each keyword argument becomes an instance attribute of the same name.
        self.__dict__.update(entries)


def with_reticulate():
    """
    Tell whether one of the environment variables ``R_SESSION_INITIALIZED`` or
    ``RETICULATE_PYTHON`` is defined.

    :return: True if at least one of the two variables is set
    :rtype: bool
    """
    # Example: the RETICULATE_PYTHON environment variable may be defined
    return "R_SESSION_INITIALIZED" in os.environ or "RETICULATE_PYTHON" in os.environ


def build_object_tree(obj, name="root"):
    """
    Build a tree describing the public attributes of an object.

    Every name returned by ``dir(obj)`` that does not start with an underscore becomes
    one node: methods and functions, classes, dictionaries (keys only), builtin scalars
    and containers (``repr``), DataFrames and arrays (type only). Attributes whose type
    belongs to the ``smashbox`` package are expanded recursively.

    :param obj: Object to inspect
    :type obj: Any
    :param name: Label of the root node, defaults to "root"
    :type name: str, optional
    :return: The tree of attributes
    :rtype: rich.tree.Tree
    """

    tree = Tree(f"[bold cyan]Object {name}[/] ({type(obj).__name__})")

    plain_types = (int, float, str, bool, type(None), list, set, tuple)

    for attr in dir(obj):

        # Private and dunder names both start with an underscore.
        if attr.startswith("_"):
            continue

        try:
            val = getattr(obj, attr)
        except Exception:
            # Evaluating a property may fail; such attributes are left out.
            continue

        if inspect.ismethod(val) or inspect.isfunction(val):
            tree.add(f"[green]Method:[/] {attr}()")

        elif inspect.isclass(val):
            # Classes are listed as leaves. The former recursive call built a subtree
            # that was never attached, so it has been removed.
            tree.add(f"[blue]Class:[/] {attr}")

        elif isinstance(val, dict):
            tree.add(f"[red]Attribute:[/] {attr} = {val.keys()}")

        elif isinstance(val, plain_types):
            tree.add(f"[red]Attribute:[/] {attr} = {repr(val)}")

        elif isinstance(val, (pd.DataFrame, np.ndarray)):
            # Only the type is shown: the content may be large.
            tree.add(f"[red]Attribute:[/] {attr} = {type(val)}")

        elif "<class 'smashbox" in str(type(val)):
            tree.add(build_object_tree(val, name=attr))

        else:
            tree.add(f"[red]Unknown attribute type:[/] {attr} = {type(val)}")

    return tree


def autocast_args(func):
    """
    Decorator function. Usage: put @autocast_args before the function definition.
    The goal of this decorator is to test the type of the input args and auto-cast them
    if possible. If the type of an arg is wrong and cannot be auto-casted, a TypeError
    is raised.

    Only annotated parameters are checked. Union annotations (``int | None``) accept
    any of their members; conversion is tried with each non-None member in order.
    Annotations that cannot be checked at runtime are ignored.

    :param func: Function on which to apply the decorator
    :type func: Any function
    :return: The wrapped function
    :rtype: Callable
    """

    sig = inspect.signature(func)
    annotations = func.__annotations__

    @functools.wraps(func)  # keep the name and docstring of the decorated function
    def wrapper(*args, **kwargs):

        bound = sig.bind(*args, **kwargs)
        bound.apply_defaults()

        for name, value in bound.arguments.items():
            if name not in annotations:
                continue

            target_type = annotations[name]

            # A bare ``None`` annotation means "must be None".
            if target_type is None:
                target_type = type(None)

            origin = typing.get_origin(target_type)
            if origin is typing.Union or origin is types.UnionType:
                members = get_args(target_type)
            else:
                members = (target_type,)

            # Strip subscripts (``list[int]`` -> ``list``) and keep only real classes.
            casters = []
            for member in members:
                member = typing.get_origin(member) or member
                if isinstance(member, type):
                    casters.append(member)

            if not casters:
                continue

            try:
                if isinstance(value, tuple(casters)):
                    continue
            except TypeError:
                # Not checkable with isinstance (e.g. typing.Any).
                continue

            for caster in casters:
                if caster is type(None):
                    continue
                try:
                    if caster is np.ndarray:
                        # np.ndarray(value) would interpret ``value`` as a shape.
                        converted = np.asarray(value)
                    else:
                        converted = caster(value)
                except Exception:
                    continue

                print(
                    f"</> Warning: Arg '{name}' of type {type(value)} is being"
                    f" converted to {caster}"
                )
                bound.arguments[name] = converted
                break
            else:
                expected = ", ".join(c.__name__ for c in casters)
                raise TypeError(
                    f"</> Error: Arg '{name}' must be a type of "
                    f"{expected}, got {value}"
                    f" ({type(value).__name__})"
                )

        return func(*bound.args, **bound.kwargs)

    return wrapper


def infograffas_bbox_extractor(info_graffas):
    """
    Extract the bounding box from an info_graffas dictionary and widen it by half a
    cell on every side.

    :param info_graffas: Dictionary with a key 'domain' (itself a dictionary with keys
        'left', 'bottom', 'right', 'top') and a key 'resolution_sim' holding the
        resolution of the domain
    :type info_graffas: dict
    :return: Dictionary with keys 'left', 'bottom', 'right', 'top'
    :rtype: dict
    """

    domain = info_graffas["domain"]
    half_cell = info_graffas["resolution_sim"] / 2

    return {
        "left": domain["left"] - half_cell,
        "bottom": domain["bottom"] - half_cell,
        "right": domain["right"] + half_cell,
        "top": domain["top"] + half_cell,
    }


def GraffasVector2SmashArray(vdata, coordinates, resolution):
    """
    Scatter per-point time series onto a regular grid.

    :param vdata: Values with shape (nb_points, nb_time_steps)
    :type vdata: np.ndarray
    :param coordinates: Mapping with entries "X" and "Y", one coordinate per point
    :type coordinates: dict | pd.DataFrame
    :param resolution: Grid spacing, in the same unit as the coordinates
    :type resolution: int | float
    :return: Array of shape (nb_x, nb_y, nb_time_steps), zero where no point falls
    :rtype: np.ndarray
    """

    # Plain arrays avoid label-based indexing when the coordinates are pandas Series.
    x = np.asarray(coordinates["X"])
    y = np.asarray(coordinates["Y"])
    values = np.asarray(vdata)

    # Cell indices relative to the lower-left point (truncated, as before).
    coord_x = ((x - x.min()) / resolution).astype(int)
    coord_y = ((y - y.min()) / resolution).astype(int)

    graffas_prcp = np.zeros(
        shape=(coord_x.max() + 1, coord_y.max() + 1, values.shape[1])
    )

    # One vectorised assignment replaces the former double Python loop. When two
    # points share a cell the last one wins, exactly as in the loop.
    graffas_prcp[coord_x, coord_y, :] = values[: len(coord_x)]

    return graffas_prcp


def array_isin(arr1: np.ndarray = None, arr2: np.ndarray = None):
    """
    Return the positions in ``arr1`` of the elements that are listed in ``arr2``.

    A message is printed for every element of ``arr2`` that is absent from ``arr1``.

    :param arr1: Reference names
    :type arr1: np.ndarray
    :param arr2: Requested names
    :type arr2: np.ndarray
    :return: Positions (in increasing order) of the matching elements of ``arr1``
    :rtype: list
    :raises ValueError: if an argument is None or if nothing matches
    """

    # The defaults are None: reject them before anything is measured or indexed.
    if arr1 is None or arr2 is None:
        raise ValueError("Invalid outlets name.")

    requested = np.atleast_1d(np.asarray(arr2))
    if requested.size == 0:
        raise ValueError("Invalid outlets name.")

    for missing in requested[~np.isin(requested, arr1)]:
        print(f"</> Outlet name `{missing}` does not exist in the mesh.")

    pos = list(np.where(np.isin(arr1, requested))[0])

    if len(pos) == 0:
        raise ValueError("Invalid outlets name.")

    return pos


def check_asset_path(asset_dir: str = "", path: None | os.PathLike = None):
    """
    Resolve ``path`` either as an existing path or as a file of ``asset_dir``.

    :param asset_dir: Directory searched when ``path`` does not exist, defaults to ""
    :type asset_dir: str, optional
    :param path: Existing path, or fragment of a file name of ``asset_dir``
    :type path: None | os.PathLike, optional
    :return: None if ``path`` is None, ``path`` itself if it exists, otherwise the first
        (alphabetical order) entry of ``asset_dir`` whose name contains ``path``
    :rtype: None | str | os.PathLike
    :raises ValueError: if ``path`` does not exist and matches nothing in ``asset_dir``
    """

    if path is None or os.path.exists(path):
        return path

    # os.path.join keeps the pattern relative when asset_dir is empty; the former
    # f"{asset_dir}/*...*" pattern searched the filesystem root in that case.
    pattern = os.path.join(asset_dir, f"*{path}*")
    matched_file = sorted(glob.glob(pattern))

    if matched_file:
        return matched_file[0]

    # A missing asset_dir must not hide the ValueError behind a FileNotFoundError.
    choices = sorted(os.listdir(asset_dir)) if os.path.isdir(asset_dir) else []
    raise ValueError(
        f"'{path}' is not a valid {asset_dir} filename. Choice are: {choices}"
    )


def print_tree(root_path, prefix=""):
    """
    Print the content of a directory as a tree, recursing into sub-directories.

    :param root_path: Directory to list
    :type root_path: str | os.PathLike
    :param prefix: Text printed in front of every entry; used by the recursion to draw
        the branches of the parent levels, defaults to ""
    :type prefix: str, optional
    """

    files = sorted(os.listdir(root_path))

    for index, name in enumerate(files):

        path = os.path.join(root_path, name)
        is_last = index == len(files) - 1

        print(prefix + ("└── " if is_last else "├── ") + name)

        if os.path.isdir(path):
            # The continuation must be as wide as the connectors (4 columns), otherwise
            # nested levels are misaligned.
            print_tree(path, prefix + ("    " if is_last else "│   "))


@autocast_args
def time_resample_prcp_array(
    array: np.ndarray | None,
    input_dt: int | float = 3600.0,
    output_dt: float = 3600.0,
    t_axis: int = 2,
):
    """
    Resample an array of prcp for a given time-step.

    When ``output_dt`` is larger than ``input_dt`` consecutive time steps are summed
    (trailing steps that do not fill a complete chunk are dropped). When it is smaller,
    every time step is repeated and divided so that the total is unchanged.

    :param array: the array containing the prcp, e.g. with shape (nbx, nby, nbts)
    :type array: np.ndarray | None
    :param input_dt: The time-step of ``array`` (seconds), defaults to 3600
    :type input_dt: int | float, optional
    :param output_dt: The time-step of the returned array (seconds), defaults to 3600
    :type output_dt: float, optional
    :param t_axis: The array axis direction of the time-step, defaults to 2
    :type t_axis: int, optional
    :return: The resampled array (``array`` itself if it is None or if both time-steps
        are equal)
    :rtype: np.ndarray | None
    :raises ValueError: if one time-step is not a multiple of the other
    """

    if array is None or input_dt == output_dt:
        return array

    if input_dt < output_dt:

        if output_dt % input_dt != 0:
            raise ValueError("output_dt must be a multiple of input_dt.")

        print(
            f"</> Resampling array with time-step `{input_dt}s`"
            f" to time-step `{output_dt}s`"
        )
        chunk_size = int(output_dt / input_dt)

        # Time axis first, so that consecutive steps can be grouped by reshaping.
        series = np.moveaxis(array, t_axis, 0)
        nb_chunk = series.shape[0] // chunk_size

        chunks = series[: nb_chunk * chunk_size].reshape(
            (nb_chunk, chunk_size) + series.shape[1:]
        )

        return np.moveaxis(chunks.sum(axis=1), 0, t_axis)

    if input_dt % output_dt != 0:
        raise ValueError("input_dt must be a multiple of output_dt.")

    chunk_size = int(input_dt / output_dt)

    # Time axis last; repeating along -1 works whatever the number of dimensions.
    series = np.moveaxis(array, t_axis, -1)
    series = np.repeat(series, chunk_size, axis=-1) / chunk_size

    return np.moveaxis(series, -1, t_axis)


@autocast_args
def read_hourly_qobs(qobs_directory, code):
    """
    Read hourly observed discharge files into a single array.

    For every entry of ``code`` the first csv file (alphabetical order) found under
    ``qobs_directory`` whose name contains the code is read. The header of its first
    column must be the date of the first value. Values are placed on an hourly time
    axis running from 1900-01-01 00:00 to today (rounded to the day).

    :param qobs_directory: Directory searched recursively for csv files
    :type qobs_directory: str | os.PathLike
    :param code: Catchment codes
    :type code: list | np.ndarray
    :return: Array of shape (len(code), nb_time_steps), -99.0 where there is no data
    :rtype: np.ndarray
    :raises ValueError: if the header of a file is not a valid date
    """
    start_time = pd.Timestamp("1900-01-01 00:00")
    end_time = pd.Timestamp.today().round(freq="24h")
    dt = 3600.0

    npdt = int((end_time - start_time).total_seconds() / dt) + 1

    qobs = np.full((len(code), npdt), -99.0)

    miss = []

    for i, c in enumerate(code):
        # Sorted so that the selected file does not depend on the filesystem order.
        f = sorted(glob.glob(f"{qobs_directory}/**/*{c}*.csv", recursive=True))

        if not f:
            miss.append(c)
            continue

        dat = pd.read_csv(f[0])
        try:
            file_start_time = pd.Timestamp(dat.columns[0])
        except Exception:
            raise ValueError(
                f"Column header '{dat.columns[0]}' in the observed discharge file for catchment '{c}' "
                f"is not a valid date"
            ) from None

        file_end_time = file_start_time + pd.Timedelta(seconds=dt * (len(dat) - 1))
        start_diff = int((start_time - file_start_time).total_seconds() / dt) + 1
        end_diff = int((end_time - file_start_time).total_seconds() / dt) + 1

        # Check if the observed discharge file contains data for the period
        if start_diff > dat.index.max() or end_diff < 0:
            print(
                f"</> The provided observed discharge file for catchment '{c}' does not contain data for the "
                f"selected simulation period ['{start_time}', '{end_time}']. The file covers the period "
                f"['{file_start_time}', '{file_end_time}']",
            )
        else:
            ind_start_dat = max(0, start_diff)
            ind_end_dat = min(dat.index.max(), end_diff)
            ind_start_arr = max(0, -start_diff)
            ind_end_arr = ind_start_arr + ind_end_dat - ind_start_dat

            qobs[i, ind_start_arr:ind_end_arr] = dat.iloc[ind_start_dat:ind_end_dat, 0]

    if miss:
        print(f"</> Missing {len(miss)} observed discharge file(s): {miss}")

    return qobs


@autocast_args
def read_object_as_dict(instance, recursion_counter: int = 0):
    """
    Create a dictionary from a custom python object.

    Public attributes (except ``from_handle`` and ``copy``) are collected. Lists and
    tuples are stored as arrays of strings; arrays, dictionaries, numbers and strings
    are stored as they are; functions and methods are skipped; any other object is
    converted recursively and kept only if the result is not empty.

    :param instance: a custom python object
    :type instance: object
    :param recursion_counter: current nesting depth, used internally to stop after 100
        levels, defaults to 0
    :type recursion_counter: int, optional
    :return: a dictionary containing the keys and attributes of the object
    :rtype: dict
    """
    key_data = {}

    # The depth is carried by the argument; it used to be reset to 0 on every call, so
    # the limit could never trigger.
    if recursion_counter > 100:
        print("recursion counter exceed the limit of 100... return")
        return key_data

    for attr in dir(instance):
        if attr.startswith("_") or attr in ("from_handle", "copy"):
            continue

        try:
            value = getattr(instance, attr)

            if inspect.isroutine(value):
                continue

            if isinstance(value, (list, tuple)):
                value = np.array(value).astype("U")

            if isinstance(value, np.ndarray):
                if value.dtype == "object" or value.dtype.char == "U":
                    value = value.astype("U")
                key_data[attr] = value

            elif isinstance(value, (dict, numbers.Number, str, np.generic)):
                # np.generic covers numpy scalars that are not registered as numbers.
                key_data[attr] = value

            else:
                nested = read_object_as_dict(
                    value, recursion_counter=recursion_counter + 1
                )
                if nested:
                    key_data[attr] = nested

        except Exception:
            # Best effort: an attribute that cannot be read or converted is reported
            # and left out.
            print(f"unknown type for attribute : {attr}")

    return key_data
def build_object_tree(obj, name="root"):
    """
    Build a tree describing the public attributes of an object.

    Every name returned by ``dir(obj)`` that does not start with an underscore becomes
    one node: methods and functions, classes, dictionaries (keys only), builtin scalars
    and containers (``repr``), DataFrames and arrays (type only). Attributes whose type
    belongs to the ``smashbox`` package are expanded recursively.

    :param obj: Object to inspect
    :type obj: Any
    :param name: Label of the root node, defaults to "root"
    :type name: str, optional
    :return: The tree of attributes
    :rtype: rich.tree.Tree
    """

    tree = Tree(f"[bold cyan]Object {name}[/] ({type(obj).__name__})")

    plain_types = (int, float, str, bool, type(None), list, set, tuple)

    for attr in dir(obj):

        # Private and dunder names both start with an underscore.
        if attr.startswith("_"):
            continue

        try:
            val = getattr(obj, attr)
        except Exception:
            # Evaluating a property may fail; such attributes are left out.
            continue

        if inspect.ismethod(val) or inspect.isfunction(val):
            tree.add(f"[green]Method:[/] {attr}()")

        elif inspect.isclass(val):
            # Classes are listed as leaves. The former recursive call built a subtree
            # that was never attached, so it has been removed.
            tree.add(f"[blue]Class:[/] {attr}")

        elif isinstance(val, dict):
            tree.add(f"[red]Attribute:[/] {attr} = {val.keys()}")

        elif isinstance(val, plain_types):
            tree.add(f"[red]Attribute:[/] {attr} = {repr(val)}")

        elif isinstance(val, (pd.DataFrame, np.ndarray)):
            # Only the type is shown: the content may be large.
            tree.add(f"[red]Attribute:[/] {attr} = {type(val)}")

        elif "<class 'smashbox" in str(type(val)):
            tree.add(build_object_tree(val, name=attr))

        else:
            tree.add(f"[red]Unknown attribute type:[/] {attr} = {type(val)}")

    return tree
82def autocast_args(func): 83 """ 84 Decrorator function. Usgae @autocast_args previous the function definition 85 The goal of this decorator is to test input args type and auto cast them if possible. 86 If the type of the arg is not good and cannot be auto-casted, the function will throw 87 an execption. 88 :param func: Function on which to apply the decorator 89 :type func: Any function 90 """ 91 92 sig = inspect.signature(func) 93 annotations = func.__annotations__ 94 95 def wrapper(*args, **kwargs): 96 97 bound = sig.bind(*args, **kwargs) 98 bound.apply_defaults() 99 100 for name, value in bound.arguments.items(): 101 if name in annotations: 102 103 target_type = annotations[name] 104 105 args_ = get_args(target_type) 106 107 if target_type is None and len(args_) == 0: 108 args_ = (type(None),) 109 target_type = type(None) 110 111 if not type(value) in args_: 112 113 if len(args_) > 1 and type(None) in args_: 114 115 converted = False 116 for t in args_: 117 118 if t is not type(None): 119 120 if value is not None: 121 try: 122 print( 123 f"</> Warning: Arg '{name}' of type {type(value)} is being" 124 f" converted to {t}" 125 ) 126 bound.arguments[name] = t(value) 127 converted = True 128 except: 129 pass 130 131 if converted: 132 break 133 134 if not converted: 135 raise TypeError( 136 f"</> Error: Arg '{name}' must be a type of " 137 f" {args_}, got {value}" 138 f" ({type(value).__name__})" 139 ) 140 141 else: 142 if not isinstance(value, target_type): 143 try: 144 print( 145 f"</> Warning: Arg '{name}' of type {type(value)} is being" 146 f" converted to {target_type}" 147 ) 148 bound.arguments[name] = target_type(value) 149 except Exception: 150 raise TypeError( 151 f"</> Error: Arg '{name}' must be a type of " 152 f" {target_type.__name__}, got {value}" 153 f" ({type(value).__name__})" 154 ) 155 156 return func(*bound.args, **bound.kwargs) 157 158 return wrapper
Decorator function. Usage: put @autocast_args before the function definition. The goal of this decorator is to test the type of the input args and auto-cast them if possible. If the type of an arg is wrong and cannot be auto-casted, the function will throw an exception.
Parameters
- func: Function on which to apply the decorator
def infograffas_bbox_extractor(info_graffas):
    """
    Extract the bounding box from an info_graffas dictionary and widen it by half a
    cell on every side.

    Parameter:
    ----------
    info_graffas: Dictionary with a key 'domain' (itself a dictionary with keys 'left',
    'bottom', 'right', 'top') and a key 'resolution_sim' holding the resolution of
    the domain.

    Return
    ------

    bbox, a dictionary with keys 'left', 'bottom', 'right', 'top'.
    """

    domain = info_graffas["domain"]
    half_cell = info_graffas["resolution_sim"] / 2

    # Lower bounds move outwards by subtraction, upper bounds by addition.
    return {
        "left": domain["left"] - half_cell,
        "bottom": domain["bottom"] - half_cell,
        "right": domain["right"] + half_cell,
        "top": domain["top"] + half_cell,
    }
A function to extract the bbox from info_graffas object and adapt it to the Smash convention
Parameter:
info_graffas: Dictionary containing information about the Graffas domain. At least: a key 'domain' holding a dictionary with keys 'left', 'bottom', 'right', 'top', each associated with the corresponding
bounding coordinate, and a key 'resolution_sim' holding a real number with the resolution of the domain in meters.
Return
bbox, a dictionary of the bbox adapted to Smash.
def GraffasVector2SmashArray(vdata, coordinates, resolution):
    """
    Scatter per-point time series onto a regular grid.

    :param vdata: Values with shape (nb_points, nb_time_steps)
    :type vdata: np.ndarray
    :param coordinates: Mapping with entries "X" and "Y", one coordinate per point
    :type coordinates: dict | pd.DataFrame
    :param resolution: Grid spacing, in the same unit as the coordinates
    :type resolution: int | float
    :return: Array of shape (nb_x, nb_y, nb_time_steps), zero where no point falls
    :rtype: np.ndarray
    """

    # Plain arrays avoid label-based indexing when the coordinates are pandas Series.
    x = np.asarray(coordinates["X"])
    y = np.asarray(coordinates["Y"])
    values = np.asarray(vdata)

    # Cell indices relative to the lower-left point (truncated, as before).
    coord_x = ((x - x.min()) / resolution).astype(int)
    coord_y = ((y - y.min()) / resolution).astype(int)

    graffas_prcp = np.zeros(
        shape=(coord_x.max() + 1, coord_y.max() + 1, values.shape[1])
    )

    # One vectorised assignment replaces the former double Python loop. When two
    # points share a cell the last one wins, exactly as in the loop.
    graffas_prcp[coord_x, coord_y, :] = values[: len(coord_x)]

    return graffas_prcp
def array_isin(arr1: np.ndarray = None, arr2: np.ndarray = None):
    """
    Return the positions in ``arr1`` of the elements that are listed in ``arr2``.

    A message is printed for every element of ``arr2`` that is absent from ``arr1``.

    :param arr1: Reference names
    :type arr1: np.ndarray
    :param arr2: Requested names
    :type arr2: np.ndarray
    :return: Positions (in increasing order) of the matching elements of ``arr1``
    :rtype: list
    :raises ValueError: if an argument is None or if nothing matches
    """

    # The defaults are None: reject them before anything is measured or indexed
    # (len(None) used to raise a TypeError here).
    if arr1 is None or arr2 is None:
        raise ValueError("Invalid outlets name.")

    requested = np.atleast_1d(np.asarray(arr2))
    if requested.size == 0:
        raise ValueError("Invalid outlets name.")

    for missing in requested[~np.isin(requested, arr1)]:
        print(f"</> Outlet name `{missing}` does not exist in the mesh.")

    pos = list(np.where(np.isin(arr1, requested))[0])

    if len(pos) == 0:
        raise ValueError("Invalid outlets name.")

    return pos
221def check_asset_path(asset_dir: str = "", path: None | os.PathLike = None): 222 223 if path is None: 224 return path 225 226 if not os.path.exists(path): 227 228 mypath = os.path.join(asset_dir) 229 matched_file = sorted(glob.glob(f"{mypath}/*{path}*")) 230 231 if len(matched_file) == 0: 232 raise ValueError( 233 f"'{path}' is not a valid {asset_dir} filename." 234 f"Choice are: {os.listdir(mypath)}" 235 ) 236 else: 237 path = matched_file[0] 238 239 return path
def print_tree(root_path, prefix=""):
    """
    Print the content of a directory as a tree, recursing into sub-directories.

    :param root_path: Directory to list
    :type root_path: str | os.PathLike
    :param prefix: Text printed in front of every entry; used by the recursion to draw
        the branches of the parent levels, defaults to ""
    :type prefix: str, optional
    """

    files = sorted(os.listdir(root_path))

    for index, name in enumerate(files):

        path = os.path.join(root_path, name)
        is_last = index == len(files) - 1

        print(prefix + ("└── " if is_last else "├── ") + name)

        if os.path.isdir(path):
            # The continuation must be as wide as the connectors (4 columns), otherwise
            # nested levels are misaligned.
            print_tree(path, prefix + ("    " if is_last else "│   "))
def wrapper(*args, **kwargs):
    """
    Check the arguments of a call against the annotations, auto-cast them if possible,
    then call ``func``.

    Relies on ``sig``, ``annotations`` and ``func`` from the enclosing decorator.
    Only annotated parameters are checked. Union annotations (``int | None``) accept
    any of their members; conversion is tried with each non-None member in order.
    Annotations that cannot be checked at runtime are ignored.

    :raises TypeError: if an argument has the wrong type and cannot be converted
    """
    # Local imports: the names below are not part of the module-level imports.
    import types
    import typing

    bound = sig.bind(*args, **kwargs)
    bound.apply_defaults()

    for name, value in bound.arguments.items():
        if name not in annotations:
            continue

        target_type = annotations[name]

        # A bare ``None`` annotation means "must be None".
        if target_type is None:
            target_type = type(None)

        origin = typing.get_origin(target_type)
        if origin is typing.Union or origin is types.UnionType:
            members = get_args(target_type)
        else:
            members = (target_type,)

        # Strip subscripts (``list[int]`` -> ``list``) and keep only real classes.
        casters = []
        for member in members:
            member = typing.get_origin(member) or member
            if isinstance(member, type):
                casters.append(member)

        if not casters:
            continue

        try:
            if isinstance(value, tuple(casters)):
                continue
        except TypeError:
            # Not checkable with isinstance (e.g. typing.Any).
            continue

        for caster in casters:
            if caster is type(None):
                continue
            try:
                if caster is np.ndarray:
                    # np.ndarray(value) would interpret ``value`` as a shape.
                    converted = np.asarray(value)
                else:
                    converted = caster(value)
            except Exception:
                continue

            # The warning is printed only once a conversion has succeeded.
            print(
                f"</> Warning: Arg '{name}' of type {type(value)} is being"
                f" converted to {caster}"
            )
            bound.arguments[name] = converted
            break
        else:
            expected = ", ".join(c.__name__ for c in casters)
            raise TypeError(
                f"</> Error: Arg '{name}' must be a type of "
                f"{expected}, got {value}"
                f" ({type(value).__name__})"
            )

    return func(*bound.args, **bound.kwargs)
Resample an array of prcp for a given time-step.
Parameters
- array: the array containing the prcp with shape (nbx, nby, nbts)
- input_dt: The time-step of the input array (seconds), defaults to 3600
- output_dt: The time-step of the resampled array (seconds), defaults to 3600
- t_axis: The array axis direction of the time-step, defaults to 2
Returns
The resampled array
def wrapper(*args, **kwargs):
    """
    Check the arguments of a call against the annotations, auto-cast them if possible,
    then call ``func``.

    Relies on ``sig``, ``annotations`` and ``func`` from the enclosing decorator.
    Only annotated parameters are checked. Union annotations (``int | None``) accept
    any of their members; conversion is tried with each non-None member in order.
    Annotations that cannot be checked at runtime are ignored.

    :raises TypeError: if an argument has the wrong type and cannot be converted
    """
    # Local imports: the names below are not part of the module-level imports.
    import types
    import typing

    bound = sig.bind(*args, **kwargs)
    bound.apply_defaults()

    for name, value in bound.arguments.items():
        if name not in annotations:
            continue

        target_type = annotations[name]

        # A bare ``None`` annotation means "must be None".
        if target_type is None:
            target_type = type(None)

        origin = typing.get_origin(target_type)
        if origin is typing.Union or origin is types.UnionType:
            members = get_args(target_type)
        else:
            members = (target_type,)

        # Strip subscripts (``list[int]`` -> ``list``) and keep only real classes.
        casters = []
        for member in members:
            member = typing.get_origin(member) or member
            if isinstance(member, type):
                casters.append(member)

        if not casters:
            continue

        try:
            if isinstance(value, tuple(casters)):
                continue
        except TypeError:
            # Not checkable with isinstance (e.g. typing.Any).
            continue

        for caster in casters:
            if caster is type(None):
                continue
            try:
                if caster is np.ndarray:
                    # np.ndarray(value) would interpret ``value`` as a shape.
                    converted = np.asarray(value)
                else:
                    converted = caster(value)
            except Exception:
                continue

            # The warning is printed only once a conversion has succeeded.
            print(
                f"</> Warning: Arg '{name}' of type {type(value)} is being"
                f" converted to {caster}"
            )
            bound.arguments[name] = converted
            break
        else:
            expected = ", ".join(c.__name__ for c in casters)
            raise TypeError(
                f"</> Error: Arg '{name}' must be a type of "
                f"{expected}, got {value}"
                f" ({type(value).__name__})"
            )

    return func(*bound.args, **bound.kwargs)
def wrapper(*args, **kwargs):
    """
    Check the arguments of a call against the annotations, auto-cast them if possible,
    then call ``func``.

    Relies on ``sig``, ``annotations`` and ``func`` from the enclosing decorator.
    Only annotated parameters are checked. Union annotations (``int | None``) accept
    any of their members; conversion is tried with each non-None member in order.
    Annotations that cannot be checked at runtime are ignored.

    :raises TypeError: if an argument has the wrong type and cannot be converted
    """
    # Local imports: the names below are not part of the module-level imports.
    import types
    import typing

    bound = sig.bind(*args, **kwargs)
    bound.apply_defaults()

    for name, value in bound.arguments.items():
        if name not in annotations:
            continue

        target_type = annotations[name]

        # A bare ``None`` annotation means "must be None".
        if target_type is None:
            target_type = type(None)

        origin = typing.get_origin(target_type)
        if origin is typing.Union or origin is types.UnionType:
            members = get_args(target_type)
        else:
            members = (target_type,)

        # Strip subscripts (``list[int]`` -> ``list``) and keep only real classes.
        casters = []
        for member in members:
            member = typing.get_origin(member) or member
            if isinstance(member, type):
                casters.append(member)

        if not casters:
            continue

        try:
            if isinstance(value, tuple(casters)):
                continue
        except TypeError:
            # Not checkable with isinstance (e.g. typing.Any).
            continue

        for caster in casters:
            if caster is type(None):
                continue
            try:
                if caster is np.ndarray:
                    # np.ndarray(value) would interpret ``value`` as a shape.
                    converted = np.asarray(value)
                else:
                    converted = caster(value)
            except Exception:
                continue

            # The warning is printed only once a conversion has succeeded.
            print(
                f"</> Warning: Arg '{name}' of type {type(value)} is being"
                f" converted to {caster}"
            )
            bound.arguments[name] = converted
            break
        else:
            expected = ", ".join(c.__name__ for c in casters)
            raise TypeError(
                f"</> Error: Arg '{name}' must be a type of "
                f"{expected}, got {value}"
                f" ({type(value).__name__})"
            )

    return func(*bound.args, **bound.kwargs)
create a dictionary from a custom python object
Parameters
instance : object a custom Python object
Return
key_data: dict a dictionary containing all keys and attributes of the object