diff --git a/pyotb/__init__.py b/pyotb/__init__.py index d4efd392e4d90d39ac2108f9202f917a21b63f66..c9fe2585fe43c310a9e19b094925a239f206a6dd 100644 --- a/pyotb/__init__.py +++ b/pyotb/__init__.py @@ -4,19 +4,12 @@ __version__ = "2.0.0" from .helpers import logger, set_logger_level from .apps import * -from .core import ( - App, - Input, - Output, - get_nbchannels, - get_pixel_type, - summarize -) +from .core import App, Input, Output, get_nbchannels, get_pixel_type, summarize from .functions import ( # pylint: disable=redefined-builtin all, any, - where, clip, + define_processing_area, run_tf_function, - define_processing_area + where, ) diff --git a/pyotb/apps.py b/pyotb/apps.py index bad5089cba57143f1cf36c3466478641cb8017c6..6f54b27a59fd97203319ce68f1e7a680b73c4b62 100644 --- a/pyotb/apps.py +++ b/pyotb/apps.py @@ -1,12 +1,14 @@ # -*- coding: utf-8 -*- """Search for OTB (set env if necessary), subclass core.App for each available application.""" from __future__ import annotations + import os -import sys import subprocess +import sys from pathlib import Path import otbApplication as otb # pylint: disable=import-error + from .core import App from .helpers import logger @@ -31,36 +33,48 @@ def get_available_applications(as_subprocess: bool = False) -> list[str]: if "PYTHONPATH" not in env: env["PYTHONPATH"] = "" env["PYTHONPATH"] += ":" + str(Path(otb.__file__).parent) - env["OTB_LOGGER_LEVEL"] = "CRITICAL" # in order to suppress warnings while listing applications + env[ + "OTB_LOGGER_LEVEL" + ] = "CRITICAL" # in order to suppress warnings while listing applications pycmd = "import otbApplication; print(otbApplication.Registry.GetAvailableApplications())" cmd_args = [sys.executable, "-c", pycmd] try: params = {"env": env, "stdout": subprocess.PIPE, "stderr": subprocess.PIPE} with subprocess.Popen(cmd_args, **params) as process: - logger.debug('Exec "%s \'%s\'"', ' '.join(cmd_args[:-1]), pycmd) + logger.debug("Exec \"%s '%s'\"", " ".join(cmd_args[:-1]), pycmd) stdout, stderr = process.communicate() stdout, stderr = stdout.decode(), stderr.decode() # ast.literal_eval is secure and will raise more handy Exceptions than eval from ast import literal_eval # pylint: disable=import-outside-toplevel + app_list = literal_eval(stdout.strip()) assert isinstance(app_list, (tuple, list)) except subprocess.SubprocessError: logger.debug("Failed to call subprocess") except (ValueError, SyntaxError, AssertionError): - logger.debug("Failed to decode output or convert to tuple:\nstdout=%s\nstderr=%s", stdout, stderr) + logger.debug( + "Failed to decode output or convert to tuple:\nstdout=%s\nstderr=%s", + stdout, + stderr, + ) if not app_list: - logger.debug("Failed to list applications in an independent process. Falling back to local python import") + logger.debug( + "Failed to list applications in an independent process. Falling back to local python import" + ) # Find applications using the normal way if not app_list: app_list = otb.Registry.GetAvailableApplications() if not app_list: - raise SystemExit("Unable to load applications. Set env variable OTB_APPLICATION_PATH and try again.") + raise SystemExit( + "Unable to load applications. Set env variable OTB_APPLICATION_PATH and try again." + ) logger.info("Successfully loaded %s OTB applications", len(app_list)) return app_list class OTBTFApp(App): """Helper for OTBTF.""" + @staticmethod def set_nb_sources(*args, n_sources: int = None): """Set the number of sources of TensorflowModelServe. Can be either user-defined or deduced from the args. @@ -72,13 +86,17 @@ class OTBTFApp(App): """ if n_sources: - os.environ['OTB_TF_NSOURCES'] = str(int(n_sources)) + os.environ["OTB_TF_NSOURCES"] = str(int(n_sources)) else: # Retrieving the number of `source#.il` parameters - params_dic = {k: v for arg in args if isinstance(arg, dict) for k, v in arg.items()} - n_sources = len([k for k in params_dic if 'source' in k and k.endswith('.il')]) + params_dic = { + k: v for arg in args if isinstance(arg, dict) for k, v in arg.items() + } + n_sources = len( + [k for k in params_dic if "source" in k and k.endswith(".il")] + ) if n_sources >= 1: - os.environ['OTB_TF_NSOURCES'] = str(n_sources) + os.environ["OTB_TF_NSOURCES"] = str(n_sources) def __init__(self, name: str, *args, n_sources: int = None, **kwargs): """Constructor for an OTBTFApp object. @@ -98,17 +116,22 @@ class OTBTFApp(App): AVAILABLE_APPLICATIONS = get_available_applications(as_subprocess=True) # This is to enable aliases of Apps, i.e. using apps like `pyotb.AppName(...)` instead of `pyotb.App("AppName", ...)` -_CODE_TEMPLATE = """ +_CODE_TEMPLATE = ( + """ class {name}(App): - """ """ + """ + """ def __init__(self, *args, **kwargs): super().__init__('{name}', *args, **kwargs) """ +) for _app in AVAILABLE_APPLICATIONS: # Customize the behavior for some OTBTF applications. `OTB_TF_NSOURCES` is now handled by pyotb if _app in ("PatchesExtraction", "TensorflowModelTrain", "TensorflowModelServe"): - exec(_CODE_TEMPLATE.format(name=_app).replace("(App)", "(OTBTFApp)")) # pylint: disable=exec-used + exec( # pylint: disable=exec-used + _CODE_TEMPLATE.format(name=_app).replace("(App)", "(OTBTFApp)") + ) # Default behavior for any OTB application else: exec(_CODE_TEMPLATE.format(name=_app)) # pylint: disable=exec-used diff --git a/pyotb/core.py b/pyotb/core.py index 903f71fb4303cbd6d91e48b1fc511f12c3877289..dfe2598067fd6ba93050ca7d3555674a25d6de3a 100644 --- a/pyotb/core.py +++ b/pyotb/core.py @@ -2,11 +2,11 @@ """This module is the core of pyotb.""" from __future__ import annotations +from abc import ABC, abstractmethod from ast import literal_eval from pathlib import Path from time import perf_counter from typing import Any -from abc import ABC, abstractmethod import numpy as np import otbApplication as otb # pylint: disable=import-error @@ -16,6 +16,7 @@ from .helpers import logger class OTBObject(ABC): """Abstraction of an image object.""" + @property @abstractmethod def name(self) -> str: @@ -85,7 +86,9 @@ class OTBObject(ABC): """Return a dict output of ComputeImagesStatistics for the first image output.""" return App("ComputeImagesStatistics", self, quiet=True).data - def get_values_at_coords(self, row: int, col: int, bands: int = None) -> list[int | float] | int | float: + def get_values_at_coords( + self, row: int, col: int, bands: int = None + ) -> list[int | float] | int | float: """Get pixel value(s) at a given YX coordinates. Args: @@ -107,7 +110,9 @@ class OTBObject(ABC): elif isinstance(bands, slice): channels = self.channels_list_from_slice(bands) elif not isinstance(bands, list): - raise TypeError(f"{self.name}: type '{type(bands)}' cannot be interpreted as a valid slicing") + raise TypeError( + f"{self.name}: type '{type(bands)}' cannot be interpreted as a valid slicing" + ) if channels: app.app.Execute() app.set_parameters({"cl": [f"Channel{n + 1}" for n in channels]}) @@ -130,9 +135,13 @@ class OTBObject(ABC): return list(range(0, stop, step)) if start is None and stop is None: return list(range(0, nb_channels, step)) - raise ValueError(f"{self.name}: '{bands}' cannot be interpreted as valid slicing.") + raise ValueError( + f"{self.name}: '{bands}' cannot be interpreted as valid slicing." + ) - def export(self, key: str = None, preserve_dtype: bool = True) -> dict[str, dict[str, np.ndarray]]: + def export( + self, key: str = None, preserve_dtype: bool = True + ) -> dict[str, dict[str, np.ndarray]]: """Export a specific output image as numpy array and store it in object exports_dic. Args: @@ -149,10 +158,14 @@ class OTBObject(ABC): if key not in self.exports_dic: self.exports_dic[key] = self.app.ExportImage(key) if preserve_dtype: - self.exports_dic[key]["array"] = self.exports_dic[key]["array"].astype(self.dtype) + self.exports_dic[key]["array"] = self.exports_dic[key]["array"].astype( + self.dtype + ) return self.exports_dic[key] - def to_numpy(self, key: str = None, preserve_dtype: bool = True, copy: bool = False) -> np.ndarray: + def to_numpy( + self, key: str = None, preserve_dtype: bool = True, copy: bool = False + ) -> np.ndarray: """Export a pyotb object to numpy array. Args: @@ -370,12 +383,18 @@ class OTBObject(ABC): row, col = key[0], key[1] if isinstance(row, int) and isinstance(col, int): if row < 0 or col < 0: - raise ValueError(f"{self.name} cannot read pixel value at negative coordinates ({row}, {col})") + raise ValueError( + f"{self.name} cannot read pixel value at negative coordinates ({row}, {col})" + ) channels = key[2] if len(key) == 3 else None return self.get_values_at_coords(row, col, channels) # Slicing - if not isinstance(key, tuple) or (isinstance(key, tuple) and (len(key) < 2 or len(key) > 3)): - raise ValueError(f'"{key}" cannot be interpreted as valid slicing. Slicing should be 2D or 3D.') + if not isinstance(key, tuple) or ( + isinstance(key, tuple) and (len(key) < 2 or len(key) > 3) + ): + raise ValueError( + f'"{key}" cannot be interpreted as valid slicing. Slicing should be 2D or 3D.' + ) if isinstance(key, tuple) and len(key) == 2: # Adding a 3rd dimension key = key + (slice(None, None, None),) @@ -392,7 +411,7 @@ class App(OTBObject): INPUT_IMAGE_TYPES = [ # Images only otb.ParameterType_InputImage, - otb.ParameterType_InputImageList + otb.ParameterType_InputImageList, ] INPUT_PARAM_TYPES = INPUT_IMAGE_TYPES + [ # Vectors @@ -426,7 +445,15 @@ class App(OTBObject): otb.ParameterType_InputFilenameList, ] - def __init__(self, appname: str, *args, frozen: bool = False, quiet: bool = False, name: str = "", **kwargs): + def __init__( + self, + appname: str, + *args, + frozen: bool = False, + quiet: bool = False, + name: str = "", + **kwargs, + ): """Common constructor for OTB applications. Handles in-memory connection between apps. Args: @@ -445,7 +472,11 @@ class App(OTBObject): """ # Attributes and data structures used by properties - create = otb.Registry.CreateApplicationWithoutLogger if quiet else otb.Registry.CreateApplication + create = ( + otb.Registry.CreateApplicationWithoutLogger + if quiet + else otb.Registry.CreateApplication + ) self._app = create(appname) self._name = name or appname self._exports_dic = {} @@ -455,14 +486,25 @@ class App(OTBObject): self.quiet, self.frozen = quiet, frozen # Param keys and types self.parameters_keys = tuple(self.app.GetParametersKeys()) - self._all_param_types = {k: self.app.GetParameterType(k) for k in self.parameters_keys} - types = (otb.ParameterType_OutputImage, otb.ParameterType_OutputVectorData, otb.ParameterType_OutputFilename) - self._out_param_types = {k: v for k, v in self._all_param_types.items() if v in types} + self._all_param_types = { + k: self.app.GetParameterType(k) for k in self.parameters_keys + } + types = ( + otb.ParameterType_OutputImage, + otb.ParameterType_OutputVectorData, + otb.ParameterType_OutputFilename, + ) + self._out_param_types = { + k: v for k, v in self._all_param_types.items() if v in types + } # Init, execute and write (auto flush only when output param was provided) if args or kwargs: self.set_parameters(*args, **kwargs) # Create Output image objects - for key in filter(lambda k: self._out_param_types[k] == otb.ParameterType_OutputImage, self._out_param_types): + for key in filter( + lambda k: self._out_param_types[k] == otb.ParameterType_OutputImage, + self._out_param_types, + ): self.outputs[key] = Output(self, key, self._settings.get(key)) if not self.frozen: self.execute() @@ -492,9 +534,7 @@ class App(OTBObject): def __is_one_of_types(self, key: str, param_types: list[int]) -> bool: """Helper to factor is_input and is_output.""" if key not in self._all_param_types: - raise KeyError( - f"key {key} not found in the application parameters types" - ) + raise KeyError(f"key {key} not found in the application parameters types") return self._all_param_types[key] in param_types def is_input(self, key: str) -> bool: @@ -507,9 +547,7 @@ class App(OTBObject): True if the parameter is an input, else False """ - return self.__is_one_of_types( - key=key, param_types=self.INPUT_PARAM_TYPES - ) + return self.__is_one_of_types(key=key, param_types=self.INPUT_PARAM_TYPES) def is_output(self, key: str) -> bool: """Returns True if the key is an output. @@ -521,9 +559,7 @@ class App(OTBObject): True if the parameter is an output, else False """ - return self.__is_one_of_types( - key=key, param_types=self.OUTPUT_PARAM_TYPES - ) + return self.__is_one_of_types(key=key, param_types=self.OUTPUT_PARAM_TYPES) def is_key_list(self, key: str) -> bool: """Check if a parameter key is an input parameter list.""" @@ -541,7 +577,9 @@ class App(OTBObject): for key, value in sorted(self._all_param_types.items()): if value == param_type: return key - raise TypeError(f"{self.name}: could not find any parameter key matching the provided types") + raise TypeError( + f"{self.name}: could not find any parameter key matching the provided types" + ) @property def input_key(self) -> str: @@ -598,7 +636,11 @@ class App(OTBObject): # When the parameter expects a list, if needed, change the value to list if self.is_key_list(key) and not isinstance(obj, (list, tuple)): obj = [obj] - logger.info('%s: argument for parameter "%s" was converted to list', self.name, key) + logger.info( + '%s: argument for parameter "%s" was converted to list', + self.name, + key, + ) try: if self.is_input(key): if self.is_key_images_list(key): @@ -630,19 +672,28 @@ class App(OTBObject): if not dtype: param = self._settings.get(self.input_image_key) if not param: - logger.warning("%s: could not propagate pixel type from inputs to output", self.name) + logger.warning( + "%s: could not propagate pixel type from inputs to output", + self.name, + ) return if isinstance(param, (list, tuple)): param = param[0] # first image in "il" try: dtype = get_pixel_type(param) except (TypeError, RuntimeError): - logger.warning('%s: unable to identify pixel type of key "%s"', self.name, param) + logger.warning( + '%s: unable to identify pixel type of key "%s"', self.name, param + ) return if target_key: keys = [target_key] else: - keys = [k for k, v in self._out_param_types.items() if v == otb.ParameterType_OutputImage] + keys = [ + k + for k, v in self._out_param_types.items() + if v == otb.ParameterType_OutputImage + ] for key in keys: self.app.SetParameterOutputImagePixelType(key, dtype) @@ -653,7 +704,9 @@ class App(OTBObject): try: self.app.Execute() except (RuntimeError, FileNotFoundError) as e: - raise RuntimeError(f"{self.name}: error during during app execution ({e}") from e + raise RuntimeError( + f"{self.name}: error during during app execution ({e}" + ) from e self.frozen = False self._time_end = perf_counter() logger.debug("%s: execution ended", self.name) @@ -665,13 +718,22 @@ class App(OTBObject): logger.debug("%s: flushing data to disk", self.name) self.app.WriteOutput() except RuntimeError: - logger.debug("%s: failed with WriteOutput, executing once again with ExecuteAndWriteOutput", self.name) + logger.debug( + "%s: failed with WriteOutput, executing once again with ExecuteAndWriteOutput", + self.name, + ) self._time_start = perf_counter() self.app.ExecuteAndWriteOutput() self._time_end = perf_counter() - def write(self, path: str | Path | dict[str, str] = None, ext_fname: str = "", - pixel_type: dict[str, str] | str = None, preserve_dtype: bool = False, **kwargs) -> bool: + def write( + self, + path: str | Path | dict[str, str] = None, + ext_fname: str = "", + pixel_type: dict[str, str] | str = None, + preserve_dtype: bool = False, + **kwargs, + ) -> bool: """Set output pixel type and write the output raster files. Args: @@ -697,24 +759,35 @@ class App(OTBObject): if isinstance(path, dict): kwargs.update(path) elif isinstance(path, str) and kwargs: - logger.warning('%s: keyword arguments specified, ignoring argument "%s"', self.name, path) + logger.warning( + '%s: keyword arguments specified, ignoring argument "%s"', + self.name, + path, + ) elif isinstance(path, (str, Path)) and self.output_key: kwargs.update({self.output_key: str(path)}) elif path is not None: raise TypeError(f"{self.name}: unsupported filepath type ({type(path)})") if not (kwargs or any(k in self._settings for k in self._out_param_types)): - raise KeyError(f"{self.name}: at least one filepath is required, if not provided during App init") + raise KeyError( + f"{self.name}: at least one filepath is required, if not provided during App init" + ) parameters = kwargs.copy() # Append filename extension to filenames if ext_fname: - logger.debug("%s: using extended filename for outputs: %s", self.name, ext_fname) + logger.debug( + "%s: using extended filename for outputs: %s", self.name, ext_fname + ) if not ext_fname.startswith("?"): ext_fname = "?&" + ext_fname elif not ext_fname.startswith("?&"): ext_fname = "?&" + ext_fname[1:] for key, value in kwargs.items(): - if self._out_param_types[key] == otb.ParameterType_OutputImage and "?" not in value: + if ( + self._out_param_types[key] == otb.ParameterType_OutputImage + and "?" not in value + ): parameters[key] = value + ext_fname # Manage output pixel types data_types = {} @@ -722,12 +795,16 @@ class App(OTBObject): if isinstance(pixel_type, str): dtype = parse_pixel_type(pixel_type) type_name = self.app.ConvertPixelTypeToNumpy(dtype) - logger.debug('%s: output(s) will be written with type "%s"', self.name, type_name) + logger.debug( + '%s: output(s) will be written with type "%s"', self.name, type_name + ) for key in parameters: if self._out_param_types[key] == otb.ParameterType_OutputImage: data_types[key] = dtype elif isinstance(pixel_type, dict): - data_types = {key: parse_pixel_type(dtype) for key, dtype in pixel_type.items()} + data_types = { + key: parse_pixel_type(dtype) for key, dtype in pixel_type.items() + } elif preserve_dtype: self.propagate_dtype() # all outputs will have the same type as the main input raster @@ -751,7 +828,11 @@ class App(OTBObject): dest = files if filepath.exists() else missing dest.append(str(filepath.absolute())) for filename in missing: - logger.error("%s: execution seems to have failed, %s does not exist", self.name, filename) + logger.error( + "%s: execution seems to have failed, %s does not exist", + self.name, + filename, + ) return bool(files) and not missing # Private functions @@ -769,11 +850,17 @@ class App(OTBObject): for arg in args: if isinstance(arg, dict): kwargs.update(arg) - elif isinstance(arg, (str, OTBObject)) or isinstance(arg, list) and self.is_key_list(self.input_key): + elif ( + isinstance(arg, (str, OTBObject)) + or isinstance(arg, list) + and self.is_key_list(self.input_key) + ): kwargs.update({self.input_key: arg}) return kwargs - def __set_param(self, key: str, obj: list | tuple | OTBObject | otb.Application | list[Any]): + def __set_param( + self, key: str, obj: list | tuple | OTBObject | otb.Application | list[Any] + ): """Set one parameter, decide which otb.Application method to use depending on target object.""" if obj is None or (isinstance(obj, (list, tuple)) and not obj): self.app.ClearValue(key) @@ -781,9 +868,13 @@ class App(OTBObject): # Single-parameter cases if isinstance(obj, OTBObject): self.app.ConnectImage(key, obj.app, obj.output_image_key) - elif isinstance(obj, otb.Application): # this is for backward comp with plain OTB + elif isinstance( + obj, otb.Application + ): # this is for backward comp with plain OTB self.app.ConnectImage(key, obj, get_out_images_param_keys(obj)[0]) - elif key == "ram": # SetParameterValue in OTB<7.4 doesn't work for ram parameter cf gitlab OTB issue 2200 + elif ( + key == "ram" + ): # SetParameterValue in OTB<7.4 doesn't work for ram parameter cf gitlab OTB issue 2200 self.app.SetParameterInt("ram", int(obj)) elif not isinstance(obj, list): # any other parameters (str, int...) self.app.SetParameterValue(key, obj) @@ -793,7 +884,9 @@ class App(OTBObject): for inp in obj: if isinstance(inp, OTBObject): self.app.ConnectImage(key, inp.app, inp.output_image_key) - elif isinstance(inp, otb.Application): # this is for backward comp with plain OTB + elif isinstance( + inp, otb.Application + ): # this is for backward comp with plain OTB self.app.ConnectImage(key, obj, get_out_images_param_keys(inp)[0]) else: # here `input` should be an image filepath # Append `input` to the list, do not overwrite any previously set element of the image list @@ -809,9 +902,13 @@ class App(OTBObject): continue value = self.app.GetParameterValue(key) # TODO: here we *should* use self.app.IsParameterEnabled, but it seems broken - if isinstance(value, otb.ApplicationProxy) and self.app.HasAutomaticValue(key): + if isinstance(value, otb.ApplicationProxy) and self.app.HasAutomaticValue( + key + ): try: - value = str(value) # some default str values like "mode" or "interpolator" + value = str( + value + ) # some default str values like "mode" or "interpolator" self._auto_parameters[key] = value continue except RuntimeError: @@ -841,13 +938,21 @@ class App(OTBObject): if key in self.parameters: return self.parameters[key] raise KeyError(f"{self.name}: unknown or undefined parameter '{key}'") - raise TypeError(f"{self.name}: cannot access object item or slice using {type(key)} object") + raise TypeError( + f"{self.name}: cannot access object item or slice using {type(key)} object" + ) class Slicer(App): """Slicer objects i.e. when we call something like raster[:, :, 2] from Python.""" - def __init__(self, obj: OTBObject, rows: slice, cols: slice, channels: slice | list[int] | int): + def __init__( + self, + obj: OTBObject, + rows: slice, + cols: slice, + channels: slice | list[int] | int, + ): """Create a slicer object, that can be used directly for writing or inside a BandMath. It contains : @@ -861,7 +966,14 @@ class Slicer(App): channels: channels, can be slicing, list or int """ - super().__init__("ExtractROI", obj, mode="extent", quiet=True, frozen=True, name=f"Slicer from {obj.name}") + super().__init__( + "ExtractROI", + obj, + mode="extent", + quiet=True, + frozen=True, + name=f"Slicer from {obj.name}", + ) self.rows, self.cols = rows, cols parameters = {} @@ -879,7 +991,9 @@ class Slicer(App): elif isinstance(channels, tuple): channels = list(channels) elif not isinstance(channels, list): - raise ValueError(f"Invalid type for channels, should be int, slice or list of bands. : {channels}") + raise ValueError( + f"Invalid type for channels, should be int, slice or list of bands. : {channels}" + ) # Change the potential negative index values to reverse index channels = [c if c >= 0 else nb_channels + c for c in channels] parameters.update({"cl": [f"Channel{i + 1}" for i in channels]}) @@ -891,17 +1005,23 @@ class Slicer(App): parameters.update({"mode.extent.uly": rows.start}) spatial_slicing = True if rows.stop is not None and rows.stop != -1: - parameters.update({"mode.extent.lry": rows.stop - 1}) # subtract 1 to respect python convention + parameters.update( + {"mode.extent.lry": rows.stop - 1} + ) # subtract 1 to respect python convention spatial_slicing = True if cols.start is not None: parameters.update({"mode.extent.ulx": cols.start}) spatial_slicing = True if cols.stop is not None and cols.stop != -1: - parameters.update({"mode.extent.lrx": cols.stop - 1}) # subtract 1 to respect python convention + parameters.update( + {"mode.extent.lrx": cols.stop - 1} + ) # subtract 1 to respect python convention spatial_slicing = True # These are some attributes when the user simply wants to extract *one* band to be used in an Operation if not spatial_slicing and isinstance(channels, list) and len(channels) == 1: - self.one_band_sliced = channels[0] + 1 # OTB convention: channels start at 1 + self.one_band_sliced = ( + channels[0] + 1 + ) # OTB convention: channels start at 1 self.input = obj # Execute app @@ -956,7 +1076,9 @@ class Operation(App): # NB: the keys of the dictionary are strings-only, instead of 'complex' objects, to enable easy serialization self.im_dic = {} self.im_count = 1 - map_repr_to_input = {} # to be able to retrieve the real python object from its string representation + map_repr_to_input = ( + {} + ) # to be able to retrieve the real python object from its string representation for inp in self.inputs: if not isinstance(inp, (int, float)): if str(inp) not in self.im_dic: @@ -964,13 +1086,23 @@ class Operation(App): map_repr_to_input[repr(inp)] = inp self.im_count += 1 # Getting unique image inputs, in the order im1, im2, im3 ... - self.unique_inputs = [map_repr_to_input[id_str] for id_str in sorted(self.im_dic, key=self.im_dic.get)] + self.unique_inputs = [ + map_repr_to_input[id_str] + for id_str in sorted(self.im_dic, key=self.im_dic.get) + ] self.exp_bands, self.exp = self.get_real_exp(self.fake_exp_bands) appname = "BandMath" if len(self.exp_bands) == 1 else "BandMathX" name = f'Operation exp="{self.exp}"' - super().__init__(appname, il=self.unique_inputs, exp=self.exp, quiet=True, name=name) + super().__init__( + appname, il=self.unique_inputs, exp=self.exp, quiet=True, name=name + ) - def build_fake_expressions(self, operator: str, inputs: list[OTBObject | str | int | float], nb_bands: int = None): + def build_fake_expressions( + self, + operator: str, + inputs: list[OTBObject | str | int | float], + nb_bands: int = None, + ): """Create a list of 'fake' expressions, one for each band. E.g for the operation input1 + input2, we create a fake expression that is like "str(input1) + str(input2)" @@ -989,12 +1121,21 @@ class Operation(App): pass # For any other operations, the output number of bands is the same as inputs else: - if any(isinstance(inp, Slicer) and hasattr(inp, "one_band_sliced") for inp in inputs): + if any( + isinstance(inp, Slicer) and hasattr(inp, "one_band_sliced") + for inp in inputs + ): nb_bands = 1 else: - nb_bands_list = [get_nbchannels(inp) for inp in inputs if not isinstance(inp, (float, int))] + nb_bands_list = [ + get_nbchannels(inp) + for inp in inputs + if not isinstance(inp, (float, int)) + ] # check that all inputs have the same nb of bands - if len(nb_bands_list) > 1 and not all(x == nb_bands_list[0] for x in nb_bands_list): + if len(nb_bands_list) > 1 and not all( + x == nb_bands_list[0] for x in nb_bands_list + ): raise ValueError("All images do not have the same number of bands") nb_bands = nb_bands_list[0] @@ -1008,10 +1149,14 @@ class Operation(App): if len(inputs) == 3 and k == 0: # When cond is monoband whereas the result is multiband, we expand the cond to multiband cond_band = 1 if nb_bands != inp.shape[2] else band - fake_exp, corresponding_inputs, nb_channels = self.make_fake_exp(inp, cond_band, keep_logical=True) + fake_exp, corresponding_inputs, nb_channels = self.make_fake_exp( + inp, cond_band, keep_logical=True + ) else: # Any other input - fake_exp, corresponding_inputs, nb_channels = self.make_fake_exp(inp, band, keep_logical=False) + fake_exp, corresponding_inputs, nb_channels = self.make_fake_exp( + inp, band, keep_logical=False + ) expressions.append(fake_exp) # Reference the inputs and nb of channels (only on first pass in the loop to avoid duplicates) if i == 0 and corresponding_inputs and nb_channels: @@ -1025,7 +1170,9 @@ class Operation(App): # We create here the "fake" expression. For example, for a BandMathX expression such as '2 * im1 + im2', # the false expression stores the expression 2 * str(input1) + str(input2) fake_exp = f"({expressions[0]} {operator} {expressions[1]})" - elif len(inputs) == 3 and operator == "?": # this is only for ternary expression + elif ( + len(inputs) == 3 and operator == "?" + ): # this is only for ternary expression fake_exp = f"({expressions[0]} ? {expressions[1]} : {expressions[2]})" self.fake_exp_bands.append(fake_exp) @@ -1052,7 +1199,9 @@ class Operation(App): return exp_bands, ";".join(exp_bands) @staticmethod - def make_fake_exp(x: OTBObject | str, band: int, keep_logical: bool = False) -> tuple[str, list[OTBObject], int]: + def make_fake_exp( + x: OTBObject | str, band: int, keep_logical: bool = False + ) -> tuple[str, list[OTBObject], int]: """This an internal function, only to be used by `build_fake_expressions`. Enable to create a fake expression just for one input and one band. @@ -1127,9 +1276,16 @@ class LogicalOperation(Operation): """ self.logical_fake_exp_bands = [] super().__init__(operator, *inputs, nb_bands=nb_bands, name="LogicalOperation") - self.logical_exp_bands, self.logical_exp = self.get_real_exp(self.logical_fake_exp_bands) + self.logical_exp_bands, self.logical_exp = self.get_real_exp( + self.logical_fake_exp_bands + ) - def build_fake_expressions(self, operator: str, inputs: list[OTBObject | str | int | float], nb_bands: int = None): + def build_fake_expressions( + self, + operator: str, + inputs: list[OTBObject | str | int | float], + nb_bands: int = None, + ): """Create a list of 'fake' expressions, one for each band. e.g for the operation input1 > input2, we create a fake expression that is like @@ -1142,19 +1298,30 @@ class LogicalOperation(Operation): """ # For any other operations, the output number of bands is the same as inputs - if any(isinstance(inp, Slicer) and hasattr(inp, "one_band_sliced") for inp in inputs): + if any( + isinstance(inp, Slicer) and hasattr(inp, "one_band_sliced") + for inp in inputs + ): nb_bands = 1 else: - nb_bands_list = [get_nbchannels(inp) for inp in inputs if not isinstance(inp, (float, int))] + nb_bands_list = [ + get_nbchannels(inp) + for inp in inputs + if not isinstance(inp, (float, int)) + ] # check that all inputs have the same nb of bands - if len(nb_bands_list) > 1 and not all(x == nb_bands_list[0] for x in nb_bands_list): + if len(nb_bands_list) > 1 and not all( + x == nb_bands_list[0] for x in nb_bands_list + ): raise ValueError("All images do not have the same number of bands") nb_bands = nb_bands_list[0] # Create a list of fake exp, each item of the list corresponding to one band for i, band in enumerate(range(1, nb_bands + 1)): expressions = [] for inp in inputs: - fake_exp, corresp_inputs, nb_channels = super().make_fake_exp(inp, band, keep_logical=True) + fake_exp, corresp_inputs, nb_channels = super().make_fake_exp( + inp, band, keep_logical=True + ) expressions.append(fake_exp) # Reference the inputs and nb of channels (only on first pass in the loop to avoid duplicates) if i == 0 and corresp_inputs and nb_channels: @@ -1195,9 +1362,16 @@ class Input(App): class Output(OTBObject): """Object that behave like a pointer to a specific application output file.""" + _filepath: str | Path = None - def __init__(self, pyotb_app: App, param_key: str = None, filepath: str = None, mkdir: bool = True): + def __init__( + self, + pyotb_app: App, + param_key: str = None, + filepath: str = None, + mkdir: bool = True, + ): """Constructor for an Output object. Args: @@ -1262,7 +1436,9 @@ class Output(OTBObject): def write(self, filepath: None | str | Path = None, **kwargs) -> bool: """Write output to disk, filepath is not required if it was provided to parent App during init.""" if filepath is None: - return self.parent_pyotb_app.write({self.output_image_key: self.filepath}, **kwargs) + return self.parent_pyotb_app.write( + {self.output_image_key: self.filepath}, **kwargs + ) return self.parent_pyotb_app.write({self.output_image_key: filepath}, **kwargs) def __str__(self) -> str: @@ -1293,7 +1469,7 @@ def add_vsi_prefix(filepath: str | Path) -> str: ".gz": "vsigzip", ".7z": "vsi7z", ".zip": "vsizip", - ".rar": "vsirar" + ".rar": "vsirar", } basename = filepath.split("?")[0] ext = Path(basename).suffix @@ -1319,8 +1495,12 @@ def get_nbchannels(inp: str | Path | OTBObject) -> int: try: info = App("ReadImageInfo", inp, quiet=True) return info["numberbands"] - except RuntimeError as info_err: # this happens when we pass a str that is not a filepath - raise TypeError(f"Could not get the number of channels file '{inp}' ({info_err})") from info_err + except ( + RuntimeError + ) as info_err: # this happens when we pass a str that is not a filepath + raise TypeError( + f"Could not get the number of channels file '{inp}' ({info_err})" + ) from info_err raise TypeError(f"Can't read number of channels of type '{type(inp)}' object {inp}") @@ -1341,8 +1521,12 @@ def get_pixel_type(inp: str | Path | OTBObject) -> str: try: info = App("ReadImageInfo", inp, quiet=True) datatype = info["datatype"] # which is such as short, float... - except RuntimeError as info_err: # this happens when we pass a str that is not a filepath - raise TypeError(f"Could not get the pixel type of `{inp}` ({info_err})") from info_err + except ( + RuntimeError + ) as info_err: # this happens when we pass a str that is not a filepath + raise TypeError( + f"Could not get the pixel type of `{inp}` ({info_err})" + ) from info_err if datatype: return parse_pixel_type(datatype) raise TypeError(f"Could not get the pixel type of {type(inp)} object {inp}") @@ -1376,13 +1560,21 @@ def parse_pixel_type(pixel_type: str | int) -> int: return getattr(otb, f"ImagePixelType_{pixel_type}") if pixel_type in datatype_to_pixeltype: return getattr(otb, f"ImagePixelType_{datatype_to_pixeltype[pixel_type]}") - raise KeyError(f"Unknown data type `{pixel_type}`. Available ones: {datatype_to_pixeltype}") - raise TypeError(f"Bad pixel type specification ({pixel_type} of type {type(pixel_type)})") + raise KeyError( + f"Unknown data type `{pixel_type}`. Available ones: {datatype_to_pixeltype}" + ) + raise TypeError( + f"Bad pixel type specification ({pixel_type} of type {type(pixel_type)})" + ) def get_out_images_param_keys(app: OTBObject) -> list[str]: """Return every output parameter keys of an OTB app.""" - return [key for key in app.GetParametersKeys() if app.GetParameterType(key) == otb.ParameterType_OutputImage] + return [ + key + for key in app.GetParametersKeys() + if app.GetParameterType(key) == otb.ParameterType_OutputImage + ] def summarize( @@ -1406,6 +1598,7 @@ def summarize( parameters of an app and its parents """ + def strip_path(param: str | Any): if not isinstance(param, str): return summarize(param) @@ -1420,8 +1613,17 @@ def summarize( parameters = {} for key, param in obj.parameters.items(): - if strip_input_paths and obj.is_input(key) or strip_output_paths and obj.is_output(key): - parameters[key] = [strip_path(p) for p in param] if isinstance(param, list) else strip_path(param) + if ( + strip_input_paths + and obj.is_input(key) + or strip_output_paths + and obj.is_output(key) + ): + parameters[key] = ( + [strip_path(p) for p in param] + if isinstance(param, list) + else strip_path(param) + ) else: parameters[key] = summarize(param) return {"name": obj.app.GetName(), "parameters": parameters} diff --git a/pyotb/functions.py b/pyotb/functions.py index 6cc78891f1a2c649d6ae6b574057866f4af60b24..96431cdc64460a30c0af5cf422b2dffe54f24618 100644 --- a/pyotb/functions.py +++ b/pyotb/functions.py @@ -1,19 +1,22 @@ # -*- coding: utf-8 -*- """This module provides a set of functions for pyotb.""" from __future__ import annotations + import inspect import os +import subprocess import sys -import uuid import textwrap -import subprocess +import uuid from collections import Counter -from .core import App, Operation, LogicalOperation, Input, get_nbchannels +from .core import App, Input, LogicalOperation, Operation, get_nbchannels from .helpers import logger -def where(cond: App | str, x: App | str | int | float, y: App | str | int | float) -> Operation: +def where( + cond: App | str, x: App | str | int | float, y: App | str | int | float +) -> Operation: """Functionally similar to numpy.where. Where cond is True (!=0), returns x. Else returns y. Args: @@ -36,18 +39,29 @@ def where(cond: App | str, x: App | str | int | float, y: App | str | int | floa y_nb_channels = get_nbchannels(y) if x_nb_channels and y_nb_channels: if x_nb_channels != y_nb_channels: - raise ValueError('X and Y images do not have the same number of bands. ' - f'X has {x_nb_channels} bands whereas Y has {y_nb_channels} bands') + raise ValueError( + "X and Y images do not have the same number of bands. " + f"X has {x_nb_channels} bands whereas Y has {y_nb_channels} bands" + ) x_or_y_nb_channels = x_nb_channels if x_nb_channels else y_nb_channels cond_nb_channels = get_nbchannels(cond) - if cond_nb_channels != 1 and x_or_y_nb_channels and cond_nb_channels != x_or_y_nb_channels: - raise ValueError('Condition and X&Y do not have the same number of bands. Condition has ' - f'{cond_nb_channels} bands whereas X&Y have {x_or_y_nb_channels} bands') + if ( + cond_nb_channels != 1 + and x_or_y_nb_channels + and cond_nb_channels != x_or_y_nb_channels + ): + raise ValueError( + "Condition and X&Y do not have the same number of bands. Condition has " + f"{cond_nb_channels} bands whereas X&Y have {x_or_y_nb_channels} bands" + ) # If needed, duplicate the single band binary mask to multiband to match the dimensions of x & y if cond_nb_channels == 1 and x_or_y_nb_channels and x_or_y_nb_channels != 1: - logger.info('The condition has one channel whereas X/Y has/have %s channels. Expanding number' - ' of channels of condition to match the number of channels of X/Y', x_or_y_nb_channels) + logger.info( + "The condition has one channel whereas X/Y has/have %s channels. Expanding number" + " of channels of condition to match the number of channels of X/Y", + x_or_y_nb_channels, + ) # Get the number of bands of the result if x_or_y_nb_channels: # if X or Y is a raster @@ -55,10 +69,12 @@ def where(cond: App | str, x: App | str | int | float, y: App | str | int | floa else: # if only cond is a raster out_nb_channels = cond_nb_channels - return Operation('?', cond, x, y, nb_bands=out_nb_channels) + return Operation("?", cond, x, y, nb_bands=out_nb_channels) -def clip(image: App | str, v_min: App | str | int | float, v_max: App | str | int | float): +def clip( + image: App | str, v_min: App | str | int | float, v_max: App | str | int | float +): """Clip values of image in a range of values. Args: @@ -96,7 +112,11 @@ def all(*inputs): # pylint: disable=redefined-builtin if len(inputs) == 1 and isinstance(inputs[0], (list, tuple)): inputs = inputs[0] # Add support for generator inputs (to have the same behavior as built-in `all` function) - if isinstance(inputs, tuple) and len(inputs) == 1 and inspect.isgenerator(inputs[0]): + if ( + isinstance(inputs, tuple) + and len(inputs) == 1 + and inspect.isgenerator(inputs[0]) + ): inputs = list(inputs[0]) # Transforming potential filepaths to pyotb objects inputs = [Input(inp) if isinstance(inp, str) else inp for inp in inputs] @@ -107,7 +127,7 @@ def all(*inputs): # pylint: disable=redefined-builtin if isinstance(inp, LogicalOperation): res = inp[:, :, 0] else: - res = (inp[:, :, 0] != 0) + res = inp[:, :, 0] != 0 for band in range(1, inp.shape[-1]): if isinstance(inp, LogicalOperation): res = res & inp[:, :, band] @@ -147,7 +167,11 @@ def any(*inputs): # pylint: disable=redefined-builtin if len(inputs) == 1 and isinstance(inputs[0], (list, tuple)): inputs = inputs[0] # Add support for generator inputs (to have the same behavior as built-in `any` function) - if isinstance(inputs, tuple) and len(inputs) == 1 and inspect.isgenerator(inputs[0]): + if ( + isinstance(inputs, tuple) + and len(inputs) == 1 + and inspect.isgenerator(inputs[0]) + ): inputs = list(inputs[0]) # Transforming potential filepaths to pyotb objects inputs = [Input(inp) if isinstance(inp, str) else inp for inp in inputs] @@ -158,7 +182,7 @@ def any(*inputs): # pylint: disable=redefined-builtin if isinstance(inp, LogicalOperation): res = inp[:, :, 0] else: - res = (inp[:, :, 0] != 0) + res = inp[:, :, 0] != 0 for band in range(1, inp.shape[-1]): if isinstance(inp, LogicalOperation): @@ -203,10 +227,14 @@ def run_tf_function(func): """ try: - from .apps import TensorflowModelServe # pylint: disable=import-outside-toplevel + from .apps import ( # pylint: disable=import-outside-toplevel + TensorflowModelServe, + ) except ImportError: - logger.error('Could not run Tensorflow function: failed to import TensorflowModelServe.' - 'Check that you have OTBTF configured (https://github.com/remicres/otbtf#how-to-install)') + logger.error( + "Could not run Tensorflow function: failed to import TensorflowModelServe." + "Check that you have OTBTF configured (https://github.com/remicres/otbtf#how-to-install)" + ) raise def get_tf_pycmd(output_dir, channels, scalar_inputs): @@ -228,7 +256,8 @@ def run_tf_function(func): create_and_save_model_str = func_def_str # Adding the instructions to create the model and save it to output dir - create_and_save_model_str += textwrap.dedent(f""" + create_and_save_model_str += textwrap.dedent( + f""" import tensorflow as tf model_inputs = [] @@ -248,11 +277,12 @@ def run_tf_function(func): # Create and save the .pb model model = tf.keras.Model(inputs=model_inputs, outputs=output) model.save("{output_dir}") - """) + """ + ) return create_and_save_model_str - def wrapper(*inputs, tmp_dir='/tmp'): + def wrapper(*inputs, tmp_dir="/tmp"): """For the user point of view, this function simply applies some TensorFlow operations to some rasters. Implicitly, it saves a .pb model that describe the TF operations, then creates an OTB ModelServe application @@ -284,22 +314,35 @@ def run_tf_function(func): # Create and save the model. This is executed **inside an independent process** because (as of 2022-03), # tensorflow python library and OTBTF are incompatible - out_savedmodel = os.path.join(tmp_dir, f'tmp_otbtf_model_{uuid.uuid4()}') + out_savedmodel = os.path.join(tmp_dir, f"tmp_otbtf_model_{uuid.uuid4()}") pycmd = get_tf_pycmd(out_savedmodel, channels, scalar_inputs) cmd_args = [sys.executable, "-c", pycmd] try: - subprocess.run(cmd_args, env=os.environ, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) + subprocess.run( + cmd_args, + env=os.environ, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + ) except subprocess.SubprocessError: logger.debug("Failed to call subprocess") if not os.path.isdir(out_savedmodel): logger.info("Failed to save the model") # Initialize the OTBTF model serving application - model_serve = TensorflowModelServe({'model.dir': out_savedmodel, 'optim.disabletiling': 'on', - 'model.fullyconv': 'on'}, n_sources=len(raster_inputs), frozen=True) + model_serve = TensorflowModelServe( + { + "model.dir": out_savedmodel, + "optim.disabletiling": "on", + "model.fullyconv": "on", + }, + n_sources=len(raster_inputs), + frozen=True, + ) # Set parameters and execute for i, inp in enumerate(raster_inputs): - model_serve.set_parameters({f'source{i + 1}.il': [inp]}) + model_serve.set_parameters({f"source{i + 1}.il": [inp]}) model_serve.execute() # TODO: handle the deletion of the temporary model ? @@ -308,9 +351,14 @@ def run_tf_function(func): return wrapper -def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_rule: str = 'minimal', - interpolator: str = 'nn', reference_window_input: dict = None, - reference_pixel_size_input: str = None) -> list[App]: +def define_processing_area( + *args, + window_rule: str = "intersection", + pixel_size_rule: str = "minimal", + interpolator: str = "nn", + reference_window_input: dict = None, + reference_pixel_size_input: str = None, +) -> list[App]: """Given several inputs, this function handles the potential resampling and cropping to same extent. WARNING: Not fully implemented / tested @@ -338,7 +386,7 @@ def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_ metadatas = {} for inp in inputs: if isinstance(inp, str): # this is for filepaths - metadata = Input(inp).app.GetImageMetaData('out') + metadata = Input(inp).app.GetImageMetaData("out") elif isinstance(inp, App): metadata = inp.app.GetImageMetaData(inp.output_param) else: @@ -348,100 +396,147 @@ def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_ # Get a metadata of an arbitrary image. This is just to compare later with other images any_metadata = next(iter(metadatas.values())) # Checking if all images have the same projection - if not all(metadata['ProjectionRef'] == any_metadata['ProjectionRef'] - for metadata in metadatas.values()): - logger.warning('All images may not have the same CRS, which might cause unpredictable results') + if not all( + metadata["ProjectionRef"] == any_metadata["ProjectionRef"] + for metadata in metadatas.values() + ): + logger.warning( + "All images may not have the same CRS, which might cause unpredictable results" + ) # Handling different spatial footprints # TODO: there seems to have a bug, ImageMetaData is not updated when running an app, # cf https://gitlab.orfeo-toolbox.org/orfeotoolbox/otb/-/issues/2234. Should we use ImageOrigin instead? - if not all(metadata['UpperLeftCorner'] == any_metadata['UpperLeftCorner'] - and metadata['LowerRightCorner'] == any_metadata['LowerRightCorner'] - for metadata in metadatas.values()): + if not all( + metadata["UpperLeftCorner"] == any_metadata["UpperLeftCorner"] + and metadata["LowerRightCorner"] == any_metadata["LowerRightCorner"] + for metadata in metadatas.values() + ): # Retrieving the bounding box that will be common for all inputs - if window_rule == 'intersection': + if window_rule == "intersection": # The coordinates depend on the orientation of the axis of projection - if any_metadata['GeoTransform'][1] >= 0: - ulx = max(metadata['UpperLeftCorner'][0] for metadata in metadatas.values()) - lrx = min(metadata['LowerRightCorner'][0] for metadata in metadatas.values()) + if any_metadata["GeoTransform"][1] >= 0: + ulx = max( + metadata["UpperLeftCorner"][0] for metadata in metadatas.values() + ) + lrx = min( + metadata["LowerRightCorner"][0] for metadata in metadatas.values() + ) else: - ulx = min(metadata['UpperLeftCorner'][0] for metadata in metadatas.values()) - lrx = max(metadata['LowerRightCorner'][0] for metadata in metadatas.values()) - if any_metadata['GeoTransform'][-1] >= 0: - lry = min(metadata['LowerRightCorner'][1] for metadata in metadatas.values()) - uly = max(metadata['UpperLeftCorner'][1] for metadata in metadatas.values()) + ulx = min( + metadata["UpperLeftCorner"][0] for metadata in metadatas.values() + ) + lrx = max( + metadata["LowerRightCorner"][0] for metadata in metadatas.values() + ) + if any_metadata["GeoTransform"][-1] >= 0: + lry = min( + metadata["LowerRightCorner"][1] for metadata in metadatas.values() + ) + uly = max( + metadata["UpperLeftCorner"][1] for metadata in metadatas.values() + ) else: - lry = max(metadata['LowerRightCorner'][1] for metadata in metadatas.values()) - uly = min(metadata['UpperLeftCorner'][1] for metadata in metadatas.values()) - - elif window_rule == 'same_as_input': - ulx = metadatas[reference_window_input]['UpperLeftCorner'][0] - lrx = metadatas[reference_window_input]['LowerRightCorner'][0] - lry = metadatas[reference_window_input]['LowerRightCorner'][1] - uly = metadatas[reference_window_input]['UpperLeftCorner'][1] - elif window_rule == 'specify': + lry = max( + metadata["LowerRightCorner"][1] for metadata in metadatas.values() + ) + uly = min( + metadata["UpperLeftCorner"][1] for metadata in metadatas.values() + ) + + elif window_rule == "same_as_input": + ulx = metadatas[reference_window_input]["UpperLeftCorner"][0] + lrx = metadatas[reference_window_input]["LowerRightCorner"][0] + lry = metadatas[reference_window_input]["LowerRightCorner"][1] + uly = metadatas[reference_window_input]["UpperLeftCorner"][1] + elif window_rule == "specify": pass # TODO : it is when the user explicitly specifies the bounding box -> add some arguments in the function - elif window_rule == 'union': + elif window_rule == "union": pass # TODO : it is when the user wants the final bounding box to be the union of all bounding box # It should replace any 'outside' pixel by some NoData -> add `fillvalue` argument in the function # Applying this bounding box to all inputs - logger.info('Cropping all images to extent Upper Left (%s, %s), Lower Right (%s, %s)', ulx, uly, lrx, lry) + logger.info( + "Cropping all images to extent Upper Left (%s, %s), Lower Right (%s, %s)", + ulx, + uly, + lrx, + lry, + ) new_inputs = [] for inp in inputs: try: params = { - 'in': inp, 'mode': 'extent', 'mode.extent.unit': 'phy', - 'mode.extent.ulx': ulx, 'mode.extent.uly': lry, # bug in OTB <= 7.3 : - 'mode.extent.lrx': lrx, 'mode.extent.lry': uly, # ULY/LRY are inverted + "in": inp, + "mode": "extent", + "mode.extent.unit": "phy", + "mode.extent.ulx": ulx, + "mode.extent.uly": lry, # bug in OTB <= 7.3 : + "mode.extent.lrx": lrx, + "mode.extent.lry": uly, # ULY/LRY are inverted } - new_input = App('ExtractROI', params) + new_input = App("ExtractROI", params) # TODO: OTB 7.4 fixes this bug, how to handle different versions of OTB? new_inputs.append(new_input) # Potentially update the reference inputs for later resampling - if str(inp) == str(reference_pixel_size_input): # we use comparison of string because calling '==' + if str(inp) == str( + reference_pixel_size_input + ): # we use comparison of string because calling '==' # on pyotb objects implicitly calls BandMathX application, which is not desirable reference_pixel_size_input = new_input except RuntimeError as e: - logger.error('Cannot define the processing area for input %s: %s', inp, e) + logger.error( + "Cannot define the processing area for input %s: %s", inp, e + ) raise inputs = new_inputs # Update metadatas - metadatas = {input: input.app.GetImageMetaData('out') for input in inputs} + metadatas = {input: input.app.GetImageMetaData("out") for input in inputs} # Get a metadata of an arbitrary image. This is just to compare later with other images any_metadata = next(iter(metadatas.values())) # Handling different pixel sizes - if not all(metadata['GeoTransform'][1] == any_metadata['GeoTransform'][1] - and metadata['GeoTransform'][5] == any_metadata['GeoTransform'][5] - for metadata in metadatas.values()): + if not all( + metadata["GeoTransform"][1] == any_metadata["GeoTransform"][1] + and metadata["GeoTransform"][5] == any_metadata["GeoTransform"][5] + for metadata in metadatas.values() + ): # Retrieving the pixel size that will be common for all inputs - if pixel_size_rule == 'minimal': + if pixel_size_rule == "minimal": # selecting the input with the smallest x pixel size - reference_input = min(metadatas, key=lambda x: metadatas[x]['GeoTransform'][1]) - if pixel_size_rule == 'maximal': + reference_input = min( + metadatas, key=lambda x: metadatas[x]["GeoTransform"][1] + ) + if pixel_size_rule == "maximal": # selecting the input with the highest x pixel size - reference_input = max(metadatas, key=lambda x: metadatas[x]['GeoTransform'][1]) - elif pixel_size_rule == 'same_as_input': + reference_input = max( + metadatas, key=lambda x: metadatas[x]["GeoTransform"][1] + ) + elif pixel_size_rule == "same_as_input": reference_input = reference_pixel_size_input - elif pixel_size_rule == 'specify': + elif pixel_size_rule == "specify": pass # TODO : when the user explicitly specify the pixel size -> add argument inside the function - pixel_size = metadatas[reference_input]['GeoTransform'][1] + pixel_size = metadatas[reference_input]["GeoTransform"][1] # Perform resampling on inputs that do not comply with the target pixel size - logger.info('Resampling all inputs to resolution: %s', pixel_size) + logger.info("Resampling all inputs to resolution: %s", pixel_size) new_inputs = [] for inp in inputs: - if metadatas[inp]['GeoTransform'][1] != pixel_size: - superimposed = App('Superimpose', inr=reference_input, inm=inp, interpolator=interpolator) + if metadatas[inp]["GeoTransform"][1] != pixel_size: + superimposed = App( + "Superimpose", + inr=reference_input, + inm=inp, + interpolator=interpolator, + ) new_inputs.append(superimposed) else: new_inputs.append(inp) inputs = new_inputs - metadatas = {inp: inp.app.GetImageMetaData('out') for inp in inputs} + metadatas = {inp: inp.app.GetImageMetaData("out") for inp in inputs} # Final superimposition to be sure to have the exact same image sizes image_sizes = {} @@ -451,13 +546,22 @@ def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_ image_sizes[inp] = inp.shape[:2] # Selecting the most frequent image size. It will be used as reference. most_common_image_size, _ = Counter(image_sizes.values()).most_common(1)[0] - same_size_images = [inp for inp, image_size in image_sizes.items() if image_size == most_common_image_size] + same_size_images = [ + inp + for inp, image_size in image_sizes.items() + if image_size == most_common_image_size + ] # Superimposition for images that do not have the same size as the others new_inputs = [] for inp in inputs: if image_sizes[inp] != most_common_image_size: - superimposed = App('Superimpose', inr=same_size_images[0], inm=inp, interpolator=interpolator) + superimposed = App( + "Superimpose", + inr=same_size_images[0], + inm=inp, + interpolator=interpolator, + ) new_inputs.append(superimposed) else: new_inputs.append(inp) diff --git a/pyotb/helpers.py b/pyotb/helpers.py index 03200a20798baadac6838c511f3de225e670f1c2..5363937940485b33f74c31ef8371af46279e2e56 100644 --- a/pyotb/helpers.py +++ b/pyotb/helpers.py @@ -1,12 +1,11 @@ # -*- coding: utf-8 -*- """This module helps to ensure we properly initialize pyotb: only in case OTB is found and apps are available.""" +import logging import os import sys -import logging from pathlib import Path from shutil import which - # Allow user to switch between OTB directories without setting every env variable OTB_ROOT = os.environ.get("OTB_ROOT") @@ -15,10 +14,14 @@ OTB_ROOT = os.environ.get("OTB_ROOT") # then use pyotb.set_logger_level() to adjust logger verbosity logger = logging.getLogger("pyOTB") logger_handler = logging.StreamHandler(sys.stdout) -formatter = logging.Formatter(fmt="%(asctime)s (%(levelname)-4s) [pyOTB] %(message)s", datefmt="%Y-%m-%d %H:%M:%S") +formatter = logging.Formatter( + fmt="%(asctime)s (%(levelname)-4s) [pyOTB] %(message)s", datefmt="%Y-%m-%d %H:%M:%S" +) logger_handler.setFormatter(formatter) # Search for PYOTB_LOGGER_LEVEL, else use OTB_LOGGER_LEVEL as pyOTB level, or fallback to INFO -LOG_LEVEL = os.environ.get("PYOTB_LOGGER_LEVEL") or os.environ.get("OTB_LOGGER_LEVEL") or "INFO" +LOG_LEVEL = ( + os.environ.get("PYOTB_LOGGER_LEVEL") or os.environ.get("OTB_LOGGER_LEVEL") or "INFO" +) logger.setLevel(getattr(logging, LOG_LEVEL)) # Here it would be possible to use a different level for a specific handler # A more verbose one can go to text file while print only errors to stdout @@ -60,6 +63,7 @@ def find_otb(prefix: str = OTB_ROOT, scan: bool = True, scan_userdir: bool = Tru try: set_environment(prefix) import otbApplication as otb # pylint: disable=import-outside-toplevel + return otb except EnvironmentError as e: raise SystemExit(f"Failed to import OTB with prefix={prefix}") from e @@ -71,6 +75,7 @@ def find_otb(prefix: str = OTB_ROOT, scan: bool = True, scan_userdir: bool = Tru # Here, we can't properly set env variables before OTB import. We assume user did this before running python # For LD_LIBRARY_PATH problems, use OTB_ROOT instead of PYTHONPATH import otbApplication as otb # pylint: disable=import-outside-toplevel + if "OTB_APPLICATION_PATH" not in os.environ: lib_dir = __find_lib(otb_module=otb) apps_path = __find_apps_path(lib_dir) @@ -79,7 +84,9 @@ def find_otb(prefix: str = OTB_ROOT, scan: bool = True, scan_userdir: bool = Tru except ImportError as e: pythonpath = os.environ.get("PYTHONPATH") if not scan: - raise SystemExit(f"Failed to import OTB with env PYTHONPATH={pythonpath}") from e + raise SystemExit( + f"Failed to import OTB with env PYTHONPATH={pythonpath}" + ) from e # Else search system logger.info("Failed to import OTB. Searching for it...") prefix = __find_otb_root(scan_userdir) @@ -87,6 +94,7 @@ def find_otb(prefix: str = OTB_ROOT, scan: bool = True, scan_userdir: bool = Tru try: set_environment(prefix) import otbApplication as otb # pylint: disable=import-outside-toplevel + return otb except EnvironmentError as e: raise SystemExit("Auto setup for OTB env failed. Exiting.") from e @@ -112,7 +120,7 @@ def set_environment(prefix: str): if not prefix.exists(): raise FileNotFoundError(str(prefix)) built_from_source = False - if not (prefix / 'README').exists(): + if not (prefix / "README").exists(): built_from_source = True # External libraries lib_dir = __find_lib(prefix) @@ -151,7 +159,9 @@ def set_environment(prefix: str): gdal_data = str(prefix / "share/data") proj_lib = str(prefix / "share/proj") else: - raise EnvironmentError(f"Can't find GDAL location with current OTB prefix '{prefix}' or in /usr") + raise EnvironmentError( + f"Can't find GDAL location with current OTB prefix '{prefix}' or in /usr" + ) os.environ["GDAL_DATA"] = gdal_data os.environ["PROJ_LIB"] = proj_lib @@ -168,7 +178,7 @@ def __find_lib(prefix: str = None, otb_module=None): """ if prefix is not None: - lib_dir = prefix / 'lib' + lib_dir = prefix / "lib" if lib_dir.exists(): return lib_dir.absolute() if otb_module is not None: @@ -276,33 +286,54 @@ def __suggest_fix_import(error_message: str, prefix: str): logger.critical("An error occurred while importing OTB Python API") logger.critical("OTB error message was '%s'", error_message) if sys.platform == "linux": - if error_message.startswith('libpython3.'): - logger.critical("It seems like you need to symlink or recompile python bindings") - if sys.executable.startswith('/usr/bin'): - lib = f"/usr/lib/x86_64-linux-gnu/libpython3.{sys.version_info.minor}.so" - if which('ctest'): - logger.critical("To recompile python bindings, use 'cd %s ; source otbenv.profile ; " - "ctest -S share/otb/swig/build_wrapping.cmake -VV'", prefix) + if error_message.startswith("libpython3."): + logger.critical( + "It seems like you need to symlink or recompile python bindings" + ) + if sys.executable.startswith("/usr/bin"): + lib = ( + f"/usr/lib/x86_64-linux-gnu/libpython3.{sys.version_info.minor}.so" + ) + if which("ctest"): + logger.critical( + "To recompile python bindings, use 'cd %s ; source otbenv.profile ; " + "ctest -S share/otb/swig/build_wrapping.cmake -VV'", + prefix, + ) elif Path(lib).exists(): expect_minor = int(error_message[11]) if expect_minor != sys.version_info.minor: - logger.critical("Python library version mismatch (OTB was expecting 3.%s) : " - "a simple symlink may not work, depending on your python version", expect_minor) + logger.critical( + "Python library version mismatch (OTB was expecting 3.%s) : " + "a simple symlink may not work, depending on your python version", + expect_minor, + ) target_lib = f"{prefix}/lib/libpython3.{expect_minor}.so.rh-python3{expect_minor}-1.0" logger.critical("Use 'ln -s %s %s'", lib, target_lib) else: - logger.critical("You may need to install cmake in order to recompile python bindings") + logger.critical( + "You may need to install cmake in order to recompile python bindings" + ) else: - logger.critical("Unable to automatically locate python dynamic library of %s", sys.executable) + logger.critical( + "Unable to automatically locate python dynamic library of %s", + sys.executable, + ) elif sys.platform == "win32": if error_message.startswith("DLL load failed"): if sys.version_info.minor != 7: - logger.critical("You need Python 3.5 (OTB releases 6.4 to 7.4) or Python 3.7 (since OTB 8)") + logger.critical( + "You need Python 3.5 (OTB releases 6.4 to 7.4) or Python 3.7 (since OTB 8)" + ) else: - logger.critical("It seems that your env variables aren't properly set," - " first use 'call otbenv.bat' then try to import pyotb once again") + logger.critical( + "It seems that your env variables aren't properly set," + " first use 'call otbenv.bat' then try to import pyotb once again" + ) docs_link = "https://www.orfeo-toolbox.org/CookBook/Installation.html" - logger.critical("You can verify installation requirements for your OS at %s", docs_link) + logger.critical( + "You can verify installation requirements for your OS at %s", docs_link + ) # Since helpers is the first module to be inititialized, this will prevent pyotb to run if OTB is not found diff --git a/pyproject.toml b/pyproject.toml index ac4d3df248e07cdff1ed70803dee9cb2e5214d7a..6f86ad052a0ef0bd5f21cbbcb47d959649e04565 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,14 +5,18 @@ build-backend = "setuptools.build_meta" [project] name = "pyotb" description = "Library to enable easy use of the Orfeo ToolBox (OTB) in Python" -authors = [{name = "Rémi Cresson", email = "remi.cresson@inrae.fr"}, {name = "Nicolas Narçon"}, {name = "Vincent Delbar"}] +authors = [ + { name = "Rémi Cresson", email = "remi.cresson@inrae.fr" }, + { name = "Nicolas Narçon" }, + { name = "Vincent Delbar" }, +] requires-python = ">=3.7" keywords = ["gis", "remote sensing", "otb", "orfeotoolbox", "orfeo toolbox"] dependencies = ["numpy>=1.16"] readme = "README.md" license = { file = "LICENSE" } dynamic = ["version"] -classifiers=[ +classifiers = [ "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.8", @@ -37,22 +41,23 @@ repository = "https://gitlab.orfeo-toolbox.org/nicolasnn/pyotb" packages = ["pyotb"] [tool.setuptools.dynamic] -version = {attr = "pyotb.__version__"} +version = { attr = "pyotb.__version__" } [tool.pylint] -max-line-length = 120 +max-line-length = 88 max-module-lines = 2000 good-names = ["x", "y", "i", "j", "k", "e"] disable = [ "fixme", + "line-too-long", "too-many-locals", "too-many-branches", "too-many-statements", - "too-many-instance-attributes" + "too-many-instance-attributes", ] [tool.pydocstyle] convention = "google" [tool.black] -line-length = 120 +line-length = 88