1# -*- coding: utf-8 -*-
3# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
4#
5# Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
6#
7# This software was developed within the context of the GeoMultiSens project funded
8# by the German Federal Ministry of Education and Research
9# (project grant code: 01 IS 14 010 A-C).
10#
11# This program is free software: you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation, either version 3 of the License, or (at your option) any later version.
14# Please note the following exception: `gms_preprocessing` depends on tqdm, which
15# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
16# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
17# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
18#
19# This program is distributed in the hope that it will be useful, but WITHOUT
20# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
21# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
22# details.
23#
24# You should have received a copy of the GNU Lesser General Public License along
25# with this program. If not, see <http://www.gnu.org/licenses/>.
27import collections
28import glob
29import itertools
30import os
31import re
32import shutil
33import sys
34import traceback
35import warnings
36from datetime import datetime
37from typing import Union, TYPE_CHECKING # noqa F401 # flake8 issue
38from pkg_resources import parse_version
40import numpy as np
41import pandas as pd
42from pandas.io.sql import pandasSQL_builder, SQLTable, DataFrame, Series
43import psycopg2
44from shapely.wkb import loads as wkb_loads
45from geoalchemy2.types import Geometry as GEOMETRY
46from geopandas import GeoDataFrame
47from shapely.geometry import Polygon, box, MultiPolygon
48from sqlalchemy import create_engine
49from sqlalchemy.types import to_instance, TypeEngine
51from ..options.config import GMS_config as CFG
52from . import path_generator as PG
53from .definition_dicts import proc_chain
55if TYPE_CHECKING:
56 from ..model.gms_object import GMS_object # noqa F401 # flake8 issue
58# + misc.helper_functions.cornerLonLat_to_postgreSQL_poly: (left out here in order to avoid circular dependencies)
60__author__ = 'Daniel Scheffler'
def execute_pgSQL_query(cursor, query_command):
    """Execute a postgreSQL query and catch the full error message if there is one.

    :param cursor:         database cursor used to execute the query
    :param query_command:  <str> the SQL command to be executed
    :raises psycopg2.ProgrammingError: re-raised with the failed command appended to the error message
    """
    try:
        cursor.execute(query_command)
    except psycopg2.ProgrammingError as e:
        # BUGFIX: e.pgerror may be None (e.g., for client-side errors), which previously raised a
        # TypeError during string concatenation and masked the original database error
        raise psycopg2.ProgrammingError(
            (e.pgerror or '') + 'Query failed. Command was:\n%s' % query_command) from e
def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid):
    # type: (int) -> collections.OrderedDict
    """Creates an OrderedDict containing further information about a given scene ID by querying the pgSQL database.

    :param sceneid: <int> the GMS scene ID to get information for
    :return: an OrderedDict with the scene metadata, or None if the scene ID was not found in the database
    """
    def query(tablename, vals2return, cond_dict, records2fetch=0):
        return get_info_from_postgreSQLdb(CFG.conn_database, tablename, vals2return, cond_dict, records2fetch)

    resultset = query('scenes', ['datasetid', 'satelliteid', 'sensorid', 'subsystemid', 'acquisitiondate', 'entityid',
                                 'filename'], {'id': sceneid})
    if len(resultset) == 0:
        sys.stderr.write("Scene with id %s not found. Skipping.." % sceneid)
        # BUGFIX: previously execution fell through to 'resultset[0]' and crashed with an IndexError
        return None

    scenedata = resultset[0]
    ds = collections.OrderedDict()
    proc_level_tmp = query('scenes_proc', 'proc_level', {'sceneid': sceneid})
    # 'L0A' is the fallback processing level for scenes without an entry in 'scenes_proc'
    ds.update({'proc_level': 'L0A' if proc_level_tmp == [] else proc_level_tmp[0][0]})
    ds.update({'scene_ID': sceneid})
    ds.update({'datasetid': scenedata[0]})
    ds.update({'image_type': query('datasets', 'image_type', {'id': scenedata[0]})[0][0]})
    ds.update({'satellite': query('satellites', 'name', {'id': scenedata[1]})[0][0]})
    ds.update({'sensor': query('sensors', 'name', {'id': scenedata[2]})[0][0]})
    ds.update({'subsystem': query('subsystems', 'name', {'id': scenedata[3]})[0][0] if scenedata[3] else None})
    ds.update({'acq_datetime': scenedata[4]})
    ds.update({'entity_ID': scenedata[5]})
    ds.update({'filename': scenedata[6]})
    return ds
def get_postgreSQL_value(value):
    # type: (any) -> str
    """Convert a Python variable to a postgreSQL value respecting postgreSQL type casts.

    The resulting value can be directly inserted into a postgreSQL query.
    """
    assert type(value) in [int, float, bool, str, Polygon, datetime, list, tuple] or value is None, \
        "Unsupported value type within postgreSQL matching expression. Got %s." % type(value)

    if value is None:
        return 'NULL'
    if isinstance(value, (int, float, bool)):
        # numbers (and bools, which are ints in Python) are passed through unchanged
        return value
    if isinstance(value, str):
        # single quotes are stripped (not escaped) so the resulting literal stays valid
        return "'%s'" % value.replace("'", "")
    if isinstance(value, Polygon):
        return "'%s'" % value.wkb_hex
    if isinstance(value, datetime):
        return "TIMESTAMP '%s'" % str(value)

    # remaining case: list or tuple
    if not value:
        return 'NULL'  # empty sequence

    elem_types = list(set(type(item) for item in value))
    assert len(elem_types) == 1, \
        'Mixed data types in postgreSQL matching expressions are not supported. Got %s.' % elem_types
    assert elem_types[0] in [int, str, float, np.int64, bool]

    fmt = "'%s'" if isinstance(value[0], str) else "%s"
    return "'{%s}'" % ",".join(fmt % item for item in value)
def get_postgreSQL_matchingExp(key, value):
    # type: (str,any) -> str
    """Convert a key/value pair to a postgreSQL matching expression in the form "column=value",
    respecting postgreSQL type casts. The resulting string can be directly inserted into a postgreSQL query.
    """
    pg_value = get_postgreSQL_value(value)
    is_array_literal = isinstance(pg_value, str) and pg_value.startswith("'{") and pg_value.endswith("}'")

    if is_array_literal:
        # '{1,2,3}' => (1,2,3) so that an IN-expression can be built
        return '%s in %s' % (key, pg_value.replace("'{", '(').replace("}'", ')'))

    if pg_value == 'NULL':
        return '%s is NULL' % key

    return '%s=%s' % (key, pg_value)
def get_info_from_postgreSQLdb(conn_params, tablename, vals2return, cond_dict=None, records2fetch=0, timeout=15000):
    # type: (str, str, Union[list, str], dict, int, int) -> Union[list, str]
    """Queries a postgreSQL database for the given parameters.

    :param conn_params:   <str> connection parameters as provided by CFG.conn_params
    :param tablename:     <str> name of the table within the database to be queried
    :param vals2return:   <list or str> a list of strings containing the column titles of the values to be returned
    :param cond_dict:     <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                          HINT: <value> can also be a list or a tuple of elements to match, BUT note that the order
                          of the list items is NOT respected!
    :param records2fetch: <int> number of records to be fetched (default=0: fetch unlimited records)
    :param timeout:       <int> allows to set a custom statement timeout (milliseconds)
    :return: list of result tuples, e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
    """
    if not isinstance(vals2return, list):
        vals2return = [vals2return]
    assert isinstance(records2fetch, int), "get_info_from_postgreSQLdb: Expected an integer for the argument " \
                                           "'records2return'. Got %s" % type(records2fetch)
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'

    # BUGFIX: close cursor and connection even if the query raises (previously they leaked on error)
    try:
        cursor = connection.cursor()
        try:
            condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
                if cond_dict else ""
            cmd = "SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition
            execute_pgSQL_query(cursor, cmd)

            if records2fetch == 0:
                records2return = cursor.fetchall()
            elif records2fetch == 1:
                records2return = [cursor.fetchone()]
            else:
                records2return = cursor.fetchmany(size=records2fetch)
        finally:
            cursor.close()
    finally:
        connection.close()

    return records2return
def update_records_in_postgreSQLdb(conn_params, tablename, vals2update_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters and updates the given columns of the query result.

    :param conn_params:      <str> connection parameters as provided by CFG.conn_params
    :param tablename:        <str> name of the table within the database to be updated
    :param vals2update_dict: <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param cond_dict:        <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                             HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:          <int> allows to set a custom statement timeout (milliseconds)
    """
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    update_cond = "SET " + ', '.join(['%s=%s' % (k, get_postgreSQL_value(vals2update_dict[k]))
                                      for k in vals2update_dict.keys()])
    # BUGFIX: the existence check previously queried the hard-coded 'scenes' table instead of the target table,
    # so updating any other table silently failed whenever the condition matched nothing in 'scenes'
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + update_cond + " " + condition)

    # connection is always bound at this point (the earlier "'connection' in locals()" guard was a no-op)
    connection.commit()
    connection.close()
def append_item_to_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2append_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and appends the given value to the specified array column of the query result.

    :param conn_params:      <str> connection parameters as provided by CFG.conn_params
    :param tablename:        <str> name of the table within the database to be updated
    :param vals2append_dict: <dict> a dictionary containing keys and value(s) to be set in the form
                             {'col_name':[<value>,<value>]}
    :param cond_dict:        <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                             HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:          <int> allows to set a custom statement timeout (milliseconds)
    """
    assert len(vals2append_dict) == 1, 'Values can be appended to only one column at once.'
    if type(list(vals2append_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Appending multiple values to one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2append_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2append_dict[col2update])
    # wrap single values in '{}' so array_cat receives an array literal
    pgSQL_val = pgSQL_val if type(vals2append_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    append_cond = "SET %s = array_cat(%s, '%s')" % (col2update, col2update, pgSQL_val)
    # BUGFIX: the existence check previously queried the hard-coded 'scenes' table instead of the target table
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + append_cond + " " + condition + ';')

    connection.commit()
    connection.close()
def remove_item_from_arrayCol_in_postgreSQLdb(conn_params, tablename, vals2remove_dict, cond_dict=None, timeout=15000):
    # type: (str, str, dict, dict, int) -> Union[None, str]
    """Queries a postgreSQL database for the given parameters
    and removes the given value from the specified array column of the query result.

    :param conn_params:      <str> connection parameters as provided by CFG.conn_params
    :param tablename:        <str> name of the table within the database to be updated
    :param vals2remove_dict: <dict> a dictionary containing keys and value(s) to be set in the form
                             {'col_name':[<value>,<value>]}
    :param cond_dict:        <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                             HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:          <int> allows to set a custom statement timeout (milliseconds)
    """
    assert len(vals2remove_dict) == 1, 'Values can be removed from only one column at once.'
    if type(list(vals2remove_dict.values())[0]) in [list, tuple]:
        raise NotImplementedError('Removing multiple values from one column at once is not yet supported.')
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""
    col2update = list(vals2remove_dict.keys())[0]
    pgSQL_val = get_postgreSQL_value(vals2remove_dict[col2update])
    # wrap single values in '{}' so array_remove receives an array literal
    pgSQL_val = pgSQL_val if type(vals2remove_dict[col2update]) in [list, tuple] else '{%s}' % pgSQL_val
    remove_cond = "SET %s = array_remove(%s, '%s')" % (col2update, col2update, pgSQL_val)
    # BUGFIX: the existence check previously queried the hard-coded 'scenes' table instead of the target table
    execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s);" % (tablename, condition))
    if cursor.fetchone()[0] == 0:
        warnings.warn("No record found fulfilling this condition: \n'%s'." % condition)
    else:
        execute_pgSQL_query(cursor, "UPDATE " + tablename + " " + remove_cond + " " + condition + ';')

    connection.commit()
    connection.close()
def increment_decrement_arrayCol_in_postgreSQLdb(conn_params, tablename, col2update, idx_val2decrement=None,
                                                 idx_val2increment=None, cond_dict=None, timeout=15000):
    # type: (str, str, str, int, int, dict, int) -> Union[None, str]
    """Updates an array column of a specific postgreSQL table in the form that it increments or decrements the elements
    at a given position. HINT: The column must have values like that: [52,0,27,10,8,0,0,0,0]

    :param conn_params:       <str> connection parameters as provided by CFG.conn_params
    :param tablename:         <str> name of the table within the database to be updated
    :param col2update:        <str> column name of the column to be updated
    :param idx_val2decrement: <int> the index of the array element to be decremented (starts with 1)
    :param idx_val2increment: <int> the index of the array element to be incremented (starts with 1)
    :param cond_dict:         <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
                              HINT: <value> can also be a list or a tuple of elements to match
    :param timeout:           <int> allows to set a custom statement timeout (milliseconds)
    :return:
    """
    cond_dict = cond_dict if cond_dict else {}
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()
    condition = "WHERE " + " AND ".join([get_postgreSQL_matchingExp(k, v) for k, v in cond_dict.items()]) \
        if cond_dict else ""

    dec_str = '' if idx_val2decrement is None else \
        "%s[%s] = %s[%s]-1" % (col2update, idx_val2decrement, col2update, idx_val2decrement)
    inc_str = '' if idx_val2increment is None else \
        "%s[%s] = %s[%s]+1" % (col2update, idx_val2increment, col2update, idx_val2increment)

    if dec_str or inc_str:
        # BUGFIX: joining both strings unconditionally produced a dangling comma (invalid SQL)
        # whenever only one of idx_val2decrement/idx_val2increment was given
        dec_inc_str = ','.join([s for s in (dec_str, inc_str) if s])
        execute_pgSQL_query(cursor, "UPDATE %s SET %s %s" % (tablename, dec_inc_str, condition))

    connection.commit()
    connection.close()
def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeout=15000):
    # type: (str, str, dict, int) -> Union[int, str]
    """Creates a single new record in a postgreSQL database and populates its columns with the given values.

    :param conn_params:     <str> connection parameters as provided by CFG.conn_params
    :param tablename:       <str> name of the table within the database to be updated
    :param vals2write_dict: <dict> a dictionary containing keys and values to be set in the form {'col_name':<value>}
    :param timeout:         <int> allows to set a custom statement timeout (milliseconds)
    :return: the ID of the newly created record (or 'database connection fault')
    """
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()

    keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in vals2write_dict.items()])

    # BUGFIX: use INSERT ... RETURNING id instead of a separate 'ORDER BY id DESC LIMIT 1' query,
    # which was race-prone under concurrent inserts and could return a foreign record's ID
    execute_pgSQL_query(cursor, "INSERT INTO %s (%s) VALUES (%s) RETURNING id;"
                        % (tablename, ','.join(keys), ','.join(vals)))
    newID = cursor.fetchone()[0]

    connection.commit()
    connection.close()

    return newID
def delete_record_in_postgreSQLdb(conn_params, tablename, record_id, timeout=15000):
    # type: (str, str, int, int) -> Union[int, str]
    """Delete a single record in a postgreSQL database.

    :param conn_params: <str> connection parameters as provided by CFG.conn_params
    :param tablename:   <str> name of the table within the database to be updated
    :param record_id:   <int> ID of the record to be deleted
    :param timeout:     <int> allows to set a custom statement timeout (milliseconds)
    :return: 'success' if the record is gone afterwards, otherwise 'fail'
             (or 'database connection fault' if no connection could be established)
    """
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        warnings.warn('database connection fault')
        return 'database connection fault'
    cursor = connection.cursor()

    execute_pgSQL_query(cursor, "DELETE FROM %s WHERE id=%s;" % (tablename, record_id))
    # re-query the record to verify that the deletion actually succeeded
    execute_pgSQL_query(cursor, "SELECT id FROM %s WHERE id=%s" % (tablename, record_id))

    res = cursor.fetchone()

    if 'connection' in locals():  # NOTE(review): always True here -- 'connection' is bound above
        connection.commit()
        connection.close()

    return 'success' if res is None else 'fail'
def get_pgSQL_geospatial_query_cond(conn_params, table2query, geomCol2use='bounds', tgt_corners_lonlat=None,
                                    scene_ID=None, queryfunc='ST_Intersects', crossing_dateline_check=True):
    """Return a geospatial postgreSQL query condition, e.g. "ST_Intersects(<src_geom>, <tgt_geom>)".

    :param conn_params:             <str> connection parameters as provided by CFG.conn_params
    :param table2query:             <str> name of the table whose geometry column is compared
    :param geomCol2use:             <str> name of the geometry column of table2query
    :param tgt_corners_lonlat:      <list> corner coordinates defining the source geometry
                                    (needed if scene_ID is not provided)
    :param scene_ID:                <int> a scene ID to read the source geometry from the 'scenes' table
                                    (needed if tgt_corners_lonlat is not provided)
    :param queryfunc:               <str> the postGIS function to be used, e.g. 'ST_Intersects'
    :param crossing_dateline_check: <bool> whether to shift coordinates crossing the 180 degrees meridian
    """
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    if tgt_corners_lonlat:
        # handle coordinates crossing the 180 degrees meridian (dateline)
        # FIXME in that case the polygon has to be split at the dateline, otherwise pgSQL may yield wrong results
        if crossing_dateline_check:
            xvals = [x for x, y in tgt_corners_lonlat]
            if max(xvals) - min(xvals) > 180:
                tgt_corners_lonlat = [(x, y) if x > 0 else (x + 360, y) for x, y in tgt_corners_lonlat]

        from .helper_functions import cornerLonLat_to_postgreSQL_poly  # local import avoids circular dependency
        pGSQL_poly = cornerLonLat_to_postgreSQL_poly(tgt_corners_lonlat)
        src_geom = "'SRID=4326;%s'::geometry" % pGSQL_poly  # source geometry is given
        # FIXME the scenes table has "geography" geo-infos -> its index is not used for "geometry" casts:
        tgt_geom = "%s.%s::geometry" % (table2query, geomCol2use)
        geocond = "%s(%s, %s)" % (queryfunc, src_geom, tgt_geom)
    else:  # scene_ID is not None:
        connection = psycopg2.connect(conn_params)
        if connection is None:
            return 'database connection fault'
        cursor = connection.cursor()
        cmd = "SELECT ST_AsText(bounds) FROM scenes WHERE scenes.id = %s" % scene_ID
        execute_pgSQL_query(cursor, cmd)
        res = cursor.fetchone()
        cursor.close()
        connection.close()
        # BUGFIX: cursor.fetchone() returns None (not an empty sequence) if no record matches,
        # so 'len(res)' previously raised a TypeError for unknown scene IDs
        if res:
            src_geom = "'SRID=4326;%s'::geometry" % res[0]
        else:
            # BUGFIX: the scene ID was previously missing from the printed message ('%s' was never filled)
            print('The scene with the ID %s does not exist in the scenes table.' % scene_ID)
            return []
        geocond = "%s(%s, %s.%s::geometry)" % (queryfunc, src_geom, table2query, geomCol2use)
    return geocond
def get_overlapping_scenes_from_postgreSQLdb(conn_params, table='scenes_proc', scene_ID=None,
                                             tgt_corners_lonlat=None, conditions=None, add_cmds='', timeout=15000):
    # type: (str, str, int, list, Union[list, str], str, int) -> Union[list, str]

    """Queries the postgreSQL database in order to find those scenes of a specified reference satellite (Landsat-8 or
    Sentinel-2) that have an overlap to the given corner coordinates AND that fulfill the given conditions.

    :param conn_params:        <str> connection parameters as provided by CFG.conn_params
    :param table:              <str> name of the table within the database to be queried
    :param scene_ID:           <int> a sceneID to get the target geographical extent from
                               (needed if tgt_corners_lonlat is not provided)
    :param tgt_corners_lonlat: <list> a list of coordinates defining the target geographical extent
                               (needed if scene_ID is not provided)
    :param conditions:         <list> a list of additional query conditions
    :param add_cmds:           <str> additional pgSQL commands to be added to the pgSQL query
    :param timeout:            <int> allows to set a custom statement timeout (milliseconds)
    """
    # normalize 'conditions' to a list (accepts None, a single string or a list)
    conditions = [] if conditions is None else conditions if isinstance(conditions, list) else [conditions]
    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'
    # extract a 'datasetid = <id>' condition (if given); default reference dataset is Landsat-8 (ID 104)
    datasetids = [int(d.split('=')[1].strip()) for d in conditions if d.startswith('datasetid')]
    datasetid = datasetids[0] if datasetids else 104  # Landsat-8
    # FIXME: use Landsat-8 instead of Sentinel-2 as long as S2 L1A_P is not working:
    datasetid = 104 if datasetid == 249 else datasetid

    if table != 'scenes_proc':
        assert datasetid, "filtdsId is needed if table is not 'scenes_proc'"
    if scene_ID is None:
        assert tgt_corners_lonlat, "Provide either scene_ID or tgt_corners_lonlat!"
    if tgt_corners_lonlat is None:
        assert scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    # the scene IDs live in 'scenes.id' or '<table>.sceneid' depending on the queried table
    val2get = "scenes.id" if table == 'scenes' else "%s.sceneid" % table
    # refcond = ['scenes_proc.georef = True'] if not datasetids else ['scenes.datasetid = %s' %datasetid]
    refcond = ['scenes.datasetid = %s' % datasetid]

    # spatial overlap condition built by the geospatial query helper
    geocond = [get_pgSQL_geospatial_query_cond(conn_params, table, tgt_corners_lonlat=tgt_corners_lonlat,
                                               scene_ID=scene_ID, queryfunc='ST_Intersects',
                                               crossing_dateline_check=True)]

    # join the 'scenes' table when the dataset-ID filter refers to it but another table is queried
    join = "INNER JOIN scenes ON (%s.sceneid = scenes.id) " % table if table != 'scenes' and datasetids else ''
    conditions = [c for c in conditions if not c.startswith('datasetid')]  # consumed above via 'refcond'
    where = "WHERE %s" % " AND ".join(geocond + refcond + conditions)
    # include 'scenes' in FROM only if it is referenced in WHERE and not already brought in via JOIN
    usedtbls = "scenes" if table == 'scenes' else "%s, scenes" % table if 'scenes.' in where and join == '' else table
    query = "SELECT %s FROM %s %s%s %s" % (val2get, usedtbls, join, where, add_cmds)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records2return = cursor.fetchall()
    cursor.close()
    connection.close()
    return records2return
def get_overlapping_MGRS_tiles(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    """Return the MGRS tiles that share some spatial area with the given geometry.

    In contrast to pgSQL, 'overlapping' here means that both geometries share some spatial area,
    so it combines ST_Overlaps and ST_Contains (both covered by ST_Intersects).

    :param conn_params:        <str> connection parameters as provided by CFG.conn_params
    :param scene_ID:           <int> a sceneID to get the target geographical extent from
                               (needed if tgt_corners_lonlat is not provided)
    :param tgt_corners_lonlat: <list> a list of coordinates defining the target geographical extent
                               (needed if scene_ID is not provided)
    :param timeout:            <int> allows to set a custom statement timeout (milliseconds)
    :return: <GeoDataFrame> with the columns ['granuleid', 'shapelyPoly_LonLat']
    """
    # BUGFIX: fixed the 'eihter' typo in the assertion message
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['grid100k', 'grid1mil', 'geom']
    # NOTE: separate ST_Overlaps/ST_Contains/ST_Within conditions are not needed -- ST_Intersects covers them all
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'mgrs_tiles', geomCol2use='geom', queryfunc='ST_Intersects',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID,
                                              crossing_dateline_check=True)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'mgrs_tiles', geocond)
    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['grid100k', 'grid1mil', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))
    GDF['granuleid'] = GDF['grid1mil'].str.cat(GDF['grid100k'])
    return GDF[['granuleid', 'shapelyPoly_LonLat']]
def get_overlapping_MGRS_tiles2(conn_params, scene_ID=None, tgt_corners_lonlat=None, timeout=15000):
    """Return the Sentinel-2 granules whose footprints intersect the given geometry.

    :param conn_params:        <str> connection parameters as provided by CFG.conn_params
    :param scene_ID:           <int> a sceneID to get the target geographical extent from
                               (needed if tgt_corners_lonlat is not provided)
    :param tgt_corners_lonlat: <list> a list of coordinates defining the target geographical extent
                               (needed if scene_ID is not provided)
    :param timeout:            <int> allows to set a custom statement timeout (milliseconds)
    :return: <GeoDataFrame> with the columns ['granuleid', 'shapelyPoly_LonLat']
    """
    # BUGFIX: fixed the 'eihter' typo in the assertion message
    assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"

    conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
    connection = psycopg2.connect(conn_params)
    if connection is None:
        return 'database connection fault'

    vals2get = ['granuleid', 'footprint_wgs84']
    geocond = get_pgSQL_geospatial_query_cond(conn_params, 'footprints_sentinel2_granules',
                                              geomCol2use='footprint_wgs84',
                                              tgt_corners_lonlat=tgt_corners_lonlat, scene_ID=scene_ID)
    query = "SELECT %s FROM %s WHERE %s" % (', '.join(vals2get), 'footprints_sentinel2_granules', geocond)

    cursor = connection.cursor()
    execute_pgSQL_query(cursor, query)
    records = cursor.fetchall()
    cursor.close()
    connection.close()

    GDF = GeoDataFrame(records, columns=['granuleid', 'wkb_hex'])

    GDF['shapelyPoly_LonLat'] = list(GDF['wkb_hex'].map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

    return GDF[['granuleid', 'shapelyPoly_LonLat']]
def get_dict_satellite_name_id(conn_params):
    # type: (str) -> dict
    """Return a dictionary with satellite names as keys and satellite IDs as values, read from the pgSQL database.

    :param conn_params: <str> pgSQL database connection parameters
    """
    name_id_pairs = get_info_from_postgreSQLdb(conn_params, 'satellites', ['name', 'id'])
    assert len(name_id_pairs) > 0, 'Error getting satellite names from postgreSQL database.'
    pairs_arr = np.array(name_id_pairs)
    return {sat_name: sat_id for sat_name, sat_id in zip(pairs_arr[:, 0], pairs_arr[:, 1])}
def get_dict_sensor_name_id(conn_params):
    # type: (str) -> dict
    """Return a dictionary with sensor names as keys and sensor IDs as values, read from the pgSQL database.

    :param conn_params: <str> pgSQL database connection parameters
    """
    name_id_pairs = get_info_from_postgreSQLdb(conn_params, 'sensors', ['name', 'id'])
    assert len(name_id_pairs) > 0, 'Error getting sensor names from postgreSQL database.'
    pairs_arr = np.array(name_id_pairs)
    return {sensor_name: sensor_id for sensor_name, sensor_id in zip(pairs_arr[:, 0], pairs_arr[:, 1])}
def get_entityIDs_from_filename(conn_DB, filename):
    # type: (str, str) -> list
    """Return the entityID(s) for the given filename. In case of Sentinel-2 there can be multiple entity IDs if
    multiple granules are saved in one .zip file.

    :param conn_DB:  <str> pgSQL database connection parameters
    :param filename: <str> the filename to get the corresponding entity ID(s) for
    """
    is_landsat_archive = filename.startswith(('LE', 'LC', 'LO')) and filename.endswith('.tar.gz')

    if is_landsat_archive:
        # Landsat: the entity ID equals the archive name without its extension
        return [filename.split('.tar.gz')[0]]

    print('Querying database in order to get entityIDs for %s...' % filename)
    query_res = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['entityid'], {'filename': filename}, timeout=40000)
    return [record[0] for record in query_res] if len(query_res) > 0 else []
def get_filename_by_entityID(conn_DB, entityid, satellite):
    # type: (str,str,str) -> str
    """Return the filename for the given entity ID.

    :param conn_DB:   <str> pgSQL database connection parameters
    :param entityid:  <str> entity ID
    :param satellite: <str> satellite name to which the entity ID is belonging
    """
    if re.search(r'Landsat', satellite, re.I):
        # Landsat archives are always named after their entity ID
        return '%s.tar.gz' % entityid

    if re.search(r'Sentinel-2', satellite, re.I):
        # Sentinel-2 filenames cannot be derived from the entity ID -> look them up in the database
        query_res = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['filename'],
                                               {'entityid': entityid}, records2fetch=1)
        return query_res[0][0]

    raise NotImplementedError
def get_notDownloadedsceneIDs(conn_DB, entityIDs, satellite, sensor, src_folder):
    # type: (str,list,str,str,str) -> np.ndarray
    """Takes a list of entity IDs and extracts those records that have the corresponding archive file in the given
    source folder and that have the processing level 'METADATA' in the pgSQL database. Based on this subset a numpy
    array containing the corresponding scene IDs and the target filenames for the fileserver is returned.

    :param conn_DB:    <str> pgSQL database connection parameters
    :param entityIDs:  <list> a list of entity IDs
    :param satellite:  <str> the name of the satellite to restrict the query on
    :param sensor:     <str> the name of the sensor to restrict the query on
    :param src_folder: <str> the source directory where archive files are saved
    """
    columns = ['id', 'entityid', 'satelliteid', 'sensorid', 'filename', 'proc_level']
    result = get_info_from_postgreSQLdb(conn_DB, 'scenes', columns, {'entityid': entityIDs})
    df = pd.DataFrame(result, columns=columns)

    satNameID_dic = get_dict_satellite_name_id(conn_DB)
    satID = satNameID_dic[satellite]
    target_folder = os.path.join(CFG.path_archive, satellite, sensor)

    # helper: target filename on the fileserver for a given entity ID
    def get_fName(entityid): return get_filename_by_entityID(conn_DB, entityid, satellite)

    # helper: True if the target archive already exists in the target folder
    def tgt_exists(fileName): return os.path.exists(os.path.join(target_folder, fileName))

    # helper: True if the source archive exists in the given source folder
    def src_exists(entityid):
        return os.path.exists(os.path.join(src_folder, get_filename_by_entityID(conn_DB, entityid, satellite)))
    df['tgt_fileName'] = list(df['entityid'].map(get_fName))
    df['tgtFile_exists'] = list(df['tgt_fileName'].map(tgt_exists))
    df['srcFile_exists'] = list(df['entityid'].map(src_exists))
    # compare as float because satID may not be an int (get_dict_satellite_name_id goes through np.array)
    tgt_satID = (df.satelliteid == float(satID))
    # isDL = (df.proc_level == 'DOWNLOADED')
    isMET = (df.proc_level == 'METADATA')
    # tgtE = (df.tgtFile_exists == True)
    srcE = df.srcFile_exists  # (df.srcFile_exists == True)
    # sceneIDs_notDL_tgtE = df[tgt_satID & (isDL == 0) & tgtE]  # maybe needed later
    # sceneIDs_DL_tgtNE = df[tgt_satID & isDL & (tgtE == 0)]  # maybe needed later
    # sceneIDs_DL_tgtE = df[tgt_satID & isDL & tgtE]  # maybe needed later
    sceneIDs_isMET_srcE = df[tgt_satID & isMET & srcE]
    return sceneIDs_isMET_srcE[['id', 'tgt_fileName']].values
class GMS_JOB(object):
    """gms_preprocessing job manager.

    Represents one record of the 'jobs' table of the postgreSQL database and provides methods
    to create such records from various inputs (scene IDs, entity IDs, filenames, dict lists),
    to read them back from the database and to delete their processing results.
    """

    def __init__(self, conn_db):
        # type: (str) -> None
        """
        :param conn_db: <str> the database connection parameters as given by CFG.conn_params
        """
        # privates
        self._virtualsensorid = None

        # defaults
        self.conn = conn_db
        self.dataframe = DataFrame()
        self.scene_counts = {}  # set by self.create()

        self.exists_in_db = False
        self.id = None  #: int
        self.creationtime = datetime.now()  # default, needed to create new job
        self.finishtime = None
        self.sceneids = []
        self.timerange_start = datetime.min
        self.timerange_end = datetime.max
        self.bounds = box(-180, -90, 180, 90)  # default, needed to create new job
        self.distribution_index = None
        self.progress = None
        self.feedback = None
        self.failed_sceneids = []
        self.ref_job_id = None
        self.datacube_mgrs_tiles_proc = []
        self.non_ref_datasetids = []
        self.max_cloudcover = None
        self.season_code = None  # type: int
        self.path_analysis_script = ''  # TODO
        self.job_mode = 'processing_only'  # FIXME download/processing/...
        self.jobs_table_columns = ['id', 'creationtime', 'finishtime', 'sceneids', 'timerange_start',
                                   'timerange_end', 'bounds', 'distribution_index', 'progress', 'feedback',
                                   'failed_sceneids', 'datasetid_spatial_ref',
                                   'virtualsensorid', 'ref_job_id', 'datacube_mgrs_tiles_proc', 'comment',
                                   'non_ref_datasetids', 'max_cloudcover', 'season_code', 'status',
                                   'path_analysis_script', 'analysis_parameter', 'statistics', 'job_mode']
        self.datasetid_spatial_ref = 249  # this is overwritten if existing job is read from DB but needed to create new
        self.datasetname_spatial_ref = 'SENTINEL-2A'  # same here
        self.status = None
        self.statistics = []
        self.comment = None
        self.epsg = None  # set by self._set_target_sensor_specs()
        self.ground_spatial_sampling = None  # set by self._set_target_sensor_specs()
        self.analysis_parameter = None

    def __repr__(self):
        return 'GMS job:\n\n' + Series(self.db_entry).to_string()

    @property
    def virtualsensorid(self):
        return self._virtualsensorid

    @virtualsensorid.setter
    def virtualsensorid(self, value):
        """Set virtual sensor ID but continue if no data value is received
        NOTE: set by self._set_target_sensor_specs() and self.from_ID()"""
        if value != -1:  # no data value
            self._virtualsensorid = value

    def _set_target_sensor_specs(self, virtual_sensor_id, datasetid_spatial_ref):
        """Look up and store target sensor specifications (GSD, EPSG, spatial reference dataset name).

        :param virtual_sensor_id:     <int> a valid ID from the 'virtual_sensors' table
        :param datasetid_spatial_ref: <int> a valid ID from the 'datasets' table
        :raises ValueError:           if datasetid_spatial_ref is not an integer
        """
        self.virtualsensorid = virtual_sensor_id

        if not isinstance(datasetid_spatial_ref, int):
            raise ValueError(datasetid_spatial_ref)

        res = get_info_from_postgreSQLdb(self.conn, 'virtual_sensors', ['spatial_resolution',
                                         "projection_epsg"], {'id': virtual_sensor_id})
        assert res, \
            "'virtual_sensor_id'=%s does not exist in the table 'virtual_sensors' of the database." % virtual_sensor_id
        target_gsd = res[0][0]
        self.ground_spatial_sampling = [target_gsd, target_gsd] if type(target_gsd) in [int, float] else target_gsd
        self.epsg = int(res[0][1])

        self.datasetid_spatial_ref = datasetid_spatial_ref
        res = get_info_from_postgreSQLdb(self.conn, 'datasets', ['name'], {'id': datasetid_spatial_ref})
        assert res, \
            "'datasetid_spatial_ref'=%s does not exist in the table 'datasets' of the database." % datasetid_spatial_ref
        # FIX: extract the dataset name string; previously the raw query result (list of tuples)
        # was assigned although a plain string is expected (cf. default 'SENTINEL-2A')
        self.datasetname_spatial_ref = res[0][0]

    @property
    def db_entry(self):
        """Returns an OrderedDict containing keys and values of the database entry.
        """
        db_entry = collections.OrderedDict()
        for i in self.jobs_table_columns:
            val = getattr(self, i)

            if i == 'virtualsensorid' and val is None:
                val = -1  # nodata value

            db_entry[i] = val

        return db_entry

    def from_dictlist(self, dictlist_data2process, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> GMS_JOB
        """
        :param dictlist_data2process: <list> a list of dictionaries containing the keys "satellite", "sensor" and
                                      "filenames",
                                      e.g. [{'satellite:'Landsat-8,'sensor':'OLI_TIRS','filenames':file.tar.gz},{...}]
        :param virtual_sensor_id :    <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref: <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                      (from the 'datasets' table of the postgreSQL database)
                                      (default:249 - Sentinel-2A), 104=Landsat-8
        :param comment:               <str> a comment describing the job (e.g. 'Beta job')
        """
        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        dictlist_data2process = dictlist_data2process if dictlist_data2process else []

        # validate the structure of all given data dictionaries
        for idx, datadict in enumerate(dictlist_data2process):
            assert isinstance(datadict, dict), "Expected only dictionaries within 'dictlist_data2process'. " \
                                               "Got %s in there." % type(datadict)
            assert False not in [i in datadict for i in ['satellite', 'sensor', 'filenames']]
            assert type(datadict['filenames']) in [list, str]

            if isinstance(datadict['filenames'], str):
                if datadict['filenames'].endswith('.csv'):
                    assert os.path.exists(datadict['filenames'])
                else:
                    datadict['filenames'] = [datadict['filenames']]

        # find all duplicates in input datadicts and build common dataframe
        all_dfs = []
        for datadict in dictlist_data2process:
            assert isinstance(datadict, dict)

            if isinstance(datadict['filenames'], str) and datadict['filenames'].endswith('.csv'):
                datadict['filenames'] = None  # TODO implement csv reader here
                raise NotImplementedError
            else:
                temp_df = DataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])

                # FIX: escape the '+' in 'ETM+' - unescaped it is a regex quantifier, so the
                # pattern would also match e.g. 'ETM' or 'ETMM'
                if re.search(r'Landsat-7', datadict['satellite'], re.I) and \
                   re.search(r'ETM\+', datadict['sensor'], re.I):

                    from .helper_functions import Landsat_entityID_decrypter as LED

                    def get_L7_sensor(fN):
                        # derive the real sensor name (ETM+ / ETM+_SLC_OFF) from the entity ID
                        return LED(fN.split('.tar.gz')[0]).sensorIncSLC

                    temp_df['sensor'] = list(temp_df['filenames'].map(get_L7_sensor))

                all_dfs.append(temp_df)

        df = DataFrame(pd.concat(all_dfs)).drop_duplicates()
        df.columns = ['satellite', 'sensor', 'filename']

        # validate scene information against the database
        sceneInfoDF = self._get_validated_sceneInfoDFs(df)

        # populate attributes
        self._populate_jobAttrs_from_sceneInfoDF(sceneInfoDF)

        return self

    def from_sceneIDlist(self, list_sceneIDs, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        # type: (list, int, int, str) -> object
        """
        Create a GMS_JOB instance based on the given list of scene IDs.

        :param list_sceneIDs:         <list> of scene IDs, e.g. [26781907, 26781917, 26542650, 26542451, 26541679]
        :param virtual_sensor_id :    <int> a valid ID from the 'virtual_sensors' table of the postgreSQL database
        :param datasetid_spatial_ref: <int> a valid dataset ID of the dataset to be chosen as spatial reference
                                      (from the 'datasets' table of the postgreSQL database)
                                      (default:249 - Sentinel-2A), 104=Landsat-8
        :param comment:               <str> a comment describing the job (e.g. 'Beta job')
        :raises ValueError:           if none of the scene IDs exists in the database
        """
        self._set_target_sensor_specs(virtual_sensor_id, datasetid_spatial_ref)
        self.comment = comment

        list_sceneIDs = list(list_sceneIDs)

        # query 'satellite', 'sensor', 'filename' from database and summarize in DataFrame
        # NOTE: scene IDs are integers from a trusted source here; for untrusted input this
        #       string-built query would need parameterization
        with psycopg2.connect(self.conn) as conn:
            with conn.cursor() as cursor:
                execute_pgSQL_query(cursor,
                                    """SELECT scenes.id, satellites.name, sensors.name, scenes.filename FROM scenes
                                    LEFT JOIN satellites on scenes.satelliteid=satellites.id
                                    LEFT JOIN sensors on scenes.sensorid=sensors.id
                                    WHERE scenes.id in (%s)""" % ','.join([str(i) for i in list_sceneIDs]))
                df = DataFrame(cursor.fetchall(), columns=['sceneid', 'satellite', 'sensor', 'filename'])

        # FIXME overwriting 'ETM+_SLC_OFF' with 'ETM+' causes _get_validated_sceneInfoDFs() to fail because the
        # FIXME sensorid for ETM+_SLC_OFF cannot be retrieved
        # df['sensor'] = df['sensor'].apply(lambda senN: senN if senN != 'ETM+_SLC_OFF' else 'ETM+')
        df = df.drop_duplicates()

        if df.empty:
            raise ValueError('None of the given scene IDs could be found in the GeoMultiSens database. '
                             'Job creation failed.')
        else:
            missing_IDs = [i for i in list_sceneIDs if i not in df['sceneid'].values]
            if missing_IDs:
                warnings.warn('The following scene IDs could not been found in the GeoMultiSens database: \n%s'
                              % '\n'.join([str(i) for i in missing_IDs]))

        # validate scene information against the database
        sceneInfoDF = self._get_validated_sceneInfoDFs(df)

        # populate attributes
        self._populate_jobAttrs_from_sceneInfoDF(sceneInfoDF)

        return self

    def from_entityIDlist(self, list_entityids, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of entity IDs.

        :param list_entityids:        <list> of entity IDs (e.g. Landsat scene identifiers)
        :param virtual_sensor_id:     <int> a valid ID from the 'virtual_sensors' table
        :param datasetid_spatial_ref: <int> a valid ID from the 'datasets' table
        :param comment:               <str> a comment describing the job
        :return:                      GMS_JOB instance (self)
        :raises ValueError:           if none of the entity IDs exists in the database
        """
        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'entityid': list_entityids})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given entity IDs.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_entityids) - len(list_sceneIDs)

        if count_no_match:
            warnings.warn('%s datasets could not be found the database. They cannot be processed.' % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def from_filenames(self, list_filenames, virtual_sensor_id, datasetid_spatial_ref=249, comment=None):
        """Create a GMS_JOB instance based on the given list of provider archive filenames.

        :param list_filenames:        <list> of provider archive filenames
        :param virtual_sensor_id:     <int> a valid ID from the 'virtual_sensors' table
        :param datasetid_spatial_ref: <int> a valid ID from the 'datasets' table
        :param comment:               <str> a comment describing the job
        :return:                      GMS_JOB instance (self)
        :raises ValueError:           if none of the filenames exists in the database
        """
        res_sceneIDs = get_info_from_postgreSQLdb(self.conn, 'scenes', ['id', ], {'filename': list_filenames})
        if not res_sceneIDs:
            raise ValueError('No matching database entries found for the given filenames.')

        list_sceneIDs = np.array(res_sceneIDs)[:, 0].tolist()
        count_no_match = len(list_filenames) - len(list_sceneIDs)

        if count_no_match:
            # FIX: the count was previously not interpolated into the warning message
            warnings.warn('%s datasets could not be found the database. They cannot be processed.' % count_no_match)

        return self.from_sceneIDlist(list_sceneIDs, virtual_sensor_id,
                                     datasetid_spatial_ref=datasetid_spatial_ref, comment=comment)

    def _get_validated_sceneInfoDFs(self, DF_SatSenFname):
        # type: (DataFrame) -> DataFrame
        """Validate the given satellite/sensor/filename DataFrame against the database.

        Looks up scene ID, acquisition date and bounds for every record and warns about
        filenames that could not be matched.

        :param DF_SatSenFname: DataFrame with columns 'satellite', 'sensor', 'filename'
                               (optionally 'sceneid')
        :return:               DataFrame of all records with a valid database match
                               (added columns 'sceneid', 'acquisitiondate', 'geom', 'polygons')
        """
        df = DF_SatSenFname

        # loop through all satellite-sensor combinations and get scene information from database
        all_df_recs, all_df_miss = [], []
        all_satellites, all_sensors = zip(
            *[i.split('__') for i in (np.unique(df['satellite'] + '__' + df['sensor']))])

        for satellite, sensor in zip(all_satellites, all_sensors):
            cur_df = df.loc[(df['satellite'] == satellite) & (df['sensor'] == sensor)]
            filenames = list(cur_df['filename'])

            satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': satellite})
            senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': sensor})
            assert len(satID_res), "No satellite named '%s' found in database." % satellite
            assert len(senID_res), "No sensor named '%s' found in database." % sensor

            # append sceneid and wkb_hex bounds
            if 'sceneid' in df.columns:
                sceneIDs = list(cur_df['sceneid'])
                conddict = dict(id=sceneIDs, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])
            else:
                conddict = dict(filename=filenames, satelliteid=satID_res[0][0], sensorid=senID_res[0][0])

            records = get_info_from_postgreSQLdb(
                self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'], conddict)
            records = DataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
            if 'sceneid' in df.columns:
                del records['sceneid']  # avoid duplicated column after the merge below

            cur_df = cur_df.merge(records, on='filename', how="outer", copy=False)

            # separate records with valid matches in database from invalid matches (filename not found in database)
            df_recs = cur_df[
                cur_df.sceneid.notnull()].copy()  # creates a copy (needed to be able to apply maps later)
            df_miss = cur_df[cur_df.sceneid.isnull()]  # creates a view

            # convert scene ids from floats to integers
            df_recs['sceneid'] = list(df_recs.sceneid.map(lambda sceneid: int(sceneid)))

            # wkb_hex bounds to shapely polygons
            df_recs['polygons'] = list(df_recs.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

            all_df_recs.append(df_recs)
            all_df_miss.append(df_miss)

        # merge all dataframes of all satellite-sensor combinations
        df_recs_compl = DataFrame(pd.concat(all_df_recs))
        df_miss_compl = DataFrame(pd.concat(all_df_miss))

        # warn about unmatched records
        if not df_miss_compl.empty:
            warnings.warn('The following scene filenames could not been found in the GeoMultiSens database: \n%s'
                          % '\n'.join(list(df_miss_compl['filename'])))

        return df_recs_compl

    def _populate_jobAttrs_from_sceneInfoDF(self, sceneInfoDF):
        # type: (DataFrame) -> None
        """Derive job attributes (scene IDs, statistics, bounds, time range) from the scene info DataFrame.

        :param sceneInfoDF: validated scene info DataFrame as returned by _get_validated_sceneInfoDFs()
        """
        if not sceneInfoDF.empty:
            self.dataframe = sceneInfoDF
            self.sceneids = list(self.dataframe['sceneid'])
            self.statistics = [len(self.sceneids)] + [0] * 8
            self.bounds = box(*MultiPolygon(list(self.dataframe['polygons'])).bounds)
            self.timerange_start = self.dataframe.acquisitiondate.min().to_pydatetime()
            self.timerange_end = self.dataframe.acquisitiondate.max().to_pydatetime()

    def from_job_ID(self, job_ID):
        # type: (int) -> GMS_JOB
        """
        Create a GMS_JOB instance by querying the database for a specific job ID.
        :param job_ID: <int> a valid id from the database table 'jobs'
        :raises ValueError: if no job with the given ID exists in the database
        """
        res = get_info_from_postgreSQLdb(self.conn, 'jobs', self.jobs_table_columns, {'id': job_ID})
        if not res:
            raise ValueError("No job with ID %s found in 'jobs' table of the database." % job_ID)

        self.exists_in_db = True
        for i, attrName in enumerate(self.jobs_table_columns):
            setattr(self, attrName, res[0][i])
        self.bounds = wkb_loads(self.bounds, hex=True)

        # fill self.dataframe
        records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['satelliteid', 'sensorid', 'filename',
                                                                   'id', 'acquisitiondate', 'bounds'],
                                             {'id': self.sceneids})
        df = DataFrame(records,
                       columns=['satelliteid', 'sensorid', 'filename', 'sceneid', 'acquisitiondate', 'geom'])
        all_satIDs = df.satelliteid.unique().tolist()
        all_senIDs = df.sensorid.unique().tolist()
        satName_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['name'], {'id': all_satIDs})
        senName_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['name'], {'id': all_senIDs})
        all_satNames = [i[0] for i in satName_res]
        all_senNames = [i[0] for i in senName_res]
        id_satName_dict = dict(zip(all_satIDs, all_satNames))
        id_senName_dict = dict(zip(all_senIDs, all_senNames))
        df.insert(0, 'satellite', list(df.satelliteid.map(lambda satID: id_satName_dict[satID])))
        df.insert(1, 'sensor', list(df.sensorid.map(lambda senID: id_senName_dict[senID])))
        df['polygons'] = list(df.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

        self.dataframe = df[['satellite', 'sensor', 'filename', 'sceneid', 'acquisitiondate', 'geom', 'polygons']]

        return self

    def reset_job_progress(self):
        """Resets everything in the database entry that has been written during the last run of the job.
        """
        self.finishtime = None
        self.failed_sceneids = []
        self.progress = None
        self.status = 'pending'
        self.statistics = [len(self.sceneids)] + [0] * 8

        self.update_db_entry()

    def _get_dataframe(self, datadict):  # FIXME deprecated
        df = DataFrame(datadict, columns=['satellite', 'sensor', 'filenames'])
        df.columns = ['satellite', 'sensor', 'filename']

        satID_res = get_info_from_postgreSQLdb(self.conn, 'satellites', ['id'], {'name': datadict['satellite']})
        senID_res = get_info_from_postgreSQLdb(self.conn, 'sensors', ['id'], {'name': datadict['sensor']})
        assert len(satID_res), "No satellite named '%s' found in database." % datadict['satellite']
        assert len(senID_res), "No sensor named '%s' found in database." % datadict['sensor']

        # append sceneid and wkb_hex bounds
        records = get_info_from_postgreSQLdb(self.conn, 'scenes', ['filename', 'id', 'acquisitiondate', 'bounds'],
                                             {'filename': datadict['filenames'],
                                              'satelliteid': satID_res[0][0], 'sensorid': senID_res[0][0]})
        records = DataFrame(records, columns=['filename', 'sceneid', 'acquisitiondate', 'geom'])
        df = df.merge(records, on='filename', how="outer")

        # separate records with valid matches in database from invalid matches (filename not found in database)
        df_recs = df[df.sceneid.notnull()].copy()  # creates a copy (needed to be able to apply maps later)
        df_miss = df[df.sceneid.isnull()]  # creates a view

        # convert scene ids from floats to integers
        df_recs['sceneid'] = list(df_recs.sceneid.map(lambda sceneid: int(sceneid)))

        # wkb_hex bounds to shapely polygons
        df_recs['polygons'] = list(df_recs.geom.map(lambda wkb_hex: wkb_loads(wkb_hex, hex=True)))

        return df_recs, df_miss

    def create(self):
        # type: () -> int
        """
        Add the job to the 'jobs' table of the database
        :return: <int> the job ID of the newly created job (None if no scenes were found)
        """
        if not self.dataframe.empty:
            all_sat, all_sen = \
                zip(*[i.split('__') for i in
                      (np.unique(self.dataframe['satellite'] + '__' + self.dataframe['sensor']))])
            counts = [self.dataframe[(self.dataframe['satellite'] == sat) &
                                     (self.dataframe['sensor'] == sen)]['sceneid'].count()
                      for sat, sen in zip(all_sat, all_sen)]
            self.scene_counts = {'%s %s' % (sat, sen): cnt for sat, sen, cnt in zip(all_sat, all_sen, counts)}
            self.statistics = [len(self.sceneids)] + [0] * 8

            db_entry = self.db_entry
            del db_entry['id']  # the database assigns the primary key itself

            newID = create_record_in_postgreSQLdb(self.conn, 'jobs', db_entry)
            assert isinstance(newID, int)

            SatSen_CountTXT = ['%s %s %s scene' % (cnt, sat, sen) if cnt == 1 else '%s %s %s scenes' % (cnt, sat, sen)
                               for sat, sen, cnt in zip(all_sat, all_sen, counts)]
            print('New job created successfully. job-ID: %s\nThe job contains:' % newID)
            for txt in SatSen_CountTXT:
                print('\t- %s' % txt)

            self.exists_in_db = True
            self.id = newID
            return self.id
        else:
            print('No job created because no matching scene could be found in database!')

    def update_db_entry(self):
        """Updates the all values of current database entry belonging to the respective job ID. New values are taken
        from the attributes of the GMS_JOB instance.
        """
        assert self.exists_in_db
        db_entry = self.db_entry
        del db_entry['id']  # primary key of the record cannot be overwritten
        update_records_in_postgreSQLdb(self.conn, 'jobs', db_entry, {'id': self.id})

    def delete_procdata_of_failed_sceneIDs(self, proc_level='all', force=False):
        """Deletes all data where processing failed within the current job ID.

        :param proc_level: <str> delete only results that have the given processing level
        :param force:      <bool> delete without user confirmation
        """
        self.__delete_procdata(self.failed_sceneids, 'failed', proc_level=proc_level, force=force)

    def delete_procdata_of_entire_job(self, proc_level='all', force=False):
        """Deletes all scene data processed by the current job ID.

        :param proc_level: <str> delete only results that have the given processing level
        :param force:      <bool> delete without user confirmation
        """
        self.__delete_procdata(self.sceneids, 'processed', proc_level=proc_level, force=force)

    def __delete_procdata(self, list_sceneIDs, scene_desc, proc_level='all', force=False):
        """Applies delete_processing_results on each scene given in list_sceneIDs.

        :param list_sceneIDs: <list> a list of scene IDs
        :param scene_desc:    <str> a description like 'succeeded' or 'failed'
        :param proc_level:    <str> delete only results that have the given processing level
        :param force:         <bool> delete without user confirmation
        """
        if self.exists_in_db:
            if list_sceneIDs:
                delete = 'J'
                if not force:
                    delete = input("Do you really want to delete the processing results of %s scenes? (J/n)"
                                   % len(list_sceneIDs))
                if delete == 'J':
                    [delete_processing_results(ScID, proc_level=proc_level, force=force) for ScID in list_sceneIDs]
            else:
                warnings.warn(
                    '\nAccording to the database the job has no %s scene IDs. Nothing to delete.' % scene_desc)
        else:
            # FIX: format arguments were swapped (scene_desc/self.id) relative to the message
            warnings.warn('The job with the ID %s does not exist in the database. Thus there are no %s scene IDs.'
                          % (self.id, scene_desc))
def delete_processing_results(scene_ID, proc_level='all', force=False):
    """Deletes the processing results of a given scene ID.

    :param scene_ID:   <int> the scene ID to delete results from
    :param proc_level: <str> delete only results that have the given processing level
    :param force:      <bool> force deletion without user interaction
    :raises ValueError: if proc_level is neither 'all' nor a member of proc_chain
    """
    if proc_level not in ['all'] + proc_chain:
        raise ValueError("'%s' is not a supported processing level." % proc_level)

    path_procdata = PG.path_generator(scene_ID=scene_ID).get_path_procdata()
    if not os.path.isdir(path_procdata):
        print('The folder %s does not exist. Nothing to delete.' % path_procdata)
    else:
        delete = 'J'
        if not force:
            # count files/directories that would be affected and ask the user for confirmation
            dir_list = os.listdir(path_procdata) if proc_level == 'all' else \
                glob.glob(os.path.join(path_procdata, '*%s*' % proc_level))
            count_files = len([i for i in dir_list if os.path.isfile(os.path.join(path_procdata, i))])
            count_dirs = len([i for i in dir_list if os.path.isdir(os.path.join(path_procdata, i))])
            if count_files or count_dirs:
                delete = input("Do you really want to delete the folder %s? It contains %s files and %s directories"
                               " to delete. (J/n)" % (path_procdata, count_files, count_dirs))
            else:
                # FIX: message previously read "does not not contain"
                print('The folder %s does not contain any files that match the given deletion criteria. '
                      'Nothing to delete.' % path_procdata)
        if delete == 'J':
            try:
                if proc_level == 'all':
                    try:
                        shutil.rmtree(path_procdata)
                    except OSError:  # directory not deletable because it is not empty
                        if [F for F in os.listdir(path_procdata) if not os.path.basename(F).startswith('.fuse_hidden')]:
                            raise  # raise OSError if there are other files than .fuse_hidden... remaining
                else:
                    files2delete = glob.glob(os.path.join(path_procdata, '*%s*' % proc_level))
                    errors = False  # default
                    for F in files2delete:
                        try:
                            os.remove(F)
                        except OSError:
                            # leftover FUSE placeholder files are tolerated; anything else counts as an error
                            if not os.path.basename(F).startswith('.fuse_hidden'):
                                errors = True
                    if errors:
                        raise OSError('Not all files deleted properly.')
            except OSError:
                msg = '\nNot all files of scene %s could be deleted properly. Remaining files:\n%s\n\nThe following ' \
                      'error occurred:\n%s' % (scene_ID, '\n'.join(os.listdir(path_procdata)), traceback.format_exc())
                warnings.warn(msg)
def add_externally_downloaded_data_to_GMSDB(conn_DB, src_folder, filenames, satellite, sensor):
    # type: (str, str, list, str, str) -> None
    """Add externally downloaded satellite scenes to the GMS fileserver and update the database.

    Copies the archives into the GMS archive folder and updates the corresponding postgreSQL
    records by adding a filename and setting the processing level to 'DOWNLOADED'.

    :param conn_DB:    <str> pgSQL database connection parameters
    :param src_folder: <str> the source directory where externally provided archive files are saved
    :param filenames:  <list> a list of filenames to be added to the GMS database
    :param satellite:  <str> the name of the satellite to which the filenames are belonging
    :param sensor:     <str> the name of the sensor to which the filenames are belonging
    """
    # FIXME this method only works for Landsat archives or if filename is already set in database
    # FIXME (not always the case for S2A)!
    entityIDs = list(itertools.chain.from_iterable(
        get_entityIDs_from_filename(conn_DB, fN) for fN in filenames))

    sceneID_fName_arr = get_notDownloadedsceneIDs(conn_DB, entityIDs, satellite, sensor, src_folder)

    files2copy = list(set(sceneID_fName_arr[:, 1]))
    target_folder = os.path.join(CFG.path_archive, satellite, sensor)
    assert os.path.exists(target_folder), 'Target folder not found: %s.' % target_folder
    print('Copying %s files to %s.' % (len(files2copy), target_folder))

    for sceneID, fName in sceneID_fName_arr:
        src_P = os.path.join(src_folder, fName)

        # only copy archives that are not yet present on the fileserver
        if os.path.exists(os.path.join(target_folder, os.path.basename(src_P))):
            print("File '%s' already exists in the target folder. Skipped." % os.path.basename(src_P))
        else:
            print('copying %s...' % src_P)
            shutil.copy(src_P, target_folder)

        # the database record is updated in any case
        print("Setting proc_level for scene ID '%s' to 'DOWNLOADED' and adding filename..." % sceneID)
        update_records_in_postgreSQLdb(conn_DB, 'scenes', {'filename': fName, 'proc_level': 'DOWNLOADED'},
                                       {'id': sceneID})
def add_missing_filenames_in_pgSQLdb(conn_params):  # FIXME
    """Derive missing 'filename' values for downloaded scenes from the local archive.

    NOTE: currently only prints the resulting table instead of writing it back to the
    database (hence the FIXME).

    :param conn_params: <str> pgSQL database connection parameters
    """
    res = get_info_from_postgreSQLdb(conn_params, 'scenes', ['id', 'entityid', 'satelliteid', 'sensorid', 'filename'],
                                     {'filename': None, 'proc_level': 'DOWNLOADED', 'sensorid': 8}, timeout=120000)
    gdf = GeoDataFrame(res, columns=['sceneid', 'entityid', 'satelliteid', 'sensorid', 'filename'])

    # expected archive path per scene, then the basename if the archive actually exists
    gdf['archive_path'] = [PG.path_generator(scene_ID=sid).get_local_archive_path_baseN()
                           for sid in gdf['sceneid']]
    gdf['filename'] = [os.path.basename(p) if os.path.exists(p) else None
                       for p in gdf['archive_path']]

    print(gdf)
def pdDataFrame_to_sql_k(engine, frame, name, if_exists='fail', index=True,
                         index_label=None, schema=None, chunksize=None, dtype=None, **kwargs):
    # type: (any,pd.DataFrame,str,str,bool,str,str,int,dict,any) -> None
    """Extends the standard function pandas.io.SQLDatabase.to_sql() with 'kwargs' which allows to set the primary key
    of the target table for example. This is usually not possible with the standard to_sql() function.

    NOTE(review): this relies on pandas-internal API (pandasSQL_builder, SQLTable). The 'flavor'
    keyword was removed in newer pandas versions and the SQLTable signature has changed over
    time - verify against the pinned pandas version of this project.

    :param engine:      SQLAlchemy engine (created by sqlalchemy.create_engine)
    :param frame:       the pandas.DataFrame or geopandas.GeoDataFrame to be exported to SQL-like database
    :param name:        <str> Name of SQL table
    :param if_exists:   <str> {'fail', 'replace', 'append'} the action to be executed if target table already exists
    :param index:       <bool> Write DataFrame index as a column.
    :param index_label: <str> Column label for index column(s).
    :param schema:      <str> Specify the schema (if database flavor supports this). If None, use default schema.
    :param chunksize:   <int> If not None, then rows will be written in batches of this size at a time.
                        If None, all rows will be written at once.
    :param dtype:       <dict> a dictionary of column names and corresponding postgreSQL types
                        The types should be a SQLAlchemy or GeoSQLAlchemy2 type,
    :param kwargs:      keyword arguments to be passed to SQLTable (e.g. 'keys' to set a primary key)
    """
    pandas_sql = pandasSQL_builder(engine, schema=None, flavor=None)
    if dtype is not None:
        # validate that every requested column type is a SQLAlchemy type
        # NOTE(review): 'to_instance' and 'TypeEngine' are presumably imported from sqlalchemy
        # elsewhere in this module - confirm
        for col, my_type in dtype.items():
            if not isinstance(to_instance(my_type), TypeEngine):
                raise ValueError('The type of %s is not a SQLAlchemy type ' % col)

    table = SQLTable(name, pandas_sql, frame=frame, index=index, if_exists=if_exists, index_label=index_label,
                     schema=schema, dtype=dtype, **kwargs)
    table.create()
    table.insert(chunksize)
def import_shapefile_into_postgreSQL_database(path_shp, tablename, cols2import=None, dtype_dic=None,
                                              if_exists='fail', index_label=None, primarykey=None):
    # type: (str,str,list,dict,str,str,str) -> None
    """Imports all features of shapefile into the specified table of the postgreSQL database. Geometry is automatically
    converted to postgreSQL geometry data type.

    :param path_shp:    <str> path of the shapefile to be imported
    :param tablename:   <str> name of the table within the postgreSQL database where records shall be added
                        NOTE: interpolated into SQL unescaped - must not come from untrusted input
    :param cols2import: <list> a list of column names to be imported
    :param dtype_dic:   <dict> a dictionary of column names and corresponding postgreSQL types
                        The types should be a SQLAlchemy or GeoSQLAlchemy2 type,
                        or a string for sqlite3 fallback connection.
    :param if_exists:   <str> {'fail', 'replace', 'append'} the action to be executed if target table already exists
    :param index_label: <str> Column label for index column(s).
    :param primarykey:  <str> the name of the column to be set as primary key of the target table
    """
    print('Reading shapefile %s...' % path_shp)
    GDF = GeoDataFrame.from_file(path_shp)
    # serialize the shapely geometries as WKT strings; SRID is set afterwards via ST_SetSRID
    GDF['geom'] = list(GDF['geometry'].map(str))

    cols2import = cols2import + ['geom'] if cols2import else list(GDF.columns)
    subGDF = GDF[cols2import]
    dtype_dic = dtype_dic if dtype_dic else {}
    dtype_dic.update({'geom': GEOMETRY})

    print('Adding shapefile geometries to postgreSQL table %s...' % tablename)
    # FIXME hardcoded credentials in the connection string
    engine = create_engine('postgresql://gmsdb:gmsdb@%s/geomultisens' % CFG.db_host)
    pdDataFrame_to_sql_k(engine, subGDF, tablename, index_label=index_label,
                         keys=primarykey, if_exists=if_exists, dtype=dtype_dic)

    # set SRID
    # FIX: use context managers so the cursor is closed and the connection is committed (or
    # rolled back) and closed even if the UPDATE raises; previously they leaked on error
    conn = psycopg2.connect(CFG.conn_database)
    try:
        with conn:  # commits on success, rolls back on exception
            with conn.cursor() as cursor:
                cursor.execute("UPDATE %s SET geom = ST_SetSRID(geom, 4326);" % tablename)
    finally:
        conn.close()
def data_DB_updater(obj_dict):
    # type: (dict) -> None
    """Updates the table "scenes_proc" or "mgrs_tiles_proc" within the postgreSQL database
    according to the given dictionary of a GMS object (insert if no record matches, else update).

    :param obj_dict: <dict> a copy of the dictionary of the respective GMS object; must contain
                     at least the keys 'arr_shape', 'scene_ID', 'proc_level' and
                     'trueDataCornerLonLat' (plus MGRS-specific keys for MGRS tiles)
    """
    assert isinstance(obj_dict, dict), 'The input for data_DB_updater() has to be a dictionary.'

    # NOTE(review): appears to be unused within this function
    def list2str(list2convert): return ''.join([str(val) for val in list2convert])

    connection = psycopg2.connect(CFG.conn_database)
    # NOTE(review): psycopg2.connect() raises on failure rather than returning None, so this
    # branch looks unreachable - confirm before relying on the printed fallback
    if connection is None:
        print('Database connection could not be established. Database entry could not be created or updated.')
    else:
        if obj_dict['arr_shape'] != 'MGRS_tile':
            # full scene -> one record in 'scenes_proc', matched by scene ID only
            table2update = 'scenes_proc'
            dict_dbkey_objkey = {'sceneid': obj_dict['scene_ID'],
                                 'georef': True if obj_dict['georef'] else False,
                                 'proc_level': obj_dict['proc_level'],
                                 'layer_bands_assignment': ''.join(obj_dict['LayerBandsAssignment']),
                                 'bounds': Polygon(obj_dict['trueDataCornerLonLat'])}

            matchExp = 'WHERE ' + get_postgreSQL_matchingExp('sceneid', dict_dbkey_objkey['sceneid'])
            keys2update = ['georef', 'proc_level', 'layer_bands_assignment', 'bounds']

        else:  # MGRS_tile
            table2update = 'mgrs_tiles_proc'

            # bounds come as (xmin, xmax, ymin, ymax); box() expects (xmin, ymin, xmax, ymax)
            def get_tile_bounds_box(bnds): return box(bnds[0], bnds[2], bnds[1], bnds[3])
            dict_dbkey_objkey = {'sceneid': obj_dict['scene_ID'],
                                 'scenes_proc_id': obj_dict['scenes_proc_ID'],
                                 'mgrs_code': obj_dict['MGRS_info']['tile_ID'],
                                 'virtual_sensor_id': CFG.virtual_sensor_id,
                                 'proc_level': obj_dict['proc_level'],
                                 'coreg_success': obj_dict['coreg_info']['success'],
                                 'tile_bounds': get_tile_bounds_box(obj_dict['bounds_LonLat']),
                                 'data_corners': Polygon(obj_dict['trueDataCornerLonLat'])}

            # a tile record is uniquely identified by scene ID + MGRS code + virtual sensor
            matchExp = 'WHERE ' + ' AND '.join([get_postgreSQL_matchingExp(k, dict_dbkey_objkey[k])
                                                for k in ['sceneid', 'mgrs_code', 'virtual_sensor_id']])
            keys2update = ['scenes_proc_id', 'proc_level', 'coreg_success', 'tile_bounds', 'data_corners']
            if obj_dict['scenes_proc_ID'] is None:
                # do not overwrite an existing scenes_proc_id with NULL
                keys2update.remove('scenes_proc_id')

        cursor = connection.cursor()

        # check if record exists
        execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s)" % (table2update, matchExp))

        # create new entry
        if cursor.fetchone()[0] == 0:
            keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in dict_dbkey_objkey.items()])
            execute_pgSQL_query(cursor,
                                "INSERT INTO %s (%s) VALUES (%s);" % (table2update, ','.join(keys), ','.join(vals)))
        # or update existing entry
        else:
            setExp = 'SET ' + ','.join(
                ['%s=%s' % (k, get_postgreSQL_value(dict_dbkey_objkey[k])) for k in keys2update])
            execute_pgSQL_query(cursor, "UPDATE %s %s %s;" % (table2update, setExp, matchExp))
    if 'connection' in locals():
        connection.commit()
        connection.close()
def postgreSQL_table_to_csv(conn_db, path_csv, tablename):
    """Export the given postgreSQL table to a CSV file.

    Not implemented yet — always raises NotImplementedError.
    """
    # TODO: implement the export, e.g. via GeoDataFrame.to_csv(path_csv, index_label='id')
    raise NotImplementedError  # TODO
def archive_exists_on_fileserver(conn_DB, entityID):
    # type: (str,str) -> bool
    """Queries the postgreSQL database for the archive filename of the given entity ID and checks if the
    corresponding archive file exists in the archive folder.

    :param conn_DB:  <str> pgSQL database connection parameters
    :param entityID: <str> entity ID to be checked
    """
    db_records = get_info_from_postgreSQLdb(conn_DB, 'scenes', ['satelliteid', 'sensorid'], {'entityid': entityID})
    # keep only records where both satellite ID and sensor ID are set
    valid_records = [rec for rec in db_records if rec[0] is not None and rec[1] is not None]

    # ambiguous or missing database records -> warn and report the archive as missing
    if not valid_records:
        warnings.warn("No database record found for entity ID '%s'. Dataset skipped." % entityID)
        return False
    if len(valid_records) > 1:
        warnings.warn("More than one database records found for entity ID '%s'. Dataset skipped." % entityID)
        return False

    satID, senID = valid_records[0]
    satellite = get_info_from_postgreSQLdb(conn_DB, 'satellites', ['name'], {'id': satID})[0][0]
    sensor = get_info_from_postgreSQLdb(conn_DB, 'sensors', ['name'], {'id': senID})[0][0]
    # join sensors 'ETM+' and 'ETM+_SLC_OFF'
    if sensor == 'ETM+_SLC_OFF':
        sensor = 'ETM+'
    archive_fold = os.path.join(CFG.path_archive, satellite, sensor)
    assert os.path.exists(archive_fold), 'Archive folder not found: %s.' % archive_fold

    # only Landsat archives are supported so far
    if not re.search(r'Landsat', satellite, re.I):
        raise NotImplementedError
    return os.path.exists(os.path.join(archive_fold, entityID + '.tar.gz'))
def record_stats_memusage(conn_db, GMS_obj):
    # type: (str, GMS_object) -> bool
    """Records memory usage statistics of the given GMS object in the table 'stats_mem_usage_homo'.

    A new record is only created if statistics exist for all processing levels L1A-L2C,
    if fewer than 10 records already match the current configuration (and a usable
    software version) and if the current scene ID is not yet among those records.

    :param conn_db:  <str> pgSQL database connection parameters
    :param GMS_obj:  the GMS object whose memory statistics shall be recorded
    :return:         True if a record was written, else False
    """
    if list(sorted(GMS_obj.mem_usage.keys())) != ['L1A', 'L1B', 'L1C', 'L2A', 'L2B', 'L2C']:
        GMS_obj.logger.info('Unable to record memory usage statistics because statistics are missing for some '
                            'processing levels. ')
        return False

    vals2write_dict = dict(
        creationtime=datetime.now(),
        software_version=CFG.version,
        datasetid=GMS_obj.dataset_ID,
        virtual_sensor_id=CFG.virtual_sensor_id,
        target_gsd=CFG.target_gsd[0],  # respects only xgsd
        target_nbands=len(CFG.target_CWL),
        inmem_serialization=CFG.inmem_serialization,
        target_radunit_optical=CFG.target_radunit_optical,
        skip_coreg=CFG.skip_coreg,
        ac_estimate_accuracy=CFG.ac_estimate_accuracy,
        ac_bandwise_accuracy=CFG.ac_bandwise_accuracy,
        spathomo_estimate_accuracy=CFG.spathomo_estimate_accuracy,
        spechomo_estimate_accuracy=CFG.spechomo_estimate_accuracy,
        spechomo_bandwise_accuracy=CFG.spechomo_bandwise_accuracy,
        parallelization_level=CFG.parallelization_level,
        skip_thermal=CFG.skip_thermal,
        skip_pan=CFG.skip_pan,
        mgrs_pixel_buffer=CFG.mgrs_pixel_buffer,
        cloud_masking_algorithm=CFG.cloud_masking_algorithm[GMS_obj.satellite],
        used_mem_l1a=GMS_obj.mem_usage['L1A'],
        used_mem_l1b=GMS_obj.mem_usage['L1B'],
        used_mem_l1c=GMS_obj.mem_usage['L1C'],
        used_mem_l2a=GMS_obj.mem_usage['L2A'],
        used_mem_l2b=GMS_obj.mem_usage['L2B'],
        used_mem_l2c=GMS_obj.mem_usage['L2C'],
        dims_x_l2a=GMS_obj.arr.cols,
        dims_y_l2a=GMS_obj.arr.rows,
        is_test=CFG.is_test,
        sceneid=GMS_obj.scene_ID
    )

    # get all existing database records matching the respective config
    # NOTE: those columns that do not belong the config specification are ignored
    vals2get = list(vals2write_dict.keys())
    df_existing_recs = pd.DataFrame(
        get_info_from_postgreSQLdb(conn_db, 'stats_mem_usage_homo',
                                   vals2return=vals2get,
                                   cond_dict={k: v for k, v in vals2write_dict.items()
                                              if k not in ['creationtime', 'used_mem_l1a', 'used_mem_l1b',
                                                           'used_mem_l1c', 'used_mem_l2a', 'used_mem_l2b',
                                                           # BUGFIX: was 'dims_y_l2b' (a key that does not
                                                           # exist), so dims_y_l2a wrongly stayed in the
                                                           # match condition
                                                           'used_mem_l2c', 'dims_x_l2a', 'dims_y_l2a',
                                                           'sceneid']}),
        columns=vals2get)

    # filter the existing records by gms_preprocessing software version number
    # (higher than CFG.min_version_mem_usage_stats)
    vers = list(df_existing_recs.software_version)
    vers_usable = [ver for ver in vers if parse_version(ver) >= parse_version(CFG.min_version_mem_usage_stats)]
    df_existing_recs_usable = df_existing_recs.loc[df_existing_recs.software_version.isin(vers_usable)]

    # add memory stats to database
    # (but skip if there are already 10 records matching the respective config and software version number
    # or if the current scene ID is already among the matching records)
    if len(df_existing_recs_usable) < 10 and GMS_obj.scene_ID not in list(df_existing_recs_usable.sceneid):
        create_record_in_postgreSQLdb(conn_db, 'stats_mem_usage_homo', vals2write_dict=vals2write_dict)
        return True
    else:
        return False