Source code for spechomo.resampling

# -*- coding: utf-8 -*-

# spechomo, Spectral homogenization of multispectral satellite data
#
# Copyright (C) 2019-2021
# - Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
# - Helmholtz Centre Potsdam - GFZ German Research Centre for Geosciences Potsdam,
#   Germany (https://www.gfz-potsdam.de/)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Please note the following exception: `spechomo` depends on tqdm, which is
# distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from multiprocessing import Pool
from typing import Union, TYPE_CHECKING  # noqa F401  # flake8 issue

import numpy as np
from geoarray import GeoArray
from geoarray.baseclasses import get_array_tilebounds

from .logging import SpecHomo_Logger

if TYPE_CHECKING:
    from pyrsr import RSR  # noqa F401  # flake8 issue


[docs]class SpectralResampler(object): """Class for spectral resampling of a single spectral signature (1D-array) or an image (3D-array).""" def __init__(self, wvl_src, rsr_tgt, logger=None): # type: (np.ndarray, RSR, str) -> None """Get an instance of the SpectralResampler class. :param wvl_src: center wavelength positions of the source spectrum :param rsr_tgt: relative spectral response of the target instrument as an instance of pyrsr.RelativeSpectralRespnse. """ # privates self._wvl_1nm = None self._rsr_1nm = {} wvl_src = np.array(wvl_src, dtype=float).flatten() if rsr_tgt.wvl_unit != 'nanometers': rsr_tgt.convert_wvl_unit() self.wvl_src_nm = wvl_src if max(wvl_src) > 100 else wvl_src * 1000 self.rsr_tgt = rsr_tgt self.logger = logger or SpecHomo_Logger(__name__) # must be pickable @property def wvl_1nm(self): # spectral resampling of input image to 1 nm resolution if self._wvl_1nm is None: self._wvl_1nm = np.arange(np.ceil(self.wvl_src_nm.min()), np.floor(self.wvl_src_nm.max()), 1) return self._wvl_1nm @property def rsr_1nm(self): if not self._rsr_1nm: from scipy.interpolate import interp1d # import here to avoid static TLS ImportError for band in self.rsr_tgt.bands: # resample RSR to 1 nm self._rsr_1nm[band] = \ interp1d(self.rsr_tgt.rsrs_wvl, self.rsr_tgt.rsrs[band], bounds_error=False, fill_value=0, kind='linear')(self.wvl_1nm) # validate assert len(self._rsr_1nm[band]) == len(self.wvl_1nm) return self._rsr_1nm
[docs] def resample_signature(self, spectrum, scale_factor=10000, nodataVal=None, alg_nodata='radical', v=False): # type: (np.ndarray, int, Union[int, float], str, bool) -> np.ndarray """Resample the given spectrum according to the spectral response functions of the target instument. :param spectrum: spectral signature data :param scale_factor: the scale factor to apply to the given spectrum when it is plotted (default: 10000) :param nodataVal: no data value to be respected during resampling :param alg_nodata: algorithm how to deal with pixels where the spectral bands of the source image contain nodata within the spectral response of a target band 'radical': set output band to nodata 'conservative': use existing spectral information and ignore nodata (might alter the outpur spectral information, e.g., at spectral absorption bands) :param v: enable verbose mode (shows a plot of the resampled spectrum) (default: False) :return: resampled spectral signature """ if not spectrum.ndim == 1: raise ValueError("The array of the given spectral signature must be 1-dimensional. " "Received a %s-dimensional array." % spectrum.ndim) spectrum = np.array(spectrum, dtype=float).flatten() assert spectrum.size == self.wvl_src_nm.size # convert spectrum to one multispectral image pixel and resample it spectrum_rsp = \ self.resample_image(spectrum.reshape(1, 1, spectrum.size), tiledims=(1, 1), nodataVal=nodataVal, alg_nodata=alg_nodata, CPUs=1)\ .ravel() if v: from matplotlib import pyplot as plt plt.figure() for band in self.rsr_tgt.bands: plt.plot(self.wvl_1nm, self.rsr_1nm[band] / max(self.rsr_1nm[band])) plt.plot(self.wvl_src_nm, spectrum / scale_factor, '.') plt.plot(self.rsr_tgt.wvl, spectrum_rsp / scale_factor, 'x', color='r') plt.show() return spectrum_rsp
[docs] def resample_spectra(self, spectra, chunksize=200, nodataVal=None, alg_nodata='radical', CPUs=None): # type: (Union[GeoArray, np.ndarray], int, Union[int, float], str, Union[None, int]) -> np.ndarray """Resample the given spectral signatures according to the spectral response functions of the target instrument. :param spectra: spectral signatures, provided as 2D array (rows: spectral samples, columns: spectral information / bands) :param chunksize: defines how many spectral signatures are resampled per CPU :param nodataVal: no data value to be respected during resampling :param alg_nodata: algorithm how to deal with pixels where the spectral bands of the source image contain nodata within the spectral response of a target band 'radical': set output band to nodata 'conservative': use existing spectral information and ignore nodata (might alter the outpur spectral information, e.g., at spectral absorption bands) :param CPUs: CPUs to use for processing """ # input validation if not spectra.ndim == 2: ValueError("The given spectra array must be 2-dimensional. Received a %s-dimensional array." % spectra.ndim) assert spectra.shape[1] == self.wvl_src_nm.size # convert spectra to one multispectral image column im_col = spectra.reshape(spectra.shape[0], 1, spectra.shape[1]) im_col_rsp = self.resample_image(im_col, tiledims=(1, chunksize), nodataVal=nodataVal, alg_nodata=alg_nodata, CPUs=CPUs) spectra_rsp = im_col_rsp.reshape(im_col_rsp.shape[0], im_col_rsp.shape[2]) return spectra_rsp
[docs] def resample_image(self, image_cube, tiledims=(20, 20), nodataVal=None, alg_nodata='radical', CPUs=None): # type: (Union[GeoArray, np.ndarray], tuple, Union[int, float], str, Union[None, int]) -> np.ndarray """Resample the given spectral image cube according to the spectral response functions of the target instrument. :param image_cube: image (3D array) containing the spectral information in the third dimension :param tiledims: dimension of tiles to be used during computation (rows, columns) :param nodataVal: nodata value of the input image :param alg_nodata: algorithm how to deal with pixels where the spectral bands of the source image contain nodata within the spectral response of a target band 'radical': set output band to nodata 'conservative': use existing spectral information and ignore nodata (might alter the output spectral information, e.g., at spectral absorption bands) :param CPUs: CPUs to use for processing :return: resampled spectral image cube """ # input validation if not image_cube.ndim == 3: raise ValueError("The given image cube must be 3-dimensional. Received a %s-dimensional array." % image_cube.ndim) assert image_cube.shape[2] == self.wvl_src_nm.size image_cube = GeoArray(image_cube) (R, C), B = image_cube.shape[:2], len(self.rsr_tgt.bands) image_rsp = np.zeros((R, C, B), dtype=image_cube.dtype) initargs = image_cube, self.rsr_tgt, self.rsr_1nm, self.wvl_src_nm, self.wvl_1nm if CPUs is None or CPUs > 1: with Pool(CPUs, initializer=_initializer_mp, initargs=initargs) as pool: tiles_rsp = pool.starmap(_resample_tile_mp, [(bounds, nodataVal, alg_nodata) for bounds in get_array_tilebounds(image_cube.shape, tiledims)]) pool.close() # needed to make coverage work in multiprocessing pool.join() else: _initializer_mp(*initargs) tiles_rsp = [_resample_tile_mp(bounds, nodataVal, alg_nodata) for bounds in get_array_tilebounds(image_cube.shape, tiledims)] for ((rS, rE), (cS, cE)), tile_rsp in tiles_rsp: image_rsp[rS: rE + 1, cS: cE + 1, :] = tile_rsp return image_rsp
_globs_mp = dict() def _initializer_mp(image_cube, rsr_tgt, rsr_1nm, wvl_src_nm, wvl_1nm): # type: (np.ndarray, RSR, RSR, np.ndarray, np.ndarray) -> None global _globs_mp _globs_mp.update(dict( image_cube=image_cube, rsr_tgt=rsr_tgt, rsr_1nm=rsr_1nm, wvl_src_nm=wvl_src_nm, wvl_1nm=wvl_1nm, )) def _resample_tile_mp(tilebounds, nodataVal=None, alg_nodata='radical'): # TODO speed up by using numba as described here https://krstn.eu/fast-linear-1D-interpolation-with-numba/ from scipy.interpolate import interp1d (rS, rE), (cS, cE) = tilebounds # get global share variables tiledata = _globs_mp['image_cube'][rS: rE + 1, cS: cE + 1, :] # type: Union[GeoArray, np.ndarray] rsr_tgt = _globs_mp['rsr_tgt'] rsr_1nm = _globs_mp['rsr_1nm'] wvl_src_nm = _globs_mp['wvl_src_nm'] wvl_1nm = _globs_mp['wvl_1nm'] if alg_nodata not in ['radical', 'conservative']: raise ValueError(alg_nodata) tile_rsp = np.zeros((*tiledata.shape[:2], len(rsr_tgt.bands)), dtype=tiledata.dtype) if nodataVal is not None: if np.isfinite(nodataVal): _mask_bool3d = tiledata == nodataVal mask_anynodata = np.any(_mask_bool3d, axis=2) mask_allnodata = np.all(_mask_bool3d, axis=2) else: raise NotImplementedError(nodataVal) tiledata = tiledata.astype(float) tiledata[_mask_bool3d] = np.nan # fill pixels with all bands nodata tile_rsp[mask_allnodata] = nodataVal # upsample spectra containing data to 1nm and get nan-mask tilespectra_1nm = interp1d(wvl_src_nm, tiledata[~mask_allnodata], axis=1, bounds_error=False, fill_value=0, kind='linear')(wvl_1nm) isnan = np.isnan(tilespectra_1nm) nan_in_spec = np.any(isnan, axis=1) # compute resampled values for pixels without nodata tilespectra_1nm_nonan = tilespectra_1nm[~nan_in_spec, :] for band_idx, band in enumerate(rsr_tgt.bands): # compute the resampled image cube (np.average computes the weighted mean value) if rsr_1nm[band].sum() != 0: res = np.average(tilespectra_1nm_nonan, weights=rsr_1nm[band], axis=1) # NOTE: rounding here is important to prevent -9999. converted to -9998 if not np.issubdtype(tile_rsp.dtype, np.floating): res = np.around(res) else: res = nodataVal tile_rsp[~mask_anynodata, band_idx] = res # compute resampled values for pixels with nodata tilespectra_1nm_withnan = tilespectra_1nm[nan_in_spec, :] tilespectra_1nm_withnan_ma = np.ma.masked_invalid(tilespectra_1nm_withnan) mask_withnan = mask_anynodata & ~mask_allnodata isnan_sub = isnan[nan_in_spec, :] for band_idx, band in enumerate(rsr_tgt.bands): res_ma = np.ma.average(tilespectra_1nm_withnan_ma, axis=1, weights=rsr_1nm[band]) # in case all 1nm bands for the current band are NaN, the resulting average will also be NaN => fill res = np.ma.filled(res_ma, nodataVal) if alg_nodata == 'radical': # set those output values to nodata where the input bands within the target RSR contain any nodata badspec = np.any(isnan_sub & (rsr_1nm[band] > 0), axis=1) res[badspec] = nodataVal if not np.issubdtype(tile_rsp.dtype, np.floating): res = np.around(res) tile_rsp[mask_withnan, band_idx] = res else: # spectral resampling of input image to 1 nm resolution tile_1nm = interp1d(wvl_src_nm, tiledata, axis=2, bounds_error=False, fill_value=0, kind='linear')(wvl_1nm) for band_idx, band in enumerate(rsr_tgt.bands): # compute the resampled image cube (np.average computes the weighted mean value) res = np.average(tile_1nm, weights=rsr_1nm[band], axis=2) # NOTE: rounding here is important to prevent -9999. converted to -9998 tile_rsp[:, :, band_idx] = res if np.issubdtype(tile_rsp.dtype, np.floating) else np.around(res) return tilebounds, tile_rsp