Skip to content

Commit

Permalink
fixup! crunchroll: respond to crunchryoll site upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
TAAPArthur committed May 14, 2024
1 parent 3f4f60f commit ffff1a0
Showing 1 changed file with 53 additions and 92 deletions.
145 changes: 53 additions & 92 deletions amt/servers/crunchyroll.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,38 @@ class GenericCrunchyrollServer(Server):
domain = "crunchyroll.com"
base_url = f"https://www.{domain}"
api_base = f"https://api.{domain}"
login_url = f"{api_base}/login.1.json"
token_url = f"{base_url}/auth/v1/token"
login_url = token_url

crunchyroll_lock = RLock()
session_id_may_be_invalid = True

_client_id = ('cr_web', 'noaihdevm_6iyg0a8l0q')
_basic_auth = None
_auth_headers = None
_auth_refresh = 0

_BASIC_AUTH = 'Basic ' + base64.b64encode(':'.join((
't-kdgp2h8c3jub8fn0fq',
'yfLDfMfrYvKXh4JXS1LEI2cCqu1v5Wan',
)).encode()).decode()

@property
def is_logged_in(self):
return bool(self.session_get_cookie("etp_rt"))

def get_session_id(self, force=False):
query = {
'sess_id': 1,
'device_id': 'whatvalueshouldbeforweb',
'device_type': 'com.crunchyroll.static',
'access_token': 'giKq5eY27ny3cqz',
'referer': f'{self.base_url}/welcome/login'
}
with GenericCrunchyrollServer.crunchyroll_lock:
upsell_response = self.session_get(
f'{self.api_base}/get_upsell_data.0.json?{urlencode(query)}')
print(upsell_response.json())
return upsell_response.json()['data']['session_id']
return bool(self.refresh_token)

@property
def refresh_token(self):
return self.session_get_cookie("refresh_token")

@refresh_token.setter
def refresh_token(self, value):
return self.session_set_cookie("refresh_token", value)

@property
def is_premium(self):
return True
for premium_cookies_name in ["crplusctamembership", "premplusctav"]:
if self.session_get_cookie(premium_cookies_name):
return True
return False

def session_get_json(self, url, mem_cache=False, skip_cache=True, **kwargs):
self.update_auth()
Expand All @@ -58,39 +63,42 @@ def get_auth_headers(self):
self.update_auth()
return GenericCrunchyrollServer._auth_headers

def set_auth_info(self, data):
GenericCrunchyrollServer._auth_headers = {"Authorization": data["token_type"] + " " + data["access_token"], "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.20 Safari/537.36"}
GenericCrunchyrollServer._auth_refresh = time.time() + data.get("expires_in", 300) - 10

def update_auth(self):
with GenericCrunchyrollServer.crunchyroll_lock:
if GenericCrunchyrollServer._auth_headers and GenericCrunchyrollServer._auth_refresh > time.time():
return
if not GenericCrunchyrollServer._basic_auth:
cx_api_param = self._client_id[self.is_logged_in]
GenericCrunchyrollServer._basic_auth = 'Basic ' + base64.b64encode(f'{cx_api_param}:'.encode()).decode()

auth_headers = {'Authorization': GenericCrunchyrollServer._basic_auth}
if self.is_logged_in:
grant_type = 'etp_rt_cookie'
auth_headers = {"Authorization": GenericCrunchyrollServer._BASIC_AUTH}
if self.refresh_token:
data = {
"refresh_token": self.refresh_token,
"grant_type": "refresh_token",
"scope": "offline_access",
}
else:
grant_type = 'client_id'
auth_headers['ETP-Anonymous-ID'] = uuid.uuid4()

data = {
"grant_type": grant_type.encode(),
"device_type": "Firefox on Linux",
"device_id": "c6ea9e3d-bdce-41c5-95eb-cf951e9b5667"
}
auth_response = self.session_post(f'{self.base_url}/auth/v1/token', headers=auth_headers, data=data).json()
GenericCrunchyrollServer._auth_headers = {'Authorization': auth_response['token_type'] + ' ' + auth_response['access_token']}
GenericCrunchyrollServer._auth_refresh = time.time() + auth_response.get("expires_in", 300) - 10
data = {"grant_type": "client_id"}
auth_headers["ETP-Anonymous-ID"] = str(uuid.uuid4())

auth_response = self.session_post(self.token_url, headers=auth_headers, data=data).json()
self.set_auth_info(auth_response)

def login(self, username, password):
session_id = self.get_session_id(True)
self.session_post(self.login_url,
data={
"account": username,
"password": password,
"session_id": session_id,
})
self.update_auth()
# session_id = self.get_session_id(True)
r = self.session_post(self.login_url,
data={
"username": username,
"password": password,
"grant_type": "password",
"scope": "offline_access",
},
headers={'Authorization': self._BASIC_AUTH})
data = r.json()
self.refresh_token = data["refresh_token"]
self.set_auth_info(data)
return True


Expand Down Expand Up @@ -120,50 +128,6 @@ def get_config(self):
def get_api_domain(self):
return self.get_config()['cxApiParams']['apiDomain']

@property
def is_premium(self):
for premium_cookies_name in ["crplusctamembership", "premplusctav"]:
if self.session_get_cookie(premium_cookies_name):
return True
return False

def needs_authentication(self):
return not self.session_get_cookie("etp_rt")

def init_auth_headers(self):
if self.session_get_cookie("etp_rt"):
grant_type, key = 'etp_rt_cookie', 'accountAuthClientId'
else:
grant_type, key = 'client_id', 'anonClientId'

config = self.get_config()
auth_token = 'Basic ' + str(base64.b64encode(('%s:' % config['cxApiParams'][key]).encode('ascii')), 'ascii')
headers = {'Authorization': auth_token, "Content-Type": "application/x-www-form-urlencoded"}

auth_response = self.session_get_json(f'{self.get_api_domain()}/auth/v1/token', post=True, headers=headers, data=f'grant_type={grant_type}'.encode('ascii'))
return {'Authorization': auth_response['token_type'] + ' ' + auth_response['access_token']}

def get_auth_headers(self):
if not self.auth_header:
self.auth_header = self.init_auth_headers()
return self.auth_header

def _get_params(self):
policy_response = self.session_get_json(f'{self.get_api_domain()}/index/v2', headers=self.get_auth_headers())
cms = policy_response.get('cms_web')
bucket = cms['bucket']
params = {
'Policy': cms['policy'],
'Signature': cms['signature'],
'Key-Pair-Id': cms['key_pair_id']
}
return (bucket, params)

def get_params(self):
if not self.params:
self.params = self._get_params()
return self.params

def get_media_list(self, **kwargs):
return self.search_for_media(None, **kwargs)

Expand Down Expand Up @@ -216,8 +180,6 @@ def update_media_data(self, media_data, **kwargs):
self.update_chapter_data(media_data, id=chapter_id, number=chapter["episode_number"], title=chapter["title"], premium=chapter["is_premium_only"], special=chapter["is_clip"], alt_id=chapter["slug_title"])

def get_stream_urls(self, media_data=None, chapter_data=None):
bucket, params = self.get_params()

url = f"https://cr-play-service.prd.crunchyrollsvc.com/v1/{chapter_data['id']}/console/switch/play"
data = self.session_get_json(url, key=url, mem_cache=True, headers=self.get_auth_headers())

Expand All @@ -230,10 +192,9 @@ def get_stream_urls(self, media_data=None, chapter_data=None):
return map(lambda x: [x[1]], url_list)

def get_subtitle_info(self, media_data, chapter_data):
bucket, params = self.get_params()

url = f"https://cr-play-service.prd.crunchyrollsvc.com/v1/{chapter_data['id']}/console/switch/play"
data = self.session_get_json(url, key=url, mem_cache=True, headers=self.get_auth_headers())

for subInfo in data["subtitles"].values():
yield subInfo["language"], subInfo["url"], subInfo["format"], False

Expand Down

0 comments on commit ffff1a0

Please sign in to comment.