Skip to content

Commit

Permalink
Format bot.py
Browse files Browse the repository at this point in the history
  • Loading branch information
Dorukyum committed Oct 21, 2021
1 parent ee7c6ef commit 095d804
Showing 1 changed file with 122 additions and 47 deletions.
169 changes: 122 additions & 47 deletions discord/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ def add_application_command(self, command: ApplicationCommand) -> None:
command.guild_ids = self.debug_guilds
self._pending_application_commands.append(command)

def remove_application_command(self, command: ApplicationCommand) -> Optional[ApplicationCommand]:
def remove_application_command(
self, command: ApplicationCommand
) -> Optional[ApplicationCommand]:
"""Remove a :class:`.ApplicationCommand` from the internal list
of commands.
Expand Down Expand Up @@ -155,7 +157,9 @@ async def register_commands(self) -> None:
global_permissions: List = []

registered_commands = await self.http.get_global_commands(self.user.id)
for command in [cmd for cmd in self.pending_application_commands if cmd.guild_ids is None]:
for command in [
cmd for cmd in self.pending_application_commands if cmd.guild_ids is None
]:
as_dict = command.to_dict()
if len(registered_commands) > 0:
matches = [
Expand Down Expand Up @@ -185,16 +189,21 @@ async def register_commands(self) -> None:
update_guild_commands = {}
async for guild in self.fetch_guilds(limit=None):
update_guild_commands[guild.id] = []
for command in [cmd for cmd in self.pending_application_commands if cmd.guild_ids is not None]:
for command in [
cmd
for cmd in self.pending_application_commands
if cmd.guild_ids is not None
]:
as_dict = command.to_dict()
for guild_id in command.guild_ids:
to_update = update_guild_commands[guild_id]
update_guild_commands[guild_id] = to_update + [as_dict]

for guild_id in update_guild_commands:
try:
cmds = await self.http.bulk_upsert_guild_commands(self.user.id, guild_id,
update_guild_commands[guild_id])
cmds = await self.http.bulk_upsert_guild_commands(
self.user.id, guild_id, update_guild_commands[guild_id]
)

# Permissions for this Guild
guild_permissions: List = []
Expand All @@ -206,20 +215,39 @@ async def register_commands(self) -> None:
raise
else:
for i in cmds:
cmd = get(self.pending_application_commands, name=i["name"], description=i["description"],
type=i['type'])
cmd = get(
self.pending_application_commands,
name=i["name"],
description=i["description"],
type=i["type"],
)
self.application_commands[i["id"]] = cmd

# Permissions
permissions = [perm.to_dict() for perm in cmd.permissions if perm.guild_id is None or (
perm.guild_id == guild_id and perm.guild_id in cmd.guild_ids)]
guild_permissions.append({"id": i["id"], "permissions": permissions})
permissions = [
perm.to_dict()
for perm in cmd.permissions
if perm.guild_id is None
or (
perm.guild_id == guild_id and perm.guild_id in cmd.guild_ids
)
]
guild_permissions.append(
{"id": i["id"], "permissions": permissions}
)

for global_command in global_permissions:
permissions = [perm.to_dict() for perm in global_command['permissions'] if
perm.guild_id is None or (
perm.guild_id == guild_id and perm.guild_id in cmd.guild_ids)]
guild_permissions.append({"id": global_command["id"], "permissions": permissions})
permissions = [
perm.to_dict()
for perm in global_command["permissions"]
if perm.guild_id is None
or (
perm.guild_id == guild_id and perm.guild_id in cmd.guild_ids
)
]
guild_permissions.append(
{"id": global_command["id"], "permissions": permissions}
)

# Collect & Upsert Permissions for Each Guild
# Command Permissions for this Guild
Expand All @@ -231,47 +259,80 @@ async def register_commands(self) -> None:

# Replace Role / Owner Names with IDs
for permission in item["permissions"]:
if isinstance(permission['id'], str):
if isinstance(permission["id"], str):
# Replace Role Names
if permission['type'] == 1 and isinstance(permission['id'], str):
role = get(self.get_guild(guild_id).roles, name=permission['id'])
if permission["type"] == 1 and isinstance(
permission["id"], str
):
role = get(
self.get_guild(guild_id).roles,
name=permission["id"],
)

# If not missing
if not role is None:
new_cmd_perm["permissions"].append(
{"id": role.id, "type": 1, "permission": permission['permission']})
{
"id": role.id,
"type": 1,
"permission": permission["permission"],
}
)
else:
print("No Role ID found in Guild ({guild_id}) for Role ({role})".format(
guild_id=guild_id, role=permission['id']))
print(
"No Role ID found in Guild ({guild_id}) for Role ({role})".format(
guild_id=guild_id, role=permission["id"]
)
)
# Add Owner IDs
elif permission['type'] == 2 and permission['id'] == "owner":
elif (
permission["type"] == 2 and permission["id"] == "owner"
):
app = await self.application_info() # type: ignore
if app.team:
for m in app.team.members:
new_cmd_perm["permissions"].append(
{"id": m.id, "type": 2, "permission": permission['permission']})
{
"id": m.id,
"type": 2,
"permission": permission["permission"],
}
)
else:
new_cmd_perm["permissions"].append(
{"id": app.owner.id, "type": 2, "permission": permission['permission']})
{
"id": app.owner.id,
"type": 2,
"permission": permission["permission"],
}
)
# Add the Rest
else:
new_cmd_perm["permissions"].append(permission)

# Make sure we don't have over 10 overwrites
if len(new_cmd_perm['permissions']) > 10:
if len(new_cmd_perm["permissions"]) > 10:
print(
"Command '{name}' has more than 10 permission overrides in guild ({guild_id}).\nwill only use the first 10 permission overrides.".format(
name=self.application_commands[new_cmd_perm['id']].name, guild_id=guild_id))
new_cmd_perm['permissions'] = new_cmd_perm['permissions'][:10]
name=self.application_commands[new_cmd_perm["id"]].name,
guild_id=guild_id,
)
)
new_cmd_perm["permissions"] = new_cmd_perm["permissions"][:10]

# Append to guild_cmd_perms
guild_cmd_perms.append(new_cmd_perm)

# Upsert
try:
await self.http.bulk_upsert_command_permissions(self.user.id, guild_id, guild_cmd_perms)
await self.http.bulk_upsert_command_permissions(
self.user.id, guild_id, guild_cmd_perms
)
except Forbidden:
print(f"Failed to add command permissions to guild {guild_id}", file=sys.stderr)
print(
f"Failed to add command permissions to guild {guild_id}",
file=sys.stderr,
)
raise

async def process_application_commands(self, interaction: Interaction) -> None:
Expand Down Expand Up @@ -306,16 +367,16 @@ async def process_application_commands(self, interaction: Interaction) -> None:
else:
ctx = await self.get_application_context(interaction)
ctx.command = command
self.dispatch('application_command', ctx)
self.dispatch("application_command", ctx)
try:
if await self.can_run(ctx, call_once=True):
await ctx.command.invoke(ctx)
else:
raise CheckFailure('The global check once functions failed.')
raise CheckFailure("The global check once functions failed.")
except DiscordException as exc:
await ctx.command.dispatch_error(ctx, exc)
else:
self.dispatch('application_command_completion', ctx)
self.dispatch("application_command_completion", ctx)

def slash_command(self, **kwargs):
"""A shortcut decorator that invokes :func:`.ApplicationCommandMixin.command` and adds it to
Expand Down Expand Up @@ -400,7 +461,9 @@ def command(self, **kwargs):
"""
return self.application_command(**kwargs)

def command_group(self, name: str, description: str, guild_ids=None) -> SlashCommandGroup:
def command_group(
self, name: str, description: str, guild_ids=None
) -> SlashCommandGroup:
# TODO: Write documentation for this. I'm not familiar enough with what this function does to do it myself.
group = SlashCommandGroup(name, description, guild_ids)
self.add_application_command(group)
Expand Down Expand Up @@ -451,24 +514,30 @@ def __init__(self, description=None, *args, **options):
self._check_once = []
self._before_invoke = None
self._after_invoke = None
self.description = inspect.cleandoc(description) if description else ''
self.owner_id = options.get('owner_id')
self.owner_ids = options.get('owner_ids', set())
self.description = inspect.cleandoc(description) if description else ""
self.owner_id = options.get("owner_id")
self.owner_ids = options.get("owner_ids", set())

self.debug_guild = options.pop("debug_guild", None) # TODO: remove or reimplement
self.debug_guild = options.pop(
"debug_guild", None
) # TODO: remove or reimplement
self.debug_guilds = options.pop("debug_guilds", None)

if self.owner_id and self.owner_ids:
raise TypeError('Both owner_id and owner_ids are set.')
raise TypeError("Both owner_id and owner_ids are set.")

if self.owner_ids and not isinstance(self.owner_ids, collections.abc.Collection):
raise TypeError(f'owner_ids must be a collection not {self.owner_ids.__class__!r}')
if self.owner_ids and not isinstance(
self.owner_ids, collections.abc.Collection
):
raise TypeError(
f"owner_ids must be a collection not {self.owner_ids.__class__!r}"
)

if self.debug_guild:
if self.debug_guilds is None:
self.debug_guilds = [self.debug_guild]
else:
raise TypeError('Both debug_guild and debug_guilds are set.')
raise TypeError("Both debug_guild and debug_guilds are set.")

self._checks = []
self._check_once = []
Expand All @@ -481,7 +550,9 @@ async def on_connect(self):
async def on_interaction(self, interaction):
await self.process_application_commands(interaction)

async def on_application_command_error(self, context: ApplicationContext, exception: DiscordException) -> None:
async def on_application_command_error(
self, context: ApplicationContext, exception: DiscordException
) -> None:
"""|coro|
The default command error handler provided by the bot.
Expand All @@ -504,8 +575,10 @@ async def on_application_command_error(self, context: ApplicationContext, except
# if cog and cog.has_error_handler():
# return

print(f'Ignoring exception in command {context.command}:', file=sys.stderr)
traceback.print_exception(type(exception), exception, exception.__traceback__, file=sys.stderr)
print(f"Ignoring exception in command {context.command}:", file=sys.stderr)
traceback.print_exception(
type(exception), exception, exception.__traceback__, file=sys.stderr
)

# global check registration
# TODO: Remove these from commands.Bot
Expand Down Expand Up @@ -611,7 +684,9 @@ def whitelist(ctx):
self.add_check(func, call_once=True)
return func

async def can_run(self, ctx: ApplicationContext, *, call_once: bool = False) -> bool:
async def can_run(
self, ctx: ApplicationContext, *, call_once: bool = False
) -> bool:
data = self._check_once if call_once else self._checks

if len(data) == 0:
Expand All @@ -620,7 +695,6 @@ async def can_run(self, ctx: ApplicationContext, *, call_once: bool = False) ->
# type-checker doesn't distinguish between functions and methods
return await async_all(f(ctx) for f in data) # type: ignore


def before_invoke(self, coro):
"""A decorator that registers a coroutine as a pre-invoke hook.
A pre-invoke hook is called directly before the command is
Expand All @@ -646,7 +720,7 @@ def before_invoke(self, coro):
The coroutine passed is not actually a coroutine.
"""
if not asyncio.iscoroutinefunction(coro):
raise TypeError('The pre-invoke hook must be a coroutine.')
raise TypeError("The pre-invoke hook must be a coroutine.")

self._before_invoke = coro
return coro
Expand Down Expand Up @@ -678,11 +752,12 @@ def after_invoke(self, coro):
"""
if not asyncio.iscoroutinefunction(coro):
raise TypeError('The post-invoke hook must be a coroutine.')
raise TypeError("The post-invoke hook must be a coroutine.")

self._after_invoke = coro
return coro


class Bot(BotBase, Client):
"""Represents a discord bot.
Expand Down

0 comments on commit 095d804

Please sign in to comment.