mirror of
https://github.com/myned/modufur.git
synced 2024-11-01 13:02:38 +00:00
567 lines
21 KiB
Python
567 lines
21 KiB
Python
import hikari
|
|
import lightbulb
|
|
import songbird
|
|
from songbird import hikari as songkari
|
|
from youtubesearchpython import __future__ as youtube
|
|
|
|
import config as c
|
|
from tools import components
|
|
|
|
|
|
plugin = lightbulb.Plugin("music", include_datastore=True)
|
|
plugin.add_checks(lightbulb.guild_only)
|
|
plugin.d.queue = {}
|
|
|
|
# Bot permissions required for functionality
|
|
PERMISSIONS = (
|
|
hikari.Permissions.VIEW_CHANNEL,
|
|
hikari.Permissions.CONNECT,
|
|
hikari.Permissions.SPEAK,
|
|
)
|
|
|
|
|
|
# Subclass string to store metadata in coroutine parameters
|
|
# Prevents needing to query for metadata again
|
|
class Metadata(str):
|
|
def __new__(cls, url, **_):
|
|
return super().__new__(cls, url)
|
|
|
|
def __init__(self, _, *, title, thumbnail, duration):
|
|
self.title = title
|
|
self.thumbnail = thumbnail
|
|
self.duration = duration
|
|
|
|
|
|
# Check if command is used in music channel
|
|
@lightbulb.Check
|
|
def music_channel(context):
|
|
if str(context.guild_id) not in c.config.get("music", {}):
|
|
return True
|
|
|
|
return context.channel_id == c.config.get("music", {}).get(str(context.guild_id), None)
|
|
|
|
|
|
# Check if user is in bot voice channel
|
|
@lightbulb.Check
|
|
def voice_only(context):
|
|
voice = context.bot.cache.get_voice_state(context.guild_id, context.user.id)
|
|
|
|
if not voice:
|
|
return False
|
|
if (
|
|
context.guild_id in context.bot.voice.connections
|
|
and voice.channel_id != context.bot.voice.connections[context.guild_id].channel_id
|
|
):
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
# Listener for music exceptions
|
|
@plugin.set_error_handler()
|
|
async def on_error(event):
|
|
error = None
|
|
|
|
match event.exception.__cause__ or event.exception:
|
|
case AttributeError():
|
|
error = f"***Queue may still be initializing.** If this continues, please notify my master, {event.context.bot.application.owner.mention}*"
|
|
case lightbulb.CheckFailure():
|
|
if "voice_only" in str(event.exception):
|
|
error = "***Join the voice channel first***"
|
|
elif "music_channel" in str(event.exception):
|
|
error = f"***Command must be used in <#{c.config['music'][str(event.context.guild_id)]}>***"
|
|
|
|
if error:
|
|
await event.context.respond(error, flags=hikari.MessageFlag.EPHEMERAL)
|
|
|
|
return True
|
|
|
|
|
|
# Send notification to music channel on track fail
|
|
# async def on_fail(driver, source):
|
|
# for guild_id in c.config["music"]:
|
|
# if driver is plugin.d.queue[int(guild_id)].driver:
|
|
# await plugin.bot.rest.create_message(c.config["music"][guild_id], "***Track is unplayable, skipping***")
|
|
# break
|
|
|
|
|
|
# # Send notification to music channel on next track
|
|
# def on_next(driver, track_handle):
|
|
# for guild_id in c.config["music"]:
|
|
# if driver is plugin.d.queue[int(guild_id)].driver:
|
|
# plugin.bot.create_task(_on_next(guild_id, track_handle))
|
|
# break
|
|
|
|
|
|
# # Awaitable required
|
|
# async def _on_next(guild_id, track_handle):
|
|
# await plugin.bot.rest.create_message(
|
|
# c.config["music"][guild_id],
|
|
# (
|
|
# hikari.Embed(
|
|
# title=track_handle.metadata.title,
|
|
# url=track_handle.metadata.source_url,
|
|
# color=(await plugin.bot.rest.fetch_guild(int(guild_id))).get_my_member().get_top_role().color,
|
|
# )
|
|
# .set_thumbnail(track_handle.metadata.thumbnail)
|
|
# .set_footer(convert(round(track_handle.metadata.duration)))
|
|
# ),
|
|
# )
|
|
|
|
|
|
@plugin.command
|
|
@lightbulb.add_checks(
|
|
lightbulb.has_guild_permissions(hikari.Permissions.MANAGE_GUILD), lightbulb.bot_has_guild_permissions(*PERMISSIONS)
|
|
)
|
|
@lightbulb.option("channel", "Channel for music commands, empty to unset", hikari.GuildChannel, required=False)
|
|
@lightbulb.command("set", "Settings for the server, MANAGE_GUILD permission required", ephemeral=True)
|
|
@lightbulb.implements(lightbulb.SlashCommand)
|
|
async def set(context):
|
|
if context.options.channel:
|
|
c.config.setdefault("music", {})[str(context.guild_id)] = int(context.options.channel.id)
|
|
else:
|
|
if str(context.guild_id) in c.config.get("music", {}):
|
|
del c.config["music"][str(context.guild_id)]
|
|
if "music" in c.config and not c.config["music"]:
|
|
del c.config["music"]
|
|
|
|
c.dump()
|
|
|
|
if str(context.guild_id) in c.config.get("music", {}):
|
|
await context.respond(f"**Music channel set to <#{c.config['music'][str(context.guild_id)]}>**")
|
|
else:
|
|
await context.respond("**Music channel unset**")
|
|
|
|
|
|
@plugin.command
|
|
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), music_channel)
|
|
@lightbulb.command("move", "Move to a voice channel, queue intact")
|
|
@lightbulb.implements(lightbulb.SlashCommand)
|
|
async def move(context):
|
|
if not context.bot.cache.get_voice_state(context.guild_id, context.user.id):
|
|
await context.respond("***Join a voice channel first***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
|
|
driver = await connect(context)
|
|
|
|
await context.respond(f"**Connected to <#{driver.channel_id}>**")
|
|
|
|
|
|
@plugin.command
|
|
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), voice_only, music_channel)
|
|
@lightbulb.option("query", "Search for a track, playlist, or link to play", required=False)
|
|
@lightbulb.command("play", "Play or resume music from YouTube")
|
|
@lightbulb.implements(lightbulb.SlashCommand)
|
|
async def play(context):
|
|
if not context.options.query:
|
|
if not running(context.guild_id) or not await state(context.guild_id):
|
|
await context.respond("***Nothing to resume***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
if (await state(context.guild_id)).playing == songbird.PlayMode.Play:
|
|
await context.respond("***Already playing***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
|
|
plugin.d.queue[context.guild_id].track_handle.play()
|
|
|
|
await context.respond("**Resuming**")
|
|
return
|
|
|
|
await context.respond(hikari.ResponseType.DEFERRED_MESSAGE_CREATE)
|
|
|
|
# Search using youtube-search-python instead of songbird-py
|
|
search = await youtube.Search(context.options.query, limit=5).next()
|
|
embed = hikari.Embed(color=context.get_guild().get_my_member().get_top_role().color).set_author(name="Now playing")
|
|
sources = []
|
|
match = None
|
|
|
|
# Filter only videos or playlists
|
|
for result in search["result"]:
|
|
match result["type"]:
|
|
case "video":
|
|
match = result
|
|
embed.set_footer(result["duration"])
|
|
|
|
sources.append(
|
|
songbird.ytdl(
|
|
Metadata(
|
|
result["link"],
|
|
title=result["title"],
|
|
thumbnail=result["thumbnails"][0]["url"],
|
|
duration=result["duration"],
|
|
)
|
|
)
|
|
)
|
|
break
|
|
case "playlist":
|
|
match = result
|
|
embed.set_footer(f"{result['videoCount']} tracks")
|
|
playlist = youtube.Playlist(result["link"])
|
|
|
|
while playlist.hasMoreVideos:
|
|
await playlist.getNextVideos()
|
|
for video in playlist.videos:
|
|
sources.append(
|
|
songbird.ytdl(
|
|
Metadata(
|
|
video["link"],
|
|
title=video["title"],
|
|
thumbnail=video["thumbnails"][0]["url"],
|
|
duration=video["duration"],
|
|
)
|
|
)
|
|
)
|
|
break
|
|
|
|
if not match:
|
|
await context.respond(
|
|
"***Couldn't find many results.** Try a more specific query*", flags=hikari.MessageFlag.EPHEMERAL
|
|
)
|
|
return
|
|
|
|
if context.guild_id not in context.bot.voice.connections:
|
|
await connect(context)
|
|
|
|
embed.title = match["title"]
|
|
embed.url = match["link"]
|
|
if match["thumbnails"]:
|
|
embed.set_thumbnail(match["thumbnails"][0]["url"])
|
|
|
|
if running(context.guild_id) and await state(context.guild_id):
|
|
plugin.d.queue[context.guild_id].extend(sources)
|
|
|
|
if len(sources) > 1:
|
|
embed.set_author(
|
|
name=f"Positions {plugin.d.queue[context.guild_id].index(sources[0]) + 1}-{plugin.d.queue[context.guild_id].index(sources[-1]) + 1} in queue"
|
|
)
|
|
else:
|
|
match plugin.d.queue[context.guild_id].index(sources[0]):
|
|
case 0:
|
|
embed.set_author(name="Next in queue")
|
|
case _ as index:
|
|
embed.set_author(name=f"Position {index + 1} in queue")
|
|
else:
|
|
plugin.d.queue[context.guild_id].extend(sources)
|
|
|
|
await context.respond(embed)
|
|
|
|
|
|
@plugin.command
|
|
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), voice_only, music_channel)
|
|
@lightbulb.option("position", "Position of the track to skip to", required=False, autocomplete=True)
|
|
@lightbulb.command("skip", "Skip the current or to a specific track")
|
|
@lightbulb.implements(lightbulb.SlashCommand)
|
|
async def skip(context):
|
|
if not running(context.guild_id) or not await state(context.guild_id):
|
|
await context.respond("***Nothing to skip***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
if len(plugin.d.queue[context.guild_id]) == 0:
|
|
plugin.d.queue[context.guild_id].skip()
|
|
|
|
await context.respond("**Final track skipped**")
|
|
return
|
|
|
|
embed = (
|
|
hikari.Embed(
|
|
title=plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].title,
|
|
url=plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"],
|
|
color=context.get_guild().get_my_member().get_top_role().color,
|
|
)
|
|
.set_author(name="Skipped to")
|
|
.set_thumbnail(plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].thumbnail)
|
|
.set_footer(plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].duration)
|
|
)
|
|
|
|
# Reconstruct queue in lieu of an index skip
|
|
if context.options.position:
|
|
position = context.options.position.split()[0]
|
|
|
|
if not position.isdigit() or int(position) not in range(1, len(plugin.d.queue[context.guild_id]) + 1):
|
|
await context.respond("***Invalid position***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
|
|
sources = plugin.d.queue[context.guild_id][int(position) - 1 :]
|
|
|
|
plugin.d.queue[context.guild_id].track_handle.stop()
|
|
plugin.d.queue[context.guild_id].clear()
|
|
plugin.d.queue[context.guild_id] = songbird.Queue(plugin.d.queue[context.guild_id].driver)
|
|
plugin.d.queue[context.guild_id].extend(sources)
|
|
|
|
embed.title = plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].title
|
|
embed.url = plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"]
|
|
embed.set_thumbnail(plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].thumbnail)
|
|
embed.set_footer(plugin.d.queue[context.guild_id][0].cr_frame.f_locals["url"].duration)
|
|
else:
|
|
plugin.d.queue[context.guild_id].skip()
|
|
|
|
await context.respond(embed)
|
|
|
|
|
|
@plugin.command
|
|
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), voice_only, music_channel)
|
|
@lightbulb.option("position", "Position of the track to remove", autocomplete=True)
|
|
@lightbulb.command("remove", "Remove a track from the queue")
|
|
@lightbulb.implements(lightbulb.SlashCommand)
|
|
async def remove(context):
|
|
# Build embeds for paginator
|
|
def build(_, content):
|
|
return (
|
|
hikari.Embed(
|
|
description=content,
|
|
color=context.get_guild().get_my_member().get_top_role().color,
|
|
)
|
|
.set_author(name="Removed")
|
|
.set_footer(f"{len(sources)} track{'s' if len(sources) > 1 else ''}")
|
|
)
|
|
|
|
if not running(context.guild_id) or not await state(context.guild_id):
|
|
await context.respond("***Nothing to remove***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
|
|
split = context.options.position.split()
|
|
|
|
# Determine if range is used
|
|
if "-" in split[0]:
|
|
positions = "".join(split[0]).split("-")
|
|
else:
|
|
positions = [split[0] for _ in range(2)]
|
|
|
|
# Check if input is valid and within range
|
|
if not all(position.isdigit() for position in positions) or not all(
|
|
int(position) in range(1, len(plugin.d.queue[context.guild_id]) + 1) for position in positions
|
|
):
|
|
await context.respond("***Invalid position***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
|
|
# Remove sources from starting index multiple times
|
|
sources = [
|
|
plugin.d.queue[context.guild_id].pop(int(positions[0]) - 1)
|
|
for _ in range(int(positions[0]), int(positions[1]) + 1)
|
|
]
|
|
|
|
# Build paginator
|
|
paginator = lightbulb.utils.EmbedPaginator()
|
|
paginator.set_embed_factory(build)
|
|
for position, source in enumerate(sources, start=int(positions[0])):
|
|
paginator.add_line(
|
|
f"**{position}** · {source.cr_frame.f_locals['url'].duration} · [{truncate(source.cr_frame.f_locals['url'].title)}]({source.cr_frame.f_locals['url']})"
|
|
)
|
|
|
|
pages = [page for page in paginator.build_pages()]
|
|
|
|
if len(pages) > 1:
|
|
selector = components.Selector(
|
|
pages=pages,
|
|
buttons=[components.Back(), components.Forward()],
|
|
timeout=600,
|
|
)
|
|
|
|
await selector.send(context.interaction, ephemeral=True)
|
|
else:
|
|
await context.respond(pages[0])
|
|
|
|
|
|
# Autocomplete songs from queue
|
|
@skip.autocomplete("position")
|
|
@remove.autocomplete("position")
|
|
async def position_autocomplete(option, interaction):
|
|
if not running(interaction.guild_id) or not await state(interaction.guild_id):
|
|
return
|
|
|
|
suggestions = []
|
|
|
|
if "-" in option.value:
|
|
split = option.value.split("-")
|
|
|
|
if not all(value.isdigit() for value in split):
|
|
return
|
|
|
|
for index, source in enumerate(plugin.d.queue[interaction.guild_id], start=1):
|
|
if index in range(int(split[0]), int(split[1]) + 1):
|
|
suggestions.append(
|
|
f"{split[0]}-{split[1]} · {index} · {truncate(source.cr_frame.f_locals['url'].title)}"
|
|
)
|
|
else:
|
|
for index, source in enumerate(plugin.d.queue[interaction.guild_id], start=1):
|
|
if not option.value or option.value in str(index):
|
|
suggestions.append(f"{index} · {truncate(source.cr_frame.f_locals['url'].title)}")
|
|
|
|
return suggestions[:25]
|
|
|
|
|
|
@plugin.command
|
|
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), voice_only, music_channel)
|
|
@lightbulb.command("pause", "Pause the current track")
|
|
@lightbulb.implements(lightbulb.SlashCommand)
|
|
async def pause(context):
|
|
if not running(context.guild_id) or not await state(context.guild_id):
|
|
await context.respond("***Nothing to pause***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
if (await state(context.guild_id)).playing == songbird.PlayMode.Pause:
|
|
await context.respond("***Already paused***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
|
|
plugin.d.queue[context.guild_id].track_handle.pause()
|
|
|
|
await context.respond("**Paused**")
|
|
|
|
|
|
@plugin.command
|
|
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS), voice_only, music_channel)
|
|
@lightbulb.command("stop", "Stop the current track and clear the queue")
|
|
@lightbulb.implements(lightbulb.SlashCommand)
|
|
async def stop(context):
|
|
if not running(context.guild_id):
|
|
await context.respond("***Nothing to stop***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
|
|
try:
|
|
plugin.d.queue[context.guild_id].track_handle.stop()
|
|
except songbird.SongbirdError:
|
|
pass
|
|
|
|
plugin.d.queue.clear()
|
|
|
|
if context.guild_id in context.bot.voice.connections:
|
|
await context.bot.voice.disconnect(context.guild_id)
|
|
|
|
await context.respond("**Stopped**")
|
|
|
|
|
|
@plugin.command
|
|
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS))
|
|
@lightbulb.command("nowplaying", "Show the current track", ephemeral=True)
|
|
@lightbulb.implements(lightbulb.SlashCommand)
|
|
async def nowplaying(context):
|
|
if not running(context.guild_id) or not await state(context.guild_id):
|
|
await context.respond("***Nothing is playing***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
|
|
await context.respond(
|
|
hikari.Embed(
|
|
title=plugin.d.queue[context.guild_id].track_handle.metadata.title,
|
|
url=plugin.d.queue[context.guild_id].track_handle.metadata.source_url,
|
|
color=context.get_guild().get_my_member().get_top_role().color,
|
|
)
|
|
.set_author(name="Now playing")
|
|
.set_thumbnail(plugin.d.queue[context.guild_id].track_handle.metadata.thumbnail)
|
|
.set_footer(
|
|
f"{convert(round((await state(context.guild_id)).position))} / {convert(round(plugin.d.queue[context.guild_id].track_handle.metadata.duration))}"
|
|
)
|
|
)
|
|
|
|
|
|
@plugin.command
|
|
@lightbulb.add_checks(lightbulb.bot_has_guild_permissions(*PERMISSIONS))
|
|
@lightbulb.command("queue", "List songs in the queue", ephemeral=True)
|
|
@lightbulb.implements(lightbulb.SlashCommand)
|
|
async def queue(context):
|
|
# Build embeds for paginator
|
|
def build(_, content):
|
|
return (
|
|
hikari.Embed(
|
|
title=plugin.d.queue[context.guild_id].track_handle.metadata.title,
|
|
url=plugin.d.queue[context.guild_id].track_handle.metadata.source_url,
|
|
description=content,
|
|
color=context.get_guild().get_my_member().get_top_role().color,
|
|
)
|
|
.set_author(name="Queue")
|
|
.set_thumbnail(plugin.d.queue[context.guild_id].track_handle.metadata.thumbnail)
|
|
.set_footer(
|
|
f"{len(plugin.d.queue[context.guild_id])} track{'s' if len(plugin.d.queue[context.guild_id]) > 1 else ''}"
|
|
)
|
|
)
|
|
|
|
if not running(context.guild_id) or not await state(context.guild_id):
|
|
await context.respond("***Nothing in the queue***", flags=hikari.MessageFlag.EPHEMERAL)
|
|
return
|
|
|
|
# Build paginator
|
|
paginator = lightbulb.utils.EmbedPaginator()
|
|
paginator.set_embed_factory(build)
|
|
for index, source in enumerate(plugin.d.queue[context.guild_id], start=1):
|
|
paginator.add_line(
|
|
f"**{index}** · {source.cr_frame.f_locals['url'].duration} · [{truncate(source.cr_frame.f_locals['url'].title)}]({source.cr_frame.f_locals['url']})"
|
|
)
|
|
|
|
pages = [page for page in paginator.build_pages()]
|
|
|
|
if len(pages) > 1:
|
|
selector = components.Selector(
|
|
pages=pages,
|
|
buttons=[components.Back(), components.Forward()],
|
|
timeout=600,
|
|
)
|
|
|
|
await selector.send(context.interaction, ephemeral=True)
|
|
else:
|
|
await context.respond(pages[0])
|
|
|
|
|
|
# (Re)connect to voice channel and restart queue
|
|
async def connect(context):
|
|
voice = context.bot.cache.get_voice_state(context.guild_id, context.user.id)
|
|
|
|
if context.guild_id in context.bot.voice.connections:
|
|
await context.bot.voice.disconnect(context.guild_id)
|
|
|
|
driver = await context.bot.voice.connect_to(
|
|
context.guild_id,
|
|
voice.channel_id,
|
|
songkari.Voicebox,
|
|
deaf=True,
|
|
)
|
|
|
|
# Reconstruct queue
|
|
if running(context.guild_id):
|
|
sources = [songbird.ytdl(plugin.d.queue[context.guild_id].track_handle.metadata.source_url)] + plugin.d.queue[
|
|
context.guild_id
|
|
]
|
|
|
|
plugin.d.queue[context.guild_id].clear()
|
|
plugin.d.queue[context.guild_id] = songbird.Queue(driver)
|
|
plugin.d.queue[context.guild_id].extend(sources)
|
|
elif context.guild_id not in plugin.d.queue:
|
|
plugin.d.queue[context.guild_id] = songbird.Queue(driver)
|
|
|
|
return driver
|
|
|
|
|
|
# Return track info if queue is playing
|
|
async def state(guild_id):
|
|
try:
|
|
return await plugin.d.queue[guild_id].track_handle.get_info()
|
|
# TrackError is not exposed, so use base songbird error
|
|
# Return empty TrackState
|
|
except songbird.SongbirdError:
|
|
return None
|
|
|
|
|
|
# Return True if queue exists and is running
|
|
def running(guild_id):
|
|
return guild_id in plugin.d.queue and plugin.d.queue[guild_id].track_handle
|
|
|
|
|
|
# Convert seconds into (HH:)MM:SS
|
|
def convert(seconds):
|
|
h = seconds // 3600
|
|
m = seconds % 3600 // 60
|
|
s = seconds % 3600 % 60
|
|
|
|
return f"{f'{h}:' if h else ''}{m}:{s:02d}"
|
|
|
|
|
|
# Truncate if characters are longer than limit
|
|
# Discord does not handle escaped markdown syntax correctly, so remove []
|
|
def truncate(string, limit=80):
|
|
string = string.replace("[", "").replace("]", "")
|
|
|
|
return f"{string[:limit]}..." if len(string) > limit else string
|
|
|
|
|
|
def load(bot):
|
|
bot.add_plugin(plugin)
|
|
|
|
|
|
def unload(bot):
|
|
bot.create_task(bot.voice.disconnect_all())
|
|
bot.remove_plugin(plugin)
|