diff --git a/aore/__init__.py b/aore/__init__.py index 2a488d7..81f32be 100644 --- a/aore/__init__.py +++ b/aore/__init__.py @@ -1,3 +1,4 @@ +import logging import os import sys @@ -5,3 +6,4 @@ reload(sys) cwd = os.getcwd() sys.path.append(cwd) sys.setdefaultencoding("utf-8") +logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO) diff --git a/aore/aoutils/aodataparser.py b/aore/aoutils/aodataparser.py index 9b4a16d..cae19c6 100644 --- a/aore/aoutils/aodataparser.py +++ b/aore/aoutils/aodataparser.py @@ -18,17 +18,23 @@ class AoDataParser: self.pagesize = pagesize self.currentpage = 0 self.counter = 0 + self.addrobj_filter = self.datasource.table_name == 'ADDROBJ' and self.datasource.operation_type == AoXmlTableEntry.OperationType.create self.base_filename = "" self.csv_file = None self.data_bereit_callback = None def import_update(self, attr): + # Addrobj anvanced filter + if self.addrobj_filter: + if attr['ACTSTATUS'] == '0' or 'NEXTID' in attr: + return + if self.counter > self.pagesize: # Send old file to DB engine if self.csv_file: self.csv_file.close() - self.data_bereit_callback(os.path.abspath(self.csv_file.name)) + self.data_bereit_callback(self.counter, os.path.abspath(self.csv_file.name)) os.remove(self.csv_file.name) # Prepare to next iteration @@ -49,20 +55,21 @@ class AoDataParser: # Output - sql query def parse(self, data_callback): - if self.datasource.operation_type == AoXmlTableEntry.OperationType.update: - self.data_bereit_callback = data_callback - self.currentpage = 0 - self.base_filename = trashfolder + "fd_" + str(self.datasource.operation_type) + "_" + \ - self.datasource.table_name + ".csv.part{}" - self.counter = self.pagesize + 1 + self.data_bereit_callback = data_callback + self.currentpage = 0 + self.base_filename = \ + trashfolder + "fd_" + \ + str(self.datasource.operation_type) + "_" + \ + self.datasource.table_name + ".csv.part{}" + self.counter = self.pagesize + 1 - xml_parser = XMLParser(self.import_update) - src = self.datasource.open() - xml_parser.parse_buffer(src, db_shemas[self.datasource.table_name].xml_tag) + xml_parser = XMLParser(self.import_update) + src = self.datasource.open() + xml_parser.parse_buffer(src, db_shemas[self.datasource.table_name].xml_tag) - # Send last file to db processor - if self.csv_file: - self.csv_file.close() - self.data_bereit_callback(os.path.abspath(self.csv_file.name)) - os.remove(self.csv_file.name) - src.close() + # Send last file to db processor + if self.csv_file: + self.csv_file.close() + self.data_bereit_callback(self.counter, os.path.abspath(self.csv_file.name)) + os.remove(self.csv_file.name) + src.close() diff --git a/aore/aoutils/aorar.py b/aore/aoutils/aorar.py index 9e52aaa..ff136e7 100644 --- a/aore/aoutils/aorar.py +++ b/aore/aoutils/aorar.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- +import logging import os.path from traceback import format_exc @@ -15,7 +16,7 @@ class AoRar: rarfile.UNRAR_TOOL = unrar def download(self, url): - print("Downloading {}".format(url)) + logging.info("Downloading {}".format(url)) try: local_filename = os.path.abspath(trashfolder + url.split('/')[-1]) if os.path.isfile(local_filename): @@ -28,10 +29,9 @@ class AoRar: if chunk: f.write(chunk) except: - print("Error downloading. Reason : {}".format(format_exc())) - return None + raise BaseException("Error downloading. Reason : {}".format(format_exc())) - print("Downloaded {} bytes".format(request.headers['Content-length'])) + logging.info("Downloaded {} bytes".format(request.headers['Content-length'])) return local_filename def get_table_entries(self, file_name, allowed_tables): @@ -43,7 +43,7 @@ class AoRar: if xmltable.table_name in allowed_tables: yield xmltable else: - print "Done" - # os.remove(file_name) TODO : Uncomment + logging.info("All entries processed") + os.remove(file_name) else: - print("No file specified or not exists") + logging.error("No file specified or not exists") diff --git a/aore/aoutils/aoupdater.py b/aore/aoutils/aoupdater.py index 9c3c6a4..bb36b27 100644 --- a/aore/aoutils/aoupdater.py +++ b/aore/aoutils/aoupdater.py @@ -14,7 +14,6 @@ from aore.dbutils.dbschemas import allowed_tables class AoUpdater: # Source: "http", directory (as a full path to unpacked xmls) def __init__(self, source="http"): - logging.basicConfig(format='%(asctime)s %(message)s') self.db_handler = DbHandler() self.mode = source self.updalist_generator = None @@ -53,9 +52,9 @@ class AoUpdater: self.updalist_generator = self.__get_updates_from_folder(self.mode) self.tablelist_generator = self.__get_entries_from_folder - def process_single_entry(self, table_xmlentry, chunck_size=50000): + def process_single_entry(self, operation_type, table_xmlentry, chunck_size=50000): aoparser = AoDataParser(table_xmlentry, chunck_size) - aoparser.parse(lambda x: self.db_handler.bulk_csv(chunck_size, table_xmlentry.table_name, x)) + aoparser.parse(lambda x, y: self.db_handler.bulk_csv(operation_type, table_xmlentry.table_name, x, y)) def create(self): self.__init_update_entries(True) @@ -63,9 +62,11 @@ class AoUpdater: for update_entry in self.updalist_generator: for table_entry in self.tablelist_generator(update_entry['url']): - self.process_single_entry(table_entry) + if table_entry.operation_type == AoXmlTableEntry.OperationType.update: + table_entry.operation_type = AoXmlTableEntry.OperationType.create + self.process_single_entry(table_entry.operation_type, table_entry) - logging.warning("Create success") + logging.info("Create success") def update(self, count=1): self.__init_update_entries(False) @@ -79,6 +80,6 @@ class AoUpdater: break for table_entry in self.tablelist_generator(update_entry['url']): - self.process_single_entry(table_entry) + self.process_single_entry(table_entry.operation_type, table_entry) - logging.warning("Update success") + logging.info("Update success") diff --git a/aore/aoutils/aoxmltableentry.py b/aore/aoutils/aoxmltableentry.py index 422a0db..942bed9 100644 --- a/aore/aoutils/aoxmltableentry.py +++ b/aore/aoutils/aoxmltableentry.py @@ -9,6 +9,7 @@ class AoXmlTableEntry: class OperationType(Enum): update = 1 delete = 0 + create = 2 def __str__(self): return self._name_ diff --git a/aore/config.py b/aore/config.py index 4fda068..b354591 100644 --- a/aore/config.py +++ b/aore/config.py @@ -15,9 +15,9 @@ DB_INSTANCES = dict( ), production=dict( host="localhost", - user="postgres", - password="intercon", - database="postgres", + user="***", + password="***", + database="***", ) ) @@ -26,6 +26,9 @@ UNRAR_PATHES = dict( production="unrar" ) +# Uncomment if you want to specify config_type manually +# config_type = "test" + # Main section db = DB_INSTANCES[config_type] unrar = UNRAR_PATHES[config_type] diff --git a/aore/dbutils/dbhandler.py b/aore/dbutils/dbhandler.py index 72118f3..fb57e55 100644 --- a/aore/dbutils/dbhandler.py +++ b/aore/dbutils/dbhandler.py @@ -1,10 +1,10 @@ # -*- coding: utf-8 -*- import logging -from traceback import format_exc import psycopg2 +from aore.aoutils.aoxmltableentry import AoXmlTableEntry from aore.config import db as dbparams from aore.dbutils.dbimpl import DBImpl from aore.dbutils.dbschemas import db_shemas @@ -12,37 +12,73 @@ from aore.dbutils.dbschemas import db_shemas class DbHandler: def __init__(self): - logging.basicConfig(format='%(asctime)s %(message)s') self.db = DBImpl(psycopg2, dbparams) - def bulk_csv(self, chunk_size, table_name, csv_file_name): - sql_query = "COPY \"{}\" ({}) FROM '{}' DELIMITER '\t' NULL 'NULL'". \ - format(table_name, - ", ".join( - db_shemas[table_name].fields), - csv_file_name) - try: - cur = self.db.get_cursor() - cur.execute(sql_query) - self.db.transaction_commit() - except: - self.db.transaction_rollback() - raise BaseException("Error updating sql. Reason : {}".format(format_exc())) + f = open("aore/templates/postgre/bulk_create.sql") + self.syntax_bulk_create = f.read() + f.close() - logging.warning("Inserted {} queries FROM {}".format(chunk_size, csv_file_name)) + f = open("aore/templates/postgre/bulk_update.sql") + self.syntax_bulk_update = f.read() + f.close() + + f = open("aore/templates/postgre/bulk_delete.sql") + self.syntax_bulk_delete = f.read() + f.close() + + def bulk_csv(self, operation_type, table_name, processed_count, csv_file_name): + sql_query = None + + # simple add new reocrds + if operation_type == AoXmlTableEntry.OperationType.create: + sql_query = self.syntax_bulk_create \ + .replace("%tab%", "\t") \ + .replace("%tablename%", table_name) \ + .replace("%fieldslist%", ", ".join(db_shemas[table_name].fields)) \ + .replace("%csvname%", csv_file_name) + + # update table + if operation_type == AoXmlTableEntry.OperationType.update: + fields_update_list = "" + for field in db_shemas[table_name].fields: + if field != db_shemas[table_name].unique_field.upper(): + fields_update_list += "{}=EXCLUDED.{}, ".format(field, field) + fields_update_list = fields_update_list[:-2] + + sql_query = self.syntax_bulk_update \ + .replace("%tab%", "\t") \ + .replace("%tablename%", table_name) \ + .replace("%fieldslist%", ", ".join(db_shemas[table_name].fields)) \ + .replace("%csvname%", csv_file_name) \ + .replace("%uniquekey%", db_shemas[table_name].unique_field) \ + .replace("%updaterule%", fields_update_list) + + if table_name == "ADDROBJ": + sql_query += "DELETE FROM \"%tablename%\" WHERE %filterrule%;" \ + .replace("%tablename%", table_name) \ + .replace("%filterrule%", + "ACTSTATUS = FALSE OR NEXTID IS NOT NULL") + + # delete records from table + if operation_type == AoXmlTableEntry.OperationType.delete: + sql_query = self.syntax_bulk_delete \ + .replace("%tab%", "\t") \ + .replace("%tablename%", table_name) \ + .replace("%fieldslist%", ", ".join(db_shemas[table_name].fields)) \ + .replace("%csvname%", csv_file_name) \ + .replace("%uniquekey%", db_shemas[table_name].unique_field) + + assert sql_query, "Invalid operation type: {}".format(operation_type) + + self.db.execute(sql_query) + logging.info("Processed {} queries FROM {}".format(processed_count-1, csv_file_name)) def pre_create(self): f = open("aore/templates/postgre/pre_create.sql") - create_db_syntax = f.read() + sql_query = f.read() f.close() - try: - cur = self.db.get_cursor() - cur.execute(create_db_syntax) - self.db.transaction_commit() - except: - self.db.transaction_rollback() - raise "Error downloading. Reason : {}".format(format_exc()) + self.db.execute(sql_query) def pre_update(self): # TODO: update actions diff --git a/aore/dbutils/dbimpl.py b/aore/dbutils/dbimpl.py index 704114d..7c26126 100644 --- a/aore/dbutils/dbimpl.py +++ b/aore/dbutils/dbimpl.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- +from traceback import format_exc + class DBImpl: def __init__(self, engine, params): @@ -19,6 +21,15 @@ class DBImpl: def get_cursor(self): return self.connection.cursor() + def execute(self, sql_query): + try: + cur = self.get_cursor() + cur.execute(sql_query) + self.transaction_commit() + except: + self.transaction_rollback() + raise BaseException("Error execute sql query. Reason : {}".format(format_exc())) + def get_rows(self, query_string, for_dict=True): if for_dict: cur = self.connection.cursor(self.db_engine.cursors.DictCursor) diff --git a/aore/dbutils/dbschemas.py b/aore/dbutils/dbschemas.py index 7559dab..13f1f52 100644 --- a/aore/dbutils/dbschemas.py +++ b/aore/dbutils/dbschemas.py @@ -2,17 +2,20 @@ class DbSchema: - def __init__(self, name, fieldlist, xmltag): + def __init__(self, name, fieldlist, unique_key, xmltag): self.tablename = name self.fields = fieldlist + self.unique_field = unique_key self.xml_tag = xmltag db_shemas = dict() db_shemas['ADDROBJ'] = DbSchema("ADDROBJ", ["AOID", "AOGUID", "SHORTNAME", "FORMALNAME", "AOLEVEL", "PARENTGUID", "ACTSTATUS", - "CURRSTATUS"], + "LIVESTATUS", "NEXTID"], + "aoid", "Object") -db_shemas['SOCRBASE'] = DbSchema("SOCRBASE", ["LEVEL", "SOCRNAME", "SCNAME", "KOD_T_ST"], "AddressObjectType") +db_shemas['SOCRBASE'] = DbSchema("SOCRBASE", ["LEVEL", "SOCRNAME", "SCNAME", "KOD_T_ST"], "kod_t_st", + "AddressObjectType") allowed_tables = ["ADDROBJ", "SOCRBASE"] diff --git a/aore/templates/postgre/bulk_create.sql b/aore/templates/postgre/bulk_create.sql new file mode 100644 index 0000000..a04ecae --- /dev/null +++ b/aore/templates/postgre/bulk_create.sql @@ -0,0 +1 @@ +COPY "%tablename%" (%fieldslist%) FROM '%csvname%' DELIMITER '%tab%' NULL 'NULL' \ No newline at end of file diff --git a/aore/templates/postgre/bulk_delete.sql b/aore/templates/postgre/bulk_delete.sql new file mode 100644 index 0000000..b0e90a5 --- /dev/null +++ b/aore/templates/postgre/bulk_delete.sql @@ -0,0 +1,5 @@ +DROP TABLE IF EXISTS "%tablename%_TEMP"; +CREATE TEMP TABLE "%tablename%_TEMP" ON COMMIT DROP AS SELECT * + FROM "%tablename%" WITH NO DATA; +COPY "%tablename%_TEMP" (%fieldslist%) FROM '%csvname%' DELIMITER '%tab%' NULL 'NULL'; +DELETE FROM "%tablename%" WHERE %uniquekey% IN (SELECT %uniquekey% FROM "%tablename%_TEMP"); \ No newline at end of file diff --git a/aore/templates/postgre/bulk_update.sql b/aore/templates/postgre/bulk_update.sql new file mode 100644 index 0000000..36aa3b8 --- /dev/null +++ b/aore/templates/postgre/bulk_update.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS "%tablename%_TEMP"; +CREATE TEMP TABLE "%tablename%_TEMP" ON COMMIT DROP AS SELECT * + FROM "%tablename%" WITH NO DATA; +COPY "%tablename%_TEMP" (%fieldslist%) FROM '%csvname%' DELIMITER '%tab%' NULL 'NULL'; +INSERT INTO "%tablename%" (%fieldslist%) SELECT %fieldslist% +FROM +"%tablename%_TEMP" ON CONFLICT (%uniquekey%) DO UPDATE SET %updaterule%; \ No newline at end of file diff --git a/aore/templates/postgre/pre_create.sql b/aore/templates/postgre/pre_create.sql index 51ba195..942e5c4 100644 --- a/aore/templates/postgre/pre_create.sql +++ b/aore/templates/postgre/pre_create.sql @@ -7,10 +7,11 @@ CREATE TABLE "public"."ADDROBJ" ( "formalname" VARCHAR(120) COLLATE "default", "aolevel" INT2, "parentguid" UUID, - "actstatus" BIT(1), - "currstatus" INT2, + "actstatus" BOOL, + "livestatus" BOOL, + "nextid" UUID, CONSTRAINT "aoid" UNIQUE ("aoid"), - CONSTRAINT "id" PRIMARY KEY ("id") + CONSTRAINT "id_addrobj" PRIMARY KEY ("id") ) WITH (OIDS =FALSE ); @@ -21,7 +22,8 @@ CREATE TABLE "public"."SOCRBASE" ( "scname" VARCHAR(10), "socrname" VARCHAR(50), "kod_t_st" VARCHAR(4), - PRIMARY KEY ("id") + CONSTRAINT "kod_t_st" UNIQUE ("kod_t_st"), + CONSTRAINT "id_socrbase" PRIMARY KEY ("id") ) WITH (OIDS =FALSE ); \ No newline at end of file diff --git a/passenger_wsgi.py b/passenger_wsgi.py index 1b33a5f..dbfefb6 100644 --- a/passenger_wsgi.py +++ b/passenger_wsgi.py @@ -8,8 +8,8 @@ reload(sys) cwd = os.path.dirname(os.path.abspath(__file__)) sys.path.append(cwd) sys.setdefaultencoding("utf-8") -sys.path.append('/home/i/interc7j/.local/lib/python2.7/site-packages') +# sys.path.append('/home/i/interc7j/.local/lib/python2.7/site-packages') -from fias import fias +from aore import aore -application = fias.app +application = aore.app