Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
3# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
4#
5# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
6#
7# This software was developed within the context of the GeoMultiSens project funded
8# by the German Federal Ministry of Education and Research
9# (project grant code: 01 IS 14 010 A-C).
10#
11# This program is free software: you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation, either version 3 of the License, or (at your option) any later version.
14# Please note the following exception: `gms_preprocessing` depends on tqdm, which
15# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
16# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
17# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
18#
19# This program is distributed in the hope that it will be useful, but WITHOUT
20# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
21# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
22# details.
23#
24# You should have received a copy of the GNU Lesser General Public License along
25# with this program. If not, see <http://www.gnu.org/licenses/>.
28from multiprocessing import Pool, current_process
29from itertools import chain
31from ..options.config import GMS_config as CFG
33__author__ = 'Daniel Scheffler'
def MAP(func, args, CPUs=None, flatten_output=False):
    # type: (any, list, int, bool) -> list
    """Apply the given function to each argument set, in parallel if more than one CPU may be used.

    The work is handed to a multiprocessing.Pool (Pool.map) only if the effective number of CPUs is greater
    than 1 and there is more than one argument set. Otherwise, the function is called sequentially within the
    current process. In both cases, the order of the results corresponds to the order of the argument sets.

    NOTE: if Job.CPUs in config is 1, execution is not parallelized.

    :param func:            function to parallelize (called with one element of 'args' per call)
    :param args:            function arguments (any iterable providing one element per function call)
    :param CPUs:            number of CPUs to use (default: CFG.CPUs); CFG.CPUs is treated as the maximum
    :param flatten_output:  whether to flatten output list,
                            e.g. [ [ Tile1Scene1, Tile2Scene1], [ Tile1Scene2, Tile2Scene2] ] to
                            [ Tile1Scene1, Tile2Scene1, Tile1Scene2, Tile2Scene2 ];
                            if at least one result is not iterable, the results are returned unflattened
    :return:                list of results
    """
    # materialize the argument sets once, so that iterables without len() (e.g., generators) also work
    # in case the pool is used
    argsets = list(args)

    max_cpus = CFG.CPUs
    n_cpus = CPUs or max_cpus
    if n_cpus is not None and max_cpus is not None:
        n_cpus = min(n_cpus, max_cpus)  # treat CFG.CPUs as maximum number of CPUs

    if n_cpus and n_cpus > 1 and len(argsets) > 1:
        with Pool(n_cpus) as pool:
            results = pool.map(func, argsets)  # always returns a list
    else:
        results = [func(argset) for argset in argsets]  # generator does not always work properly here

    if not flatten_output:
        return results

    try:
        return list(chain.from_iterable(results))
    except TypeError:  # at least one result is not iterable -> return the results as they are
        return list(results)
def imap_unordered(func, args, CPUs=None, flatten_output=False):
    # type: (any, list, int, bool) -> list
    """Apply the given function to each argument set, in parallel if more than one CPU may be used.

    The work is handed to a multiprocessing.Pool (Pool.imap_unordered) only if the effective number of CPUs is
    greater than 1 and there is more than one argument set. In that case, the order of the results is
    arbitrary, i.e., it does not necessarily correspond to the order of the argument sets. Otherwise, the
    function is called sequentially within the current process and the input order is kept.

    NOTE: if Job.CPUs in config is 1, execution is not parallelized.

    :param func:            function to parallelize (called with one element of 'args' per call)
    :param args:            function arguments (any iterable providing one element per function call)
    :param CPUs:            number of CPUs to use (default: CFG.CPUs); CFG.CPUs is treated as the maximum
    :param flatten_output:  whether to flatten output list,
                            e.g. [ [ Tile1Scene1, Tile2Scene1], [ Tile1Scene2, Tile2Scene2] ] to
                            [ Tile1Scene1, Tile2Scene1, Tile1Scene2, Tile2Scene2 ];
                            if at least one result is not iterable, the results are returned unflattened
    :return:                list of results
    """
    # materialize the argument sets once, so that iterables without len() (e.g., generators) also work
    # in case the pool is used
    argsets = list(args)

    max_cpus = CFG.CPUs
    n_cpus = CPUs or max_cpus
    if n_cpus is not None and max_cpus is not None:
        n_cpus = min(n_cpus, max_cpus)  # treat CFG.CPUs as maximum number of CPUs

    if n_cpus and n_cpus > 1 and len(argsets) > 1:
        with Pool(n_cpus) as pool:
            # imap_unordered returns an iterator -> consume it while the pool is still alive
            results = list(pool.imap_unordered(func, argsets))
    else:
        results = [func(argset) for argset in argsets]  # generator does not always work properly here

    if not flatten_output:
        return results  # already a list in both branches

    try:
        return list(chain.from_iterable(results))
    except TypeError:  # at least one result is not iterable -> return the results as they are
        return list(results)
def is_mainprocess():
    # type: () -> bool
    """Check whether the calling code runs within the main process.

    :return: True if the name of the current process is 'MainProcess', otherwise False
             (e.g., within a multiprocessing child process)
    """
    process_name = current_process().name

    return process_name == 'MainProcess'