# Source code for gms_preprocessing.misc.locks

# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

__author__ = 'Daniel Scheffler'

import time
from redis import Redis
from redis_semaphore import Semaphore
from redis.exceptions import ConnectionError as RedisConnectionError
from retools.lock import Lock, LockTimeout
import functools
from psutil import virtual_memory
from logging import getLogger

from ..options.config import GMS_config as CFG

# Establish a connection to a local redis server. All locks defined below rely on it;
# if it is unavailable, ``redis_conn`` is set to None and every lock silently disables itself.
try:
    redis_conn = Redis(host='localhost', db=0)
    redis_conn.keys()  # may raise ConnectionError, e.g., if redis server is not installed or not running
except RedisConnectionError:
    redis_conn = None


"""NOTE:

To get a list of all currently set redis keys, run:

    from redis import Redis
    conn = Redis('localhost', db=0)
    list(sorted(conn.keys()))

Then, to delete all currently set redis keys, run:

    for i in list(sorted(conn.keys())):
        k = i.decode('utf-8')
        conn.delete(k)
"""


class MultiSlotLock(Semaphore):
    """Redis-backed lock that allows a configurable number of concurrent holders.

    The lock acts as a no-op if no redis connection is available or if
    ``allowed_slots`` evaluates to None/False.
    """

    def __init__(self, name='MultiSlotLock', allowed_slots=1, logger=None, **kwargs):
        """Get an instance of MultiSlotLock.

        :param name:            redis namespace of the lock
        :param allowed_slots:   number of slots that may be acquired at the same time
        :param logger:          logger instance (a default one is created if None)
        :param kwargs:          further keyword arguments passed to redis_semaphore.Semaphore
        """
        # disable the lock entirely if there is no redis connection or no slot restriction requested
        self.disabled = redis_conn is None or allowed_slots in [None, False]
        self.namespace = name
        self.allowed_slots = allowed_slots
        self.logger = logger or getLogger("RedisLock: '%s'" % name)

        if not self.disabled:
            super(MultiSlotLock, self).__init__(client=redis_conn, count=allowed_slots, namespace=name, **kwargs)

    def acquire(self, timeout=0, target=None):
        """Acquire a free slot, blocking until one becomes available.

        :return: token of the acquired slot (None if the lock is disabled)
        """
        if self.disabled:
            return None

        if self.available_count == 0:
            self.logger.info("Waiting for free lock '%s'." % self.namespace)

        slot_token = super(MultiSlotLock, self).acquire(timeout=timeout, target=target)

        slot_info = '.' if self.allowed_slots == 1 else ', slot #%s.' % (int(slot_token) + 1)
        self.logger.info("Acquired lock '%s'" % self.namespace + slot_info)

        return slot_token

    def release(self):
        """Release the previously acquired slot and log the release."""
        if self.disabled:
            return

        slot_token = super(MultiSlotLock, self).release()

        if slot_token:
            slot_info = '.' if self.allowed_slots == 1 else ', slot #%s.' % (int(slot_token) + 1)
            self.logger.info("Released lock '%s'" % self.namespace + slot_info)

    def delete(self):
        """Remove all redis keys belonging to this lock's namespace."""
        if self.disabled:
            return

        for redis_key in (self.check_exists_key, self.available_key, self.grabbed_key):
            self.client.delete(redis_key)

    def __exit__(self, exc_type, exc_val, exc_tb):
        return super(MultiSlotLock, self).__exit__(exc_type, exc_val, exc_tb)
class SharedResourceLock(MultiSlotLock):
    """MultiSlotLock that additionally tracks which GMS job grabbed which slot.

    This makes it possible to release all slots belonging to the current job,
    e.g., to clean up locks left over by a crashed job.
    """

    def acquire(self, timeout=0, target=None):
        """Acquire a slot and register it as grabbed by the current GMS job.

        :return: token of the acquired slot (None if the lock is disabled)
        """
        if not self.disabled:
            token = super(SharedResourceLock, self).acquire(timeout=timeout, target=target)
            self.client.hset(self.grabbed_key_jobID, token, self.current_time)

            # FIX: the token was previously acquired but never returned, unlike
            # MultiSlotLock.acquire(), so callers could not learn which slot they hold
            return token

    def release_all_jobID_tokens(self):
        """Release all slots that are registered as grabbed by the current GMS job."""
        if not self.disabled:
            for token in self.client.hkeys(self.grabbed_key_jobID):
                self.signal(token)

            self.client.delete(self.grabbed_key_jobID)

    @property
    def grabbed_key_jobID(self):
        # redis hash key holding the tokens grabbed by the current GMS job
        return self._get_and_set_key('_grabbed_key_jobID', 'GRABBED_BY_GMSJOB_%s' % CFG.ID)

    def signal(self, token):
        """Give a token back to the pool of available slots.

        In contrast to Semaphore.signal(), the token is also removed from the
        per-job bookkeeping hash.

        :param token:   the token to be released (None is silently ignored)
        :return:        the released token or None
        """
        if token is None:
            return None

        with self.client.pipeline() as pipe:
            pipe.multi()
            pipe.hdel(self.grabbed_key, token)
            pipe.hdel(self.grabbed_key_jobID, token)  # only difference to Semaphore.signal()
            pipe.lpush(self.available_key, token)
            pipe.execute()

        return token

    def delete(self):
        """Remove all redis keys of this lock, including the per-job bookkeeping hash."""
        if not self.disabled:
            super(SharedResourceLock, self).delete()
            self.client.delete(self.grabbed_key_jobID)

    def __exit__(self, exc_type, exc_val, exc_tb):
        return super(SharedResourceLock, self).__exit__(exc_type, exc_val, exc_tb)
class IOLock(SharedResourceLock):
    """Lock for restricting the number of parallel disk I/O processes."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        """Get an instance of IOLock.

        :param allowed_slots:   number of processes that may perform disk I/O at the same time
        :param logger:          logger instance
        :param kwargs:          further keyword arguments passed to SharedResourceLock
        """
        self.disabled = CFG.disable_IO_locks

        if self.disabled:
            return  # leave the lock unconfigured; all lock methods are no-ops then

        super(IOLock, self).__init__(name='IOLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

    def __exit__(self, exc_type, exc_val, exc_tb):
        return super(IOLock, self).__exit__(exc_type, exc_val, exc_tb)
class ProcessLock(SharedResourceLock):
    """Lock for restricting the number of parallel CPU-bound processes."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        """Get an instance of ProcessLock.

        :param allowed_slots:   number of processes that may run at the same time
        :param logger:          logger instance
        :param kwargs:          further keyword arguments passed to SharedResourceLock
        """
        self.disabled = CFG.disable_CPU_locks

        if self.disabled:
            return  # leave the lock unconfigured; all lock methods are no-ops then

        super(ProcessLock, self).__init__(name='ProcessLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

    def __exit__(self, exc_type, exc_val, exc_tb):
        return super(ProcessLock, self).__exit__(exc_type, exc_val, exc_tb)
class DatabaseLock(SharedResourceLock):
    """Lock for restricting the number of parallel database accesses."""

    def __init__(self, allowed_slots=1, logger=None, **kwargs):
        """Get an instance of DatabaseLock.

        :param allowed_slots:   number of processes that may access the database at the same time
        :param logger:          logger instance
        :param kwargs:          further keyword arguments passed to SharedResourceLock
        """
        self.disabled = CFG.disable_DB_locks

        if self.disabled:
            return  # leave the lock unconfigured; all lock methods are no-ops then

        super(DatabaseLock, self)\
            .__init__(name='DatabaseLock', allowed_slots=allowed_slots, logger=logger, **kwargs)

    def __exit__(self, exc_type, exc_val, exc_tb):
        return super(DatabaseLock, self).__exit__(exc_type, exc_val, exc_tb)
class MemoryReserver(object):
    # Reserves a given amount of memory in redis so that parallel GMS processes do not
    # overcommit the host's RAM. Reservation state is shared across processes via redis keys.

    def __init__(self, mem2lock_gb, max_usage=90, logger=None):
        """
        :param mem2lock_gb: Amount of memory to be reserved during the lock is acquired (gigabytes).
        :param max_usage:   maximal percentage of the total system memory that may be used (default: 90)
        :param logger:      logger instance (a default one is created if None)
        """
        # disabled if redis is unavailable, memory locks are globally disabled,
        # or no amount of memory was requested
        self.disabled = redis_conn is None or CFG.disable_memory_locks or mem2lock_gb in [None, False]
        self.mem2lock_gb = mem2lock_gb
        self.max_usage = max_usage
        self.logger = logger or getLogger("RedisLock: 'MemoryReserver'")
        self.namespace = 'MemoryReserver'
        self.client = redis_conn
        # total usable memory limit in gigabytes, derived from max_usage
        self.mem_limit = int(virtual_memory().total * max_usage / 100 / 1024 ** 3)
        self._waiting = False  # local mirror of whether this instance is currently waiting for memory

    @property
    def mem_reserved_gb(self):
        # total amount of memory (GB) currently reserved by all jobs (0 if key not set)
        return int(self.client.get(self.reserved_key) or 0)

    @property
    def usable_memory_gb(self):
        # memory (GB) still usable: allowed share of total memory minus used memory minus reservations
        return int((virtual_memory().total * self.max_usage / 100 - virtual_memory().used) / 1024 ** 3) \
               - int(self.mem_reserved_gb)

    @property
    def acquisition_key(self):
        # redis key used as mutex while reading/updating the reservation counters
        return "%s:ACQUISITION_LOCK" % self.namespace

    @property
    def reserved_key(self):
        # redis counter: memory (GB) reserved by all jobs
        return "%s:MEM_RESERVED" % self.namespace

    @property
    def reserved_key_jobID(self):
        # redis counter: memory (GB) reserved by the current GMS job
        return "%s:MEM_RESERVED_BY_GMSJOB_%s" % (self.namespace, CFG.ID)

    @property
    def waiting_key(self):
        # redis counter: number of processes (all jobs) waiting for free memory
        return "%s:NUMBER_WAITING" % self.namespace

    @property
    def waiting_key_jobID(self):
        # redis counter: number of processes of the current GMS job waiting for free memory
        return "%s:NUMBER_WAITING_GMSJOB_%s" % (self.namespace, CFG.ID)

    @property
    def waiting(self):
        # whether this instance is currently waiting for free memory
        return self._waiting

    @waiting.setter
    def waiting(self, val):
        """Set self.waiting.

        NOTE: This setter does not use a lock. Redis access must be locked by calling function.
        """
        # keep the global/per-job waiting counters in sync with the local flag
        if val is not self._waiting:
            if val:
                self.client.incr(self.waiting_key, 1)
                self.client.incr(self.waiting_key_jobID, 1)
            else:
                self.client.decr(self.waiting_key, 1)
                self.client.decr(self.waiting_key_jobID, 1)

        self._waiting = val

    def acquire(self, timeout=20):
        """Reserve the requested amount of memory, blocking until enough memory is usable.

        :param timeout: timeout (seconds) for acquiring the redis acquisition lock
        """
        if not self.disabled:
            try:
                with Lock(self.acquisition_key, expires=20, timeout=timeout, redis=self.client):
                    if self.usable_memory_gb >= self.mem2lock_gb:
                        t_start = time.time()
                        self.waiting = False

                        # atomically increase the global and per-job reservation counters
                        with self.client.pipeline() as pipe:
                            pipe.multi()
                            pipe.incr(self.reserved_key, self.mem2lock_gb)
                            pipe.incr(self.reserved_key_jobID, self.mem2lock_gb)
                            pipe.execute()

                        self.logger.info('Reserved %s GB of memory.' % self.mem2lock_gb)

                        # warn in case the lock has expired before incrementing reserved_key and reserved_key_jobID
                        if time.time() > t_start + timeout:
                            self.logger.warning('Reservation of memory took more time than expected. '
                                                'Possibly more memory than available has been reserved.')

                    else:
                        # not enough memory usable -> log once and mark this instance as waiting
                        if not self.waiting:
                            self.logger.info('Currently usable memory: %s GB. Waiting until at least %s GB are '
                                             'usable.' % (self.usable_memory_gb, self.mem2lock_gb))

                        self.waiting = True

            except LockTimeout:
                # could not get the acquisition lock in time -> retry
                self.acquire(timeout=timeout)

            # if no memory was reserved above, poll until enough memory is usable, then retry
            if self.waiting:
                while self.usable_memory_gb < self.mem2lock_gb:
                    time.sleep(1)

                self.acquire(timeout=timeout)

    def release(self):
        """Release the memory reservation made by acquire()."""
        if not self.disabled:
            with Lock(self.acquisition_key, expires=20, timeout=20, redis=self.client):
                # atomically decrease the global and per-job reservation counters
                # (NOTE: redis_conn is the same object as self.client)
                with redis_conn.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.reserved_key, self.mem2lock_gb)
                    pipe.decr(self.reserved_key_jobID, self.mem2lock_gb)
                    pipe.execute()

            self.logger.info('Released %s GB of reserved memory.' % self.mem2lock_gb)

    def delete(self):
        """Remove all reservations and waiting counters of the current GMS job from redis.

        Global counters are only deleted when they drop to zero, i.e., when no other job
        holds a reservation or is waiting.
        """
        if not self.disabled:
            with Lock(self.acquisition_key, expires=20, timeout=20, redis=self.client):
                # handle reserved_key and reserved_key_jobID
                mem_reserved_currJob = int(self.client.get(self.reserved_key_jobID) or 0)

                with redis_conn.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.reserved_key_jobID, mem_reserved_currJob)
                    pipe.decr(self.reserved_key, mem_reserved_currJob)
                    pipe.delete(self.reserved_key_jobID)
                    pipe.execute()

                if int(self.client.get(self.reserved_key) or 0) == 0:
                    self.client.delete(self.reserved_key)

                # handle waiting_key and waiting_key_jobID
                n_waiting_currJob = int(self.client.get(self.waiting_key_jobID) or 0)

                with redis_conn.pipeline() as pipe:
                    pipe.multi()
                    pipe.decr(self.waiting_key_jobID, n_waiting_currJob)
                    pipe.decr(self.waiting_key, n_waiting_currJob)
                    pipe.delete(self.waiting_key_jobID)
                    pipe.execute()

                if int(self.client.get(self.waiting_key) or 0) == 0:
                    self.client.delete(self.waiting_key)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        # suppress exceptions only if none occurred (i.e., never suppress)
        return True if exc_type is None else False
def acquire_process_lock(**processlock_kwargs):
    """Decorator function for ProcessLock.

    The decorated function only runs while a ProcessLock slot is held.

    :param processlock_kwargs: Keyword arguments to be passed to ProcessLock class.
    """
    def _decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def _locked_call(*args, **kwargs):
            with ProcessLock(**processlock_kwargs):
                return func(*args, **kwargs)

        return _locked_call

    return _decorator
def reserve_mem(**memlock_kwargs):
    """Decorator function for MemoryReserver.

    The decorated function only runs while the requested amount of memory is reserved.

    :param memlock_kwargs: Keyword arguments to be passed to MemoryReserver class.
    """
    def _decorator(func):
        @functools.wraps(func)  # needed to avoid pickling errors
        def _reserved_call(*args, **kwargs):
            with MemoryReserver(**memlock_kwargs):
                return func(*args, **kwargs)

        return _reserved_call

    return _decorator
def release_unclosed_locks():
    """Release all lock slots and memory reservations left over by the current GMS job.

    Does nothing if no redis connection is available.
    """
    if not redis_conn:
        return

    for lock_class in (IOLock, ProcessLock):
        lock = lock_class(allowed_slots=1)
        lock.release_all_jobID_tokens()

        # delete the complete redis namespace if no lock slot is acquired anymore
        if lock.client.hlen(lock.grabbed_key) == 0:
            lock.delete()

    MemoryReserver(1).delete()