Skip to content

Commit

Permalink
[refs #22] Per-user keyrings, session invalidation
Browse files Browse the repository at this point in the history
Generate a Keyring for each user id, this makes it possible to
invalidate the keyring on logout, meaning other user sessions using
the same authentication token will become invalid.

This protects against session stealing (copying the __ac cookie) and
creates a working server-side log-out.
  • Loading branch information
david-batranu committed Mar 30, 2022
1 parent e952d4a commit d6c7d5a
Showing 1 changed file with 37 additions and 8 deletions.
45 changes: 37 additions & 8 deletions plone/session/plugins/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from App.config import getConfiguration
from email.utils import formatdate
from plone.keyring.interfaces import IKeyManager
from plone.keyring.keyring import Keyring
from plone.session import tktauth
from plone.session.interfaces import ISessionPlugin
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
Expand Down Expand Up @@ -73,6 +74,7 @@ class SessionPlugin(BasePlugin):
external_ticket_name = 'ticket'
secure = False
_shared_secret = None
secret_prefix = "_plone.session_"

# These mod_auth_tkt options are not yet implemented (by intent)
# ignoreIP = True # you always want this on the public internet
Expand Down Expand Up @@ -138,16 +140,23 @@ def __init__(self, id, title=None, path="/"):
self.title = title
self.path = path

def _getSigningSecret(self):
def _getSecretKey(self, userid):
return "{}{}".format(self.secret_prefix, userid)

def _getSigningSecret(self, userid):
if self._shared_secret is not None:
return self._shared_secret
manager = getUtility(IKeyManager)
return manager.secret()
secret_key = self._getSecretKey(userid)
if secret_key not in manager:
manager[secret_key] = Keyring(1)
manager[secret_key].fill()
return manager.secret(ring=secret_key)

# ISessionPlugin implementation
def _setupSession(self, userid, response, tokens=(), user_data=''):
cookie = tktauth.createTicket(
secret=self._getSigningSecret(),
secret=self._getSigningSecret(userid),
userid=userid,
tokens=tokens,
user_data=user_data,
Expand Down Expand Up @@ -186,7 +195,6 @@ def extractCredentials(self, request):
return creds

creds["source"] = "plone.session" # XXX should this be the id?

return creds

# IAuthenticationPlugin implementation
Expand All @@ -208,8 +216,11 @@ def authenticateCredentials(self, credentials):
return (info['id'], info['login'])

def _validateTicket(self, ticket, now=None):
_, userid, _, _, _ = tktauth.splitTicket(ticket)

if now is None:
now = time.time()

if self._shared_secret is not None:
ticket_data = tktauth.validateTicket(
self._shared_secret,
Expand All @@ -223,7 +234,14 @@ def _validateTicket(self, ticket, now=None):
manager = queryUtility(IKeyManager)
if manager is None:
return None
for secret in manager[u"_system"]:

secret_key = self._getSecretKey(userid)
if secret_key in manager:
secrets = manager[secret_key]
else:
secrets = manager[u"_system"]

for secret in secrets:
if secret is None:
continue
ticket_data = tktauth.validateTicket(
Expand Down Expand Up @@ -259,6 +277,13 @@ def updateCredentials(self, request, response, login, new_password):

# ICredentialsResetPlugin implementation
def resetCredentials(self, request, response):
ticket = binascii.a2b_base64(request.get(self.cookie_name))
_, userid, _, _, _ = tktauth.splitTicket(ticket)
secret_key = self._getSecretKey(userid)
manager = getUtility(IKeyManager)
if manager[secret_key]:
manager.clear(ring=secret_key)
manager.rotate(ring=secret_key)
response = self.REQUEST["RESPONSE"]
if self.cookie_domain:
response.expireCookie(
Expand All @@ -275,8 +300,10 @@ def manage_clearSecrets(self, REQUEST):
sessions and requires users to login again.
"""
manager = getUtility(IKeyManager)
manager.clear()
manager.rotate()
for ring in manager:
if ring.startswith(self.secret_prefix) or ring == "_system":
manager.clear(ring=ring)
manager.rotate(ring=ring)
response = REQUEST.response
response.redirect(
'%s/manage_secret?manage_tabs_message=%s' %
Expand All @@ -289,7 +316,9 @@ def manage_createNewSecret(self, REQUEST):
"""Create a new (signing) secret.
"""
manager = getUtility(IKeyManager)
manager.rotate()
for ring in manager:
if ring.startswith(self.secret_prefix) or ring == "_system":
manager.rotate(ring=ring)
response = REQUEST.response
response.redirect(
'%s/manage_secret?manage_tabs_message=%s' %
Expand Down

0 comments on commit d6c7d5a

Please sign in to comment.