Source code for gms_preprocessing.processing.multiproc

# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.


from multiprocessing import Pool, current_process
from itertools import chain

from ..options.config import GMS_config as CFG

__author__ = 'Daniel Scheffler'


[docs]def MAP(func, args, CPUs=None, flatten_output=False): # type: (any, list, int, bool) -> list """Parallelize the execution of the given function. NOTE: if Job.CPUs in config is 1, execution is not parallelized. :param func: function to parallelize :param args: function arguments :param CPUs: number of CPUs to use :param flatten_output: whether to flatten output list, e.g. [ [ Tile1Scene1, Tile2Scene1], Tile1Scene2, Tile2Scene2] to [ Tile1Scene1, Tile2Scene1, Tile1Scene2, Tile2Scene2 ] """ CPUs = CPUs or CFG.CPUs CPUs = CPUs if CPUs <= CFG.CPUs else CFG.CPUs # treat CFG.CPUs as maximum number of CPUs if CPUs and CPUs > 1 and len(args) > 1: with Pool(CPUs) as pool: results = pool.map(func, args) # always returns a list else: results = [func(argset) for argset in args] # generator does not always work properly here if flatten_output: try: ch = chain.from_iterable(results) return list(ch) except TypeError: # if elements of chain are not iterable ch = chain.from_iterable([results]) return list(ch) else: return results
[docs]def imap_unordered(func, args, CPUs=None, flatten_output=False): # type: (any, list, int, bool) -> list """Parallelize the execution of the given function. NOTE: if Job.CPUs in config is 1, execution is not parallelized. :param func: function to parallelize :param args: function arguments :param CPUs: number of CPUs to use :param flatten_output: whether to flatten output list, e.g. [ [ Tile1Scene1, Tile2Scene1], Tile1Scene2, Tile2Scene2] to [ Tile1Scene1, Tile2Scene1, Tile1Scene2, Tile2Scene2 ] """ CPUs = CPUs or CFG.CPUs CPUs = CPUs if CPUs <= CFG.CPUs else CFG.CPUs # treat CFG.CPUs as maximum number of CPUs if CPUs and CPUs > 1 and len(args) > 1: with Pool(CPUs) as pool: results = list(pool.imap_unordered(func, args)) # returns an iterator else: results = [func(argset) for argset in args] # generator does not always work properly here if flatten_output: try: ch = chain.from_iterable(results) return list(ch) except TypeError: # if elements of chain are not iterable ch = chain.from_iterable([results]) return list(ch) else: return list(results)
[docs]def is_mainprocess(): # type: () -> bool """Return True if the current process is the main process and False if it is a multiprocessing child process.""" return current_process().name == 'MainProcess'