Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# -*- coding: utf-8 -*-
3# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
4#
5# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
6#
7# This software was developed within the context of the GeoMultiSens project funded
8# by the German Federal Ministry of Education and Research
9# (project grant code: 01 IS 14 010 A-C).
10#
11# This program is free software: you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation, either version 3 of the License, or (at your option) any later version.
14# Please note the following exception: `gms_preprocessing` depends on tqdm, which
15# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
16# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
17# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
18#
19# This program is distributed in the hope that it will be useful, but WITHOUT
20# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
21# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
22# details.
23#
24# You should have received a copy of the GNU Lesser General Public License along
25# with this program. If not, see <http://www.gnu.org/licenses/>.
27import socket
28import struct
29import os
30import re
31import warnings
32from datetime import datetime, timedelta
33from shapely.geometry import Polygon
34import pytz
35from logging import getLogger
36from typing import List # noqa F401 # flake8 issue
38from ..misc.exceptions import GMSEnvironmentError
39from ..misc.logging import close_logger
class SpatialIndexMediatorServer:
    """Controller for the index mediator server script located in a given root directory.

    The server is driven by running its control script through bash with one of the controller commands
    'status', 'start', 'stop' or 'restart'.
    """

    def __init__(self, rootDir, logger=None):
        """Locate the control script of the index mediator server.

        :param rootDir: directory containing 'index-mediator-server.sh' or 'index-mediator-server'
        :param logger:  logger instance to be used (default: logger named 'SpatialIndexMediatorServer')
        :raises GMSEnvironmentError: if neither of the two script names exists within rootDir
        """
        self.rootDir = rootDir
        self.path_idxMedSrv = os.path.join(rootDir, 'index-mediator-server.sh')
        self.logger = logger or getLogger('SpatialIndexMediatorServer')

        # validate: fall back to the script name without extension before giving up
        if not os.path.isfile(self.path_idxMedSrv):
            self.path_idxMedSrv = os.path.join(rootDir, 'index-mediator-server')

            if not os.path.isfile(self.path_idxMedSrv):
                raise GMSEnvironmentError('File path of index mediator server does not exist at %s.'
                                          % self.path_idxMedSrv)

    def __getstate__(self):
        """Defines how the attributes of SpatialIndexMediatorServer are pickled."""

        # the logger is closed and dropped from the instance before its __dict__ is handed to pickle
        if self.logger not in [None, 'not set']:
            close_logger(self.logger)
            self.logger = None
        return self.__dict__

    def __del__(self):
        # __del__ also runs for instances whose __init__ failed before 'logger' was assigned,
        # hence the getattr; an unset logger is skipped exactly like in __getstate__
        logger = getattr(self, 'logger', None)
        if logger not in [None, 'not set']:
            close_logger(logger)
        self.logger = None

    @property
    def is_running(self):
        """Return the 'running' flag of the current server status."""
        return self.status['running']

    @property
    def process_id(self):
        """Return the 'process_id' entry of the current server status (None if no PID was reported)."""
        return self.status['process_id']

    @property
    def status(self):
        """Check server status.

        :return running(bool):      running or not?
        :return process_id(int):
        """
        outputStr = self._communicate('status')

        # interpret the status message
        running = 'is running' in outputStr

        # get PID
        _process_id = re.search(r'with pid ([\d]*)', outputStr)
        if _process_id and _process_id.group(1):
            process_id = int(_process_id.group(1))
        else:
            process_id = None

        return {'running': running, 'process_id': process_id}

    def start(self):  # FIXME can be executed twice without a message that server is already running
        """Start the server.

        :return: 'started' on success, otherwise None (a warning is logged)
        """
        outputStr = self._communicate('start')
        if outputStr == 'success' and self.is_running:
            self.logger.info('Spatial Index Mediator Server started successfully.')
            return 'started'
        else:
            if outputStr != 'success':
                self.logger.warning("\nAttempt to start Spatial Index Mediator Server failed with message '%s'!"
                                    % outputStr.replace('\n', ''))
            else:
                self.logger.warning("\nCommunication to Spatial Index Mediator Server was successful but "
                                    "the server is still not running. Returned message was: %s"
                                    % outputStr.replace('\n', ''))

    def stop(self):
        """Stop the server.

        :return: 'stopped' on success, otherwise None (a warning is issued)
        """
        outputStr = self._communicate('stop')

        if outputStr == 'success' or re.search(r'index-mediator-server stopped', outputStr, re.I):
            return 'stopped'
        else:
            warnings.warn("\nStopping Spatial Index Mediator Server failed with message '%s'!"
                          % outputStr.replace('\n', ''))

    def restart(self):
        """Restart the server.

        :return: 'restarted' on success, otherwise None (a warning is issued)
        """
        outputStr = self._communicate('restart')
        if outputStr == 'success' and self.is_running:
            return 'restarted'
        else:
            warnings.warn("\nRestarting Spatial Index Mediator Server failed with message '%s'!"
                          % outputStr.replace('\n', ''))

    def _communicate(self, controller_cmd):
        """Run the control script with the given controller command from within the root directory.

        :param controller_cmd:  command passed to the control script, e.g. 'status', 'start', 'stop', 'restart'
        :return: the decoded output of the script or 'success' if the script did not return any output
        :raises Exception: if the script returned a non-zero exit code (the message is its error output)
        """
        # resolved before the working directory is touched, so a failing import has no side effect
        from ..misc.helper_functions import subcall_with_output

        # FIXME workaround: otherwise subcall_with_output hangs at proc.communicate (waiting for EOF forever)
        no_stdout = no_stderr = controller_cmd in ['start', 'restart']
        # no_stdout = no_stderr = None, None

        curdir = os.path.abspath(os.curdir)
        os.chdir(self.rootDir)
        try:
            output, exitcode, err = subcall_with_output('bash %s %s' % (self.path_idxMedSrv,
                                                                        controller_cmd), no_stdout, no_stderr)
        finally:
            # always return to the previous working directory, also if the call above raised
            os.chdir(curdir)

        if exitcode:
            raise Exception(err)
        else:
            if output:
                return output.decode('UTF-8')
            else:
                # FIXME actually there should be always an output (also with controller commands 'start' and 'restart'
                return 'success'
class SpatialIndexMediator:
    """Client for querying scenes from the spatial index mediator server."""

    # message value for a full scene query message
    FULL_SCENE_QUERY_MSG = 3

    def __init__(self, host="localhost", port=8654, timeout=5.0, retries=10):
        """
        Stores the parameters used to connect to the spatial index mediator server.

        The connection itself is opened for each query, not here.

        :param host:    host address of the index mediator server (default "localhost")
        :param port:    port number of the index mediator server (default 8654)
        :param timeout: timeout as float in seconds (default 5.0 sec)
        :param retries: number of retries in case of timeout
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.retries = retries

    @staticmethod
    def __deriveSeasonCode(refDate, maxDaysDelta):
        """Return a 12 bit mask (bit 0 = January) of the months touched by refDate +- maxDaysDelta days.

        :param refDate:         reference timestamp as datetime instance or None
        :param maxDaysDelta:    number of days around refDate or None
        :return: 0 if one of the arguments is None, otherwise the month bit mask
        """
        if refDate is None or maxDaysDelta is None:
            return 0

        delta = timedelta(days=abs(maxDaysDelta))
        start = refDate - delta
        end = refDate + delta

        # count the calendar months between start and end (inclusive); a window touching 12 or more months
        # covers the whole year - comparing month numbers only would stop too early in that case
        numMonths = (end.year - start.year) * 12 + (end.month - start.month) + 1

        seasonCode = 0
        for x in range(min(numMonths, 12)):
            seasonCode |= 1 << ((start.month - 1 + x) % 12)

        return seasonCode

    @staticmethod
    def _seasonalDistanceInDays(refDate, timestamp):
        """Return the smallest distance in whole days between refDate and the day-of-year of timestamp.

        The timestamp is moved into the year of refDate as well as into the previous and the following year,
        so that e.g. a scene from December 30th is 6 days apart from a reference date at January 5th.

        :param refDate:     reference timestamp as datetime instance
        :param timestamp:   datetime instance that can be subtracted from refDate (both naive or both aware)
        """
        distances = []
        for year in (refDate.year - 1, refDate.year, refDate.year + 1):
            try:
                if timestamp.month == 2 and timestamp.day == 29:
                    # deal with feb.29th
                    shifted = timestamp.replace(year, 3, 1)
                else:
                    shifted = timestamp.replace(year)
            except ValueError:
                # year outside of the range supported by datetime
                continue

            distances.append(abs(refDate - shifted).days)

        return min(distances)

    def _buildQuery(self, envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid, dayNight,
                    seasonCode):
        """Pack the query parameters into the 60 byte request buffer (big-endian)."""
        # numbytes = 1 + 4*8 + 8 + 8 + 4 + 1 + 1 + 2 + 2 + 1
        query = bytearray(60)

        # pack msg header and envelope
        offset = 0
        struct.pack_into('> b 4d', query, offset, self.FULL_SCENE_QUERY_MSG, *envelope)
        offset += 33

        # pack the dates
        for date in (timeStart, timeEnd):
            struct.pack_into('> h 6b', query, offset, date.year, date.month, date.day, date.hour,
                             date.minute, date.second, 0)
            offset += 8

        # pack the rest
        # TODO: send unconstraint min/max proclevel values
        struct.pack_into('> i 2b h 3b', query, offset, seasonCode, minCloudCover, maxCloudCover, datasetid, 0, 127,
                         dayNight)

        return query

    def _exchange(self, query):
        """Send the request buffer to the server and return the payload of its response as bytearray."""
        con = Connection(self.host, self.port, self.timeout)

        try:
            # send the buffer
            con.socket.sendall(query)

            # read first byte, indicating the response type, must match full scene query msg
            if con.recvByte() != self.FULL_SCENE_QUERY_MSG:
                raise EnvironmentError('Bad Protocol')

            # now read the number of bytes that follow and then the payload itself
            numBytes = con.recvInt()
            payload = bytearray(numBytes)
            con.recvBuffer(payload, numBytes)

            # we received the entire message
            con.disconnect()
        finally:
            # never leave the socket open if anything above failed (closing twice is harmless)
            con.socket.close()

        return payload

    @staticmethod
    def _parseResponse(payload, datasetid, refDate, maxDaysDelta):
        """Convert the response payload into a list of Scene instances, filtered by refDate if given."""
        filterTimerange = not (refDate is None or maxDaysDelta is None)
        offset = 0

        # extract datasetid and number of scenes
        dataset = struct.unpack_from('> h', payload, offset)[0]
        offset += 2
        if dataset != datasetid:
            raise EnvironmentError('Bad Protocol')

        numScenes = struct.unpack_from('> i', payload, offset)[0]
        offset += 4

        scenes = []

        for _x in range(numScenes):
            # [0]  id (4 byte)
            # [1]  year (2 byte)
            # [2]  month (1 byte)
            # [3]  day (1 byte)
            # [4]  hour (1 byte)
            # [5]  minute (1 byte)
            # [6]  second (1 byte)
            # [7]  empty (1 byte)
            # [8]  cloud cover (1 byte)
            # [9]  proc_level (1 byte) caution: this gets not yet updated in the index
            # [10] day/night (1 byte)
            # [11] length of bounds array (1 byte)
            scenedata = struct.unpack_from('> i h 10b', payload, offset)
            offset += 16

            timestamp = datetime(*scenedata[1:7])

            # read bounds
            numBounds = scenedata[11]
            bounds = struct.unpack_from("> {0}d".format(numBounds), payload, offset)
            offset += numBounds * 8

            # check ref date
            if filterTimerange:
                timestampUTC = timestamp.replace(tzinfo=pytz.UTC)
                if SpatialIndexMediator._seasonalDistanceInDays(refDate, timestampUTC) > maxDaysDelta:
                    # skip scene
                    continue

            # create scene
            scenes.append(Scene(scenedata[0], timestamp, scenedata[8], scenedata[9], scenedata[10], bounds))

        return scenes

    def getFullSceneDataForDataset(self, envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid,
                                   dayNight=0, refDate=None, maxDaysDelta=None):
        # type: (list, datetime, datetime, float, float, int, int, datetime, int) -> List[Scene]
        """
        Query the spatial index with the given parameters in order to get a list of matching scenes intersecting the
        given envelope

        :param envelope:        list of left, right and low, up coordinates (in lat/lon wgs84) of the region of
                                interest in the form of (min_lon, max_lon, min_lat, max_lat),
                                e.g. envelope = (10.0, 16.0, 50.0, 60.0)
        :param timeStart:       start timestamp of the relevant timerange as datetime instance,
                                e.g., datetime(2015, 1, 1)
        :param timeEnd:         end timestamp of the relevant timerange as datetime instance, e.g. datetime(2016, 6, 15)
        :param minCloudCover:   minimum cloudcover in percent, e.g. 12, will return scenes with cloudcover >= 12% only
        :param maxCloudCover:   maximum cloudcover in percent, e.g. 23, will return scenes with cloudcover <= 23% only
        :param datasetid:       datasetid of the dataset in question, e.g. 104 for Landsat-8
        :param dayNight         day/night indicator, with (0 = both, 1 = day, 2 = night)
        :param refDate:         reference timestamp as datetime instance, e.g. datetime(2015, 1, 1) [optional];
                                it is compared with UTC-aware timestamps
        :param maxDaysDelta:    maximum allowed number of days the target scenes might be apart from the given refDate
                                (regardless of the year) [optional]
        :return: list of Scene instances (empty if the number of retries is 0)
        :raises TimeoutError:   if every attempt timed out
        :raises struct.error:   if the last attempt failed while packing or unpacking the binary messages
        """
        scenes = []

        for i in range(self.retries):
            try:
                seasonCode = self.__deriveSeasonCode(refDate, maxDaysDelta)
                query = self._buildQuery(envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid,
                                         dayNight, seasonCode)
                payload = self._exchange(query)
                scenes = self._parseResponse(payload, datasetid, refDate, maxDaysDelta)
                break

            except socket.timeout:
                # retry unless this was the last attempt
                if i >= self.retries - 1:
                    raise TimeoutError('Spatial query timed out %d times!' % self.retries)

            except struct.error:
                # retry unless this was the last attempt
                if i >= self.retries - 1:
                    raise

        return scenes
class Connection:
    """ Connection to the spatial index mediator server """

    # message value for a "hello" message
    HELLO_MSG = 1

    # message value for a disconnect message
    DISCONNECT_MSG = 6

    def __init__(self, host, port, timeout):
        """Connect to the index mediator server and exchange the hello message.

        :param host:    host address of the index mediator server
        :param port:    port number of the index mediator server
        :param timeout: timeout in seconds, passed to socket.create_connection
        :raises ConnectionRefusedError: if the server refused the connection
        :raises EnvironmentError:       if the server did not echo the hello message
        """
        # connect to index mediator server
        try:
            self.socket = socket.create_connection((host, port), timeout)
        except ConnectionRefusedError:
            raise ConnectionRefusedError('The spatial index mediator server refused the connection!')

        # send hello and confirm response; the socket must not stay open if the greeting fails
        greeted = False
        try:
            greeted = self.__greet()
        finally:
            if not greeted:
                self.socket.close()

        if not greeted:
            raise EnvironmentError('Bad protocol')

    def __greet(self):
        """Send the hello byte and return True if the server echoed it."""
        # send hello byte
        self.writeByte(self.HELLO_MSG)

        # receive hello byte echo
        response = self.recvByte()

        return response == self.HELLO_MSG

    def _recvExact(self, numBytes):
        """Read from the socket until numBytes bytes arrived or the peer closed the connection.

        A single recv call may return fewer bytes than requested. If the peer closes the connection early,
        the bytes received so far are returned, so that unpacking them raises struct.error.
        """
        chunks = []
        remaining = numBytes
        while remaining:
            chunk = self.socket.recv(remaining)
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def writeByte(self, byte):
        """Send the given value as a single signed byte."""
        # send byte
        self.socket.sendall(struct.pack('b', byte))

    def recvByte(self):
        """Receive a single signed byte.

        :raises struct.error: if the connection was closed before the byte arrived
        """
        return struct.unpack('b', self._recvExact(1))[0]

    def recvInt(self):
        """Receive a big-endian signed 4 byte integer.

        :raises struct.error: if the connection was closed before all 4 bytes arrived
        """
        return struct.unpack('>i', self._recvExact(4))[0]

    def recvBuffer(self, buffer, numBytes):
        """Fill the first numBytes bytes of the given writable buffer with data received from the socket.

        :raises ConnectionError: if the connection was closed before numBytes bytes arrived
        """
        toread = numBytes
        view = memoryview(buffer)
        while toread:
            nbytes = self.socket.recv_into(view, toread)
            if not nbytes:
                # recv_into returns 0 if the peer closed the connection; without this check the loop never ends
                raise ConnectionError('The spatial index mediator server closed the connection with %d bytes '
                                      'still missing.' % toread)
            view = view[nbytes:]
            toread -= nbytes

    def disconnect(self):
        """Closes the connection to the index mediator server.

        No further communication, like placing queries will be possible.
        """
        try:
            self.writeByte(self.DISCONNECT_MSG)
        finally:
            # close the socket even if the disconnect message could not be sent
            self.socket.close()
class Scene:
    """Scene Metadata class"""

    def __init__(self, sceneid, acquisitiondate, cloudcover, proclevel, daynight, bounds):
        """
        Stores the given metadata and derives the coordinate pairs and a valid polygon from the scene bounds.

        :param sceneid:         database sceneid, e.g. 26366229
        :param acquisitiondate: acquisition date of the scene as datetime instance, e.g. 2016-03-25 10:15:26
        :param cloudcover:      cloudcover value of the scene, e.g. 11
        :param proclevel:       processing level value of the scene (stored as given)
        :param daynight:        day/night indicator (0=unknown, 1=day, 2=night)
        :param bounds:          scene bounds as list of lat/lon wgs84 coordinates (lon1, lat1, lon2, lat2, ...),
                                e.g. (10.00604, 49.19385, 7.45638, 49.64513, 8.13739, 51.3515, 10.77705, 50.89307)
        """
        self.sceneid = sceneid
        self.acquisitiondate = acquisitiondate
        self.cloudcover = cloudcover
        self.proclevel = proclevel
        self.daynight = daynight
        self.bounds = bounds

        # group the flat coordinate sequence into [lon, lat] pairs; a trailing single value is paired with None
        flatCoords = list(bounds)
        if len(flatCoords) % 2:
            flatCoords.append(None)
        self.coordsLonLat = [[lon, lat] for lon, lat in zip(flatCoords[0::2], flatCoords[1::2])]

        # set validated (!) polygon
        footprint = Polygon(self.coordsLonLat)
        if not footprint.is_valid:
            # a zero-width buffer is applied to the invalid polygon and the result is checked again
            footprint = footprint.buffer(0)
            assert footprint.is_valid
        self.polyLonLat = footprint