1
0
Fork 0
mirror of https://github.com/myned/modufur.git synced 2025-01-13 12:23:19 +00:00
modufur/src/main/utils/utils.py

167 lines
5.1 KiB
Python
Raw Normal View History

2017-10-13 02:30:07 +00:00
import asyncio
import json as jsn
2017-10-14 03:59:14 +00:00
import os
2017-10-13 02:30:07 +00:00
import pickle as pkl
import subprocess
from contextlib import suppress
from fractions import gcd
import math
2017-10-13 02:30:07 +00:00
import aiohttp
import discord as d
2017-10-18 00:28:56 +00:00
from misc import exceptions as exc
# from pync import Notifier
print('\nPID : {}\n'.format(os.getpid()))
# def notify(message):
# subprocess.run(['terminal-notifier', '-message', message, '-title',
# 'Modumind', '-activate', 'com.apple.Terminal', '-appIcon', 'icon.png', '-sound', 'Ping'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
2017-10-13 02:30:07 +00:00
2017-10-14 03:59:14 +00:00
try:
2017-10-20 20:23:01 +00:00
with open('config.json') as infile:
config = jsn.load(infile)
2017-10-20 20:20:33 +00:00
print('LOADED : config.json')
2017-10-14 03:59:14 +00:00
except FileNotFoundError:
2017-10-20 20:23:01 +00:00
with open('config.json', 'w') as outfile:
jsn.dump({'client_id': 0, 'info_channel': 0, 'owner_id': 0, 'permissions': 126016,
'playing': 'a game', 'prefix': [',', 'm,'], 'token': 'str'}, outfile, indent=4, sort_keys=True)
raise FileNotFoundError(
2017-10-20 20:20:33 +00:00
'FILE NOT FOUND : config.json created with abstract values. Restart run.py with correct values')
2017-10-14 03:59:14 +00:00
2017-10-13 02:30:07 +00:00
def setdefault(filename, default=None):
2017-10-20 20:23:01 +00:00
try:
with open(filename, 'rb') as infile:
2017-10-20 20:20:33 +00:00
print('LOADED : {}'.format(filename))
2017-10-20 20:23:01 +00:00
return pkl.load(infile)
except FileNotFoundError:
with open(filename, 'wb+') as iofile:
2017-10-20 20:20:33 +00:00
print('FILE NOT FOUND : {} created and loaded with default values'.format(filename))
2017-10-20 20:23:01 +00:00
pkl.dump(default, iofile)
iofile.seek(0)
return pkl.load(iofile)
2017-10-13 02:30:07 +00:00
def load(filename, *, json=False):
2017-10-20 20:23:01 +00:00
if not json:
with open(filename, 'rb') as infile:
return pkl.load(infile)
else:
with open(filename) as infile:
return jsn.load(infile)
def dump(obj, filename, *, json=False):
2017-10-20 20:23:01 +00:00
if not json:
with open(filename, 'wb') as outfile:
pkl.dump(obj, outfile)
else:
with open(filename, 'w') as outfile:
jsn.dump(obj, outfile, indent=4, sort_keys=True)
2017-10-13 02:30:07 +00:00
2017-10-20 20:23:01 +00:00
settings = setdefault('settings.pkl', {'del_ctx': [], 'prefixes': {}})
2017-10-16 22:49:32 +00:00
tasks = setdefault('cogs/tasks.pkl', {'auto_del': [], 'auto_qual': [], 'auto_rev': []})
temp = setdefault('temp.pkl', {})
RATE_LIMIT = 2.2
2017-11-06 06:56:52 +00:00
color = d.Color(0x1A1A1A)
session = aiohttp.ClientSession()
2017-11-19 16:40:08 +00:00
last_commands = {}
2017-10-13 02:30:07 +00:00
# async def clear(obj, interval=10 * 60, replace=None):
# if replace is None:
# if type(obj) is list:
# replace = []
# elif type(obj) is dict:
# replace = {}
# elif type(obj) is int:
# replace = 0
# elif type(obj) is str:
# replace = ''
#
# while True:
# obj = replace
# asyncio.sleep(interval)
def close(loop):
2017-10-20 20:23:01 +00:00
if session:
session.close()
2017-10-20 20:23:01 +00:00
loop.stop()
pending = asyncio.Task.all_tasks()
for task in pending:
task.cancel()
# with suppress(asyncio.CancelledError):
# loop.run_until_complete(task)
# loop.close()
2017-10-20 20:23:01 +00:00
print('Finished cancelling tasks.')
2017-10-13 02:30:07 +00:00
async def fetch(url, *, params={}, json=False):
2017-10-21 20:38:49 +00:00
async with session.get(url, params=params, headers={'User-Agent': 'Myned/Modumind/dev'}) as r:
2017-10-20 20:23:01 +00:00
if json:
return await r.json()
return await r.read()
2017-11-20 04:03:04 +00:00
def generate_embed(ctx, *, title=d.Embed.Empty, type='rich', description=d.Embed.Empty, url=d.Embed.Empty, timestamp=d.Embed.Empty, colour=color, footer={}, image=d.Embed.Empty, thumbnail=d.Embed.Empty, author={}, fields=[]):
embed = d.Embed(title=title, type=type, description=description, url=url, timestamp=timestamp, colour=colour if isinstance(ctx.channel, d.TextChannel) else color)
if footer:
embed.set_footer(text=footer.get('text', d.Embed.Empty), icon_url=footer.get('icon_url', d.Embed.Empty))
if image:
embed.set_image(url=image)
if thumbnail:
embed.set_thumbnail(url=thumbnail)
if author:
embed.set_author(name=author.get('name', d.Embed.Empty), url=author.get('url', d.Embed.Empty), icon_url=author.get('icon_url', d.Embed.Empty))
for field in fields:
embed.add_field(name=field.get('name', d.Embed.Empty), value=field.get('value', d.Embed.Empty), inline=field.get('inline', True))
return embed
def get_kwargs(ctx, args, *, limit=False):
2017-10-20 20:23:01 +00:00
destination = ctx
remaining = list(args[:])
rm = False
lim = 1
2017-10-17 23:04:45 +00:00
2017-10-28 19:40:14 +00:00
for flag in ('-d', '-dm'):
if flag in remaining:
destination = ctx.author
2017-10-28 19:40:14 +00:00
remaining.remove(flag)
2017-10-28 19:40:14 +00:00
for flag in ('-r', '-rm', '-remove', '-re', '-repl', '-replace'):
if flag in remaining and ctx.author.permissions_in(ctx.channel).manage_messages:
rm = True
2017-10-17 23:04:45 +00:00
2017-10-28 19:40:14 +00:00
remaining.remove(flag)
2017-10-20 20:23:01 +00:00
if limit:
for arg in remaining:
2017-10-20 20:20:47 +00:00
if arg.isdigit():
2017-10-20 20:23:01 +00:00
if 1 <= int(arg) <= limit:
lim = int(arg)
remaining.remove(arg)
break
else:
raise exc.BoundsError(arg)
return {'destination': destination, 'remaining': remaining, 'remove': rm, 'limit': lim}
def ci(pos, n):
z = 1.96
phat = float(pos) / n
return (phat + z*z/(2*n) - z * math.sqrt((phat*(1-phat)+z*z/(4*n))/n))/(1+z*z/n)