#
# $Id: sphinxapi.py 4885 2015-01-20 07:02:07Z deogar $
#
# Python version of Sphinx searchd client (Python API)
#
# Copyright (c) 2006, Mike Osadnik
# Copyright (c) 2006-2015, Andrew Aksyonoff
# Copyright (c) 2008-2015, Sphinx Technologies Inc
# All rights reserved
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License. You should have
# received a copy of the GPL license along with this program; if you
# did not, you can find it at http://www.gnu.org/
#
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# WARNING
# We strongly recommend you to use SphinxQL instead of the API
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
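#
# A minimal usage sketch (host, port, index name and query are examples only,
# adjust them to your setup):
#
#   cl = SphinxClient()
#   cl.SetServer('localhost', 9312)
#   res = cl.Query('hello world', 'test1')
#   if res is None:
#       print(cl.GetLastError())
#   else:
#       for match in res['matches']:
#           print(match['id'], match['weight'])
#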
import re
import select
import socket
import sys
from struct import pack, unpack
# known searchd commands
SEARCHD_COMMAND_SEARCH = 0
SEARCHD_COMMAND_EXCERPT = 1
SEARCHD_COMMAND_UPDATE = 2
SEARCHD_COMMAND_KEYWORDS = 3
SEARCHD_COMMAND_PERSIST = 4
SEARCHD_COMMAND_STATUS = 5
SEARCHD_COMMAND_FLUSHATTRS = 7
# current client-side command implementation versions
VER_COMMAND_SEARCH = 0x11E
VER_COMMAND_EXCERPT = 0x104
VER_COMMAND_UPDATE = 0x103
VER_COMMAND_KEYWORDS = 0x100
VER_COMMAND_STATUS = 0x101
VER_COMMAND_FLUSHATTRS = 0x100
# known searchd status codes
SEARCHD_OK = 0
SEARCHD_ERROR = 1
SEARCHD_RETRY = 2
SEARCHD_WARNING = 3
# known match modes
SPH_MATCH_ALL = 0
SPH_MATCH_ANY = 1
SPH_MATCH_PHRASE = 2
SPH_MATCH_BOOLEAN = 3
SPH_MATCH_EXTENDED = 4
SPH_MATCH_FULLSCAN = 5
SPH_MATCH_EXTENDED2 = 6
# known ranking modes (extended2 mode only)
SPH_RANK_PROXIMITY_BM25 = 0 # default mode, phrase proximity major factor and BM25 minor one
SPH_RANK_BM25 = 1 # statistical mode, BM25 ranking only (faster but worse quality)
SPH_RANK_NONE = 2 # no ranking, all matches get a weight of 1
SPH_RANK_WORDCOUNT = 3 # simple word-count weighting, rank is a weighted sum of per-field keyword occurrence counts
SPH_RANK_PROXIMITY = 4 # phrase proximity only, no BM25 part
SPH_RANK_MATCHANY = 5 # emulates old SPH_MATCH_ANY weighting
SPH_RANK_FIELDMASK = 6 # rank is a 32-bit mask of matched fields
SPH_RANK_SPH04 = 7 # like proximity BM25, with boosts for exact-field and field-start matches
SPH_RANK_EXPR = 8 # ranking expression, set via SetRankingMode()
SPH_RANK_TOTAL = 9 # sentinel, number of known rankers
# known sort modes
SPH_SORT_RELEVANCE = 0
SPH_SORT_ATTR_DESC = 1
SPH_SORT_ATTR_ASC = 2
SPH_SORT_TIME_SEGMENTS = 3
SPH_SORT_EXTENDED = 4
SPH_SORT_EXPR = 5
# known filter types
SPH_FILTER_VALUES = 0
SPH_FILTER_RANGE = 1
SPH_FILTER_FLOATRANGE = 2
SPH_FILTER_STRING = 3
# known attribute types
SPH_ATTR_NONE = 0
SPH_ATTR_INTEGER = 1
SPH_ATTR_TIMESTAMP = 2
SPH_ATTR_ORDINAL = 3
SPH_ATTR_BOOL = 4
SPH_ATTR_FLOAT = 5
SPH_ATTR_BIGINT = 6
SPH_ATTR_STRING = 7
SPH_ATTR_FACTORS = 1001
SPH_ATTR_MULTI = 0x40000001
SPH_ATTR_MULTI64 = 0x40000002
SPH_ATTR_TYPES = (SPH_ATTR_NONE,
SPH_ATTR_INTEGER,
SPH_ATTR_TIMESTAMP,
SPH_ATTR_ORDINAL,
SPH_ATTR_BOOL,
SPH_ATTR_FLOAT,
SPH_ATTR_BIGINT,
SPH_ATTR_STRING,
SPH_ATTR_MULTI,
SPH_ATTR_MULTI64)
# known grouping functions
SPH_GROUPBY_DAY = 0
SPH_GROUPBY_WEEK = 1
SPH_GROUPBY_MONTH = 2
SPH_GROUPBY_YEAR = 3
SPH_GROUPBY_ATTR = 4
SPH_GROUPBY_ATTRPAIR = 5
class SphinxClient:
def __init__(self):
"""
Create a new client object, and fill defaults.
"""
self._host = 'localhost' # searchd host (default is "localhost")
self._port = 9312 # searchd port (default is 9312)
self._path = None # searchd unix-domain socket path
self._socket = None
        self._offset = 0 # how many records to skip from result-set start (default is 0)
        self._limit = 20 # how many records to return from result-set starting at offset (default is 20)
self._mode = SPH_MATCH_EXTENDED2 # query matching mode (default is SPH_MATCH_EXTENDED2)
self._weights = [] # per-field weights (default is 1 for all fields)
self._sort = SPH_SORT_RELEVANCE # match sorting mode (default is SPH_SORT_RELEVANCE)
        self._sortby = '' # attribute to sort by (default is "")
        self._min_id = 0 # min ID to match (default is 0)
        self._max_id = 0 # max ID to match (default is 0, which means no limit)
self._filters = [] # search filters
self._groupby = '' # group-by attribute name
self._groupfunc = SPH_GROUPBY_DAY # group-by function (to pre-process group-by attribute value with)
self._groupsort = '@group desc' # group-by sorting clause (to sort groups in result set with)
self._groupdistinct = '' # group-by count-distinct attribute
self._maxmatches = 1000 # max matches to retrieve
self._cutoff = 0 # cutoff to stop searching at
self._retrycount = 0 # distributed retry count
self._retrydelay = 0 # distributed retry delay
self._anchor = {} # geographical anchor point
self._indexweights = {} # per-index weights
self._ranker = SPH_RANK_PROXIMITY_BM25 # ranking mode
self._rankexpr = '' # ranking expression for SPH_RANK_EXPR
self._maxquerytime = 0 # max query time, milliseconds (default is 0, do not limit)
self._timeout = 1.0 # connection timeout
self._fieldweights = {} # per-field-name weights
self._overrides = {} # per-query attribute values overrides
self._select = '*' # select-list (attributes or expressions, with optional aliases)
self._query_flags = SetBit(0, 6, True) # default idf=tfidf_normalized
self._predictedtime = 0 # per-query max_predicted_time
self._outerorderby = '' # outer match sort by
self._outeroffset = 0 # outer offset
self._outerlimit = 0 # outer limit
self._hasouter = False # sub-select enabled
self._error = '' # last error message
self._warning = '' # last warning message
self._reqs = [] # requests array for multi-query
def __del__(self):
if self._socket:
self._socket.close()
def GetLastError(self):
"""
Get last error message (string).
"""
return self._error
def GetLastWarning(self):
"""
Get last warning message (string).
"""
return self._warning
def SetServer(self, host, port=None):
"""
Set searchd server host and port.
"""
assert (isinstance(host, str))
if host.startswith('/'):
self._path = host
return
elif host.startswith('unix://'):
self._path = host[7:]
return
self._host = host
if isinstance(port, int):
assert (0 < port < 65536)
self._port = port
self._path = None
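    # Usage sketch (the socket path is an example, not a shipped default):
    #   cl.SetServer('localhost', 9312)               # TCP
    #   cl.SetServer('/var/run/searchd.sock')         # unix-domain socket
    #   cl.SetServer('unix:///var/run/searchd.sock')  # same, URL-style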
def SetConnectTimeout(self, timeout):
"""
Set connection timeout ( float second )
"""
assert (isinstance(timeout, float))
        # a zero timeout would make the connection non-blocking, which is wrong here, so clip it to a reasonable minimum
self._timeout = max(0.001, timeout)
def _Connect(self):
"""
INTERNAL METHOD, DO NOT CALL. Connects to searchd server.
"""
if self._socket:
# we have a socket, but is it still alive?
sr, sw, _ = select.select([self._socket], [self._socket], [], 0)
            # this is how an alive socket should look
if len(sr) == 0 and len(sw) == 1:
return self._socket
# oops, looks like it was closed, lets reopen
self._socket.close()
self._socket = None
        sock = None
        try:
if self._path:
af = socket.AF_UNIX
addr = self._path
desc = self._path
else:
af = socket.AF_INET
addr = (self._host, self._port)
desc = '%s;%s' % addr
sock = socket.socket(af, socket.SOCK_STREAM)
sock.settimeout(self._timeout)
sock.connect(addr)
except socket.error as msg:
if sock:
sock.close()
self._error = 'connection to %s failed (%s)' % (desc, msg)
return
v = unpack('>L', sock.recv(4))
if not v or v[0] < 1:
sock.close()
self._error = 'expected searchd protocol version, got %s' % v
return
# all ok, send my version
sock.send(pack('>L', 1))
return sock
def _GetResponse(self, sock, client_ver):
"""
INTERNAL METHOD, DO NOT CALL. Gets and checks response packet from searchd server.
"""
(status, ver, length) = unpack('>2HL', sock.recv(8))
response = b''
left = length
while left > 0:
chunk = sock.recv(left)
if chunk:
response += chunk
left -= len(chunk)
else:
break
if not self._socket:
sock.close()
# check response
read = len(response)
if not response or read != length:
if length:
self._error = 'failed to read searchd response (status=%s, ver=%s, len=%s, read=%s)' \
% (status, ver, length, read)
else:
self._error = 'received zero-sized searchd response'
return None
# check status
        if status == SEARCHD_WARNING:
            wend = 4 + unpack('>L', response[0:4])[0]
            self._warning = response[4:wend].decode(errors='replace')
            return response[wend:]
        if status == SEARCHD_ERROR:
            # response is bytes; decode before concatenating with str
            self._error = 'searchd error: ' + response[4:].decode(errors='replace')
            return None
        if status == SEARCHD_RETRY:
            self._error = 'temporary searchd error: ' + response[4:].decode(errors='replace')
            return None
if status != SEARCHD_OK:
self._error = 'unknown status code %d' % status
return None
# check version
if ver < client_ver:
self._warning = 'searchd command v.%d.%d older than client\'s v.%d.%d, some options might not work' \
% (ver >> 8, ver & 0xff, client_ver >> 8, client_ver & 0xff)
return response
def _Send(self, sock, req):
"""
        INTERNAL METHOD, DO NOT CALL. Sends request to searchd server.
"""
total = 0
while True:
sent = sock.send(req[total:])
if sent <= 0:
break
total = total + sent
return total
def SetLimits(self, offset, limit, maxmatches=0, cutoff=0):
"""
Set offset and count into result set, and optionally set max-matches and cutoff limits.
"""
        assert (isinstance(offset, int) and 0 <= offset < 16777216)
        assert (isinstance(limit, int) and 0 < limit < 16777216)
assert (maxmatches >= 0)
self._offset = offset
self._limit = limit
if maxmatches > 0:
self._maxmatches = maxmatches
if cutoff >= 0:
self._cutoff = cutoff
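    # Usage sketch: third page of 20 results, retrieving at most 1000
    # candidate matches server-side (numbers are illustrative):
    #   cl.SetLimits(40, 20, maxmatches=1000)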
def SetMaxQueryTime(self, maxquerytime):
"""
Set maximum query time, in milliseconds, per-index. 0 means 'do not limit'.
"""
        assert (isinstance(maxquerytime, int) and maxquerytime >= 0)
self._maxquerytime = maxquerytime
def SetMatchMode(self, mode):
"""
Set matching mode.
"""
print('DEPRECATED: Do not call this method or, even better, use SphinxQL instead of an API', file=sys.stderr)
assert (mode in [SPH_MATCH_ALL, SPH_MATCH_ANY, SPH_MATCH_PHRASE, SPH_MATCH_BOOLEAN, SPH_MATCH_EXTENDED,
SPH_MATCH_FULLSCAN, SPH_MATCH_EXTENDED2])
self._mode = mode
def SetRankingMode(self, ranker, rankexpr=''):
"""
Set ranking mode.
"""
assert (0 <= ranker < SPH_RANK_TOTAL)
self._ranker = ranker
self._rankexpr = rankexpr
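    # Usage sketch: an expression-based ranker; this sample formula mimics the
    # default proximity+BM25 ranking and is only an example:
    #   cl.SetRankingMode(SPH_RANK_EXPR, 'sum(lcs*user_weight)*1000+bm25')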
def SetSortMode(self, mode, clause=''):
"""
Set sorting mode.
"""
assert (
mode in [SPH_SORT_RELEVANCE, SPH_SORT_ATTR_DESC, SPH_SORT_ATTR_ASC, SPH_SORT_TIME_SEGMENTS,
SPH_SORT_EXTENDED,
SPH_SORT_EXPR])
assert (isinstance(clause, str))
self._sort = mode
self._sortby = clause
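    # Usage sketch ('date_added' is a hypothetical attribute):
    #   cl.SetSortMode(SPH_SORT_EXTENDED, '@weight DESC, date_added ASC')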
def SetFieldWeights(self, weights):
"""
Bind per-field weights by name; expects (name,field_weight) dictionary as argument.
"""
assert (isinstance(weights, dict))
for key, val in list(weights.items()):
assert (isinstance(key, str))
AssertUInt32(val)
self._fieldweights = weights
def SetIndexWeights(self, weights):
"""
Bind per-index weights by name; expects (name,index_weight) dictionary as argument.
"""
assert (isinstance(weights, dict))
for key, val in list(weights.items()):
assert (isinstance(key, str))
AssertUInt32(val)
self._indexweights = weights
def SetIDRange(self, minid, maxid):
"""
Set IDs range to match.
        Only match records with document ID between 'minid' and 'maxid' (inclusive).
"""
assert (isinstance(minid, int))
assert (isinstance(maxid, int))
assert (minid <= maxid)
self._min_id = minid
self._max_id = maxid
def SetFilter(self, attribute, values, exclude=0):
"""
Set values set filter.
Only match records where 'attribute' value is in given 'values' set.
"""
assert (isinstance(attribute, str))
assert iter(values)
for value in values:
AssertInt32(value)
self._filters.append({'type': SPH_FILTER_VALUES, 'attr': attribute, 'exclude': exclude, 'values': values})
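    # Usage sketch ('group_id' is a hypothetical attribute); pass exclude=1
    # to invert the condition:
    #   cl.SetFilter('group_id', [1, 5, 7])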
def SetFilterString(self, attribute, value, exclude=0):
"""
Set string filter.
        Only match records where 'attribute' value equals 'value'.
        """
        assert (isinstance(attribute, str))
        assert (isinstance(value, str))
self._filters.append({'type': SPH_FILTER_STRING, 'attr': attribute, 'exclude': exclude, 'value': value})
def SetFilterRange(self, attribute, min_, max_, exclude=0):
"""
Set range filter.
        Only match records if 'attribute' value is between 'min_' and 'max_' (inclusive).
"""
assert (isinstance(attribute, str))
AssertInt32(min_)
AssertInt32(max_)
assert (min_ <= max_)
self._filters.append(
{'type': SPH_FILTER_RANGE, 'attr': attribute, 'exclude': exclude, 'min': min_, 'max': max_})
def SetFilterFloatRange(self, attribute, min_, max_, exclude=0):
assert (isinstance(attribute, str))
assert (isinstance(min_, float))
assert (isinstance(max_, float))
assert (min_ <= max_)
self._filters.append(
{'type': SPH_FILTER_FLOATRANGE, 'attr': attribute, 'exclude': exclude, 'min': min_, 'max': max_})
def SetGeoAnchor(self, attrlat, attrlong, latitude, longitude):
assert (isinstance(attrlat, str))
assert (isinstance(attrlong, str))
assert (isinstance(latitude, float))
assert (isinstance(longitude, float))
self._anchor['attrlat'] = attrlat
self._anchor['attrlong'] = attrlong
self._anchor['lat'] = latitude
self._anchor['long'] = longitude
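    # Usage sketch: anchor point roughly at Moscow; the first two arguments are
    # latitude/longitude attribute names (hypothetical here), and the anchor
    # coordinates are given in radians. Distance to the anchor then becomes
    # available as the magic @geodist attribute:
    #   cl.SetGeoAnchor('lat', 'lon', 0.9730, 0.6565)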
def SetGroupBy(self, attribute, func, groupsort='@group desc'):
"""
Set grouping attribute and function.
"""
assert (isinstance(attribute, str))
assert (func in [SPH_GROUPBY_DAY, SPH_GROUPBY_WEEK, SPH_GROUPBY_MONTH, SPH_GROUPBY_YEAR, SPH_GROUPBY_ATTR,
SPH_GROUPBY_ATTRPAIR])
assert (isinstance(groupsort, str))
self._groupby = attribute
self._groupfunc = func
self._groupsort = groupsort
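    # Usage sketch: group by a plain attribute ('group_id' is hypothetical),
    # biggest groups first:
    #   cl.SetGroupBy('group_id', SPH_GROUPBY_ATTR, '@count desc')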
def SetGroupDistinct(self, attribute):
assert (isinstance(attribute, str))
self._groupdistinct = attribute
def SetRetries(self, count, delay=0):
assert (isinstance(count, int) and count >= 0)
assert (isinstance(delay, int) and delay >= 0)
self._retrycount = count
self._retrydelay = delay
def SetOverride(self, name, type, values):
print('DEPRECATED: Do not call this method. Use SphinxQL REMAP() function instead.', file=sys.stderr)
assert (isinstance(name, str))
assert (type in SPH_ATTR_TYPES)
assert (isinstance(values, dict))
self._overrides[name] = {'name': name, 'type': type, 'values': values}
def SetSelect(self, select):
assert (isinstance(select, str))
self._select = select
def SetQueryFlag(self, name, value):
known_names = ["reverse_scan", "sort_method", "max_predicted_time", "boolean_simplify", "idf", "global_idf"]
flags = {"reverse_scan": [0, 1], "sort_method": ["pq", "kbuffer"], "max_predicted_time": [0],
"boolean_simplify": [True, False],
"idf": ["normalized", "plain", "tfidf_normalized", "tfidf_unnormalized"], "global_idf": [True, False]}
assert (name in known_names)
assert (value in flags[name] or (name == "max_predicted_time" and isinstance(value, int) and value >= 0))
if name == "reverse_scan":
self._query_flags = SetBit(self._query_flags, 0, value == 1)
if name == "sort_method":
self._query_flags = SetBit(self._query_flags, 1, value == "kbuffer")
if name == "max_predicted_time":
self._query_flags = SetBit(self._query_flags, 2, value > 0)
self._predictedtime = int(value)
if name == "boolean_simplify":
self._query_flags = SetBit(self._query_flags, 3, value)
if name == "idf" and (value == "plain" or value == "normalized"):
self._query_flags = SetBit(self._query_flags, 4, value == "plain")
if name == "global_idf":
self._query_flags = SetBit(self._query_flags, 5, value)
if name == "idf" and (value == "tfidf_normalized" or value == "tfidf_unnormalized"):
self._query_flags = SetBit(self._query_flags, 6, value == "tfidf_normalized")
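    # Usage sketch:
    #   cl.SetQueryFlag('boolean_simplify', True)
    #   cl.SetQueryFlag('max_predicted_time', 100)  # milliseconds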
def SetOuterSelect(self, orderby, offset, limit):
assert (isinstance(orderby, str))
assert (isinstance(offset, int))
assert (isinstance(limit, int))
assert (offset >= 0)
assert (limit > 0)
self._outerorderby = orderby
self._outeroffset = offset
self._outerlimit = limit
self._hasouter = True
def ResetOverrides(self):
self._overrides = {}
def ResetFilters(self):
"""
Clear all filters (for multi-queries).
"""
self._filters = []
self._anchor = {}
def ResetGroupBy(self):
"""
Clear groupby settings (for multi-queries).
"""
self._groupby = ''
self._groupfunc = SPH_GROUPBY_DAY
self._groupsort = '@group desc'
self._groupdistinct = ''
def ResetQueryFlag(self):
self._query_flags = SetBit(0, 6, True) # default idf=tfidf_normalized
self._predictedtime = 0
def ResetOuterSelect(self):
self._outerorderby = ''
self._outeroffset = 0
self._outerlimit = 0
self._hasouter = False
def Query(self, query, index='*', comment=''):
"""
Connect to searchd server and run given search query.
Returns None on failure; result set hash on success (see documentation for details).
"""
assert (len(self._reqs) == 0)
self.AddQuery(query, index, comment)
results = self.RunQueries()
self._reqs = [] # we won't re-run erroneous batch
if not results or len(results) == 0:
return None
self._error = results[0]['error']
self._warning = results[0]['warning']
if results[0]['status'] == SEARCHD_ERROR:
return None
return results[0]
def AddQuery(self, query, index='*', comment=''):
"""
Add query to batch.
"""
# build request
req = []
req.append(pack('>5L', self._query_flags, self._offset, self._limit, self._mode, self._ranker))
        if self._ranker == SPH_RANK_EXPR:
            rankexpr = self._rankexpr.encode()
            req.append(pack('>L', len(rankexpr)))
            req.append(rankexpr)
        req.append(pack('>L', self._sort))
        sortby = self._sortby.encode()
        req.append(pack('>L', len(sortby)))
        req.append(sortby)
if isinstance(query, str):
query = query.encode()
assert (isinstance(query, bytes))
req.append(pack('>L', len(query)))
req.append(query)
req.append(pack('>L', len(self._weights)))
for w in self._weights:
req.append(pack('>L', w))
        assert (isinstance(index, str))
        index = index.encode()
        req.append(pack('>L', len(index)))
        req.append(index)
req.append(pack('>L', 1)) # id64 range marker
req.append(pack('>Q', self._min_id))
req.append(pack('>Q', self._max_id))
# filters
req.append(pack('>L', len(self._filters)))
for f in self._filters:
            attr = f['attr'].encode()
            req.append(pack('>L', len(attr)) + attr)
filtertype = f['type']
req.append(pack('>L', filtertype))
if filtertype == SPH_FILTER_VALUES:
req.append(pack('>L', len(f['values'])))
for val in f['values']:
req.append(pack('>q', val))
elif filtertype == SPH_FILTER_RANGE:
req.append(pack('>2q', f['min'], f['max']))
elif filtertype == SPH_FILTER_FLOATRANGE:
req.append(pack('>2f', f['min'], f['max']))
            elif filtertype == SPH_FILTER_STRING:
                value = f['value'].encode()
                req.append(pack('>L', len(value)))
                req.append(value)
req.append(pack('>L', f['exclude']))
# group-by, max-matches, group-sort
        groupby = self._groupby.encode()
        req.append(pack('>2L', self._groupfunc, len(groupby)))
        req.append(groupby)
        groupsort = self._groupsort.encode()
        req.append(pack('>2L', self._maxmatches, len(groupsort)))
        req.append(groupsort)
        req.append(pack('>LLL', self._cutoff, self._retrycount, self._retrydelay))
        groupdistinct = self._groupdistinct.encode()
        req.append(pack('>L', len(groupdistinct)))
        req.append(groupdistinct)
# anchor point
if len(self._anchor) == 0:
req.append(pack('>L', 0))
else:
            attrlat, attrlong = self._anchor['attrlat'].encode(), self._anchor['attrlong'].encode()
            latitude, longitude = self._anchor['lat'], self._anchor['long']
            req.append(pack('>L', 1))
            req.append(pack('>L', len(attrlat)) + attrlat)
            req.append(pack('>L', len(attrlong)) + attrlong)
            req.append(pack('>f', latitude) + pack('>f', longitude))
# per-index weights
req.append(pack('>L', len(self._indexweights)))
        for indx, weight in list(self._indexweights.items()):
            indx = indx.encode()
            req.append(pack('>L', len(indx)) + indx + pack('>L', weight))
# max query time
req.append(pack('>L', self._maxquerytime))
# per-field weights
req.append(pack('>L', len(self._fieldweights)))
        for field, weight in list(self._fieldweights.items()):
            field = field.encode()
            req.append(pack('>L', len(field)) + field + pack('>L', weight))
# comment
        comment = str(comment).encode()
        req.append(pack('>L', len(comment)) + comment)
# attribute overrides
req.append(pack('>L', len(self._overrides)))
        for v in list(self._overrides.values()):
            name = v['name'].encode()
            req.extend((pack('>L', len(name)), name))
req.append(pack('>LL', v['type'], len(v['values'])))
for id, value in v['values'].items():
req.append(pack('>Q', id))
if v['type'] == SPH_ATTR_FLOAT:
req.append(pack('>f', value))
elif v['type'] == SPH_ATTR_BIGINT:
req.append(pack('>q', value))
else:
req.append(pack('>l', value))
# select-list
        select = self._select.encode()
        req.append(pack('>L', len(select)))
        req.append(select)
if self._predictedtime > 0:
req.append(pack('>L', self._predictedtime))
# outer
        outerorderby = self._outerorderby.encode()
        req.append(pack('>L', len(outerorderby)) + outerorderby)
req.append(pack('>2L', self._outeroffset, self._outerlimit))
if self._hasouter:
req.append(pack('>L', 1))
else:
req.append(pack('>L', 0))
# send query, get response
req = b''.join(req)
self._reqs.append(req)
return
def RunQueries(self):
"""
Run queries batch.
Returns None on network IO failure; or an array of result set hashes on success.
"""
if len(self._reqs) == 0:
self._error = 'no queries defined, issue AddQuery() first'
return None
sock = self._Connect()
if not sock:
return None
req = b''.join(self._reqs)
length = len(req) + 8
req = pack('>HHLLL', SEARCHD_COMMAND_SEARCH, VER_COMMAND_SEARCH, length, 0, len(self._reqs)) + req
self._Send(sock, req)
response = self._GetResponse(sock, VER_COMMAND_SEARCH)
if not response:
return None
nreqs = len(self._reqs)
# parse response
max_ = len(response)
p = 0
results = []
for i in range(0, nreqs, 1):
result = {}
results.append(result)
result['error'] = ''
result['warning'] = ''
status = unpack('>L', response[p:p + 4])[0]
p += 4
result['status'] = status
if status != SEARCHD_OK:
length = unpack('>L', response[p:p + 4])[0]
p += 4
                message = response[p:p + length].decode(errors='replace')
p += length
if status == SEARCHD_WARNING:
result['warning'] = message
else:
result['error'] = message
continue
# read schema
fields = []
attrs = []
nfields = unpack('>L', response[p:p + 4])[0]
p += 4
while nfields > 0 and p < max_:
nfields -= 1
length = unpack('>L', response[p:p + 4])[0]
p += 4
fields.append(response[p:p + length].decode())
p += length
result['fields'] = fields
nattrs = unpack('>L', response[p:p + 4])[0]
p += 4
while nattrs > 0 and p < max_:
nattrs -= 1
length = unpack('>L', response[p:p + 4])[0]
p += 4
attr = response[p:p + length].decode()
p += length
type_ = unpack('>L', response[p:p + 4])[0]
p += 4
attrs.append([attr, type_])
result['attrs'] = attrs
# read match count
count = unpack('>L', response[p:p + 4])[0]
p += 4
id64 = unpack('>L', response[p:p + 4])[0]
p += 4
# read matches
result['matches'] = []
while count > 0 and p < max_:
count -= 1
if id64:
doc, weight = unpack('>QL', response[p:p + 12])
p += 12
else:
doc, weight = unpack('>2L', response[p:p + 8])
p += 8
match = {'id': doc, 'weight': weight, 'attrs': {}}
for i in range(len(attrs)):
if attrs[i][1] == SPH_ATTR_FLOAT:
match['attrs'][attrs[i][0]] = unpack('>f', response[p:p + 4])[0]
elif attrs[i][1] == SPH_ATTR_BIGINT:
match['attrs'][attrs[i][0]] = unpack('>q', response[p:p + 8])[0]
p += 4
elif attrs[i][1] == SPH_ATTR_STRING:
slen = unpack('>L', response[p:p + 4])[0]
p += 4
match['attrs'][attrs[i][0]] = ''
if slen > 0:
match['attrs'][attrs[i][0]] = response[p:p + slen].decode()
p += slen - 4
elif attrs[i][1] == SPH_ATTR_FACTORS:
slen = unpack('>L', response[p:p + 4])[0]
p += 4
match['attrs'][attrs[i][0]] = ''
if slen > 0:
match['attrs'][attrs[i][0]] = response[p:p + slen - 4]
p += slen - 4
p -= 4
elif attrs[i][1] == SPH_ATTR_MULTI:
match['attrs'][attrs[i][0]] = []
nvals = unpack('>L', response[p:p + 4])[0]
p += 4
for n in range(0, nvals, 1):
match['attrs'][attrs[i][0]].append(unpack('>L', response[p:p + 4])[0])
p += 4
p -= 4
elif attrs[i][1] == SPH_ATTR_MULTI64:
match['attrs'][attrs[i][0]] = []
nvals = unpack('>L', response[p:p + 4])[0]
                        nvals //= 2 # integer division; true division would give a float and break range() below
p += 4
for n in range(0, nvals, 1):
match['attrs'][attrs[i][0]].append(unpack('>q', response[p:p + 8])[0])
p += 8
p -= 4
else:
match['attrs'][attrs[i][0]] = unpack('>L', response[p:p + 4])[0]
p += 4
result['matches'].append(match)
result['total'], result['total_found'], result['time'], words = unpack('>4L', response[p:p + 16])
result['time'] = '%.3f' % (result['time'] / 1000.0)
p += 16
result['words'] = []
while words > 0:
words -= 1
length = unpack('>L', response[p:p + 4])[0]
p += 4
            word = response[p:p + length].decode(errors='replace')
p += length
docs, hits = unpack('>2L', response[p:p + 8])
p += 8
result['words'].append({'word': word, 'docs': docs, 'hits': hits})
self._reqs = []
return results
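    # Usage sketch: batch several queries into one network round-trip; each
    # entry of the returned list is the same kind of result hash that Query()
    # returns ('test1' is a hypothetical index):
    #   cl.AddQuery('first query', 'test1')
    #   cl.AddQuery('second query', 'test1')
    #   results = cl.RunQueries()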
def BuildExcerpts(self, docs, index, words, opts=None):
"""
        Connect to searchd server, and generate excerpts (snippets) from given documents.
"""
if not opts:
opts = {}
        if isinstance(words, str):
            words = words.encode('utf-8')
        assert (isinstance(docs, list))
        assert (isinstance(index, str))
        assert (isinstance(words, bytes))
assert (isinstance(opts, dict))
sock = self._Connect()
if not sock:
return None
# fixup options
opts.setdefault('before_match', '<b>')
opts.setdefault('after_match', '</b>')
opts.setdefault('chunk_separator', ' ... ')
opts.setdefault('html_strip_mode', 'index')
opts.setdefault('limit', 256)
opts.setdefault('limit_passages', 0)
opts.setdefault('limit_words', 0)
opts.setdefault('around', 5)
opts.setdefault('start_passage_id', 1)
opts.setdefault('passage_boundary', 'none')
# build request
# v.1.0 req
flags = 1 # (remove spaces)
if opts.get('exact_phrase'): flags |= 2
if opts.get('single_passage'): flags |= 4
if opts.get('use_boundaries'): flags |= 8
if opts.get('weight_order'): flags |= 16
if opts.get('query_mode'): flags |= 32
if opts.get('force_all_words'): flags |= 64
if opts.get('load_files'): flags |= 128
if opts.get('allow_empty'): flags |= 256
if opts.get('emit_zones'): flags |= 512
if opts.get('load_files_scattered'): flags |= 1024
# mode=0, flags
req = [pack('>2L', 0, flags)]
        # req index
        index = index.encode()
        req.append(pack('>L', len(index)))
        req.append(index)
# req words
        req.append(pack('>L', len(words)))
        req.append(words)
# options
        before_match = opts['before_match'].encode()
        req.append(pack('>L', len(before_match)))
        req.append(before_match)
        after_match = opts['after_match'].encode()
        req.append(pack('>L', len(after_match)))
        req.append(after_match)
        chunk_separator = opts['chunk_separator'].encode()
        req.append(pack('>L', len(chunk_separator)))
        req.append(chunk_separator)
        req.append(pack('>L', int(opts['limit'])))
        req.append(pack('>L', int(opts['around'])))
        req.append(pack('>L', int(opts['limit_passages'])))
        req.append(pack('>L', int(opts['limit_words'])))
        req.append(pack('>L', int(opts['start_passage_id'])))
        html_strip_mode = opts['html_strip_mode'].encode()
        req.append(pack('>L', len(html_strip_mode)))
        req.append(html_strip_mode)
        passage_boundary = opts['passage_boundary'].encode()
        req.append(pack('>L', len(passage_boundary)))
        req.append(passage_boundary)
# documents
req.append(pack('>L', len(docs)))
        for doc in docs:
            if isinstance(doc, str):
                doc = doc.encode('utf-8')
            assert (isinstance(doc, bytes))
            req.append(pack('>L', len(doc)))
            req.append(doc)
req = b''.join(req)
# send query, get response
length = len(req)
# add header
req = pack('>2HL', SEARCHD_COMMAND_EXCERPT, VER_COMMAND_EXCERPT, length) + req
self._Send(sock, req)
response = self._GetResponse(sock, VER_COMMAND_EXCERPT)
if not response:
return []
# parse response
pos = 0
res = []
rlen = len(response)
for i in range(len(docs)):
length = unpack('>L', response[pos:pos + 4])[0]
pos += 4
if pos + length > rlen:
self._error = 'incomplete reply'
return []
res.append(response[pos:pos + length])
pos += length
return res
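    # Usage sketch ('test1' is a hypothetical index):
    #   snippets = cl.BuildExcerpts(['hello world document body'], 'test1', 'hello',
    #                               {'before_match': '<em>', 'after_match': '</em>'})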
def UpdateAttributes(self, index, attrs, values, mva=False, ignorenonexistent=False):
"""
Update given attribute values on given documents in given indexes.
        Returns the number of updated documents (0 or more) on success, or -1 on failure.
        'attrs' must be a list of strings.
        'values' must be a dict with int key (document ID) and list of int values (new attribute values).
        Optional boolean parameter 'mva' indicates that MVA attributes are being updated;
        in this case 'values' must be a dict with int key (document ID) and list of lists of int values
        (new MVA attribute values).
        Optional boolean parameter 'ignorenonexistent' makes the update silently ignore attempts to update
        a column that does not exist in the current index schema.
Example:
res = cl.UpdateAttributes ( 'test1', [ 'group_id', 'date_added' ], { 2:[123,1000000000], 4:[456,1234567890] } )
"""
assert (isinstance(index, str))
assert (isinstance(attrs, list))
assert (isinstance(values, dict))
for attr in attrs:
assert (isinstance(attr, str))
for docid, entry in list(values.items()):
AssertUInt32(docid)
assert (isinstance(entry, list))
assert (len(attrs) == len(entry))
for val in entry:
if mva:
assert (isinstance(val, list))
for vals in val:
AssertInt32(vals)
else:
AssertInt32(val)
# build request
        index = index.encode()
        req = [pack('>L', len(index)), index]
req.append(pack('>L', len(attrs)))
ignore_absent = 0
if ignorenonexistent: ignore_absent = 1
req.append(pack('>L', ignore_absent))
mva_attr = 0
if mva: mva_attr = 1
        for attr in attrs:
            attr = attr.encode()
            req.append(pack('>L', len(attr)) + attr)
req.append(pack('>L', mva_attr))
req.append(pack('>L', len(values)))
for docid, entry in list(values.items()):
req.append(pack('>Q', docid))
for val in entry:
val_len = val
if mva: val_len = len(val)
req.append(pack('>L', val_len))
if mva:
for vals in val:
req.append(pack('>L', vals))
# connect, send query, get response
sock = self._Connect()
if not sock:
return None
        req = b''.join(req)
        length = len(req)
        req = pack('>2HL', SEARCHD_COMMAND_UPDATE, VER_COMMAND_UPDATE, length) + req
self._Send(sock, req)
response = self._GetResponse(sock, VER_COMMAND_UPDATE)
if not response:
return -1
# parse response
updated = unpack('>L', response[0:4])[0]
return updated
def BuildKeywords(self, query, index, hits):
"""
Connect to searchd server, and generate keywords list for a given query.
Returns None on failure, or a list of keywords on success.
"""
assert (isinstance(query, str))
assert (isinstance(index, str))
assert (isinstance(hits, int))
        # build request
        query = query.encode()
        index = index.encode()
        req = [pack('>L', len(query)) + query]
        req.append(pack('>L', len(index)) + index)
req.append(pack('>L', hits))
# connect, send query, get response
sock = self._Connect()
if not sock:
return None
req = b''.join(req)
length = len(req)
req = pack('>2HL', SEARCHD_COMMAND_KEYWORDS, VER_COMMAND_KEYWORDS, length) + req
self._Send(sock, req)
response = self._GetResponse(sock, VER_COMMAND_KEYWORDS)
if not response:
return None
# parse response
res = []
nwords = unpack('>L', response[0:4])[0]
p = 4
max_ = len(response)
while nwords > 0 and p < max_:
nwords -= 1
length = unpack('>L', response[p:p + 4])[0]
p += 4
            tokenized = response[p:p + length].decode(errors='replace')
p += length
length = unpack('>L', response[p:p + 4])[0]
p += 4
            normalized = response[p:p + length].decode(errors='replace')
p += length
entry = {'tokenized': tokenized, 'normalized': normalized}
if hits:
entry['docs'], entry['hits'] = unpack('>2L', response[p:p + 8])
p += 8
res.append(entry)
if nwords > 0 or p > max_:
self._error = 'incomplete reply'
return None
return res
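    # Usage sketch ('test1' is a hypothetical index); each returned entry is a
    # dict with 'tokenized' and 'normalized' keys, plus 'docs'/'hits' counts
    # when hits is non-zero:
    #   kw = cl.BuildKeywords('hello world', 'test1', 1)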
def Status(self, session=False):
"""
        Get searchd status variables. Returns a list of [key, value] pairs.
"""
# connect, send query, get response
sock = self._Connect()
if not sock:
return None
sess = 1
if session:
sess = 0
req = pack('>2HLL', SEARCHD_COMMAND_STATUS, VER_COMMAND_STATUS, 4, sess)
self._Send(sock, req)
response = self._GetResponse(sock, VER_COMMAND_STATUS)
if not response:
return None
# parse response
res = []
p = 8
max_ = len(response)
while p < max_:
            length = unpack('>L', response[p:p + 4])[0]
            k = response[p + 4:p + length + 4].decode(errors='replace')
            p += 4 + length
            length = unpack('>L', response[p:p + 4])[0]
            v = response[p + 4:p + length + 4].decode(errors='replace')
            p += 4 + length
res += [[k, v]]
return res
### persistent connections
def Open(self):
if self._socket:
self._error = 'already connected'
return None
server = self._Connect()
if not server:
return None
# command, command version = 0, body length = 4, body = 1
request = pack('>hhII', SEARCHD_COMMAND_PERSIST, 0, 4, 1)
self._Send(server, request)
self._socket = server
return True
def Close(self):
if not self._socket:
self._error = 'not connected'
return
self._socket.close()
self._socket = None
def EscapeString(self, string):
return re.sub(r'([()|!@~"&/\\^$=<-])', r"\\\1", string)
def FlushAttributes(self):
sock = self._Connect()
if not sock:
return -1
request = pack('>hhI', SEARCHD_COMMAND_FLUSHATTRS, VER_COMMAND_FLUSHATTRS, 0) # cmd, ver, bodylen
self._Send(sock, request)
response = self._GetResponse(sock, VER_COMMAND_FLUSHATTRS)
if not response or len(response) != 4:
self._error = 'unexpected response length'
return -1
tag = unpack('>L', response[0:4])[0]
return tag
def AssertInt32(value):
assert (isinstance(value, int))
assert (-2 ** 32 - 1 <= value <= 2 ** 32 - 1)
def AssertUInt32(value):
assert (isinstance(value, int))
assert (0 <= value <= 2 ** 32 - 1)
def SetBit(flag, bit, on):
    if on:
        flag |= (1 << bit) # OR, not +=, so setting an already-set bit is a no-op
    else:
        flag &= ~(1 << bit)
    return flag
#
# $Id: sphinxapi.py 4885 2015-01-20 07:02:07Z deogar $
#