1
0
Fork 0
mirror of https://github.com/myned/modufur.git synced 2025-01-12 20:13:18 +00:00
modufur/src/utils/utils.py
2018-01-14 01:24:17 -05:00

191 lines
6 KiB
Python

import asyncio
import json as jsn
import os
import pickle as pkl
import subprocess
from contextlib import suppress
from fractions import gcd
import math
import gmusicapi as gpm
import aiohttp
import discord as d
from misc import exceptions as exc
# from pync import Notifier
print('\nPID : {}\n'.format(os.getpid()))
# def notify(message):
# subprocess.run(['terminal-notifier', '-message', message, '-title',
# 'Modumind', '-activate', 'com.apple.Terminal', '-appIcon', 'icon.png', '-sound', 'Ping'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
with open('config.json') as infile:
config = jsn.load(infile)
print('LOADED : config.json')
except FileNotFoundError:
with open('config.json', 'w') as outfile:
jsn.dump({'client_id': 0, 'info_channel': 0, 'owner_id': 0, 'permissions': 126016,
'playing': 'a game', 'prefix': [',', 'm,'], 'selfbot': False, 'token': 'str'}, outfile, indent=4, sort_keys=True)
print('FILE NOT FOUND : config.json created with abstract values. Restart run.py with correct values')
def setdefault(filename, default=None, json=False):
if json:
try:
with open(filename, 'r') as infile:
print(f'LOADED : {filename}')
return jsn.load(infile)
except FileNotFoundError:
with open(filename, 'w+') as iofile:
print(f'FILE NOT FOUND : {filename} created and loaded with default values')
jsn.dump(default, iofile)
iofile.seek(0)
return jsn.load(iofile)
else:
try:
with open(filename, 'rb') as infile:
print(f'LOADED : {filename}')
return pkl.load(infile)
except FileNotFoundError:
with open(filename, 'wb+') as iofile:
print(f'FILE NOT FOUND : {filename} created and loaded with default values')
pkl.dump(default, iofile)
iofile.seek(0)
return pkl.load(iofile)
def load(filename, *, json=False):
if not json:
with open(filename, 'rb') as infile:
return pkl.load(infile)
else:
with open(filename) as infile:
return jsn.load(infile)
def dump(obj, filename, *, json=False):
if not json:
with open(filename, 'wb') as outfile:
pkl.dump(obj, outfile)
else:
with open(filename, 'w') as outfile:
jsn.dump(obj, outfile, indent=4, sort_keys=True)
settings = setdefault('misc/settings.pkl', default={'del_ctx': [], 'del_resp': [], 'prefixes': {}})
tasks = setdefault('cogs/tasks.pkl', default={'auto_del': [], 'auto_hrt': [], 'auto_rev': [], 'periodic_gpm': []})
temp = setdefault('temp/temp.pkl', default={'startup': ()})
secrets = setdefault('secrets.json', default={'client_secrets': {'client_id': '', 'client_secret': ''}}, json=True)
RATE_LIMIT = 2.2
color = d.Color(0x1A1A1A)
session = aiohttp.ClientSession()
last_commands = {}
async def fetch(url, *, params={}, json=False, response=False):
async with session.get(url, params=params, headers={'User-Agent': 'Myned/Modumind'}) as r:
if response:
return r
elif json:
return await r.json()
return await r.read()
# async def clear(obj, interval=10 * 60, replace=None):
# if replace is None:
# if type(obj) is list:
# replace = []
# elif type(obj) is dict:
# replace = {}
# elif type(obj) is int:
# replace = 0
# elif type(obj) is str:
# replace = ''
#
# while True:
# obj = replace
# asyncio.sleep(interval)
def close(loop):
if session:
session.close()
loop.stop()
pending = asyncio.Task.all_tasks()
for task in pending:
task.cancel()
# with suppress(asyncio.CancelledError):
# loop.run_until_complete(task)
# loop.close()
print('Finished cancelling tasks.')
def generate_embed(ctx, *, title=d.Embed.Empty, kind='rich', description=d.Embed.Empty, url=d.Embed.Empty, timestamp=d.Embed.Empty, colour=color, footer={}, image=d.Embed.Empty, thumbnail=d.Embed.Empty, author={}, fields=[]):
embed = d.Embed(title=title, type=kind, description=description, url=url, timestamp=timestamp, colour=colour if isinstance(ctx.channel, d.TextChannel) else color)
if footer:
embed.set_footer(text=footer.get('text', d.Embed.Empty), icon_url=footer.get('icon_url', d.Embed.Empty))
if image:
embed.set_image(url=image)
if thumbnail:
embed.set_thumbnail(url=thumbnail)
if author:
embed.set_author(name=author.get('name', d.Embed.Empty), url=author.get('url', d.Embed.Empty), icon_url=author.get('icon_url', d.Embed.Empty))
for field in fields:
embed.add_field(name=field.get('name', d.Embed.Empty), value=field.get('value', d.Embed.Empty), inline=field.get('inline', True))
return embed
def get_kwargs(ctx, args, *, limit=False):
destination = ctx
remaining = list(args[:])
rm = False
lim = 1
for flag in ('-d', '-dm'):
if flag in remaining:
destination = ctx.author
remaining.remove(flag)
for flag in ('-r', '-rm', '-remove', '-re', '-repl', '-replace'):
if flag in remaining and ctx.author.permissions_in(ctx.channel).manage_messages:
rm = True
remaining.remove(flag)
if limit:
for arg in remaining:
if arg.isdigit():
if 1 <= int(arg) <= limit:
lim = int(arg)
remaining.remove(arg)
break
else:
raise exc.BoundsError(arg)
return {'destination': destination, 'remaining': remaining, 'remove': rm, 'limit': lim}
def get_aspectratio(a, b):
divisor = gcd(a, b)
return f'{int(a / divisor)}:{int(b / divisor)}'
def ci(pos, n):
z = 1.96
phat = float(pos) / n
return (phat + z*z/(2*n) - z * math.sqrt((phat*(1-phat)+z*z/(4*n))/n))/(1+z*z/n)