diff --git a/discord/bot.py b/discord/bot.py index a5aef903dc..f8b967ecd6 100644 --- a/discord/bot.py +++ b/discord/bot.py @@ -100,7 +100,9 @@ def add_application_command(self, command: ApplicationCommand) -> None: command.guild_ids = self.debug_guilds self._pending_application_commands.append(command) - def remove_application_command(self, command: ApplicationCommand) -> Optional[ApplicationCommand]: + def remove_application_command( + self, command: ApplicationCommand + ) -> Optional[ApplicationCommand]: """Remove a :class:`.ApplicationCommand` from the internal list of commands. @@ -155,7 +157,9 @@ async def register_commands(self) -> None: global_permissions: List = [] registered_commands = await self.http.get_global_commands(self.user.id) - for command in [cmd for cmd in self.pending_application_commands if cmd.guild_ids is None]: + for command in [ + cmd for cmd in self.pending_application_commands if cmd.guild_ids is None + ]: as_dict = command.to_dict() if len(registered_commands) > 0: matches = [ @@ -185,7 +189,11 @@ async def register_commands(self) -> None: update_guild_commands = {} async for guild in self.fetch_guilds(limit=None): update_guild_commands[guild.id] = [] - for command in [cmd for cmd in self.pending_application_commands if cmd.guild_ids is not None]: + for command in [ + cmd + for cmd in self.pending_application_commands + if cmd.guild_ids is not None + ]: as_dict = command.to_dict() for guild_id in command.guild_ids: to_update = update_guild_commands[guild_id] @@ -193,8 +201,9 @@ async def register_commands(self) -> None: for guild_id in update_guild_commands: try: - cmds = await self.http.bulk_upsert_guild_commands(self.user.id, guild_id, - update_guild_commands[guild_id]) + cmds = await self.http.bulk_upsert_guild_commands( + self.user.id, guild_id, update_guild_commands[guild_id] + ) # Permissions for this Guild guild_permissions: List = [] @@ -206,20 +215,39 @@ async def register_commands(self) -> None: raise else: for i in cmds: - cmd = get(self.pending_application_commands, name=i["name"], description=i["description"], - type=i['type']) + cmd = get( + self.pending_application_commands, + name=i["name"], + description=i["description"], + type=i["type"], + ) self.application_commands[i["id"]] = cmd # Permissions - permissions = [perm.to_dict() for perm in cmd.permissions if perm.guild_id is None or ( - perm.guild_id == guild_id and perm.guild_id in cmd.guild_ids)] - guild_permissions.append({"id": i["id"], "permissions": permissions}) + permissions = [ + perm.to_dict() + for perm in cmd.permissions + if perm.guild_id is None + or ( + perm.guild_id == guild_id and perm.guild_id in cmd.guild_ids + ) + ] + guild_permissions.append( + {"id": i["id"], "permissions": permissions} + ) for global_command in global_permissions: - permissions = [perm.to_dict() for perm in global_command['permissions'] if - perm.guild_id is None or ( - perm.guild_id == guild_id and perm.guild_id in cmd.guild_ids)] - guild_permissions.append({"id": global_command["id"], "permissions": permissions}) + permissions = [ + perm.to_dict() + for perm in global_command["permissions"] + if perm.guild_id is None + or ( + perm.guild_id == guild_id and perm.guild_id in cmd.guild_ids + ) + ] + guild_permissions.append( + {"id": global_command["id"], "permissions": permissions} + ) # Collect & Upsert Permissions for Each Guild # Command Permissions for this Guild @@ -231,47 +259,80 @@ async def register_commands(self) -> None: # Replace Role / Owner Names with IDs for permission in item["permissions"]: - if isinstance(permission['id'], str): + if isinstance(permission["id"], str): # Replace Role Names - if permission['type'] == 1 and isinstance(permission['id'], str): - role = get(self.get_guild(guild_id).roles, name=permission['id']) + if permission["type"] == 1 and isinstance( + permission["id"], str + ): + role = get( + self.get_guild(guild_id).roles, + name=permission["id"], + ) # If not missing if not role is None: new_cmd_perm["permissions"].append( - {"id": role.id, "type": 1, "permission": permission['permission']}) + { + "id": role.id, + "type": 1, + "permission": permission["permission"], + } + ) else: - print("No Role ID found in Guild ({guild_id}) for Role ({role})".format( - guild_id=guild_id, role=permission['id'])) + print( + "No Role ID found in Guild ({guild_id}) for Role ({role})".format( + guild_id=guild_id, role=permission["id"] + ) + ) # Add Owner IDs - elif permission['type'] == 2 and permission['id'] == "owner": + elif ( + permission["type"] == 2 and permission["id"] == "owner" + ): app = await self.application_info() # type: ignore if app.team: for m in app.team.members: new_cmd_perm["permissions"].append( - {"id": m.id, "type": 2, "permission": permission['permission']}) + { + "id": m.id, + "type": 2, + "permission": permission["permission"], + } + ) else: new_cmd_perm["permissions"].append( - {"id": app.owner.id, "type": 2, "permission": permission['permission']}) + { + "id": app.owner.id, + "type": 2, + "permission": permission["permission"], + } + ) # Add the Rest else: new_cmd_perm["permissions"].append(permission) # Make sure we don't have over 10 overwrites - if len(new_cmd_perm['permissions']) > 10: + if len(new_cmd_perm["permissions"]) > 10: print( "Command '{name}' has more than 10 permission overrides in guild ({guild_id}).\nwill only use the first 10 permission overrides.".format( - name=self.application_commands[new_cmd_perm['id']].name, guild_id=guild_id)) - new_cmd_perm['permissions'] = new_cmd_perm['permissions'][:10] + name=self.application_commands[new_cmd_perm["id"]].name, + guild_id=guild_id, + ) + ) + new_cmd_perm["permissions"] = new_cmd_perm["permissions"][:10] # Append to guild_cmd_perms guild_cmd_perms.append(new_cmd_perm) # Upsert try: - await self.http.bulk_upsert_command_permissions(self.user.id, guild_id, guild_cmd_perms) + await self.http.bulk_upsert_command_permissions( + self.user.id, guild_id, guild_cmd_perms + ) except Forbidden: - print(f"Failed to add command permissions to guild {guild_id}", file=sys.stderr) + print( + f"Failed to add command permissions to guild {guild_id}", + file=sys.stderr, + ) raise async def process_application_commands(self, interaction: Interaction) -> None: @@ -306,16 +367,16 @@ async def process_application_commands(self, interaction: Interaction) -> None: else: ctx = await self.get_application_context(interaction) ctx.command = command - self.dispatch('application_command', ctx) + self.dispatch("application_command", ctx) try: if await self.can_run(ctx, call_once=True): await ctx.command.invoke(ctx) else: - raise CheckFailure('The global check once functions failed.') + raise CheckFailure("The global check once functions failed.") except DiscordException as exc: await ctx.command.dispatch_error(ctx, exc) else: - self.dispatch('application_command_completion', ctx) + self.dispatch("application_command_completion", ctx) def slash_command(self, **kwargs): """A shortcut decorator that invokes :func:`.ApplicationCommandMixin.command` and adds it to @@ -400,7 +461,9 @@ def command(self, **kwargs): """ return self.application_command(**kwargs) - def command_group(self, name: str, description: str, guild_ids=None) -> SlashCommandGroup: + def command_group( + self, name: str, description: str, guild_ids=None + ) -> SlashCommandGroup: # TODO: Write documentation for this. I'm not familiar enough with what this function does to do it myself. group = SlashCommandGroup(name, description, guild_ids) self.add_application_command(group) @@ -451,24 +514,30 @@ def __init__(self, description=None, *args, **options): self._check_once = [] self._before_invoke = None self._after_invoke = None - self.description = inspect.cleandoc(description) if description else '' - self.owner_id = options.get('owner_id') - self.owner_ids = options.get('owner_ids', set()) + self.description = inspect.cleandoc(description) if description else "" + self.owner_id = options.get("owner_id") + self.owner_ids = options.get("owner_ids", set()) - self.debug_guild = options.pop("debug_guild", None) # TODO: remove or reimplement + self.debug_guild = options.pop( + "debug_guild", None + ) # TODO: remove or reimplement self.debug_guilds = options.pop("debug_guilds", None) if self.owner_id and self.owner_ids: - raise TypeError('Both owner_id and owner_ids are set.') + raise TypeError("Both owner_id and owner_ids are set.") - if self.owner_ids and not isinstance(self.owner_ids, collections.abc.Collection): - raise TypeError(f'owner_ids must be a collection not {self.owner_ids.__class__!r}') + if self.owner_ids and not isinstance( + self.owner_ids, collections.abc.Collection + ): + raise TypeError( + f"owner_ids must be a collection not {self.owner_ids.__class__!r}" + ) if self.debug_guild: if self.debug_guilds is None: self.debug_guilds = [self.debug_guild] else: - raise TypeError('Both debug_guild and debug_guilds are set.') + raise TypeError("Both debug_guild and debug_guilds are set.") self._checks = [] self._check_once = [] @@ -481,7 +550,9 @@ async def on_connect(self): async def on_interaction(self, interaction): await self.process_application_commands(interaction) - async def on_application_command_error(self, context: ApplicationContext, exception: DiscordException) -> None: + async def on_application_command_error( + self, context: ApplicationContext, exception: DiscordException + ) -> None: """|coro| The default command error handler provided by the bot. @@ -504,8 +575,10 @@ async def on_application_command_error(self, context: ApplicationContext, except # if cog and cog.has_error_handler(): # return - print(f'Ignoring exception in command {context.command}:', file=sys.stderr) - traceback.print_exception(type(exception), exception, exception.__traceback__, file=sys.stderr) + print(f"Ignoring exception in command {context.command}:", file=sys.stderr) + traceback.print_exception( + type(exception), exception, exception.__traceback__, file=sys.stderr + ) # global check registration # TODO: Remove these from commands.Bot @@ -611,7 +684,9 @@ def whitelist(ctx): self.add_check(func, call_once=True) return func - async def can_run(self, ctx: ApplicationContext, *, call_once: bool = False) -> bool: + async def can_run( + self, ctx: ApplicationContext, *, call_once: bool = False + ) -> bool: data = self._check_once if call_once else self._checks if len(data) == 0: @@ -620,7 +695,6 @@ async def can_run(self, ctx: ApplicationContext, *, call_once: bool = False) -> # type-checker doesn't distinguish between functions and methods return await async_all(f(ctx) for f in data) # type: ignore - def before_invoke(self, coro): """A decorator that registers a coroutine as a pre-invoke hook. A pre-invoke hook is called directly before the command is @@ -646,7 +720,7 @@ def before_invoke(self, coro): The coroutine passed is not actually a coroutine. """ if not asyncio.iscoroutinefunction(coro): - raise TypeError('The pre-invoke hook must be a coroutine.') + raise TypeError("The pre-invoke hook must be a coroutine.") self._before_invoke = coro return coro @@ -678,11 +752,12 @@ def after_invoke(self, coro): """ if not asyncio.iscoroutinefunction(coro): - raise TypeError('The post-invoke hook must be a coroutine.') + raise TypeError("The post-invoke hook must be a coroutine.") self._after_invoke = coro return coro + class Bot(BotBase, Client): """Represents a discord bot.