diff --git a/aore/aoutils/aoupdater.py b/aore/aoutils/aoupdater.py
index 8d29555..7b20626 100644
--- a/aore/aoutils/aoupdater.py
+++ b/aore/aoutils/aoupdater.py
@@ -1,33 +1,25 @@
 # -*- coding: utf-8 -*-
 from aore.aoutils.aodataparser import AoDataParser
-from aore.config import db as dbparams
 from aore.aoutils.aorar import AoRar
 from aore.aoutils.aoxmltableentry import AoXmlTableEntry
-from aore.dbutils.dbschemas import db_shemas, allowed_tables
+from aore.dbutils.dbhandler import DbHandler
+from aore.dbutils.dbschemas import allowed_tables
 from aore.aoutils.importer import Importer
-from os import walk
-from traceback import format_exc
-import psycopg2
+from os import walk, path
 import logging
-from aore.dbutils.dbimpl import DBImpl
-
 
 
 class AoUpdater:
-    def __init__(self, dirpath=None):
+    # Source: "http" or a directory (as a full path to unpacked XMLs)
+    def __init__(self, source="http"):
         logging.basicConfig(format='%(asctime)s %(message)s')
-        self.dirpath = None
-        self.updatelist = None
-        self.db = DBImpl(psycopg2, dbparams)
+        self.db_handler = DbHandler()
+        self.mode = source
+        self.updalist_generator = None
+        self.allowed_tables = None
 
-        if dir:
-            self.dirpath = dirpath
-        else:
-            imp = Importer()
-            self.updatelist = imp.download_updatelist
-
-    def get_table_entries(self, allowed_tables):
-        for (dirpath, dirnames, filenames) in walk(self.dirpath):
+    def __get_entries_from_folder(self, path_to_xmls):
+        for (dirpath, dirnames, filenames) in walk(path_to_xmls):
             for filename in filenames:
                 if filename.endswith(".XML"):
                     xmltable = AoXmlTableEntry.from_dir(filename, dirpath.replace("\\", "/") + "/")
@@ -35,59 +27,50 @@ class AoUpdater:
                     yield xmltable
             break
 
-    def on_receive_sql_file(self, chunck_size, table_name, csv_file_name):
-        sql_query = "COPY \"{}\" ({}) FROM '{}' DELIMITER '\t' NULL 'NULL'".format(table_name,
-                                                                                   ", ".join(db_shemas[table_name].fields),
-                                                                                   csv_file_name)
-        print sql_query
-        try:
-            cur = self.db.get_cursor()
-            cur.execute(sql_query)
-            self.db.transaction_commit()
-        except:
-            self.db.transaction_rollback()
-            logging.error("Error updating sql. Reason : {}".format(format_exc()))
+    def __get_updates_from_folder(self, foldername):
+        # TODO: compute the version when data is taken from a directory
+        yield dict(intver=0, textver="Unknown", url=foldername)
 
-        logging.warning("Inserted {} queries FROM {}".format(chunck_size, csv_file_name))
+    def __init_update_entries(self, full_base):
+        if self.mode == "http":
+            imp = Importer()
+            self.updalist_generator = None
+            if full_base:
+                self.updalist_generator = imp.get_full()
+            else:
+                self.updalist_generator = imp.get_updates()
+        else:
+            assert path.isdir(self.mode), "Invalid directory {}".format(self.mode)
+            self.updalist_generator = self.__get_updates_from_folder(self.mode)
 
-    def update_one_delta(self, table_xmlentry, chunck_size=50000):
+    def process_single_entry(self, table_xmlentry, chunck_size=50000):
         aoparser = AoDataParser(table_xmlentry, chunck_size)
-        aoparser.parse(lambda x: self.on_receive_sql_file(chunck_size, table_xmlentry.table_name, x))
-
-    def __pre_create_db(self):
-        f = open("aore/templates/postgre/pre_create.sql")
-        create_db_syntax = f.read()
-        f.close()
-
-        try:
-            cur = self.db.get_cursor()
-            cur.execute(create_db_syntax)
-            self.db.transaction_commit()
-        except:
-            self.db.transaction_rollback()
-            raise "Error downloading. Reason : {}".format(format_exc())
Reason : {}".format(format_exc()) + aoparser.parse(lambda x: self.db_handler.bulk_csv(chunck_size, table_xmlentry.table_name, x)) def create(self): - if not self.dirpath: - logging.warning("Cannot update - Updater works in update mode") - return - self.__pre_create_db() + self.__init_update_entries(True) + self.db_handler.pre_create() - for table_entry in self.get_table_entries(allowed_tables): - self.update_one_delta(table_entry) + for update_entry in self.updalist_generator: + for table_entry in self.__get_entries_from_folder(update_entry['url']): + self.process_single_entry(table_entry) + + logging.warning("Create success") def update(self, count=1): - if not self.updatelist: - logging.warning("Cannot update - Updater works in dir mode") - return + self.__init_update_entries(False) + self.db_handler.pre_update() counter = 0 - for fias_update in self.updatelist: + for update_entry in self.updalist_generator: counter += 1 if counter > count: - return + logging.warning("Maximum count of updates are processed - exit") + break aorar = AoRar() - fname = aorar.download(fias_update['url']) + fname = aorar.download(update_entry['url']) for table_entry in aorar.get_table_entries(fname, allowed_tables): - self.update_one_delta(table_entry) + self.process_single_entry(table_entry) + + logging.warning("Update success") diff --git a/aore/aoutils/importer.py b/aore/aoutils/importer.py index aaa175f..936689c 100644 --- a/aore/aoutils/importer.py +++ b/aore/aoutils/importer.py @@ -5,27 +5,35 @@ from pysimplesoap.client import SoapClient class Importer: def __init__(self): - pass - - def get_current_fias_version(self): - return 224 # TODO FIXIT - - # return (int_version, text_version, url) - @property - def download_updatelist(self): - client = SoapClient( + self.client = SoapClient( location="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx", action='http://fias.nalog.ru/WebServices/Public/DownloadService.asmx/', namespace="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx", soap_ns='soap', trace=False, ns=False) - response = client.GetAllDownloadFileInfo() + def get_current_fias_version(self): + return 224 # TODO FIXIT - if not response: - raise "Response is null" + def get_full(self): + response = self.client.GetLastDownloadFileInfo() + + assert response, "Response is null" + downloadfileinfo = response.GetLastDownloadFileInfoResponse.GetLastDownloadFileInfoResult + + assert downloadfileinfo.VersionId < self.get_current_fias_version(), "DB is already up-to-date" + + yield dict(intver=int(downloadfileinfo.VersionId), strver=str(downloadfileinfo.TextVersion), + url=str(downloadfileinfo.FiasCompleteXmlUrl)) + + # return (intver, strver, url) + def get_updates(self): + response = self.client.GetAllDownloadFileInfo() + + assert response, "Response is null" current_fias_version = self.get_current_fias_version() for DownloadFileInfo in response.GetAllDownloadFileInfoResponse.GetAllDownloadFileInfoResult.DownloadFileInfo: if int(DownloadFileInfo.VersionId) > current_fias_version: - yield dict(intver=int(DownloadFileInfo.VersionId), strver=str(DownloadFileInfo.TextVersion), url=str(DownloadFileInfo.FiasDeltaXmlUrl)) \ No newline at end of file + yield dict(intver=int(DownloadFileInfo.VersionId), strver=str(DownloadFileInfo.TextVersion), + url=str(DownloadFileInfo.FiasDeltaXmlUrl)) \ No newline at end of file diff --git a/aore/dbutils/dbhandler.py b/aore/dbutils/dbhandler.py new file mode 100644 index 0000000..9244363 --- /dev/null +++ b/aore/dbutils/dbhandler.py @@ -0,0 +1,47 @@ +# -*- 
+
+from aore.dbutils.dbimpl import DBImpl
+from aore.config import db as dbparams
+from aore.dbutils.dbschemas import db_shemas, allowed_tables
+from traceback import format_exc
+import psycopg2
+import logging
+
+
+class DbHandler:
+    def __init__(self):
+        logging.basicConfig(format='%(asctime)s %(message)s')
+        self.db = DBImpl(psycopg2, dbparams)
+
+    def bulk_csv(self, chunck_size, table_name, csv_file_name):
+        sql_query = "COPY \"{}\" ({}) FROM '{}' DELIMITER '\t' NULL 'NULL'". \
+            format(table_name,
+                   ", ".join(
+                       db_shemas[table_name].fields),
+                   csv_file_name)
+        try:
+            cur = self.db.get_cursor()
+            cur.execute(sql_query)
+            self.db.transaction_commit()
+        except:
+            self.db.transaction_rollback()
+            logging.error("Error updating sql. Reason : {}".format(format_exc()))
+
+        logging.warning("Inserted {} queries FROM {}".format(chunck_size, csv_file_name))
+
+    def pre_create(self):
+        f = open("aore/templates/postgre/pre_create.sql")
+        create_db_syntax = f.read()
+        f.close()
+
+        try:
+            cur = self.db.get_cursor()
+            cur.execute(create_db_syntax)
+            self.db.transaction_commit()
+        except:
+            self.db.transaction_rollback()
+            raise Exception("Error downloading. Reason : {}".format(format_exc()))
+
+    def pre_update(self):
+        # TODO: update actions
+        pass
\ No newline at end of file
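
Below is a minimal usage sketch of the refactored updater, based on the classes introduced in this diff. It is illustrative only and not part of the patch; the directory path is a placeholder for any folder of unpacked FIAS XML files.

# Illustrative usage of the refactored AoUpdater (not part of the patch).
# "/path/to/unpacked_xmls" is a placeholder path, not a real location.
from aore.aoutils.aoupdater import AoUpdater

# Build the database from scratch, reading unpacked XML files from a local directory.
AoUpdater("/path/to/unpacked_xmls").create()

# Or fetch pending delta archives over HTTP and apply at most one of them.
AoUpdater(source="http").update(count=1)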