Исправлена работа апдейтера.

This commit is contained in:
Jack Stdin 2016-02-01 17:44:41 +03:00
parent 69f4b073f3
commit 63f7827e26
11 changed files with 118 additions and 41 deletions

View File

@ -31,4 +31,4 @@ Python application that can operate with FIAS (Russian Address Object DB)
### Debian Linux ### Debian Linux
1. Установить sphinxapi последней версии: 1. Установить sphinxapi последней версии:
`$pip install https://github.com/Romamo/sphinxapi/zipball/master` `pip install https://github.com/Romamo/sphinxapi/zipball/master`

View File

@ -10,15 +10,23 @@ class DbSchema:
db_shemas = dict() db_shemas = dict()
db_shemas['ADDROBJ'] = DbSchema("ADDROBJ", db_shemas['ADDROBJ'] = \
["AOID", "AOGUID", "SHORTNAME", "FORMALNAME", "AOLEVEL", "PARENTGUID", "ACTSTATUS", DbSchema("ADDROBJ",
"LIVESTATUS", "NEXTID"], ["AOID", "AOGUID", "SHORTNAME", "FORMALNAME", "AOLEVEL", "PARENTGUID", "ACTSTATUS", "LIVESTATUS",
"NEXTID"],
"aoid", "aoid",
"Object") "Object")
db_shemas['SOCRBASE'] = DbSchema("SOCRBASE", ["LEVEL", "SOCRNAME", "SCNAME", "KOD_T_ST"], "kod_t_st",
db_shemas['SOCRBASE'] = \
DbSchema("SOCRBASE",
["LEVEL", "SOCRNAME", "SCNAME", "KOD_T_ST"],
"kod_t_st",
"AddressObjectType") "AddressObjectType")
db_shemas['AOTRIG'] = DbSchema("AOTRIG", ["WORD", "TRIGRAMM", "FREQUENCY"], "word", db_shemas['AOTRIG'] = \
DbSchema("AOTRIG", ["WORD", "TRIGRAMM", "FREQUENCY"],
"word",
None) None)
allowed_tables = ["ADDROBJ", "SOCRBASE"] allowed_tables = ["ADDROBJ", "SOCRBASE"]

View File

@ -5,6 +5,3 @@ COPY "{{tablename}}_TEMP" ({{fieldslist}}) FROM '{{csvname}}' DELIMITER '{{delim
INSERT INTO "{{tablename}}" ({{fieldslist}}) SELECT {{fieldslist}} INSERT INTO "{{tablename}}" ({{fieldslist}}) SELECT {{fieldslist}}
FROM FROM
"{{tablename}}_TEMP" ON CONFLICT ({{uniquekey}}) DO UPDATE SET {{updaterule}}; "{{tablename}}_TEMP" ON CONFLICT ({{uniquekey}}) DO UPDATE SET {{updaterule}};
% if tablename=="ADDROBJ":
DELETE FROM "{{tablename}}" WHERE ACTSTATUS = FALSE OR NEXTID IS NOT NULL;
% end

View File

@ -1,10 +1,18 @@
% for table_name in table_names:
% if table_name == "ADDROBJ":
CREATE INDEX "sphinx_ind_aolevel" ON "ADDROBJ" USING btree ("aolevel"); CREATE INDEX "sphinx_ind_aolevel" ON "ADDROBJ" USING btree ("aolevel");
CREATE INDEX "sphinx_ind_parentguid" ON "ADDROBJ" USING btree ("parentguid"); CREATE INDEX "sphinx_ind_parentguid" ON "ADDROBJ" USING btree ("parentguid");
CREATE INDEX "sphinx_ind_livestatus" ON "ADDROBJ" USING btree ("actstatus", "livestatus", "nextid"); CREATE INDEX "sphinx_ind_livestatus" ON "ADDROBJ" USING btree ("actstatus", "livestatus", "nextid");
CREATE INDEX "sphinx_ind_aoguid" ON "ADDROBJ" USING btree ("aoguid"); CREATE INDEX "sphinx_ind_aoguid" ON "ADDROBJ" USING btree ("aoguid");
% end
% if table_name == "SOCRBASE":
CREATE INDEX "SOCRBASE_scname_idx" ON "SOCRBASE" USING btree ("scname"); CREATE INDEX "SOCRBASE_scname_idx" ON "SOCRBASE" USING btree ("scname");
CREATE INDEX "SOCRBASE_socrname_idx" ON "SOCRBASE" USING btree ("socrname"); CREATE INDEX "SOCRBASE_socrname_idx" ON "SOCRBASE" USING btree ("socrname");
CREATE INDEX "SOCRBASE_scname_gin_idx" ON "SOCRBASE" USING gin(scname gin_trgm_ops); CREATE INDEX "SOCRBASE_scname_gin_idx" ON "SOCRBASE" USING gin(scname gin_trgm_ops);
CREATE INDEX "SOCRBASE_socrname_gin_idx" ON "SOCRBASE" USING gin(socrname gin_trgm_ops); CREATE INDEX "SOCRBASE_socrname_gin_idx" ON "SOCRBASE" USING gin(socrname gin_trgm_ops);
% end
% if table_name == "AOTRIG":
CREATE INDEX "AOTRIG_word_idx" ON "AOTRIG" USING btree ("word"); CREATE INDEX "AOTRIG_word_idx" ON "AOTRIG" USING btree ("word");
CREATE INDEX "AOTRIG_word_gin_idx" ON "AOTRIG" USING gin(word gin_trgm_ops); CREATE INDEX "AOTRIG_word_gin_idx" ON "AOTRIG" USING gin(word gin_trgm_ops);
% end
% end

View File

@ -13,8 +13,7 @@ CREATE TABLE "ADDROBJ" (
CONSTRAINT "aoid" UNIQUE ("aoid"), CONSTRAINT "aoid" UNIQUE ("aoid"),
CONSTRAINT "id_addrobj" PRIMARY KEY ("id") CONSTRAINT "id_addrobj" PRIMARY KEY ("id")
) )
WITH (OIDS =FALSE WITH (OIDS =FALSE);
);
DROP TABLE IF EXISTS "SOCRBASE"; DROP TABLE IF EXISTS "SOCRBASE";
CREATE TABLE "SOCRBASE" ( CREATE TABLE "SOCRBASE" (
"id" SERIAL4 NOT NULL, "id" SERIAL4 NOT NULL,
@ -25,8 +24,7 @@ CREATE TABLE "SOCRBASE" (
CONSTRAINT "kod_t_st" UNIQUE ("kod_t_st"), CONSTRAINT "kod_t_st" UNIQUE ("kod_t_st"),
CONSTRAINT "id_socrbase" PRIMARY KEY ("id") CONSTRAINT "id_socrbase" PRIMARY KEY ("id")
) )
WITH (OIDS =FALSE WITH (OIDS =FALSE);
);
DROP TABLE IF EXISTS "AOTRIG"; DROP TABLE IF EXISTS "AOTRIG";
CREATE TABLE "AOTRIG" ( CREATE TABLE "AOTRIG" (
"id" SERIAL4 NOT NULL, "id" SERIAL4 NOT NULL,
@ -36,5 +34,12 @@ CREATE TABLE "AOTRIG" (
CONSTRAINT "word" UNIQUE ("word"), CONSTRAINT "word" UNIQUE ("word"),
CONSTRAINT "id_aotrig" PRIMARY KEY ("id") CONSTRAINT "id_aotrig" PRIMARY KEY ("id")
) )
WITH (OIDS =FALSE WITH (OIDS =FALSE);
); DROP TABLE IF EXISTS "CONFIG";
CREATE TABLE "CONFIG" (
"id" int4 NOT NULL,
"version" int4,
CONSTRAINT "id_config" PRIMARY KEY ("id")
)
WITH (OIDS=FALSE);
INSERT INTO "public"."CONFIG" VALUES ('0', '0');

View File

@ -0,0 +1,18 @@
% for table_name in table_names:
% if table_name == "ADDROBJ":
DROP INDEX IF EXISTS "sphinx_ind_aolevel";
DROP INDEX IF EXISTS "sphinx_ind_parentguid";
DROP INDEX IF EXISTS "sphinx_ind_livestatus";
DROP INDEX IF EXISTS "sphinx_ind_aoguid";
% end
% if table_name == "SOCRBASE":
DROP INDEX IF EXISTS "SOCRBASE_scname_idx";
DROP INDEX IF EXISTS "SOCRBASE_socrname_idx";
DROP INDEX IF EXISTS "SOCRBASE_scname_gin_idx";
DROP INDEX IF EXISTS "SOCRBASE_socrname_gin_idx";
% end
% if table_name == "AOTRIG":
DROP INDEX IF EXISTS "AOTRIG_word_idx";
DROP INDEX IF EXISTS "AOTRIG_word_gin_idx";
% end
% end

View File

@ -26,7 +26,8 @@ index {{ index_name }}
html_strip = 1 html_strip = 1
ignore_chars = @, - ignore_chars = @, -
charset_table = 0..9, A..Z->a..z, _, a..z, \
U+0401->U+0435, U+0451->U+0435, \
U+410..U+42F->U+430..U+44F, U+430..U+44F U+410..U+42F->U+430..U+44F, U+430..U+44F
source = {{index_name}} source = {{index_name}}

View File

@ -46,19 +46,24 @@ class DbHandler:
self.db.execute(sql_query) self.db.execute(sql_query)
logging.info("Processed {} queries FROM {}".format(processed_count - 1, csv_file_name)) logging.info("Processed {} queries FROM {}".format(processed_count - 1, csv_file_name))
def pre_create(self): def create_structure(self):
logging.info("Prepare to create DB structure...") logging.info("Prepare to create DB structure...")
sql_query = template("aore/templates/postgre/pre_create.sql") sql_query = template("aore/templates/postgre/create_structure.sql")
self.db.execute(sql_query) self.db.execute(sql_query)
logging.info("Done.")
def post_create(self): def create_indexes(self, tables):
logging.info("Indexing ADDROBJ...") logging.info("Indexing tables...")
sql_query = template("aore/templates/postgre/post_create.sql") sql_query = template("aore/templates/postgre/create_indexes.sql", table_names=tables)
self.db.execute(sql_query) self.db.execute(sql_query)
logging.info("Indexing done.") logging.info("Indexing done.")
def pre_update(self): def drop_indexes(self, tables):
# TODO: update actions logging.info("Deleting indexes...")
pass sql_query = template("aore/templates/postgre/drop_indexes.sql", table_names=tables)
self.db.execute(sql_query)
logging.info("All indexes was deleted.")

View File

@ -2,7 +2,6 @@
from pysimplesoap.client import SoapClient from pysimplesoap.client import SoapClient
class SoapReceiver: class SoapReceiver:
def __init__(self): def __init__(self):
self.client = SoapClient( self.client = SoapClient(
@ -11,9 +10,6 @@ class SoapReceiver:
namespace="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx", namespace="http://fias.nalog.ru/WebServices/Public/DownloadService.asmx",
soap_ns='soap', trace=False, ns=False) soap_ns='soap', trace=False, ns=False)
def get_current_fias_version(self):
return 224 # TODO FIXIT
# return (intver, strver, url) # return (intver, strver, url)
def get_update_list(self): def get_update_list(self):
response = self.client.GetAllDownloadFileInfo() response = self.client.GetAllDownloadFileInfo()

View File

@ -3,11 +3,14 @@
import logging import logging
from os import walk, path from os import walk, path
from aore.dbutils.dbschemas import allowed_tables from aore.config import db_conf
from aore.dbutils.dbimpl import DBImpl
from aore.dbutils.dbschemas import allowed_tables, db_shemas
from aore.updater.aodataparser import AoDataParser from aore.updater.aodataparser import AoDataParser
from aore.updater.aorar import AoRar from aore.updater.aorar import AoRar
from aore.updater.aoxmltableentry import AoXmlTableEntry from aore.updater.aoxmltableentry import AoXmlTableEntry
from aore.updater.dbhandler import DbHandler from aore.updater.dbhandler import DbHandler
import psycopg2
class Updater: class Updater:
@ -28,6 +31,26 @@ class Updater:
yield xmltable yield xmltable
break break
@classmethod
def get_current_fias_version(cls):
    """Return the FIAS version recorded in the database.

    Opens a dedicated connection, reads the ``version`` column of the
    ``"CONFIG"`` row whose ``id`` is 0, and always closes the connection
    before returning or raising.

    Returns:
        The value of ``rows[0]['version']`` for the fetched row.

    Raises:
        AssertionError: if the query yields no rows.
    """
    db = DBImpl(psycopg2, db_conf)
    try:
        # NOTE(review): the meaning of the second positional argument of
        # get_rows is not visible here; it is passed through unchanged.
        rows = db.get_rows('SELECT version FROM "CONFIG" WHERE id=0', True)
        # Raise explicitly rather than via `assert`: assert statements are
        # removed under `python -O`, which would turn a missing row into an
        # opaque IndexError below. AssertionError is kept so existing
        # callers observe the same exception type and message.
        if not rows:
            raise AssertionError("Cannot get a version")
        return rows[0]['version']
    finally:
        db.close()
@classmethod
def __set__update_version(cls, updver = 0):
    """Store ``updver`` as the installed version in the database.

    Updates the ``version`` column of the ``"CONFIG"`` row whose ``id`` is 0
    through a dedicated connection, which is always closed afterwards.

    Args:
        updver: the version number to record; must be exactly of type
            ``int``. Defaults to 0.

    Raises:
        AssertionError: if ``updver`` is not of type ``int``.
    """
    # Validate before opening a connection, and with an explicit raise:
    # an `assert` is stripped under `python -O`, which would let an
    # arbitrary value be formatted straight into the SQL text below.
    # The exact-type check (not isinstance) is kept from the original, so
    # subclasses of int such as bool are still rejected.
    if type(updver) is not int:
        raise AssertionError("Update version must be of int type.")

    db = DBImpl(psycopg2, db_conf)
    try:
        # Safe to interpolate: updver is guaranteed to be a plain int here.
        db.execute('UPDATE "CONFIG" SET version={} WHERE id=0'.format(updver))
    finally:
        db.close()
def __get_updates_from_folder(self, foldername): def __get_updates_from_folder(self, foldername):
# TODO: Вычислять версию, если берем данные из каталога # TODO: Вычислять версию, если берем данные из каталога
yield dict(intver=0, textver="Unknown", delta_url=foldername, complete_url=foldername) yield dict(intver=0, textver="Unknown", delta_url=foldername, complete_url=foldername)
@ -54,26 +77,41 @@ class Updater:
def create(self, updates_generator): def create(self, updates_generator):
self.__init_update_entries(updates_generator) self.__init_update_entries(updates_generator)
self.db_handler.pre_create() self.db_handler.create_structure()
for update_entry in self.updalist_generator: for update_entry in self.updalist_generator:
logging.info("Processing update #{}".format(update_entry['intver'])) logging.info("Processing DB #{}".format(update_entry['intver']))
for table_entry in self.tablelist_generator(update_entry['complete_url']): for table_entry in self.tablelist_generator(update_entry['complete_url']):
if table_entry.operation_type == AoXmlTableEntry.OperationType.update: if table_entry.operation_type == AoXmlTableEntry.OperationType.update:
table_entry.operation_type = AoXmlTableEntry.OperationType.create table_entry.operation_type = AoXmlTableEntry.OperationType.create
self.process_single_entry(table_entry.operation_type, table_entry) self.process_single_entry(table_entry.operation_type, table_entry)
Updater.__set__update_version(update_entry['intver'])
else:
logging.info("No updates more.")
self.db_handler.post_create() self.db_handler.create_indexes(db_shemas.keys())
logging.info("Create success") logging.info("Create success")
def update(self, updates_generator): def update(self, updates_generator):
self.__init_update_entries(updates_generator) self.__init_update_entries(updates_generator)
self.db_handler.pre_update()
# Drop all indexes if updates needed
indexes_dropped = False
for update_entry in self.updalist_generator: for update_entry in self.updalist_generator:
if not indexes_dropped:
self.db_handler.drop_indexes(allowed_tables)
indexes_dropped = True
logging.info("Processing update #{}".format(update_entry['intver'])) logging.info("Processing update #{}".format(update_entry['intver']))
for table_entry in self.tablelist_generator(update_entry['delta_url']): for table_entry in self.tablelist_generator(update_entry['delta_url']):
self.process_single_entry(table_entry.operation_type, table_entry) self.process_single_entry(table_entry.operation_type, table_entry)
Updater.__set__update_version(update_entry['intver'])
else:
logging.info("No updates more.")
# Re-create all indexes (if dropped)
if indexes_dropped:
self.db_handler.create_indexes(allowed_tables)
logging.info("Update success") logging.info("Update success")

View File

@ -10,7 +10,7 @@ from aore.updater.updater import Updater
def print_fias_versions(): def print_fias_versions():
imp = SoapReceiver() imp = SoapReceiver()
current_version = imp.get_current_fias_version() current_version = Updater.get_current_fias_version()
all_versions = imp.get_update_list() all_versions = imp.get_update_list()
print("Installed version: {}".format(current_version)) print("Installed version: {}".format(current_version))
@ -40,7 +40,7 @@ def parse_update_str(updates_str):
def get_allowed_updates(updates_str, mode="create"): def get_allowed_updates(updates_str, mode="create"):
imp = SoapReceiver() imp = SoapReceiver()
current_version = imp.get_current_fias_version() current_version = Updater.get_current_fias_version()
all_versions = [x for x in imp.get_update_list()] all_versions = [x for x in imp.get_update_list()]
user_defined_list = parse_update_str(updates_str) user_defined_list = parse_update_str(updates_str)
@ -49,7 +49,8 @@ def get_allowed_updates(updates_str, mode="create"):
if mode == "create" and not user_defined_list: if mode == "create" and not user_defined_list:
yield all_versions[-1] yield all_versions[-1]
assert (mode == "create" and len(user_defined_list) == 1) if mode == "create":
assert len(user_defined_list) == 1, "Ony single update number allowed for DB create"
for uv in all_versions: for uv in all_versions:
uv_ver = uv['intver'] uv_ver = uv['intver']
@ -101,7 +102,7 @@ def main():
aoupdater = Updater(options.source) aoupdater = Updater(options.source)
allowed_updates = None allowed_updates = None
if options.source == "http": if options.source == "http":
allowed_updates = get_allowed_updates(options.update_version) allowed_updates = get_allowed_updates(options.update_version, options.database)
if options.database == "create": if options.database == "create":
aoupdater.create(allowed_updates) aoupdater.create(allowed_updates)