Added the ability to update to (create) a specific version

Jack Stdin
2016-01-17 21:08:01 +03:00
parent 4d565e5808
commit 0bd79b1311
14 changed files with 176 additions and 102 deletions

0
aore/updater/__init__.py Normal file

69
aore/updater/aodataparser.py Normal file

@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
import os

from aore.config import trashfolder
from aore.dbutils.dbschemas import db_shemas
from xmlparser import XMLParser


class AoDataParser:
    def __init__(self, datasource, pagesize):
        self.datasource = datasource
        if self.datasource.table_name not in db_shemas:
            raise BaseException("Cannot parse {}: not configured.".format(self.datasource.table_name))
        else:
            self.allowed_fields = db_shemas[self.datasource.table_name].fields

        self.pagesize = pagesize
        self.currentpage = 0
        self.counter = 0
        self.base_filename = ""
        self.csv_file = None
        self.data_bereit_callback = None

    def import_update(self, attr):
        if self.counter > self.pagesize:
            # Page is full: hand the finished file to the DB engine
            if self.csv_file:
                self.csv_file.close()
                self.data_bereit_callback(self.counter, os.path.abspath(self.csv_file.name))
                os.remove(self.csv_file.name)

            # Prepare for the next page
            self.counter = 0
            self.currentpage += 1
            self.csv_file = open(self.base_filename.format(self.currentpage), "w")

        exit_nodes = list()
        for allowed_field in self.allowed_fields:
            if allowed_field in attr:
                exit_nodes.append(attr[allowed_field])
            else:
                exit_nodes.append("NULL")

        exit_string = "\t".join(exit_nodes)
        self.csv_file.write(exit_string + "\n")
        self.counter += 1

    # Output: tab-separated CSV chunks, each handed to data_callback
    def parse(self, data_callback):
        self.data_bereit_callback = data_callback
        self.currentpage = 0
        self.base_filename = \
            trashfolder + "fd_" + \
            str(self.datasource.operation_type) + "_" + \
            self.datasource.table_name + ".csv.part{}"
        # Force import_update() to open the first file on its first call
        self.counter = self.pagesize + 1

        xml_parser = XMLParser(self.import_update)
        src = self.datasource.open()
        xml_parser.parse_buffer(src, db_shemas[self.datasource.table_name].xml_tag)

        # Send the last (partial) file to the DB processor
        if self.csv_file:
            self.csv_file.close()
            self.data_bereit_callback(self.counter, os.path.abspath(self.csv_file.name))
            os.remove(self.csv_file.name)
        src.close()
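
Note: a minimal sketch of the callback contract, for illustration only. parse() hands each finished chunk to the callback as (record_count, absolute_csv_path); the names table_entry and on_chunk_ready below are hypothetical, not part of this commit.

# table_entry would be an AoXmlTableEntry from AoRar.get_table_entries()
# or from Updater's folder scan; Updater plugs DbHandler.bulk_csv in here.
def on_chunk_ready(record_count, csv_path):
    print("chunk ready: {} records in {}".format(record_count, csv_path))

parser = AoDataParser(table_entry, pagesize=50000)
parser.parse(on_chunk_ready)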

49
aore/updater/aorar.py Normal file

@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
import logging
import os.path
from traceback import format_exc

import rarfile
import requests

from aore.config import unrar, trashfolder
from aoxmltableentry import AoXmlTableEntry


class AoRar:
    def __init__(self):
        rarfile.UNRAR_TOOL = unrar

    def download(self, url):
        logging.info("Downloading {}".format(url))
        try:
            local_filename = os.path.abspath(trashfolder + url.split('/')[-1])
            # Reuse a previously downloaded archive if it is still on disk
            if os.path.isfile(local_filename):
                return local_filename

            request = requests.get(url, stream=True)
            with open(local_filename, 'wb') as f:
                for chunk in request.iter_content(chunk_size=1024):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        except Exception:
            raise BaseException("Error downloading. Reason : {}".format(format_exc()))

        logging.info("Downloaded {} bytes".format(request.headers['Content-length']))
        return local_filename

    def get_table_entries(self, file_name, allowed_tables):
        if file_name and os.path.isfile(file_name):
            rf = rarfile.RarFile(file_name)
            for arch_entry in rf.infolist():
                xmltable = AoXmlTableEntry.from_rar(arch_entry.filename, rf, arch_entry)
                if xmltable.table_name in allowed_tables:
                    yield xmltable

            # Runs once the whole archive has been walked
            logging.info("All entries processed")
            os.remove(file_name)
        else:
            logging.error("No file specified or file does not exist")
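
Note: hypothetical usage of AoRar; the URL and table list below are illustrative, not a real FIAS delta link.

rar = AoRar()
archive = rar.download("http://fias.nalog.ru/Updates/fias_delta_xml.rar")
for entry in rar.get_table_entries(archive, ["ADDROBJ", "SOCRBASE"]):
    print(entry)  # e.g. "Entry for update table ADDROBJ"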

49
aore/updater/aoxmltableentry.py Normal file

@@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
import re

from enum import Enum


class AoXmlTableEntry:
    class OperationType(Enum):
        delete = 0
        update = 1
        create = 2

        def __str__(self):
            return self.name

    @classmethod
    def from_rar(cls, file_name, rar_factory=None, rar_info=None):
        # entry read straight out of the RAR archive
        return cls(file_name, lambda: rar_factory.open(rar_info))

    @classmethod
    def from_dir(cls, file_name, path):
        # entry already extracted into a folder
        return cls(file_name, lambda: open(path + file_name))

    def __init__(self, file_name, lambda_open):
        matches = re.search('^(AS_)(DEL_)*([A-Z]+)', file_name)
        self.table_name = matches.group(3)
        # "AS_DEL_*" files carry deletions, plain "AS_*" files carry updates
        self.operation_type = AoXmlTableEntry.OperationType(matches.group(2) is None)
        self.lambda_open = lambda_open
        self.file_descriptor = None

    def open(self):
        if not self.file_descriptor:
            self.file_descriptor = self.lambda_open()
        return self.file_descriptor

    def close(self):
        self.file_descriptor.close()

    def __unicode__(self):
        return "Entry for {} table {}".format(self.operation_type, self.table_name)

    __str__ = __unicode__

64
aore/updater/dbhandler.py Normal file

@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
import logging

import psycopg2
from bottle import template

from aore.updater.aoxmltableentry import AoXmlTableEntry
from aore.config import db as dbparams
from aore.dbutils.dbimpl import DBImpl
from aore.dbutils.dbschemas import db_shemas


class DbHandler:
    def __init__(self):
        self.db = DBImpl(psycopg2, dbparams)

    def bulk_csv(self, operation_type, table_name, processed_count, csv_file_name):
        sql_query = None

        # simply add new records
        if operation_type == AoXmlTableEntry.OperationType.create:
            sql_query = template('aore/templates/postgre/bulk_create.sql', delim='\t', tablename=table_name,
                                 fieldslist=", ".join(db_shemas[table_name].fields), csvname=csv_file_name)

        # update the table
        if operation_type == AoXmlTableEntry.OperationType.update:
            fields_update_list = ", ".join(
                "{}=EXCLUDED.{}".format(field, field)
                for field in db_shemas[table_name].fields
                if field != db_shemas[table_name].unique_field.upper())
            sql_query = template('aore/templates/postgre/bulk_update.sql', delim='\t', tablename=table_name,
                                 fieldslist=", ".join(db_shemas[table_name].fields), csvname=csv_file_name,
                                 uniquekey=db_shemas[table_name].unique_field, updaterule=fields_update_list)

        # delete records from the table
        if operation_type == AoXmlTableEntry.OperationType.delete:
            sql_query = template('aore/templates/postgre/bulk_delete.sql', delim='\t', tablename=table_name,
                                 fieldslist=", ".join(db_shemas[table_name].fields), csvname=csv_file_name,
                                 uniquekey=db_shemas[table_name].unique_field)

        assert sql_query, "Invalid operation type: {}".format(operation_type)
        self.db.execute(sql_query)
        logging.info("Processed {} records from {}".format(processed_count - 1, csv_file_name))

    def pre_create(self):
        logging.info("Preparing to create the DB structure...")
        sql_query = template("aore/templates/postgre/pre_create.sql")
        self.db.execute(sql_query)

    def post_create(self):
        logging.info("Indexing ADDROBJ...")
        sql_query = template("aore/templates/postgre/post_create.sql")
        self.db.execute(sql_query)
        logging.info("Indexing done.")

    def pre_update(self):
        # TODO: update actions
        pass
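
Note: the bulk_*.sql templates themselves are not part of this diff. A hypothetical call, as Updater issues it for one CSV chunk (table name, count, and path are illustrative):

handler = DbHandler()
handler.bulk_csv(AoXmlTableEntry.OperationType.update, "ADDROBJ",
                 50001, "/tmp/fd_update_ADDROBJ.csv.part1")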

26
aore/updater/soapreceiver.py Normal file

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
from pysimplesoap.client import SoapClient


class SoapReceiver:
    def __init__(self):
        self.client = SoapClient(
            location="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx",
            action='http://fias.nalog.ru/WebServices/Public/DownloadService.asmx/',
            namespace="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx",
            soap_ns='soap', trace=False, ns=False)

    def get_current_fias_version(self):
        # TODO: fetch the real installed version instead of this stub
        return 224

    # Yields dicts with keys: intver, strver, delta_url, complete_url
    def get_update_list(self):
        response = self.client.GetAllDownloadFileInfo()
        assert response, "Response is null"
        for DownloadFileInfo in response.GetAllDownloadFileInfoResponse.GetAllDownloadFileInfoResult.DownloadFileInfo:
            yield dict(intver=int(DownloadFileInfo.VersionId), strver=str(DownloadFileInfo.TextVersion),
                       delta_url=str(DownloadFileInfo.FiasDeltaXmlUrl),
                       complete_url=str(DownloadFileInfo.FiasCompleteXmlUrl))
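
Note: a hypothetical sketch of listing the pending updates, oldest first; the variable names are illustrative.

receiver = SoapReceiver()
current = receiver.get_current_fias_version()
pending = sorted((u for u in receiver.get_update_list() if u['intver'] > current),
                 key=lambda u: u['intver'])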

80
aore/updater/updater.py Normal file

@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
import logging
from os import walk, path

from aore.updater.aodataparser import AoDataParser
from aore.updater.aorar import AoRar
from aore.updater.aoxmltableentry import AoXmlTableEntry
from aore.updater.dbhandler import DbHandler
from aore.dbutils.dbschemas import allowed_tables


class Updater:
    # Source: "http" or a directory (full path to the unpacked XMLs)
    def __init__(self, source="http"):
        self.db_handler = DbHandler()
        self.mode = source
        self.updalist_generator = None
        self.tablelist_generator = None
        self.allowed_tables = None

    def __get_entries_from_folder(self, path_to_xmls):
        for (dirpath, dirnames, filenames) in walk(path_to_xmls):
            for filename in filenames:
                if filename.endswith(".XML"):
                    xmltable = AoXmlTableEntry.from_dir(filename, dirpath.replace("\\", "/") + "/")
                    if xmltable.table_name in allowed_tables:
                        yield xmltable
            break  # only the top-level directory is scanned

    def __get_updates_from_folder(self, foldername):
        # TODO: determine the version when the data comes from a directory
        yield dict(intver=0, strver="Unknown", delta_url=foldername, complete_url=foldername)

    def __get_updates_from_rar(self, url):
        aorar = AoRar()
        fname = aorar.download(url)
        for table_entry in aorar.get_table_entries(fname, allowed_tables):
            yield table_entry

    def __init_update_entries(self, updates_generator):
        if self.mode == "http":
            assert updates_generator
            self.tablelist_generator = self.__get_updates_from_rar
            self.updalist_generator = updates_generator
        else:
            assert path.isdir(self.mode), "Invalid directory {}".format(self.mode)
            self.updalist_generator = self.__get_updates_from_folder(self.mode)
            self.tablelist_generator = self.__get_entries_from_folder

    def process_single_entry(self, operation_type, table_xmlentry, chunk_size=50000):
        aoparser = AoDataParser(table_xmlentry, chunk_size)
        aoparser.parse(lambda x, y: self.db_handler.bulk_csv(operation_type, table_xmlentry.table_name, x, y))

    def create(self, updates_generator):
        self.__init_update_entries(updates_generator)
        self.db_handler.pre_create()

        for update_entry in self.updalist_generator:
            logging.info("Processing update #{}".format(update_entry['intver']))
            for table_entry in self.tablelist_generator(update_entry['complete_url']):
                # a full import treats every entry as "create"
                if table_entry.operation_type == AoXmlTableEntry.OperationType.update:
                    table_entry.operation_type = AoXmlTableEntry.OperationType.create
                self.process_single_entry(table_entry.operation_type, table_entry)

        self.db_handler.post_create()
        logging.info("Create success")

    def update(self, updates_generator):
        self.__init_update_entries(updates_generator)
        self.db_handler.pre_update()

        for update_entry in self.updalist_generator:
            logging.info("Processing update #{}".format(update_entry['intver']))
            for table_entry in self.tablelist_generator(update_entry['delta_url']):
                self.process_single_entry(table_entry.operation_type, table_entry)

        logging.info("Update success")
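
Note: a hypothetical end-to-end driver matching the purpose of this commit (updating to a specific version); the target version 224 mirrors the stub in SoapReceiver and is illustrative.

from aore.updater.soapreceiver import SoapReceiver
from aore.updater.updater import Updater

receiver = SoapReceiver()
target_version = 224
updates = (u for u in receiver.get_update_list() if u['intver'] == target_version)
Updater(source="http").update(updates)  # or .create(updates) for a full import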

24
aore/updater/xmlparser.py Normal file

@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
from lxml import etree


class XMLParser:
    def __init__(self, parse_function):
        self.parse_function = parse_function

    def fast_iter(self, context, func, *args, **kwargs):
        for event, elem in context:
            func(elem, *args, **kwargs)
            # It's safe to call clear() here because no descendants will be accessed
            elem.clear()
            # Also eliminate now-empty references from the root node to elem
            for ancestor in elem.xpath('ancestor-or-self::*'):
                while ancestor.getprevious() is not None:
                    del ancestor.getparent()[0]
        del context

    def parse_buffer(self, data_buffer, tag_name):
        context = etree.iterparse(data_buffer, events=('end',), tag=tag_name)
        self.fast_iter(context, lambda x: self.parse_function(x.attrib))
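
Note: a minimal self-contained sketch of the streaming parse; the XML snippet and tag name are illustrative, not real FIAS data.

from io import BytesIO

def show(attrib):
    print(attrib)  # -> {'AOGUID': '1', 'FORMALNAME': 'Test'}

buf = BytesIO(b'<root><Object AOGUID="1" FORMALNAME="Test"/></root>')
XMLParser(show).parse_buffer(buf, 'Object')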