# (removed: "Hot-keys on this page" help text — an HTML coverage-viewer artifact
# accidentally pasted into the file; it was not valid Python)
1# -*- coding: utf-8 -*-
3# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
4#
5# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
6#
7# This software was developed within the context of the GeoMultiSens project funded
8# by the German Federal Ministry of Education and Research
9# (project grant code: 01 IS 14 010 A-C).
10#
11# This program is free software: you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation, either version 3 of the License, or (at your option) any later version.
14# Please note the following exception: `gms_preprocessing` depends on tqdm, which
15# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
16# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
17# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
18#
19# This program is distributed in the hope that it will be useful, but WITHOUT
20# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
21# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
22# details.
23#
24# You should have received a copy of the GNU Lesser General Public License along
25# with this program. If not, see <http://www.gnu.org/licenses/>.
27__author__ = 'Daniel Scheffler'
29import time
30from redis import Redis
31from redis_semaphore import Semaphore
32from redis.exceptions import ConnectionError as RedisConnectionError
33from retools.lock import Lock, LockTimeout
34import functools
35from psutil import virtual_memory
36from logging import getLogger
38from ..options.config import GMS_config as CFG
# Try to connect to a redis server running on localhost (database 0). The
# connection object is the shared backend for all lock classes below. If the
# server is not installed or not reachable, redis_conn is set to None and all
# redis-based locking in this module is transparently disabled.
try:
    redis_conn = Redis(host='localhost', db=0)
    redis_conn.keys()  # may raise ConnectionError, e.g., if redis server is not installed or not running
except RedisConnectionError:
    redis_conn = None
47"""NOTE:
49To get a list of all currently set redis keys, run:
51 from redis import Redis
52 conn = Redis('localhost', db=0)
53 list(sorted(conn.keys()))
55Then, to delete all currently set redis keys, run:
57 for i in list(sorted(conn.keys())):
58 k = i.decode('utf-8')
59 conn.delete(k)
60"""
class MultiSlotLock(Semaphore):
    """Redis-backed lock offering a configurable number of parallel slots.

    Behaves as a no-op if no redis connection is available or if slot limiting
    is disabled (allowed_slots is None or False).
    """

    def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
        """
        :param name:          name of the lock (also used as redis namespace)
        :param allowed_slots: number of processes that may hold the lock at the same time
        :param logger:        logger instance (a new one is created if None)
        :param kwargs:        further keyword arguments passed to Semaphore
        """
        # no redis server or no slot limit -> disable locking entirely
        self.disabled = redis_conn is None or allowed_slots in [None, False]
        self.namespace = name
        self.allowed_slots = allowed_slots
        self.logger = logger or getLogger("RedisLock: '%s'" % name)

        if not self.disabled:
            super(MultiSlotLock, self).__init__(client=redis_conn, count=allowed_slots, namespace=name, **kwargs)

    def acquire(self, timeout=0, target=None):
        """Acquire a free slot, logging when the caller has to wait.

        :return: the acquired token, or None if locking is disabled
        """
        if self.disabled:
            return None

        if self.available_count == 0:
            self.logger.info("Waiting for free lock '%s'." % self.namespace)

        token = super(MultiSlotLock, self).acquire(timeout=timeout, target=target)

        slot_info = '.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)
        self.logger.info("Acquired lock '%s'" % self.namespace + slot_info)

        return token

    def release(self):
        """Release the acquired slot and log which one was freed."""
        if self.disabled:
            return

        token = super(MultiSlotLock, self).release()
        if token:
            slot_info = '.' if self.allowed_slots == 1 else ', slot #%s.' % (int(token) + 1)
            self.logger.info("Released lock '%s'" % self.namespace + slot_info)

    def delete(self):
        """Remove all redis keys belonging to this lock."""
        if self.disabled:
            return

        for key in (self.check_exists_key, self.available_key, self.grabbed_key):
            self.client.delete(key)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # plain pass-through to Semaphore.__exit__
        return super(MultiSlotLock, self).__exit__(exc_type, exc_val, exc_tb)
class SharedResourceLock(MultiSlotLock):
    """MultiSlotLock that additionally records which tokens were grabbed by the current GMS job.

    This makes it possible to free all slots belonging to one job at once
    (see release_all_jobID_tokens()), e.g., after a job crashed.
    """

    def acquire(self, timeout=0, target=None):
        """Acquire a slot and remember that it was grabbed by the current GMS job.

        :param timeout: seconds to wait for a free slot
        :param target:  passed through to Semaphore.acquire()
        :return: the acquired token, or None if locking is disabled
        """
        if not self.disabled:
            token = super(SharedResourceLock, self).acquire(timeout=timeout, target=target)
            self.client.hset(self.grabbed_key_jobID, token, self.current_time)

            # bugfix: propagate the token to the caller like MultiSlotLock.acquire() does;
            # previously the token was silently dropped (this override always returned None)
            return token

    def release_all_jobID_tokens(self):
        """Release all slots that were grabbed by the current GMS job."""
        if not self.disabled:
            for token in self.client.hkeys(self.grabbed_key_jobID):
                self.signal(token)

            self.client.delete(self.grabbed_key_jobID)

    @property
    def grabbed_key_jobID(self):
        # redis key of the hash that tracks the tokens grabbed by the current GMS job
        return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)

    def signal(self, token):
        """Return the given token to the pool of available slots.

        Like Semaphore.signal() but additionally removes the token from the
        per-job hash (grabbed_key_jobID).

        :param token: token to release (None is a no-op)
        :return: the released token, or None
        """
        if token is None:
            return None

        # all three key updates are applied atomically via a redis transaction
        with self.client.pipeline() as pipe:
            pipe.multi()
            pipe.hdel(self.grabbed_key, token)
            pipe.hdel(self.grabbed_key_jobID, token)  # only difference to Semaphore.signal()
            pipe.lpush(self.available_key, token)
            pipe.execute()

        return token

    def delete(self):
        """Remove all redis keys of this lock, including the per-job token hash."""
        if not self.disabled:
            super(SharedResourceLock, self).delete()
            self.client.delete(self.grabbed_key_jobID)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # plain pass-through to MultiSlotLock.__exit__
        return super(SharedResourceLock, self).__exit__(exc_type, exc_val, exc_tb)
class IOLock(SharedResourceLock):
    """Lock that limits the number of processes doing disk I/O at the same time."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        # honor the global config switch before touching redis at all
        self.disabled = CFG.disable_IO_locks
        if self.disabled:
            return

        super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # plain pass-through to SharedResourceLock.__exit__
        return super(IOLock, self).__exit__(exc_type, exc_val, exc_tb)
class ProcessLock(SharedResourceLock):
    """Lock that limits the number of CPU-intensive processes running at the same time."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        # honor the global config switch before touching redis at all
        self.disabled = CFG.disable_CPU_locks
        if self.disabled:
            return

        super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # plain pass-through to SharedResourceLock.__exit__
        return super(ProcessLock, self).__exit__(exc_type, exc_val, exc_tb)
class DatabaseLock(SharedResourceLock):
    """Lock that limits the number of parallel database accesses."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        # honor the global config switch before touching redis at all
        self.disabled = CFG.disable_DB_locks
        if self.disabled:
            return

        super(DatabaseLock, self).__init__(name='DatabaseLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # plain pass-through to SharedResourceLock.__exit__
        return super(DatabaseLock, self).__exit__(exc_type, exc_val, exc_tb)
class MemoryReserver(object):
    """Cooperative memory reservation backed by redis counters.

    Multiple GMS processes reserve memory via shared redis keys so that the
    sum of all reservations stays below ``max_usage`` percent of the physical
    RAM. Acts as a no-op when redis is unavailable or memory locking is
    disabled in the GMS configuration.
    """

    def __init__(self, mem2lock_gb, max_usage=90, logger=None):
        """
        :param mem2lock_gb: Amount of memory to be reserved during the lock is acquired (gigabytes).
        :param max_usage:   maximum percentage of total physical memory that may be used (default: 90)
        :param logger:      logger instance (a new one is created if None)
        """
        # disabled if redis is unreachable, memory locks are switched off in the
        # config, or no amount to reserve was given
        self.disabled = redis_conn is None or CFG.disable_memory_locks or mem2lock_gb in [None, False]
        self.mem2lock_gb = mem2lock_gb
        self.max_usage = max_usage
        self.logger = logger or getLogger("RedisLock: 'MemoryReserver'")
        self.namespace = 'MemoryReserver'
        self.client = redis_conn
        # hard ceiling in gigabytes: max_usage percent of the total physical RAM
        self.mem_limit = int(virtual_memory().total * max_usage / 100 / 1024 ** 3)

        # local mirror of whether THIS instance is counted in the waiting counters
        self._waiting = False

    @property
    def mem_reserved_gb(self):
        """Total memory (GB) currently reserved by all jobs (0 if the key is unset)."""
        return int(self.client.get(self.reserved_key) or 0)

    @property
    def usable_memory_gb(self):
        """Memory (GB) still usable: max_usage percent of total RAM minus used minus reserved."""
        return int((virtual_memory().total * self.max_usage / 100 - virtual_memory().used) / 1024 ** 3) \
            - int(self.mem_reserved_gb)

    @property
    def acquisition_key(self):
        # redis key of the lock serializing concurrent reservation attempts
        return "%s:ACQUISITION_LOCK" % self.namespace

    @property
    def reserved_key(self):
        # redis counter: memory (GB) reserved by all jobs
        return "%s:MEM_RESERVED" % self.namespace

    @property
    def reserved_key_jobID(self):
        # redis counter: memory (GB) reserved by the current GMS job
        return "%s:MEM_RESERVED_BY_GMSJOB_%s" % (self.namespace, CFG.ID)

    @property
    def waiting_key(self):
        # redis counter: number of processes (all jobs) waiting for memory
        return "%s:NUMBER_WAITING" % self.namespace

    @property
    def waiting_key_jobID(self):
        # redis counter: number of processes of the current GMS job waiting for memory
        return "%s:NUMBER_WAITING_GMSJOB_%s" % (self.namespace, CFG.ID)

    @property
    def waiting(self):
        # whether this instance is currently counted as waiting
        return self._waiting

    @waiting.setter
    def waiting(self, val):
        """Set self.waiting.

        NOTE: This setter does not use a lock. Redis access must be locked by calling function.
        """
        # only touch the redis counters on an actual state transition
        if val is not self._waiting:
            if val:
                self.client.incr(self.waiting_key, 1)
                self.client.incr(self.waiting_key_jobID, 1)
            else:
                self.client.decr(self.waiting_key, 1)
                self.client.decr(self.waiting_key_jobID, 1)

            self._waiting = val

    def acquire(self, timeout=20):
        """Reserve self.mem2lock_gb GB of memory, waiting until enough is usable.

        :param timeout: timeout (seconds) for acquiring the redis acquisition lock
        """
        if not self.disabled:
            try:
                # the acquisition lock serializes reservation attempts of all processes
                with Lock(self.acquisition_key, expires=20, timeout=timeout, redis=self.client):
                    if self.usable_memory_gb >= self.mem2lock_gb:
                        t_start = time.time()
                        self.waiting = False

                        # atomically bump both the global and the per-job counter
                        with self.client.pipeline() as pipe:
                            pipe.multi()
                            pipe.incr(self.reserved_key, self.mem2lock_gb)
                            pipe.incr(self.reserved_key_jobID, self.mem2lock_gb)
                            pipe.execute()

                        self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)

                        # warn in case the lock has expired before incrementing reserved_key and reserved_key_jobID
                        if time.time() > t_start + timeout:
                            self.logger.warning('Reservation of memory took more time than expected. '
                                                'Possibly more memory than available has been reserved.')

                    else:
                        # not enough memory -> register as waiting (logged only once)
                        if not self.waiting:
                            self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are '
                                             'usable.' % (self.usable_memory_gb, self.mem2lock_gb))
                        self.waiting = True

            except LockTimeout:
                # NOTE(review): retries via recursion; under long contention this could
                # in principle exhaust the recursion limit — confirm whether an
                # iterative retry would be safer
                self.acquire(timeout=timeout)

            if self.waiting:
                # poll until enough memory becomes usable, then retry the reservation
                while self.usable_memory_gb < self.mem2lock_gb:
                    time.sleep(1)

                self.acquire(timeout=timeout)

    def release(self):
        """Give back the reserved memory by decrementing both reservation counters."""
        if not self.disabled:
            with Lock(self.acquisition_key, expires=20, timeout=20, redis=self.client):
                # NOTE(review): uses the module-level redis_conn here while other methods
                # use self.client — same object at __init__ time, but inconsistent
                with redis_conn.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.reserved_key, self.mem2lock_gb)
                    pipe.decr(self.reserved_key_jobID, self.mem2lock_gb)
                    pipe.execute()

                self.logger.info('Released %s GB of reserved memory.' % self.mem2lock_gb)

    def delete(self):
        """Remove all reservations and waiting counters of the current GMS job.

        Also deletes the global counters once they drop to zero.
        """
        if not self.disabled:
            with Lock(self.acquisition_key, expires=20, timeout=20, redis=self.client):
                # handle reserved_key and reserved_key_jobID
                mem_reserved_currJob = int(self.client.get(self.reserved_key_jobID) or 0)
                with redis_conn.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.reserved_key_jobID, mem_reserved_currJob)
                    pipe.decr(self.reserved_key, mem_reserved_currJob)
                    pipe.delete(self.reserved_key_jobID)
                    pipe.execute()

                # drop the global counter once no job holds a reservation anymore
                if int(self.client.get(self.reserved_key) or 0) == 0:
                    self.client.delete(self.reserved_key)

                # handle waiting_key and waiting_key_jobID
                n_waiting_currJob = int(self.client.get(self.waiting_key_jobID) or 0)
                with redis_conn.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.waiting_key_jobID, n_waiting_currJob)
                    pipe.decr(self.waiting_key, n_waiting_currJob)
                    pipe.delete(self.waiting_key_jobID)
                    pipe.execute()

                # drop the global waiting counter once nobody is waiting anymore
                if int(self.client.get(self.waiting_key) or 0) == 0:
                    self.client.delete(self.waiting_key)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

        # True (suppress) only when no exception occurred; otherwise propagate
        return True if exc_type is None else False
def acquire_process_lock(**processlock_kwargs):
    """Decorator that runs the wrapped function while holding a ProcessLock.

    :param processlock_kwargs: Keyword arguments to be passed to ProcessLock class.
    """
    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            with ProcessLock(**processlock_kwargs):
                return func(*args, **kwargs)

        return wrapped_func

    return decorator
def reserve_mem(**memlock_kwargs):
    """Decorator that runs the wrapped function while holding a MemoryReserver.

    :param memlock_kwargs: Keyword arguments to be passed to MemoryReserver class.
    """
    def decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def wrapped_func(*args, **kwargs):
            with MemoryReserver(**memlock_kwargs):
                return func(*args, **kwargs)

        return wrapped_func

    return decorator
def release_unclosed_locks():
    """Release all lock slots of the current GMS job that were never freed.

    Does nothing if no redis connection is available.
    """
    if not redis_conn:
        return

    for lock_class in (IOLock, ProcessLock):
        lock = lock_class(allowed_slots=1)
        lock.release_all_jobID_tokens()

        # drop the whole redis namespace once no slot is grabbed anymore
        if lock.client.hlen(lock.grabbed_key) == 0:
            lock.delete()

    MemoryReserver(1).delete()