Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2 

3# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data 

4# 

5# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de) 

6# 

7# This software was developed within the context of the GeoMultiSens project funded 

8# by the German Federal Ministry of Education and Research 

9# (project grant code: 01 IS 14 010 A-C). 

10# 

11# This program is free software: you can redistribute it and/or modify it under 

12# the terms of the GNU General Public License as published by the Free Software 

13# Foundation, either version 3 of the License, or (at your option) any later version. 

14# Please note the following exception: `gms_preprocessing` depends on tqdm, which 

15# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files 

16# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore". 

17# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE. 

18# 

19# This program is distributed in the hope that it will be useful, but WITHOUT 

20# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

21# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

22# details. 

23# 

24# You should have received a copy of the GNU Lesser General Public License along 

25# with this program. If not, see <http://www.gnu.org/licenses/>. 

26 

27 

28from multiprocessing import Pool, current_process 

29from itertools import chain 

30 

31from ..options.config import GMS_config as CFG 

32 

33__author__ = 'Daniel Scheffler' 

34 

35 

36def MAP(func, args, CPUs=None, flatten_output=False): 

37 # type: (any, list, int, bool) -> list 

38 """Parallelize the execution of the given function. 

39 NOTE: if Job.CPUs in config is 1, execution is not parallelized. 

40 

41 :param func: function to parallelize 

42 :param args: function arguments 

43 :param CPUs: number of CPUs to use 

44 :param flatten_output: whether to flatten output list, 

45 e.g. [ [ Tile1Scene1, Tile2Scene1], Tile1Scene2, Tile2Scene2] to 

46 [ Tile1Scene1, Tile2Scene1, Tile1Scene2, Tile2Scene2 ] 

47 """ 

48 

49 CPUs = CPUs or CFG.CPUs 

50 CPUs = CPUs if CPUs <= CFG.CPUs else CFG.CPUs # treat CFG.CPUs as maximum number of CPUs 

51 

52 if CPUs and CPUs > 1 and len(args) > 1: 

53 with Pool(CPUs) as pool: 

54 results = pool.map(func, args) # always returns a list 

55 else: 

56 results = [func(argset) for argset in args] # generator does not always work properly here 

57 

58 if flatten_output: 

59 try: 

60 ch = chain.from_iterable(results) 

61 return list(ch) 

62 except TypeError: # if elements of chain are not iterable 

63 ch = chain.from_iterable([results]) 

64 return list(ch) 

65 else: 

66 return results 

67 

68 

69def imap_unordered(func, args, CPUs=None, flatten_output=False): 

70 # type: (any, list, int, bool) -> list 

71 """Parallelize the execution of the given function. 

72 NOTE: if Job.CPUs in config is 1, execution is not parallelized. 

73 

74 :param func: function to parallelize 

75 :param args: function arguments 

76 :param CPUs: number of CPUs to use 

77 :param flatten_output: whether to flatten output list, 

78 e.g. [ [ Tile1Scene1, Tile2Scene1], Tile1Scene2, Tile2Scene2] to 

79 [ Tile1Scene1, Tile2Scene1, Tile1Scene2, Tile2Scene2 ] 

80 """ 

81 

82 CPUs = CPUs or CFG.CPUs 

83 CPUs = CPUs if CPUs <= CFG.CPUs else CFG.CPUs # treat CFG.CPUs as maximum number of CPUs 

84 

85 if CPUs and CPUs > 1 and len(args) > 1: 

86 with Pool(CPUs) as pool: 

87 results = list(pool.imap_unordered(func, args)) # returns an iterator 

88 else: 

89 results = [func(argset) for argset in args] # generator does not always work properly here 

90 

91 if flatten_output: 

92 try: 

93 ch = chain.from_iterable(results) 

94 return list(ch) 

95 except TypeError: # if elements of chain are not iterable 

96 ch = chain.from_iterable([results]) 

97 return list(ch) 

98 else: 

99 return list(results) 

100 

101 

102def is_mainprocess(): 

103 # type: () -> bool 

104 """Return True if the current process is the main process and False if it is a multiprocessing child process.""" 

105 return current_process().name == 'MainProcess'