- 
                Notifications
    You must be signed in to change notification settings 
- Fork 344
feat: Add function to verify an App Check token #642
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
25d3842
              cf60bb9
              3c4e191
              4e84ce3
              dc9cbfd
              eb1725d
              c5a25c2
              aa98697
              7e2259c
              0978778
              85145e1
              41f93ea
              5436d12
              6a4815a
              a592256
              5b94963
              89f29d3
              a5290b5
              c46b60b
              e9148b7
              b732aa6
              46f22f6
              2b6c7e7
              e08f355
              73edeb3
              33f93e5
              5321203
              77eb730
              fe30abb
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -12,3 +12,4 @@ apikey.txt | |
| htmlcov/ | ||
| .pytest_cache/ | ||
| .vscode/ | ||
| .venv/ | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| # Copyright 2022 Google Inc. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|  | ||
| # See as an example from firebase_admin/messaging.py | ||
| # def _get_messaging_service(app): | ||
| # return _utils.get_app_service(app, _MESSAGING_ATTRIBUTE, _MessagingService) | ||
|  | ||
| # Our goal in general is to take the design doc implentation and match it to the | ||
| # existing code in the SDK with tests | ||
| # Timeline is to be done by week of the 19th | ||
|  | ||
| """Firebase App Check module.""" | ||
|  | ||
| # ASK(lahiru) Do I need to add these imports to the requirements file? | ||
| import jwt | ||
| from jwt import PyJWKClient | ||
| from typing import Any, Dict, List | ||
| from firebase_admin import _utils | ||
|  | ||
| _APP_CHECK_ATTRIBUTE = '_app_check' | ||
|  | ||
| def _get_app_check_service(app) -> Any: | ||
| return _utils.get_app_service(app, _APP_CHECK_ATTRIBUTE, _AppCheckService) | ||
|  | ||
| # should i accept an app (design doc doesn't have one) or just always make it none | ||
| def verify_token(token: str, app=None) -> Dict[str, Any]: | ||
|         
                  dwyfrequency marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| return _get_app_check_service(app).verify_token(token) | ||
|  | ||
| class _AppCheckService: | ||
| """Service class that implements Firebase App Check functionality.""" | ||
|  | ||
| _APP_CHECK_GCP_API_URL = "https://firebaseappcheck.googleapis.com" | ||
| _APP_CHECK_BETA_JWKS_RESOURCE = "/v1beta/jwks" | ||
|  | ||
|  | ||
| def __init__(self, app): | ||
|         
                  dwyfrequency marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| # the verification method should go in the service | ||
| project_id = app.project_id | ||
| if not project_id: | ||
| raise ValueError( | ||
| 'Project ID is required to access App Check service. Either set the ' | ||
| 'projectId option, or use service account credentials. Alternatively, set the ' | ||
| 'GOOGLE_CLOUD_PROJECT environment variable.') | ||
|         
                  dwyfrequency marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| # Unsure what I should include in this constructor, or even if I should include one | ||
|  | ||
| @classmethod | ||
| def verify_token(self, token: str) -> Dict[str, Any]: | ||
| if token is None: | ||
|         
                  dwyfrequency marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| return None | ||
|  | ||
| # Obtain the Firebase App Check Public Keys | ||
| # Note: It is not recommended to hard code these keys as they rotate, | ||
| # but you should cache them for up to 6 hours. | ||
|         
                  dwyfrequency marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| url = f'{self._APP_CHECK_GCP_API_URL}{self._APP_CHECK_BETA_JWKS_RESOURCE}' | ||
|  | ||
| jwks_client = PyJWKClient(url) | ||
| signing_key = jwks_client.get_signing_key_from_jwt(token) | ||
|  | ||
| header = jwt.get_unverified_header(token) | ||
| self._has_valid_token_headers(header) | ||
|  | ||
|  | ||
| # I don't see any method or property to just get key from signing_key /*/lib/python3.10/site-packages/jwt/api_jwk.py | ||
| payload = self._decode_and_verify(token, signing_key.key, "project_number") | ||
|  | ||
| # The token's subject will be the app ID, you may optionally filter against | ||
| # an allow list | ||
|          | ||
| return payload.get('sub') | ||
|          | ||
|  | ||
| def _has_valid_token_headers(header: Any) -> None: | ||
| # Ensure the token's header has type JWT | ||
| if header.get('typ') != 'JWT': | ||
| raise ValueError("The token received is not a JWT") | ||
|         
                  dwyfrequency marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| # Ensure the token's header uses the algorithm RS256 | ||
| if header.get('alg') != 'RS256': | ||
| raise ValueError("JWT's algorithm does not have valid token headers") | ||
|         
                  dwyfrequency marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
|  | ||
| def _decode_token(token: str, signing_key: str, algorithms:List[str]=["RS256"]) -> Dict[str, Any]: | ||
| payload = {} | ||
| try: | ||
| # Verify the signature on the App Check token | ||
| # Ensure the token is not expired | ||
| payload = jwt.decode( | ||
| token, | ||
| signing_key, | ||
| algorithms | ||
| ) | ||
| except: | ||
| ValueError('Unable to decode the token') | ||
|         
                  dwyfrequency marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| return payload | ||
|  | ||
| # move inside service class | ||
| def _decode_and_verify(self, token: str, signing_key: str, project_number: str): | ||
| payload = {} | ||
| # Verify the signature on the App Check token | ||
| # Ensure the token is not expired | ||
| payload = self._decode_token( | ||
| token, | ||
| signing_key, | ||
| algorithms=["RS256"] | ||
| ) | ||
| if len(payload.aud) <= 1: | ||
| raise ValueError('Project ID and Project Number are required to access App Check.') | ||
| if self._APP_CHECK_GCP_API_URL not in payload.issuer: | ||
| raise ValueError('Token does not contain the correct Issuer.') | ||
|  | ||
| # within the aud of the payload, there will be an array of project id & number | ||
| return payload | ||
|  | ||
| # we need to make some code around fetching the project id | ||
|  | ||
| # Instead of returning none, raise value errors exceptions see messaging | ||
Uh oh!
There was an error while loading. Please reload this page.