Compare commits
	
		
			8 Commits
		
	
	
		
			v0.16.0
			...
			unicode-em
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 5e0b97e90c | ||
|  | a0a3805e2d | ||
|  | 7d5caf8368 | ||
|  | dee5465440 | ||
|  | 33e50e48c1 | ||
|  | 41371e5fc5 | ||
|  | ce86ea4c9a | ||
|  | 601b79865d | 
| @@ -2,6 +2,7 @@ import datetime | |||||||
| import decimal | import decimal | ||||||
| import itertools | import itertools | ||||||
| import re | import re | ||||||
|  | import socket | ||||||
| import time | import time | ||||||
| import uuid | import uuid | ||||||
| import warnings | import warnings | ||||||
| @@ -154,21 +155,105 @@ class EmailField(StringField): | |||||||
|  |  | ||||||
|     .. versionadded:: 0.4 |     .. versionadded:: 0.4 | ||||||
|     """ |     """ | ||||||
|  |     USER_REGEX = re.compile( | ||||||
|     EMAIL_REGEX = re.compile( |         # `dot-atom` defined in RFC 5322 Section 3.2.3. | ||||||
|         # dot-atom |         r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*\Z" | ||||||
|         r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*" |         # `quoted-string` defined in RFC 5322 Section 3.2.4. | ||||||
|         # quoted-string |         r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-\011\013\014\016-\177])*"\Z)', | ||||||
|         r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-011\013\014\016-\177])*"' |         re.IGNORECASE | ||||||
|         # domain (max length of an ICAAN TLD is 22 characters) |  | ||||||
|         r')@(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}|[A-Z0-9-]{2,}(?<!-))$', re.IGNORECASE |  | ||||||
|     ) |     ) | ||||||
|  |  | ||||||
|  |     UTF8_USER_REGEX = re.compile( | ||||||
|  |         six.u( | ||||||
|  |             # RFC 6531 Section 3.3 extends `atext` (used by dot-atom) to | ||||||
|  |             # include `UTF8-non-ascii`. | ||||||
|  |             r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z\u0080-\U0010FFFF]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z\u0080-\U0010FFFF]+)*\Z" | ||||||
|  |             # `quoted-string` | ||||||
|  |             r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-\011\013\014\016-\177])*"\Z)' | ||||||
|  |         ), re.IGNORECASE | re.UNICODE | ||||||
|  |     ) | ||||||
|  |  | ||||||
|  |     DOMAIN_REGEX = re.compile( | ||||||
|  |         r'((?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+)(?:[A-Z0-9-]{2,63}(?<!-))\Z', | ||||||
|  |         re.IGNORECASE | ||||||
|  |     ) | ||||||
|  |  | ||||||
|  |     error_msg = u'Invalid email address: %s' | ||||||
|  |  | ||||||
|  |     def __init__(self, domain_whitelist=None, allow_utf8_user=False, | ||||||
|  |                  allow_ip_domain=False, *args, **kwargs): | ||||||
|  |         """Initialize the EmailField. | ||||||
|  |  | ||||||
|  |         Args: | ||||||
|  |             domain_whitelist (list) - list of otherwise invalid domain | ||||||
|  |                                       names which you'd like to support. | ||||||
|  |             allow_utf8_user (bool) - if True, the user part of the email | ||||||
|  |                                      address can contain UTF8 characters. | ||||||
|  |                                      False by default. | ||||||
|  |             allow_ip_domain (bool) - if True, the domain part of the email | ||||||
|  |                                      can be a valid IPv4 or IPv6 address. | ||||||
|  |         """ | ||||||
|  |         self.domain_whitelist = domain_whitelist or [] | ||||||
|  |         self.allow_utf8_user = allow_utf8_user | ||||||
|  |         self.allow_ip_domain = allow_ip_domain | ||||||
|  |         super(EmailField, self).__init__(*args, **kwargs) | ||||||
|  |  | ||||||
|  |     def validate_user_part(self, user_part): | ||||||
|  |         """Validate the user part of the email address. Return True if | ||||||
|  |         valid and False otherwise. | ||||||
|  |         """ | ||||||
|  |         if self.allow_utf8_user: | ||||||
|  |             return self.UTF8_USER_REGEX.match(user_part) | ||||||
|  |         return self.USER_REGEX.match(user_part) | ||||||
|  |  | ||||||
|  |     def validate_domain_part(self, domain_part): | ||||||
|  |         """Validate the domain part of the email address. Return True if | ||||||
|  |         valid and False otherwise. | ||||||
|  |         """ | ||||||
|  |         # Skip domain validation if it's in the whitelist. | ||||||
|  |         if domain_part in self.domain_whitelist: | ||||||
|  |             return True | ||||||
|  |  | ||||||
|  |         if self.DOMAIN_REGEX.match(domain_part): | ||||||
|  |             return True | ||||||
|  |  | ||||||
|  |         # Validate IPv4/IPv6, e.g. user@[192.168.0.1] | ||||||
|  |         if ( | ||||||
|  |             self.allow_ip_domain and | ||||||
|  |             domain_part[0] == '[' and | ||||||
|  |             domain_part[-1] == ']' | ||||||
|  |         ): | ||||||
|  |             for addr_family in (socket.AF_INET, socket.AF_INET6): | ||||||
|  |                 try: | ||||||
|  |                     socket.inet_pton(addr_family, domain_part[1:-1]) | ||||||
|  |                     return True | ||||||
|  |                 except (socket.error, UnicodeEncodeError): | ||||||
|  |                     pass | ||||||
|  |  | ||||||
|  |         return False | ||||||
|  |  | ||||||
|     def validate(self, value): |     def validate(self, value): | ||||||
|         if not EmailField.EMAIL_REGEX.match(value): |  | ||||||
|             self.error('Invalid email address: %s' % value) |  | ||||||
|         super(EmailField, self).validate(value) |         super(EmailField, self).validate(value) | ||||||
|  |  | ||||||
|  |         if '@' not in value: | ||||||
|  |             self.error(self.error_msg % value) | ||||||
|  |  | ||||||
|  |         user_part, domain_part = value.rsplit('@', 1) | ||||||
|  |  | ||||||
|  |         # Validate the user part. | ||||||
|  |         if not self.validate_user_part(user_part): | ||||||
|  |             self.error(self.error_msg % value) | ||||||
|  |  | ||||||
|  |         # Validate the domain and, if invalid, see if it's IDN-encoded. | ||||||
|  |         if not self.validate_domain_part(domain_part): | ||||||
|  |             try: | ||||||
|  |                 domain_part = domain_part.encode('idna').decode('ascii') | ||||||
|  |             except UnicodeError: | ||||||
|  |                 self.error(self.error_msg % value) | ||||||
|  |             else: | ||||||
|  |                 if not self.validate_domain_part(domain_part): | ||||||
|  |                     self.error(self.error_msg % value) | ||||||
|  |  | ||||||
|  |  | ||||||
| class IntField(BaseField): | class IntField(BaseField): | ||||||
|     """32-bit integer field.""" |     """32-bit integer field.""" | ||||||
|   | |||||||
| @@ -844,7 +844,7 @@ class InstanceTest(unittest.TestCase): | |||||||
|         class Recipient(Document): |         class Recipient(Document): | ||||||
|             email = EmailField(required=True) |             email = EmailField(required=True) | ||||||
|  |  | ||||||
|         recipient = Recipient(email='root@localhost') |         recipient = Recipient(email='not-an-email') | ||||||
|         self.assertRaises(ValidationError, recipient.save) |         self.assertRaises(ValidationError, recipient.save) | ||||||
|         recipient.save(validate=False) |         recipient.save(validate=False) | ||||||
|  |  | ||||||
|   | |||||||
| @@ -6,6 +6,7 @@ import math | |||||||
| import itertools | import itertools | ||||||
| import re | import re | ||||||
| import pymongo | import pymongo | ||||||
|  | import sys | ||||||
|  |  | ||||||
| from nose.plugins.skip import SkipTest | from nose.plugins.skip import SkipTest | ||||||
| from collections import OrderedDict | from collections import OrderedDict | ||||||
| @@ -342,8 +343,6 @@ class FieldTest(MongoDBTestCase): | |||||||
|         class Link(Document): |         class Link(Document): | ||||||
|             url = URLField() |             url = URLField() | ||||||
|  |  | ||||||
|         Link.drop_collection() |  | ||||||
|  |  | ||||||
|         link = Link() |         link = Link() | ||||||
|         link.url = 'google' |         link.url = 'google' | ||||||
|         self.assertRaises(ValidationError, link.validate) |         self.assertRaises(ValidationError, link.validate) | ||||||
| @@ -356,8 +355,6 @@ class FieldTest(MongoDBTestCase): | |||||||
|         class Link(Document): |         class Link(Document): | ||||||
|             url = URLField() |             url = URLField() | ||||||
|  |  | ||||||
|         Link.drop_collection() |  | ||||||
|  |  | ||||||
|         link = Link() |         link = Link() | ||||||
|         link.url = u'http://привет.com' |         link.url = u'http://привет.com' | ||||||
|  |  | ||||||
| @@ -3456,23 +3453,99 @@ class FieldTest(MongoDBTestCase): | |||||||
|         class User(Document): |         class User(Document): | ||||||
|             email = EmailField() |             email = EmailField() | ||||||
|  |  | ||||||
|         user = User(email="ross@example.com") |         user = User(email='ross@example.com') | ||||||
|         self.assertTrue(user.validate() is None) |         user.validate() | ||||||
|  |  | ||||||
|         user = User(email="ross@example.co.uk") |         user = User(email='ross@example.co.uk') | ||||||
|         self.assertTrue(user.validate() is None) |         user.validate() | ||||||
|  |  | ||||||
|         user = User(email=("Kofq@rhom0e4klgauOhpbpNdogawnyIKvQS0wk2mjqrgGQ5S" |         user = User(email=('Kofq@rhom0e4klgauOhpbpNdogawnyIKvQS0wk2mjqrgGQ5S' | ||||||
|                            "aJIazqqWkm7.net")) |                            'aJIazqqWkm7.net')) | ||||||
|         self.assertTrue(user.validate() is None) |         user.validate() | ||||||
|  |  | ||||||
|         user = User(email="new-tld@example.technology") |         user = User(email='new-tld@example.technology') | ||||||
|         self.assertTrue(user.validate() is None) |         user.validate() | ||||||
|  |  | ||||||
|  |         user = User(email='ross@example.com.') | ||||||
|  |         self.assertRaises(ValidationError, user.validate) | ||||||
|  |  | ||||||
|  |         # unicode domain | ||||||
|  |         user = User(email=u'user@пример.рф') | ||||||
|  |         user.validate() | ||||||
|  |  | ||||||
|  |         # invalid unicode domain | ||||||
|  |         user = User(email=u'user@пример') | ||||||
|  |         self.assertRaises(ValidationError, user.validate) | ||||||
|  |  | ||||||
|  |         # invalid data type | ||||||
|  |         user = User(email=123) | ||||||
|  |         self.assertRaises(ValidationError, user.validate) | ||||||
|  |  | ||||||
|  |     def test_email_field_unicode_user(self): | ||||||
|  |         # Don't run this test on pypy3, which doesn't support unicode regex: | ||||||
|  |         # https://bitbucket.org/pypy/pypy/issues/1821/regular-expression-doesnt-find-unicode | ||||||
|  |         if sys.version_info[:2] == (3, 2): | ||||||
|  |             raise SkipTest('unicode email addresses are not supported on PyPy 3') | ||||||
|  |  | ||||||
|  |         class User(Document): | ||||||
|  |             email = EmailField() | ||||||
|  |  | ||||||
|  |         # unicode user shouldn't validate by default... | ||||||
|  |         user = User(email=u'Dörte@Sörensen.example.com') | ||||||
|  |         self.assertRaises(ValidationError, user.validate) | ||||||
|  |  | ||||||
|  |         # ...but it should be fine with allow_utf8_user set to True | ||||||
|  |         class User(Document): | ||||||
|  |             email = EmailField(allow_utf8_user=True) | ||||||
|  |  | ||||||
|  |         user = User(email=u'Dörte@Sörensen.example.com') | ||||||
|  |         user.validate() | ||||||
|  |  | ||||||
|  |     def test_email_field_domain_whitelist(self): | ||||||
|  |         class User(Document): | ||||||
|  |             email = EmailField() | ||||||
|  |  | ||||||
|  |         # localhost domain shouldn't validate by default... | ||||||
|         user = User(email='me@localhost') |         user = User(email='me@localhost') | ||||||
|         self.assertRaises(ValidationError, user.validate) |         self.assertRaises(ValidationError, user.validate) | ||||||
|  |  | ||||||
|         user = User(email="ross@example.com.") |         # ...but it should be fine if it's whitelisted | ||||||
|  |         class User(Document): | ||||||
|  |             email = EmailField(domain_whitelist=['localhost']) | ||||||
|  |  | ||||||
|  |         user = User(email='me@localhost') | ||||||
|  |         user.validate() | ||||||
|  |  | ||||||
|  |     def test_email_field_ip_domain(self): | ||||||
|  |         class User(Document): | ||||||
|  |             email = EmailField() | ||||||
|  |  | ||||||
|  |         valid_ipv4 = 'email@[127.0.0.1]' | ||||||
|  |         valid_ipv6 = 'email@[2001:dB8::1]' | ||||||
|  |         invalid_ip = 'email@[324.0.0.1]' | ||||||
|  |  | ||||||
|  |         # IP address as a domain shouldn't validate by default... | ||||||
|  |         user = User(email=valid_ipv4) | ||||||
|  |         self.assertRaises(ValidationError, user.validate) | ||||||
|  |  | ||||||
|  |         user = User(email=valid_ipv6) | ||||||
|  |         self.assertRaises(ValidationError, user.validate) | ||||||
|  |  | ||||||
|  |         user = User(email=invalid_ip) | ||||||
|  |         self.assertRaises(ValidationError, user.validate) | ||||||
|  |  | ||||||
|  |         # ...but it should be fine with allow_ip_domain set to True | ||||||
|  |         class User(Document): | ||||||
|  |             email = EmailField(allow_ip_domain=True) | ||||||
|  |  | ||||||
|  |         user = User(email=valid_ipv4) | ||||||
|  |         user.validate() | ||||||
|  |  | ||||||
|  |         user = User(email=valid_ipv6) | ||||||
|  |         user.validate() | ||||||
|  |  | ||||||
|  |         # invalid IP should still fail validation | ||||||
|  |         user = User(email=invalid_ip) | ||||||
|         self.assertRaises(ValidationError, user.validate) |         self.assertRaises(ValidationError, user.validate) | ||||||
|  |  | ||||||
|     def test_email_field_honors_regex(self): |     def test_email_field_honors_regex(self): | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user