Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# -*- coding: utf-8 -*- 

2 

3# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data 

4# 

5# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de) 

6# 

7# This software was developed within the context of the GeoMultiSens project funded 

8# by the German Federal Ministry of Education and Research 

9# (project grant code: 01 IS 14 010 A-C). 

10# 

11# This program is free software: you can redistribute it and/or modify it under 

12# the terms of the GNU General Public License as published by the Free Software 

13# Foundation, either version 3 of the License, or (at your option) any later version. 

14# Please note the following exception: `gms_preprocessing` depends on tqdm, which 

15# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files 

16# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore". 

17# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE. 

18# 

19# This program is distributed in the hope that it will be useful, but WITHOUT 

20# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

21# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

22# details. 

23# 

24# You should have received a copy of the GNU Lesser General Public License along 

25# with this program. If not, see <http://www.gnu.org/licenses/>. 

26 

27import socket 

28import struct 

29import os 

30import re 

31import warnings 

32from datetime import datetime, timedelta 

33from shapely.geometry import Polygon 

34import pytz 

35from logging import getLogger 

36from typing import List # noqa F401 # flake8 issue 

37 

38from ..misc.exceptions import GMSEnvironmentError 

39from ..misc.logging import close_logger 

40 

41 

class SpatialIndexMediatorServer:
    """Controller for the spatial index mediator server, driven through its shell control script."""

    def __init__(self, rootDir, logger=None):
        """Locate the control script of the index mediator server.

        :param rootDir: directory containing 'index-mediator-server.sh' or 'index-mediator-server'
        :param logger:  logger to use (default: a logger named 'SpatialIndexMediatorServer')
        :raises GMSEnvironmentError: if neither control script exists within rootDir
        """
        self.rootDir = rootDir
        self.path_idxMedSrv = os.path.join(rootDir, 'index-mediator-server.sh')
        self.logger = logger or getLogger('SpatialIndexMediatorServer')

        # validate: fall back to the script name without the '.sh' extension
        if not os.path.isfile(self.path_idxMedSrv):
            self.path_idxMedSrv = os.path.join(rootDir, 'index-mediator-server')

        if not os.path.isfile(self.path_idxMedSrv):
            raise GMSEnvironmentError('File path of index mediator server does not exist at %s.'
                                      % self.path_idxMedSrv)

    def __getstate__(self):
        """Defines how the attributes of SpatialIndexMediatorServer are pickled.

        The logger is closed via close_logger() and removed from the pickled state.
        """
        if self.logger not in [None, 'not set']:
            close_logger(self.logger)
            self.logger = None
        return self.__dict__

    def __setstate__(self, state):
        """Restore the pickled attributes.

        A default logger is re-created because __getstate__ never pickles one. Without it, methods like start()
        would fail with an AttributeError on the unpickled object.
        """
        self.__dict__.update(state)
        if getattr(self, 'logger', None) in [None, 'not set']:
            self.logger = getLogger('SpatialIndexMediatorServer')

    def __del__(self):
        # The logger may be missing: __init__ can fail before it is set, and __getstate__ may already have dropped it.
        logger = getattr(self, 'logger', None)
        if logger not in [None, 'not set']:
            close_logger(logger)
        self.logger = None

    @property
    def is_running(self):
        """True if the control script reports that the server is running (queries the server status)."""
        return self.status['running']

    @property
    def process_id(self):
        """Process ID reported by the control script, or None if no PID was reported (queries the server status)."""
        return self.status['process_id']

    @property
    def status(self):
        """Check server status.

        :return: dict with the keys 'running' (bool) and 'process_id' (int or None)
        """
        outputStr = self._communicate('status')

        # decrypt status
        running = 'is running' in outputStr

        # get PID
        _process_id = re.search(r'with pid ([\d]*)', outputStr)
        if _process_id and _process_id.group(1):
            process_id = int(_process_id.group(1))
        else:
            process_id = None

        return {'running': running, 'process_id': process_id}

    def start(self):  # FIXME can be executed twice without a message that server is already running
        """Start the server.

        :return: 'started' on success; otherwise a warning is logged and None is returned
        """
        outputStr = self._communicate('start')
        if outputStr == 'success' and self.is_running:
            self.logger.info('Spatial Index Mediator Server started successfully.')
            return 'started'
        else:
            if outputStr != 'success':
                self.logger.warning("\nAttempt to start Spatial Index Mediator Server failed with message '%s'!"
                                    % outputStr.replace('\n', ''))
            else:
                self.logger.warning("\nCommunication to Spatial Index Mediator Server was successful but "
                                    "the server is still not running. Returned message was: %s"
                                    % outputStr.replace('\n', ''))

    def stop(self):
        """Stop the server.

        :return: 'stopped' on success; otherwise a warning is issued via warnings.warn and None is returned
        """
        outputStr = self._communicate('stop')

        if outputStr == 'success' or re.search(r'index-mediator-server stopped', outputStr, re.I):
            return 'stopped'
        else:
            warnings.warn("\nStopping Spatial Index Mediator Server failed with message '%s'!"
                          % outputStr.replace('\n', ''))

    def restart(self):
        """Restart the server.

        :return: 'restarted' on success; otherwise a warning is issued via warnings.warn and None is returned
        """
        outputStr = self._communicate('restart')
        if outputStr == 'success' and self.is_running:
            return 'restarted'
        else:
            warnings.warn("\nRestarting Spatial Index Mediator Server failed with message '%s'!"
                          % outputStr.replace('\n', ''))

    def _communicate(self, controller_cmd):
        """Run the control script with the given command from within rootDir.

        :param controller_cmd: command passed to the control script, e.g. 'start', 'stop', 'restart' or 'status'
        :return: the decoded script output, or 'success' if the script produced no output
        :raises Exception: if the script exits with a non-zero exit code
        """
        from ..misc.helper_functions import subcall_with_output

        # FIXME workaround: otherwise subcall_with_output hangs at proc.communicate (waiting for EOF forever)
        no_stdout = no_stderr = controller_cmd in ['start', 'restart']

        curdir = os.path.abspath(os.curdir)
        os.chdir(self.rootDir)
        try:
            output, exitcode, err = subcall_with_output('bash %s %s' % (self.path_idxMedSrv, controller_cmd),
                                                        no_stdout, no_stderr)
        finally:
            # always restore the previous working directory, even if the subprocess call raises
            os.chdir(curdir)

        if exitcode:
            raise Exception(err)
        if output:
            return output.decode('UTF-8')
        # FIXME actually there should be always an output (also with controller commands 'start' and 'restart')
        return 'success'

147 

148 

class SpatialIndexMediator:
    """Client for querying scene metadata from the spatial index mediator server."""

    # message value for a full scene query message
    FULL_SCENE_QUERY_MSG = 3

    def __init__(self, host="localhost", port=8654, timeout=5.0, retries=10):
        """
        Store the connection parameters of the spatial index mediator server.

        A new connection is opened for every query.

        :param host:    host address of the index mediator server (default "localhost")
        :param port:    port number of the index mediator server (default 8654)
        :param timeout: timeout as float in seconds (default 5.0 sec)
        :param retries: total number of query attempts in case of timeouts or incomplete responses
                        (at least one attempt is always made)
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.retries = retries

    @staticmethod
    def __deriveSeasonCode(refDate, maxDaysDelta):
        """Return a 12-bit month mask covering refDate +/- maxDaysDelta days (bit 0 = January).

        Returns 0 if refDate or maxDaysDelta is None. If the window touches 12 or more calendar months,
        all 12 bits are set.
        """
        if refDate is None or maxDaysDelta is None:
            return 0

        delta = timedelta(days=abs(maxDaysDelta))
        start = refDate - delta
        end = refDate + delta

        # number of calendar months touched by the window [start, end]
        numMonths = (end.year - start.year) * 12 + (end.month - start.month) + 1
        if numMonths >= 12:
            # All months are covered. Handling this explicitly also avoids a single-month mask when start and end
            # fall into the same month of different years.
            return 0xFFF

        seasonCode = 0
        for x in range(numMonths):
            seasonCode |= 1 << ((start.month - 1 + x) % 12)
        return seasonCode

    @staticmethod
    def _daysApartIgnoringYear(timestamp, refDate, tz):
        """Return the whole number of days between refDate and the calendar day of timestamp, ignoring the year.

        The naive timestamp is interpreted in tz. It is moved into the year before, of and after refDate, and the
        smallest distance is returned, so e.g. Dec 20th and Jan 5th are 16 days apart. Feb 29th is mapped to
        Mar 1st in non-leap years.

        :param timestamp: naive datetime of the scene
        :param refDate:   timezone-aware reference datetime
        :param tz:        tzinfo assigned to timestamp for the comparison
        """
        distances = []
        for year in (refDate.year - 1, refDate.year, refDate.year + 1):
            try:
                shifted = timestamp.replace(year=year, tzinfo=tz)
            except ValueError:
                # deal with Feb 29th in non-leap years
                shifted = timestamp.replace(year=year, month=3, day=1, tzinfo=tz)
            distances.append(abs(refDate - shifted).days)
        return min(distances)

    def _packFullSceneQuery(self, envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid,
                            dayNight, refDate, maxDaysDelta):
        """Pack a full scene query request into a 60-byte big-endian buffer and return it."""
        # numbytes = 1 + 4*8 + 8 + 8 + 4 + 1 + 1 + 2 + 1 + 1 + 1 = 60
        b = bytearray(60)

        # pack msg header and envelope
        offset = 0
        struct.pack_into('> b 4d', b, offset, self.FULL_SCENE_QUERY_MSG, *envelope)
        offset += 33

        # pack the dates (year, month, day, hour, minute, second, one zero byte)
        for ts in (timeStart, timeEnd):
            struct.pack_into('> h 6b', b, offset, ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second, 0)
            offset += 8

        # pack the rest: season code, min/max cloud cover, dataset ID, 0 and 127, day/night indicator
        # TODO: send unconstraint min/max proclevel values
        struct.pack_into('> i 2b h 3b', b, offset, self.__deriveSeasonCode(refDate, maxDaysDelta),
                         minCloudCover, maxCloudCover, datasetid, 0, 127, dayNight)
        return b

    def _queryServer(self, request):
        """Send a request over a new connection and return the response payload as bytearray.

        The socket is closed on every path, including errors.

        :raises EnvironmentError: if the response type does not match the full scene query message
        """
        con = Connection(self.host, self.port, self.timeout)
        try:
            con.socket.sendall(request)

            # read first byte, indicating the response type, must match full scene query msg
            if con.recvByte() != self.FULL_SCENE_QUERY_MSG:
                raise EnvironmentError('Bad Protocol')

            # now read the number of bytes that follow and then the payload itself
            numBytes = con.recvInt()
            payload = bytearray(numBytes)
            con.recvBuffer(payload, numBytes)

            # we received the entire message
            con.disconnect()
        finally:
            # no-op after a successful disconnect(); prevents leaking the socket on errors
            con.socket.close()
        return payload

    def _parseFullSceneResponse(self, b, datasetid, refDate=None, maxDaysDelta=None):
        """Decode a full scene query response payload into a list of Scene instances.

        If refDate and maxDaysDelta are given, scenes are dropped when their acquisition day lies more than
        maxDaysDelta days away from refDate, ignoring the year. refDate must be timezone-aware in that case.

        :raises EnvironmentError: if the payload refers to another dataset than datasetid
        :raises struct.error:     if the payload is shorter than announced by its content
        """
        offset = 0

        # extract datasetid and number of scenes
        dataset = struct.unpack_from('> h', b, offset)[0]
        offset += 2
        if dataset != datasetid:
            raise EnvironmentError('Bad Protocol')

        numScenes = struct.unpack_from('> i', b, offset)[0]
        offset += 4

        filterTimerange = not (refDate is None or maxDaysDelta is None)
        scenes = []

        for _x in range(numScenes):
            # [0] id (4 byte)
            # [1] year (2 byte)
            # [2] month (1 byte)
            # [3] day (1 byte)
            # [4] hour (1 byte)
            # [5] minute (1 byte)
            # [6] second (1 byte)
            # [7] empty (1 byte)
            # [8] cloud cover (1 byte)
            # [9] proc_level (1 byte) caution: this gets not yet updated in the index
            # [10] day/night (1 byte)
            # [11] length of bounds array (1 byte)
            scenedata = struct.unpack_from('> i h 10b', b, offset)
            offset += 16

            timestamp = datetime(*scenedata[1:7])

            # read bounds
            numBounds = scenedata[11]
            bounds = struct.unpack_from('> {0}d'.format(numBounds), b, offset)
            offset += numBounds * 8

            # check ref date (scene timestamps are interpreted as UTC)
            if filterTimerange and self._daysApartIgnoringYear(timestamp, refDate, pytz.UTC) > maxDaysDelta:
                continue

            scenes.append(Scene(scenedata[0], timestamp, scenedata[8], scenedata[9], scenedata[10], bounds))

        return scenes

    def getFullSceneDataForDataset(self, envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid,
                                   dayNight=0, refDate=None, maxDaysDelta=None):
        # type: (list, datetime, datetime, float, float, int, int, datetime, int) -> List[Scene]
        """
        Query the spatial index with the given parameters in order to get a list of matching scenes intersecting the
        given envelope

        :param envelope:      list of left, right and low, up coordinates (in lat/lon wgs84) of the region of
                              interest in the form of (min_lon, max_lon, min_lat, max_lat),
                              e.g. envelope = (10.0, 16.0, 50.0, 60.0)
        :param timeStart:     start timestamp of the relevant timerange as datetime instance,
                              e.g., datetime(2015, 1, 1)
        :param timeEnd:       end timestamp of the relevant timerange as datetime instance, e.g. datetime(2016, 6, 15)
        :param minCloudCover: minimum cloudcover in percent, e.g. 12, will return scenes with cloudcover >= 12% only
        :param maxCloudCover: maximum cloudcover in percent, e.g. 23, will return scenes with cloudcover <= 23% only
        :param datasetid:     datasetid of the dataset in question, e.g. 104 for Landsat-8
        :param dayNight:      day/night indicator, with (0 = both, 1 = day, 2 = night)
        :param refDate:       reference timestamp as datetime instance, e.g. datetime(2015, 1, 1) [optional];
                              a naive datetime is interpreted as UTC
        :param maxDaysDelta:  maximum allowed number of days the target scenes might be apart from the given refDate,
                              ignoring the year [optional]
        :return:              list of Scene instances
        :raises TimeoutError: if every attempt timed out
        :raises struct.error: if the request cannot be packed or the last attempt received an incomplete response
        """
        filterTimerange = not (refDate is None or maxDaysDelta is None)
        if filterTimerange and refDate.tzinfo is None:
            # Scene timestamps are compared as UTC-aware datetimes. Subtracting a naive refDate would raise TypeError.
            refDate = refDate.replace(tzinfo=pytz.UTC)

        # packing is deterministic, so packing errors are raised at once instead of being retried
        request = self._packFullSceneQuery(envelope, timeStart, timeEnd, minCloudCover, maxCloudCover, datasetid,
                                           dayNight, refDate, maxDaysDelta)

        numAttempts = max(1, self.retries)
        scenes = []

        for attempt in range(numAttempts):
            try:
                payload = self._queryServer(request)
                scenes = self._parseFullSceneResponse(payload, datasetid,
                                                      refDate if filterTimerange else None, maxDaysDelta)
                break

            except socket.timeout:
                if attempt < numAttempts - 1:
                    continue
                raise TimeoutError('Spatial query timed out %d times!' % numAttempts)

            except struct.error:
                # e.g. incomplete responses - retry
                if attempt < numAttempts - 1:
                    continue
                raise

        return scenes

331 

332 

class Connection:
    """ Connection to the spatial index mediator server """

    # message value for a "hello" message
    HELLO_MSG = 1

    # message value for a disconnect message
    DISCONNECT_MSG = 6

    def __init__(self, host, port, timeout):
        """Open a TCP connection to the index mediator server and perform the hello handshake.

        :param host:    host address of the index mediator server
        :param port:    port number of the index mediator server
        :param timeout: socket timeout in seconds, passed to socket.create_connection()
        :raises ConnectionRefusedError: if the server refuses the connection
        :raises EnvironmentError:       if the server does not echo the hello message
        """
        # connect to index mediator server
        try:
            self.socket = socket.create_connection((host, port), timeout)
        except ConnectionRefusedError:
            raise ConnectionRefusedError('The spatial index mediator server refused the connection!')

        # send hello and confirm response; close the socket if the handshake fails or raises
        greeted = False
        try:
            greeted = self.__greet()
        finally:
            if not greeted:
                self.socket.close()
        if not greeted:
            raise EnvironmentError('Bad protocol')

    def __greet(self):
        """Send the hello byte and return True if the server echoes it back."""
        self.writeByte(self.HELLO_MSG)
        return self.recvByte() == self.HELLO_MSG

    def writeByte(self, byte):
        """Send a single signed byte."""
        self.socket.sendall(struct.pack('b', byte))

    def recvByte(self):
        """Receive a single signed byte.

        :raises struct.error: if the connection was closed before a byte arrived
        """
        return struct.unpack('b', self.socket.recv(1))[0]

    def recvInt(self):
        """Receive a big-endian signed 4-byte integer.

        recv() may return fewer bytes than requested, so the 4 bytes are read in a loop.

        :raises struct.error: if the connection was closed before 4 bytes arrived
        """
        data = b''
        while len(data) < 4:
            chunk = self.socket.recv(4 - len(data))
            if not chunk:
                # connection closed: struct.unpack below raises struct.error on the short read
                break
            data += chunk
        return struct.unpack('>i', data)[0]

    def recvBuffer(self, buffer, numBytes):
        """Receive exactly numBytes bytes into the start of buffer.

        :raises ConnectionError: if the connection is closed before numBytes bytes were received
        """
        toread = numBytes
        view = memoryview(buffer)
        while toread:
            nbytes = self.socket.recv_into(view, toread)
            if nbytes == 0:
                # recv_into() returns 0 once the peer has closed the connection - looping on would never end
                raise ConnectionError('Connection closed by the spatial index mediator server after %d of %d bytes.'
                                      % (numBytes - toread, numBytes))
            view = view[nbytes:]
            toread -= nbytes

    def disconnect(self):
        """Closes the connection to the index mediator server.

        No further communication, like placing queries will be possible. The socket is closed even if sending
        the disconnect message fails.
        """
        try:
            self.writeByte(self.DISCONNECT_MSG)
        finally:
            self.socket.close()

387 

388 

class Scene:
    """Scene Metadata class"""

    def __init__(self, sceneid, acquisitiondate, cloudcover, proclevel, daynight, bounds):
        """
        :param sceneid:         database sceneid, e.g. 26366229
        :param acquisitiondate: acquisition date of the scene as datetime instance, e.g. 2016-03-25 10:15:26
        :param cloudcover:      cloudcover value of the scene, e.g. 11
        :param proclevel:       processing level value of the scene
        :param daynight:        day/night indicator (0=unknown, 1=day, 2=night)
        :param bounds:          scene bounds as list of lat/lon wgs84 coordinates (lon1, lat1, lon2, lat2, ...),
                                e.g. (10.00604, 49.19385, 7.45638, 49.64513, 8.13739, 51.3515, 10.77705, 50.89307)
        :raises AssertionError: if the footprint polygon is invalid even after repairing it with buffer(0)
        """
        self.sceneid = sceneid
        self.acquisitiondate = acquisitiondate
        self.cloudcover = cloudcover
        self.proclevel = proclevel
        self.daynight = daynight
        self.bounds = bounds

        # group the flat (lon, lat, lon, lat, ...) sequence into [lon, lat] pairs; the None padding only matters
        # for an odd number of values, where the last pair becomes [lon, None]
        tempList = list(bounds) + [None] * 2
        self.coordsLonLat = [tempList[n:n + 2] for n in range(0, len(bounds), 2)]

        # set validated (!) polygon
        poly = Polygon(self.coordsLonLat)
        if not poly.is_valid:
            # zero-distance buffer: the usual shapely idiom to tidy up invalid (e.g. self-intersecting) polygons
            poly = poly.buffer(0)
        if not poly.is_valid:
            # explicit raise instead of 'assert', which is stripped when running with 'python -O'
            raise AssertionError('Invalid footprint polygon for scene %s.' % sceneid)
        self.polyLonLat = poly

415 self.polyLonLat = poly