| 
1 | 1 | import uuid  | 
2 |  | -from os import path  | 
 | 2 | +from os import path, urandom  | 
 | 3 | +import hashlib  | 
 | 4 | +import base64  | 
 | 5 | +import secrets  | 
3 | 6 | 
 
  | 
4 | 7 | 
 
  | 
5 | 8 | import requests  | 
6 |  | -from flask import current_app as app, url_for, redirect, render_template, request, session  | 
 | 9 | +from flask import current_app as app, url_for, redirect, render_template, request, session, redirect  | 
7 | 10 | from flask_oauthlib.client import OAuth  | 
 | 11 | +from requests_oauthlib import OAuth2Session  | 
8 | 12 | from docusign_esign import ApiClient  | 
9 | 13 | from docusign_esign.client.api_exception import ApiException  | 
10 | 14 | 
 
  | 
@@ -48,7 +52,10 @@ class DSClient:  | 
48 | 52 |     @classmethod  | 
49 | 53 |     def _init(cls, auth_type, api):  | 
50 | 54 |         if auth_type == "code_grant":  | 
51 |  | -            cls._auth_code_grant(api)  | 
 | 55 | +            if session.get("pkce_failed", False):  | 
 | 56 | +                cls._auth_code_grant(api)  | 
 | 57 | +            else:  | 
 | 58 | +                cls._pkce_auth(api)  | 
52 | 59 |         elif auth_type == "jwt":  | 
53 | 60 |             cls._jwt_auth(api)  | 
54 | 61 | 
 
  | 
@@ -92,6 +99,29 @@ def _auth_code_grant(cls, api):  | 
92 | 99 |             access_token_method="POST"  | 
93 | 100 |         )  | 
94 | 101 | 
 
  | 
 | 102 | +    @classmethod  | 
 | 103 | +    def _pkce_auth(cls, api):  | 
 | 104 | +        """Authorize with the Authorization Code Grant - OAuth 2.0 flow"""  | 
 | 105 | +        use_scopes = []  | 
 | 106 | + | 
 | 107 | +        if api == "Rooms":  | 
 | 108 | +            use_scopes.extend(ROOMS_SCOPES)  | 
 | 109 | +        elif api == "Click":  | 
 | 110 | +            use_scopes.extend(CLICK_SCOPES)  | 
 | 111 | +        elif api == "Admin":  | 
 | 112 | +            use_scopes.extend(ADMIN_SCOPES)  | 
 | 113 | +        elif api == "Maestro":  | 
 | 114 | +            use_scopes.extend(MAESTRO_SCOPES)  | 
 | 115 | +        elif api == "WebForms":  | 
 | 116 | +            use_scopes.extend(WEBFORMS_SCOPES)  | 
 | 117 | +        else:  | 
 | 118 | +            use_scopes.extend(SCOPES)  | 
 | 119 | +        # remove duplicate scopes  | 
 | 120 | +        use_scopes = list(set(use_scopes))  | 
 | 121 | + | 
 | 122 | +        redirect_uri = DS_CONFIG["app_url"] + url_for("ds.ds_callback")  | 
 | 123 | +        cls.ds_app = OAuth2Session(DS_CONFIG["ds_client_id"], redirect_uri=redirect_uri, scope=use_scopes)  | 
 | 124 | + | 
95 | 125 |     @classmethod  | 
96 | 126 |     def _jwt_auth(cls, api):  | 
97 | 127 |         """JSON Web Token authorization"""  | 
@@ -152,15 +182,24 @@ def destroy(cls):  | 
152 | 182 |     def login(cls, auth_type, api):  | 
153 | 183 |         cls._init(auth_type, api)  | 
154 | 184 |         if auth_type == "code_grant":  | 
155 |  | -            return cls.get(auth_type, api).authorize(callback=url_for("ds.ds_callback", _external=True))  | 
 | 185 | +            if session.get("pkce_failed", False):  | 
 | 186 | +                return cls.get(auth_type, api).authorize(callback=url_for("ds.ds_callback", _external=True))  | 
 | 187 | +            else:  | 
 | 188 | +                code_verifier = cls.generate_code_verifier()  | 
 | 189 | +                code_challenge = cls.generate_code_challenge(code_verifier)  | 
 | 190 | +                session["code_verifier"] = code_verifier  | 
 | 191 | +                return redirect(cls.get_auth_url_with_pkce(code_challenge))  | 
156 | 192 |         elif auth_type == "jwt":  | 
157 | 193 |             return cls._jwt_auth(api)  | 
158 | 194 | 
 
  | 
159 | 195 |     @classmethod  | 
160 | 196 |     def get_token(cls, auth_type):  | 
161 | 197 |         resp = None  | 
162 | 198 |         if auth_type == "code_grant":  | 
163 |  | -            resp = cls.get(auth_type).authorized_response()  | 
 | 199 | +            if session.get("pkce_failed", False):  | 
 | 200 | +                resp = cls.get(auth_type).authorized_response()  | 
 | 201 | +            else:  | 
 | 202 | +                return cls.fetch_token_with_pkce(request.url)  | 
164 | 203 |         elif auth_type == "jwt":  | 
165 | 204 |             resp = cls.get(auth_type).to_dict()  | 
166 | 205 | 
 
  | 
@@ -189,3 +228,44 @@ def get(cls, auth_type, api=API_TYPE["ESIGNATURE"]):  | 
189 | 228 |         if not cls.ds_app:  | 
190 | 229 |             cls._init(auth_type, api)  | 
191 | 230 |         return cls.ds_app  | 
 | 231 | +      | 
 | 232 | +    @classmethod  | 
 | 233 | +    def generate_code_verifier(cls):  | 
 | 234 | +        # Generate a random 32-byte string and base64-url encode it  | 
 | 235 | +        return secrets.token_urlsafe(32)  | 
 | 236 | + | 
 | 237 | +    @classmethod  | 
 | 238 | +    def generate_code_challenge(cls, code_verifier):  | 
 | 239 | +        # Hash the code verifier using SHA-256  | 
 | 240 | +        sha256_hash = hashlib.sha256(code_verifier.encode()).digest()  | 
 | 241 | + | 
 | 242 | +        # Base64 encode the hash and make it URL safe  | 
 | 243 | +        base64_encoded = base64.urlsafe_b64encode(sha256_hash).decode().rstrip('=')  | 
 | 244 | + | 
 | 245 | +        return base64_encoded  | 
 | 246 | +      | 
 | 247 | +    @classmethod  | 
 | 248 | +    def get_auth_url_with_pkce(cls, code_challenge):  | 
 | 249 | +        authorize_url = DS_CONFIG["authorization_server"] + "/oauth/auth"  | 
 | 250 | +        auth_url, state = cls.ds_app.authorization_url(  | 
 | 251 | +            authorize_url,  | 
 | 252 | +            code_challenge=code_challenge,  | 
 | 253 | +            code_challenge_method='S256',  # PKCE uses SHA-256 hashing,  | 
 | 254 | +            approval_prompt="auto"  | 
 | 255 | +        )  | 
 | 256 | + | 
 | 257 | +        return auth_url  | 
 | 258 | +      | 
 | 259 | +    @classmethod  | 
 | 260 | +    def fetch_token_with_pkce(cls, authorization_response):  | 
 | 261 | +        access_token_url = DS_CONFIG["authorization_server"] + "/oauth/token"  | 
 | 262 | +        token = cls.get("code_grant", session.get("api")).fetch_token(  | 
 | 263 | +            access_token_url,  | 
 | 264 | +            authorization_response=authorization_response,  | 
 | 265 | +            client_id=DS_CONFIG["ds_client_id"],  | 
 | 266 | +            client_secret=DS_CONFIG["ds_client_secret"],  | 
 | 267 | +            code_verifier=session["code_verifier"],  | 
 | 268 | +            code_challenge_method="S256"  | 
 | 269 | +        )  | 
 | 270 | + | 
 | 271 | +        return token  | 
0 commit comments