1
0
Fork 0
mirror of https://github.com/myned/modufur.git synced 2024-11-01 13:02:38 +00:00
modufur/commands/music.py

567 lines
21 KiB
Python

import hikari
import lightbulb
import songbird
from songbird import hikari as songkari
from youtubesearchpython import __future__ as youtube
import config as c
from tools import components
plugin = lightbulb.Plugin("music", include_datastore=True)
plugin.add_checks(lightbulb.guild_only)
plugin.d.queue = {}
# Bot permissions required for functionality
PERMISSIONS = (
hikari.Permissions.VIEW_CHANNEL,
hikari.Permissions.CONNECT,
hikari.Permissions.SPEAK,
)
# Subclass string to store metadata in coroutine parameters
# Prevents needing to query for metadata again
class Metadata(str):
def __new__(cls, url, **_):
return super().__new__(cls, url)
def __init__(self, _, *, title, thumbnail, duration):
self.title = title
self.thumbnail = thumbnail
self.duration = duration
# Check if command is used in music channel
@lightbulb.Check
def music_channel(context):
if str(context.guild_id) not in c.config.get("music", {}):
return True
return context.channel_id == c.config.get("music", {}).get(str(context.guild_id), None)
# Check if user is in bot voice channel
@lightbulb.Check
def voice_only(context):
voice = context.bot.cache.get_voice_state(context.guild_id, context.user.id)
if not voice:
return False
if (
context.guild_id in context.bot.voice.connections
and voice.channel_id != context.bot.voice.connections[context.guild_id].channel_id
):
return False
return True
# Listener for music exceptions
@plugin.set_error_handler()
async def on_error(event):
error = None
match event.exception.__cause__ or event.exception:
case AttributeError():
error = f"***Queue may still be initializing.** If this continues, please notify my master, {event.context.bot.application.owner.mention}*"
case lightbulb.CheckFailure():
if "voice_only" in str(event.exception):
error = "***Join the voice channel first***"
elif "music_channel" in str(event.exception):
error = f"***Command must be used in <#{c.config['music'][str(event.context.guild_id)]}>***"
if error:
await event.context.respond(error, flags=hikari.MessageFlag.EPHEMERAL)
return True
# Send notification to music channel on track fail
# async def on_fail(driver, source):
# for guild_id in c.config["music"]:
# if driver is plugin.d.queue[int(guild_id)].driver:
# await plugin.bot.rest.create_message(c.config["music"][guild_id], "***Track is unplayable, skipping***")
# break
# # Send notification to music channel on next track
# def on_next(driver, track_handle):
# for guild_id in c.config["music"]:
# if driver is plugin.d.queue[int(guild_id)].driver:
# plugin.bot.create_task(_on_next(guild_id, track_handle))
# break
# # Awaitable required
# async def _on_next(guild_id, track_handle):
# await plugin.bot.rest.create_message(
# c.config["music"][guild_id],
# (
# hikari.Embed(
# title=track_handle.metadata.title,
# url=track_handle.metadata.source_url,
# color=(await plugin.bot.rest.fetch_guild(int(guild_id))).get_my_member().get_top_role().color,
# )
# .set_thumbnail(track_handle.metadata.thumbnail)
# .set_footer(convert(round(track_handle.metadata.duration)))
# ),
# )
@plugin.command
@lightbulb.add_checks(
lightbulb.has_guild_permissions(hikari.Permissions.MANAGE_GUILD), lightbulb.bot_has_guild_permissions(*PERMISSIONS)
)
@lightbulb.option("channel", "Channel for music commands, empty to unset", hikari.GuildChannel, required=False)
@lightbulb.command("set", "Settings for the server, MANAGE_GUILD permission required", ephemeral=True)
@lightbulb.implements(lightbulb.SlashCommand)
async def set(context):
if context.options.channel:
c.config.setdefault("music", {})[str(context.guild_id)] = int(context.options.channel.id)
else:
if str(context.guild_id) in c.config.get("music", {}):
del c.config["music"][str(context.guild_id)]
if "music" in c.config and not c.config["music"]:
del c.config["music"]
c.dump()
if str(context.guild_id) in c.config.get("music", {}):
await context.respond(f"**Music channel set to <#{c.config['music'][str(context.guild_id)]}>**")
else:
await context.respond("**Music channel unset**")
@plugin.command
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), music_channel)
@lightbulb.command("move", "Move to a voice channel, queue intact")
@lightbulb.implements(lightbulb.SlashCommand)
async def move(context):
if not context.bot.cache.get_voice_state(context.guild_id, context.user.id):
await context.respond("***Join a voice channel first***", flags=hikari.MessageFlag.EPHEMERAL)
return
driver = await connect(context)
await context.respond(f"**Connected to <#{driver.channel_id}>**")
@plugin.command
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), voice_only, music_channel)
@lightbulb.option("query", "Search for a track, playlist, or link to play", required=False)
@lightbulb.command("play", "Play or resume music from YouTube")
@lightbulb.implements(lightbulb.SlashCommand)
async def play(context):
if not context.options.query:
if not running(context.guild_id) or not await state(context.guild_id):
await context.respond("***Nothing to resume***", flags=hikari.MessageFlag.EPHEMERAL)
return
if (await state(context.guild_id)).playing == songbird.PlayMode.Play:
await context.respond("***Already playing***", flags=hikari.MessageFlag.EPHEMERAL)
return
plugin.d.queue[context.guild_id].track_handle.play()
await context.respond("**Resuming**")
return
await context.respond(hikari.ResponseType.DEFERRED_MESSAGE_CREATE)
# Search using youtube-search-python instead of songbird-py
search = await youtube.Search(context.options.query, limit=5).next()
embed = hikari.Embed(color=context.get_guild().get_my_member().get_top_role().color).set_author(name="Now playing")
sources = []
match = None
# Filter only videos or playlists
for result in search["result"]:
match result["type"]:
case "video":
match = result
embed.set_footer(result["duration"])
sources.append(
songbird.ytdl(
Metadata(
result["link"],
title=result["title"],
thumbnail=result["thumbnails"][0]["url"],
duration=result["duration"],
)
)
)
break
case "playlist":
match = result
embed.set_footer(f"{result['videoCount']} tracks")
playlist = youtube.Playlist(result["link"])
while playlist.hasMoreVideos:
await playlist.getNextVideos()
for video in playlist.videos:
sources.append(
songbird.ytdl(
Metadata(
video["link"],
title=video["title"],
thumbnail=video["thumbnails"][0]["url"],
duration=video["duration"],
)
)
)
break
if not match:
await context.respond(
"***Couldn't find many results.** Try a more specific query*", flags=hikari.MessageFlag.EPHEMERAL
)
return
if context.guild_id not in context.bot.voice.connections:
await connect(context)
embed.title = match["title"]
embed.url = match["link"]
if match["thumbnails"]:
embed.set_thumbnail(match["thumbnails"][0]["url"])
if running(context.guild_id) and await state(context.guild_id):
plugin.d.queue[context.guild_id].extend(sources)
if len(sources) > 1:
embed.set_author(
name=f"Positions {plugin.d.queue[context.guild_id].index(sources[0]) + 1}-{plugin.d.queue[context.guild_id].index(sources[-1]) + 1} in queue"
)
else:
match plugin.d.queue[context.guild_id].index(sources[0]):
case 0:
embed.set_author(name="Next in queue")
case _ as index:
embed.set_author(name=f"Position {index + 1} in queue")
else:
plugin.d.queue[context.guild_id].extend(sources)
await context.respond(embed)
@plugin.command
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), voice_only, music_channel)
@lightbulb.option("position", "Position of the track to skip to", required=False, autocomplete=True)
@lightbulb.command("skip", "Skip the current or to a specific track")
@lightbulb.implements(lightbulb.SlashCommand)
async def skip(context):
if not running(context.guild_id) or not await state(context.guild_id):
await context.respond("***Nothing to skip***", flags=hikari.MessageFlag.EPHEMERAL)
return
if len(plugin.d.queue[context.guild_id]) == 0:
plugin.d.queue[context.guild_id].skip()
await context.respond("**Final track skipped**")
return
embed = (
hikari.Embed(
title=plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].title,
url=plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"],
color=context.get_guild().get_my_member().get_top_role().color,
)
.set_author(name="Skipped to")
.set_thumbnail(plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].thumbnail)
.set_footer(plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].duration)
)
# Reconstruct queue in lieu of an index skip
if context.options.position:
position = context.options.position.split()[0]
if not position.isdigit() or int(position) not in range(1, len(plugin.d.queue[context.guild_id]) + 1):
await context.respond("***Invalid position***", flags=hikari.MessageFlag.EPHEMERAL)
return
sources = plugin.d.queue[context.guild_id][int(position) - 1 :]
plugin.d.queue[context.guild_id].track_handle.stop()
plugin.d.queue[context.guild_id].clear()
plugin.d.queue[context.guild_id] = songbird.Queue(plugin.d.queue[context.guild_id].driver)
plugin.d.queue[context.guild_id].extend(sources)
embed.title = plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].title
embed.url = plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"]
embed.set_thumbnail(plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].thumbnail)
embed.set_footer(plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].duration)
else:
plugin.d.queue[context.guild_id].skip()
await context.respond(embed)
@plugin.command
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), voice_only, music_channel)
@lightbulb.option("position", "Position of the track to remove", autocomplete=True)
@lightbulb.command("remove", "Remove a track from the queue")
@lightbulb.implements(lightbulb.SlashCommand)
async def remove(context):
# Build embeds for paginator
def build(_, content):
return (
hikari.Embed(
description=content,
color=context.get_guild().get_my_member().get_top_role().color,
)
.set_author(name="Removed")
.set_footer(f"{len(sources)} track{'s' if len(sources) > 1 else ''}")
)
if not running(context.guild_id) or not await state(context.guild_id):
await context.respond("***Nothing to remove***", flags=hikari.MessageFlag.EPHEMERAL)
return
split = context.options.position.split()
# Determine if range is used
if "-" in split[0]:
positions = "".join(split[0]).split("-")
else:
positions = [split[0] for _ in range(2)]
# Check if input is valid and within range
if not all(position.isdigit() for position in positions) or not all(
int(position) in range(1, len(plugin.d.queue[context.guild_id]) + 1) for position in positions
):
await context.respond("***Invalid position***", flags=hikari.MessageFlag.EPHEMERAL)
return
# Remove sources from starting index multiple times
sources = [
plugin.d.queue[context.guild_id].pop(int(positions[0]) - 1)
for _ in range(int(positions[0]), int(positions[1]) + 1)
]
# Build paginator
paginator = lightbulb.utils.EmbedPaginator()
paginator.set_embed_factory(build)
for position, source in enumerate(sources, start=int(positions[0])):
paginator.add_line(
f"**{position}** · {source.cr_frame.f_locals['url'].duration} · [{truncate(source.cr_frame.f_locals['url'].title)}]({source.cr_frame.f_locals['url']})"
)
pages = [page for page in paginator.build_pages()]
if len(pages) > 1:
selector = components.Selector(
pages=pages,
buttons=[components.Back(), components.Forward()],
timeout=600,
)
await selector.send(context.interaction, ephemeral=True)
else:
await context.respond(pages[0])
# Autocomplete songs from queue
@skip.autocomplete("position")
@remove.autocomplete("position")
async def position_autocomplete(option, interaction):
if not running(interaction.guild_id) or not await state(interaction.guild_id):
return
suggestions = []
if "-" in option.value:
split = option.value.split("-")
if not all(value.isdigit() for value in split):
return
for index, source in enumerate(plugin.d.queue[interaction.guild_id], start=1):
if index in range(int(split[0]), int(split[1]) + 1):
suggestions.append(
f"{split[0]}-{split[1]} · {index} · {truncate(source.cr_frame.f_locals['url'].title)}"
)
else:
for index, source in enumerate(plugin.d.queue[interaction.guild_id], start=1):
if not option.value or option.value in str(index):
suggestions.append(f"{index} · {truncate(source.cr_frame.f_locals['url'].title)}")
return suggestions[:25]
@plugin.command
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), voice_only, music_channel)
@lightbulb.command("pause", "Pause the current track")
@lightbulb.implements(lightbulb.SlashCommand)
async def pause(context):
if not running(context.guild_id) or not await state(context.guild_id):
await context.respond("***Nothing to pause***", flags=hikari.MessageFlag.EPHEMERAL)
return
if (await state(context.guild_id)).playing == songbird.PlayMode.Pause:
await context.respond("***Already paused***", flags=hikari.MessageFlag.EPHEMERAL)
return
plugin.d.queue[context.guild_id].track_handle.pause()
await context.respond("**Paused**")
@plugin.command
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), voice_only, music_channel)
@lightbulb.command("stop", "Stop the current track and clear the queue")
@lightbulb.implements(lightbulb.SlashCommand)
async def stop(context):
if not running(context.guild_id):
await context.respond("***Nothing to stop***", flags=hikari.MessageFlag.EPHEMERAL)
return
try:
plugin.d.queue[context.guild_id].track_handle.stop()
except songbird.SongbirdError:
pass
plugin.d.queue.clear()
if context.guild_id in context.bot.voice.connections:
await context.bot.voice.disconnect(context.guild_id)
await context.respond("**Stopped**")
@plugin.command
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS))
@lightbulb.command("nowplaying", "Show the current track", ephemeral=True)
@lightbulb.implements(lightbulb.SlashCommand)
async def nowplaying(context):
if not running(context.guild_id) or not await state(context.guild_id):
await context.respond("***Nothing is playing***", flags=hikari.MessageFlag.EPHEMERAL)
return
await context.respond(
hikari.Embed(
title=plugin.d.queue[context.guild_id].track_handle.metadata.title,
url=plugin.d.queue[context.guild_id].track_handle.metadata.source_url,
color=context.get_guild().get_my_member().get_top_role().color,
)
.set_author(name="Now playing")
.set_thumbnail(plugin.d.queue[context.guild_id].track_handle.metadata.thumbnail)
.set_footer(
f"{convert(round((await state(context.guild_id)).position))} / {convert(round(plugin.d.queue[context.guild_id].track_handle.metadata.duration))}"
)
)
@plugin.command
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS))
@lightbulb.command("queue", "List songs in the queue", ephemeral=True)
@lightbulb.implements(lightbulb.SlashCommand)
async def queue(context):
# Build embeds for paginator
def build(_, content):
return (
hikari.Embed(
title=plugin.d.queue[context.guild_id].track_handle.metadata.title,
url=plugin.d.queue[context.guild_id].track_handle.metadata.source_url,
description=content,
color=context.get_guild().get_my_member().get_top_role().color,
)
.set_author(name="Queue")
.set_thumbnail(plugin.d.queue[context.guild_id].track_handle.metadata.thumbnail)
.set_footer(
f"{len(plugin.d.queue[context.guild_id])} track{'s' if len(plugin.d.queue[context.guild_id]) > 1 else ''}"
)
)
if not running(context.guild_id) or not await state(context.guild_id):
await context.respond("***Nothing in the queue***", flags=hikari.MessageFlag.EPHEMERAL)
return
# Build paginator
paginator = lightbulb.utils.EmbedPaginator()
paginator.set_embed_factory(build)
for index, source in enumerate(plugin.d.queue[context.guild_id], start=1):
paginator.add_line(
f"**{index}** · {source.cr_frame.f_locals['url'].duration} · [{truncate(source.cr_frame.f_locals['url'].title)}]({source.cr_frame.f_locals['url']})"
)
pages = [page for page in paginator.build_pages()]
if len(pages) > 1:
selector = components.Selector(
pages=pages,
buttons=[components.Back(), components.Forward()],
timeout=600,
)
await selector.send(context.interaction, ephemeral=True)
else:
await context.respond(pages[0])
# (Re)connect to voice channel and restart queue
async def connect(context):
voice = context.bot.cache.get_voice_state(context.guild_id, context.user.id)
if context.guild_id in context.bot.voice.connections:
await context.bot.voice.disconnect(context.guild_id)
driver = await context.bot.voice.connect_to(
context.guild_id,
voice.channel_id,
songkari.Voicebox,
deaf=True,
)
# Reconstruct queue
if running(context.guild_id):
sources = [songbird.ytdl(plugin.d.queue[context.guild_id].track_handle.metadata.source_url)] + plugin.d.queue[
context.guild_id
]
plugin.d.queue[context.guild_id].clear()
plugin.d.queue[context.guild_id] = songbird.Queue(driver)
plugin.d.queue[context.guild_id].extend(sources)
elif context.guild_id not in plugin.d.queue:
plugin.d.queue[context.guild_id] = songbird.Queue(driver)
return driver
# Return track info if queue is playing
async def state(guild_id):
try:
return await plugin.d.queue[guild_id].track_handle.get_info()
# TrackError is not exposed, so use base songbird error
# Return empty TrackState
except songbird.SongbirdError:
return None
# Return True if queue exists and is running
def running(guild_id):
return guild_id in plugin.d.queue and plugin.d.queue[guild_id].track_handle
# Convert seconds into (HH:)MM:SS
def convert(seconds):
h = seconds // 3600
m = seconds % 3600 // 60
s = seconds % 3600 % 60
return f"{f'{h}:' if h else ''}{m}:{s:02d}"
# Truncate if characters are longer than limit
# Discord does not handle escaped markdown syntax correctly, so remove []
def truncate(string, limit=80):
string = string.replace("[", "").replace("]", "")
return f"{string[:limit]}..." if len(string) > limit else string
def load(bot):
bot.add_plugin(plugin)
def unload(bot):
bot.create_task(bot.voice.disconnect_all())
bot.remove_plugin(plugin)