
1# -*- coding: utf-8 -*- 

2 

3# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data 

4# 

5# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de) 

6# 

7# This software was developed within the context of the GeoMultiSens project funded 

8# by the German Federal Ministry of Education and Research 

9# (project grant code: 01 IS 14 010 A-C). 

10# 

11# This program is free software: you can redistribute it and/or modify it under 

12# the terms of the GNU General Public License as published by the Free Software 

13# Foundation, either version 3 of the License, or (at your option) any later version. 

14# Please note the following exception: `gms_preprocessing` depends on tqdm, which 

15# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files 

16# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore". 

17# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE. 

18# 

19# This program is distributed in the hope that it will be useful, but WITHOUT 

20# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

21# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more 

22# details. 

23# 

24# You should have received a copy of the GNU Lesser General Public License along 

25# with this program. If not, see <http://www.gnu.org/licenses/>. 

26 

27import collections 

28import glob 

29import itertools 

30import os 

31import re 

32import shutil 

33import sys 

34import traceback 

35import warnings 

36from datetime import datetime 

37from typing import Union, TYPE_CHECKING # noqa F401 # flake8 issue 

38from pkg_resources import parse_version 

39 

40import numpy as np 

41import pandas as pd 

42from pandas.io.sql import pandasSQL_builder, SQLTable, DataFrame, Series 

43import psycopg2 

44from shapely.wkb import loads as wkb_loads 

45from geoalchemy2.types import Geometry as GEOMETRY 

46from geopandas import GeoDataFrame 

47from shapely.geometry import Polygon, box, MultiPolygon 

48from sqlalchemy import create_engine 

49from sqlalchemy.types import to_instance, TypeEngine 

50 

51from ..options.config import GMS_config as CFG 

52from . import path_generator as PG 

53from .definition_dicts import proc_chain 

54 

55if TYPE_CHECKING: 

56 from ..model.gms_object import GMS_object # noqa F401 # flake8 issue 

57 

58# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies) 

59 

60__author__ = 'Daniel Scheffler' 

61 

62 

63def execute_pgSQL_query(cursor, query_command): 

64 """Executes a postgreSQL query catches the full error message if there is one. 

65 """ 

66 

67 try: 

68 cursor.execute(query_command) 

69 except psycopg2.ProgrammingError as e: 

70 raise psycopg2.ProgrammingError((e.pgerror or '') + '\nQuery failed. Command was:\n%s' % query_command) 

71 

72 

73def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid): 

74 # type: (int) -> collections.OrderedDict 

75 """Creates an OrderedDict containing further information about a given scene ID by querying the pgSQL database. 

76 

77 :param sceneid: <int> the GMS scene ID to get information for 

78 """ 

79 

80 def query(tablename, vals2return, cond_dict, records2fetch=0): 

81 return get_info_from_postgreSQLdb(CFG.conn_database, tablename, vals2return, cond_dict, records2fetch) 

82 resultset = query('scenes', ['datasetid', 'satelliteid', 'sensorid', 'subsystemid', 'acquisitiondate', 'entityid', 

83 'filename'], {'id': sceneid}) 

84 if len(resultset) == 0: 

85 sys.stderr.write("Scene with id %s not found. Skipping..\n" % sceneid) 

86 return None  # no matching scene record found - skip it 

87 scenedata = resultset[0] 

88 ds = collections.OrderedDict() 

89 proc_level_tmp = query('scenes_proc', 'proc_level', {'sceneid': sceneid}) 

90 ds.update({'proc_level': 'L0A' if proc_level_tmp == [] else proc_level_tmp[0][0]}) 

91 ds.update({'scene_ID': sceneid}) 

92 ds.update({'datasetid': scenedata[0]}) 

93 ds.update({'image_type': query('datasets', 'image_type', {'id': scenedata[0]})[0][0]}) 

94 ds.update({'satellite': query('satellites', 'name', {'id': scenedata[1]})[0][0]}) 

95 ds.update({'sensor': query('sensors', 'name', {'id': scenedata[2]})[0][0]}) 

96 ds.update({'subsystem': query('subsystems', 'name', {'id': scenedata[3]})[0][0] if scenedata[3] else None}) 

97 ds.update({'acq_datetime': scenedata[4]}) 

98 ds.update({'entity_ID': scenedata[5]}) 

99 ds.update({'filename': scenedata[6]}) 

100 return ds 

101 

102 
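# Usage sketch (illustrative only; the scene ID and the returned values are assumptions, not
# data from the GeoMultiSens database):
#
#   ds = get_scene_and_dataset_infos_from_postgreSQLdb(26781907)
#   ds['satellite'], ds['sensor'], ds['proc_level']   # -> e.g. ('Landsat-8', 'OLI_TIRS', 'L1A')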

103def get_postgreSQL_value(value): 

104 # type: (any) -> str 

105 """Converts Python variable to a postgreSQL value respecting postgreSQL type casts. 

106 The resulting value can be directly inserted into a postgreSQL query.""" 

107 

108 assert type(value) in [int, float, bool, str, Polygon, datetime, list, tuple] or value is None, \ 

109 "Unsupported value type within postgreSQL matching expression. Got %s." % type(value) 

110 if isinstance(value, int): 

111 pgV = value 

112 elif isinstance(value, float): 

113 pgV = value 

114 elif isinstance(value, bool): 

115 pgV = value 

116 elif value is None: 

117 pgV = 'NULL' 

118 elif isinstance(value, str): 

119 pgV = "'%s'" % value.replace("'", "") 

120 elif isinstance(value, Polygon): 

121 pgV = "'%s'" % value.wkb_hex 

122 elif isinstance(value, datetime): 

123 pgV = "TIMESTAMP '%s'" % str(value) 

124 else: # list or tuple in value 

125 if not value: # empty list/tuple 

126 pgV = 'NULL' 

127 else: 

128 dTypes_in_value = list(set([type(i) for i in value])) 

129 assert len(dTypes_in_value) == 1, \ 

130 'Mixed data types in postgreSQL matching expressions are not supported. Got %s.' % dTypes_in_value 

131 assert dTypes_in_value[0] in [int, str, float, np.int64, bool] 

132 pgList = ",".join(["'%s'" % i if isinstance(value[0], str) else "%s" % i for i in value]) 

133 pgV = "'{%s}'" % pgList 

134 return pgV 

135 

136 

137def get_postgreSQL_matchingExp(key, value): 

138 # type: (str,any) -> str 

139 """Converts a key/value pair to a postgreSQL matching expression in the form "column=value" respecting postgreSQL 

140 type casts. The resulting string can be directly inserted into a postgreSQL query. 

141 """ 

142 pgVal = get_postgreSQL_value(value) 

143 if isinstance(pgVal, str) and pgVal.startswith("'{") and pgVal.endswith("}'"): 

144 return '%s in %s' % (key, pgVal.replace("'{", '(').replace("}'", ')')) # '{1,2,3}' => (1,2,3) 

145 elif pgVal == 'NULL': 

146 return '%s is NULL' % key 

147 else: 

148 return '%s=%s' % (key, pgVal) 

149 

150 
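# Illustrative sketch of what the two helpers above return (derived from the code; input
# values chosen freely):
#
#   get_postgreSQL_value('LE71950282003121EDC00')      # -> "'LE71950282003121EDC00'"
#   get_postgreSQL_value([1, 2, 3])                    # -> "'{1,2,3}'"
#   get_postgreSQL_matchingExp('sceneid', [1, 2, 3])   # -> "sceneid in (1,2,3)"
#   get_postgreSQL_matchingExp('proc_level', None)     # -> "proc_level is NULL"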

151def get_info_from_postgreSQLdb(conn_params, tablename, vals2return, cond_dict=None, records2fetch=0, timeout=15000): 

152 # type: (str, str, Union[list, str], dict, int, int) -> Union[list, str] 

153 """Queries a postgreSQL database for the given parameters. 

154 

155 :param conn_params: <str> connection parameters as provided by CFG.conn_params 

156 :param tablename: <str> name of the table within the database to be queried 

157 :param vals2return: <list or str> a list of strings containing the column titles of the values to be returned 

158 :param cond_dict: <dict> a dictionary containing the query conditions in the form {'column_name':<value>} 

159 HINT: <value> can also be a list or a tuple of elements to match, BUT note that the order 

160 of the list items is NOT respected! 

161 :param records2fetch: <int> number of records to be fetched (default=0: fetch unlimited records) 

162 :param timeout: <int> allows to set a custom statement timeout (milliseconds) 

163 """ 

164 

165 if not isinstance(vals2return, list): 

166 vals2return = [vals2return] 

167 assert isinstance(records2fetch, int), "get_info_from_postgreSQLdb: Expected an integer for the argument " \ 

168 "'records2return'. Got %s" % type(records2fetch) 

169 cond_dict = cond_dict if cond_dict else {} 

170 conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout) 

171 connection = psycopg2.connect(conn_params) 

172 if connection is None: 

173 warnings.warn('database connection fault') 

174 return 'database connection fault' 

175 cursor = connection.cursor() 

176 condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \ 

177 if cond_dict else "" 

178 cmd = "SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition 

179 execute_pgSQL_query(cursor, cmd) 

180 

181 records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \ 

182 cursor.fetchmany(size=records2fetch) # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)] 

183 cursor.close() 

184 connection.close() 

185 return records2return 

186 

187 
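# Usage sketch (hedged example; assumes CFG.conn_database holds a valid connection string and
# the 'scenes' table exists - the column values shown are invented):
#
#   res = get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid'],
#                                    cond_dict={'satelliteid': 8}, records2fetch=2)
#   # -> e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]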

188def update_records_in_postgreSQLdb(conn_params, tablename, vals2update_dict, cond_dict=None, timeout=15000): 

189 # type: (str, str, dict, dict, int) -> Union[None, str] 

190 """Queries a postgreSQL database for the given parameters and updates the given columns of the query result. 

191 

192 :param conn_params: <str> connection parameters as provided by CFG.conn_params 

193 :param tablename: <str> name of the table within the database to be updated 

194 :param vals2update_dict: <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>} 

195 :param cond_dict: <dict> a dictionary containing the query conditions in the form {'column_name':<value>} 

196 HINT: <value> can also be a list or a tuple of elements to match 

197 :param timeout: <int> allows to set a custom statement timeout (milliseconds) 

198 """ 

199 

200 cond_dict = cond_dict if cond_dict else {} 

201 conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout) 

202 connection = psycopg2.connect(conn_params) 

203 if connection is None: 

204 warnings.warn('database connection fault') 

205 return 'database connection fault' 

206 cursor = connection.cursor() 

207 condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \ 

208 if cond_dict else "" 

209 update_cond = "SET " + ', '.join(['%s=%s' % (k, get_postgreSQL_value(vals2update_dict[k])) 

210 for k in vals2update_dict.keys()]) 

211 execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM scenes %s);" % condition) 

212 if cursor.fetchone()[0] == 0: 

213 warnings.warn("No record found fulfilling this condition: \n'%s'." % condition) 

214 else: 

215 execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + update_cond + " " + condition) 

216 

217 if 'connection' in locals(): 

218 connection.commit() 

219 connection.close() 

220 

221 
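# Usage sketch (illustrative; the scene ID and the column value are assumptions):
#
#   update_records_in_postgreSQLdb(CFG.conn_database, 'scenes',
#                                  vals2update_dict={'proc_level': 'L1A'},
#                                  cond_dict={'id': 26781907})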

222def append_item_to_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2append_dict, cond_dict=None, timeout=15000): 

223 # type: (str, str, dict, dict, int) -> Union[None, str] 

224 """Queries a postgreSQL database for the given parameters 

225 and appends the given value to the specified column of the query result. 

226 

227 :param conn_params: <str> connection parameters as provided by CFG.conn_params 

228 :param tablename: <str> name of the table within the database to be updated 

229 :param vals2append_dict: <dict> a dictionary containing keys and value(s) to be set in the form 

230 {'col_name':[<value>,<value>]} 

231 :param cond_dict: <dict> a dictionary containing the query conditions in the form {'column_name':<value>} 

232 HINT: <value> can also be a list or a tuple of elements to match 

233 :param timeout: <int> allows to set a custom statement timeout (milliseconds) 

234 """ 

235 

236 assert len(vals2append_dict) == 1, 'Values can be appended to only one column at once.' 

237 if type(list(vals2append_dict.values())[0]) in [list, tuple]: 

238 raise NotImplementedError('Appending multiple values to one column at once is not yet supported.') 

239 cond_dict = cond_dict if cond_dict else {} 

240 conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout) 

241 connection = psycopg2.connect(conn_params) 

242 if connection is None: 

243 warnings.warn('database connection fault') 

244 return 'database connection fault' 

245 cursor = connection.cursor() 

246 condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \ 

247 if cond_dict else "" 

248 col2update = list(vals2append_dict.keys())[0] 

249 pgSQL_val = get_postgreSQL_value(vals2append_dict[col2update]) 

250 pgSQL_val = pgSQL_val if type(vals2append_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val 

251 append_cond = "SET %s = array_cat(%s, '%s')" % (col2update, col2update, pgSQL_val) 

252 execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM scenes %s);" % condition) 

253 if cursor.fetchone()[0] == 0: 

254 warnings.warn("No record found fulfilling this condition: \n'%s'." % condition) 

255 else: 

256 execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + append_cond + " " + condition + ';') 

257 if 'connection' in locals(): 

258 connection.commit() 

259 connection.close() 

260 

261 

262def remove_item_from_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2remove_dict, cond_dict=None, timeout=15000): 

263 # type: (str, str, dict, dict, int) -> Union[None, str] 

264 """Queries a postgreSQL database for the given parameters 

265 and removes the given value from the specified column of the query result. 

266 

267 :param conn_params: <str> connection parameters as provided by CFG.conn_params 

268 :param tablename: <str> name of the table within the database to be updated 

269 :param vals2remove_dict: <dict> a dictionary containing keys and value(s) to be set in the form 

270 {'col_name':[<value>,<value>]} 

271 :param cond_dict: <dict> a dictionary containing the query conditions in the form {'column_name':<value>} 

272 HINT: <value> can also be a list or a tuple of elements to match 

273 :param timeout: <int> allows to set a custom statement timeout (milliseconds) 

274 """ 

275 

276 assert len(vals2remove_dict) == 1, 'Values can be removed from only one column at once.' 

277 if type(list(vals2remove_dict.values())[0]) in [list, tuple]: 

278 raise NotImplementedError('Removing multiple values from one column at once is not yet supported.') 

279 cond_dict = cond_dict if cond_dict else {} 

280 conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout) 

281 connection = psycopg2.connect(conn_params) 

282 if connection is None: 

283 warnings.warn('database connection fault') 

284 return 'database connection fault' 

285 cursor = connection.cursor() 

286 condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \ 

287 if cond_dict else "" 

288 col2update = list(vals2remove_dict.keys())[0] 

289 pgSQL_val = get_postgreSQL_value(vals2remove_dict[col2update]) 

290 pgSQL_val = pgSQL_val if type(vals2remove_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val 

291 remove_cond = "SET %s = array_remove(%s, '%s')" % (col2update, col2update, pgSQL_val) 

292 execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM scenes %s);" % condition) 

293 if cursor.fetchone()[0] == 0: 

294 warnings.warn("No record found fulfilling this condition: \n'%s'." % condition) 

295 else: 

296 execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + remove_cond + " " + condition + ';') 

297 if 'connection' in locals(): 

298 connection.commit() 

299 connection.close() 

300 

301 
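# Both array-column helpers above map to PostgreSQL array functions. Appending e.g. the scene
# ID 26781907 to the 'sceneids' column of a job record issues roughly (sketch):
#
#   UPDATE jobs SET sceneids = array_cat(sceneids, '{26781907}') WHERE id=<job_id>
#
# remove_item_from_arrayCol_in_postgreSQLdb() uses array_remove() accordingly.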

302def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2update, idx_val2decrement=None, 

303 idx_val2increment=None, cond_dict=None, timeout=15000): 

304 # type: (str, str, str, int, int, dict, int) -> Union[None, str] 

305 """Updates an array column of a specific postgreSQL table in the form that it increments or decrements the elements 

306 at a given position. HINT: The column must have values like that: [52,0,27,10,8,0,0,0,0] 

307 

308 :param conn_params: <str> connection parameters as provided by CFG.conn_params 

309 :param tablename: <str> name of the table within the database to be updated 

310 :param col2update: <str> column name of the column to be updated 

311 :param idx_val2decrement: <int> the index of the array element to be decremented (starts with 1) 

312 :param idx_val2increment: <int> the index of the array element to be incremented (starts with 1) 

313 :param cond_dict: <dict> a dictionary containing the query conditions in the form {'column_name':<value>} 

314 HINT: <value> can also be a list or a tuple of elements to match 

315 :param timeout: <int> allows to set a custom statement timeout (milliseconds) 

316 :return: 

317 """ 

318 

319 cond_dict = cond_dict if cond_dict else {} 

320 conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout) 

321 connection = psycopg2.connect(conn_params) 

322 if connection is None: 

323 warnings.warn('database connection fault') 

324 return 'database connection fault' 

325 cursor = connection.cursor() 

326 condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \ 

327 if cond_dict else "" 

328 

329 dec_str = '' if idx_val2decrement is None else \ 

330 "%s[%s] = %s[%s]-1" % (col2update, idx_val2decrement, col2update, idx_val2decrement) 

331 inc_str = '' if idx_val2increment is None else \ 

332 "%s[%s] = %s[%s]+1" % (col2update, idx_val2increment, col2update, idx_val2increment) 

333 

334 if dec_str or inc_str: 

335 dec_inc_str = ','.join([s for s in [dec_str, inc_str] if s])  # avoid a dangling comma if only one is set 

336 execute_pgSQL_query(cursor, "UPDATE %s SET %s %s" % (tablename, dec_inc_str, condition)) 

337 

338 if 'connection' in locals(): 

339 connection.commit() 

340 connection.close() 

341 

342 
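# For idx_val2decrement=1 and idx_val2increment=2 on a column 'statistics' of the 'jobs' table,
# the function above issues roughly (sketch; the WHERE clause depends on cond_dict):
#
#   UPDATE jobs SET statistics[1] = statistics[1]-1,statistics[2] = statistics[2]+1 WHERE id=...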

343def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeout=15000): 

344 # type: (str, str, dict, int) -> Union[int, str] 

345 """Creates a single new record in a postgreSQL database and pupulates its columns with the given values. 

346 

347 :param conn_params: <str> connection parameters as provided by CFG.conn_params 

348 :param tablename: <str> name of the table within the database to be updated 

349 :param vals2write_dict: <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>} 

350 :param timeout: <int> allows to set a custom statement timeout (milliseconds) 

351 """ 

352 

353 conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout) 

354 connection = psycopg2.connect(conn_params) 

355 if connection is None: 

356 warnings.warn('database connection fault') 

357 return 'database connection fault' 

358 cursor = connection.cursor() 

359 

360 keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in vals2write_dict.items()]) 

361 

362 execute_pgSQL_query(cursor, "INSERT INTO %s (%s) VALUES (%s);" % (tablename, ','.join(keys), ','.join(vals))) 

363 execute_pgSQL_query(cursor, "SELECT id FROM %s ORDER BY id DESC LIMIT 1" % tablename) 

364 newID = cursor.fetchone()[0] 

365 

366 if 'connection' in locals(): 

367 connection.commit() 

368 connection.close() 

369 

370 return newID 

371 

372 
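# Usage sketch (illustrative column values). Note that the new ID is read back via
# "ORDER BY id DESC LIMIT 1"; with concurrent writers an "INSERT ... RETURNING id" statement
# would be the more robust alternative.
#
#   new_job_id = create_record_in_postgreSQLdb(CFG.conn_database, 'jobs',
#                                              {'creationtime': datetime.now(), 'status': 'pending'})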

373def delete_record_in_postgreSQLdb(conn_params, tablename, record_id, timeout=15000): 

374 # type: (str, str, int, int) -> str 

375 """Delete a single record in a postgreSQL database. 

376 

377 :param conn_params: <str> connection parameters as provided by CFG.conn_params 

378 :param tablename: <str> name of the table within the database to be updated 

379 :param record_id: <int> ID of the record to be deleted 

380 :param timeout: <int> allows to set a custom statement timeout (milliseconds) 

381 """ 

382 

383 conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout) 

384 connection = psycopg2.connect(conn_params) 

385 if connection is None: 

386 warnings.warn('database connection fault') 

387 return 'database connection fault' 

388 cursor = connection.cursor() 

389 

390 execute_pgSQL_query(cursor, "DELETE FROM %s WHERE id=%s;" % (tablename, record_id)) 

391 execute_pgSQL_query(cursor, "SELECT id FROM %s WHERE id=%s" % (tablename, record_id)) 

392 

393 res = cursor.fetchone() 

394 

395 if 'connection' in locals(): 

396 connection.commit() 

397 connection.close() 

398 

399 return 'success' if res is None else 'fail' 

400 

401 

402def get_pgSQL_geospatial_query_cond(conn_params, table2query, geomCol2use='bounds', tgt_corners_lonlat=None, 

403 scene_ID=None, queryfunc='ST_Intersects', crossing_dateline_check=True): 

404 assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!" 

405 

406 if tgt_corners_lonlat: 

407 # handle coordinates crossing the 180 degrees meridian (dateline) 

408 # FIXME in that case the polygon has to be split at the dateline, otherwise pgSQL may yield wrong results 

409 if crossing_dateline_check: 

410 xvals = [x for x, y in tgt_corners_lonlat] 

411 if max(xvals) - min(xvals) > 180: 

412 tgt_corners_lonlat = [(x, y) if x > 0 else (x + 360, y) for x, y in tgt_corners_lonlat] 

413 

414 from .helper_functions import cornerLonLat_to_postgreSQL_poly 

415 pGSQL_poly = cornerLonLat_to_postgreSQL_poly(tgt_corners_lonlat) 

416 src_geom = "'SRID=4326;%s'::geometry" % pGSQL_poly # source geometry is given 

417 # FIXME the scenes table stores "geography" geometries -> its spatial index is not used when casting to "geometry": 

418 tgt_geom = "%s.%s::geometry" % (table2query, geomCol2use) 

419 geocond = "%s(%s, %s)" % (queryfunc, src_geom, tgt_geom) 

420 else: # scene_ID is not None: 

421 connection = psycopg2.connect(conn_params) 

422 if connection is None: 

423 return 'database connection fault' 

424 cursor = connection.cursor() 

425 cmd = "SELECT ST_AsText(bounds) FROM scenes WHERE scenes.id = %s" % scene_ID 

426 execute_pgSQL_query(cursor, cmd) 

427 res = cursor.fetchone() 

428 cursor.close() 

429 connection.close() 

430 if res: 

431 src_geom = "'SRID=4326;%s'::geometry" % res 

432 else: 

433 print('The scene with the ID %s does not exist in the scenes table.' % scene_ID) 

434 return [] 

435 geocond = "%s(%s, %s.%s::geometry)" % (queryfunc, src_geom, table2query, geomCol2use) 

436 return geocond 

437 

438 
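# Sketch of the condition string built above for a small lon/lat footprint (assuming
# cornerLonLat_to_postgreSQL_poly returns a WKT POLYGON):
#
#   get_pgSQL_geospatial_query_cond(CFG.conn_database, 'scenes_proc',
#                                   tgt_corners_lonlat=[(10, 45), (11, 45), (11, 46), (10, 46)])
#   # -> "ST_Intersects('SRID=4326;POLYGON((...))'::geometry, scenes_proc.bounds::geometry)"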

439def get_overlapping_scenes_from_postgreSQLdb(conn_params, table='scenes_proc', scene_ID=None, 

440 tgt_corners_lonlat=None, conditions=None, add_cmds='', timeout=15000): 

441 # type: (str, str, int, list, Union[list, str], str, int) -> Union[list, str] 

442 

443 """Queries the postgreSQL database in order to find those scenes of a specified reference satellite (Landsat-8 or 

444 Sentinel-2) that have an overlap to the given corner coordinates AND that fulfill the given conditions. 

445 

446 :param conn_params: <str> connection parameters as provided by CFG.conn_params 

447 :param table: <str> name of the table within the database to be updated 

448 :param scene_ID: <int> a sceneID to get the target geographical extent from 

449 (needed if tgt_corners_lonlat is not provided) 

450 :param tgt_corners_lonlat: <list> a list of coordinates defining the target geographical extent 

451 (needed if scene_ID is not provided) 

452 :param conditions: <list> a list of additional query conditions 

453 :param add_cmds: <str> additional pgSQL commands to be added to the pgSQL query 

454 :param timeout: <int> allows to set a custom statement timeout (milliseconds) 

455 """ 

456 

457 conditions = [] if conditions is None else conditions if isinstance(conditions, list) else [conditions] 

458 conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout) 

459 connection = psycopg2.connect(conn_params) 

460 if connection is None: 

461 return 'database connection fault' 

462 datasetids = [int(d.split('=')[1].strip()) for d in conditions if d.startswith('datasetid')] 

463 datasetid = datasetids[0] if datasetids else 104 # Landsat-8 

464 # FIXME: use Landsat-8 instead of Sentinel-2 as long as S2 L1A_P is not working: 

465 datasetid = 104 if datasetid == 249 else datasetid 

466 

467 if table != 'scenes_proc': 

468 assert datasetid, "filtdsId is needed if table is not 'scenes_proc'" 

469 if scene_ID is None: 

470 assert tgt_corners_lonlat, "Provide either scene_ID or tgt_corners_lonlat!" 

471 if tgt_corners_lonlat is None: 

472 assert scene_ID, "Provide either scene_ID or tgt_corners_lonlat!" 

473 

474 val2get = "scenes.id" if table == 'scenes' else "%s.sceneid" % table 

475 # refcond = ['scenes_proc.georef = True'] if not datasetids else ['scenes.datasetid = %s' %datasetid] 

476 refcond = ['scenes.datasetid = %s' % datasetid] 

477 

478 geocond = [get_pgSQL_geospatial_query_cond(conn_params, table, tgt_corners_lonlat=tgt_corners_lonlat, 

479 scene_ID=scene_ID, queryfunc='ST_Intersects', 

480 crossing_dateline_check=True)] 

481 

482 join = "INNER JOIN scenes ON (%s.sceneid = scenes.id) " % table if table != 'scenes' and datasetids else '' 

483 conditions = [c for c in conditions if not c.startswith('datasetid')] 

484 where = "WHERE %s" % " AND ".join(geocond + refcond + conditions) 

485 usedtbls = "scenes" if table == 'scenes' else "%s, scenes" % table if 'scenes.' in where and join == '' else table 

486 query = "SELECT %s FROM %s %s%s %s" % (val2get, usedtbls, join, where, add_cmds) 

487 cursor = connection.cursor() 

488 execute_pgSQL_query(cursor, query) 

489 records2return = cursor.fetchall() 

490 cursor.close() 

491 connection.close() 

492 return records2return 

493 

494 
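# Usage sketch (illustrative; 'cloudcover' is an assumed column of the 'scenes' table):
#
#   scene_ids = get_overlapping_scenes_from_postgreSQLdb(
#       CFG.conn_database, table='scenes',
#       tgt_corners_lonlat=[(10, 45), (11, 45), (11, 46), (10, 46)],
#       conditions=['datasetid = 104', 'cloudcover < 20'])
#   # -> list of one-element tuples containing the matching scene IDs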

495def get_overlapping_MGRS_tiles(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000): 

496 """In contrast to pgSQL 'Overlapping' here means that both geometries share some spatial area. 

497 So it combines ST_Overlaps and ST_Contains.""" 

498 assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!" 

499 

500 conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout) 

501 connection = psycopg2.connect(conn_params) 

502 if connection is None: 

503 return 'database connection fault' 

504 

505 vals2get = ['grid100k', 'grid1mil', 'geom'] 

506 # FIXME this is covered by ST_Intersects: 

507 # geocond1 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Overlaps', 

508 # tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID) 

509 # geocond2 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Contains', 

510 # tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID) 

511 # geocond3 = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Within', 

512 # tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID) 

513 geocond = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Intersects', 

514 tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID, 

515 crossing_dateline_check=True) 

516 # query = "SELECT %s FROM %s WHERE %s OR %s OR %s" 

517 # % (', '.join(vals2get), 'mgrs_tiles', geocond1, geocond2, geocond3) 

518 query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'mgrs_tiles', geocond) 

519 cursor = connection.cursor() 

520 execute_pgSQL_query(cursor, query) 

521 records = cursor.fetchall() 

522 cursor.close() 

523 connection.close() 

524 

525 GDF = GeoDataFrame(records, columns=['grid100k', 'grid1mil', 'wkb_hex']) 

526 

527 GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True))) 

528 GDF['granuleid'] = GDF['grid1mil'].str.cat(GDF['grid100k']) 

529 return GDF[['granuleid', 'shapelyPoly_LonLat']] 

530 

531 
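# The GeoDataFrame returned above holds one row per intersecting MGRS tile; 'granuleid' is the
# concatenation of the 1-million and 100-k grid IDs, e.g. (illustrative) '32U' + 'QD' -> '32UQD',
# and 'shapelyPoly_LonLat' is the tile footprint as a shapely polygon.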

532def get_overlapping_MGRS_tiles2(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000): 

533 assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!" 

534 

535 conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout) 

536 connection = psycopg2.connect(conn_params) 

537 if connection is None: 

538 return 'database connection fault' 

539 

540 vals2get = ['granuleid', 'footprint_wgs84'] 

541 geocond = get_pgSQL_geospatial_query_cond(conn_params, 'footprints_sentinel2_granules', 

542 geomCol2use='footprint_wgs84', 

543 tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID) 

544 query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'footprints_sentinel2_granules', geocond) 

545 

546 cursor = connection.cursor() 

547 execute_pgSQL_query(cursor, query) 

548 records = cursor.fetchall() 

549 cursor.close() 

550 connection.close() 

551 

552 GDF = GeoDataFrame(records, columns=['granuleid', 'wkb_hex']) 

553 

554 GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True))) 

555 

556 return GDF[['granuleid', 'shapelyPoly_LonLat']] 

557 

558 

559def get_dict_satellite_name_id(conn_params): 

560 # type: (str) -> dict 

561 """Returns a dictionary with satellite names as keys and satellite IDs as values as read from pgSQL database. 

562 

563 :param conn_params: <str> pgSQL database connection parameters 

564 """ 

565 

566 res = get_info_from_postgreSQLdb(conn_params, 'satellites', ['name', 'id']) 

567 assert len(res) > 0, 'Error getting satellite names from postgreSQL database.' 

568 arr = np.array(res) 

569 return dict(zip(list(arr[:, 0]), list(arr[:, 1]))) 

570 

571 

572def get_dict_sensor_name_id(conn_params): 

573 # type: (str) -> dict 

574 """Returns a dictionary with sensor names as keys and sensor IDs as values as read from pgSQL database. 

575 :param conn_params: <str> pgSQL database connection parameters """ 

576 

577 res = get_info_from_postgreSQLdb(conn_params, 'sensors', ['name', 'id']) 

578 assert len(res) > 0, 'Error getting sensor names from postgreSQL database.' 

579 arr = np.array(res) 

580 return dict(zip(list(arr[:, 0]), list(arr[:, 1]))) 

581 

582 
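# Both dictionary helpers above return plain name -> ID mappings, e.g. (illustrative):
#   get_dict_satellite_name_id(CFG.conn_database)  # -> {'Landsat-8': '8', 'Sentinel-2A': '14', ...}
# Note: np.array() coerces the mixed (str, int) records to strings, so the IDs are returned as
# strings rather than integers.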

583def get_entityIDs_from_filename(conn_DB, filename): 

584 # type: (str, str) -> list 

585 """Returns entityID(s) for the given filename. In case of Sentinel-2 there can be more multiple entity IDs if 

586 multiple granules are saved in one .zip file. 

587 

588 :param conn_DB: <str> pgSQL database connection parameters 

589 :param filename: <str> the filename to get the corresponding entity ID(s) for 

590 """ 

591 

592 if filename[:2] in ['LE', 'LC', 'LO'] and filename.endswith('.tar.gz'): # Landsat 

593 entityIDs = [filename.split('.tar.gz')[0]] 

594 else: 

595 print('Querying database in order to get entityIDs for %s...' % filename) 

596 res = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['entityid'], {'filename': filename}, timeout=40000) 

597 entityIDs = [subres[0] for subres in res] if len(res) > 0 else [] 

598 return entityIDs 

599 

600 

601def get_filename_by_entityID(conn_DB, entityid, satellite): 

602 # type: (str,str,str) -> str 

603 """Returns the filename for the given entity ID. 

604 

605 :param conn_DB: <str> pgSQL database connection parameters 

606 :param entityid: <str> entity ID 

607 :param satellite: <str> satellite name to which the entity ID is belonging 

608 """ 

609 

610 if re.search(r'Landsat', satellite, re.I): 

611 filename = '%s.tar.gz' % entityid 

612 elif re.search(r'Sentinel-2', satellite, re.I): 

613 filename = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['filename'], 

614 {'entityid': entityid}, records2fetch=1)[0][0] 

615 else: 

616 raise NotImplementedError 

617 return filename 

618 

619 

620def get_notDownloadedsceneIDs(conn_DB, entityIDs, satellite, sensor, src_folder): 

621 # type: (str,list,str,str,str) -> np.ndarray 

622 """Takes a list of entity IDs and extracts those records that have the corresponding archive file in the given 

623 source folder and that have the processing level 'METADATA' in the pgSQL database. Based on this subset a numpy 

624 array containing the corresponding scene IDs and the target filenames for the fileserver is returned. 

625 

626 :param conn_DB: <str> pgSQL database connection parameters 

627 :param entityIDs: <list> a list of entity IDs 

628 :param satellite: <str> the name of the satellite to restrict the query on 

629 :param sensor: <str> the name of the sensor to restrict the query on 

630 :param src_folder: <str> the source directory where archive files are saved 

631 """ 

632 

633 columns = ['id', 'entityid', 'satelliteid', 'sensorid', 'filename', 'proc_level'] 

634 result = get_info_from_postgreSQLdb(conn_DB, 'scenes', columns, {'entityid': entityIDs}) 

635 df = pd.DataFrame(result, columns=columns) 

636 

637 satNameID_dic = get_dict_satellite_name_id(conn_DB) 

638 satID = satNameID_dic[satellite] 

639 target_folder = os.path.join(CFG.path_archive, satellite, sensor) 

640 

641 def get_fName(entityid): return get_filename_by_entityID(conn_DB, entityid, satellite) 

642 

643 def tgt_exists(fileName): return os.path.exists(os.path.join(target_folder, fileName)) 

644 

645 def src_exists(entityid): 

646 return os.path.exists(os.path.join(src_folder, get_filename_by_entityID(conn_DB, entityid, satellite))) 

647 df['tgt_fileName'] = list(df['entityid'].map(get_fName)) 

648 df['tgtFile_exists'] = list(df['tgt_fileName'].map(tgt_exists)) 

649 df['srcFile_exists'] = list(df['entityid'].map(src_exists)) 

650 tgt_satID = (df.satelliteid == float(satID)) 

651 # isDL = (df.proc_level == 'DOWNLOADED') 

652 isMET = (df.proc_level == 'METADATA') 

653 # tgtE = (df.tgtFile_exists == True) 

654 srcE = df.srcFile_exists # (df.srcFile_exists == True) 

655 # sceneIDs_notDL_tgtE = df[tgt_satID & (isDL == 0) & tgtE] # maybe needed later 

656 # sceneIDs_DL_tgtNE = df[tgt_satID & isDL & (tgtE == 0)] # maybe needed later 

657 # sceneIDs_DL_tgtE = df[tgt_satID & isDL & tgtE] # maybe needed later 

658 sceneIDs_isMET_srcE = df[tgt_satID & isMET & srcE] 

659 return sceneIDs_isMET_srcE[['id', 'tgt_fileName']].values 

660 

661 

662class GMS_JOB(object): 

663 """gms_preprocessing job manager""" 

664 

665 def __init__(self, conn_db): 

666 # type: (str) -> None 

667 """ 

668 :param conn_db: <str> the database connection parameters as given by CFG.conn_params 

669 """ 

670 # privates 

671 self._virtualsensorid = None 

672 

673 # defaults 

674 self.conn = conn_db 

675 self.dataframe = DataFrame() 

676 self.scene_counts = {} # set by self.create() 

677 

678 self.exists_in_db = False 

679 self.id = None #: int 

680 self.creationtime = datetime.now() # default, needed to create new job 

681 self.finishtime = None 

682 self.sceneids = [] 

683 self.timerange_start = datetime.min 

684 self.timerange_end = datetime.max 

685 self.bounds = box(-180, -90, 180, 90) # default, needed to create new job 

686 self.distribution_index = None 

687 self.progress = None 

688 self.feedback = None 

689 self.failed_sceneids = [] 

690 self.ref_job_id = None 

691 self.datacube_mgrs_tiles_proc = [] 

692 self.non_ref_datasetids = [] 

693 self.max_cloudcover = None 

694 self.season_code = None # type: int 

695 self.path_analysis_script = '' # TODO 

696 self.job_mode = 'processing_only' # FIXME download/processing/... 

697 self.jobs_table_columns = ['id', 'creationtime', 'finishtime', 'sceneids', 'timerange_start', 

698 'timerange_end', 'bounds', 'distribution_index', 'progress', 'feedback', 

699 'failed_sceneids', 'datasetid_spatial_ref', 

700 'virtualsensorid', 'ref_job_id', 'datacube_mgrs_tiles_proc', 'comment', 

701 'non_ref_datasetids', 'max_cloudcover', 'season_code', 'status', 

702 'path_analysis_script', 'analysis_parameter', 'statistics', 'job_mode'] 

703 self.datasetid_spatial_ref = 249 # this is overwritten if existing job is read from DB but needed to create new 

704 self.datasetname_spatial_ref = 'SENTINEL-2A' # same here 

705 self.status = None 

706 self.statistics = [] 

707 self.comment = None 

708 self.epsg = None # set by self._set_target_sensor_specs() 

709 self.ground_spatial_sampling = None # set by self._set_target_sensor_specs() 

710 self.analysis_parameter = None 

711 

712 def __repr__(self): 

713 return 'GMS job:\n\n' + Series(self.db_entry).to_string() 

714 

715 @property 

716 def virtualsensorid(self): 

717 return self._virtualsensorid 

718 

719 @virtualsensorid.setter 

720 def virtualsensorid(self, value): 

721 """Set virtual sensor ID but continue if no data value is received 

722 NOTE: set by self._set_target_sensor_specs() and self.from_ID()""" 

723 if value != -1: # no data value 

724 self._virtualsensorid = value 

725 

726 def _set_target_sensor_specs(self, virtual_sensor_id, datasetid_spatial_ref): 

727 self.virtualsensorid = virtual_sensor_id 

728 

729 if not isinstance(datasetid_spatial_ref, int): 

730 raise ValueError(datasetid_spatial_ref) 

731 

732 res = get_info_from_postgreSQLdb(self.conn, 'virtual_sensors', ['spatial_resolution', 

733 "projection_epsg"], {'id': virtual_sensor_id}) 

734 assert res, \ 

735 "'virtual_sensor_id'=%s does not exist in the table 'virtual_sensors' of the database." % virtual_sensor_id 

736 target_gsd = res[0][0] 

737 self.ground_spatial_sampling = [target_gsd, target_gsd] if type(target_gsd) in [int, float] else target_gsd 

738 self.epsg = int(res[0][1]) 

739 

740 self.datasetid_spatial_ref = datasetid_spatial_ref 

741 res = get_info_from_postgreSQLdb(self.conn, 'datasets', ['name'], {'id': datasetid_spatial_ref}) 

742 assert res, \ 

743 "'datasetid_spatial_ref'=%s does not exist in the table 'datasets' of the database." % datasetid_spatial_ref 

744 self.datasetname_spatial_ref = res[0][0] 

745 

746 @property 

747 def db_entry(self): 

748 """Returns an OrderedDict containing keys and values of the database entry. 

749 """ 

750 

751 db_entry = collections.OrderedDict() 

752 for i in self.jobs_table_columns: 

753 val = getattr(self, i) 

754 

755 if i == 'virtualsensorid' and val is None: 

756 val = -1 # nodata value 

757 

758 db_entry[i] = val 

759 

760 return db_entry 

761 

762 def from_dictlist(self, dictlist_data2process, virtual_sensor_id, datasetid_spatial_ref=249, comment=None): 

763 # type: (list, int, int, str) -> GMS_JOB 

764 """ 

765 :param dictlist_data2process: <list> a list of dictionaries containing the keys "satellite", "sensor" and 

766 "filenames", 

767 e.g. [{'satellite': 'Landsat-8', 'sensor': 'OLI_TIRS', 'filenames': 'file.tar.gz'}, {...}] 

768 :param virtual_sensor_id : <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database 

769 :param datasetid_spatial_ref: <int> a valid dataset ID of the dataset to be chosen as spatial reference 

770 (from the 'datasets' table of the postgreSQL database) 

771 (default:249 - Sentinel-2A), 104=Landsat-8 

772 :param comment: <str> a comment describing the job (e.g. 'Beta job') 

773 """ 

774 

775 self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref) 

776 self.comment = comment 

777 

778 dictlist_data2process = dictlist_data2process if dictlist_data2process else [] 

779 

780 for idx, datadict in enumerate(dictlist_data2process): 

781 assert isinstance(datadict, dict), "Expected only dictionaries within 'dictlist_data2process'. " \ 

782 "Got %s in there." % type(datadict) 

783 assert False not in [i in datadict for i in ['satellite', 'sensor', 'filenames']] 

784 assert type(datadict['filenames']) in [list, str] 

785 

786 if isinstance(datadict['filenames'], str): 

787 if datadict['filenames'].endswith('.csv'): 

788 assert os.path.exists(datadict['filenames']) 

789 else: 

790 datadict['filenames'] = [datadict['filenames']] 

791 

792 # find all duplicates in input datadicts and build common dataframe 

793 all_dfs = [] 

794 for datadict in dictlist_data2process: 

795 assert isinstance(datadict, dict) 

796 

797 if isinstance(datadict['filenames'], str) and datadict['filenames'].endswith('.csv'): 

798 datadict['filenames'] = None # TODO implement csv reader here 

799 raise NotImplementedError 

800 

801 else: 

802 temp_df = DataFrame(datadict, columns=['satellite', 'sensor', 'filenames']) 

803 

804 if re.search(r'Landsat-7', datadict['satellite'], re.I) and \ 

805 re.search(r'ETM\+', datadict['sensor'], re.I): 

806 

807 from .helper_functions import Landsat_entityID_decrypter as LED 

808 

809 def get_L7_sensor(fN): 

810 return LED(fN.split('.tar.gz')[0]).sensorIncSLC 

811 

812 temp_df['sensor'] = list(temp_df['filenames'].map(get_L7_sensor)) 

813 

814 all_dfs.append(temp_df) 

815 

816 df = DataFrame(pd.concat(all_dfs)).drop_duplicates() 

817 df.columns = ['satellite', 'sensor', 'filename'] 

818 

819 # run self.from_dictlist 

820 sceneInfoDF = self._get_validated_sceneInfoDFs(df) 

821 

822 # populate attributes 

823 self._populate_jobAttrs_from_sceneInfoDF(sceneInfoDF) 

824 

825 return self 

826 

827 def from_sceneIDlist(self, list_sceneIDs, virtual_sensor_id, datasetid_spatial_ref=249, comment=None): 

828 # type: (list, int, int, str) -> object 

829 """ 

830 Create a GMS_JOB instance based on the given list of scene IDs. 

831 

832 :param list_sceneIDs: <list> of scene IDs, e.g. [26781907, 26781917, 26542650, 26542451, 26541679] 

833 :param virtual_sensor_id : <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database 

834 :param datasetid_spatial_ref: <int> a valid dataset ID of the dataset to be chosen as spatial reference 

835 (from the 'datasets' table of the postgreSQL database) 

836 (default:249 - Sentinel-2A), 104=Landsat-8 

837 :param comment: <str> a comment describing the job (e.g. 'Beta job') 

838 """ 

839 

840 self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref) 

841 self.comment = comment 

842 

843 list_sceneIDs = list(list_sceneIDs) 

844 

845 # query 'satellite', 'sensor', 'filename' from database and summarize in DataFrame 

846 with psycopg2.connect(self.conn) as conn: 

847 with conn.cursor() as cursor: 

848 execute_pgSQL_query(cursor, 

849 """SELECT scenes.id, satellites.name, sensors.name, scenes.filename FROM scenes 

850 LEFT JOIN satellites on scenes.satelliteid=satellites.id 

851 LEFT JOIN sensors on scenes.sensorid=sensors.id 

852 WHERE scenes.id in (%s)""" % ','.join([str(i) for i in list_sceneIDs])) 

853 df = DataFrame(cursor.fetchall(), columns=['sceneid', 'satellite', 'sensor', 'filename']) 

854 

855 # FIXME overwriting 'ETM+_SLC_OFF' with 'ETM+' causes _get_validated_sceneInfoDFs() to fail because the 

856 # FIXME sensorid for ETM+_SLC_OFF cannot be retrieved 

857 # df['sensor'] = df['sensor'].apply(lambda senN: senN if senN != 'ETM+_SLC_OFF' else 'ETM+') 

858 df = df.drop_duplicates() 

859 

860 if df.empty: 

861 raise ValueError('None of the given scene IDs could be found in the GeoMultiSens database. ' 

862 'Job creation failed.') 

863 else: 

864 missing_IDs = [i for i in list_sceneIDs if i not in df['sceneid'].values] 

865 if missing_IDs: 

866 warnings.warn('The following scene IDs could not be found in the GeoMultiSens database: \n%s' 

867 % '\n'.join([str(i) for i in missing_IDs])) 

868 

869 # run self.from_dictlist 

870 sceneInfoDF = self._get_validated_sceneInfoDFs(df) 

871 

872 # populate attributes 

873 self._populate_jobAttrs_from_sceneInfoDF(sceneInfoDF) 

874 

875 return self 

876 

877 def from_entityIDlist(self, list_entityids, virtual_sensor_id, datasetid_spatial_ref=249, comment=None): 

878 """Create a GMS_JOB instance based on the given list of entity IDs. 

879 

880 :param list_entityids: 

881 :param virtual_sensor_id: 

882 :param datasetid_spatial_ref: 

883 :param comment: 

884 :return: 

885 """ 

886 

887 res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'entityid': list_entityids}) 

888 if not res_sceneIDs: 

889 raise ValueError('No matching database entries found for the given entity IDs.') 

890 

891 list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist() 

892 count_no_match = len(list_entityids) - len(list_sceneIDs) 

893 

894 if count_no_match: 

895 warnings.warn('%s datasets could not be found in the database. They cannot be processed.' % count_no_match) 

896 

897 return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id, 

898 datasetid_spatial_ref=datasetid_spatial_ref, comment=comment) 

899 

900 def from_filenames(self, list_filenames, virtual_sensor_id, datasetid_spatial_ref=249, comment=None): 

901 """Create a GMS_JOB instance based on the given list of provider archive filenames. 

902 

903 :param list_filenames: 

904 :param virtual_sensor_id: 

905 :param datasetid_spatial_ref: 

906 :param comment: 

907 :return: 

908 """ 

909 

910 res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'filename': list_filenames}) 

911 if not res_sceneIDs: 

912 raise ValueError('No matching database entries found for the given filenames.') 

913 

914 list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist() 

915 count_no_match = len(list_filenames) - len(list_sceneIDs) 

916 

917 if count_no_match: 

918 warnings.warn('%s datasets could not be found in the database. They cannot be processed.' % count_no_match) 

919 

920 return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id, 

921 datasetid_spatial_ref=datasetid_spatial_ref, comment=comment) 

922 

923 def _get_validated_sceneInfoDFs(self, DF_SatSenFname): 

924 # type: (DataFrame) -> DataFrame 

925 """ 

926 

927 :param DF_SatSenFname: 

928 :return: 

929 """ 

930 

931 df = DF_SatSenFname 

932 

933 # loop through all satellite-sensor combinations and get scene information from database 

934 all_df_recs, all_df_miss = [], [] 

935 all_satellites, all_sensors = zip( 

936 *[i.split('__') for i in (np.unique(df['satellite'] + '__' + df['sensor']))]) 

937 

938 for satellite, sensor in zip(all_satellites, all_sensors): 

939 cur_df = df.loc[(df['satellite'] == satellite) & (df['sensor'] == sensor)] 

940 filenames = list(cur_df['filename']) 

941 

942 satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': satellite}) 

943 senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': sensor}) 

944 assert len(satID_res), "No satellite named '%s' found in database." % satellite 

945 assert len(senID_res), "No sensor named '%s' found in database." % sensor 

946 

947 # append sceneid and wkb_hex bounds 

948 if 'sceneid' in df.columns: 

949 sceneIDs = list(cur_df['sceneid']) 

950 conddict = dict(id=sceneIDs, satelliteid=satID_res[0][0], sensorid=senID_res[0][0]) 

951 else: 

952 conddict = dict(filename=filenames, satelliteid=satID_res[0][0], sensorid=senID_res[0][0]) 

953 

954 records = get_info_from_postgreSQLdb( 

955 self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'], conddict) 

956 records = DataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom']) 

957 if 'sceneid' in df.columns: 

958 del records['sceneid'] 

959 

960 cur_df = cur_df.merge(records, on='filename', how="outer", copy=False) 

961 

962 # separate records with valid matches in database from invalid matches (filename not found in database) 

963 df_recs = cur_df[ 

964 cur_df.sceneid.notnull()].copy() # creates a copy (needed to be able to apply maps later) 

965 df_miss = cur_df[cur_df.sceneid.isnull()] # creates a view 

966 

967 # convert scene ids from floats to integers 

968 df_recs['sceneid'] = list(df_recs.sceneid.map(lambda sceneid: int(sceneid))) 

969 

970 # wkb_hex bounds to shapely polygons 

971 df_recs['polygons'] = list(df_recs.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True))) 

972 

973 all_df_recs.append(df_recs) 

974 all_df_miss.append(df_miss) 

975 

976 # merge all dataframes of all satellite-sensor combinations 

977 df_recs_compl = DataFrame(pd.concat(all_df_recs)) 

978 df_miss_compl = DataFrame(pd.concat(all_df_miss)) 

979 

980 # populate attributes 

981 if not df_miss_compl.empty: 

982 warnings.warn('The following scene filenames could not be found in the GeoMultiSens database: \n%s' 

983 % '\n'.join(list(df_miss_compl['filename']))) 

984 

985 return df_recs_compl 

986 

987 def _populate_jobAttrs_from_sceneInfoDF(self, sceneInfoDF): 

988 # type: (DataFrame) -> None 

989 """ 

990 

991 :param sceneInfoDF: 

992 :return: 

993 """ 

994 

995 if not sceneInfoDF.empty: 

996 self.dataframe = sceneInfoDF 

997 self.sceneids = list(self.dataframe['sceneid']) 

998 self.statistics = [len(self.sceneids)] + [0] * 8 

999 self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds) 

1000 self.timerange_start = self.dataframe.acquisitiondate.min().to_pydatetime() 

1001 self.timerange_end = self.dataframe.acquisitiondate.max().to_pydatetime() 

1002 

1003 def from_job_ID(self, job_ID): 

1004 # type: (int) -> GMS_JOB 

1005 """ 

1006 Create a GMS_JOB instance by querying the database for a specific job ID. 

1007 :param job_ID: <int> a valid id from the database table 'jobs' 

1008 """ 

1009 

1010 res = get_info_from_postgreSQLdb(self.conn, 'jobs', self.jobs_table_columns, {'id': job_ID}) 

1011 if not res: 

1012 raise ValueError("No job with ID %s found in 'jobs' table of the database." % job_ID) 

1013 

1014 self.exists_in_db = True 

1015 [setattr(self, attrName, res[0][i]) for i, attrName in enumerate(self.jobs_table_columns)] 

1016 self.bounds = wkb_loads(self.bounds, hex=True) 

1017 

1018 # fill self.dataframe 

1019 records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['satelliteid', 'sensorid', 'filename', 

1020 'id', 'acquisitiondate', 'bounds'], 

1021 {'id': self.sceneids}) 

1022 df = DataFrame(records, 

1023 columns=['satelliteid', 'sensorid', 'filename', 'sceneid', 'acquisitiondate', 'geom']) 

1024 all_satIDs = df.satelliteid.unique().tolist() 

1025 all_senIDs = df.sensorid.unique().tolist() 

1026 satName_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['name'], {'id': all_satIDs}) 

1027 senName_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['name'], {'id': all_senIDs}) 

1028 all_satNames = [i[0] for i in satName_res] 

1029 all_senNames = [i[0] for i in senName_res] 

1030 id_satName_dict = dict(zip(all_satIDs, all_satNames)) 

1031 id_senName_dict = dict(zip(all_senIDs, all_senNames)) 

1032 df.insert(0, 'satellite', list(df.satelliteid.map(lambda satID: id_satName_dict[satID]))) 

1033 df.insert(1, 'sensor', list(df.sensorid.map(lambda senID: id_senName_dict[senID]))) 

1034 df['polygons'] = list(df.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True))) 

1035 

1036 self.dataframe = df[['satellite', 'sensor', 'filename', 'sceneid', 'acquisitiondate', 'geom', 'polygons']] 

1037 

1038 return self 

1039 

1040 def reset_job_progress(self): 

1041 """Resets everthing in the database entry that has been written during the last run of the job.. 

1042 """ 

1043 

1044 self.finishtime = None 

1045 self.failed_sceneids = [] 

1046 self.progress = None 

1047 self.status = 'pending' 

1048 self.statistics = [len(self.sceneids)] + [0] * 8 

1049 

1050 self.update_db_entry() 

1051 

1052 def _get_dataframe(self, datadict): # FIXME deprecated 

1053 df = DataFrame(datadict, columns=['satellite', 'sensor', 'filenames']) 

1054 df.columns = ['satellite', 'sensor', 'filename'] 

1055 

1056 satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': datadict['satellite']}) 

1057 senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': datadict['sensor']}) 

1058 assert len(satID_res), "No satellite named '%s' found in database." % datadict['satellite'] 

1059 assert len(senID_res), "No sensor named '%s' found in database." % datadict['sensor'] 

1060 

1061 # append sceneid and wkb_hex bounds 

1062 records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'], 

1063 {'filename': datadict['filenames'], 

1064 'satelliteid': satID_res[0][0], 'sensorid': senID_res[0][0]}) 

1065 records = DataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom']) 

1066 df = df.merge(records, on='filename', how="outer") 

1067 

1068 # separate records with valid matches in database from invalid matches (filename not found in database) 

1069 df_recs = df[df.sceneid.notnull()].copy() # creates a copy (needed to be able to apply maps later) 

1070 df_miss = df[df.sceneid.isnull()] # creates a view 

1071 

1072 # convert scene ids from floats to integers 

1073 df_recs['sceneid'] = list(df_recs.sceneid.map(lambda sceneid: int(sceneid))) 

1074 

1075 # wkb_hex bounds to shapely polygons 

1076 df_recs['polygons'] = list(df_recs.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True))) 

1077 

1078 return df_recs, df_miss 

1079 

1080 def create(self): 

1081 # type: () -> int 

1082 """ 

1083 Add the job to the 'jobs' table of the database 

1084 :return: <int> the job ID of the newly created job 

1085 """ 

1086 

1087 if not self.dataframe.empty: 

1088 all_sat, all_sen = \ 

1089 zip(*[i.split('__') for i in 

1090 (np.unique(self.dataframe['satellite'] + '__' + self.dataframe['sensor']))]) 

1091 counts = [self.dataframe[(self.dataframe['satellite'] == sat) & 

1092 (self.dataframe['sensor'] == sen)]['sceneid'].count() 

1093 for sat, sen in zip(all_sat, all_sen)] 

1094 self.scene_counts = {'%s %s' % (sat, sen): cnt for sat, sen, cnt in zip(all_sat, all_sen, counts)} 

1095 self.statistics = [len(self.sceneids)] + [0] * 8 

1096 

1097 db_entry = self.db_entry 

1098 del db_entry['id'] 

1099 

1100 newID = create_record_in_postgreSQLdb(self.conn, 'jobs', db_entry) 

1101 assert isinstance(newID, int) 

1102 

1103 SatSen_CountTXT = ['%s %s %s scene' % (cnt, sat, sen) if cnt == 1 else '%s %s %s scenes' % (cnt, sat, sen) 

1104 for sat, sen, cnt in zip(all_sat, all_sen, counts)] 

1105 print('New job created successfully. job-ID: %s\nThe job contains:' % newID) 

1106 [print('\t- %s' % txt) for txt in SatSen_CountTXT] 

1107 

1108 self.exists_in_db = True 

1109 self.id = newID 

1110 return self.id 

1111 else: 

1112 print('No job created because no matching scene could be found in database!') 

1113 

1114 def update_db_entry(self): 

1115 """Updates the all values of current database entry belonging to the respective job ID. New values are taken 

1116 from the attributes of the GMS_JOB instance. 

1117 """ 

1118 

1119 assert self.exists_in_db 

1120 db_entry = self.db_entry 

1121 del db_entry['id'] # primary key of the record cannot be overwritten 

1122 update_records_in_postgreSQLdb(self.conn, 'jobs', db_entry, {'id': self.id}) 

1123 

1124 def delete_procdata_of_failed_sceneIDs(self, proc_level='all', force=False): 

1125 """Deletes all data where processing failed within the current job ID. 

1126 

1127 :param proc_level: <str> delete only results that have the given processing level 

1128 :param force: 

1129 """ 

1130 

1131 self.__delete_procdata(self.failed_sceneids, 'failed', proc_level=proc_level, force=force) 

1132 

1133 def delete_procdata_of_entire_job(self, proc_level='all', force=False): 

1134 """Deletes all scene data processed by the current job ID. 

1135 

1136 :param proc_level: <str> delete only results that have the given processing level 

1137 :param force: 

1138 """ 

1139 

1140 self.__delete_procdata(self.sceneids, 'processed', proc_level=proc_level, force=force) 

1141 

1142 def __delete_procdata(self, list_sceneIDs, scene_desc, proc_level='all', force=False): 

1143 """Applies delete_processing_results on each scene given in list_sceneIDs. 

1144 

1145 :param list_sceneIDs: <list> a list of scene IDs 

1146 :param scene_desc: <str> a description like 'succeeded' or 'failed' 

1147 :param proc_level: <str> delete only results that have the given processing level 

1148 :param force: 

1149 """ 

1150 

1151 if self.exists_in_db: 

1152 if list_sceneIDs: 

1153 delete = 'J' 

1154 if not force: 

1155 delete = input("Do you really want to delete the processing results of %s scenes? (J/n)" 

1156 % len(list_sceneIDs)) 

1157 if delete == 'J': 

1158 [delete_processing_results(ScID, proc_level=proc_level, force=force) for ScID in list_sceneIDs] 

1159 else: 

1160 warnings.warn( 

1161 '\nAccording to the database the job has no %s scene IDs. Nothing to delete.' % scene_desc) 

1162 else: 

1163 warnings.warn('The job with the ID %s does not exist in the database. Thus there are no %s scene IDs.' 

1164 % (self.id, scene_desc)) 

1165 

1166 
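# Typical GMS_JOB workflow (sketch; the scene IDs, virtual sensor ID and connection string are
# illustrative assumptions):
#
#   job = GMS_JOB(CFG.conn_database)
#   job.from_sceneIDlist([26781907, 26781917], virtual_sensor_id=1,
#                        datasetid_spatial_ref=249, comment='Beta job')
#   job_id = job.create()                                  # adds a record to the 'jobs' table
#
#   job = GMS_JOB(CFG.conn_database).from_job_ID(job_id)   # re-read the job later
#   job.reset_job_progress()                               # clear the results of the previous run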

1167def delete_processing_results(scene_ID, proc_level='all', force=False): 

1168 """Deletes the processing results of a given scene ID 

1169 

1170 :param scene_ID: <int> the scene ID to delete results from 

1171 :param proc_level: <str> delete only results that have the given processing level 

1172 :param force: <bool> force deletion without user interaction 

1173 """ 

1174 

1175 if proc_level not in ['all'] + proc_chain: 

1176 raise ValueError("'%s' is not a supported processing level." % proc_level) 

1177 

1178 path_procdata = PG.path_generator(scene_ID=scene_ID).get_path_procdata() 

1179 if not os.path.isdir(path_procdata): 

1180 print('The folder %s does not exist. Nothing to delete.' % path_procdata) 

1181 else: 

1182 delete = 'J' 

1183 if not force: 

1184 dir_list = os.listdir(path_procdata) if proc_level == 'all' else \ 

1185 glob.glob(os.path.join(path_procdata, '*%s*' % proc_level)) 

1186 count_files = len([i for i in dir_list if os.path.isfile(os.path.join(path_procdata, i))]) 

1187 count_dirs = len([i for i in dir_list if os.path.isdir(os.path.join(path_procdata, i))]) 

1188 if count_files or count_dirs: 

1189 delete = input("Do you really want to delete the folder %s? It contains %s files and %s directories" 

1190 " to delete. (J/n)" % (path_procdata, count_files, count_dirs)) 

1191 else: 

1192 print('The folder %s does not contain any files that match the given deletion criteria. '

1193 'Nothing to delete.' % path_procdata) 

1194 if delete == 'J': 

1195 try: 

1196 if proc_level == 'all': 

1197 try: 

1198 shutil.rmtree(path_procdata) 

1199 except OSError: # directory not deletable because it is not empty 

1200 if [F for F in os.listdir(path_procdata) if not os.path.basename(F).startswith('.fuse_hidden')]: 

1201 raise # raise OSError if there are other files than .fuse_hidden... remaining 

1202 else: 

1203 files2delete = glob.glob(os.path.join(path_procdata, '*%s*' % proc_level)) 

1204 errors = False # default 

1205 for F in files2delete: 

1206 try: 

1207 os.remove(F) 

1208 except OSError: 

1209 if not os.path.basename(F).startswith('.fuse_hidden'): 

1210 errors = True 

1211 if errors: 

1212 raise OSError('Not all files deleted properly.') 

1213 

1214 except OSError: 

1215 msg = '\nNot all files of scene %s could be deleted properly. Remaining files:\n%s\n\nThe following ' \ 

1216 'error occurred:\n%s' % (scene_ID, '\n'.join(os.listdir(path_procdata)), traceback.format_exc()) 

1217 warnings.warn(msg) 

1218 

1219 
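# Usage sketch (added, not part of the original module): delete only the L1A results of a single
# scene without the interactive confirmation prompt; the scene ID below is made up.
#
#     delete_processing_results(26186196, proc_level='L1A', force=True)
#
# With proc_level='all' (the default) the complete processing folder of the scene is removed.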

1220def add_externally_downloaded_data_to_GMSDB(conn_DB, src_folder, filenames, satellite, sensor): 

1221 # type: (str,str,list,str,str) -> None 

1222 """Adds externally downloaded satellite scenes to GMS fileserver AND updates the corresponding postgreSQL records 

1223 by adding a filename and setting the processing level to 'DOWNLOADED'.: 

1224 

1225 :param conn_DB: <str> pgSQL database connection parameters 

1226 :param src_folder: <str> the source directory where externally provided archive files are saved 

1227 :param filenames: <list> a list of filenames to be added to the GMS database 

1228 :param satellite: <str> the name of the satellite to which the filenames belong

1229 :param sensor: <str> the name of the sensor to which the filenames belong

1230 """ 

1231 

1232 # FIXME this method only works for Landsat archives or if filename is already set in database 

1233 # FIXME (not always the case for S2A)! 

1234 res = [get_entityIDs_from_filename(conn_DB, fName) for fName in filenames] 

1235 entityIDs = list(itertools.chain.from_iterable(res)) 

1236 

1237 sceneID_fName_arr = get_notDownloadedsceneIDs(conn_DB, entityIDs, satellite, sensor, src_folder) 

1238 

1239 files2copy = list(set(sceneID_fName_arr[:, 1])) 

1240 target_folder = os.path.join(CFG.path_archive, satellite, sensor) 

1241 assert os.path.exists(target_folder), 'Target folder not found: %s.' % target_folder 

1242 print('Copying %s files to %s.' % (len(files2copy), target_folder)) 

1243 

1244 for i in range(sceneID_fName_arr.shape[0]): 

1245 sceneID, fName = sceneID_fName_arr[i, :] 

1246 src_P = os.path.join(src_folder, fName) 

1247 if os.path.exists(os.path.join(target_folder, os.path.basename(src_P))): 

1248 print("File '%s' already exists in the target folder. Skipped." % os.path.basename(src_P)) 

1249 else: 

1250 print('copying %s...' % src_P) 

1251 shutil.copy(src_P, target_folder) 

1252 

1253 print("Setting proc_level for scene ID '%s' to 'DOWNLOADED' and adding filename..." % sceneID) 

1254 update_records_in_postgreSQLdb(conn_DB, 'scenes', {'filename': fName, 'proc_level': 'DOWNLOADED'}, 

1255 {'id': sceneID}) 

1256 

1257 
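# Usage sketch (added, not part of the original module): registering manually downloaded Landsat
# archives; the source folder and filename are made up, and the satellite/sensor names must match
# the entries of the 'satellites'/'sensors' tables in the GMS database.
#
#     add_externally_downloaded_data_to_GMSDB(
#         conn_DB=CFG.conn_database,
#         src_folder='/path/to/external_downloads/',
#         filenames=['LC81930242015036LGN00.tar.gz'],
#         satellite='Landsat-8',
#         sensor='OLI_TIRS')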

1258def add_missing_filenames_in_pgSQLdb(conn_params): # FIXME 
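# Added note: derives the expected local archive filename for scenes whose 'filename' is still missing
# (sensorid 8, proc_level 'DOWNLOADED') and keeps its basename if the archive file exists; the actual
# database update is still missing (see FIXME).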

1259 res = get_info_from_postgreSQLdb(conn_params, 'scenes', ['id', 'entityid', 'satelliteid', 'sensorid', 'filename'], 

1260 {'filename': None, 'proc_level': 'DOWNLOADED', 'sensorid': 8}, timeout=120000) 

1261 gdf = GeoDataFrame(res, columns=['sceneid', 'entityid', 'satelliteid', 'sensorid', 'filename']) 

1262 

1263 def get_fName(sceneid): return PG.path_generator(scene_ID=sceneid).get_local_archive_path_baseN() 

1264 

1265 def get_fName_if_exists(path): return os.path.basename(path) if os.path.exists(path) else None 

1266 

1267 gdf['archive_path'] = list(gdf['sceneid'].map(get_fName)) 

1268 gdf['filename'] = list(gdf['archive_path'].map(get_fName_if_exists)) 

1269 

1270 print(gdf) 

1271 

1272 

1273def pdDataFrame_to_sql_k(engine, frame, name, if_exists='fail', index=True, 

1274 index_label=None, schema=None, chunksize=None, dtype=None, **kwargs): 

1275 # type: (any,pd.DataFrame,str,str,bool,str,str,int,dict,any) -> None 

1276 """Extends the standard function pandas.io.SQLDatabase.to_sql() with 'kwargs' which allows to set the primary key 

1277 of the target table for example. This is usually not possible with the standard to_sql() function. 

1278 

1279 :param engine: SQLAlchemy engine (created by sqlalchemy.create_engine) 

1280 :param frame: the pandas.DataFrame or geopandas.GeoDataFrame to be exported to SQL-like database 

1281 :param name: <str> Name of SQL table 

1282 :param if_exists: <str> {'fail', 'replace', 'append'} the action to be executed if target table already exists 

1283 :param index: <bool> Write DataFrame index as a column. 

1284 :param index_label: <str> Column label for index column(s). 

1285 :param schema: <str> Specify the schema (if database flavor supports this). If None, use default schema. 

1286 :param chunksize: <int> If not None, then rows will be written in batches of this size at a time. 

1287 If None, all rows will be written at once. 

1288 :param dtype: <dict> a dictionary of column names and corresponding postgreSQL types 

1289 The types should be a SQLAlchemy or GeoSQLAlchemy2 type.

1290 :param kwargs: keyword arguments to be passed to SQLTable 

1291 """ 

1292 

1293 pandas_sql = pandasSQL_builder(engine, schema=None, flavor=None) 

1294 if dtype is not None: 

1295 for col, my_type in dtype.items(): 

1296 if not isinstance(to_instance(my_type), TypeEngine): 

1297 raise ValueError('The type of %s is not a SQLAlchemy type ' % col) 

1298 

1299 table = SQLTable(name, pandas_sql, frame=frame, index=index, if_exists=if_exists, index_label=index_label, 

1300 schema=schema, dtype=dtype, **kwargs) 

1301 table.create() 

1302 table.insert(chunksize) 

1303 

1304 
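# Usage sketch (added, not part of the original module): writing a small DataFrame into a new table
# while declaring a primary key via the 'keys' keyword, which is passed through to SQLTable. The
# connection string is made up; within this module the engine is normally built from CFG.db_host.
#
#     engine = create_engine('postgresql://user:password@localhost/geomultisens')
#     df = pd.DataFrame({'id': [1, 2], 'name': ['a', 'b']})
#     pdDataFrame_to_sql_k(engine, df, 'example_table', if_exists='fail', index=False, keys='id')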

1305def import_shapefile_into_postgreSQL_database(path_shp, tablename, cols2import=None, dtype_dic=None, 

1306 if_exists='fail', index_label=None, primarykey=None): 

1307 # type: (str,str,list,dict,str,str,str) -> None 

1308 """Imports all features of shapefile into the specified table of the postgreSQL database. Geometry is automatically 

1309 converted to postgreSQL geometry data type. 

1310 :param path_shp: <str> path of the shapefile to be imported 

1311 :param tablename: <str> name of the table within the postgreSQL database where records shall be added 

1312 :param cols2import: <list> a list of column names to be imported 

1313 :param dtype_dic: <dict> a dictionary of column names and corresponding postgreSQL types 

1314 The types should be a SQLAlchemy or GeoSQLAlchemy2 type, 

1315 or a string for sqlite3 fallback connection. 

1316 :param if_exists: <str> {'fail', 'replace', 'append'} the action to be executed if target table already exists 

1317 :param index_label: <str> Column label for index column(s). 

1318 :param primarykey: <str> the name of the column to be set as primary key of the target table 

1319 """ 

1320 

1321 print('Reading shapefile %s...' % path_shp) 

1322 GDF = GeoDataFrame.from_file(path_shp) 

1323 GDF['geom'] = list(GDF['geometry'].map(str)) 

1324 # GDF['geom'] = [*GDF['geometry'].map(lambda shapelyPoly: "'SRID=4326;%s'::geometry" %shapelyPoly)] 

1325 # GDF['geom'] = [*GDF['geometry'].map(lambda shapelyPoly: "'SRID=4326;%s'" % shapelyPoly)] 

1326 # GDF['geom'] = [*GDF['geometry'].map(lambda shapelyPoly: shapelyPoly.wkb_hex)] 

1327 # GDF['geom'] = [*GDF['geometry'].map(lambda shapelyPoly: str(from_shape(shapelyPoly, srid=4326)))] 

1328 # from geoalchemy2.shape import from_shape 

1329 

1330 cols2import = cols2import + ['geom'] if cols2import else list(GDF.columns) 

1331 subGDF = GDF[cols2import] 

1332 dtype_dic = dtype_dic if dtype_dic else {} 

1333 dtype_dic.update({'geom': GEOMETRY}) 

1334 # import geoalchemy2 

1335 # dtype_dic.update({'geom': geoalchemy2.types.Geometry(geometry_type='POLYGON',srid=4326)}) 

1336 print('Adding shapefile geometries to postgreSQL table %s...' % tablename) 

1337 engine = create_engine('postgresql://gmsdb:gmsdb@%s/geomultisens' % CFG.db_host) 

1338 pdDataFrame_to_sql_k(engine, subGDF, tablename, index_label=index_label, 

1339 keys=primarykey, if_exists=if_exists, dtype=dtype_dic) 

1340 # set SRID 

1341 conn = psycopg2.connect(CFG.conn_database) 

1342 cursor = conn.cursor() 

1343 cursor.execute("UPDATE %s SET geom = ST_SetSRID(geom, 4326);" % tablename) 

1344 conn.commit() 

1345 cursor.close() 

1346 conn.close() 

1347 

1348 
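# Usage sketch (added, not part of the original module): importing selected columns of a shapefile
# into a new table with 'id' as primary key; path, table and column names are made up.
#
#     import_shapefile_into_postgreSQL_database(
#         '/path/to/mgrs_tiles.shp', 'mgrs_tiles_example',
#         cols2import=['grid1mil', 'grid100k'],
#         if_exists='fail', index_label='id', primarykey='id')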

1349def data_DB_updater(obj_dict): 

1350 # type: (dict) -> None 

1351 """Updates the table "scenes_proc" or "mgrs_tiles_proc within the postgreSQL database 

1352 according to the given dictionary of a GMS object. 

1353 

1354 :param obj_dict: <dict> a copy of the dictionary of the respective GMS object 

1355 """ 

1356 

1357 assert isinstance(obj_dict, dict), 'The input for data_DB_updater() has to be a dictionary.' 

1358 

1359 def list2str(list2convert): return ''.join([str(val) for val in list2convert]) 

1360 

1361 connection = psycopg2.connect(CFG.conn_database) 

1362 if connection is None: 

1363 print('Database connection could not be established. Database entry could not be created or updated.') 

1364 else: 

1365 if obj_dict['arr_shape'] != 'MGRS_tile': 

1366 table2update = 'scenes_proc' 

1367 dict_dbkey_objkey = {'sceneid': obj_dict['scene_ID'], 

1368 'georef': True if obj_dict['georef'] else False, 

1369 'proc_level': obj_dict['proc_level'], 

1370 'layer_bands_assignment': ''.join(obj_dict['LayerBandsAssignment']), 

1371 'bounds': Polygon(obj_dict['trueDataCornerLonLat'])} 

1372 

1373 matchExp = 'WHERE ' + get_postgreSQL_matchingExp('sceneid', dict_dbkey_objkey['sceneid']) 

1374 keys2update = ['georef', 'proc_level', 'layer_bands_assignment', 'bounds'] 

1375 

1376 else: # MGRS_tile 

1377 table2update = 'mgrs_tiles_proc' 

1378 

1379 def get_tile_bounds_box(bnds): return box(bnds[0], bnds[2], bnds[1], bnds[3]) 

1380 dict_dbkey_objkey = {'sceneid': obj_dict['scene_ID'], 

1381 'scenes_proc_id': obj_dict['scenes_proc_ID'], 

1382 'mgrs_code': obj_dict['MGRS_info']['tile_ID'], 

1383 'virtual_sensor_id': CFG.virtual_sensor_id, 

1384 'proc_level': obj_dict['proc_level'], 

1385 'coreg_success': obj_dict['coreg_info']['success'], 

1386 'tile_bounds': get_tile_bounds_box(obj_dict['bounds_LonLat']), 

1387 'data_corners': Polygon(obj_dict['trueDataCornerLonLat'])} 

1388 

1389 matchExp = 'WHERE ' + ' AND '.join([get_postgreSQL_matchingExp(k, dict_dbkey_objkey[k]) 

1390 for k in ['sceneid', 'mgrs_code', 'virtual_sensor_id']]) 

1391 keys2update = ['scenes_proc_id', 'proc_level', 'coreg_success', 'tile_bounds', 'data_corners'] 

1392 if obj_dict['scenes_proc_ID'] is None: 

1393 keys2update.remove('scenes_proc_id') 

1394 

1395 cursor = connection.cursor() 

1396 

1397 # check if record exists 

1398 execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s)" % (table2update, matchExp)) 

1399 

1400 # create new entry 

1401 if cursor.fetchone()[0] == 0: 

1402 keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in dict_dbkey_objkey.items()]) 

1403 execute_pgSQL_query(cursor, 

1404 "INSERT INTO %s (%s) VALUES (%s);" % (table2update, ','.join(keys), ','.join(vals))) 

1405 # or update existing entry 

1406 else: 

1407 setExp = 'SET ' + ','.join( 

1408 ['%s=%s' % (k, get_postgreSQL_value(dict_dbkey_objkey[k])) for k in keys2update]) 

1409 execute_pgSQL_query(cursor, "UPDATE %s %s %s;" % (table2update, setExp, matchExp)) 

1410 

1411 if 'connection' in locals(): 

1412 connection.commit() 

1413 connection.close() 

1414 

1415 
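# Added note: for full scenes (obj_dict['arr_shape'] != 'MGRS_tile') only the keys 'scene_ID', 'georef',
# 'proc_level', 'LayerBandsAssignment' and 'trueDataCornerLonLat' are read from the object dictionary.
# A minimal (made-up) sketch of such a call could look like this:
#
#     data_DB_updater({'arr_shape': 'cube', 'scene_ID': 26186196, 'georef': True, 'proc_level': 'L1A',
#                      'LayerBandsAssignment': ['1', '2', '3'],
#                      'trueDataCornerLonLat': [(10., 50.), (11., 50.), (11., 49.), (10., 49.)]})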

1416def postgreSQL_table_to_csv(conn_db, path_csv, tablename): 

1417 # GeoDataFrame.to_csv(path_csv, index_label='id') 

1418 raise NotImplementedError # TODO 

1419 

1420 

1421def archive_exists_on_fileserver(conn_DB, entityID): 

1422 # type: (str,str) -> bool 

1423 """Queries the postgreSQL database for the archive filename of the given entity ID and checks if the 

1424 corresponding archive file exists in the archive folder. 

1425 

1426 :param conn_DB: <str> pgSQL database connection parameters 

1427 :param entityID: <str> entity ID to be checked 

1428 """ 

1429 

1430 records = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['satelliteid', 'sensorid'], {'entityid': entityID}) 

1431 records_filt = [rec for rec in records if rec[0] is not None and rec[1] is not None] 

1432 if len(records_filt) == 1: 

1433 satID, senID = records_filt[0] 

1434 satellite = get_info_from_postgreSQLdb(conn_DB, 'satellites', ['name'], {'id': satID})[0][0] 

1435 sensor = get_info_from_postgreSQLdb(conn_DB, 'sensors', ['name'], {'id': senID})[0][0] 

1436 sensor = sensor if sensor != 'ETM+_SLC_OFF' else 'ETM+' # join sensors 'ETM+' and 'ETM+_SLC_OFF' 

1437 archive_fold = os.path.join(CFG.path_archive, satellite, sensor) 

1438 assert os.path.exists(archive_fold), 'Archive folder not found: %s.' % archive_fold 

1439 

1440 if re.search(r'Landsat', satellite, re.I): 

1441 exists = os.path.exists(os.path.join(archive_fold, entityID + '.tar.gz')) 

1442 else: 

1443 raise NotImplementedError 

1444 elif len(records_filt) == 0: 

1445 warnings.warn("No database record found for entity ID '%s'. Dataset skipped." % entityID) 

1446 exists = False 

1447 else: 

1448 warnings.warn("More than one database records found for entity ID '%s'. Dataset skipped." % entityID) 

1449 exists = False 

1450 

1451 return exists 

1452 

1453 
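# Usage sketch (added, not part of the original module): check whether the archive of a Landsat scene
# is already on the fileserver; the entity ID is made up, and non-Landsat satellites currently raise
# NotImplementedError.
#
#     present = archive_exists_on_fileserver(CFG.conn_database, 'LC81930242015036LGN00')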

1454def record_stats_memusage(conn_db, GMS_obj): 

1455 # type: (str, GMS_object) -> bool 

1456 if list(sorted(GMS_obj.mem_usage.keys())) != ['L1A', 'L1B', 'L1C', 'L2A', 'L2B', 'L2C']: 

1457 GMS_obj.logger.info('Unable to record memory usage statistics because statistics are missing for some ' 

1458 'processing levels.')

1459 return False 

1460 

1461 vals2write_dict = dict( 

1462 creationtime=datetime.now(), 

1463 software_version=CFG.version, 

1464 datasetid=GMS_obj.dataset_ID, 

1465 virtual_sensor_id=CFG.virtual_sensor_id, 

1466 target_gsd=CFG.target_gsd[0], # only the X-GSD is taken into account here

1467 target_nbands=len(CFG.target_CWL), 

1468 inmem_serialization=CFG.inmem_serialization, 

1469 target_radunit_optical=CFG.target_radunit_optical, 

1470 skip_coreg=CFG.skip_coreg, 

1471 ac_estimate_accuracy=CFG.ac_estimate_accuracy, 

1472 ac_bandwise_accuracy=CFG.ac_bandwise_accuracy, 

1473 spathomo_estimate_accuracy=CFG.spathomo_estimate_accuracy, 

1474 spechomo_estimate_accuracy=CFG.spechomo_estimate_accuracy, 

1475 spechomo_bandwise_accuracy=CFG.spechomo_bandwise_accuracy, 

1476 parallelization_level=CFG.parallelization_level, 

1477 skip_thermal=CFG.skip_thermal, 

1478 skip_pan=CFG.skip_pan, 

1479 mgrs_pixel_buffer=CFG.mgrs_pixel_buffer, 

1480 cloud_masking_algorithm=CFG.cloud_masking_algorithm[GMS_obj.satellite], 

1481 used_mem_l1a=GMS_obj.mem_usage['L1A'], 

1482 used_mem_l1b=GMS_obj.mem_usage['L1B'], 

1483 used_mem_l1c=GMS_obj.mem_usage['L1C'], 

1484 used_mem_l2a=GMS_obj.mem_usage['L2A'], 

1485 used_mem_l2b=GMS_obj.mem_usage['L2B'], 

1486 used_mem_l2c=GMS_obj.mem_usage['L2C'], 

1487 dims_x_l2a=GMS_obj.arr.cols, 

1488 dims_y_l2a=GMS_obj.arr.rows, 

1489 is_test=CFG.is_test, 

1490 sceneid=GMS_obj.scene_ID 

1491 ) 

1492 

1493 # get all existing database records matching the respective config 

1494 # NOTE: those columns that do not belong to the config specification are ignored

1495 vals2get = list(vals2write_dict.keys()) 

1496 df_existing_recs = pd.DataFrame( 

1497 get_info_from_postgreSQLdb(conn_db, 'stats_mem_usage_homo', 

1498 vals2return=vals2get, 

1499 cond_dict={k: v for k, v in vals2write_dict.items() 

1500 if k not in ['creationtime', 'used_mem_l1a', 'used_mem_l1b', 

1501 'used_mem_l1c', 'used_mem_l2a', 'used_mem_l2b', 

1502 'used_mem_l2c', 'dims_x_l2a', 'dims_y_l2a', 'sceneid']}),

1503 columns=vals2get) 

1504 

1505 # filter the existing records by gms_preprocessing software version number 

1506 # (equal to or higher than CFG.min_version_mem_usage_stats)

1507 vers = list(df_existing_recs.software_version) 

1508 vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(CFG.min_version_mem_usage_stats)] 

1509 df_existing_recs_usable = df_existing_recs.loc[df_existing_recs.software_version.isin(vers_usable)] 

1510 

1511 # add memory stats to database 

1512 # (but skip if there are already 10 records matching the respective config and software version number 

1513 # or if the current scene ID is already among the matching records) 

1514 if len(df_existing_recs_usable) < 10 and GMS_obj.scene_ID not in list(df_existing_recs_usable.sceneid): 

1515 create_record_in_postgreSQLdb(conn_db, 'stats_mem_usage_homo', vals2write_dict=vals2write_dict) 

1516 return True 

1517 else: 

1518 return False
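
# Usage sketch (added, not part of the original module): record_stats_memusage() is meant to be called
# once per fully processed scene, e.g. with a GMS_object instance (assumed to be bound to `gms_obj`)
# whose mem_usage dictionary covers all processing levels from L1A to L2C:
#
#     stats_written = record_stats_memusage(CFG.conn_database, gms_obj)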