Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions flask_multipass/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ class AuthProvider(metaclass=SupportsMeta):
#: form in your application, specify a :class:`~flask_wtf.Form`
#: here (usually containing a username/email and a password field).
login_form = None
#: The field name in the login form that contains the identifier.
#: Useful to reliably retrieve identifier data in applications that use
#: multiple auth providers.
identifier_field_name = None
#: The rate limiter used for login attempts in local auth providers.
#: This should be an instance of :class:`~flask_limiter.Limiter`.
rate_limiter = None
#: The rate limiter used for login attempts bound to a specific user
#: This should be an instance of :class:`~flask_limiter.Limiter`.
rate_limiter_user = None

def __init__(self, multipass, name, settings):
self.multipass = multipass
Expand All @@ -46,6 +56,12 @@ def is_external(self):
"""
return self.login_form is None

def is_rate_limited(self, form):
"""True if rate limiters are set for local auth provider."""
if self.is_external:
return False
return bool(self._get_exceeded_rate_limiter(form.data[self.identifier_field_name]))

def process_local_login(self, data): # pragma: no cover
"""Handles the login process based on form data.

Expand Down Expand Up @@ -113,5 +129,28 @@ def process_logout(self, return_url):
"""
return None

def notify_failed_login(self, identifier=None):
"""Notify the provider about a failed login attempt."""
if not self.is_rate_limited:
return
if identifier and self.rate_limiter_user:
self.rate_limiter_user.hit(identifier)
elif self.rate_limiter:
self.rate_limiter.hit()

def __repr__(self):
return f'<{type(self).__name__}({self.name})>'

def _get_exceeded_rate_limiter(self, identifier=None):
"""Return the rate limiter that has been exceeded."""
# TODO: Handle better
if not self.rate_limiter or not self.rate_limiter_user:
raise Exception
if self.rate_limiter.test():
return None
elif not self.rate_limiter_user.limits or not identifier:
return self.rate_limiter
elif self.rate_limiter_user.test(identifier):
return None
else:
return self.rate_limiter_user
2 changes: 2 additions & 0 deletions flask_multipass/providers/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class SQLAlchemyAuthProviderBase(AuthProvider):

#: The :class:`~flask_wtf.Form` that is used for the login dialog
login_form = LoginForm
#: The field name in the login form that contains the identifier
identifier_field_name = LoginForm.identifier.name
#: The Flask-SQLAlchemy model representing a user identity
identity_model = None
#: The column of the identity model that contains the provider
Expand Down
1 change: 1 addition & 0 deletions flask_multipass/providers/static.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class StaticAuthProvider(AuthProvider):
"""

login_form = StaticLoginForm
identifier_field_name = StaticLoginForm.username.name

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down