Compare commits

...

8 Commits

Author SHA1 Message Date
Stefan Wojcik
5e0b97e90c add a test with an invalid data type 2017-04-16 11:50:28 -04:00
Stefan Wojcik
a0a3805e2d Revert "switch from octal to hex for consistency" (because of pypy3)
This reverts commit 7d5caf836855cc9b843d50df8b17aa4ff897bd6d.
2017-04-15 23:14:19 -04:00
Stefan Wojcik
7d5caf8368 switch from octal to hex for consistency 2017-04-15 22:20:19 -04:00
Stefan Wojcik
dee5465440 dont run the unicode email test on pypy3 2017-04-15 18:07:32 -04:00
Stefan Wojcik
33e50e48c1 use six.u 2017-04-15 16:46:36 -04:00
Stefan Wojcik
41371e5fc5 empty whitelist by default + allow_ip_domain option 2017-04-10 10:00:49 -04:00
Stefan Wojcik
ce86ea4c9a flake8 fixes 2017-04-10 08:18:56 -04:00
Stefan Wojcik
601b79865d support unicode in EmailField 2017-04-09 22:33:11 -04:00
3 changed files with 183 additions and 25 deletions

View File

@ -2,6 +2,7 @@ import datetime
import decimal import decimal
import itertools import itertools
import re import re
import socket
import time import time
import uuid import uuid
import warnings import warnings
@ -154,21 +155,105 @@ class EmailField(StringField):
.. versionadded:: 0.4 .. versionadded:: 0.4
""" """
USER_REGEX = re.compile(
EMAIL_REGEX = re.compile( # `dot-atom` defined in RFC 5322 Section 3.2.3.
# dot-atom r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*\Z"
r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*" # `quoted-string` defined in RFC 5322 Section 3.2.4.
# quoted-string r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-\011\013\014\016-\177])*"\Z)',
r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-011\013\014\016-\177])*"' re.IGNORECASE
# domain (max length of an ICAAN TLD is 22 characters)
r')@(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}|[A-Z0-9-]{2,}(?<!-))$', re.IGNORECASE
) )
UTF8_USER_REGEX = re.compile(
six.u(
# RFC 6531 Section 3.3 extends `atext` (used by dot-atom) to
# include `UTF8-non-ascii`.
r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z\u0080-\U0010FFFF]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z\u0080-\U0010FFFF]+)*\Z"
# `quoted-string`
r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-\011\013\014\016-\177])*"\Z)'
), re.IGNORECASE | re.UNICODE
)
DOMAIN_REGEX = re.compile(
r'((?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+)(?:[A-Z0-9-]{2,63}(?<!-))\Z',
re.IGNORECASE
)
error_msg = u'Invalid email address: %s'
def __init__(self, domain_whitelist=None, allow_utf8_user=False,
allow_ip_domain=False, *args, **kwargs):
"""Initialize the EmailField.
Args:
domain_whitelist (list) - list of otherwise invalid domain
names which you'd like to support.
allow_utf8_user (bool) - if True, the user part of the email
address can contain UTF8 characters.
False by default.
allow_ip_domain (bool) - if True, the domain part of the email
can be a valid IPv4 or IPv6 address.
"""
self.domain_whitelist = domain_whitelist or []
self.allow_utf8_user = allow_utf8_user
self.allow_ip_domain = allow_ip_domain
super(EmailField, self).__init__(*args, **kwargs)
def validate_user_part(self, user_part):
"""Validate the user part of the email address. Return True if
valid and False otherwise.
"""
if self.allow_utf8_user:
return self.UTF8_USER_REGEX.match(user_part)
return self.USER_REGEX.match(user_part)
def validate_domain_part(self, domain_part):
"""Validate the domain part of the email address. Return True if
valid and False otherwise.
"""
# Skip domain validation if it's in the whitelist.
if domain_part in self.domain_whitelist:
return True
if self.DOMAIN_REGEX.match(domain_part):
return True
# Validate IPv4/IPv6, e.g. user@[192.168.0.1]
if (
self.allow_ip_domain and
domain_part[0] == '[' and
domain_part[-1] == ']'
):
for addr_family in (socket.AF_INET, socket.AF_INET6):
try:
socket.inet_pton(addr_family, domain_part[1:-1])
return True
except (socket.error, UnicodeEncodeError):
pass
return False
def validate(self, value): def validate(self, value):
if not EmailField.EMAIL_REGEX.match(value):
self.error('Invalid email address: %s' % value)
super(EmailField, self).validate(value) super(EmailField, self).validate(value)
if '@' not in value:
self.error(self.error_msg % value)
user_part, domain_part = value.rsplit('@', 1)
# Validate the user part.
if not self.validate_user_part(user_part):
self.error(self.error_msg % value)
# Validate the domain and, if invalid, see if it's IDN-encoded.
if not self.validate_domain_part(domain_part):
try:
domain_part = domain_part.encode('idna').decode('ascii')
except UnicodeError:
self.error(self.error_msg % value)
else:
if not self.validate_domain_part(domain_part):
self.error(self.error_msg % value)
class IntField(BaseField): class IntField(BaseField):
"""32-bit integer field.""" """32-bit integer field."""

View File

@ -844,7 +844,7 @@ class InstanceTest(unittest.TestCase):
class Recipient(Document): class Recipient(Document):
email = EmailField(required=True) email = EmailField(required=True)
recipient = Recipient(email='root@localhost') recipient = Recipient(email='not-an-email')
self.assertRaises(ValidationError, recipient.save) self.assertRaises(ValidationError, recipient.save)
recipient.save(validate=False) recipient.save(validate=False)

View File

@ -6,6 +6,7 @@ import math
import itertools import itertools
import re import re
import pymongo import pymongo
import sys
from nose.plugins.skip import SkipTest from nose.plugins.skip import SkipTest
from collections import OrderedDict from collections import OrderedDict
@ -342,8 +343,6 @@ class FieldTest(MongoDBTestCase):
class Link(Document): class Link(Document):
url = URLField() url = URLField()
Link.drop_collection()
link = Link() link = Link()
link.url = 'google' link.url = 'google'
self.assertRaises(ValidationError, link.validate) self.assertRaises(ValidationError, link.validate)
@ -356,8 +355,6 @@ class FieldTest(MongoDBTestCase):
class Link(Document): class Link(Document):
url = URLField() url = URLField()
Link.drop_collection()
link = Link() link = Link()
link.url = u'http://привет.com' link.url = u'http://привет.com'
@ -3456,23 +3453,99 @@ class FieldTest(MongoDBTestCase):
class User(Document): class User(Document):
email = EmailField() email = EmailField()
user = User(email="ross@example.com") user = User(email='ross@example.com')
self.assertTrue(user.validate() is None) user.validate()
user = User(email="ross@example.co.uk") user = User(email='ross@example.co.uk')
self.assertTrue(user.validate() is None) user.validate()
user = User(email=("Kofq@rhom0e4klgauOhpbpNdogawnyIKvQS0wk2mjqrgGQ5S" user = User(email=('Kofq@rhom0e4klgauOhpbpNdogawnyIKvQS0wk2mjqrgGQ5S'
"aJIazqqWkm7.net")) 'aJIazqqWkm7.net'))
self.assertTrue(user.validate() is None) user.validate()
user = User(email="new-tld@example.technology") user = User(email='new-tld@example.technology')
self.assertTrue(user.validate() is None) user.validate()
user = User(email='ross@example.com.')
self.assertRaises(ValidationError, user.validate)
# unicode domain
user = User(email=u'user@пример.рф')
user.validate()
# invalid unicode domain
user = User(email=u'user@пример')
self.assertRaises(ValidationError, user.validate)
# invalid data type
user = User(email=123)
self.assertRaises(ValidationError, user.validate)
def test_email_field_unicode_user(self):
# Don't run this test on pypy3, which doesn't support unicode regex:
# https://bitbucket.org/pypy/pypy/issues/1821/regular-expression-doesnt-find-unicode
if sys.version_info[:2] == (3, 2):
raise SkipTest('unicode email addresses are not supported on PyPy 3')
class User(Document):
email = EmailField()
# unicode user shouldn't validate by default...
user = User(email=u'Dörte@Sörensen.example.com')
self.assertRaises(ValidationError, user.validate)
# ...but it should be fine with allow_utf8_user set to True
class User(Document):
email = EmailField(allow_utf8_user=True)
user = User(email=u'Dörte@Sörensen.example.com')
user.validate()
def test_email_field_domain_whitelist(self):
class User(Document):
email = EmailField()
# localhost domain shouldn't validate by default...
user = User(email='me@localhost') user = User(email='me@localhost')
self.assertRaises(ValidationError, user.validate) self.assertRaises(ValidationError, user.validate)
user = User(email="ross@example.com.") # ...but it should be fine if it's whitelisted
class User(Document):
email = EmailField(domain_whitelist=['localhost'])
user = User(email='me@localhost')
user.validate()
def test_email_field_ip_domain(self):
class User(Document):
email = EmailField()
valid_ipv4 = 'email@[127.0.0.1]'
valid_ipv6 = 'email@[2001:dB8::1]'
invalid_ip = 'email@[324.0.0.1]'
# IP address as a domain shouldn't validate by default...
user = User(email=valid_ipv4)
self.assertRaises(ValidationError, user.validate)
user = User(email=valid_ipv6)
self.assertRaises(ValidationError, user.validate)
user = User(email=invalid_ip)
self.assertRaises(ValidationError, user.validate)
# ...but it should be fine with allow_ip_domain set to True
class User(Document):
email = EmailField(allow_ip_domain=True)
user = User(email=valid_ipv4)
user.validate()
user = User(email=valid_ipv6)
user.validate()
# invalid IP should still fail validation
user = User(email=invalid_ip)
self.assertRaises(ValidationError, user.validate) self.assertRaises(ValidationError, user.validate)
def test_email_field_honors_regex(self): def test_email_field_honors_regex(self):