Added Sharding support
Added shard_key meta, so save() and update() passes shard keys to the pymongo query. Also made shard key fields immutable. Closes #388 and #389
This commit is contained in:
		
							
								
								
									
										3
									
								
								AUTHORS
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								AUTHORS
									
									
									
									
									
								
							| @@ -89,3 +89,6 @@ that much better: | |||||||
|  * Ankhbayar |  * Ankhbayar | ||||||
|  * Jan Schrewe |  * Jan Schrewe | ||||||
|  * David Koblas |  * David Koblas | ||||||
|  |  * Crittercism | ||||||
|  |  * Alvin Liang | ||||||
|  |  * andrewmlevy | ||||||
|   | |||||||
| @@ -5,6 +5,7 @@ Changelog | |||||||
| Changes in dev | Changes in dev | ||||||
| ============== | ============== | ||||||
|  |  | ||||||
|  | - Added sharding support | ||||||
| - Added pymongo 2.1 support | - Added pymongo 2.1 support | ||||||
| - Fixed Abstract documents can now declare indexes | - Fixed Abstract documents can now declare indexes | ||||||
| - Added db_alias support to individual documents | - Added db_alias support to individual documents | ||||||
|   | |||||||
| @@ -476,8 +476,29 @@ subsequent calls to :meth:`~mongoengine.queryset.QuerySet.order_by`. :: | |||||||
|     first_post = BlogPost.objects.order_by("+published_date").first() |     first_post = BlogPost.objects.order_by("+published_date").first() | ||||||
|     assert first_post.title == "Blog Post #1" |     assert first_post.title == "Blog Post #1" | ||||||
|  |  | ||||||
|  | Shard keys | ||||||
|  | ========== | ||||||
|  |  | ||||||
|  | If your collection is sharded, then you need to specify the shard key as a tuple, | ||||||
|  | using the :attr:`shard_key` attribute of :attr:`-mongoengine.Document.meta`. | ||||||
|  | This ensures that the shard key is sent with the query when calling the  | ||||||
|  | :meth:`~mongoengine.document.Document.save` or  | ||||||
|  | :meth:`~mongoengine.document.Document.update` method on an existing  | ||||||
|  | :class:`-mongoengine.Document` instance:: | ||||||
|  |  | ||||||
|  |     class LogEntry(Document): | ||||||
|  |         machine = StringField() | ||||||
|  |         app = StringField() | ||||||
|  |         timestamp = DateTimeField() | ||||||
|  |         data = StringField() | ||||||
|  |  | ||||||
|  |         meta = { | ||||||
|  |             'shard_key': ('machine', 'timestamp',) | ||||||
|  |         } | ||||||
|  |  | ||||||
| Document inheritance | Document inheritance | ||||||
| ==================== | ==================== | ||||||
|  |  | ||||||
| To create a specialised type of a :class:`~mongoengine.Document` you have | To create a specialised type of a :class:`~mongoengine.Document` you have | ||||||
| defined, you may subclass it and add any extra fields or methods you may need. | defined, you may subclass it and add any extra fields or methods you may need. | ||||||
| As this is new class is not a direct subclass of | As this is new class is not a direct subclass of | ||||||
|   | |||||||
| @@ -724,6 +724,7 @@ class TopLevelDocumentMetaclass(DocumentMetaclass): | |||||||
| class BaseDocument(object): | class BaseDocument(object): | ||||||
|  |  | ||||||
|     _dynamic = False |     _dynamic = False | ||||||
|  |     _created = True | ||||||
|  |  | ||||||
|     def __init__(self, **values): |     def __init__(self, **values): | ||||||
|         signals.pre_init.send(self.__class__, document=self, values=values) |         signals.pre_init.send(self.__class__, document=self, values=values) | ||||||
| @@ -757,6 +758,7 @@ class BaseDocument(object): | |||||||
|         if self._dynamic: |         if self._dynamic: | ||||||
|             for key, value in dynamic_data.items(): |             for key, value in dynamic_data.items(): | ||||||
|                 setattr(self, key, value) |                 setattr(self, key, value) | ||||||
|  |  | ||||||
|         signals.post_init.send(self.__class__, document=self) |         signals.post_init.send(self.__class__, document=self) | ||||||
|  |  | ||||||
|     def __setattr__(self, name, value): |     def __setattr__(self, name, value): | ||||||
| @@ -784,6 +786,11 @@ class BaseDocument(object): | |||||||
|             if hasattr(self, '_changed_fields'): |             if hasattr(self, '_changed_fields'): | ||||||
|                 self._mark_as_changed(name) |                 self._mark_as_changed(name) | ||||||
|             return |             return | ||||||
|  |  | ||||||
|  |         if not self._created and name in self._meta.get('shard_key', tuple()): | ||||||
|  |             from queryset import OperationError | ||||||
|  |             raise OperationError("Shard Keys are immutable. Tried to update %s" % name) | ||||||
|  |  | ||||||
|         super(BaseDocument, self).__setattr__(name, value) |         super(BaseDocument, self).__setattr__(name, value) | ||||||
|  |  | ||||||
|     def __expand_dynamic_values(self, name, value): |     def __expand_dynamic_values(self, name, value): | ||||||
| @@ -912,6 +919,7 @@ class BaseDocument(object): | |||||||
|  |  | ||||||
|         obj = cls(**data) |         obj = cls(**data) | ||||||
|         obj._changed_fields = changed_fields |         obj._changed_fields = changed_fields | ||||||
|  |         obj._created = False | ||||||
|         return obj |         return obj | ||||||
|  |  | ||||||
|     def _mark_as_changed(self, key): |     def _mark_as_changed(self, key): | ||||||
|   | |||||||
| @@ -171,6 +171,7 @@ class Document(BaseDocument): | |||||||
|         doc = self.to_mongo() |         doc = self.to_mongo() | ||||||
|  |  | ||||||
|         created = force_insert or '_id' not in doc |         created = force_insert or '_id' not in doc | ||||||
|  |  | ||||||
|         try: |         try: | ||||||
|             collection = self.__class__.objects._collection |             collection = self.__class__.objects._collection | ||||||
|             if created: |             if created: | ||||||
| @@ -181,10 +182,18 @@ class Document(BaseDocument): | |||||||
|             else: |             else: | ||||||
|                 object_id = doc['_id'] |                 object_id = doc['_id'] | ||||||
|                 updates, removals = self._delta() |                 updates, removals = self._delta() | ||||||
|  |  | ||||||
|  |                 # Need to add shard key to query, or you get an error | ||||||
|  |                 select_dict = {'_id': object_id} | ||||||
|  |                 shard_key = self.__class__._meta.get('shard_key', tuple()) | ||||||
|  |                 for k in shard_key: | ||||||
|  |                     actual_key = self._db_field_map.get(k, k) | ||||||
|  |                     select_dict[actual_key] = doc[actual_key] | ||||||
|  |  | ||||||
|                 if updates: |                 if updates: | ||||||
|                     collection.update({'_id': object_id}, {"$set": updates}, upsert=True, safe=safe, **write_options) |                     collection.update(select_dict, {"$set": updates}, upsert=True, safe=safe, **write_options) | ||||||
|                 if removals: |                 if removals: | ||||||
|                     collection.update({'_id': object_id}, {"$unset": removals}, upsert=True, safe=safe, **write_options) |                     collection.update(select_dict, {"$unset": removals}, upsert=True, safe=safe, **write_options) | ||||||
|  |  | ||||||
|             cascade = self._meta.get('cascade', True) if cascade is None else cascade |             cascade = self._meta.get('cascade', True) if cascade is None else cascade | ||||||
|             if cascade: |             if cascade: | ||||||
| @@ -238,7 +247,12 @@ class Document(BaseDocument): | |||||||
|         if not self.pk: |         if not self.pk: | ||||||
|             raise OperationError('attempt to update a document not yet saved') |             raise OperationError('attempt to update a document not yet saved') | ||||||
|  |  | ||||||
|         return self.__class__.objects(pk=self.pk).update_one(**kwargs) |         # Need to add shard key to query, or you get an error | ||||||
|  |         select_dict = {'pk': self.pk} | ||||||
|  |         shard_key = self.__class__._meta.get('shard_key', tuple()) | ||||||
|  |         for k in shard_key: | ||||||
|  |             select_dict[k] = getattr(self, k) | ||||||
|  |         return self.__class__.objects(**select_dict).update_one(**kwargs) | ||||||
|  |  | ||||||
|     def delete(self, safe=False): |     def delete(self, safe=False): | ||||||
|         """Delete the :class:`~mongoengine.Document` from the database. This |         """Delete the :class:`~mongoengine.Document` from the database. This | ||||||
| @@ -248,10 +262,8 @@ class Document(BaseDocument): | |||||||
|         """ |         """ | ||||||
|         signals.pre_delete.send(self.__class__, document=self) |         signals.pre_delete.send(self.__class__, document=self) | ||||||
|  |  | ||||||
|         id_field = self._meta['id_field'] |  | ||||||
|         object_id = self._fields[id_field].to_mongo(self[id_field]) |  | ||||||
|         try: |         try: | ||||||
|             self.__class__.objects(**{id_field: object_id}).delete(safe=safe) |             self.__class__.objects(pk=self.pk).delete(safe=safe) | ||||||
|         except pymongo.errors.OperationFailure, err: |         except pymongo.errors.OperationFailure, err: | ||||||
|             message = u'Could not delete document (%s)' % err.message |             message = u'Could not delete document (%s)' % err.message | ||||||
|             raise OperationError(message) |             raise OperationError(message) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user