Skip to content
This repository was archived by the owner on Oct 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion mapadroid/mad_apk/apk_storage_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def decorated(self, *args, **kwargs):
return func(self, *args, **kwargs)
except FileNotFoundError:
msg = 'Attempted to access a non-existent file for {} [{}]'.format(args[0].name, args[1].name)
logger.warning(msg)
return Response(status=404, response=json.dumps(msg))
return decorated

Expand Down
44 changes: 38 additions & 6 deletions mapadroid/mad_apk/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import apkutils
from apkutils.apkfile import BadZipFile, LargeZipFile
import zipfile
from distutils.version import LooseVersion
from flask import Response, stream_with_context
import io
import json
import requests
from typing import Tuple, Union, Generator
Expand Down Expand Up @@ -140,6 +144,35 @@ def generate_filename(package: APKType, architecture: APKArch, version: str, mim
return '{}__{}__{}.{}'.format(friendlyname, version, architecture.name, ext)


def get_apk_info(downloaded_file: io.BytesIO) -> Tuple[str, str]:
package_version: str = None
package_name: str = None
try:
apk = apkutils.APK(downloaded_file)
except: # noqa: E722
logger.warning('Unable to parse APK file')
else:
manifest = apk.get_manifest()
try:
package_version, package_name = (manifest['@android:versionName'], manifest['@package'])
except (TypeError, KeyError):
logger.debug("Invalid manifest file. Potentially a split package")
with zipfile.ZipFile(downloaded_file) as zip_data:
for item in zip_data.infolist():
try:
with zip_data.open(item, 'r') as fh:
apk = apkutils.APK(io.BytesIO(fh.read()))
manifest = apk.get_manifest()
try:
package_version = manifest['@android:versionName']
package_name = manifest['@package']
except KeyError:
pass
except (BadZipFile, LargeZipFile):
continue
return package_version, package_name


def is_newer_version(first_ver: str, second_ver: str) -> bool:
""" Determines if the first version is newer than the second """
try:
Expand Down Expand Up @@ -279,18 +312,17 @@ def supported_pogo_version(architecture: APKArch, version: str) -> bool:
bits = '32'
else:
bits = '64'
composite_key = '%s_%s' % (version, bits,)
try:
with open('configs/version_codes.json') as fh:
address_object = json.load(fh)
composite_key = '%s_%s' % (version, bits,)
address_object[composite_key]
valid = True
json.load(fh)[composite_key]
return True
except KeyError:
try:
requests.get(VERSIONCODES_URL).json()[composite_key]
valid = True
return True
except KeyError:
pass
if not valid:
logger.info('Current version of POGO [{}] is not supported', composite_key)
logger.info('Current version of PoGo [{}] is not supported', composite_key)
return valid
47 changes: 27 additions & 20 deletions mapadroid/mad_apk/wizard.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import urllib3
from .abstract_apk_storage import AbstractAPKStorage
from .apk_enums import APKArch, APKType, APKPackage
from .utils import lookup_package_info, is_newer_version, supported_pogo_version, lookup_arch_enum
from .utils import lookup_package_info, is_newer_version, supported_pogo_version, lookup_arch_enum, get_apk_info
from mapadroid.utils import global_variables
from mapadroid.utils.gplay_connector import GPlayConnector
from mapadroid.utils.logging import get_logger, LoggerEnums
Expand All @@ -27,6 +27,10 @@ class WizardError(Exception):
pass


class InvalidDownload(WizardError):
pass


class InvalidFile(WizardError):
pass

Expand Down Expand Up @@ -82,8 +86,13 @@ def apk_download(self, package: APKType, architecture: APKArch) -> NoReturn:

def apk_nonblocking_download(self) -> NoReturn:
"Download all packages"
self.download_pogo(APKArch.armeabi_v7a)
self.download_pogo(APKArch.arm64_v8a)
for arch in APKArch:
if arch == APKArch.noarch:
continue
try:
self.download_pogo(arch)
except InvalidDownload:
pass
self.download_rgc(APKArch.noarch)
self.download_pd(APKArch.noarch)

Expand Down Expand Up @@ -135,10 +144,16 @@ def download_pogo(self, architecture: APKArch) -> NoReturn:
latest_data = self.get_latest(APKType.pogo, architecture)
downloaded_file = self.gpconn.download(APKPackage.pogo.value, version_code=latest_data['url'])
if downloaded_file and downloaded_file.getbuffer().nbytes > 0:
PackageImporter(APKType.pogo, architecture, self.storage, downloaded_file,
'application/zip', version=latest_version)
successful = True
else:
version, _ = get_apk_info(downloaded_file)
if version != latest_version:
msg = f"Playstore returned {version} when requesting {latest_version}"
logger.warning(msg)
raise InvalidDownload(msg)
else:
PackageImporter(APKType.pogo, architecture, self.storage, downloaded_file,
'application/zip', version=latest_version)
successful = True
if not successful:
logger.info("Issue downloading apk")
retries += 1
if retries < MAX_RETRIES:
Expand Down Expand Up @@ -280,6 +295,10 @@ def find_latest_pogo(self, architecture: APKArch) -> Optional[str]:
logger.info("Version in store is newer than supported version. Using an older version")
version_code = latest_supported["versionCode"]
version_str = latest_supported["version"]
elif current_version and store_vs == current_version:
logger.info("Latest version [{}] is already installed", store_vc)
version_code = store_vc
version_str = store_vs
else:
logger.info('Newer version found: {}', store_vs)
version_code = store_vc
Expand Down Expand Up @@ -413,7 +432,7 @@ def __init__(self, package: APKType, architecture: APKArch, storage_obj: Abstrac
self.package_arch: APKArch = None
self._data: io.BytesIO = downloaded_file
if mimetype == 'application/vnd.android.package-archive':
self.get_apk_info(downloaded_file)
self.package_version, self.package_name = get_apk_info(downloaded_file)
else:
self.normalize_package()
mimetype = 'application/zip'
Expand All @@ -435,18 +454,6 @@ def __init__(self, package: APKType, architecture: APKArch, storage_obj: Abstrac
else:
logger.warning('Unable to determine apk information')

def get_apk_info(self, downloaded_file: io.BytesIO) -> NoReturn:
try:
apk = apkutils.APK(downloaded_file)
except: # noqa: E722
logger.warning('Unable to parse APK file')
else:
manifest = apk.get_manifest()
try:
self.package_version, self.package_name = (manifest['@android:versionName'], manifest['@package'])
except KeyError:
raise InvalidFile('Unable to parse the APK file')

def normalize_package(self) -> NoReturn:
""" Normalize the package

Expand Down
76 changes: 62 additions & 14 deletions mapadroid/tests/mad_apk/test_wizard.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,66 @@
from mapadroid.mad_apk import APKType, WizardError, APKWizard, APKArch
from mapadroid.mad_apk import APKType, APKWizard, WizardError, APKArch
from mapadroid.mad_apk.wizard import InvalidDownload
from mapadroid.tests.mad_apk.base_storage import StorageBase, upload_package
from mapadroid.tests.test_utils import GetStorage, get_connection_api
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch
from mapadroid.utils.gplay_connector import GPlayConnector
import io


TEST_GPLAY_RESPONSE = io.BytesIO(b"Dummy File")


class WizardTests(StorageBase):

@patch('mapadroid.mad_apk.wizard.supported_pogo_version')
@patch('mapadroid.mad_apk.wizard.get_apk_info')
def test_invalid_version_from_gplay(self, get_apk_info, supported_pogo_version):
supported_pogo_version.return_value = True
get_apk_info.return_value = ("0.123.3", "com.ignored")
latest_gplay = {
"version_code": 20200901,
"version": "0.123.4"
}
autosearch_latest = {
"version": "0.123.4",
"url": 20201001
}
with GetStorage(get_connection_api()) as storage:
package_downloader = APKWizard(storage.db_wrapper, storage.storage_manager)
package_downloader.find_latest_pogo = MagicMock(return_value=latest_gplay)
storage.storage_manager.get_current_version = MagicMock(return_value=None)
package_downloader.get_latest = MagicMock(return_value=autosearch_latest)
GPlayConnector.download = MagicMock(return_value=TEST_GPLAY_RESPONSE)
with self.assertRaises(InvalidDownload):
package_downloader.download_pogo(APKArch.arm64_v8a)

@patch('mapadroid.mad_apk.wizard.get_apk_info')
def test_valid_version_from_gplay(self, get_apk_info):
latest_gplay = {
"version_code": 20200901,
"version": "0.123.4"
}
autosearch_latest = {
"version": "0.123.4",
"url": 20201001
}
get_apk_info_resp = ("0.123.4", "com.ignored")
get_apk_info.return_value = get_apk_info_resp
with GetStorage(get_connection_api()) as storage:
package_downloader = APKWizard(storage.db_wrapper, storage.storage_manager)
package_downloader.find_latest_pogo = MagicMock(return_value=latest_gplay)
storage.storage_manager.get_current_version = MagicMock(return_value="0.123.3")
package_downloader.get_latest = MagicMock(return_value=autosearch_latest)
GPlayConnector.download = MagicMock(return_value=TEST_GPLAY_RESPONSE)
package_downloader.download_pogo(APKArch.arm64_v8a)

def test_mistmatched_type(self):
with self.assertRaises(WizardError):
upload_package(self.storage_elem, apk_type=APKType.pd)

def test_version_newer_avail(self):
with GetStorage(get_connection_api()) as storage:
wizard = APKWizard(storage.db_wrapper, storage.storage_manager)
package_downloader = APKWizard(storage.db_wrapper, storage.storage_manager)
gplay_latest = (20201001, "0.123.4")
latest_supported = {
APKArch.arm64_v8a: {
Expand All @@ -24,18 +72,18 @@ def test_version_newer_avail(self):
"version": "0.123.4",
"url": 20201001
}
wizard.get_latest_version = MagicMock(return_value=latest_supported)
package_downloader.get_latest_version = MagicMock(return_value=latest_supported)
storage.storage_manager.get_current_version = MagicMock(return_value="0.123.3")
wizard.get_latest = MagicMock(return_value=autosearch_latest)
package_downloader.get_latest = MagicMock(return_value=autosearch_latest)
GPlayConnector.get_latest_version = MagicMock(return_value=gplay_latest)
wizard_latest = wizard.find_latest_pogo(APKArch.arm64_v8a)
wizard_latest = package_downloader.find_latest_pogo(APKArch.arm64_v8a)
self.assertTrue(wizard_latest is not None)
self.assertTrue(latest_supported[APKArch.arm64_v8a]["versionCode"] == wizard_latest["version_code"])
self.assertTrue(latest_supported[APKArch.arm64_v8a]["version"] == wizard_latest["version"])

def test_version_supported_but_not_gplay(self):
with GetStorage(get_connection_api()) as storage:
wizard = APKWizard(storage.db_wrapper, storage.storage_manager)
package_downloader = APKWizard(storage.db_wrapper, storage.storage_manager)
gplay_latest = (20200901, "0.123.3")
latest_supported = {
APKArch.arm64_v8a: {
Expand All @@ -47,16 +95,16 @@ def test_version_supported_but_not_gplay(self):
"version": "0.123.4",
"url": 20201001
}
wizard.get_latest_version = MagicMock(return_value=latest_supported)
package_downloader.get_latest_version = MagicMock(return_value=latest_supported)
storage.storage_manager.get_current_version = MagicMock(return_value="0.123.3")
wizard.get_latest = MagicMock(return_value=autosearch_latest)
package_downloader.get_latest = MagicMock(return_value=autosearch_latest)
GPlayConnector.get_latest_version = MagicMock(return_value=gplay_latest)
wizard_latest = wizard.find_latest_pogo(APKArch.arm64_v8a)
wizard_latest = package_downloader.find_latest_pogo(APKArch.arm64_v8a)
self.assertTrue(wizard_latest is None)

def test_version_newest_not_supported_but_older_supported(self):
with GetStorage(get_connection_api()) as storage:
wizard = APKWizard(storage.db_wrapper, storage.storage_manager)
package_downloader = APKWizard(storage.db_wrapper, storage.storage_manager)
gplay_latest = (20201001, "0.123.4")
latest_supported = {
APKArch.arm64_v8a: {
Expand All @@ -68,11 +116,11 @@ def test_version_newest_not_supported_but_older_supported(self):
"version": "0.123.4",
"url": 20201001
}
wizard.get_latest_version = MagicMock(return_value=latest_supported)
package_downloader.get_latest_version = MagicMock(return_value=latest_supported)
storage.storage_manager.get_current_version = MagicMock(return_value="0.123.3")
wizard.get_latest = MagicMock(return_value=autosearch_latest)
package_downloader.get_latest = MagicMock(return_value=autosearch_latest)
GPlayConnector.get_latest_version = MagicMock(return_value=gplay_latest)
wizard_latest = wizard.find_latest_pogo(APKArch.arm64_v8a)
wizard_latest = package_downloader.find_latest_pogo(APKArch.arm64_v8a)
self.assertTrue(wizard_latest is not None)
self.assertTrue(latest_supported[APKArch.arm64_v8a]["versionCode"] == wizard_latest["version_code"])
self.assertTrue(latest_supported[APKArch.arm64_v8a]["version"] == wizard_latest["version"])