Adding multiprocessing support to mongoengine by using the identity of the process to define the connection. Each 'thread' gets its own pymongo connection.

This commit is contained in:
Samuel Clay 2010-09-16 17:19:58 -04:00
parent 327452622e
commit 20dd7562e0

View File

@ -1,5 +1,5 @@
from pymongo import Connection from pymongo import Connection
import multiprocessing
__all__ = ['ConnectionError', 'connect'] __all__ = ['ConnectionError', 'connect']
@ -8,12 +8,12 @@ _connection_settings = {
'host': 'localhost', 'host': 'localhost',
'port': 27017, 'port': 27017,
} }
_connection = None _connection = {}
_db_name = None _db_name = None
_db_username = None _db_username = None
_db_password = None _db_password = None
_db = None _db = {}
class ConnectionError(Exception): class ConnectionError(Exception):
@ -22,32 +22,39 @@ class ConnectionError(Exception):
def _get_connection(): def _get_connection():
global _connection global _connection
identity = get_identity()
# Connect to the database if not already connected # Connect to the database if not already connected
if _connection is None: if _connection.get(identity) is None:
try: try:
_connection = Connection(**_connection_settings) _connection[identity] = Connection(**_connection_settings)
except: except:
raise ConnectionError('Cannot connect to the database') raise ConnectionError('Cannot connect to the database')
return _connection return _connection[identity]
def _get_db(): def _get_db():
global _db, _connection global _db, _connection
identity = get_identity()
# Connect if not already connected # Connect if not already connected
if _connection is None: if _connection.get(identity) is None:
_connection = _get_connection() _connection[identity] = _get_connection()
if _db is None: if _db.get(identity) is None:
# _db_name will be None if the user hasn't called connect() # _db_name will be None if the user hasn't called connect()
if _db_name is None: if _db_name is None:
raise ConnectionError('Not connected to the database') raise ConnectionError('Not connected to the database')
# Get DB from current connection and authenticate if necessary # Get DB from current connection and authenticate if necessary
_db = _connection[_db_name] _db[identity] = _connection[identity][_db_name]
if _db_username and _db_password: if _db_username and _db_password:
_db.authenticate(_db_username, _db_password) _db[identity].authenticate(_db_username, _db_password)
return _db return _db[identity]
def get_identity():
identity = multiprocessing.current_process()._identity
identity = 0 if not identity else identity[0]
return identity
def connect(db, username=None, password=None, **kwargs): def connect(db, username=None, password=None, **kwargs):
"""Connect to the database specified by the 'db' argument. Connection """Connect to the database specified by the 'db' argument. Connection
settings may be provided here as well if the database is not running on settings may be provided here as well if the database is not running on