1
0
Fork 0
mirror of https://github.com/myned/modufur.git synced 2024-11-01 13:02:38 +00:00

Change to black formatting

This commit is contained in:
Myned 2022-02-21 14:53:53 -06:00
parent 5956d9301e
commit 2c171f204c
No known key found for this signature in database
GPG key ID: 33790F979F7A28B8
7 changed files with 117 additions and 98 deletions

View file

@ -6,70 +6,74 @@ import pysaucenao
from tools import components, scraper from tools import components, scraper
plugin = lightbulb.Plugin('booru') plugin = lightbulb.Plugin("booru")
extractor = urlextract.URLExtract() extractor = urlextract.URLExtract()
@plugin.command @plugin.command
#@lightbulb.option('attachment', 'Attachment(s) to reverse', required=False) # @lightbulb.option('attachment', 'Attachment(s) to reverse')
@lightbulb.option('url', 'URL(s) to reverse, separated by space') @lightbulb.option("url", "URL(s) to reverse, separated by space")
@lightbulb.command('reverse', 'Reverse image search using SauceNAO & Kheina', ephemeral=True) @lightbulb.command("reverse", "Reverse image search using SauceNAO & Kheina", ephemeral=True)
@lightbulb.implements(lightbulb.SlashCommand, lightbulb.MessageCommand) @lightbulb.implements(lightbulb.SlashCommand, lightbulb.MessageCommand)
async def reverse(context): async def reverse(context):
match context: match context:
case lightbulb.SlashContext(): case lightbulb.SlashContext():
urls = extractor.find_urls(context.options.url or '', only_unique=True, with_schema_only=True) urls = extractor.find_urls(context.options.url or "", only_unique=True, with_schema_only=True)
if not urls: if not urls:
await context.respond('**Invalid URL(s).**') await context.respond("**Invalid URL(s).**")
return return
await _reverse(context, urls) await _reverse(context, urls)
case lightbulb.MessageContext(): case lightbulb.MessageContext():
urls = extractor.find_urls(context.options.target.content or '', only_unique=True, with_schema_only=True) urls = extractor.find_urls(context.options.target.content or "", only_unique=True, with_schema_only=True)
urls += [attachment.url for attachment in context.options.target.attachments if attachment.url not in urls] urls += [attachment.url for attachment in context.options.target.attachments if attachment.url not in urls]
if not urls: if not urls:
await context.respond('**No images found.**') await context.respond("**No images found.**")
return return
selector = None selector = None
if len(urls) > 1: if len(urls) > 1:
selector = components.Selector( selector = components.Selector(
pages=[f'**Select potential images to search: `{urls.index(url) + 1}/{len(urls)}`**\n{url}' for url in urls], pages=[
f"**Select potential images to search: `{urls.index(url) + 1}/{len(urls)}`**\n{url}"
for url in urls
],
buttons=[components.Back(), components.Forward(), components.Select(), components.Confirm()], buttons=[components.Back(), components.Forward(), components.Select(), components.Confirm()],
urls=urls urls=urls,
) )
await selector.send(context.interaction, ephemeral=True) await selector.send(context.interaction, ephemeral=True)
await selector.wait() await selector.wait()
if selector.timed_out: if selector.timed_out:
await context.interaction.edit_initial_response('**Timed out.**', components=None) await context.interaction.edit_initial_response("**Timed out.**", components=None)
return return
urls = selector.selected urls = selector.selected
await _reverse(context, urls, selector=selector) await _reverse(context, urls, selector=selector)
@reverse.set_error_handler() @reverse.set_error_handler()
async def on_reverse_error(event): async def on_reverse_error(event):
error = None error = None
match event.exception.__cause__: match event.exception.__cause__:
case pysaucenao.ShortLimitReachedException(): case pysaucenao.ShortLimitReachedException():
error = '**API limit reached. Please try again in a minute.**' error = "**API limit reached. Please try again in a minute.**"
case pysaucenao.DailyLimitReachedException(): case pysaucenao.DailyLimitReachedException():
error = '**Daily API limit reached. Please try again tomorrow.**' error = "**Daily API limit reached. Please try again tomorrow.**"
case pysaucenao.FileSizeLimitException() as url: case pysaucenao.FileSizeLimitException() as url:
error = f'**Image file size too large:**\n{url}' error = f"**Image file size too large:**\n{url}"
case pysaucenao.ImageSizeException() as url: case pysaucenao.ImageSizeException() as url:
error = f'**Image resolution too small:**\n{url}' error = f"**Image resolution too small:**\n{url}"
case pysaucenao.InvalidImageException() as url: case pysaucenao.InvalidImageException() as url:
error = f'**Invalid image:**\n{url}' error = f"**Invalid image:**\n{url}"
case pysaucenao.UnknownStatusCodeException(): case pysaucenao.UnknownStatusCodeException():
error = '**An unknown SauceNAO error has occurred. The service may be down.**' error = "**An unknown SauceNAO error has occurred. The service may be down.**"
if error: if error:
try: try:
@ -79,6 +83,7 @@ async def on_reverse_error(event):
return True return True
async def _reverse(context, urls, *, selector=None): async def _reverse(context, urls, *, selector=None):
if not selector: if not selector:
await context.respond(hikari.ResponseType.DEFERRED_MESSAGE_CREATE) await context.respond(hikari.ResponseType.DEFERRED_MESSAGE_CREATE)
@ -87,24 +92,27 @@ async def _reverse(context, urls, *, selector=None):
if not matches: if not matches:
if selector: if selector:
await context.interaction.edit_initial_response('**No matches found.**', components=None) await context.interaction.edit_initial_response("**No matches found.**", components=None)
else: else:
await context.respond('**No matches found.**') await context.respond("**No matches found.**")
return return
pages = [(hikari.Embed( pages = [
title=match['artist'], url=match['url'], color=context.get_guild().get_my_member().get_top_role().color) (
.set_author(name=f'{match["similarity"]}% Match') hikari.Embed(
.set_image(match['thumbnail']) title=match["artist"], url=match["url"], color=context.get_guild().get_my_member().get_top_role().color
.set_footer(match['source'])) )
if match else f'**No match found.**\n{urls[index]}' for index, match in enumerate(matches)] .set_author(name=f'{match["similarity"]}% Match')
.set_image(match["thumbnail"])
.set_footer(match["source"])
)
if match
else f"**No match found.**\n{urls[index]}"
for index, match in enumerate(matches)
]
if len(pages) > 1: if len(pages) > 1:
selector = components.Selector( selector = components.Selector(pages=pages, buttons=[components.Back(), components.Forward()], timeout=900)
pages=pages,
buttons=[components.Back(), components.Forward()],
timeout=900
)
await selector.send_edit(context.interaction) await selector.send_edit(context.interaction)
else: else:
@ -113,7 +121,10 @@ async def _reverse(context, urls, *, selector=None):
else: else:
await context.respond(pages[0]) await context.respond(pages[0])
def load(bot): def load(bot):
bot.add_plugin(plugin) bot.add_plugin(plugin)
def unload(bot): def unload(bot):
bot.remove_plugin(plugin) bot.remove_plugin(plugin)

View file

@ -1,32 +1,37 @@
import os import os
import lightbulb import lightbulb
plugin = lightbulb.Plugin('master') plugin = lightbulb.Plugin("master")
@plugin.command @plugin.command
@lightbulb.option('command', 'What is your command, master?', required=False, choices=('reload', 'sleep')) @lightbulb.option("command", "What is your command, master?", required=False, choices=("reload", "sleep"))
@lightbulb.command('master', 'Commands my master can demand of me', ephemeral=True) @lightbulb.command("master", "Commands my master can demand of me", ephemeral=True)
@lightbulb.implements(lightbulb.SlashCommand) @lightbulb.implements(lightbulb.SlashCommand)
async def master(context): async def master(context):
if context.user.id == context.bot.application.owner.id: if context.user.id == context.bot.application.owner.id:
match context.options.command: match context.options.command:
case 'reload': case "reload":
context.bot.reload_extensions(*context.bot.extensions) context.bot.reload_extensions(*context.bot.extensions)
extensions = [os.path.splitext(extension)[1][1:] for extension in context.bot.extensions] extensions = [os.path.splitext(extension)[1][1:] for extension in context.bot.extensions]
await context.respond(f'**Reloaded `{"`, `".join(extensions[:-1])}`, and `{extensions[-1]}` for you, master.**') await context.respond(
case 'sleep': f'**Reloaded `{"`, `".join(extensions[:-1])}`, and `{extensions[-1]}` for you, master.**'
await context.respond('**Goodnight, master.**') )
case "sleep":
await context.respond("**Goodnight, master.**")
await context.bot.close() await context.bot.close()
case _: case _:
await context.respond(f'**Hello, master.**') await context.respond(f"**Hello, master.**")
else: else:
await context.respond(f'**{context.bot.application.owner.mention} is my master. 🐺**') await context.respond(f"**{context.bot.application.owner.mention} is my master. 🐺**")
def load(bot): def load(bot):
bot.add_plugin(plugin) bot.add_plugin(plugin)
def unload(bot): def unload(bot):
bot.remove_plugin(plugin) bot.remove_plugin(plugin)

View file

@ -1,29 +1,32 @@
import toml import toml
import hikari import hikari
ACTIVITY = hikari.ActivityType.LISTENING ACTIVITY = hikari.ActivityType.LISTENING
ERROR = '```❗ An internal error has occurred. This has been reported to my master. 🐺```' ERROR = "```❗ An internal error has occurred. This has been reported to my master. 🐺```"
CONFIG = '''\ CONFIG = """\
guilds = [] # guild IDs to register commands, empty for global guilds = [] # guild IDs to register commands, empty for global
client = 0 # bot application ID client = 0 # bot application ID
token = "" # bot token token = "" # bot token
activity = "" # bot status activity = "" # bot status
saucenao = "" # saucenao token saucenao = "" # saucenao token
e621 = "" # e621 token e621 = "" # e621 token
''' """
try: try:
config = toml.load('config.toml') config = toml.load("config.toml")
except FileNotFoundError: except FileNotFoundError:
with open('config.toml', 'w') as f: with open("config.toml", "w") as f:
f.write(CONFIG) f.write(CONFIG)
print('config.toml created with default values. Restart when modified.') print("config.toml created with default values. Restart when modified.")
exit() exit()
def error(event): def error(event):
exception = event.exception.__cause__ or event.exception exception = event.exception.__cause__ or event.exception
return (f'**`{event.context.command.name}` in {event.context.get_channel().mention}' return (
f'```❗ {type(exception).__name__}: {exception}```**') f"**`{event.context.command.name}` in {event.context.get_channel().mention}"
f"```❗ {type(exception).__name__}: {exception}```**"
)

View file

@ -18,6 +18,10 @@ hikari-miru = "*"
pysaucenao = {git = "https://github.com/Myned/pysaucenao.git"} pysaucenao = {git = "https://github.com/Myned/pysaucenao.git"}
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
black = "*"
[tool.black]
line-length = 120
[build-system] [build-system]
requires = ["poetry-core>=1.0.0"] requires = ["poetry-core>=1.0.0"]

12
run.py
View file

@ -1,4 +1,5 @@
import os import os
import hikari import hikari
import lightbulb import lightbulb
import miru import miru
@ -8,13 +9,12 @@ import config as c
# Unix optimizations # Unix optimizations
# https://github.com/hikari-py/hikari#uvloop # https://github.com/hikari-py/hikari#uvloop
if os.name != 'nt': if os.name != "nt":
import uvloop import uvloop
uvloop.install() uvloop.install()
bot = lightbulb.BotApp( bot = lightbulb.BotApp(token=c.config["token"], default_enabled_guilds=c.config["guilds"])
token=c.config['token'],
default_enabled_guilds=c.config['guilds'])
@bot.listen(lightbulb.CommandErrorEvent) @bot.listen(lightbulb.CommandErrorEvent)
@ -30,5 +30,5 @@ async def on_error(event):
miru.load(bot) miru.load(bot)
bot.load_extensions_from('tools', 'commands') bot.load_extensions_from("tools", "commands")
bot.run(activity=hikari.Activity(name=c.config['activity'], type=c.ACTIVITY)) bot.run(activity=hikari.Activity(name=c.config["activity"], type=c.ACTIVITY))

View file

@ -3,48 +3,35 @@ import lightbulb
from miru.ext import nav from miru.ext import nav
plugin = lightbulb.Plugin('components') plugin = lightbulb.Plugin("components")
class Back(nav.PrevButton): class Back(nav.PrevButton):
def __init__(self): def __init__(self):
super().__init__( super().__init__(style=hikari.ButtonStyle.SECONDARY, label="", emoji=None)
style=hikari.ButtonStyle.SECONDARY,
label='',
emoji=None
)
class Forward(nav.NextButton): class Forward(nav.NextButton):
def __init__(self): def __init__(self):
super().__init__( super().__init__(style=hikari.ButtonStyle.SECONDARY, label="", emoji=None)
style=hikari.ButtonStyle.SECONDARY,
label='',
emoji=None
)
class Confirm(nav.StopButton): class Confirm(nav.StopButton):
def __init__(self): def __init__(self):
super().__init__( super().__init__(style=hikari.ButtonStyle.PRIMARY, label="", emoji=None)
style=hikari.ButtonStyle.PRIMARY,
label='',
emoji=None
)
async def callback(self, context): async def callback(self, context):
await context.edit_response(content='**Searching...**', components=None) await context.edit_response(content="**Searching...**", components=None)
self.view.stop() self.view.stop()
async def before_page_change(self): async def before_page_change(self):
self.disabled = False if self.view.selected else True self.disabled = False if self.view.selected else True
class Select(nav.NavButton): class Select(nav.NavButton):
def __init__(self): def __init__(self):
super().__init__( super().__init__(style=hikari.ButtonStyle.DANGER, label="", emoji=None)
style=hikari.ButtonStyle.DANGER,
label='',
emoji=None
)
async def callback(self, context): async def callback(self, context):
if self.view.urls[self.view.current_page] not in self.view.selected: if self.view.urls[self.view.current_page] not in self.view.selected:
@ -64,7 +51,7 @@ class Select(nav.NavButton):
def _button(self, *, selected=False): def _button(self, *, selected=False):
self.style = hikari.ButtonStyle.SUCCESS if selected else hikari.ButtonStyle.DANGER self.style = hikari.ButtonStyle.SUCCESS if selected else hikari.ButtonStyle.DANGER
self.label = '' if selected else '' self.label = "" if selected else ""
try: try:
confirm = next((child for child in self.view.children if isinstance(child, Confirm))) confirm = next((child for child in self.view.children if isinstance(child, Confirm)))
@ -75,11 +62,7 @@ class Select(nav.NavButton):
class Selector(nav.NavigatorView): class Selector(nav.NavigatorView):
def __init__(self, *, pages=[], buttons=[], timeout=120, urls=[]): def __init__(self, *, pages=[], buttons=[], timeout=120, urls=[]):
super().__init__( super().__init__(pages=pages, buttons=buttons, timeout=timeout)
pages=pages,
buttons=buttons,
timeout=timeout
)
self.urls = urls self.urls = urls
self.selected = [] self.selected = []
self.saved = set() self.saved = set()
@ -110,5 +93,7 @@ class Selector(nav.NavigatorView):
def load(bot): def load(bot):
bot.add_plugin(plugin) bot.add_plugin(plugin)
def unload(bot): def unload(bot):
bot.remove_plugin(plugin) bot.remove_plugin(plugin)

View file

@ -6,13 +6,14 @@ import pysaucenao
import config as c import config as c
plugin = lightbulb.Plugin('scraper') plugin = lightbulb.Plugin("scraper")
sauce = pysaucenao.SauceNao(api_key=c.config['saucenao'], priority=(29, 40, 41)) # e621 > Fur Affinity > Twitter sauce = pysaucenao.SauceNao(api_key=c.config["saucenao"], priority=(29, 40, 41)) # e621 > Fur Affinity > Twitter
async def reverse(urls): async def reverse(urls):
return [await _saucenao(url) or await _kheina(url) for url in urls] return [await _saucenao(url) or await _kheina(url) for url in urls]
async def _saucenao(url): async def _saucenao(url):
try: try:
results = await sauce.from_url(url) results = await sauce.from_url(url)
@ -23,35 +24,45 @@ async def _saucenao(url):
except pysaucenao.InvalidImageException: except pysaucenao.InvalidImageException:
raise pysaucenao.InvalidImageException(url) raise pysaucenao.InvalidImageException(url)
return { return (
'url': results[0].url, {
'artist': ', '.join(results[0].authors) or 'Unknown', "url": results[0].url,
'thumbnail': results[0].thumbnail, "artist": ", ".join(results[0].authors) or "Unknown",
'similarity': round(results[0].similarity), "thumbnail": results[0].thumbnail,
'source': tldextract.extract(results[0].index).domain "similarity": round(results[0].similarity),
} if results else None "source": tldextract.extract(results[0].index).domain,
}
if results
else None
)
async def _kheina(url): async def _kheina(url):
content = await _post('https://api.kheina.com/v1/search', {'url': url}) content = await _post("https://api.kheina.com/v1/search", {"url": url})
if content['results'][0]['similarity'] < 50: if content["results"][0]["similarity"] < 50:
return None return None
return { return {
'url': content['results'][0]['sources'][0]['source'], "url": content["results"][0]["sources"][0]["source"],
'artist': content['results'][0]['sources'][0]['artist'] or 'Unknown', "artist": content["results"][0]["sources"][0]["artist"] or "Unknown",
'thumbnail': f'https://cdn.kheina.com/file/kheinacom/{content["results"][0]["sources"][0]["sha1"]}.jpg', "thumbnail": f'https://cdn.kheina.com/file/kheinacom/{content["results"][0]["sources"][0]["sha1"]}.jpg',
'similarity': round(content['results'][0]['similarity']), "similarity": round(content["results"][0]["similarity"]),
'source': tldextract.extract(content['results'][0]['sources'][0]['source']).domain "source": tldextract.extract(content["results"][0]["sources"][0]["source"]).domain,
} }
async def _post(url, data): async def _post(url, data):
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.post(url, data=data, headers={'User-Agent': 'Myned/Modufur (https://github.com/Myned/Modufur)'}) as response: async with session.post(
url, data=data, headers={"User-Agent": "Myned/Modufur (https://github.com/Myned/Modufur)"}
) as response:
return await response.json() if response.status == 200 else None return await response.json() if response.status == 200 else None
def load(bot): def load(bot):
bot.add_plugin(plugin) bot.add_plugin(plugin)
def unload(bot): def unload(bot):
bot.remove_plugin(plugin) bot.remove_plugin(plugin)