diff --git a/python/utils.py b/python/utils.py index aae92048..45fb65ef 100644 --- a/python/utils.py +++ b/python/utils.py @@ -12,6 +12,7 @@ import psycopg2 import json import decimal +import re from elasticsearch import Elasticsearch from elasticsearch import helpers @@ -61,7 +62,7 @@ def __init__(self, PostGISConnection, ESConnection, view, sqlquerystring): self._pgConnection = PostGISConnection self._esConnection = ESConnection self._view = view - self._sqlquerystring = sqlquerystring + self._sqlquerystring = re.sub(r'\s{2,}', ' ', sqlquerystring) self._auth = get_config_params('config.ini') def pgConnection(self): @@ -79,12 +80,8 @@ def auth(self): def sqlquerystring(self): return self._sqlquerystring - def getGeoJson(self, sqlquerystring, pgConnection): - cur = pgConnection.pgConnection().cursor() - cur.execute(sqlquerystring) - rows = cur.fetchall() + def getGeoJson(self, rows, columns): if rows: - columns = [name[0] for name in cur.description] geomIndex = columns.index('st_asgeojson') feature_collection = {'type': 'FeatureCollection', 'features': []} @@ -101,6 +98,7 @@ def getGeoJson(self, sqlquerystring, pgConnection): value = row[index] feature['properties'][column] = value feature_collection['features'].append(feature) + geojsonobject = json.dumps(feature_collection, indent=2, default=decimal_default) @@ -142,33 +140,41 @@ def postgis2es(self): sqlquerystring = self.sqlquerystring().format( **{'limit': self.LIMIT, 'offset': self.OFFSET}) - geojsonobject = self.getGeoJson(sqlquerystring, self.pgConnection()) - while geojsonobject is not None: - - print(sqlquerystring) - self.populateElasticSearchIndex(self.esConnection(), - geojsonobject, - self.auth(), - self.view()) - self.OFFSET += self.LIMIT - - sqlquerystring = self.sqlquerystring().format( - **{'limit': self.LIMIT, - 'offset': self.OFFSET}) - geojsonobject = self.getGeoJson(sqlquerystring, - self.pgConnection()) + + # Remove LIMIT and OFFSET until we decide to change all caller scripts + sqlquerystring = re.sub(r'\s+LIMIT.*', '', sqlquerystring) + + print(sqlquerystring) + + with self.pgConnection().pgConnection() as conn: + with conn.cursor(name='postgis2es_cursor') as cur: + cur.itersize = self.LIMIT + cur.execute(sqlquerystring) + rows = cur.fetchmany(self.LIMIT) + columns = [name[0] for name in cur.description] + + count = 0 + while rows: + count = count + 1 + print("Rows %d-%d, %s = %s" % + (self.LIMIT * (count - 1) + 1, self.LIMIT * count, + columns[0], rows[0][0])) + + geojsonobject = self.getGeoJson(rows, columns) + # print("populateElasticsearchIndex()") + self.populateElasticSearchIndex(self.esConnection(), + geojsonobject, + self.auth(), + self.view()) + rows = cur.fetchmany(self.LIMIT) return class PostGISPointDataset(PostGISdataset): - def getGeoJson(self, sqlquerystring, pgConnection): - cur = pgConnection.pgConnection().cursor() - cur.execute(sqlquerystring) - rows = cur.fetchall() + def getGeoJson(self, rows, columns): if rows: - columns = [name[0] for name in cur.description] geomIndex = columns.index('st_asgeojson') feature_collection = {'type': 'FeatureCollection', 'features': []} @@ -187,6 +193,7 @@ def getGeoJson(self, sqlquerystring, pgConnection): value = row[index] feature['properties'][column] = value feature_collection['features'].append(feature) + geojsonobject = json.dumps(feature_collection, indent=2, default=decimal_default) @@ -197,12 +204,8 @@ def getGeoJson(self, sqlquerystring, pgConnection): class PostGISTable(PostGISdataset): - def getGeoJson(self, sqlquerystring, pgConnection): - cur = pgConnection.pgConnection().cursor() - cur.execute(sqlquerystring) - rows = cur.fetchall() + def getGeoJson(self, rows, columns): if rows: - columns = [name[0] for name in cur.description] # geomIndex = columns.index('st_asgeojson') feature_collection = {'type': 'FeatureCollection', 'features': []} @@ -221,6 +224,7 @@ def getGeoJson(self, sqlquerystring, pgConnection): value = row[index] feature['properties'][column] = value feature_collection['features'].append(feature) + geojsonobject = json.dumps(feature_collection, indent=2, default=decimal_default)