Day 2, full DB import/update/delete from dir with XML or HTTP

This commit is contained in:
Jack Stdin 2016-01-13 17:38:01 +03:00
parent 3aeb00d82a
commit 67f6943dce
14 changed files with 146 additions and 67 deletions

View File

@ -1,3 +1,4 @@
import logging
import os import os
import sys import sys
@ -5,3 +6,4 @@ reload(sys)
cwd = os.getcwd() cwd = os.getcwd()
sys.path.append(cwd) sys.path.append(cwd)
sys.setdefaultencoding("utf-8") sys.setdefaultencoding("utf-8")
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)

View File

@ -18,17 +18,23 @@ class AoDataParser:
self.pagesize = pagesize self.pagesize = pagesize
self.currentpage = 0 self.currentpage = 0
self.counter = 0 self.counter = 0
self.addrobj_filter = self.datasource.table_name == 'ADDROBJ' and self.datasource.operation_type == AoXmlTableEntry.OperationType.create
self.base_filename = "" self.base_filename = ""
self.csv_file = None self.csv_file = None
self.data_bereit_callback = None self.data_bereit_callback = None
def import_update(self, attr): def import_update(self, attr):
# Addrobj advanced filter
if self.addrobj_filter:
if attr['ACTSTATUS'] == '0' or 'NEXTID' in attr:
return
if self.counter > self.pagesize: if self.counter > self.pagesize:
# Send old file to DB engine # Send old file to DB engine
if self.csv_file: if self.csv_file:
self.csv_file.close() self.csv_file.close()
self.data_bereit_callback(os.path.abspath(self.csv_file.name)) self.data_bereit_callback(self.counter, os.path.abspath(self.csv_file.name))
os.remove(self.csv_file.name) os.remove(self.csv_file.name)
# Prepare to next iteration # Prepare to next iteration
@ -49,10 +55,11 @@ class AoDataParser:
# Output - sql query # Output - sql query
def parse(self, data_callback): def parse(self, data_callback):
if self.datasource.operation_type == AoXmlTableEntry.OperationType.update:
self.data_bereit_callback = data_callback self.data_bereit_callback = data_callback
self.currentpage = 0 self.currentpage = 0
self.base_filename = trashfolder + "fd_" + str(self.datasource.operation_type) + "_" + \ self.base_filename = \
trashfolder + "fd_" + \
str(self.datasource.operation_type) + "_" + \
self.datasource.table_name + ".csv.part{}" self.datasource.table_name + ".csv.part{}"
self.counter = self.pagesize + 1 self.counter = self.pagesize + 1
@ -63,6 +70,6 @@ class AoDataParser:
# Send last file to db processor # Send last file to db processor
if self.csv_file: if self.csv_file:
self.csv_file.close() self.csv_file.close()
self.data_bereit_callback(os.path.abspath(self.csv_file.name)) self.data_bereit_callback(self.counter, os.path.abspath(self.csv_file.name))
os.remove(self.csv_file.name) os.remove(self.csv_file.name)
src.close() src.close()

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import logging
import os.path import os.path
from traceback import format_exc from traceback import format_exc
@ -15,7 +16,7 @@ class AoRar:
rarfile.UNRAR_TOOL = unrar rarfile.UNRAR_TOOL = unrar
def download(self, url): def download(self, url):
print("Downloading {}".format(url)) logging.info("Downloading {}".format(url))
try: try:
local_filename = os.path.abspath(trashfolder + url.split('/')[-1]) local_filename = os.path.abspath(trashfolder + url.split('/')[-1])
if os.path.isfile(local_filename): if os.path.isfile(local_filename):
@ -28,10 +29,9 @@ class AoRar:
if chunk: if chunk:
f.write(chunk) f.write(chunk)
except: except:
print("Error downloading. Reason : {}".format(format_exc())) raise BaseException("Error downloading. Reason : {}".format(format_exc()))
return None
print("Downloaded {} bytes".format(request.headers['Content-length'])) logging.info("Downloaded {} bytes".format(request.headers['Content-length']))
return local_filename return local_filename
def get_table_entries(self, file_name, allowed_tables): def get_table_entries(self, file_name, allowed_tables):
@ -43,7 +43,7 @@ class AoRar:
if xmltable.table_name in allowed_tables: if xmltable.table_name in allowed_tables:
yield xmltable yield xmltable
else: else:
print "Done" logging.info("All entries processed")
# os.remove(file_name) TODO : Uncomment os.remove(file_name)
else: else:
print("No file specified or not exists") logging.error("No file specified or not exists")

View File

@ -14,7 +14,6 @@ from aore.dbutils.dbschemas import allowed_tables
class AoUpdater: class AoUpdater:
# Source: "http", directory (as a full path to unpacked xmls) # Source: "http", directory (as a full path to unpacked xmls)
def __init__(self, source="http"): def __init__(self, source="http"):
logging.basicConfig(format='%(asctime)s %(message)s')
self.db_handler = DbHandler() self.db_handler = DbHandler()
self.mode = source self.mode = source
self.updalist_generator = None self.updalist_generator = None
@ -53,9 +52,9 @@ class AoUpdater:
self.updalist_generator = self.__get_updates_from_folder(self.mode) self.updalist_generator = self.__get_updates_from_folder(self.mode)
self.tablelist_generator = self.__get_entries_from_folder self.tablelist_generator = self.__get_entries_from_folder
def process_single_entry(self, table_xmlentry, chunck_size=50000): def process_single_entry(self, operation_type, table_xmlentry, chunck_size=50000):
aoparser = AoDataParser(table_xmlentry, chunck_size) aoparser = AoDataParser(table_xmlentry, chunck_size)
aoparser.parse(lambda x: self.db_handler.bulk_csv(chunck_size, table_xmlentry.table_name, x)) aoparser.parse(lambda x, y: self.db_handler.bulk_csv(operation_type, table_xmlentry.table_name, x, y))
def create(self): def create(self):
self.__init_update_entries(True) self.__init_update_entries(True)
@ -63,9 +62,11 @@ class AoUpdater:
for update_entry in self.updalist_generator: for update_entry in self.updalist_generator:
for table_entry in self.tablelist_generator(update_entry['url']): for table_entry in self.tablelist_generator(update_entry['url']):
self.process_single_entry(table_entry) if table_entry.operation_type == AoXmlTableEntry.OperationType.update:
table_entry.operation_type = AoXmlTableEntry.OperationType.create
self.process_single_entry(table_entry.operation_type, table_entry)
logging.warning("Create success") logging.info("Create success")
def update(self, count=1): def update(self, count=1):
self.__init_update_entries(False) self.__init_update_entries(False)
@ -79,6 +80,6 @@ class AoUpdater:
break break
for table_entry in self.tablelist_generator(update_entry['url']): for table_entry in self.tablelist_generator(update_entry['url']):
self.process_single_entry(table_entry) self.process_single_entry(table_entry.operation_type, table_entry)
logging.warning("Update success") logging.info("Update success")

View File

@ -9,6 +9,7 @@ class AoXmlTableEntry:
class OperationType(Enum): class OperationType(Enum):
update = 1 update = 1
delete = 0 delete = 0
create = 2
def __str__(self): def __str__(self):
return self._name_ return self._name_

View File

@ -15,9 +15,9 @@ DB_INSTANCES = dict(
), ),
production=dict( production=dict(
host="localhost", host="localhost",
user="postgres", user="***",
password="intercon", password="***",
database="postgres", database="***",
) )
) )
@ -26,6 +26,9 @@ UNRAR_PATHES = dict(
production="unrar" production="unrar"
) )
# Uncomment if you want to specify config_type manually
# config_type = "test"
# Main section # Main section
db = DB_INSTANCES[config_type] db = DB_INSTANCES[config_type]
unrar = UNRAR_PATHES[config_type] unrar = UNRAR_PATHES[config_type]

View File

@ -1,10 +1,10 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import logging import logging
from traceback import format_exc
import psycopg2 import psycopg2
from aore.aoutils.aoxmltableentry import AoXmlTableEntry
from aore.config import db as dbparams from aore.config import db as dbparams
from aore.dbutils.dbimpl import DBImpl from aore.dbutils.dbimpl import DBImpl
from aore.dbutils.dbschemas import db_shemas from aore.dbutils.dbschemas import db_shemas
@ -12,37 +12,73 @@ from aore.dbutils.dbschemas import db_shemas
class DbHandler: class DbHandler:
def __init__(self): def __init__(self):
logging.basicConfig(format='%(asctime)s %(message)s')
self.db = DBImpl(psycopg2, dbparams) self.db = DBImpl(psycopg2, dbparams)
def bulk_csv(self, chunk_size, table_name, csv_file_name): f = open("aore/templates/postgre/bulk_create.sql")
sql_query = "COPY \"{}\" ({}) FROM '{}' DELIMITER '\t' NULL 'NULL'". \ self.syntax_bulk_create = f.read()
format(table_name, f.close()
", ".join(
db_shemas[table_name].fields),
csv_file_name)
try:
cur = self.db.get_cursor()
cur.execute(sql_query)
self.db.transaction_commit()
except:
self.db.transaction_rollback()
raise BaseException("Error updating sql. Reason : {}".format(format_exc()))
logging.warning("Inserted {} queries FROM {}".format(chunk_size, csv_file_name)) f = open("aore/templates/postgre/bulk_update.sql")
self.syntax_bulk_update = f.read()
f.close()
f = open("aore/templates/postgre/bulk_delete.sql")
self.syntax_bulk_delete = f.read()
f.close()
def bulk_csv(self, operation_type, table_name, processed_count, csv_file_name):
sql_query = None
# simply add new records
if operation_type == AoXmlTableEntry.OperationType.create:
sql_query = self.syntax_bulk_create \
.replace("%tab%", "\t") \
.replace("%tablename%", table_name) \
.replace("%fieldslist%", ", ".join(db_shemas[table_name].fields)) \
.replace("%csvname%", csv_file_name)
# update table
if operation_type == AoXmlTableEntry.OperationType.update:
fields_update_list = ""
for field in db_shemas[table_name].fields:
if field != db_shemas[table_name].unique_field.upper():
fields_update_list += "{}=EXCLUDED.{}, ".format(field, field)
fields_update_list = fields_update_list[:-2]
sql_query = self.syntax_bulk_update \
.replace("%tab%", "\t") \
.replace("%tablename%", table_name) \
.replace("%fieldslist%", ", ".join(db_shemas[table_name].fields)) \
.replace("%csvname%", csv_file_name) \
.replace("%uniquekey%", db_shemas[table_name].unique_field) \
.replace("%updaterule%", fields_update_list)
if table_name == "ADDROBJ":
sql_query += "DELETE FROM \"%tablename%\" WHERE %filterrule%;" \
.replace("%tablename%", table_name) \
.replace("%filterrule%",
"ACTSTATUS = FALSE OR NEXTID IS NOT NULL")
# delete records from table
if operation_type == AoXmlTableEntry.OperationType.delete:
sql_query = self.syntax_bulk_delete \
.replace("%tab%", "\t") \
.replace("%tablename%", table_name) \
.replace("%fieldslist%", ", ".join(db_shemas[table_name].fields)) \
.replace("%csvname%", csv_file_name) \
.replace("%uniquekey%", db_shemas[table_name].unique_field)
assert sql_query, "Invalid operation type: {}".format(operation_type)
self.db.execute(sql_query)
logging.info("Processed {} queries FROM {}".format(processed_count-1, csv_file_name))
def pre_create(self): def pre_create(self):
f = open("aore/templates/postgre/pre_create.sql") f = open("aore/templates/postgre/pre_create.sql")
create_db_syntax = f.read() sql_query = f.read()
f.close() f.close()
try: self.db.execute(sql_query)
cur = self.db.get_cursor()
cur.execute(create_db_syntax)
self.db.transaction_commit()
except:
self.db.transaction_rollback()
raise "Error downloading. Reason : {}".format(format_exc())
def pre_update(self): def pre_update(self):
# TODO: update actions # TODO: update actions

View File

@ -1,5 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from traceback import format_exc
class DBImpl: class DBImpl:
def __init__(self, engine, params): def __init__(self, engine, params):
@ -19,6 +21,15 @@ class DBImpl:
def get_cursor(self): def get_cursor(self):
return self.connection.cursor() return self.connection.cursor()
def execute(self, sql_query):
try:
cur = self.get_cursor()
cur.execute(sql_query)
self.transaction_commit()
except:
self.transaction_rollback()
raise BaseException("Error execute sql query. Reason : {}".format(format_exc()))
def get_rows(self, query_string, for_dict=True): def get_rows(self, query_string, for_dict=True):
if for_dict: if for_dict:
cur = self.connection.cursor(self.db_engine.cursors.DictCursor) cur = self.connection.cursor(self.db_engine.cursors.DictCursor)

View File

@ -2,17 +2,20 @@
class DbSchema: class DbSchema:
def __init__(self, name, fieldlist, xmltag): def __init__(self, name, fieldlist, unique_key, xmltag):
self.tablename = name self.tablename = name
self.fields = fieldlist self.fields = fieldlist
self.unique_field = unique_key
self.xml_tag = xmltag self.xml_tag = xmltag
db_shemas = dict() db_shemas = dict()
db_shemas['ADDROBJ'] = DbSchema("ADDROBJ", db_shemas['ADDROBJ'] = DbSchema("ADDROBJ",
["AOID", "AOGUID", "SHORTNAME", "FORMALNAME", "AOLEVEL", "PARENTGUID", "ACTSTATUS", ["AOID", "AOGUID", "SHORTNAME", "FORMALNAME", "AOLEVEL", "PARENTGUID", "ACTSTATUS",
"CURRSTATUS"], "LIVESTATUS", "NEXTID"],
"aoid",
"Object") "Object")
db_shemas['SOCRBASE'] = DbSchema("SOCRBASE", ["LEVEL", "SOCRNAME", "SCNAME", "KOD_T_ST"], "AddressObjectType") db_shemas['SOCRBASE'] = DbSchema("SOCRBASE", ["LEVEL", "SOCRNAME", "SCNAME", "KOD_T_ST"], "kod_t_st",
"AddressObjectType")
allowed_tables = ["ADDROBJ", "SOCRBASE"] allowed_tables = ["ADDROBJ", "SOCRBASE"]

View File

@ -0,0 +1 @@
COPY "%tablename%" (%fieldslist%) FROM '%csvname%' DELIMITER '%tab%' NULL 'NULL'

View File

@ -0,0 +1,5 @@
DROP TABLE IF EXISTS "%tablename%_TEMP";
CREATE TEMP TABLE "%tablename%_TEMP" ON COMMIT DROP AS SELECT *
FROM "%tablename%" WITH NO DATA;
COPY "%tablename%_TEMP" (%fieldslist%) FROM '%csvname%' DELIMITER '%tab%' NULL 'NULL';
DELETE FROM "%tablename%" WHERE %uniquekey% IN (SELECT %uniquekey% FROM "%tablename%_TEMP");

View File

@ -0,0 +1,7 @@
DROP TABLE IF EXISTS "%tablename%_TEMP";
CREATE TEMP TABLE "%tablename%_TEMP" ON COMMIT DROP AS SELECT *
FROM "%tablename%" WITH NO DATA;
COPY "%tablename%_TEMP" (%fieldslist%) FROM '%csvname%' DELIMITER '%tab%' NULL 'NULL';
INSERT INTO "%tablename%" (%fieldslist%) SELECT %fieldslist%
FROM
"%tablename%_TEMP" ON CONFLICT (%uniquekey%) DO UPDATE SET %updaterule%;

View File

@ -7,10 +7,11 @@ CREATE TABLE "public"."ADDROBJ" (
"formalname" VARCHAR(120) COLLATE "default", "formalname" VARCHAR(120) COLLATE "default",
"aolevel" INT2, "aolevel" INT2,
"parentguid" UUID, "parentguid" UUID,
"actstatus" BIT(1), "actstatus" BOOL,
"currstatus" INT2, "livestatus" BOOL,
"nextid" UUID,
CONSTRAINT "aoid" UNIQUE ("aoid"), CONSTRAINT "aoid" UNIQUE ("aoid"),
CONSTRAINT "id" PRIMARY KEY ("id") CONSTRAINT "id_addrobj" PRIMARY KEY ("id")
) )
WITH (OIDS =FALSE WITH (OIDS =FALSE
); );
@ -21,7 +22,8 @@ CREATE TABLE "public"."SOCRBASE" (
"scname" VARCHAR(10), "scname" VARCHAR(10),
"socrname" VARCHAR(50), "socrname" VARCHAR(50),
"kod_t_st" VARCHAR(4), "kod_t_st" VARCHAR(4),
PRIMARY KEY ("id") CONSTRAINT "kod_t_st" UNIQUE ("kod_t_st"),
CONSTRAINT "id_socrbase" PRIMARY KEY ("id")
) )
WITH (OIDS =FALSE WITH (OIDS =FALSE
); );

View File

@ -8,8 +8,8 @@ reload(sys)
cwd = os.path.dirname(os.path.abspath(__file__)) cwd = os.path.dirname(os.path.abspath(__file__))
sys.path.append(cwd) sys.path.append(cwd)
sys.setdefaultencoding("utf-8") sys.setdefaultencoding("utf-8")
sys.path.append('/home/i/interc7j/.local/lib/python2.7/site-packages') # sys.path.append('/home/i/interc7j/.local/lib/python2.7/site-packages')
from fias import fias from aore import aore
application = fias.app application = aore.app