commit 7f657057422a5a4e205ea73b858acbc3e9953606 Author: Jack Stdin Date: Tue Jan 12 19:07:57 2016 +0300 Initial commit diff --git a/aore/__init__.py b/aore/__init__.py new file mode 100644 index 0000000..2a488d7 --- /dev/null +++ b/aore/__init__.py @@ -0,0 +1,7 @@ +import os +import sys + +reload(sys) +cwd = os.getcwd() +sys.path.append(cwd) +sys.setdefaultencoding("utf-8") diff --git a/aore/aoutils/__init__.py b/aore/aoutils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aore/aoutils/aodataparser.py b/aore/aoutils/aodataparser.py new file mode 100644 index 0000000..e89d2ac --- /dev/null +++ b/aore/aoutils/aodataparser.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +from aore.aoutils.aoxmltableentry import AoXmlTableEntry +from xmlparser import XMLParser +from aore.config import trashfolder +from aore.dbutils.dbschemas import db_shemas +import os + + +class AoDataParser: + def __init__(self, datasource, pagesize): + self.datasource = datasource + if self.datasource.table_name not in db_shemas: + raise BaseException("Cannot parse {}: Not configured.".format(self.datasource.table_name)) + else: + self.allowed_fields = db_shemas[self.datasource.table_name].fields + + self.pagesize = pagesize + self.currentpage = 0 + self.counter = 0 + + self.base_filename = "" + self.csv_file = None + self.data_bereit_callback = None + + def import_update(self, attr): + if self.counter > self.pagesize: + # Send old file to DB engine + if self.csv_file: + self.csv_file.close() + self.data_bereit_callback(os.path.abspath(self.csv_file.name)) + os.remove(self.csv_file.name) + + # Prepare to next iteration + self.counter = 0 + self.currentpage += 1 + self.csv_file = open(self.base_filename.format(self.currentpage), "w") + + exit_nodes = list() + for allowed_field in self.allowed_fields: + if allowed_field in attr: + exit_nodes.append(attr[allowed_field]) + else: + exit_nodes.append("NULL") + + exit_string = "\t".join(exit_nodes) + self.csv_file.write(exit_string + "\n") + self.counter += 1 + + # Output - sql query + def parse(self, data_callback): + if self.datasource.operation_type == AoXmlTableEntry.OperationType.update: + self.data_bereit_callback = data_callback + self.currentpage = 0 + self.base_filename = trashfolder + "fd_" + str(self.datasource.operation_type) + "_" + \ + self.datasource.table_name + ".csv.part{}" + self.counter = self.pagesize + 1 + + xml_parser = XMLParser(self.import_update) + src = self.datasource.open() + xml_parser.parse_buffer(src, db_shemas[self.datasource.table_name].xml_tag) + + # Send last file to db processor + if self.csv_file: + self.csv_file.close() + self.data_bereit_callback(os.path.abspath(self.csv_file.name)) + os.remove(self.csv_file.name) + src.close() diff --git a/aore/aoutils/aorar.py b/aore/aoutils/aorar.py new file mode 100644 index 0000000..62edf9f --- /dev/null +++ b/aore/aoutils/aorar.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- + +import requests +import os.path +import rarfile +from aore.config import unrar, trashfolder +from traceback import format_exc +from aoxmltableentry import AoXmlTableEntry + + +class AoRar: + def __init__(self): + rarfile.UNRAR_TOOL = unrar + + def download(self, url): + print("Downloading {}".format(url)) + try: + local_filename = os.path.abspath(trashfolder + url.split('/')[-1]) + if os.path.isfile(local_filename): + return local_filename + os.remove(local_filename) + + request = requests.get(url, stream=True) + with open(local_filename, 'wb') as f: + for chunk in request.iter_content(chunk_size=1024): + if chunk: + f.write(chunk) + except: + print("Error downloading. Reason : {}".format(format_exc())) + return None + + print("Downloaded {} bytes".format(request.headers['Content-length'])) + return local_filename + + def get_table_entries(self, file_name, allowed_tables): + if file_name and os.path.isfile(file_name): + rf = rarfile.RarFile(file_name) + + for arch_entry in rf.infolist(): + xmltable = AoXmlTableEntry.from_rar(arch_entry.filename, rf, arch_entry) + if xmltable.table_name in allowed_tables: + yield xmltable + else: + print "Done" + # os.remove(file_name) TODO : Uncomment + else: + print("No file specified or not exists") diff --git a/aore/aoutils/aoupdater.py b/aore/aoutils/aoupdater.py new file mode 100644 index 0000000..8d29555 --- /dev/null +++ b/aore/aoutils/aoupdater.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- + +from aore.aoutils.aodataparser import AoDataParser +from aore.config import db as dbparams +from aore.aoutils.aorar import AoRar +from aore.aoutils.aoxmltableentry import AoXmlTableEntry +from aore.dbutils.dbschemas import db_shemas, allowed_tables +from aore.aoutils.importer import Importer +from os import walk +from traceback import format_exc +import psycopg2 +import logging +from aore.dbutils.dbimpl import DBImpl + + +class AoUpdater: + def __init__(self, dirpath=None): + logging.basicConfig(format='%(asctime)s %(message)s') + self.dirpath = None + self.updatelist = None + self.db = DBImpl(psycopg2, dbparams) + + if dir: + self.dirpath = dirpath + else: + imp = Importer() + self.updatelist = imp.download_updatelist + + def get_table_entries(self, allowed_tables): + for (dirpath, dirnames, filenames) in walk(self.dirpath): + for filename in filenames: + if filename.endswith(".XML"): + xmltable = AoXmlTableEntry.from_dir(filename, dirpath.replace("\\", "/") + "/") + if xmltable.table_name in allowed_tables: + yield xmltable + break + + def on_receive_sql_file(self, chunck_size, table_name, csv_file_name): + sql_query = "COPY \"{}\" ({}) FROM '{}' DELIMITER '\t' NULL 'NULL'".format(table_name, + ", ".join(db_shemas[table_name].fields), + csv_file_name) + print sql_query + try: + cur = self.db.get_cursor() + cur.execute(sql_query) + self.db.transaction_commit() + except: + self.db.transaction_rollback() + logging.error("Error updating sql. Reason : {}".format(format_exc())) + + logging.warning("Inserted {} queries FROM {}".format(chunck_size, csv_file_name)) + + def update_one_delta(self, table_xmlentry, chunck_size=50000): + aoparser = AoDataParser(table_xmlentry, chunck_size) + aoparser.parse(lambda x: self.on_receive_sql_file(chunck_size, table_xmlentry.table_name, x)) + + def __pre_create_db(self): + f = open("aore/templates/postgre/pre_create.sql") + create_db_syntax = f.read() + f.close() + + try: + cur = self.db.get_cursor() + cur.execute(create_db_syntax) + self.db.transaction_commit() + except: + self.db.transaction_rollback() + raise "Error downloading. Reason : {}".format(format_exc()) + + def create(self): + if not self.dirpath: + logging.warning("Cannot update - Updater works in update mode") + return + self.__pre_create_db() + + for table_entry in self.get_table_entries(allowed_tables): + self.update_one_delta(table_entry) + + def update(self, count=1): + if not self.updatelist: + logging.warning("Cannot update - Updater works in dir mode") + return + + counter = 0 + for fias_update in self.updatelist: + counter += 1 + if counter > count: + return + + aorar = AoRar() + fname = aorar.download(fias_update['url']) + for table_entry in aorar.get_table_entries(fname, allowed_tables): + self.update_one_delta(table_entry) diff --git a/aore/aoutils/aoxmltableentry.py b/aore/aoutils/aoxmltableentry.py new file mode 100644 index 0000000..60cf268 --- /dev/null +++ b/aore/aoutils/aoxmltableentry.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- + +import re +from enum import Enum + + +class AoXmlTableEntry: + class OperationType(Enum): + update = 1 + delete = 0 + + def __str__(self): + return self._name_ + + @classmethod + def from_rar(cls, file_name, rar_factory=None, rar_info=None): + # for rar + return AoXmlTableEntry(file_name, lambda: rar_factory.open(rar_info)) + + @classmethod + def from_dir(cls, file_name, path): + # for extracted into folder + return AoXmlTableEntry(file_name, lambda: open(path + file_name)) + + def __init__(self, file_name, lamda_open): + matchings = re.search('^(AS_)(DEL_)*([A-Z]+)', file_name) + + self.table_name = matchings.group(3) + self.operation_type = AoXmlTableEntry.OperationType(matchings.group(2) is None) + + self.lamda_open = lamda_open + self.file_descriptor = None + + def open(self): + if not self.file_descriptor: + self.file_descriptor = self.lamda_open() + + return self.file_descriptor + + def close(self): + self.file_descriptor.close() + + def __unicode__(self): + return "Entry for {} table {}".format(self.operation_type, self.table_name) + + def __str__(self): + return "Entry for {} table {}".format(self.operation_type, self.table_name) diff --git a/aore/aoutils/importer.py b/aore/aoutils/importer.py new file mode 100644 index 0000000..aaa175f --- /dev/null +++ b/aore/aoutils/importer.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- + +from pysimplesoap.client import SoapClient + + +class Importer: + def __init__(self): + pass + + def get_current_fias_version(self): + return 224 # TODO FIXIT + + # return (int_version, text_version, url) + @property + def download_updatelist(self): + client = SoapClient( + location="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx", + action='http://fias.nalog.ru/WebServices/Public/DownloadService.asmx/', + namespace="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx", + soap_ns='soap', trace=False, ns=False) + + response = client.GetAllDownloadFileInfo() + + if not response: + raise "Response is null" + + current_fias_version = self.get_current_fias_version() + + for DownloadFileInfo in response.GetAllDownloadFileInfoResponse.GetAllDownloadFileInfoResult.DownloadFileInfo: + if int(DownloadFileInfo.VersionId) > current_fias_version: + yield dict(intver=int(DownloadFileInfo.VersionId), strver=str(DownloadFileInfo.TextVersion), url=str(DownloadFileInfo.FiasDeltaXmlUrl)) \ No newline at end of file diff --git a/aore/aoutils/xmlparser.py b/aore/aoutils/xmlparser.py new file mode 100644 index 0000000..c553c12 --- /dev/null +++ b/aore/aoutils/xmlparser.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- + +from lxml import etree + + +class XMLParser: + def __init__(self, parse_function): + self.parse_function = parse_function + + def fast_iter(self, context, func, *args, **kwargs): + for event, elem in context: + # print event + func(elem, *args, **kwargs) + # It's safe to call clear() here because no descendants will be accessed + elem.clear() + # Also eliminate now-empty references from the root node to elem + for ancestor in elem.xpath('ancestor-or-self::*'): + while ancestor.getprevious() is not None: + del ancestor.getparent()[0] + del context + + def parse_buffer(self, data_buffer, tag_name): + context = etree.iterparse(data_buffer, events=('end',), tag=tag_name) + self.fast_iter(context, lambda x: self.parse_function(x.attrib)) \ No newline at end of file diff --git a/aore/config.py b/aore/config.py new file mode 100644 index 0000000..4fda068 --- /dev/null +++ b/aore/config.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- + +from platform import system + +config_type = "production" +if "Windows" in system(): + config_type = "test" + +DB_INSTANCES = dict( + test=dict( + host="localhost", + user="postgres", + password="intercon", + database="postgres", + ), + production=dict( + host="localhost", + user="postgres", + password="intercon", + database="postgres", + ) +) + +UNRAR_PATHES = dict( + test="C:\Program Files (x86)\WinRAR\unrar.exe", + production="unrar" +) + +# Main section +db = DB_INSTANCES[config_type] +unrar = UNRAR_PATHES[config_type] +trashfolder = "files/" diff --git a/aore/dbutils/__init__.py b/aore/dbutils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aore/dbutils/dbimpl.py b/aore/dbutils/dbimpl.py new file mode 100644 index 0000000..6346c7e --- /dev/null +++ b/aore/dbutils/dbimpl.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- + + +class DBImpl: + def __init__(self, engine, params): + self.db_engine = engine + self.connection = engine.connect(**params) + + def transaction_commit(self): + self.connection.commit() + + def transaction_rollback(self): + self.connection.rollback() + + def close(self): + if self.connection: + self.connection.close() + + def get_cursor(self): + return self.connection.cursor() + + def get_rows(self, query_string, for_dict=True): + if for_dict: + cur = self.connection.cursor(self.db_engine.cursors.DictCursor) + else: + cur = self.connection.cursor() + cur.execute(query_string) + + rows = cur.fetchall() + if cur: + cur.close() + + return rows \ No newline at end of file diff --git a/aore/dbutils/dbschemas.py b/aore/dbutils/dbschemas.py new file mode 100644 index 0000000..7559dab --- /dev/null +++ b/aore/dbutils/dbschemas.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- + + +class DbSchema: + def __init__(self, name, fieldlist, xmltag): + self.tablename = name + self.fields = fieldlist + self.xml_tag = xmltag + + +db_shemas = dict() +db_shemas['ADDROBJ'] = DbSchema("ADDROBJ", + ["AOID", "AOGUID", "SHORTNAME", "FORMALNAME", "AOLEVEL", "PARENTGUID", "ACTSTATUS", + "CURRSTATUS"], + "Object") +db_shemas['SOCRBASE'] = DbSchema("SOCRBASE", ["LEVEL", "SOCRNAME", "SCNAME", "KOD_T_ST"], "AddressObjectType") + +allowed_tables = ["ADDROBJ", "SOCRBASE"] diff --git a/aore/templates/postgre/pre_create.sql b/aore/templates/postgre/pre_create.sql new file mode 100644 index 0000000..5b72afd --- /dev/null +++ b/aore/templates/postgre/pre_create.sql @@ -0,0 +1,26 @@ +DROP TABLE IF EXISTS "public"."ADDROBJ"; +CREATE TABLE "public"."ADDROBJ" ( + "id" SERIAL4 NOT NULL, + "aoid" UUID NOT NULL, + "aoguid" UUID, + "shortname" VARCHAR(10) COLLATE "default", + "formalname" VARCHAR(120) COLLATE "default", + "aolevel" INT2, + "parentguid" UUID, + "actstatus" BIT(1), + "currstatus" INT2, + PRIMARY KEY ("id", "aoid") +) +WITH (OIDS =FALSE +); +DROP TABLE IF EXISTS "public"."SOCRBASE"; +CREATE TABLE "public"."SOCRBASE" ( + "id" SERIAL4 NOT NULL, + "level" INT2, + "scname" VARCHAR(10), + "socrname" VARCHAR(50), + "kod_t_st" VARCHAR(4), + PRIMARY KEY ("id") +) +WITH (OIDS =FALSE +); \ No newline at end of file diff --git a/manage.py b/manage.py new file mode 100644 index 0000000..be40d74 --- /dev/null +++ b/manage.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- + +from aore.aoutils.aoupdater import AoUpdater +import optparse + + +def update_base(updates_count): + aoupdater = AoUpdater() + aoupdater.update_db(updates_count) + + +def create_base(path_to_xmls): + aoupdater = AoUpdater(path_to_xmls) + aoupdater.create() + + +def main(): + # Parse options + p = optparse.OptionParser() + p.add_option('--create', '-c', help="Create DB from official full XMLs; " + "CREATE = path to xml source dir") + p.add_option('--update', '-u', help="Update DB from official delta archive; " + "UPDATE = count of updates") + options, arguments = p.parse_args() + + # create new database + if options.create: + create_base(options.create) + # update database + if options.update: + update_base(int(options.update)) + + +if __name__ == '__main__': + main() diff --git a/passenger_wsgi.py b/passenger_wsgi.py new file mode 100644 index 0000000..aeb4cbd --- /dev/null +++ b/passenger_wsgi.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- + +import os +import sys + +# append current dir to module path +reload(sys) +cwd = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(cwd) +sys.setdefaultencoding("utf-8") +sys.path.append('/home/i/interc7j/.local/lib/python2.7/site-packages') + +from fias import fias +application = fias.app