Source code for evergreen.oidc
# -*- encoding: utf-8 -*-
"""OIDC token management for Evergreen API authentication."""
from __future__ import absolute_import
import json
import os
import time
from typing import TYPE_CHECKING, Optional
import jwt
import requests
import structlog
from evergreen.config import OidcConfig
if TYPE_CHECKING:
from evergreen.api import EvergreenApi
LOGGER = structlog.getLogger(__name__)
[docs]
class OidcToken:
"""Represents an OIDC access token."""
def __init__(self, access_token: str, refresh_token: str):
"""
Initialize an OIDC token.
:param access_token: The access token JWT string.
:param refresh_token: Refresh token for obtaining new access tokens.
"""
self.access_token = access_token
self.refresh_token = refresh_token
[docs]
def is_expired(self, buffer_seconds: int = 60) -> bool:
"""
Check if token is expired by decoding the JWT and checking the exp claim.
:param buffer_seconds: Number of seconds to consider as buffer before actual expiration.
:return: True if token is expired or about to expire, False otherwise.
"""
claims = jwt.decode(self.access_token, options={"verify_signature": False})
exp_timestamp = claims["exp"]
current_time = time.time()
return current_time >= (exp_timestamp - buffer_seconds)
[docs]
@classmethod
def from_dict(cls, data: dict) -> "OidcToken":
"""
Create OidcToken from dictionary (e.g., from JSON file).
:param data: Dictionary containing 'access_token' and 'refresh_token'.
:return: OidcToken instance.
"""
return cls(
access_token=data["access_token"],
refresh_token=data["refresh_token"],
)
[docs]
def to_dict(self) -> dict:
"""
Convert token to dictionary (for JSON serialization).
:return: Dictionary representation of the token.
"""
return {
"access_token": self.access_token,
"refresh_token": self.refresh_token,
}
[docs]
def get_username_from_claims(self) -> str:
"""
Extract username from JWT token claims.
Decodes the JWT (without verification) and extracts the username
from common claim names (preferred_username, email, sub, name).
:return: Username extracted from token claims.
:raises KeyError: If no recognized username claim is found in token.
"""
# Decode JWT without verification (we're just reading claims)
claims = jwt.decode(self.access_token, options={"verify_signature": False})
# Try common username claim names
for claim_name in ["preferred_username", "email", "sub", "name"]:
if claim_name in claims:
username = claims[claim_name]
# If it's an email, extract the part before @
if "@" in username and claim_name == "email":
username = username.split("@")[0]
return username
raise KeyError("No recognized username claim found in token")
[docs]
class OidcTokenManager:
"""Manages OIDC token acquisition and caching."""
def __init__(self, oidc_config: OidcConfig, timeout: Optional[int] = None):
"""
Initialize the OIDC token manager.
:param oidc_config: OIDC configuration.
:param timeout: Optional timeout for HTTP requests in seconds.
"""
self.oidc_config = oidc_config
self.timeout = timeout
self._token: Optional[OidcToken] = None
[docs]
def get_token(self) -> str:
"""
Get a valid access token, acquiring or refreshing if necessary.
:return: Access token string.
"""
# Try to load from cache file first
if self._token is None and self.oidc_config.token_file_path:
try:
self._token = self._load_token_from_file()
except (FileNotFoundError, json.JSONDecodeError, KeyError) as e:
raise RuntimeError(
"Evergreen token file is invalid or missing. Do you need to run `evergreen-login`?"
) from e
# If we have a valid token, return it
if self._token and not self._token.is_expired():
return self._token.access_token
# Token is expired or missing. Try to refresh if we have a refresh token
if self._token and self._token.refresh_token:
LOGGER.debug("Attempting to refresh expired token using refresh token")
try:
self._token = self._refresh_token(self._token.refresh_token)
except requests.HTTPError as e:
if e.response.status_code >= 400 and e.response.status_code < 500:
raise RuntimeError(
f"Refresh token is invalid or expired. Do you need to re-run `evergreen login` or remove {self.oidc_config.token_file_path}?"
) from e
raise
# Cache the refreshed token if configured
if self.oidc_config.token_file_path:
self._save_token_to_file(self._token)
return self._token.access_token
raise RuntimeError(
"No valid OIDC token available and no refresh token present. Do you need to run `evergreen login`?"
)
def _load_token_from_file(self) -> OidcToken:
"""
Load token from the configured token file.
:return: OidcToken loaded from file.
:raises FileNotFoundError: If token file doesn't exist.
:raises json.JSONDecodeError: If token file is corrupted.
:raises KeyError: If token file is missing required fields.
"""
with open(self.oidc_config.token_file_path, "rb") as f:
data = json.load(f)
return OidcToken.from_dict(data)
def _save_token_to_file(self, token: OidcToken) -> None:
"""
Save token to the configured token file.
:param token: Token to save.
"""
if not self.oidc_config.token_file_path:
return
# Ensure directory exists
directory = os.path.dirname(self.oidc_config.token_file_path)
if directory and not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
with open(self.oidc_config.token_file_path, "w") as f:
json.dump(token.to_dict(), f)
def _refresh_token(self, refresh_token: str) -> OidcToken:
"""
Refresh an access token using a refresh token.
:param refresh_token: The refresh token to use.
:return: OidcToken instance with new access token.
:raises RuntimeError: If refresh fails.
"""
metadata = self._get_provider_metadata()
token_endpoint = metadata.get("token_endpoint")
if not token_endpoint:
raise RuntimeError("OIDC provider metadata missing token_endpoint")
payload = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": self.oidc_config.client_id,
}
response = requests.post(token_endpoint, data=payload, timeout=self.timeout)
response.raise_for_status()
data = response.json()
return OidcToken(
access_token=data["access_token"],
refresh_token=data.get(
"refresh_token", refresh_token
), # Use new refresh token if provided, else keep old one
)
def _get_provider_metadata(self) -> dict:
"""
Fetch OIDC provider metadata from the well-known configuration endpoint.
:return: Provider metadata dictionary.
:raises RuntimeError: If metadata fetch fails.
"""
metadata_url = f"{self.oidc_config.issuer}/.well-known/openid-configuration"
response = requests.get(metadata_url, timeout=self.timeout)
response.raise_for_status()
return response.json()
[docs]
def get_username_from_api(api: "EvergreenApi") -> str:
"""
Extract username from an EvergreenApi instance.
Works with both API key auth and OIDC auth by extracting the username
from the appropriate source.
:param api: The EvergreenApi instance.
:return: Username extracted from either API key auth or OIDC token.
:raises ValueError: If no username can be determined from auth configuration.
"""
# Try API key auth first
if api._auth:
return api._auth.username
# Try OIDC auth
if api._oidc_token_manager:
# Get the token (this will load from file if needed)
api._oidc_token_manager.get_token()
# Now access the token object that was stored
token = api._oidc_token_manager._token
if token and isinstance(token, OidcToken):
return token.get_username_from_claims()
raise ValueError("Could not determine username from API authentication configuration")