1
0
Fork 0
mirror of https://github.com/myned/modufur.git synced 2025-01-18 22:15:20 +00:00

Initial hikari rewrite

This commit is contained in:
Myned 2022-02-21 01:10:57 -06:00
parent a376c28207
commit da8ff759e5
No known key found for this signature in database
GPG key ID: 33790F979F7A28B8
30 changed files with 486 additions and 3190 deletions

69
.gitignore vendored
View file

@ -1,12 +1,5 @@
# Custom
*.json
*.pyo
*.pyc
*.DS_Store
*.pkl
*.png
*.bat
*.lock
# CUSTOM
config.toml
# Byte-compiled / optimized / DLL files
__pycache__/
@ -30,6 +23,7 @@ parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
@ -48,13 +42,17 @@ pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
@ -62,9 +60,9 @@ coverage.xml
# Django stuff:
*.log
.static_storage/
.media/
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
@ -77,16 +75,41 @@ instance/
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# IPython
profile_default/
ipython_config.py
# celery beat schedule file
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
poetry.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
@ -112,3 +135,21 @@ venv.bak/
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

28
Pipfile
View file

@ -1,28 +0,0 @@
[[source]]
url = "https://pypi.python.org/simple"
verify_ssl = true
name = "pypi"
[[source]]
url = "https://pypi.org/simple"
name = "pypi"
verify_ssl = true
[[source]]
url = "https://www.piwheels.org/simple"
name = "piwheels"
verify_ssl = true
[requires]
[packages]
beautifulsoup4 = "*"
"discord.py" = {extras = ["voice"],git = "https://github.com/Rapptz/discord.py"}
"hurry.filesize" = "*"
requests = "*"
html5lib = "*"
tldextract = "*"
[dev-packages]
lxml = "*"
selenium = "*"

View file

@ -1,5 +1,34 @@
# Modufur
Discord booru bot with a side of management and tasking.
Credits:
Rapptz/discord.py
An experimental [Hikari](https://github.com/hikari-py/hikari) Discord bot for reverse image searching using [SauceNAO](https://saucenao.com) & [Kheina](https://kheina.com)
## Requirements
[Python](https://www.python.org) 3.10+\
[Poetry](https://python-poetry.org)
## Installation
```
git clone https://github.com/Myned/Modufur.git
```
```
cd Modufur
```
```
poetry install
```
## Usage
```
poetry run python run.py
```
## Configuration
`config.toml`
```
guilds = [] # guild IDs to register commands, empty for global
client = 0 # bot application ID
token = "" # bot token
activity = "" # bot status
saucenao = "" # saucenao token
e621 = "" # e621 token
```
## Credits
[hikari](https://github.com/hikari-py/hikari)\
[hikari-lightbulb](https://github.com/tandemdude/hikari-lightbulb)\
[hikari-miru](https://github.com/HyperGH/hikari-miru)\
[pysaucenao](https://github.com/FujiMakoto/pysaucenao)

View file

111
commands/booru.py Normal file
View file

@ -0,0 +1,111 @@
import urlextract
import hikari
import lightbulb
import pysaucenao
from tools import components, scraper
plugin = lightbulb.Plugin('booru')
extractor = urlextract.URLExtract()
@plugin.command
#@lightbulb.option('attachment', 'Attachment(s) to reverse', required=False)
@lightbulb.option('url', 'URL(s) to reverse, separated by space')
@lightbulb.command('reverse', 'Reverse image search using SauceNAO & Kheina', ephemeral=True)
@lightbulb.implements(lightbulb.SlashCommand, lightbulb.MessageCommand)
async def reverse(context):
match context:
case lightbulb.SlashContext():
urls = extractor.find_urls(context.options.url or '', only_unique=True, with_schema_only=True)
if not urls:
await context.respond('**Invalid URL(s).**')
return
await _reverse(context, urls)
case lightbulb.MessageContext():
urls = extractor.find_urls(context.options.target.content or '', only_unique=True, with_schema_only=True)
urls += [attachment.url for attachment in context.options.target.attachments if attachment.url not in urls]
if not urls:
await context.respond('**No images found.**')
return
selector = None
if len(urls) > 1:
selector = components.Selector(
pages=[f'**Select {urls.index(url) + 1} out of {len(urls)} potential images to search:**\n{url}' for url in urls],
buttons=[components.Select(), components.Back(), components.Forward(), components.Confirm()],
urls=urls)
await selector.send(context.interaction, ephemeral=True)
await selector.wait()
if selector.timed_out:
return
urls = selector.selected
await _reverse(context, urls, selector=selector)
@reverse.set_error_handler()
async def on_reverse_error(event):
error = None
match event.exception.__cause__:
case pysaucenao.ShortLimitReachedException():
error = '**API limit reached. Please try again in a minute.**'
case pysaucenao.DailyLimitReachedException():
error = '**Daily API limit reached. Please try again tomorrow.**'
case pysaucenao.FileSizeLimitException() as url:
error = f'**Image file size too large:**\n{url}'
case pysaucenao.ImageSizeException() as url:
error = f'**Image resolution too small:**\n{url}'
case pysaucenao.InvalidImageException() as url:
error = f'**Invalid image:**\n{url}'
case pysaucenao.UnknownStatusCodeException():
error = '**An unknown SauceNAO error has occurred. The service may be down.**'
if error:
await event.context.respond(error)
return True
async def _reverse(context, urls, *, selector=None):
if not selector:
await context.respond(hikari.ResponseType.DEFERRED_MESSAGE_CREATE)
matches = await scraper.reverse(urls)
if not matches:
if selector:
await context.interaction.edit_initial_response('**No matches found.**', components=None)
else:
await context.respond('**No matches found.**')
return
pages = [(hikari.Embed(
title=match['artist'], url=match['source'], color=context.get_guild().get_my_member().get_top_role().color)
.set_author(name=f'{match["similarity"]}% Match')
.set_image(match['thumbnail'])
.set_footer(match['index'])) if match else f'**No match found for:**\n{urls[index]}' for index, match in enumerate(matches)]
if len(pages) > 1:
selector = components.Selector(
pages=pages,
buttons=[components.Back(), components.Forward()],
timeout=900)
await selector.send_edit(context.interaction)
else:
if selector:
await context.interaction.edit_initial_response(content=None, embed=pages[0], components=None)
else:
await context.respond(pages[0])
def load(bot):
bot.add_plugin(plugin)
def unload(bot):
bot.remove_plugin(plugin)

32
commands/master.py Normal file
View file

@ -0,0 +1,32 @@
import os
import lightbulb
plugin = lightbulb.Plugin('master')
@plugin.command
@lightbulb.option('command', 'What is your command, master?', required=False, choices=('reload', 'sleep'))
@lightbulb.command('master', 'Commands my master can demand of me', ephemeral=True)
@lightbulb.implements(lightbulb.SlashCommand)
async def master(context):
if context.user.id == context.bot.application.owner.id:
match context.options.command:
case 'reload':
context.bot.reload_extensions(*context.bot.extensions)
extensions = [os.path.splitext(extension)[1][1:] for extension in context.bot.extensions]
await context.respond(f'**Reloaded `{"`, `".join(extensions[:-1])}`, and `{extensions[-1]}` for you, master.**')
case 'sleep':
await context.respond('**Goodnight, master.**')
await context.bot.close()
case _:
await context.respond(f'**Hello, master.**')
else:
await context.respond(f'**{context.bot.application.owner.mention} is my master. 🐺**')
def load(bot):
bot.add_plugin(plugin)
def unload(bot):
bot.remove_plugin(plugin)

27
config.py Normal file
View file

@ -0,0 +1,27 @@
import toml
ERROR = '```❗ An internal error has occurred. This has been reported to my master. 🐺```'
CONFIG = '''\
guilds = [] # guild IDs to register commands, empty for global
client = 0 # bot application ID
token = "" # bot token
activity = "" # bot status
saucenao = "" # saucenao token
e621 = "" # e621 token
'''
try:
config = toml.load('config.toml')
except FileNotFoundError:
with open('config.toml', 'w') as f:
f.write(CONFIG)
print('config.toml created with default values. Restart when modified.')
exit()
def error(event):
exception = event.exception.__cause__ or event.exception
return (f'**`{event.context.command.name}` in {event.context.get_channel().mention}'
f'```❗ {type(exception).__name__}: {exception}```**')

24
pyproject.toml Normal file
View file

@ -0,0 +1,24 @@
[tool.poetry]
name = "modufur"
version = "0.1.0"
description = "Modufur Discord Bot"
authors = ["Myned <devmyned@gmail.com>"]
license = "MIT"
[tool.poetry.dependencies]
python = "~3.10"
toml = "*"
uvloop = "*"
aiohttp = "*"
urlextract = "*"
tldextract = "*"
hikari = {extras = ["speedups"], version = "*"}
hikari-lightbulb = "*"
hikari-miru = "*"
pysaucenao = {git = "https://github.com/Myned/pysaucenao.git"}
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

34
run.py Normal file
View file

@ -0,0 +1,34 @@
import os
import hikari
import lightbulb
import miru
import config as c
# Unix optimizations
# https://github.com/hikari-py/hikari#uvloop
if os.name != 'nt':
import uvloop
uvloop.install()
bot = lightbulb.BotApp(
token=c.config['token'],
default_enabled_guilds=c.config['guilds'])
@bot.listen(lightbulb.CommandErrorEvent)
async def on_error(event):
await bot.application.owner.send(c.error(event))
try:
await event.context.respond(c.ERROR)
except:
pass
raise event.exception
miru.load(bot)
bot.load_extensions_from('tools', 'commands')
bot.run(activity=hikari.Activity(name=c.config['activity'], type=hikari.ActivityType.LISTENING))

View file

View file

File diff suppressed because it is too large Load diff

View file

@ -1,5 +0,0 @@
from discord.ext import commands as cmds
class Help(cmds.HelpCommand):
pass

View file

@ -1,41 +0,0 @@
import asyncio
import traceback as tb
import discord as d
from discord.ext import commands as cmds
from misc import exceptions as exc
from utils import utils as u
class Info(cmds.Cog):
def __init__(self, bot):
self.bot = bot
# @cmds.command(name='helptest', aliases=['h'], hidden=True)
# async def list_commands(self, ctx):
# embed = d.Embed(title='All possible commands:', color=ctx.me.color)
# embed.set_author(name=ctx.me.display_name, icon_url=ctx.me.avatar_url)
# embed.add_field(
# name='Booru', value='\n{}bl umbrella command for managing blacklists'.format(u.config['prefix']))
#
# await ctx.send(embed=embed)
@cmds.group(name='info', aliases=['i'], hidden=True)
async def info(self, ctx):
if ctx.invoked_subcommand is None:
await ctx.send('<embed>BOT INFO</embed>')
@info.command(aliases=['g'], brief='Provides info about a guild')
async def guild(self, ctx, guild_id: int):
guild = d.utils.get(self.bot.guilds, id=guild_id)
if guild:
await ctx.send(guild.name)
else:
await ctx.send(f'**Not in any guilds by the id of: ** `{guild_id}`')
@info.command(aliases=['u'], brief='Provides info about a user')
async def user(self, ctx, user: d.User):
pass

View file

@ -1,233 +0,0 @@
import asyncio
import traceback as tb
from contextlib import suppress
from datetime import datetime as dt
import discord as d
from discord import errors as err
from discord.ext import commands as cmds
from discord.ext.commands import errors as errext
from misc import exceptions as exc
from misc import checks
from utils import utils as u
class Admin(cmds.Cog):
def __init__(self, bot):
self.bot = bot
self.queue = asyncio.Queue()
self.deleting = False
if u.tasks['auto_del']:
for channel in u.tasks['auto_del']:
temp = self.bot.get_channel(channel)
self.bot.loop.create_task(self.queue_for_deletion(temp))
print('STARTED : auto-deleting in #{}'.format(temp.name))
self.deleting = True
self.bot.loop.create_task(self.delete())
@cmds.group(aliases=['pru', 'purge', 'pur', 'clear', 'cl'], hidden=True)
@cmds.is_owner()
async def prune(self, ctx):
pass
@prune.group(name='user', aliases=['u', 'member', 'm'])
async def _prune_user(self, ctx):
pass
@_prune_user.command(name='channel', aliases=['channels', 'chans', 'chan', 'ch', 'c'])
async def _prune_user_channel(self, ctx, user: d.User, *channels: d.TextChannel):
def confirm(r, u):
if u.id is ctx.author.id:
if r.emoji == '\N{OCTAGONAL SIGN}':
raise exc.Abort
if r.emoji == '\N{THUMBS UP SIGN}':
return True
return False
if not channels:
channels = [ctx.channel]
try:
pruning = await ctx.send(f'\N{HOURGLASS} **Pruning** {user.mention}**\'s messages from** {"**,** ".join([channel.mention for channel in channels])} **might take some time.** Proceed, {ctx.author.mention}?')
await pruning.add_reaction('\N{THUMBS UP SIGN}')
await pruning.add_reaction('\N{OCTAGONAL SIGN}')
await asyncio.sleep(1)
await self.bot.wait_for('reaction_add', check=confirm, timeout=10 * 60)
deleting = await ctx.send(f'\N{WASTEBASKET} **Deleting** {user.mention}**\'s messages...**')
await asyncio.sleep(1)
c = 0
for channel in channels:
await deleting.edit(content=f'\N{WASTEBASKET} **Deleting** {user.mention}**\'s messages from** {channel.mention}')
deleted = await channel.purge(check=lambda m: m.author.id == user.id, before=pruning, limit=None)
c += len(deleted)
await asyncio.sleep(1)
for channel in channels:
missed = 0
async for message in channel.history(before=pruning, limit=None):
if message.author.id == user.id:
missed += 1
if missed > 0:
await ctx.send(f'\N{DOUBLE EXCLAMATION MARK} `{missed}` **messages were not deleted in** {channel.mention}')
await ctx.send(f'\N{WHITE HEAVY CHECK MARK} **Finished deleting** `{c}` **of** {user.mention}**\'s messages**')
except exc.Abort:
await ctx.send('**Deletion aborted**')
await u.add_reaction(ctx.message, '\N{CROSS MARK}')
except TimeoutError:
await ctx.send('**Deletion timed out**')
await u.add_reaction(ctx.message, '\N{CROSS MARK}')
@_prune_user.command(name='all', aliases=['a'], brief='Prune a user\'s messages from the guild', description='about flag centers on message 50 of 101 messages\n\npfg \{user id\} [before|after|about] [\{message id\}]\n\nExample:\npfg \{user id\} before \{message id\}', hidden=True)
@cmds.is_owner()
async def _prune_user_all(self, ctx, user: d.User):
def confirm(r, u):
if u.id is ctx.author.id:
if r.emoji == '\N{OCTAGONAL SIGN}':
raise exc.Abort
if r.emoji == '\N{THUMBS UP SIGN}':
return True
return False
try:
pruning = await ctx.send(f'\N{HOURGLASS} **Pruning** {user.mention}**\'s messages might take some time.** Proceed, {ctx.author.mention}?')
await pruning.add_reaction('\N{THUMBS UP SIGN}')
await pruning.add_reaction('\N{OCTAGONAL SIGN}')
await asyncio.sleep(1)
await self.bot.wait_for('reaction_add', check=confirm, timeout=10 * 60)
deleting = await ctx.send(f'\N{WASTEBASKET} **Deleting** {user.mention}**\'s messages...**')
await asyncio.sleep(1)
c = 0
for channel in ctx.guild.text_channels:
await deleting.edit(content=f'\N{WASTEBASKET} **Deleting** {user.mention}**\'s messages from** {channel.mention}')
deleted = await channel.purge(check=lambda m: m.author.id == user.id, before=pruning, limit=None)
c += len(deleted)
await asyncio.sleep(1)
for channel in ctx.guild.text_channels:
missed = 0
async for message in channel.history(before=pruning, limit=None):
if message.author.id == user.id:
missed += 1
if missed > 0:
await ctx.send(f'\N{DOUBLE EXCLAMATION MARK} `{missed}` **messages were not deleted in** {channel.mention}')
await ctx.send(f'\N{WHITE HEAVY CHECK MARK} **Finished deleting** `{c}` **of** {user.mention}**\'s messages**')
except exc.Abort:
await ctx.send('**Deletion aborted**')
await u.add_reaction(ctx.message, '\N{CROSS MARK}')
except TimeoutError:
await ctx.send('**Deletion timed out**')
await u.add_reaction(ctx.message, '\N{CROSS MARK}')
@cmds.group(aliases=['task', 'tsk'])
async def tasks(self):
pass
async def delete(self):
while self.deleting:
message = await self.queue.get()
with suppress(err.NotFound):
if not message.pinned:
await message.delete()
print('STOPPED : deleting')
async def queue_for_deletion(self, channel):
def check(msg):
if 'stop d' in msg.content.lower() and msg.channel is channel and msg.author.guild_permissions.administrator:
raise exc.Abort
elif msg.channel is channel and not msg.pinned:
return True
return False
try:
async for message in channel.history(limit=None):
if 'stop d' in message.content.lower() and message.author.guild_permissions.administrator:
raise exc.Abort
if not message.pinned:
await self.queue.put(message)
while not self.bot.is_closed():
message = await self.bot.wait_for('message', check=check)
await self.queue.put(message)
except exc.Abort:
u.tasks['auto_del'].remove(channel.id)
u.dump(u.tasks, 'cogs/tasks.pkl')
if not u.tasks['auto_del']:
self.deleting = False
print('STOPPED : deleting #{}'.format(channel.name))
await channel.send('**Stopped queueing messages for deletion in** {}'.format(channel.mention))
@cmds.command(name='autodelete', aliases=['autodel'])
@cmds.has_permissions(administrator=True)
async def auto_delete(self, ctx):
try:
if ctx.channel.id not in u.tasks['auto_del']:
u.tasks['auto_del'].append(ctx.channel.id)
u.dump(u.tasks, 'cogs/tasks.pkl')
self.bot.loop.create_task(self.queue_for_deletion(ctx.channel))
if not self.deleting:
self.bot.loop.create_task(self.delete())
self.deleting = True
print('STARTED : auto-deleting in #{}'.format(ctx.channel.name))
await ctx.send('**Auto-deleting all messages in {}**'.format(ctx.channel.mention))
else:
raise exc.Exists
except exc.Exists:
await ctx.send('**Already auto-deleting in {}.** Type `stop d(eleting)` to stop.'.format(ctx.channel.mention))
await u.add_reaction(ctx.message, '\N{CROSS MARK}')
@cmds.group(aliases=['setting', 'set', 's'])
@cmds.has_permissions(administrator=True)
async def settings(self, ctx):
pass
@settings.command(name='deletecommands', aliases=['delcmds', 'delcmd'])
async def _settings_deletecommands(self, ctx):
if ctx.guild.id not in u.settings['del_ctx']:
u.settings['del_ctx'].append(ctx.guild.id)
else:
u.settings['del_ctx'].remove(ctx.guild.id)
u.dump(u.settings, 'settings.pkl')
await ctx.send('**Delete command invocations:** `{}`'.format(ctx.guild.id in u.settings['del_ctx']))
@settings.command(name='prefix', aliases=['pre', 'p'])
async def _settings_prefix(self, ctx, *prefixes):
if prefixes:
u.settings['prefixes'][ctx.guild.id] = prefixes
else:
with suppress(KeyError):
del u.settings['prefixes'][ctx.guild.id]
await ctx.send(f'**Prefix set to:** `{"` or `".join(prefixes if ctx.guild.id in u.settings["prefixes"] else u.config["prefix"])}`')
@settings.command(name='deleteresponses', aliases=['delresps', 'delresp'])
async def _settings_deleteresponses(self, ctx):
if ctx.guild.id not in u.settings['del_resp']:
u.settings['del_resp'].append(ctx.guild.id)
else:
u.settings['del_resp'].remove(ctx.guild.id)
u.dump(u.settings, 'settings.pkl')
await ctx.send(f'**Delete command responses:** `{ctx.guild.id in u.settings["del_resp"]}`')

View file

@ -1,322 +0,0 @@
import asyncio
import code
import io
import os
import re
import sys
import traceback as tb
from contextlib import redirect_stdout, suppress
import discord as d
from discord.ext import commands as cmds
from misc import exceptions as exc
from misc import checks
from utils import utils as u
from utils import formatter
class Bot(cmds.Cog):
def __init__(self, bot):
self.bot = bot
# Close connection to Discord - immediate offline
@cmds.command(name=',die', aliases=[',d'], brief='Kills the bot', description='BOT OWNER ONLY\nCloses the connection to Discord', hidden=True)
@cmds.is_owner()
async def die(self, ctx):
await u.add_reaction(ctx.message, '\N{CRESCENT MOON}')
chantype = 'guild' if isinstance(ctx.channel, d.TextChannel) else 'private'
u.temp['startup'] = (chantype, ctx.channel.id if chantype == 'guild' else ctx.author.id, ctx.message.id)
u.dump(u.temp, 'temp/temp.pkl')
# loop = self.bot.loop.all_tasks()
# for task in loop:
# task.cancel()
print('\n< < < < < < < < < < < <\nD I S C O N N E C T E D\n< < < < < < < < < < < <\n')
await self.bot.logout()
@cmds.command(name=',restart', aliases=[',res', ',r'], hidden=True)
@cmds.is_owner()
async def restart(self, ctx):
await u.add_reaction(ctx.message, '\N{SLEEPING SYMBOL}')
print('\n^ ^ ^ ^ ^ ^ ^ ^ ^ ^\nR E S T A R T I N G\n^ ^ ^ ^ ^ ^ ^ ^ ^ ^\n')
chantype = 'guild' if isinstance(ctx.channel, d.TextChannel) else 'private'
u.temp['startup'] = (chantype, ctx.channel.id if chantype == 'guild' else ctx.author.id, ctx.message.id)
u.dump(u.temp, 'temp/temp.pkl')
# loop = self.bot.loop.all_tasks()
# for task in loop:
# task.cancel()
await self.bot.logout()
os.execl(sys.executable, 'python3', 'run.py')
# Invite bot to bot owner's server
@cmds.command(name=',invite', aliases=[',inv', ',link'], brief='Invite the bot', description='BOT OWNER ONLY\nInvite the bot to a server (Requires admin)', hidden=True)
@cmds.is_owner()
async def invite(self, ctx):
await u.add_reaction(ctx.message, '\N{ENVELOPE}')
await ctx.send('https://discordapp.com/oauth2/authorize?&client_id={}&scope=bot&permissions={}'.format(u.config['client_id'], u.config['permissions']))
@cmds.command(name=',guilds', aliases=[',gs'])
@cmds.is_owner()
async def guilds(self, ctx):
paginator = cmds.Paginator()
for guild in self.bot.guilds:
paginator.add_line(f'{guild.name}: {guild.id}\n'
f' @{guild.owner}: {guild.owner.id}')
for page in paginator.pages:
await ctx.send(f'**Guilds:**\n{page}')
@cmds.group(name=',block', aliases=[',bl', ',b'])
@cmds.is_owner()
async def block(self, ctx):
pass
@block.group(name='list', aliases=['l'])
async def block_list(self, ctx):
pass
@block_list.command(name='guilds', aliases=['g'])
async def block_list_guilds(self, ctx):
await formatter.paginate(ctx, u.block['guild_ids'])
@block.command(name='user', aliases=['u'])
async def block_user(self, ctx, *users: d.User):
for user in users:
u.block['user_ids'].append(user.id)
u.dump(u.block, 'cogs/block.json', json=True)
@block.command(name='guild', aliases=['g'])
async def block_guild(self, ctx, *guilds):
for guild in guilds:
u.block['guild_ids'].append(guild)
u.dump(u.block, 'cogs/block.json', json=True)
@cmds.group(name=',unblock', aliases=[',unbl', ',unb'])
@cmds.is_owner()
async def unblock(self, ctx):
pass
@unblock.command(name='user', aliases=['u'])
async def unblock_user(self, ctx, *users: d.User):
for user in users:
u.block['user_ids'].remove(user.id)
u.dump(u.block, 'cogs/block.json', json=True)
await ctx.send('\N{WHITE HEAVY CHECK MARK} **Unblocked users**')
@unblock.command(name='guild', aliases=['g'])
async def unblock_guild(self, ctx, *guilds):
for guild in guilds:
u.block['guild_ids'].remove(guild)
u.dump(u.block, 'cogs/block.json', json=True)
await ctx.send('\N{WHITE HEAVY CHECK MARK} **Unblocked guilds**')
@cmds.command(name=',leave', aliases=[',l'])
@cmds.is_owner()
async def leave(self, ctx, *guilds):
for guild in guilds:
temp = d.utils.get(self.bot.guilds, id=int(guild))
await temp.leave()
@cmds.command(name=',permissions', aliases=[',permission', ',perms', ',perm'])
@cmds.is_owner()
async def permissions(self, ctx, *args: d.Member):
members = list(args)
permissions = {}
if not members:
members.append(ctx.guild.me)
for member in members:
permissions[member.mention] = []
for k, v in dict(ctx.channel.permissions_for(member)).items():
if v:
permissions[member.mention].append(k)
await formatter.paginate(ctx, permissions)
@cmds.command(name=',tasks', aliases=[',task'])
@cmds.is_owner()
async def tasks(self, ctx):
tasks = [task for task in asyncio.Task.all_tasks() if not task.done()]
await ctx.send(f'**Tasks active:** `{int((len(tasks) - 6) / 3)}`')
@cmds.command(name=',status', aliases=[',presence', ',game'], hidden=True)
@cmds.is_owner()
async def change_status(self, ctx, *, game=None):
if game:
await self.bot.change_presence(game=d.Game(name=game))
u.config['playing'] = game
u.dump(u.config, 'config.json', json=True)
await ctx.send(f'**Game changed to** `{game}`')
else:
await self.bot.change_presence(game=None)
u.config['playing'] = ''
u.dump(u.config, 'config.json', json=True)
await ctx.send('**Game changed to** ` `')
@cmds.command(name=',username', aliases=[',user'], hidden=True)
@cmds.is_owner()
async def change_username(self, ctx, *, username=None):
if username:
await self.bot.user.edit(username=username)
await ctx.send(f'**Username changed to** `{username}`')
else:
await ctx.send('**Invalid string**')
await u.add_reaction(ctx.message, '\N{HEAVY EXCLAMATION MARK SYMBOL}')
class Tools(cmds.Cog):
def __init__(self, bot):
self.bot = bot
def format(self, i='', o=''):
if len(o) > 1:
return '>>> {}\n{}'.format(i, o)
else:
return '>>> {}'.format(i)
async def generate(self, d, i='', o=''):
return await d.send('```python\n{}```'.format(self.format(i, o)))
async def refresh(self, m, i='', o=''):
output = m.content[9:-3]
if len(re.findall('\n', output)) <= 20:
await m.edit(content='```python\n{}\n{}\n>>>```'.format(output, self.format(i, o)))
else:
await m.edit(content='```python\n{}```'.format(self.format(i, o)))
async def generate_err(self, d, o=''):
return await d.send('```\n{}```'.format(o))
async def refresh_err(self, m, o=''):
await m.edit(content='```\n{}```'.format(o))
@cmds.command(name=',console', aliases=[',con', ',c'], hidden=True)
@cmds.is_owner()
async def console(self, ctx):
def execute(msg):
if msg.content.lower().startswith('exec ') and msg.author.id is ctx.author.id and msg.channel.id is ctx.channel.id:
msg.content = msg.content[5:]
return True
return False
def evaluate(msg):
if msg.content.lower().startswith('eval ') and msg.author.id is ctx.author.id and msg.channel.id is ctx.channel.id:
msg.content = msg.content[5:]
return True
return False
def exit(reaction, user):
if reaction.emoji == '\N{OCTAGONAL SIGN}' and user.id is ctx.author.id and reaction.message.id == ctx.message.id:
raise exc.Abort
return False
try:
console = await self.generate(ctx)
exception = await self.generate_err(ctx)
await u.add_reaction(ctx.message, '\N{OCTAGONAL SIGN}')
while not self.bot.is_closed():
try:
done, pending = await asyncio.wait([self.bot.wait_for('message', check=execute), self.bot.wait_for('message', check=evaluate), self.bot.wait_for('reaction_add', check=exit)], return_when=asyncio.FIRST_COMPLETED)
message = done.pop().result()
print(message.content)
except exc.Execute:
try:
sys.stdout = io.StringIO()
sys.stderr = io.StringIO()
exec(message.content)
except Exception:
await self.refresh_err(exception, tb.format_exc(limit=1))
finally:
await self.refresh(console, message.content, sys.stdout.getvalue() if sys.stdout.getvalue() != console.content else None)
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
with suppress(d.NotFound):
await message.delete()
except exc.Evaluate:
try:
sys.stdout = io.StringIO()
sys.stderr = io.StringIO()
eval(message.content)
except Exception:
await self.refresh_err(exception, tb.format_exc(limit=1))
finally:
await self.refresh(console, message.content, sys.stdout.getvalue() if sys.stdout.getvalue() != console.content else None)
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
with suppress(d.NotFound):
await message.delete()
except exc.Abort:
pass
finally:
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
print('RESET : sys.std output/error')
@cmds.command(name=',execute', aliases=[',exec'], hidden=True)
@cmds.is_owner()
async def execute(self, ctx, *, exe):
try:
with io.StringIO() as buff, redirect_stdout(buff):
exec(exe)
await self.generate(ctx, exe, f'\n{buff.getvalue()}')
except Exception:
await self.generate(ctx, exe, f'\n{tb.format_exc()}')
@cmds.command(name=',evaluate', aliases=[',eval'], hidden=True)
@cmds.is_owner()
async def evaluate(self, ctx, *, evl):
try:
with io.StringIO() as buff, redirect_stdout(buff):
eval(evl)
await self.generate(ctx, evl, f'\n{buff.getvalue()}')
except Exception:
await self.generate(ctx, evl, f'\n{tb.format_exc()}')
@cmds.group(aliases=[',db'], hidden=True)
@cmds.is_owner()
async def debug(self, ctx):
console = await self.generate(ctx)
@debug.command(name='inject', aliases=['inj'])
async def _inject(self, ctx, *, input_):
pass
@debug.command(name='inspect', aliases=['ins'])
async def _inspect(self, ctx, *, input_):
pass
# @cmds.command(name='endpoint', aliases=['end'])
# async def get_endpoint(self, ctx, *args):
# await ctx.send(f'```\n{await u.fetch(f"https://{args[0]}/{args[1]}/{args[2]}", params={args[3]: args[4], "limit": 1}, json=True)}```')

View file

@ -1,25 +0,0 @@
import asyncio
import json
from datetime import datetime as dt
import discord as d
from discord import errors as err
from discord.ext import commands as cmds
from discord.ext.commands import errors as errext
from misc import exceptions as exc
from misc import checks
from utils import utils as u
class Post(cmds.Cog):
def __init__(self, bot):
self.bot = bot
async def _check_posts(self, user, channel):
pass
@cmds.group(aliases=['update', 'up', 'u'])
async def updates(self, ctx):
pass

View file

@ -1,79 +0,0 @@
import asyncio
from datetime import datetime as dt
import mimetypes
import os
import tempfile
import traceback as tb
import webbrowser
import discord as d
from discord.ext import commands as cmds
#from run import config
from cogs import booru
from misc import exceptions as exc
from misc import checks
from utils import utils as u
from utils import formatter
youtube = None
tempfile.tempdir = os.getcwd()
class Utils(cmds.Cog):
def __init__(self, bot):
self.bot = bot
@cmds.command(name='lastcommand', aliases=['last', 'l', ','], brief='Reinvokes last successful command', description='Executes last successfully executed command')
async def last_command(self, ctx, arg='None'):
try:
context = u.last_commands[ctx.author.id]
if arg == 'show' or arg == 'sh' or arg == 's':
await ctx.send(f'`{context.prefix}{context.invoked_with} {" ".join(context.args[2:])}`')
else:
await ctx.invoke(context.command, *context.args[2:], **context.kwargs)
except KeyError:
await ctx.send('**No last command**')
await u.add_reaction(ctx.message, '\N{CROSS MARK}')
# Displays latency
@cmds.command(aliases=['p'], brief='Pong!', description='Returns latency from bot to Discord servers, not to user')
async def ping(self, ctx):
await u.add_reaction(ctx.message, '\N{TABLE TENNIS PADDLE AND BALL}')
await ctx.send(ctx.author.mention + ' \N{TABLE TENNIS PADDLE AND BALL} `' + str(round(self.bot.latency * 1000)) + 'ms`')
@cmds.command(aliases=['pre', 'prefixes'], brief='List bot prefixes', description='Shows all used prefixes')
async def prefix(self, ctx):
await ctx.send('**Prefix:** `{}`'.format('` or `'.join(u.settings['prefixes'][ctx.guild.id] if ctx.guild.id in u.settings['prefixes'] else u.config['prefix'])))
@cmds.group(name=',send', aliases=[',s'], hidden=True)
@cmds.is_owner()
async def send(self, ctx):
pass
@send.command(name='guild', aliases=['g', 'server', 's'])
async def send_guild(self, ctx, guild, channel, *, message):
try:
tempchannel = d.utils.find(lambda m: m.name == channel, d.utils.find(
lambda m: m.name == guild, self.bot.guilds).channels)
try:
await tempchannel.send(message)
await ctx.message.add_reaction('\N{WHITE HEAVY CHECK MARK}')
except AttributeError:
await ctx.send('**Invalid channel**')
await u.add_reaction(ctx.message, '\N{HEAVY EXCLAMATION MARK SYMBOL}')
except AttributeError:
await ctx.send('**Invalid guild**')
await u.add_reaction(ctx.message, '\N{HEAVY EXCLAMATION MARK SYMBOL}')
@send.command(name='user', aliases=['u', 'member', 'm'])
async def send_user(self, ctx, user, *, message):
await d.utils.get(self.bot.get_all_members(), id=int(user)).send(message)
await ctx.message.add_reaction('\N{WHITE HEAVY CHECK MARK}')

View file

@ -1,90 +0,0 @@
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
import asyncio
import traceback as tb
from discord.ext import commands as cmds
from utils import utils as u
class Weeb(cmds.Cog):
def __init__(self, bot):
self.bot = bot
self.weebing = False
with open('id.json') as f:
self.id = int(f.readline())
print('LOADED : id.json')
if not self.weebing:
self.weebing = True
self.bot.loop.create_task(self.start())
print('STARTED : weebing')
async def refresh_switchmod(self, browser):
message = ''
urls = {
'Novelties': 'https://switchmod.net/collections/ended-gbs/products/gmk-metaverse-2?variant=31671816880208',
'Royal': 'https://switchmod.net/collections/ended-gbs/products/gmk-metaverse-2?variant=31671816945744'
}
for item, url in urls.items():
browser.get(url)
try:
status = browser.find_elements_by_css_selector('#addToCartText-product-template')[0].text
except IndexError:
status = 'SOLD OUT'
if status != 'SOLD OUT':
message += f'{item} is in stock at Switchmod!\n<{url}>\n'
await asyncio.sleep(5)
return message
async def refresh_deskhero(self, browser):
message = ''
url = 'https://www.deskhero.ca/products/gmk-metaverse-2'
browser.get(url)
try:
royal_soldout = browser.find_elements_by_css_selector('#data-product-option-1-1')[0].get_attribute('data-soldout')
except IndexError:
royal_soldout = 'true'
try:
novelties_soldout = browser.find_elements_by_css_selector('#data-product-option-1-3')[0].get_attribute('data-soldout')
except IndexError:
novelties_soldout = 'true'
if royal_soldout != 'true':
message += f'Royal is in stock at Deskhero!\n<{url}>\n'
if novelties_soldout != 'true':
message += f'Novelties is in stock at Deskhero!\n<{url}>\n'
return message
async def start(self):
try:
opts = Options()
opts.headless = True
browser = Chrome(executable_path='/usr/bin/chromedriver', options=opts)
while self.weebing:
message = await self.refresh_switchmod(browser)
await asyncio.sleep(5)
message += await self.refresh_deskhero(browser)
if message:
await self.bot.get_user(self.id).send(message)
await self.bot.get_user(u.config['owner_id']).send('Something is in stock. Restart to keep checking')
browser.quit()
self.weebing = False
print('STOPPED : weebing')
await asyncio.sleep(120)
except Exception as e:
tb.print_exc()
await self.bot.get_user(u.config['owner_id']).send(f'! ERROR !\n\n{repr(e)}')

View file

View file

@ -1,52 +0,0 @@
import asyncio
import json
import traceback
from contextlib import suppress
import discord as d
from discord import errors as err
from discord.ext import commands
from discord.ext.commands import errors as errext
from utils import utils as u
owner_id = u.config['owner_id']
ready = False
def is_owner():
async def predicate(ctx):
return ctx.message.author.id == owner_id
return commands.check(predicate)
def is_admin():
def predicate(ctx):
return ctx.message.author.guild_permissions.administrator
return commands.check(predicate)
def is_mod():
def predicate(ctx):
return ctx.message.author.guild_permissions.ban_members
return commands.check(predicate)
def owner(ctx):
return ctx.message.author.id == owner_id
def admin(ctx):
return ctx.message.author.guild_permissions.administrator
def mod(ctx):
return ctx.message.author.guild_permissions.ban_members
def is_nsfw():
def predicate(ctx):
if isinstance(ctx.message.channel, d.TextChannel):
return ctx.message.channel.is_nsfw()
return True
return commands.check(predicate)

View file

@ -1,134 +0,0 @@
from discord.ext.commands import errors as errext
base = '\N{WARNING SIGN} **An internal error has occurred.** This has been reported to my master. \N{WOLF FACE}'
async def send_error(ctx, error):
await ctx.send('{}\n```\n{}```'.format(base, error))
class Remove(Exception):
pass
class SizeError(Exception):
pass
class Wrong(Exception):
pass
class Add(Exception):
pass
class Execute(Exception):
pass
class Evaluate(Exception):
pass
class Left(Exception):
pass
class Right(Exception):
pass
class Save(Exception):
def __init__(self, user=None, message=None):
self.user = user
self.message = message
class GoTo(Exception):
pass
class Exists(errext.CommandError):
pass
class MissingArgument(errext.CommandError):
pass
class FavoritesNotFound(errext.CommandError):
pass
class PostError(errext.CommandError):
pass
class ImageError(errext.CommandError):
pass
class MatchError(errext.CommandError):
pass
class TagBlacklisted(errext.CommandError):
pass
class BoundsError(errext.CommandError):
pass
class TagBoundsError(errext.CommandError):
pass
class TagExists(errext.CommandError):
pass
class TagError(errext.CommandError):
pass
class FlagError(errext.CommandError):
pass
class BlacklistError(errext.CommandError):
pass
class NotFound(errext.CommandError):
pass
class Timeout(errext.CommandError):
pass
class InvalidVideoFile(errext.CommandError):
pass
class MissingAttachment(errext.CommandError):
pass
class TooManyAttachments(errext.CommandError):
pass
class CheckFail(errext.CommandError):
pass
class Abort(Exception):
def __init__(self, message=None):
self.message = message
class Continue(Exception):
pass

View file

@ -1,199 +0,0 @@
import asyncio
import logging as log
import sys
import traceback as tb
from contextlib import suppress
import discord as d
from discord import errors as err
from discord.ext import commands as cmds
from discord.ext.commands import errors as errext
from misc import exceptions as exc
from misc import checks
from utils import utils as u
log.basicConfig(level=log.WARNING)
def get_prefix(bot, message):
with suppress(AttributeError):
return u.settings['prefixes'].get(message.guild.id, u.config['prefix'])
return u.config['prefix']
intents = d.Intents.default()
intents.members = True
bot = cmds.Bot(
intents=intents,
command_prefix=get_prefix,
self_bot=u.config['selfbot'],
description='Modufur - A booru bot with a side of management and automated tasking'
'\nMade by @Myned#3985'
)
@bot.event
async def on_ready():
if not checks.ready:
from cogs import weeb, booru, info, management, owner, tools
for cog in (
tools.Utils(bot),
owner.Bot(bot),
management.Admin(bot),
info.Info(bot),
booru.MsG(bot),
weeb.Weeb(bot)):
bot.add_cog(cog)
u.cogs[type(cog).__name__] = cog
print(f'COG : {type(cog).__name__}')
if u.config['playing'] != '':
await bot.change_presence(activity=d.Game(u.config['playing']))
print('\n> > > > > > > > >'
f'\nC O N N E C T E D : {bot.user.name}'
'\n> > > > > > > > >\n')
try:
if u.temp['startup']:
with suppress(err.NotFound):
if u.temp['startup'][0] == 'guild':
ctx = bot.get_channel(u.temp['startup'][1])
else:
ctx = bot.get_user(u.temp['startup'][1])
message = await ctx.fetch_message(u.temp['startup'][2])
await message.add_reaction('\N{WHITE HEAVY CHECK MARK}')
u.temp['startup'] = ()
u.dump(u.temp, 'temp/temp.pkl')
checks.ready = True
except KeyError:
u.dump({'startup': ()}, 'temp/temp.pkl')
except AttributeError:
pass
else:
print('\n- - - -\nI N F O : reconnected, reinitializing tasks\n- - - -\n')
reconnect = await bot.get_user(u.config['owner_id']).send('**RECONNECTING**')
await reconnect.add_reaction('\N{SLEEPING SYMBOL}')
if u.tasks['auto_del']:
for channel in u.tasks['auto_del']:
temp = bot.get_channel(channel)
bot.loop.create_task(u.cogs['Admin'].queue_for_deletion(temp))
print(f'RESTARTED : auto-deleting in #{temp.name}')
u.cogs['Admin'].deleting = True
bot.loop.create_task(u.cogs['Admin'].delete())
if u.config['playing'] != '':
await bot.change_presence(activity=d.Game(u.config['playing']))
await reconnect.add_reaction('\N{WHITE HEAVY CHECK MARK}')
print('\nS U C C E S S\n')
@bot.event
async def on_message(message):
if not u.config['selfbot']:
if message.author is not bot.user and not message.author.bot and message.author.id not in u.block['user_ids']:
await bot.process_commands(message)
else:
if not message.author.bot:
await bot.process_commands(message)
@bot.event
async def on_error(error, *args, **kwargs):
print(f'\n! ! ! ! !\nE R R O R : {sys.exc_info()[1].text}\n! ! ! ! !\n', file=sys.stderr)
tb.print_exc()
await bot.get_user(u.config['owner_id']).send(f'**ERROR** \N{WARNING SIGN}\n```\n{error}```')
if u.temp['startup']:
u.temp.clear()
u.dump(u.temp, 'temp/temp.pkl')
await bot.logout()
@bot.event
async def on_command_error(ctx, error):
with suppress(err.NotFound):
if isinstance(error, err.NotFound):
print('NOT FOUND')
# elif isinstance(error, errext.CommandInvokeError):
# print(f'ERROR : {error}')
elif isinstance(error, err.Forbidden):
pass
elif isinstance(error, errext.CommandOnCooldown):
await u.add_reaction(ctx.message, '\N{HOURGLASS}')
await asyncio.sleep(error.retry_after)
await u.add_reaction(ctx.message, '\N{WHITE HEAVY CHECK MARK}')
elif isinstance(error, errext.MissingRequiredArgument):
await ctx.send('**Missing required argument**')
await u.add_reaction(ctx.message, '\N{HEAVY EXCLAMATION MARK SYMBOL}')
elif isinstance(error, errext.BadArgument):
await ctx.send(f'**Invalid argument.** {error}')
await u.add_reaction(ctx.message, '\N{HEAVY EXCLAMATION MARK SYMBOL}')
elif isinstance(error, errext.CheckFailure):
await ctx.send('**Insufficient permissions**')
await u.add_reaction(ctx.message, '\N{NO ENTRY}')
elif isinstance(error, errext.CommandNotFound):
print(f'INVALID COMMAND : {error}', file=sys.stderr)
await u.add_reaction(ctx.message, '\N{BLACK QUESTION MARK ORNAMENT}')
else:
print('\n! ! ! ! ! ! ! ! ! ! ! !'
f'\nC O M M A N D E R R O R : {error}'
'\n! ! ! ! ! ! ! ! ! ! ! !\n', file=sys.stderr)
tb.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
await bot.get_user(u.config['owner_id']).send(
'**COMMAND ERROR** \N{WARNING SIGN} '
f'`{ctx.message.content}` '
f'from {ctx.author.id} '
f'in {ctx.channel.mention if isinstance(ctx.channel, d.channel.TextChannel) else "DMs"}'
'\n```\n'
f'{error}```')
await exc.send_error(ctx, error)
await u.add_reaction(ctx.message, '\N{WARNING SIGN}')
@bot.event
async def on_command_completion(ctx):
with suppress(err.NotFound):
with suppress(AttributeError):
if ctx.guild.id in u.settings['del_ctx'] and ctx.me.permissions_in(ctx.channel).manage_messages:
await ctx.message.delete()
u.last_commands[ctx.author.id] = ctx
@bot.event
async def on_guild_join(guild):
if str(guild.id) in u.block['guild_ids']:
print(f'LEAVING : {guild.name}')
await guild.leave()
else:
print(f'JOINING : {guild.name}')
@bot.event
async def on_guild_remove(guild):
print(f'LEFT : {guild.name}')
for task, idents in u.tasks.items():
for channel in guild.channels:
if channel.id in idents:
idents.remove(channel.id)
print(f'STOPPED : {task} in #{channel.id}')
u.dump(u.tasks, 'cogs/tasks.pkl')
@bot.command(name=',test', hidden=True)
@cmds.is_owner()
async def test(ctx):
pass
bot.run(u.config['token'], bot=not u.config['selfbot'])

View file

View file

View file

@ -1,73 +0,0 @@
import copy
from discord.ext.commands import Paginator
def tostring(i, *, order=None, newline=False):
o = ''
if i:
for v in i:
o += v + (' ' if newline is False else ' \n')
o = o[:-1]
elif order:
o += order
else:
o = ' '
return o
def tostring_commas(i):
if i:
o = ','
for v in i:
o += v + ','
return o[:-1]
return ''
async def paginate(
ctx,
i,
start='',
prefix='',
kprefix='`',
ksuffix='`\n',
eprefix='```\n',
ejoin=' ',
esuffix='\n```',
suffix='',
end=''):
paginator = Paginator(prefix=prefix, suffix=suffix)
messages = []
i = copy.deepcopy(i)
if start:
paginator.add_line(start + ('' if type(i) is not dict else '\n'))
if type(i) in (tuple, list, set):
if not i:
i = (' ')
paginator.add_line(eprefix + f'{ejoin}'.join(sorted(i)) + esuffix)
elif type(i) is dict:
if not i:
i = {' ': ' '}
for k, e in sorted(i.items()):
paginator.add_line(kprefix + k + ksuffix + eprefix + f'{ejoin}'.join(e) + esuffix)
if end:
paginator.add_line(end)
for page in paginator.pages:
messages.append(await ctx.send(page))
return messages
def dictelem_tostring(i):
o = ''
if i:
for dic, elem in i.items():
o += '**__' + dic + '__**\n'
for k, v in elem.items():
o += '***' + k + ':*** `' + tostring(v) + '`\n'
return o

View file

@ -1,157 +0,0 @@
import aiohttp
import ast
import re
from bs4 import BeautifulSoup
import lxml
from hurry.filesize import size, alternative
import tldextract as tld
from misc import exceptions as exc
from utils import utils as u
# async def get_harry(url):
# content = await u.fetch(f'https://iqdb.harry.lu?url={url}')
# soup = BeautifulSoup(content, 'html5lib')
#
# if soup.find('div', id='show1').string is 'Not the right one? ':
# parent = soup.find('th', string='Probable match:').parent.parent
#
# post = await u.fetch(
# f'https://e621.net/posts.json?id={re.search("show/([0-9]+)", parent.tr.td.a.get('href')).group(1)}',
# json=True)
# if (post['status'] == 'deleted'):
# post = await u.fetch(
# f'https://e621.net/posts.json?id={re.search("#(\\d+)", post["delreason"]).group(1)}',
# json=True)
#
# result = {
# 'source': f'https://e621.net/posts/{post["id"]}',
# 'artist': ', '.join(post['tags']['artist']),
# 'thumbnail': parent.td.a.img.get('src'),
# 'similarity': re.search('\\d+', parent.tr[4].td.string).group(0),
# 'database': 'Harry.lu'
# }
#
# return result
# else:
# return False
async def query_kheina(url):
try:
content = await u.fetch(f'https://api.kheina.com/v1/search', post={'url': url}, json=True)
similarity = int(content['results'][0]['similarity'])
if similarity < 55:
return None
if tld.extract(content['results'][0]['sources'][0]['source']).domain == 'furaffinity':
submission = re.search('\\d+$', content['results'][0]['sources'][0]['source']).group(0)
try:
export = await u.fetch(f'https://faexport.spangle.org.uk/submission/{submission}.json', json=True)
thumbnail = export['full']
except AssertionError:
thumbnail = ''
else:
thumbnail = ''
result = {
'source': content['results'][0]['sources'][0]['source'],
'artist': content['results'][0]['sources'][0]['artist'] if content['results'][0]['sources'][0]['artist'] else 'unknown',
'thumbnail': thumbnail,
'similarity': str(similarity),
'database': tld.extract(content['results'][0]['sources'][0]['source']).domain
}
return result
except Exception:
return None
async def query_saucenao(url):
try:
content = await u.fetch(
f'https://saucenao.com/search.php?url={url}&api_key={u.config["saucenao_api"]}&output_type={2}',
json=True)
if content['header'].get('message', '') in (
'Access to specified file was denied... ;_;',
'Problem with remote server...',
'image dimensions too small...'):
raise exc.ImageError
match = content['results'][0]
similarity = int(float(match['header']['similarity']))
if similarity < 55:
return None
source = match['data']['ext_urls'][0]
for e in match['data']['ext_urls']:
if 'furaffinity' in e:
source = e
break
for e in match['data']['ext_urls']:
if 'e621' in e:
source = e
break
artist = 'unknown'
for e in (
'author_name',
'member_name',
'creator'):
if e in match['data'] and match['data'][e]:
artist = match['data'][e]
break
result = {
'source': source,
'artist': artist,
'thumbnail': match['header']['thumbnail'],
'similarity': str(similarity),
'database': tld.extract(source).domain
}
return result
except Exception:
return None
async def get_post(url):
try:
content = await u.fetch(url, response=True)
filesize = int(content.headers['Content-Length'])
if filesize > 8192 * 1024:
raise exc.SizeError(size(filesize, system=alternative))
# Prioritize SauceNAO if e621/furaffinity, Kheina>SauceNAO if not
result = await query_saucenao(url)
if result:
if not any(s in result['source'] for s in ('e621', 'furaffinity')):
kheina = await query_kheina(url)
if kheina:
result = kheina
else:
result = await query_kheina(url)
if not result:
raise exc.MatchError(re.search('\\/([^\\/]+)$', url).group(1))
return result
except aiohttp.InvalidURL:
raise exc.MissingArgument
async def get_image(url):
content = await u.fetch(url)
value = lxml.html.fromstring(content).xpath(
'string(/html/body/div[@id="content"]/div[@id="post-view"]/div[@class="content"]/div[2]/img/@src)')
return value

View file

@ -1,190 +0,0 @@
import json as jsn
import os
import pickle as pkl
from contextlib import suppress
from fractions import gcd
import math
import aiohttp
import discord as d
from discord import errors as err
from misc import exceptions as exc
print('\nPID : {}\n'.format(os.getpid()))
try:
with open('config.json') as infile:
config = jsn.load(infile)
print('LOADED : config.json')
except FileNotFoundError:
with open('config.json', 'w') as outfile:
jsn.dump({'client_id': 0, 'owner_id': 0, 'permissions': 126016,
'playing': 'a game', 'prefix': [',', 'm,'], 'selfbot': False, 'token': 'str', 'saucenao_api': 'str', 'e621_api': 'str'}, outfile, indent=4, sort_keys=True)
print('FILE NOT FOUND : config.json created with default values. Restart run.py with correct values')
def setdefault(filename, default=None, json=False):
if json:
try:
with open(filename, 'r') as infile:
print(f'LOADED : {filename}')
return jsn.load(infile)
except FileNotFoundError:
with open(filename, 'w+') as iofile:
print(f'FILE NOT FOUND : {filename} created and loaded with default values')
jsn.dump(default, iofile)
iofile.seek(0)
return jsn.load(iofile)
else:
try:
with open(filename, 'rb') as infile:
print(f'LOADED : {filename}')
return pkl.load(infile)
except FileNotFoundError:
with open(filename, 'wb+') as iofile:
print(f'FILE NOT FOUND : {filename} created and loaded with default values')
pkl.dump(default, iofile)
iofile.seek(0)
return pkl.load(iofile)
def load(filename, *, json=False):
if not json:
with open(filename, 'rb') as infile:
return pkl.load(infile)
else:
with open(filename) as infile:
return jsn.load(infile)
def dump(obj, filename, *, json=False):
if not json:
with open(filename, 'wb') as outfile:
pkl.dump(obj, outfile)
else:
with open(filename, 'w') as outfile:
jsn.dump(obj, outfile, indent=4, sort_keys=True)
settings = setdefault('misc/settings.pkl', default={'del_ctx': [], 'del_resp': [], 'prefixes': {}})
tasks = setdefault('cogs/tasks.pkl', default={'auto_del': [], 'auto_hrt': [], 'auto_rev': []})
temp = setdefault('temp/temp.pkl', default={'startup': ()})
block = setdefault('cogs/block.json', default={'guild_ids': [], 'user_ids': []}, json=True)
cogs = {}
color = d.Color(0x1A1A1A)
last_commands = {}
asession = aiohttp.ClientSession()
async def fetch(url, *, post={}, response=False, text=False, json=False):
if '.json' in url and ('e621' in url or 'e926' in url):
url += f'&login=BotMyned&api_key={config["e621_api"]}'
if post:
async with asession.post(url, data=post, headers={
'User-Agent': 'Myned/Modufur (https://github.com/Myned/Modufur)'}, ssl=False) as r:
assert r.status == 200
if response:
return r
elif text:
return await r.text()
elif json:
return await r.json()
else:
return await r.read()
else:
async with asession.get(url, headers={
'User-Agent': 'Myned/Modufur (https://github.com/Myned/Modufur)'}, ssl=False) as r:
if r.status != 200:
return r.status
elif response:
return r
elif text:
return await r.text()
elif json:
return await r.json()
else:
return await r.read()
def generate_embed(ctx, *, title=d.Embed.Empty, kind='rich', description=d.Embed.Empty, url=d.Embed.Empty, timestamp=d.Embed.Empty, colour=color, footer={}, image=d.Embed.Empty, thumbnail=d.Embed.Empty, author={}, fields=[]):
embed = d.Embed(title=title, type=kind, description=description, url=url, timestamp=timestamp, colour=colour if isinstance(ctx.channel, d.TextChannel) else color)
if footer:
embed.set_footer(text=footer.get('text', d.Embed.Empty), icon_url=footer.get('icon_url', d.Embed.Empty))
if image:
embed.set_image(url=image)
if thumbnail:
embed.set_thumbnail(url=thumbnail)
if author:
embed.set_author(name=author.get('name', d.Embed.Empty), url=author.get('url', d.Embed.Empty), icon_url=author.get('icon_url', d.Embed.Empty))
for field in fields:
embed.add_field(name=field.get('name', d.Embed.Empty), value=field.get('value', d.Embed.Empty), inline=field.get('inline', True))
return embed
def kwargs(args):
params = list(args)
lst = 'blacklist'
for switch in ('-a', '--aliases'):
if switch in params:
lst = 'aliases'
params.remove(switch)
return params, lst
def get_kwargs(ctx, args, *, limit=False):
remaining = list(args[:])
rm = False
lim = 1
for flag in ('-r', '-rm', '--remove'):
if flag in remaining:
rm = True
remaining.remove(flag)
if limit:
for arg in remaining:
if arg.isdigit():
if 1 <= int(arg) <= limit:
lim = int(arg)
remaining.remove(arg)
break
else:
raise exc.BoundsError(arg)
return {'remaining': remaining, 'remove': rm, 'limit': lim}
def get_aspectratio(a, b):
divisor = gcd(a, b)
return f'{int(a / divisor)}:{int(b / divisor)}'
def ci(pos, n):
z = 1.96
phat = float(pos) / n
return (phat + z*z/(2*n) - z * math.sqrt((phat*(1-phat)+z*z/(4*n))/n))/(1+z*z/n)
async def add_reaction(message, reaction, errors=(err.NotFound, err.Forbidden)):
sent = False
with suppress(errors):
await message.add_reaction(reaction)
sent = True
return sent

109
tools/components.py Normal file
View file

@ -0,0 +1,109 @@
import hikari
import lightbulb
from miru.ext import nav
plugin = lightbulb.Plugin('components')
class Back(nav.PrevButton):
def __init__(self):
super().__init__(
style=hikari.ButtonStyle.SECONDARY,
label='',
emoji=None)
class Forward(nav.NextButton):
def __init__(self):
super().__init__(
style=hikari.ButtonStyle.SECONDARY,
label='',
emoji=None)
class Confirm(nav.StopButton):
def __init__(self):
super().__init__(
style=hikari.ButtonStyle.PRIMARY,
label='',
emoji=None)
async def callback(self, context):
await context.edit_response(content='**Searching...**', components=None)
self.view.stop()
async def before_page_change(self):
self.disabled = False if self.view.selected else True
class Select(nav.NavButton):
def __init__(self):
super().__init__(
style=hikari.ButtonStyle.DANGER,
label='',
emoji=None)
async def callback(self, context):
if self.view.urls[self.view.current_page] not in self.view.selected:
self.view.selected.append(self.view.urls[self.view.current_page])
self._button(selected=True)
else:
self.view.selected.remove(self.view.urls[self.view.current_page])
self._button()
await context.edit_response(components=self.view.build())
async def before_page_change(self):
if self.view.urls[self.view.current_page] not in self.view.selected:
self._button()
else:
self._button(selected=True)
def _button(self, *, selected=False):
self.style = hikari.ButtonStyle.SUCCESS if selected else hikari.ButtonStyle.DANGER
self.label = '' if selected else ''
try:
confirm = next((child for child in self.view.children if isinstance(child, Confirm)))
confirm.disabled = False if self.view.selected else True
except StopIteration:
pass
class Selector(nav.NavigatorView):
def __init__(self, *, pages=[], buttons=[], timeout=120, urls=[]):
super().__init__(
pages=pages,
buttons=buttons,
timeout=timeout)
self.urls = urls
self.selected = []
self.saved = set()
self.timed_out = False
async def on_timeout(self):
if self._inter:
for button in self.children:
button.disabled = True
await self._inter.edit_initial_response(components=self.build())
self.timed_out = True
async def send_edit(self, interaction):
self._inter = interaction
for button in self.children:
if isinstance(button, nav.NavButton):
await button.before_page_change()
payload = self._get_page_payload(self.pages[0])
await interaction.edit_initial_response(**payload)
self.start(await interaction.fetch_initial_response())
def load(bot):
bot.add_plugin(plugin)
def unload(bot):
bot.remove_plugin(plugin)

61
tools/scraper.py Normal file
View file

@ -0,0 +1,61 @@
import aiohttp
import tldextract
import lightbulb
import pysaucenao
import config as c
plugin = lightbulb.Plugin('scraper')
sauce = pysaucenao.SauceNao(api_key=c.config['saucenao'], priority=(29, 40, 41)) # e621 > Fur Affinity > Twitter
async def reverse(urls):
matches = []
for url in urls:
saucenao = await _saucenao(url)
kheina = None
if saucenao:
matches.append(saucenao)
else:
pass
if not saucenao and not kheina:
matches.append(None)
return matches
async def _saucenao(url):
try:
results = await sauce.from_url(url)
except pysaucenao.FileSizeLimitException:
raise pysaucenao.FileSizeLimitException(url)
except pysaucenao.ImageSizeException:
raise pysaucenao.ImageSizeException(url)
except pysaucenao.InvalidImageException:
raise pysaucenao.InvalidImageException(url)
if results:
return {
'source': results[0].url,
'artist': results[0].author_name or 'unknown',
'thumbnail': results[0].thumbnail,
'similarity': int(results[0].similarity),
'index': tldextract.extract(results[0].index).domain}
return
async def _kheina(url):
pass
async def _fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url, headers={'User-Agent': 'Myned/Modufur (https://github.com/Myned/Modufur)'}) as response:
return await response.json() if response.status == 200 else None
def load(bot):
bot.add_plugin(plugin)
def unload(bot):
bot.remove_plugin(plugin)