Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2 

3# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data 

4# 

5# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de) 

6# 

7# This software was developed within the context of the GeoMultiSens project funded 

8# by the German Federal Ministry of Education and Research 

9# (project grant code: 01 IS 14 010 A-C). 

10# 

11# This program is free software: you can redistribute it and/or modify it under 

12# the terms of the GNU General Public License as published by the Free Software 

13# Foundation, either version 3 of the License, or (at your option) any later version. 

14# Please note the following exception: `gms_preprocessing` depends on tqdm, which 

15# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files 

16# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore". 

17# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE. 

18# 

19# This program is distributed in the hope that it will be useful, but WITHOUT 

20# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

21# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

22# details. 

23# 

24# You should have received a copy of the GNU Lesser General Public License along 

25# with this program. If not, see <http://www.gnu.org/licenses/>. 

26 

27__author__ = 'Daniel Scheffler' 

28 

29import time 

30from redis import Redis 

31from redis_semaphore import Semaphore 

32from redis.exceptions import ConnectionError as RedisConnectionError 

33from retools.lock import Lock, LockTimeout 

34import functools 

35from psutil import virtual_memory 

36from logging import getLogger 

37 

38from ..options.config import GMS_config as CFG 

39 

# Module-wide redis connection (local server, database 0) used by all lock classes in this module.
try:
    redis_conn = Redis(host='localhost', db=0)
    redis_conn.keys()  # may raise ConnectionError, e.g., if redis server is not installed or not running
except RedisConnectionError:
    # No reachable redis server: redis_conn is None and the lock classes below disable themselves.
    redis_conn = None

45 

46 

47"""NOTE: 

48 

49To get a list of all currently set redis keys, run: 

50 

51 from redis import Redis 

52 conn = Redis('localhost', db=0) 

53 list(sorted(conn.keys())) 

54 

55Then, to delete all currently set redis keys, run: 

56 

57 for i in list(sorted(conn.keys())): 

58 k = i.decode('utf-8') 

59 conn.delete(k) 

60""" 

61 

62 

class MultiSlotLock(Semaphore):
    """Redis semaphore with a fixed number of slots that logs when a slot is acquired or released.

    The lock becomes a no-op (``self.disabled`` is True) if no redis connection could be established
    at import time or if `allowed_slots` is contained in ``[None, False]``.
    """

    def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
        """Get an instance of MultiSlotLock.

        :param name:            redis namespace of the lock, also used in the log messages
        :param allowed_slots:   number of slots handed to the semaphore as `count`
        :param logger:          logger to use; by default a logger named "RedisLock: '<name>'" is created
        :param kwargs:          further keyword arguments passed on to the Semaphore constructor
        """
        self.namespace = name
        self.allowed_slots = allowed_slots
        self.logger = logger if logger else getLogger("RedisLock: '%s'" % name)
        self.disabled = redis_conn is None or allowed_slots in [None, False]

        if self.disabled:
            # the semaphore is intentionally left uninitialized, all public methods check self.disabled
            return

        super(MultiSlotLock, self).__init__(client=redis_conn, count=allowed_slots, namespace=name, **kwargs)

    def acquire(self, timeout=0, target=None):
        """Acquire one slot and log it.

        :param timeout: passed through to Semaphore.acquire()
        :param target:  passed through to Semaphore.acquire()
        :return:        the token returned by Semaphore.acquire() or None if the lock is disabled
        """
        if self.disabled:
            return None

        if self.available_count == 0:
            self.logger.info("Waiting for free lock '%s'." % self.namespace)

        token = super(MultiSlotLock, self).acquire(timeout=timeout, target=target)

        # the slot number is only of interest if there is more than one slot
        if self.allowed_slots == 1:
            suffix = '.'
        else:
            suffix = ', slot #%s.' % (int(token) + 1)
        self.logger.info("Acquired lock '%s'" % self.namespace + suffix)

        return token

    def release(self):
        """Release the slot held by this instance and log it (no-op if the lock is disabled)."""
        if self.disabled:
            return

        token = super(MultiSlotLock, self).release()
        if not token:
            return

        if self.allowed_slots == 1:
            suffix = '.'
        else:
            suffix = ', slot #%s.' % (int(token) + 1)
        self.logger.info("Released lock '%s'" % self.namespace + suffix)

    def delete(self):
        """Delete the redis keys of this lock (existence check, available and grabbed keys)."""
        if self.disabled:
            return

        for redis_key in (self.check_exists_key, self.available_key, self.grabbed_key):
            self.client.delete(redis_key)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context manager by delegating to Semaphore.__exit__()."""
        return super(MultiSlotLock, self).__exit__(exc_type, exc_val, exc_tb)

101 

102 

class SharedResourceLock(MultiSlotLock):
    """MultiSlotLock that additionally tracks which tokens were grabbed by the current GMS job.

    Every acquired token is also stored in a job specific redis hash (see grabbed_key_jobID) so that
    all tokens of the job can be handed back at once via release_all_jobID_tokens().
    """

    def acquire(self, timeout=0, target=None):
        """Acquire one slot and register its token under the job specific key.

        :param timeout: passed through to MultiSlotLock.acquire()
        :param target:  passed through to MultiSlotLock.acquire()
        :return:        the acquired token or None if the lock is disabled
        """
        if self.disabled:
            return None

        token = super(SharedResourceLock, self).acquire(timeout=timeout, target=target)
        self.client.hset(self.grabbed_key_jobID, token, self.current_time)

        # hand the token to the caller, consistently with MultiSlotLock.acquire()
        return token

    def release_all_jobID_tokens(self):
        """Hand back every token registered for the current job and drop the job specific key."""
        if self.disabled:
            return

        for token in self.client.hkeys(self.grabbed_key_jobID):
            self.signal(token)

        self.client.delete(self.grabbed_key_jobID)

    @property
    def grabbed_key_jobID(self):
        """Return the redis key of the hash holding the tokens grabbed by the job with the ID CFG.ID."""
        return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)

    def signal(self, token):
        """Mark the given token as available again.

        Within one MULTI transaction the token is removed from the grabbed hash and from the job
        specific hash and is pushed to the list of available tokens.

        :param token:   the token to be handed back (nothing is done if None)
        :return:        the token or None if no token was given
        """
        if token is None:
            return None

        with self.client.pipeline() as pipe:
            pipe.multi()
            pipe.hdel(self.grabbed_key, token)
            pipe.hdel(self.grabbed_key_jobID, token)  # only difference to Semaphore.signal()
            pipe.lpush(self.available_key, token)
            pipe.execute()

        return token

    def delete(self):
        """Delete all redis keys of this lock including the job specific key."""
        if self.disabled:
            return

        super(SharedResourceLock, self).delete()
        self.client.delete(self.grabbed_key_jobID)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context manager by delegating to MultiSlotLock.__exit__()."""
        return super(SharedResourceLock, self).__exit__(exc_type, exc_val, exc_tb)

138 

139 

class IOLock(SharedResourceLock):
    """SharedResourceLock using the redis namespace 'IOLock'; switched off by CFG.disable_IO_locks."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        """Get an instance of IOLock.

        :param allowed_slots:   number of slots of the lock
        :param logger:          logger passed on to SharedResourceLock
        :param kwargs:          further keyword arguments passed on to SharedResourceLock
        """
        self.disabled = CFG.disable_IO_locks

        if self.disabled:
            # locks of this kind are switched off by configuration, nothing to set up
            return

        super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context manager by delegating to SharedResourceLock.__exit__()."""
        return super(IOLock, self).__exit__(exc_type, exc_val, exc_tb)

149 

150 

class ProcessLock(SharedResourceLock):
    """SharedResourceLock using the redis namespace 'ProcessLock'; switched off by CFG.disable_CPU_locks."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        """Get an instance of ProcessLock.

        :param allowed_slots:   number of slots of the lock
        :param logger:          logger passed on to SharedResourceLock
        :param kwargs:          further keyword arguments passed on to SharedResourceLock
        """
        self.disabled = CFG.disable_CPU_locks

        if self.disabled:
            # locks of this kind are switched off by configuration, nothing to set up
            return

        super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context manager by delegating to SharedResourceLock.__exit__()."""
        return super(ProcessLock, self).__exit__(exc_type, exc_val, exc_tb)

160 

161 

class DatabaseLock(SharedResourceLock):
    """SharedResourceLock using the redis namespace 'DatabaseLock'; switched off by CFG.disable_DB_locks."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        """Get an instance of DatabaseLock.

        :param allowed_slots:   number of slots of the lock
        :param logger:          logger passed on to SharedResourceLock
        :param kwargs:          further keyword arguments passed on to SharedResourceLock
        """
        self.disabled = CFG.disable_DB_locks

        if self.disabled:
            # locks of this kind are switched off by configuration, nothing to set up
            return

        super(DatabaseLock, self).__init__(
            name='DatabaseLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context manager by delegating to SharedResourceLock.__exit__()."""
        return super(DatabaseLock, self).__exit__(exc_type, exc_val, exc_tb)

172 

173 

class MemoryReserver(object):
    """Reservation of main memory with the bookkeeping kept in redis.

    The amount of currently reserved memory is stored in redis, in total and per GMS job (CFG.ID).
    acquire() blocks until the usable memory (share of the total memory given by `max_usage` minus
    the used memory minus the memory already reserved) is at least `mem2lock_gb`.

    The reserver is a no-op (``self.disabled`` is True) if there is no redis connection, if
    CFG.disable_memory_locks is set or if `mem2lock_gb` is contained in ``[None, False]``.
    """

    # Lifetime in seconds of the redis lock that guards the bookkeeping keys.
    _LOCK_EXPIRES = 20

    def __init__(self, mem2lock_gb, max_usage=90, logger=None):
        """Get an instance of MemoryReserver.

        :param mem2lock_gb: Amount of memory to be reserved while the lock is acquired (gigabytes).
        :param max_usage:   Percentage of the total memory that is regarded as usable.
        :param logger:      Logger to use; by default the logger "RedisLock: 'MemoryReserver'" is used.
        """
        self.disabled = redis_conn is None or CFG.disable_memory_locks or mem2lock_gb in [None, False]
        self.mem2lock_gb = mem2lock_gb
        self.max_usage = max_usage
        self.logger = logger or getLogger("RedisLock: 'MemoryReserver'")
        self.namespace = 'MemoryReserver'
        self.client = redis_conn
        self.mem_limit = int(virtual_memory().total * max_usage / 100 / 1024 ** 3)

        self._waiting = False

    @property
    def mem_reserved_gb(self):
        """Return the memory currently reserved according to redis (gigabytes, 0 if the key is unset)."""
        return int(self.client.get(self.reserved_key) or 0)

    @property
    def usable_memory_gb(self):
        """Return the memory that can still be reserved (gigabytes, may be negative)."""
        return int((virtual_memory().total * self.max_usage / 100 - virtual_memory().used) / 1024 ** 3) \
            - int(self.mem_reserved_gb)

    @property
    def acquisition_key(self):
        """Return the key of the redis lock guarding all bookkeeping changes."""
        return "%s:ACQUISITION_LOCK" % self.namespace

    @property
    def reserved_key(self):
        """Return the redis key holding the total amount of reserved memory."""
        return "%s:MEM_RESERVED" % self.namespace

    @property
    def reserved_key_jobID(self):
        """Return the redis key holding the amount of memory reserved by the current job (CFG.ID)."""
        return "%s:MEM_RESERVED_BY_GMSJOB_%s" % (self.namespace, CFG.ID)

    @property
    def waiting_key(self):
        """Return the redis key holding the total number of waiting reservers."""
        return "%s:NUMBER_WAITING" % self.namespace

    @property
    def waiting_key_jobID(self):
        """Return the redis key holding the number of waiting reservers of the current job (CFG.ID)."""
        return "%s:NUMBER_WAITING_GMSJOB_%s" % (self.namespace, CFG.ID)

    @property
    def waiting(self):
        """Return True if this instance is currently registered as waiting for memory."""
        return self._waiting

    @waiting.setter
    def waiting(self, val):
        """Set self.waiting and update the waiting counters in redis if the state changes.

        NOTE: This setter does not use a lock. Redis access must be locked by calling function.
        """
        if val is not self._waiting:
            if val:
                self.client.incr(self.waiting_key, 1)
                self.client.incr(self.waiting_key_jobID, 1)
            else:
                self.client.decr(self.waiting_key, 1)
                self.client.decr(self.waiting_key_jobID, 1)

        self._waiting = val

    def _reserve(self, t_locked):
        """Book self.mem2lock_gb in redis. Must be called while the acquisition lock is held.

        :param t_locked:    time.time() value taken right after the acquisition lock was obtained
        """
        self.waiting = False

        with self.client.pipeline() as pipe:
            pipe.multi()
            pipe.incr(self.reserved_key, self.mem2lock_gb)
            pipe.incr(self.reserved_key_jobID, self.mem2lock_gb)
            pipe.execute()

        self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)

        # warn in case the lock has expired before incrementing reserved_key and reserved_key_jobID
        if time.time() > t_locked + self._LOCK_EXPIRES:
            self.logger.warning('Reservation of memory took more time than expected. '
                                'Possibly more memory than available has been reserved.')

    def acquire(self, timeout=20):
        """Block until self.mem2lock_gb could be reserved (returns immediately if disabled).

        :param timeout: seconds to wait for the acquisition lock per attempt; after a timeout the
                        attempt is simply repeated
        """
        if self.disabled:
            return

        # Retry in a loop (instead of recursing) so that long waiting times cannot exhaust the call stack.
        while True:
            try:
                with Lock(self.acquisition_key, expires=self._LOCK_EXPIRES, timeout=timeout, redis=self.client):
                    t_locked = time.time()

                    if self.usable_memory_gb >= self.mem2lock_gb:
                        self._reserve(t_locked)
                        return

                    if not self.waiting:
                        self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are '
                                         'usable.' % (self.usable_memory_gb, self.mem2lock_gb))
                    self.waiting = True

            except LockTimeout:
                # the acquisition lock could not be obtained in time -> try again
                continue

            # poll without holding the acquisition lock, then re-check under the lock
            while self.usable_memory_gb < self.mem2lock_gb:
                time.sleep(1)

    def release(self):
        """Give back the memory reserved by this instance (no-op if disabled).

        NOTE(review): a LockTimeout raised while waiting for the acquisition lock is not handled here,
        in that case the reservation stays booked in redis - confirm that this is acceptable.
        """
        if self.disabled:
            return

        with Lock(self.acquisition_key, expires=self._LOCK_EXPIRES, timeout=20, redis=self.client):
            with self.client.pipeline() as pipe:
                pipe.multi()
                pipe.decr(self.reserved_key, self.mem2lock_gb)
                pipe.decr(self.reserved_key_jobID, self.mem2lock_gb)
                pipe.execute()

            self.logger.info('Released %s GB of reserved memory.' % self.mem2lock_gb)

    def delete(self):
        """Remove all reservations and waiting counts of the current job (CFG.ID) from redis.

        The job specific keys are deleted; the global keys are reduced by the job's share and are
        deleted as well if they drop to zero.
        """
        if self.disabled:
            return

        with Lock(self.acquisition_key, expires=self._LOCK_EXPIRES, timeout=20, redis=self.client):
            # handle reserved_key and reserved_key_jobID
            mem_reserved_currJob = int(self.client.get(self.reserved_key_jobID) or 0)
            with self.client.pipeline() as pipe:
                pipe.multi()
                pipe.decr(self.reserved_key_jobID, mem_reserved_currJob)
                pipe.decr(self.reserved_key, mem_reserved_currJob)
                pipe.delete(self.reserved_key_jobID)
                pipe.execute()

            if int(self.client.get(self.reserved_key) or 0) == 0:
                self.client.delete(self.reserved_key)

            # handle waiting_key and waiting_key_jobID
            n_waiting_currJob = int(self.client.get(self.waiting_key_jobID) or 0)
            with self.client.pipeline() as pipe:
                pipe.multi()
                pipe.decr(self.waiting_key_jobID, n_waiting_currJob)
                pipe.decr(self.waiting_key, n_waiting_currJob)
                pipe.delete(self.waiting_key_jobID)
                pipe.execute()

            if int(self.client.get(self.waiting_key) or 0) == 0:
                self.client.delete(self.waiting_key)

    def __enter__(self):
        """Reserve the memory and return the instance."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the reserved memory; exceptions raised within the context are not suppressed."""
        self.release()

        return exc_type is None

321 

322 

def acquire_process_lock(**processlock_kwargs):
    """Return a decorator that executes the decorated function within a ProcessLock context.

    A new ProcessLock is created for each call of the decorated function.

    :param processlock_kwargs: Keyword arguments to be passed to ProcessLock class.
    """

    def decorator(func):
        def locked_call(*args, **kwargs):
            with ProcessLock(**processlock_kwargs):
                outcome = func(*args, **kwargs)

            return outcome

        # functools.wraps is needed to avoid pickling errors
        return functools.wraps(func)(locked_call)

    return decorator

340 

341 

def reserve_mem(**memlock_kwargs):
    """Return a decorator that executes the decorated function within a MemoryReserver context.

    A new MemoryReserver is created for each call of the decorated function.

    :param memlock_kwargs: Keyword arguments to be passed to MemoryReserver class.
    """

    def decorator(func):
        def reserved_call(*args, **kwargs):
            with MemoryReserver(**memlock_kwargs):
                outcome = func(*args, **kwargs)

            return outcome

        # functools.wraps is needed to avoid pickling errors
        return functools.wraps(func)(reserved_call)

    return decorator

359 

360 

def release_unclosed_locks():
    """Hand back all IO/process lock slots and memory reservations still booked for the current job.

    Nothing is done if there is no redis connection. Lock types that are disabled are skipped
    because their redis client and keys are never set up.

    NOTE(review): DatabaseLock is not handled here - confirm whether that is intended.
    """
    if not redis_conn:
        return

    for lock_cls in [IOLock, ProcessLock]:
        lock = lock_cls(allowed_slots=1)

        if lock.disabled:
            # a disabled lock has no `client` attribute, accessing it below would raise an AttributeError
            continue

        lock.release_all_jobID_tokens()

        # delete the complete redis namespace if no lock slot is acquired anymore
        if lock.client.hlen(lock.grabbed_key) == 0:
            lock.delete()

    MemoryReserver(1).delete()