-
Notifications
You must be signed in to change notification settings - Fork 94
feat(auth): Implement secure httpOnly cookie authentication for Okta #1920
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
aa8c01d
5a04776
25982d2
b02e1d7
8f4308f
a1c9d4a
e70e6a6
9856fd8
1f297fd
c80b545
8bb9811
43d0772
60a65bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,244 @@ | ||
| import json | ||
| import logging | ||
| import os | ||
| import urllib.request | ||
| import urllib.parse | ||
| import base64 | ||
| import binascii | ||
| from http.cookies import SimpleCookie | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
| logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO')) | ||
|
|
||
|
|
||
| def handler(event, context): | ||
| """Main Lambda handler - routes requests to appropriate function""" | ||
| path = event.get('path', '') | ||
| method = event.get('httpMethod', '') | ||
|
|
||
| if path == '/auth/token-exchange' and method == 'POST': | ||
| return token_exchange_handler(event) | ||
| elif path == '/auth/logout' and method == 'POST': | ||
| return logout_handler(event) | ||
| elif path == '/auth/userinfo' and method == 'GET': | ||
| return userinfo_handler(event) | ||
| else: | ||
| return error_response( | ||
| 404, 'Auth endpoint not found. Valid routes: /auth/token-exchange, /auth/logout, /auth/userinfo', event | ||
| ) | ||
|
|
||
|
|
||
| def error_response(status_code, message, event=None): | ||
| """Return error response with CORS headers""" | ||
| response = { | ||
| 'statusCode': status_code, | ||
| 'headers': get_cors_headers(event) if event else {'Content-Type': 'application/json'}, | ||
| 'body': json.dumps({'error': message}), | ||
| } | ||
| return response | ||
|
|
||
|
|
||
| def get_cors_headers(event): | ||
| """Get CORS headers for response""" | ||
| cloudfront_url = os.environ.get('CLOUDFRONT_URL', '') | ||
| if not cloudfront_url: | ||
| logger.debug('CLOUDFRONT_URL not set - authentication endpoints will reject cross-origin requests') | ||
|
|
||
| return { | ||
| 'Content-Type': 'application/json', | ||
| 'Access-Control-Allow-Origin': cloudfront_url, | ||
| 'Access-Control-Allow-Credentials': 'true', | ||
| 'Access-Control-Allow-Methods': 'GET, POST, OPTIONS', | ||
| 'Access-Control-Allow-Headers': 'Content-Type', | ||
| } | ||
|
|
||
|
|
||
| def token_exchange_handler(event): | ||
| """Exchange authorization code for tokens and set httpOnly cookies""" | ||
| try: | ||
| body = json.loads(event.get('body', '{}')) | ||
| code = body.get('code') | ||
| code_verifier = body.get('code_verifier') | ||
|
|
||
| if not code or not code_verifier: | ||
| return error_response(400, 'Missing code or code_verifier', event) | ||
|
|
||
| okta_url = os.environ.get('CUSTOM_AUTH_URL', '') | ||
| client_id = os.environ.get('CUSTOM_AUTH_CLIENT_ID', '') | ||
| redirect_uri = os.environ.get('CUSTOM_AUTH_REDIRECT_URL', '') | ||
|
|
||
| if not okta_url or not client_id: | ||
| return error_response(500, 'Missing Okta configuration', event) | ||
|
|
||
| # Call Okta token endpoint | ||
| token_url = f'{okta_url}/v1/token' | ||
| token_data = { | ||
| 'grant_type': 'authorization_code', | ||
| 'code': code, | ||
| 'code_verifier': code_verifier, | ||
| 'client_id': client_id, | ||
| 'redirect_uri': redirect_uri, | ||
| } | ||
|
|
||
| data = urllib.parse.urlencode(token_data).encode('utf-8') | ||
| req = urllib.request.Request( | ||
| token_url, | ||
| data=data, | ||
| headers={'Content-Type': 'application/x-www-form-urlencoded'}, | ||
| ) | ||
|
|
||
| try: | ||
| # nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected | ||
| with urllib.request.urlopen(req, timeout=10) as response: | ||
| tokens = json.loads(response.read().decode('utf-8')) | ||
| except urllib.error.HTTPError as e: | ||
| error_body = e.read().decode('utf-8') | ||
| logger.error(f'Token exchange failed: {error_body}') | ||
| return error_response(401, 'Authentication failed. Please try again.', event) | ||
|
|
||
| cookies = build_cookies(tokens) | ||
|
|
||
| return { | ||
| 'statusCode': 200, | ||
| 'headers': get_cors_headers(event), | ||
| 'multiValueHeaders': {'Set-Cookie': cookies}, | ||
| 'body': json.dumps({'success': True}), | ||
| } | ||
|
|
||
| except Exception as e: | ||
| logger.error(f'Token exchange error: {str(e)}') | ||
| return error_response(500, 'Internal server error', event) | ||
|
|
||
|
|
||
| def build_cookies(tokens): | ||
| """Build httpOnly cookies for tokens""" | ||
| cookies = [] | ||
| secure = True | ||
| httponly = True | ||
| samesite = 'Lax' | ||
| max_age = 3600 # 1 hour | ||
|
|
||
| for token_name in ['access_token', 'id_token']: | ||
| if tokens.get(token_name): | ||
| cookie = SimpleCookie() | ||
| cookie[token_name] = tokens[token_name] | ||
| cookie[token_name]['path'] = '/' | ||
| cookie[token_name]['secure'] = secure | ||
| cookie[token_name]['httponly'] = httponly | ||
| cookie[token_name]['samesite'] = samesite | ||
| cookie[token_name]['max-age'] = max_age | ||
| cookies.append(cookie[token_name].OutputString()) | ||
|
|
||
| return cookies | ||
|
|
||
|
|
||
| def logout_handler(event): | ||
| """Clear all auth cookies and return Okta logout URL""" | ||
| # Get id_token from cookie for Okta logout | ||
| cookie_header = event.get('headers', {}).get('Cookie') or event.get('headers', {}).get('cookie', '') | ||
| cookies_in = SimpleCookie() | ||
| cookies_in.load(cookie_header) | ||
|
|
||
| id_token = None | ||
| id_token_cookie = cookies_in.get('id_token') | ||
| if id_token_cookie: | ||
| id_token = id_token_cookie.value | ||
|
|
||
| # Clear all auth cookies | ||
| cookies = [] | ||
| for cookie_name in ['access_token', 'id_token', 'refresh_token']: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also logout from okta. Here we are deleting those cookies but that doesn't mean we will be logging out of Okta. Should we make a call to the okta endpoint to let Okta know that we want to logout ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, good point! I'll implement Okta logout by redirecting to Okta's /v1/logout endpoint from the frontend after clearing cookies. This fully ends the Okta session so the user must re-authenticate on next login. The flow will be: Frontend calls /auth/logout to clear cookies
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Super nit: can you comment this flow as a comment here so that anyone looking at this will have clear reference |
||
| cookie = SimpleCookie() | ||
| cookie[cookie_name] = '' | ||
| cookie[cookie_name]['path'] = '/' | ||
| cookie[cookie_name]['max-age'] = 0 | ||
| cookies.append(cookie[cookie_name].OutputString()) | ||
|
|
||
| # Build Okta logout URL if we have the id_token | ||
| logout_url = None | ||
| okta_url = os.environ.get('CUSTOM_AUTH_URL', '') | ||
| post_logout_uri = os.environ.get('CLOUDFRONT_URL', '') | ||
|
|
||
| if id_token and okta_url and post_logout_uri: | ||
| logout_params = urllib.parse.urlencode( | ||
| { | ||
| 'id_token_hint': id_token, | ||
| 'post_logout_redirect_uri': post_logout_uri, | ||
| } | ||
| ) | ||
| logout_url = f'{okta_url}/v1/logout?{logout_params}' | ||
|
|
||
| return { | ||
| 'statusCode': 200, | ||
| 'headers': get_cors_headers(event), | ||
| 'multiValueHeaders': {'Set-Cookie': cookies}, | ||
| 'body': json.dumps({'success': True, 'logout_url': logout_url}), | ||
| } | ||
|
|
||
|
|
||
| def userinfo_handler(event): | ||
| """Return user info from id_token cookie""" | ||
| try: | ||
| # Check both 'Cookie' and 'cookie' - API Gateway may normalize header casing | ||
| cookie_header = event.get('headers', {}).get('Cookie') or event.get('headers', {}).get('cookie', '') | ||
TejasRGitHub marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| cookies = SimpleCookie() | ||
| cookies.load(cookie_header) | ||
|
|
||
| id_token_cookie = cookies.get('id_token') | ||
| if not id_token_cookie: | ||
| return error_response(401, 'Not authenticated', event) | ||
|
|
||
| id_token = id_token_cookie.value | ||
|
|
||
| # Decode JWT payload (middle part of token) | ||
| # JWT format: header.payload.signature (base64url encoded) | ||
| parts = id_token.split('.') | ||
| if len(parts) != 3: | ||
| return error_response(401, 'Invalid token format', event) | ||
|
|
||
| payload = parts[1] | ||
|
|
||
| # Base64 requires padding to be multiple of 4 characters | ||
| # URL-safe base64 in JWTs often omits padding, so we add it back | ||
| padding = 4 - len(payload) % 4 | ||
TejasRGitHub marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if padding != 4: | ||
| payload += '=' * padding | ||
|
|
||
| decoded = base64.urlsafe_b64decode(payload) | ||
| claims = json.loads(decoded) | ||
|
|
||
| # Check if token is expired | ||
| import time | ||
|
|
||
| exp = claims.get('exp') | ||
| if exp and int(exp) < int(time.time()): | ||
| return error_response(401, 'Token expired', event) | ||
|
|
||
| email_claim = os.environ.get('CLAIMS_MAPPING_EMAIL', 'email') | ||
| user_id_claim = os.environ.get('CLAIMS_MAPPING_USER_ID', 'sub') | ||
|
|
||
| email = claims.get(email_claim, claims.get('email', claims.get('sub', ''))) | ||
| user_id = claims.get(user_id_claim, claims.get('sub', '')) | ||
|
|
||
| return { | ||
| 'statusCode': 200, | ||
| 'headers': get_cors_headers(event), | ||
| 'body': json.dumps( | ||
| { | ||
| 'email': email, | ||
| 'name': claims.get('name', email), | ||
| 'sub': user_id, | ||
| 'exp': exp, # Include expiration time for frontend to set up timer | ||
| } | ||
| ), | ||
| } | ||
|
|
||
| except (binascii.Error, ValueError) as e: | ||
| logger.error(f'Failed to decode JWT payload: {str(e)}') | ||
| return error_response(401, 'Invalid token', event) | ||
| except json.JSONDecodeError as e: | ||
| logger.error(f'Failed to parse JWT claims: {str(e)}') | ||
| return error_response(401, 'Invalid token', event) | ||
| except Exception as e: | ||
TejasRGitHub marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| logger.error(f'Userinfo error: {str(e)}') | ||
| return error_response(500, 'Internal server error', event) | ||
Uh oh!
There was an error while loading. Please reload this page.