Skip to content

feat: sync improvements #1328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 175 additions & 97 deletions interactions/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
Type,
Union,
Awaitable,
Tuple,
)

import interactions.api.events as events
import interactions.client.const as constants
from interactions.models.internal.callback import CallbackObject
from interactions.api.events import BaseEvent, RawGatewayEvent, processors
from interactions.api.events.internal import CallbackAdded
from interactions.api.gateway.gateway import GatewayClient
Expand Down Expand Up @@ -94,6 +94,7 @@
from interactions.models.internal.active_voice_state import ActiveVoiceState
from interactions.models.internal.application_commands import ContextMenu, ModalCommand, GlobalAutoComplete
from interactions.models.internal.auto_defer import AutoDefer
from interactions.models.internal.callback import CallbackObject
from interactions.models.internal.command import BaseCommand
from interactions.models.internal.context import (
BaseContext,
Expand Down Expand Up @@ -419,7 +420,8 @@ def __init__(
async def __aenter__(self) -> "Client":
if not self.token:
raise ValueError(
"Token not found - to use the bot in a context manager, you must pass the token in the Client constructor."
"Token not found - to use the bot in a context manager, you must pass the token in the Client"
" constructor."
)
await self.login(self.token)
return self
Expand Down Expand Up @@ -533,7 +535,8 @@ def _sanity_check(self) -> None:

if self.del_unused_app_cmd:
self.logger.warning(
"As `delete_unused_application_cmds` is enabled, the client must cache all guilds app-commands, this could take a while."
"As `delete_unused_application_cmds` is enabled, the client must cache all guilds app-commands, this"
" could take a while."
)

if Intents.GUILDS not in self._connection_state.intents:
Expand Down Expand Up @@ -645,16 +648,17 @@ async def on_command_error(self, event: events.CommandError) -> None:
if isinstance(event.error, errors.CommandOnCooldown):
await event.ctx.send(
embeds=Embed(
description=f"This command is on cooldown!\n"
f"Please try again in {int(event.error.cooldown.get_cooldown_time())} seconds",
description=(
"This command is on cooldown!\n"
f"Please try again in {int(event.error.cooldown.get_cooldown_time())} seconds"
),
color=BrandColors.FUCHSIA,
)
)
elif isinstance(event.error, errors.MaxConcurrencyReached):
await event.ctx.send(
embeds=Embed(
description="This command has reached its maximum concurrent usage!\n"
"Please try again shortly.",
description="This command has reached its maximum concurrent usage!\nPlease try again shortly.",
color=BrandColors.FUCHSIA,
)
)
Expand Down Expand Up @@ -756,7 +760,8 @@ async def on_autocomplete_completion(self, event: events.AutocompleteCompletion)
"""
symbol = "$"
self.logger.info(
f"Autocomplete Called: {symbol}{event.ctx.invoke_target} with {event.ctx.focussed_option = } | {event.ctx.kwargs = }"
f"Autocomplete Called: {symbol}{event.ctx.invoke_target} with {event.ctx.focussed_option = } |"
f" {event.ctx.kwargs = }"
)

@Listener.create(is_default_listener=True)
Expand Down Expand Up @@ -1175,7 +1180,8 @@ def add_listener(self, listener: Listener) -> None:
"""
if listener.event == "event":
self.logger.critical(
f"Subscribing to `{listener.event}` - Meta Events are very expensive; remember to remove it before releasing your bot"
f"Subscribing to `{listener.event}` - Meta Events are very expensive; remember to remove it before"
" releasing your bot"
)

if not listener.is_default_listener:
Expand All @@ -1186,7 +1192,8 @@ def add_listener(self, listener: Listener) -> None:
if required_intents := _INTENT_EVENTS.get(event_class):
if all(required_intent not in self.intents for required_intent in required_intents):
self.logger.warning(
f"Event `{listener.event}` will not work since the required intent is not set -> Requires any of: `{required_intents}`"
f"Event `{listener.event}` will not work since the required intent is not set -> Requires"
f" any of: `{required_intents}`"
)

# prevent the same callback being added twice
Expand Down Expand Up @@ -1410,12 +1417,11 @@ async def wrap(*args, **kwargs) -> Absent[List[Dict]]:
if cmd_name not in found and warn_missing:
self.logger.error(
f'Detected yet to sync slash command "/{cmd_name}" for scope '
f"{'global' if scope == GLOBAL_SCOPE else scope}"
f'{"global" if scope == GLOBAL_SCOPE else scope}'
)
continue
found.add(cmd_name)
self._interaction_lookup[cmd.resolved_name] = cmd
cmd.cmd_id[scope] = int(cmd_data["id"])
self.update_command_cache(scope, cmd.resolved_name, cmd_data["id"])

if warn_missing:
for cmd_data in remote_cmds.values():
Expand All @@ -1434,80 +1440,143 @@ async def synchronise_interactions(
Synchronise registered interactions with discord.

Args:
scopes: Optionally specify which scopes are to be synced
delete_commands: Override the client setting and delete commands
scopes: Optionally specify which scopes are to be synced.
delete_commands: Override the client setting and delete commands.

Returns:
None

Raises:
InteractionMissingAccess: If bot is lacking the necessary access.
Exception: If there is an error during the synchronization process.
"""
s = time.perf_counter()
_delete_cmds = self.del_unused_app_cmd if delete_commands is MISSING else delete_commands
await self._cache_interactions()

cmd_scopes = self._get_sync_scopes(scopes)
local_cmds_json = application_commands_to_dict(self.interactions_by_scope, self)

await asyncio.gather(*[self.sync_scope(scope, _delete_cmds, local_cmds_json) for scope in cmd_scopes])

t = time.perf_counter() - s
self.logger.debug(f"Sync of {len(cmd_scopes)} scopes took {t} seconds")

def _get_sync_scopes(self, scopes: Sequence["Snowflake_Type"]) -> List["Snowflake_Type"]:
"""
Determine which scopes to sync.

Args:
scopes: The scopes to sync.

Returns:
The scopes to sync.
"""
if scopes is not MISSING:
cmd_scopes = scopes
elif self.del_unused_app_cmd:
# if we're deleting unused commands, we check all scopes
cmd_scopes = [to_snowflake(g_id) for g_id in self._user._guild_ids] + [GLOBAL_SCOPE]
else:
# if we're not deleting, just check the scopes we have cmds registered in
cmd_scopes = list(set(self.interactions_by_scope) | {GLOBAL_SCOPE})
return scopes
if self.del_unused_app_cmd:
return [to_snowflake(g_id) for g_id in self._user._guild_ids] + [GLOBAL_SCOPE]
return list(set(self.interactions_by_scope) | {GLOBAL_SCOPE})

local_cmds_json = application_commands_to_dict(self.interactions_by_scope, self)
async def sync_scope(
self,
cmd_scope: "Snowflake_Type",
delete_cmds: bool,
local_cmds_json: Dict["Snowflake_Type", List[Dict[str, Any]]],
) -> None:
"""
Sync a single scope.

async def sync_scope(cmd_scope) -> None:
sync_needed_flag = False # a flag to force this scope to synchronise
sync_payload = [] # the payload to be pushed to discord
Args:
cmd_scope: The scope to sync.
delete_cmds: Whether to delete commands.
local_cmds_json: The local commands in json format.
"""
sync_needed_flag = False
sync_payload = []

try:
try:
remote_commands = await self.http.get_application_commands(self.app.id, cmd_scope)
except Forbidden:
self.logger.warning(f"Bot is lacking `application.commands` scope in {cmd_scope}!")
return
try:
remote_commands = await self.get_remote_commands(cmd_scope)
sync_payload, sync_needed_flag = self._build_sync_payload(
remote_commands, cmd_scope, local_cmds_json, delete_cmds
)

for local_cmd in self.interactions_by_scope.get(cmd_scope, {}).values():
# get remote equivalent of this command
remote_cmd_json = next(
(v for v in remote_commands if int(v["id"]) == local_cmd.cmd_id.get(cmd_scope)),
None,
)
# get json representation of this command
local_cmd_json = next((c for c in local_cmds_json[cmd_scope] if c["name"] == str(local_cmd.name)))

# this works by adding any command we *want* on Discord, to a payload, and synchronising that
# this allows us to delete unused commands, add new commands, or do nothing in 1 or less API calls

if sync_needed(local_cmd_json, remote_cmd_json):
# determine if the local and remote commands are out-of-sync
sync_needed_flag = True
sync_payload.append(local_cmd_json)
elif not _delete_cmds and remote_cmd_json:
_remote_payload = {
k: v for k, v in remote_cmd_json.items() if k not in ("id", "application_id", "version")
}
sync_payload.append(_remote_payload)
elif _delete_cmds:
sync_payload.append(local_cmd_json)

sync_payload = [FastJson.loads(_dump) for _dump in {FastJson.dumps(_cmd) for _cmd in sync_payload}]

if sync_needed_flag or (_delete_cmds and len(sync_payload) < len(remote_commands)):
# synchronise commands if flag is set, or commands are to be deleted
self.logger.info(f"Overwriting {cmd_scope} with {len(sync_payload)} application commands")
sync_response: list[dict] = await self.http.overwrite_application_commands(
self.app.id, sync_payload, cmd_scope
)
self._cache_sync_response(sync_response, cmd_scope)
else:
self.logger.debug(f"{cmd_scope} is already up-to-date with {len(remote_commands)} commands.")
if sync_needed_flag or (delete_cmds and len(sync_payload) < len(remote_commands)):
await self._sync_commands_with_discord(sync_payload, cmd_scope)
else:
self.logger.debug(f"{cmd_scope} is already up-to-date with {len(remote_commands)} commands.")

except Forbidden as e:
raise InteractionMissingAccess(cmd_scope) from e
except HTTPException as e:
self._raise_sync_exception(e, local_cmds_json, cmd_scope)
except Forbidden as e:
raise InteractionMissingAccess(cmd_scope) from e
except HTTPException as e:
self._raise_sync_exception(e, local_cmds_json, cmd_scope)

await asyncio.gather(*[sync_scope(scope) for scope in cmd_scopes])
async def get_remote_commands(self, cmd_scope: "Snowflake_Type") -> List[Dict[str, Any]]:
"""
Get the remote commands for a scope.

t = time.perf_counter() - s
self.logger.debug(f"Sync of {len(cmd_scopes)} scopes took {t} seconds")
Args:
cmd_scope: The scope to get the commands for.
"""
try:
return await self.http.get_application_commands(self.app.id, cmd_scope)
except Forbidden:
self.logger.warning(f"Bot is lacking `application.commands` scope in {cmd_scope}!")
return []

def _build_sync_payload(
self,
remote_commands: List[Dict[str, Any]],
cmd_scope: "Snowflake_Type",
local_cmds_json: Dict["Snowflake_Type", List[Dict[str, Any]]],
delete_cmds: bool,
) -> Tuple[List[Dict[str, Any]], bool]:
"""
Build the sync payload for a single scope.

Args:
remote_commands: The remote commands.
cmd_scope: The scope to sync.
local_cmds_json: The local commands in json format.
delete_cmds: Whether to delete commands.
"""
sync_payload = []
sync_needed_flag = False

for local_cmd in self.interactions_by_scope.get(cmd_scope, {}).values():
remote_cmd_json = next(
(v for v in remote_commands if int(v["id"]) == local_cmd.cmd_id.get(cmd_scope)),
None,
)
local_cmd_json = next((c for c in local_cmds_json[cmd_scope] if c["name"] == str(local_cmd.name)))

if sync_needed(local_cmd_json, remote_cmd_json):
sync_needed_flag = True
sync_payload.append(local_cmd_json)
elif not delete_cmds and remote_cmd_json:
_remote_payload = {
k: v for k, v in remote_cmd_json.items() if k not in ("id", "application_id", "version")
}
sync_payload.append(_remote_payload)
elif delete_cmds:
sync_payload.append(local_cmd_json)

sync_payload = [FastJson.loads(_dump) for _dump in {FastJson.dumps(_cmd) for _cmd in sync_payload}]
return sync_payload, sync_needed_flag

async def _sync_commands_with_discord(
self, sync_payload: List[Dict[str, Any]], cmd_scope: "Snowflake_Type"
) -> None:
"""
Sync the commands with discord.

Args:
sync_payload: The sync payload.
cmd_scope: The scope to sync.
"""
self.logger.info(f"Overwriting {cmd_scope} with {len(sync_payload)} application commands")
sync_response: list[dict] = await self.http.overwrite_application_commands(self.app.id, sync_payload, cmd_scope)
self._cache_sync_response(sync_response, cmd_scope)

def get_application_cmd_by_id(
self, cmd_id: "Snowflake_Type", *, scope: "Snowflake_Type" = None
Expand Down Expand Up @@ -1550,29 +1619,38 @@ def _raise_sync_exception(self, e: HTTPException, cmds_json: dict, cmd_scope: "S
def _cache_sync_response(self, sync_response: list[dict], scope: "Snowflake_Type") -> None:
for cmd_data in sync_response:
command_id = Snowflake(cmd_data["id"])
command_name = cmd_data["name"]

if any(
option["type"] in (OptionType.SUB_COMMAND, OptionType.SUB_COMMAND_GROUP)
for option in cmd_data.get("options", [])
):
for option in cmd_data.get("options", []):
if option["type"] in (OptionType.SUB_COMMAND, OptionType.SUB_COMMAND_GROUP):
command_name = f"{command_name} {option['name']}"
if option["type"] == OptionType.SUB_COMMAND_GROUP:
for _sc in option.get("options", []):
command_name = f"{command_name} {_sc['name']}"
if command := self.interactions_by_scope[scope].get(command_name):
command.cmd_id[scope] = command_id
self._interaction_lookup[command.resolved_name] = command
elif command := self.interactions_by_scope[scope].get(command_name):
command.cmd_id[scope] = command_id
self._interaction_lookup[command.resolved_name] = command
continue
elif command := self.interactions_by_scope[scope].get(command_name):
command.cmd_id[scope] = command_id
self._interaction_lookup[command.resolved_name] = command
continue
tier_0_name = cmd_data["name"]
options = cmd_data.get("options", [])

if any(option["type"] in (OptionType.SUB_COMMAND, OptionType.SUB_COMMAND_GROUP) for option in options):
for option in options:
option_type = option["type"]

if option_type in (OptionType.SUB_COMMAND, OptionType.SUB_COMMAND_GROUP):
tier_2_name = f"{tier_0_name} {option['name']}"

if option_type == OptionType.SUB_COMMAND_GROUP:
for sub_option in option.get("options", []):
tier_3_name = f"{tier_2_name} {sub_option['name']}"
self.update_command_cache(scope, tier_3_name, command_id)
else:
self.update_command_cache(scope, tier_2_name, command_id)

else:
self.update_command_cache(scope, tier_0_name, command_id)

def update_command_cache(self, scope: "Snowflake_Type", command_name: str, command_id: "Snowflake") -> None:
"""
Update the internal cache with a command ID.

Args:
scope: The scope of the command to update
command_name: The name of the command
command_id: The ID of the command
"""
if command := self.interactions_by_scope[scope].get(command_name):
command.cmd_id[scope] = command_id
self._interaction_lookup[command.resolved_name] = command

async def get_context(self, data: dict) -> InteractionContext:
match data["type"]:
Expand Down
Loading