Немножко привел к общему виду

This commit is contained in:
jar3b 2016-01-12 23:15:56 +03:00 committed by Jack Stdin
parent 1875b50a4c
commit 9803b1d25a
3 changed files with 111 additions and 73 deletions

View File

@ -1,33 +1,25 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from aore.aoutils.aodataparser import AoDataParser from aore.aoutils.aodataparser import AoDataParser
from aore.config import db as dbparams
from aore.aoutils.aorar import AoRar from aore.aoutils.aorar import AoRar
from aore.aoutils.aoxmltableentry import AoXmlTableEntry from aore.aoutils.aoxmltableentry import AoXmlTableEntry
from aore.dbutils.dbschemas import db_shemas, allowed_tables from aore.dbutils.dbhandler import DbHandler
from aore.dbutils.dbschemas import allowed_tables
from aore.aoutils.importer import Importer from aore.aoutils.importer import Importer
from os import walk from os import walk, path
from traceback import format_exc
import psycopg2
import logging import logging
from aore.dbutils.dbimpl import DBImpl
class AoUpdater: class AoUpdater:
def __init__(self, dirpath=None): # Source: "http", directory (as a full path to unpacked xmls)
def __init__(self, source="http"):
logging.basicConfig(format='%(asctime)s %(message)s') logging.basicConfig(format='%(asctime)s %(message)s')
self.dirpath = None self.db_handler = DbHandler()
self.updatelist = None self.mode = source
self.db = DBImpl(psycopg2, dbparams) self.updalist_generator = None
self.allowed_tables = None
if dir: def __get_entries_from_folder(self, path_to_xmls):
self.dirpath = dirpath for (dirpath, dirnames, filenames) in walk(path_to_xmls):
else:
imp = Importer()
self.updatelist = imp.download_updatelist
def get_table_entries(self, allowed_tables):
for (dirpath, dirnames, filenames) in walk(self.dirpath):
for filename in filenames: for filename in filenames:
if filename.endswith(".XML"): if filename.endswith(".XML"):
xmltable = AoXmlTableEntry.from_dir(filename, dirpath.replace("\\", "/") + "/") xmltable = AoXmlTableEntry.from_dir(filename, dirpath.replace("\\", "/") + "/")
@ -35,59 +27,50 @@ class AoUpdater:
yield xmltable yield xmltable
break break
def on_receive_sql_file(self, chunck_size, table_name, csv_file_name): def __get_updates_from_folder(self, foldername):
sql_query = "COPY \"{}\" ({}) FROM '{}' DELIMITER '\t' NULL 'NULL'".format(table_name, # TODO: Вычислять версию, если берем данные из каталога
", ".join(db_shemas[table_name].fields), yield dict(intver=0, textver="Unknown", url=foldername)
csv_file_name)
print sql_query
try:
cur = self.db.get_cursor()
cur.execute(sql_query)
self.db.transaction_commit()
except:
self.db.transaction_rollback()
logging.error("Error updating sql. Reason : {}".format(format_exc()))
logging.warning("Inserted {} queries FROM {}".format(chunck_size, csv_file_name)) def __init_update_entries(self, full_base):
if self.mode == "http":
imp = Importer()
self.updalist_generator = None
if full_base:
self.updalist_generator = imp.get_full()
else:
self.updalist_generator = imp.get_updates()
else:
assert path.isdir(self.mode), "Invalid directory {}".format(self.mode)
self.updalist_generator = self.__get_updates_from_folder(self.mode)
def update_one_delta(self, table_xmlentry, chunck_size=50000): def process_single_entry(self, table_xmlentry, chunck_size=50000):
aoparser = AoDataParser(table_xmlentry, chunck_size) aoparser = AoDataParser(table_xmlentry, chunck_size)
aoparser.parse(lambda x: self.on_receive_sql_file(chunck_size, table_xmlentry.table_name, x)) aoparser.parse(lambda x: self.db_handler.bulk_csv(chunck_size, table_xmlentry.table_name, x))
def __pre_create_db(self):
f = open("aore/templates/postgre/pre_create.sql")
create_db_syntax = f.read()
f.close()
try:
cur = self.db.get_cursor()
cur.execute(create_db_syntax)
self.db.transaction_commit()
except:
self.db.transaction_rollback()
raise "Error downloading. Reason : {}".format(format_exc())
def create(self): def create(self):
if not self.dirpath: self.__init_update_entries(True)
logging.warning("Cannot update - Updater works in update mode") self.db_handler.pre_create()
return
self.__pre_create_db()
for table_entry in self.get_table_entries(allowed_tables): for update_entry in self.updalist_generator:
self.update_one_delta(table_entry) for table_entry in self.__get_entries_from_folder(update_entry['url']):
self.process_single_entry(table_entry)
logging.warning("Create success")
def update(self, count=1): def update(self, count=1):
if not self.updatelist: self.__init_update_entries(False)
logging.warning("Cannot update - Updater works in dir mode") self.db_handler.pre_update()
return
counter = 0 counter = 0
for fias_update in self.updatelist: for update_entry in self.updalist_generator:
counter += 1 counter += 1
if counter > count: if counter > count:
return logging.warning("Maximum count of updates are processed - exit")
break
aorar = AoRar() aorar = AoRar()
fname = aorar.download(fias_update['url']) fname = aorar.download(update_entry['url'])
for table_entry in aorar.get_table_entries(fname, allowed_tables): for table_entry in aorar.get_table_entries(fname, allowed_tables):
self.update_one_delta(table_entry) self.process_single_entry(table_entry)
logging.warning("Update success")

View File

@ -5,27 +5,35 @@ from pysimplesoap.client import SoapClient
class Importer: class Importer:
def __init__(self): def __init__(self):
pass self.client = SoapClient(
def get_current_fias_version(self):
return 224 # TODO FIXIT
# return (int_version, text_version, url)
@property
def download_updatelist(self):
client = SoapClient(
location="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx", location="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx",
action='http://fias.nalog.ru/WebServices/Public/DownloadService.asmx/', action='http://fias.nalog.ru/WebServices/Public/DownloadService.asmx/',
namespace="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx", namespace="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx",
soap_ns='soap', trace=False, ns=False) soap_ns='soap', trace=False, ns=False)
response = client.GetAllDownloadFileInfo() def get_current_fias_version(self):
return 224 # TODO FIXIT
if not response: def get_full(self):
raise "Response is null" response = self.client.GetLastDownloadFileInfo()
assert response, "Response is null"
downloadfileinfo = response.GetLastDownloadFileInfoResponse.GetLastDownloadFileInfoResult
assert downloadfileinfo.VersionId < self.get_current_fias_version(), "DB is already up-to-date"
yield dict(intver=int(downloadfileinfo.VersionId), strver=str(downloadfileinfo.TextVersion),
url=str(downloadfileinfo.FiasCompleteXmlUrl))
# return (intver, strver, url)
def get_updates(self):
response = self.client.GetAllDownloadFileInfo()
assert response, "Response is null"
current_fias_version = self.get_current_fias_version() current_fias_version = self.get_current_fias_version()
for DownloadFileInfo in response.GetAllDownloadFileInfoResponse.GetAllDownloadFileInfoResult.DownloadFileInfo: for DownloadFileInfo in response.GetAllDownloadFileInfoResponse.GetAllDownloadFileInfoResult.DownloadFileInfo:
if int(DownloadFileInfo.VersionId) > current_fias_version: if int(DownloadFileInfo.VersionId) > current_fias_version:
yield dict(intver=int(DownloadFileInfo.VersionId), strver=str(DownloadFileInfo.TextVersion), url=str(DownloadFileInfo.FiasDeltaXmlUrl)) yield dict(intver=int(DownloadFileInfo.VersionId), strver=str(DownloadFileInfo.TextVersion),
url=str(DownloadFileInfo.FiasDeltaXmlUrl))

47
aore/dbutils/dbhandler.py Normal file
View File

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
from aore.dbutils.dbimpl import DBImpl
from aore.config import db as dbparams
from aore.dbutils.dbschemas import db_shemas, allowed_tables
from traceback import format_exc
import psycopg2
import logging
class DbHandler:
    """Thin wrapper around ``DBImpl`` for the bulk operations of the updater.

    Every statement is executed in its own transaction: it is committed on
    success and rolled back on failure.
    """

    # SQL script executed by pre_create(). The path is relative, so it is
    # resolved against the current working directory of the process.
    PRE_CREATE_SCRIPT = "aore/templates/postgre/pre_create.sql"

    def __init__(self):
        """Configure the log format and open the database wrapper."""
        logging.basicConfig(format='%(asctime)s %(message)s')
        self.db = DBImpl(psycopg2, dbparams)

    def _execute_in_transaction(self, sql_query):
        """Run ``sql_query`` and commit; roll back and re-raise on failure."""
        try:
            cur = self.db.get_cursor()
            cur.execute(sql_query)
            self.db.transaction_commit()
        except Exception:
            # Only real errors are handled here; KeyboardInterrupt and
            # SystemExit are deliberately allowed to propagate untouched.
            self.db.transaction_rollback()
            raise

    def bulk_csv(self, chunck_size, table_name, csv_file_name):
        """Load a tab-separated file into ``table_name`` with a COPY statement.

        The column list is taken from ``db_shemas[table_name].fields``.
        This is a best-effort operation: a failed COPY is rolled back and
        logged, but no exception is raised to the caller.

        :param chunck_size: number of rows in the file; used for logging only.
        :param table_name: key of ``db_shemas`` and name of the target table.
        :param csv_file_name: path placed into the ``FROM '...'`` clause.
            NOTE(review): with PostgreSQL ``COPY ... FROM 'file'`` the path is
            read by the database server, not by this process - confirm.
        :raises KeyError: if ``table_name`` is not present in ``db_shemas``.
        """
        # The '\t' escape puts a literal TAB character into the statement.
        sql_query = "COPY \"{}\" ({}) FROM '{}' DELIMITER '\t' NULL 'NULL'".format(
            table_name,
            ", ".join(db_shemas[table_name].fields),
            csv_file_name)
        try:
            self._execute_in_transaction(sql_query)
        except Exception:
            logging.error("Error updating sql. Reason : {}".format(format_exc()))
        else:
            # Report success only when the COPY was actually committed.
            logging.warning("Inserted {} queries FROM {}".format(chunck_size, csv_file_name))

    def pre_create(self):
        """Execute the SQL script ``PRE_CREATE_SCRIPT`` in one transaction.

        :raises RuntimeError: if executing the script fails; the transaction
            is rolled back first and the message carries the traceback.
        :raises IOError: if the script file cannot be read.
        """
        # The context manager closes the file even if read() fails.
        with open(self.PRE_CREATE_SCRIPT) as script_file:
            create_db_syntax = script_file.read()
        try:
            self._execute_in_transaction(create_db_syntax)
        except Exception:
            # A string cannot be raised; wrap the failure in a real exception.
            raise RuntimeError(
                "Error executing pre_create script. Reason : {}".format(format_exc()))

    def pre_update(self):
        """Hook executed before applying updates; currently does nothing."""
        # TODO: update actions
        pass