1
0
Fork 0
mirror of https://github.com/myned/modufur.git synced 2024-12-24 14:27:27 +00:00

No change

This commit is contained in:
Myned 2017-12-23 21:35:47 -05:00
parent 5884fcfc0c
commit d95d9e721d
4 changed files with 2103 additions and 2103 deletions

File diff suppressed because it is too large Load diff

View file

@ -1,52 +1,52 @@
import asyncio import asyncio
import json import json
import traceback import traceback
from contextlib import suppress from contextlib import suppress
import discord as d import discord as d
from discord import errors as err from discord import errors as err
from discord.ext import commands from discord.ext import commands
from discord.ext.commands import errors as errext from discord.ext.commands import errors as errext
from utils import utils as u from utils import utils as u
owner_id = u.config['owner_id'] owner_id = u.config['owner_id']
ready = False ready = False
def is_owner(): def is_owner():
async def predicate(ctx): async def predicate(ctx):
return ctx.message.author.id == owner_id return ctx.message.author.id == owner_id
return commands.check(predicate) return commands.check(predicate)
def is_admin(): def is_admin():
def predicate(ctx): def predicate(ctx):
return ctx.message.author.guild_permissions.administrator return ctx.message.author.guild_permissions.administrator
return commands.check(predicate) return commands.check(predicate)
def is_mod(): def is_mod():
def predicate(ctx): def predicate(ctx):
return ctx.message.author.guild_permissions.ban_members return ctx.message.author.guild_permissions.ban_members
return commands.check(predicate) return commands.check(predicate)
def owner(ctx): def owner(ctx):
return ctx.message.author.id == owner_id return ctx.message.author.id == owner_id
def admin(ctx): def admin(ctx):
return ctx.message.author.guild_permissions.administrator return ctx.message.author.guild_permissions.administrator
def mod(ctx): def mod(ctx):
return ctx.message.author.guild_permissions.ban_members return ctx.message.author.guild_permissions.ban_members
def is_nsfw(): def is_nsfw():
def predicate(ctx): def predicate(ctx):
if isinstance(ctx.message.channel, d.TextChannel): if isinstance(ctx.message.channel, d.TextChannel):
return ctx.message.channel.is_nsfw() return ctx.message.channel.is_nsfw()
return True return True
return commands.check(predicate) return commands.check(predicate)

View file

@ -1,255 +1,255 @@
import asyncio import asyncio
from datetime import datetime as dt from datetime import datetime as dt
import json import json
import logging as log import logging as log
import subprocess import subprocess
import sys import sys
import traceback as tb import traceback as tb
from contextlib import suppress from contextlib import suppress
from pprint import pprint from pprint import pprint
from hurry.filesize import size, alternative from hurry.filesize import size, alternative
from urllib.parse import urlparse from urllib.parse import urlparse
import discord as d import discord as d
from discord import errors as err from discord import errors as err
from discord import utils from discord import utils
from discord.ext import commands as cmds from discord.ext import commands as cmds
from discord.ext.commands import errors as errext from discord.ext.commands import errors as errext
from misc import exceptions as exc from misc import exceptions as exc
from misc import checks from misc import checks
from utils import utils as u from utils import utils as u
log.basicConfig(level=log.WARNING) log.basicConfig(level=log.WARNING)
# class HelpFormatter(cmds.HelpFormatter): # class HelpFormatter(cmds.HelpFormatter):
# #
# async def format(self): # async def format(self):
# self._paginator = cmds.Paginator() # self._paginator = cmds.Paginator()
# #
# # we need a padding of ~80 or so # # we need a padding of ~80 or so
# #
# description = self.command.description if not self.is_cog() else inspect.getdoc(self.command) # description = self.command.description if not self.is_cog() else inspect.getdoc(self.command)
# #
# if description: # if description:
# # <description> portion # # <description> portion
# self._paginator.add_line(description, empty=True) # self._paginator.add_line(description, empty=True)
# #
# if isinstance(self.command, cmds.Command): # if isinstance(self.command, cmds.Command):
# # <signature portion> # # <signature portion>
# signature = self.get_command_signature() # signature = self.get_command_signature()
# self._paginator.add_line(signature, empty=True) # self._paginator.add_line(signature, empty=True)
# #
# # <long doc> section # # <long doc> section
# if self.command.help: # if self.command.help:
# self._paginator.add_line(self.command.help, empty=True) # self._paginator.add_line(self.command.help, empty=True)
# #
# # end it here if it's just a regular command # # end it here if it's just a regular command
# if not self.has_subcommands(): # if not self.has_subcommands():
# self._paginator.close_page() # self._paginator.close_page()
# return self._paginator.pages # return self._paginator.pages
# #
# max_width = self.max_name_size # max_width = self.max_name_size
def get_prefix(bot, message): def get_prefix(bot, message):
with suppress(AttributeError): with suppress(AttributeError):
return u.settings['prefixes'].get(message.guild.id, u.config['prefix']) return u.settings['prefixes'].get(message.guild.id, u.config['prefix'])
return u.config['prefix'] return u.config['prefix']
bot = cmds.Bot(command_prefix=get_prefix, self_bot=u.config['selfbot'], formatter=cmds.HelpFormatter(show_check_failure=True), description='Modufur - A booru bot with a side of management and automated tasking\nMade by @Myned#3985\n\nNSFW for Not Safe For Wumpus commands\n(G) for group commands\n@permission@ for required permissions\n!notice! for important information\np for prefix\n\n\{\} for mandatory argument\n[] for optional argument\n... for one or more arguments', help_attrs={'aliases': ['h']}, pm_help=None) bot = cmds.Bot(command_prefix=get_prefix, self_bot=u.config['selfbot'], formatter=cmds.HelpFormatter(show_check_failure=True), description='Modufur - A booru bot with a side of management and automated tasking\nMade by @Myned#3985\n\nNSFW for Not Safe For Wumpus commands\n(G) for group commands\n@permission@ for required permissions\n!notice! for important information\np for prefix\n\n\{\} for mandatory argument\n[] for optional argument\n... for one or more arguments', help_attrs={'aliases': ['h']}, pm_help=None)
@bot.command(help='help', brief='brief', description='description', usage='usage', hidden=True) @bot.command(help='help', brief='brief', description='description', usage='usage', hidden=True)
async def test(ctx): async def test(ctx):
pass pass
# Send and print ready message to #testing and console after logon # Send and print ready message to #testing and console after logon
@bot.event @bot.event
async def on_ready(): async def on_ready():
if not checks.ready: if not checks.ready:
from cogs import booru, info, management, owner, tools from cogs import booru, info, management, owner, tools
for cog in (tools.Utils(bot), owner.Bot(bot), owner.Tools(bot), management.Administration(bot), info.Info(bot), booru.MsG(bot)): for cog in (tools.Utils(bot), owner.Bot(bot), owner.Tools(bot), management.Administration(bot), info.Info(bot), booru.MsG(bot)):
bot.add_cog(cog) bot.add_cog(cog)
print(f'COG : {type(cog).__name__}') print(f'COG : {type(cog).__name__}')
# bot.loop.create_task(u.clear(booru.temp_urls, 30*60)) # bot.loop.create_task(u.clear(booru.temp_urls, 30*60))
if u.config['playing'] is not '': if u.config['playing'] is not '':
await bot.change_presence(game=d.Game(name=u.config['playing'])) await bot.change_presence(game=d.Game(name=u.config['playing']))
print('\n> > > > > > > > >\nC O N N E C T E D : {}\n> > > > > > > > >\n'.format(bot.user.name)) print('\n> > > > > > > > >\nC O N N E C T E D : {}\n> > > > > > > > >\n'.format(bot.user.name))
await bot.get_channel(u.config['info_channel']).send(f'**Started** \N{BLACK SUN WITH RAYS} `{"` or `".join(u.config["prefix"])}`') await bot.get_channel(u.config['info_channel']).send(f'**Started** \N{BLACK SUN WITH RAYS} `{"` or `".join(u.config["prefix"])}`')
# u.notify('C O N N E C T E D') # u.notify('C O N N E C T E D')
if u.temp['startup']: if u.temp['startup']:
with suppress(err.NotFound): with suppress(err.NotFound):
if u.temp['startup'][0] == 'guild': if u.temp['startup'][0] == 'guild':
dest = bot.get_channel(u.temp['startup'][1]) dest = bot.get_channel(u.temp['startup'][1])
else: else:
dest = bot.get_user(u.temp['startup'][1]) dest = bot.get_user(u.temp['startup'][1])
message = await dest.get_message(u.temp['startup'][2]) message = await dest.get_message(u.temp['startup'][2])
await message.add_reaction('\N{WHITE HEAVY CHECK MARK}') await message.add_reaction('\N{WHITE HEAVY CHECK MARK}')
u.temp['startup'] = () u.temp['startup'] = ()
u.dump(u.temp, 'temp/temp.pkl') u.dump(u.temp, 'temp/temp.pkl')
checks.ready = True checks.ready = True
else: else:
print('\n- - - -\nI N F O : reconnected, skipping initialization\n- - - -') print('\n- - - -\nI N F O : reconnected, skipping initialization\n- - - -')
@bot.event @bot.event
async def on_message(message): async def on_message(message):
if not u.config['selfbot']: if not u.config['selfbot']:
if message.author is not bot.user and not message.author.bot: if message.author is not bot.user and not message.author.bot:
await bot.process_commands(message) await bot.process_commands(message)
else: else:
if not message.author.bot: if not message.author.bot:
await bot.process_commands(message) await bot.process_commands(message)
@bot.event @bot.event
async def on_error(error, *args, **kwargs): async def on_error(error, *args, **kwargs):
print('\n! ! ! ! !\nE R R O R : {}\n! ! ! ! !\n'.format(error), file=sys.stderr) print('\n! ! ! ! !\nE R R O R : {}\n! ! ! ! !\n'.format(error), file=sys.stderr)
tb.print_exc() tb.print_exc()
await bot.get_user(u.config['owner_id']).send('**ERROR** \N{WARNING SIGN}\n```\n{}```'.format(error)) await bot.get_user(u.config['owner_id']).send('**ERROR** \N{WARNING SIGN}\n```\n{}```'.format(error))
await bot.get_channel(u.config['info_channel']).send('**ERROR** \N{WARNING SIGN}\n```\n{}```'.format(error)) await bot.get_channel(u.config['info_channel']).send('**ERROR** \N{WARNING SIGN}\n```\n{}```'.format(error))
if u.temp['startup']: if u.temp['startup']:
with suppress(err.NotFound): with suppress(err.NotFound):
if u.temp['startup'][0] == 'guild': if u.temp['startup'][0] == 'guild':
dest = bot.get_channel(u.temp['startup'][1]) dest = bot.get_channel(u.temp['startup'][1])
else: else:
dest = bot.get_user(u.temp['startup'][1]) dest = bot.get_user(u.temp['startup'][1])
message = await dest.get_message(u.temp['startup'][2]) message = await dest.get_message(u.temp['startup'][2])
await message.add_reaction('\N{WARNING SIGN}') await message.add_reaction('\N{WARNING SIGN}')
u.temp.clear() u.temp.clear()
u.dump(u.temp, 'temp/temp.pkl') u.dump(u.temp, 'temp/temp.pkl')
# u.notify('E R R O R') # u.notify('E R R O R')
await bot.logout() await bot.logout()
u.close(bot.loop) u.close(bot.loop)
@bot.event @bot.event
async def on_command_error(ctx, error): async def on_command_error(ctx, error):
if isinstance(error, err.NotFound): if isinstance(error, err.NotFound):
print('NOT FOUND') print('NOT FOUND')
elif isinstance(error, errext.CheckFailure): elif isinstance(error, errext.CheckFailure):
await ctx.send('**Insufficient permissions**', delete_after=10) await ctx.send('**Insufficient permissions**', delete_after=10)
await ctx.message.add_reaction('\N{NO ENTRY}') await ctx.message.add_reaction('\N{NO ENTRY}')
elif isinstance(error, errext.CommandNotFound): elif isinstance(error, errext.CommandNotFound):
print('INVALID COMMAND : {}'.format(error), file=sys.stderr) print('INVALID COMMAND : {}'.format(error), file=sys.stderr)
await ctx.message.add_reaction('\N{BLACK QUESTION MARK ORNAMENT}') await ctx.message.add_reaction('\N{BLACK QUESTION MARK ORNAMENT}')
else: else:
print('\n! ! ! ! ! ! ! ! ! ! ! !\nC O M M A N D E R R O R : {}\n! ! ! ! ! ! ! ! ! ! ! !\n'.format( print('\n! ! ! ! ! ! ! ! ! ! ! !\nC O M M A N D E R R O R : {}\n! ! ! ! ! ! ! ! ! ! ! !\n'.format(
error), file=sys.stderr) error), file=sys.stderr)
tb.print_exception(type(error), error, error.__traceback__, file=sys.stderr) tb.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
await bot.get_user(u.config['owner_id']).send('**COMMAND ERROR** \N{WARNING SIGN} `{}` from {} in {}\n```\n{}```'.format(ctx.message.content, ctx.author.mention, ctx.channel.mention, ''.join(tb.format_exception(type(error), error, error.__traceback__ if len(str(error.__traceback__)) < 1500 else str(error.__traceback__)[:1500])))) await bot.get_user(u.config['owner_id']).send('**COMMAND ERROR** \N{WARNING SIGN} `{}` from {} in {}\n```\n{}```'.format(ctx.message.content, ctx.author.mention, ctx.channel.mention, ''.join(tb.format_exception(type(error), error, error.__traceback__ if len(str(error.__traceback__)) < 1500 else str(error.__traceback__)[:1500]))))
await bot.get_channel(u.config['info_channel']).send('**COMMAND ERROR** \N{WARNING SIGN} `{}` from {} in {}\n```\n{}```'.format(ctx.message.content, ctx.author.mention, ctx.channel.mention, error)) await bot.get_channel(u.config['info_channel']).send('**COMMAND ERROR** \N{WARNING SIGN} `{}` from {} in {}\n```\n{}```'.format(ctx.message.content, ctx.author.mention, ctx.channel.mention, error))
await exc.send_error(ctx, error) await exc.send_error(ctx, error)
await ctx.message.add_reaction('\N{WARNING SIGN}') await ctx.message.add_reaction('\N{WARNING SIGN}')
# u.notify('C O M M A N D E R R O R') # u.notify('C O M M A N D E R R O R')
@bot.event @bot.event
async def on_command_completion(ctx): async def on_command_completion(ctx):
with suppress(err.NotFound): with suppress(err.NotFound):
with suppress(AttributeError): with suppress(AttributeError):
if ctx.guild.id in u.settings['del_ctx'] and ctx.me.permissions_in(ctx.channel).manage_messages and isinstance(ctx.message.channel, d.TextChannel): if ctx.guild.id in u.settings['del_ctx'] and ctx.me.permissions_in(ctx.channel).manage_messages and isinstance(ctx.message.channel, d.TextChannel):
await ctx.message.delete() await ctx.message.delete()
await ctx.message.add_reaction('\N{WHITE HEAVY CHECK MARK}') await ctx.message.add_reaction('\N{WHITE HEAVY CHECK MARK}')
for command in ('lastcommand', ',restart', ',die'): for command in ('lastcommand', ',restart', ',die'):
if ctx.command.name == command: if ctx.command.name == command:
return return
u.last_commands[ctx.author.id] = ctx u.last_commands[ctx.author.id] = ctx
@bot.event @bot.event
async def on_guild_remove(guild): async def on_guild_remove(guild):
print(f'LEFT : {guild.name}') print(f'LEFT : {guild.name}')
for task, idents in u.tasks.items(): for task, idents in u.tasks.items():
for channel in guild.channels: for channel in guild.channels:
if channel.id in idents: if channel.id in idents:
idents.remove(channel.id) idents.remove(channel.id)
print(f'STOPPED : {task} in #{channel.id}') print(f'STOPPED : {task} in #{channel.id}')
u.dump(u.tasks, 'cogs/tasks.pkl') u.dump(u.tasks, 'cogs/tasks.pkl')
# d.opus.load_opus('opus') # d.opus.load_opus('opus')
async def wait(voice): async def wait(voice):
asyncio.sleep(5) asyncio.sleep(5)
await voice.disconnect() await voice.disconnect()
def after(voice, error): def after(voice, error):
coro = voice.disconnect() coro = voice.disconnect()
future = asyncio.run_coroutine_threadsafe(coro, voice.loop) future = asyncio.run_coroutine_threadsafe(coro, voice.loop)
future.result() future.result()
# suggested = u.setdefault('cogs/suggested.pkl', {'last_update': 'None', 'tags': {}, 'total': 0}) # suggested = u.setdefault('cogs/suggested.pkl', {'last_update': 'None', 'tags': {}, 'total': 0})
@bot.command(name=',test', hidden=True) @bot.command(name=',test', hidden=True)
@cmds.is_owner() @cmds.is_owner()
async def test(ctx): async def test(ctx):
post = await u.fetch('https://e621.net/post/show.json?id=1145042', json=True) post = await u.fetch('https://e621.net/post/show.json?id=1145042', json=True)
tags = [] tags = []
if post['tags']: if post['tags']:
temptags = post['tags'].split(' ') temptags = post['tags'].split(' ')
cis = [] cis = []
for tag in suggested: for tag in suggested:
pass pass
for tag in temptags: for tag in temptags:
tags.append(f'[{tag}](https://e621.net/post?tags={tag})') tags.append(f'[{tag}](https://e621.net/post?tags={tag})')
# tags = ' '.join(tags) # tags = ' '.join(tags)
else: else:
tags = 'None' tags = 'None'
if post['description']: if post['description']:
post_description = post['description'] if len(post['description']) < 200 else f'{post["description"][:200]}...' post_description = post['description'] if len(post['description']) < 200 else f'{post["description"][:200]}...'
else: else:
post_description = 'None' post_description = 'None'
title = ', '.join(post['artist']) title = ', '.join(post['artist'])
description = f'posted by: *[{post["author"]}](https://e621.net/post?tags=user:{post["author"]})*' description = f'posted by: *[{post["author"]}](https://e621.net/post?tags=user:{post["author"]})*'
url = f'https://e621.net/post?tags={",".join(post["artist"])}' url = f'https://e621.net/post?tags={",".join(post["artist"])}'
# timestamp = dt.utcnow() # timestamp = dt.utcnow()
color = ctx.me.color color = ctx.me.color
footer = {'text': post['score'], 'icon_url': 'https://images-ext-1.discordapp.net/external/W2k0ZzhU7ngvN_-CdqAa3H3FmkfCNYQTxPG_DsvacB4/https/emojipedia-us.s3.amazonaws.com/thumbs/320/twitter/103/sparkles_2728.png'} footer = {'text': post['score'], 'icon_url': 'https://images-ext-1.discordapp.net/external/W2k0ZzhU7ngvN_-CdqAa3H3FmkfCNYQTxPG_DsvacB4/https/emojipedia-us.s3.amazonaws.com/thumbs/320/twitter/103/sparkles_2728.png'}
# image = 'https://e621.net/post/show/54360' # image = 'https://e621.net/post/show/54360'
thumbnail = post['file_url'] thumbnail = post['file_url']
author = {'name': post['id'], 'url': f'https://e621.net/post/show/{post["id"]}', 'icon_url': ctx.author.avatar_url} author = {'name': post['id'], 'url': f'https://e621.net/post/show/{post["id"]}', 'icon_url': ctx.author.avatar_url}
fields = [] fields = []
names = ('File', 'Sources', 'Description', 'tags', 'tags (ext.)') names = ('File', 'Sources', 'Description', 'tags', 'tags (ext.)')
values = (f'[{post["md5"]}]({post["file_url"]}) | [{post["file_ext"]}](https://e621.net/post?tags=type:{post["file_ext"]})\n\n**Size** [{size(post["file_size"], system=alternative)}](https://e621.net/post?tags=filesize:{post["file_size"]})\n**Resolution** [{post["width"]} x {post["height"]}](https://e621.net/post?tags=width:{post["width"]},height:{post["height"]}) | [{u.get_aspectratio(post["width"], post["height"])}](https://e621.net/post?tags=ratio:{post["width"]/post["height"]:.2f})', '\n'.join([f'[{urlparse(source).netloc}]({source})' for source in post['sources']]), post_description, ' '.join(tags[:20]), ' '.join(tags[20:])) values = (f'[{post["md5"]}]({post["file_url"]}) | [{post["file_ext"]}](https://e621.net/post?tags=type:{post["file_ext"]})\n\n**Size** [{size(post["file_size"], system=alternative)}](https://e621.net/post?tags=filesize:{post["file_size"]})\n**Resolution** [{post["width"]} x {post["height"]}](https://e621.net/post?tags=width:{post["width"]},height:{post["height"]}) | [{u.get_aspectratio(post["width"], post["height"])}](https://e621.net/post?tags=ratio:{post["width"]/post["height"]:.2f})', '\n'.join([f'[{urlparse(source).netloc}]({source})' for source in post['sources']]), post_description, ' '.join(tags[:20]), ' '.join(tags[20:]))
inlines = (False, False, False, True, True) inlines = (False, False, False, True, True)
for name, value, inline in zip(names, values, inlines): for name, value, inline in zip(names, values, inlines):
fields.append({'name': name, 'value': value, 'inline': inline}) fields.append({'name': name, 'value': value, 'inline': inline})
embed = u.generate_embed(ctx, title=title, description=description, url=url, colour=color, footer=footer, thumbnail=thumbnail, author=author, fields=fields) embed = u.generate_embed(ctx, title=title, description=description, url=url, colour=color, footer=footer, thumbnail=thumbnail, author=author, fields=fields)
await ctx.send(embed=embed) await ctx.send(embed=embed)
# print(ctx.args) # print(ctx.args)
# print(ctx.kwargs) # print(ctx.kwargs)
# if '<:N_:368917475531816962>' in message: # if '<:N_:368917475531816962>' in message:
# await ctx.send('<:N_:368917475531816962>') # await ctx.send('<:N_:368917475531816962>')
# logs = [] # logs = []
# async for entry in ctx.guild.audit_logs(limit=None, action=d.AuditLogAction.message_delete): # async for entry in ctx.guild.audit_logs(limit=None, action=d.AuditLogAction.message_delete):
# logs.append( # logs.append(
# f'@{entry.user.name} deleted {entry.extra.count} messages from @{entry.target.name} in #{entry.extra.channel.name}') # f'@{entry.user.name} deleted {entry.extra.count} messages from @{entry.target.name} in #{entry.extra.channel.name}')
# pprint(logs) # pprint(logs)
# channel = bot.get_channel(int(cid)) # channel = bot.get_channel(int(cid))
# voice = await channel.connect() # voice = await channel.connect()
# voice.play(d.AudioSource, after=lambda: after(voice)) # voice.play(d.AudioSource, after=lambda: after(voice))
bot.run(u.config['token'], bot=not u.config['selfbot']) bot.run(u.config['token'], bot=not u.config['selfbot'])

View file

@ -1,191 +1,191 @@
import asyncio import asyncio
import json as jsn import json as jsn
import os import os
import pickle as pkl import pickle as pkl
import subprocess import subprocess
from contextlib import suppress from contextlib import suppress
from fractions import gcd from fractions import gcd
import math import math
import gmusicapi as gpm import gmusicapi as gpm
import aiohttp import aiohttp
import discord as d import discord as d
from misc import exceptions as exc from misc import exceptions as exc
# from pync import Notifier # from pync import Notifier
print('\nPID : {}\n'.format(os.getpid())) print('\nPID : {}\n'.format(os.getpid()))
# def notify(message): # def notify(message):
# subprocess.run(['terminal-notifier', '-message', message, '-title', # subprocess.run(['terminal-notifier', '-message', message, '-title',
# 'Modumind', '-activate', 'com.apple.Terminal', '-appIcon', 'icon.png', '-sound', 'Ping'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) # 'Modumind', '-activate', 'com.apple.Terminal', '-appIcon', 'icon.png', '-sound', 'Ping'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try: try:
with open('config.json') as infile: with open('config.json') as infile:
config = jsn.load(infile) config = jsn.load(infile)
print('LOADED : config.json') print('LOADED : config.json')
except FileNotFoundError: except FileNotFoundError:
with open('config.json', 'w') as outfile: with open('config.json', 'w') as outfile:
jsn.dump({'client_id': 0, 'info_channel': 0, 'owner_id': 0, 'permissions': 126016, jsn.dump({'client_id': 0, 'info_channel': 0, 'owner_id': 0, 'permissions': 126016,
'playing': 'a game', 'prefix': [',', 'm,'], 'selfbot': False, 'token': 'str'}, outfile, indent=4, sort_keys=True) 'playing': 'a game', 'prefix': [',', 'm,'], 'selfbot': False, 'token': 'str'}, outfile, indent=4, sort_keys=True)
raise FileNotFoundError( raise FileNotFoundError(
'FILE NOT FOUND : config.json created with abstract values. Restart run.py with correct values') 'FILE NOT FOUND : config.json created with abstract values. Restart run.py with correct values')
def setdefault(filename, default=None, json=False): def setdefault(filename, default=None, json=False):
if json: if json:
try: try:
with open(filename, 'rb') as infile: with open(filename, 'rb') as infile:
print(f'LOADED : {filename}') print(f'LOADED : {filename}')
return jsn.load(infile) return jsn.load(infile)
except FileNotFoundError: except FileNotFoundError:
with open(filename, 'wb+') as iofile: with open(filename, 'wb+') as iofile:
print(f'FILE NOT FOUND : {filename} created and loaded with default values') print(f'FILE NOT FOUND : {filename} created and loaded with default values')
jsn.dump(default, iofile) jsn.dump(default, iofile)
iofile.seek(0) iofile.seek(0)
return jsn.load(iofile) return jsn.load(iofile)
else: else:
try: try:
with open(filename, 'rb') as infile: with open(filename, 'rb') as infile:
print(f'LOADED : {filename}') print(f'LOADED : {filename}')
return pkl.load(infile) return pkl.load(infile)
except FileNotFoundError: except FileNotFoundError:
with open(filename, 'wb+') as iofile: with open(filename, 'wb+') as iofile:
print(f'FILE NOT FOUND : {filename} created and loaded with default values') print(f'FILE NOT FOUND : {filename} created and loaded with default values')
pkl.dump(default, iofile) pkl.dump(default, iofile)
iofile.seek(0) iofile.seek(0)
return pkl.load(iofile) return pkl.load(iofile)
def load(filename, *, json=False): def load(filename, *, json=False):
if not json: if not json:
with open(filename, 'rb') as infile: with open(filename, 'rb') as infile:
return pkl.load(infile) return pkl.load(infile)
else: else:
with open(filename) as infile: with open(filename) as infile:
return jsn.load(infile) return jsn.load(infile)
def dump(obj, filename, *, json=False): def dump(obj, filename, *, json=False):
if not json: if not json:
with open(filename, 'wb') as outfile: with open(filename, 'wb') as outfile:
pkl.dump(obj, outfile) pkl.dump(obj, outfile)
else: else:
with open(filename, 'w') as outfile: with open(filename, 'w') as outfile:
jsn.dump(obj, outfile, indent=4, sort_keys=True) jsn.dump(obj, outfile, indent=4, sort_keys=True)
settings = setdefault('misc/settings.pkl', {'del_ctx': [], 'prefixes': {}}) settings = setdefault('misc/settings.pkl', {'del_ctx': [], 'prefixes': {}})
tasks = setdefault('cogs/tasks.pkl', {'auto_del': [], 'auto_rev': [], 'periodic_gpm': []}) tasks = setdefault('cogs/tasks.pkl', {'auto_del': [], 'auto_rev': [], 'periodic_gpm': []})
temp = setdefault('temp/temp.pkl', {'startup': ()}) temp = setdefault('temp/temp.pkl', {'startup': ()})
RATE_LIMIT = 2.2 RATE_LIMIT = 2.2
color = d.Color(0x1A1A1A) color = d.Color(0x1A1A1A)
session = aiohttp.ClientSession() session = aiohttp.ClientSession()
last_commands = {} last_commands = {}
async def fetch(url, *, params={}, json=False, response=False): async def fetch(url, *, params={}, json=False, response=False):
async with session.get(url, params=params, headers={'User-Agent': 'Myned/Modumind'}) as r: async with session.get(url, params=params, headers={'User-Agent': 'Myned/Modumind'}) as r:
if response: if response:
return r return r
elif json: elif json:
return await r.json() return await r.json()
return await r.read() return await r.read()
# async def clear(obj, interval=10 * 60, replace=None): # async def clear(obj, interval=10 * 60, replace=None):
# if replace is None: # if replace is None:
# if type(obj) is list: # if type(obj) is list:
# replace = [] # replace = []
# elif type(obj) is dict: # elif type(obj) is dict:
# replace = {} # replace = {}
# elif type(obj) is int: # elif type(obj) is int:
# replace = 0 # replace = 0
# elif type(obj) is str: # elif type(obj) is str:
# replace = '' # replace = ''
# #
# while True: # while True:
# obj = replace # obj = replace
# asyncio.sleep(interval) # asyncio.sleep(interval)
def close(loop): def close(loop):
if session: if session:
session.close() session.close()
loop.stop() loop.stop()
pending = asyncio.Task.all_tasks() pending = asyncio.Task.all_tasks()
for task in pending: for task in pending:
task.cancel() task.cancel()
# with suppress(asyncio.CancelledError): # with suppress(asyncio.CancelledError):
# loop.run_until_complete(task) # loop.run_until_complete(task)
# loop.close() # loop.close()
print('Finished cancelling tasks.') print('Finished cancelling tasks.')
def generate_embed(ctx, *, title=d.Embed.Empty, kind='rich', description=d.Embed.Empty, url=d.Embed.Empty, timestamp=d.Embed.Empty, colour=color, footer={}, image=d.Embed.Empty, thumbnail=d.Embed.Empty, author={}, fields=[]): def generate_embed(ctx, *, title=d.Embed.Empty, kind='rich', description=d.Embed.Empty, url=d.Embed.Empty, timestamp=d.Embed.Empty, colour=color, footer={}, image=d.Embed.Empty, thumbnail=d.Embed.Empty, author={}, fields=[]):
embed = d.Embed(title=title, type=kind, description=description, url=url, timestamp=timestamp, colour=colour if isinstance(ctx.channel, d.TextChannel) else color) embed = d.Embed(title=title, type=kind, description=description, url=url, timestamp=timestamp, colour=colour if isinstance(ctx.channel, d.TextChannel) else color)
if footer: if footer:
embed.set_footer(text=footer.get('text', d.Embed.Empty), icon_url=footer.get('icon_url', d.Embed.Empty)) embed.set_footer(text=footer.get('text', d.Embed.Empty), icon_url=footer.get('icon_url', d.Embed.Empty))
if image: if image:
embed.set_image(url=image) embed.set_image(url=image)
if thumbnail: if thumbnail:
embed.set_thumbnail(url=thumbnail) embed.set_thumbnail(url=thumbnail)
if author: if author:
embed.set_author(name=author.get('name', d.Embed.Empty), url=author.get('url', d.Embed.Empty), icon_url=author.get('icon_url', d.Embed.Empty)) embed.set_author(name=author.get('name', d.Embed.Empty), url=author.get('url', d.Embed.Empty), icon_url=author.get('icon_url', d.Embed.Empty))
for field in fields: for field in fields:
embed.add_field(name=field.get('name', d.Embed.Empty), value=field.get('value', d.Embed.Empty), inline=field.get('inline', True)) embed.add_field(name=field.get('name', d.Embed.Empty), value=field.get('value', d.Embed.Empty), inline=field.get('inline', True))
return embed return embed
def get_kwargs(ctx, args, *, limit=False): def get_kwargs(ctx, args, *, limit=False):
destination = ctx destination = ctx
remaining = list(args[:]) remaining = list(args[:])
rm = False rm = False
lim = 1 lim = 1
for flag in ('-d', '-dm'): for flag in ('-d', '-dm'):
if flag in remaining: if flag in remaining:
destination = ctx.author destination = ctx.author
remaining.remove(flag) remaining.remove(flag)
for flag in ('-r', '-rm', '-remove', '-re', '-repl', '-replace'): for flag in ('-r', '-rm', '-remove', '-re', '-repl', '-replace'):
if flag in remaining and ctx.author.permissions_in(ctx.channel).manage_messages: if flag in remaining and ctx.author.permissions_in(ctx.channel).manage_messages:
rm = True rm = True
remaining.remove(flag) remaining.remove(flag)
if limit: if limit:
for arg in remaining: for arg in remaining:
if arg.isdigit(): if arg.isdigit():
if 1 <= int(arg) <= limit: if 1 <= int(arg) <= limit:
lim = int(arg) lim = int(arg)
remaining.remove(arg) remaining.remove(arg)
break break
else: else:
raise exc.BoundsError(arg) raise exc.BoundsError(arg)
return {'destination': destination, 'remaining': remaining, 'remove': rm, 'limit': lim} return {'destination': destination, 'remaining': remaining, 'remove': rm, 'limit': lim}
def get_aspectratio(a, b): def get_aspectratio(a, b):
divisor = gcd(a, b) divisor = gcd(a, b)
return f'{int(a / divisor)}:{int(b / divisor)}' return f'{int(a / divisor)}:{int(b / divisor)}'
def ci(pos, n): def ci(pos, n):
z = 1.96 z = 1.96
phat = float(pos) / n phat = float(pos) / n
return (phat + z*z/(2*n) - z * math.sqrt((phat*(1-phat)+z*z/(4*n))/n))/(1+z*z/n) return (phat + z*z/(2*n) - z * math.sqrt((phat*(1-phat)+z*z/(4*n))/n))/(1+z*z/n)