Compare commits
2 commits
35da34bc8c
...
dc2c195e95
Author | SHA1 | Date | |
---|---|---|---|
dc2c195e95 | |||
c4f65650b9 |
4 changed files with 64 additions and 60 deletions
|
@ -1,5 +1,3 @@
|
|||
""" Some functions for api calls """
|
||||
|
||||
import json
|
||||
import random
|
||||
import re
|
||||
|
@ -31,14 +29,15 @@ class ApiWrapper:
|
|||
# this could have used match - case statement, but python 3.9
|
||||
def call(self, api: str, data: typing.Union[str, None],
|
||||
message: str) -> str:
|
||||
if api == 'img_url': return self.format_img(data)
|
||||
elif api == 'gif': return self.get_gif()
|
||||
elif api == 'kmp': return self.get_kmp()
|
||||
elif api == 'fga': return self.get_fga()
|
||||
elif api == 'fakenews': return self.get_fakenews()
|
||||
elif api == 'anek': return self.get_anek()
|
||||
elif api == 'y_search': return self.y_search(message)
|
||||
return self.FAILED
|
||||
match api:
|
||||
case 'img_url': return self.format_img(data)
|
||||
case 'gif': return self.get_gif()
|
||||
case 'kmp': return self.get_kmp()
|
||||
case 'fga': return self.get_fga()
|
||||
case 'fakenews': return self.get_fakenews()
|
||||
case 'anek': return self.get_anek()
|
||||
case 'y_search': return self.y_search(message)
|
||||
case _: return self.FAILED
|
||||
|
||||
def __sanitize_search(self, message: str) -> str:
|
||||
"""Removes one of each of the search tokens from the query
|
||||
|
|
|
@ -10,7 +10,8 @@ import pgbotlib.response
|
|||
|
||||
# TODO: quote via response?
|
||||
# chat = await event.get_chat()
|
||||
# result = await client.get_messages(chat.id, ids=[event.message.reply_to.reply_to_msg_id])
|
||||
# result = await client.get_messages(chat.id,
|
||||
# ids=[event.message.reply_to.reply_to_msg_id])
|
||||
# print(result)
|
||||
class Commander:
|
||||
T_START = frozenset(['cmd_start'])
|
||||
|
@ -59,15 +60,18 @@ class Commander:
|
|||
def __add_response(self, caller: int, command: str) -> bool:
|
||||
if caller not in self.admins:
|
||||
return self.NOPE
|
||||
input_tokens, phrase = command.strip().split(' ', 1)
|
||||
input_tokenset = frozenset(input_tokens.split(','))
|
||||
for token in input_tokenset:
|
||||
if token not in self.available_tokens:
|
||||
return False
|
||||
query = 'INSERT INTO responses (tokens, response) values (%s,%s)'
|
||||
values = (','.join(sorted(input_tokenset)), phrase.strip())
|
||||
self.db_conn.update(query, values)
|
||||
return self.YEP
|
||||
DB_QUERY = 'INSERT INTO responses (tokens, response) values (%s,%s)'
|
||||
try:
|
||||
input_tokens, phrase = command.strip().split(' ', 1)
|
||||
input_tokenset = frozenset(input_tokens.split(','))
|
||||
for token in input_tokenset:
|
||||
if token not in self.available_tokens:
|
||||
return "token not found!"
|
||||
values = (','.join(sorted(input_tokenset)), phrase.strip())
|
||||
self.db_conn.update(DB_QUERY, values)
|
||||
return self.YEP
|
||||
except Exception as e:
|
||||
return str(e)
|
||||
|
||||
def __add_user(self, caller: int, userspec: str) -> bool:
|
||||
if caller not in self.admins:
|
||||
|
@ -79,16 +83,15 @@ class Commander:
|
|||
self.db_conn.update(query, values)
|
||||
return self.YEP
|
||||
|
||||
|
||||
def __start_response(self) -> str:
|
||||
if self.responder.is_enabled():
|
||||
return self.responder.get_response(self.T_START_E)
|
||||
return self.responder.get_response(self.T_START)
|
||||
if self.responder.is_enabled():
|
||||
return self.responder.get_response(self.T_START_E)
|
||||
return self.responder.get_response(self.T_START)
|
||||
|
||||
def __stop_response(self) -> str:
|
||||
if self.responder.is_enabled():
|
||||
return self.responder.get_response(self.T_STOP)
|
||||
return self.responder.get_response(self.T_STOP_D)
|
||||
if self.responder.is_enabled():
|
||||
return self.responder.get_response(self.T_STOP)
|
||||
return self.responder.get_response(self.T_STOP_D)
|
||||
|
||||
def __list_users(self, users: list) -> str:
|
||||
userlist = [f'{user.id}: {self.namegen.get_tg_name(user)}'
|
||||
|
@ -109,35 +112,32 @@ class Commander:
|
|||
return None
|
||||
command = event.message.text
|
||||
sender = await event.get_sender()
|
||||
response = None
|
||||
if command.startswith('.add '):
|
||||
try:
|
||||
match command:
|
||||
case '.chat':
|
||||
response = str(chat_id)
|
||||
case '.list':
|
||||
response = ', '.join(self.available_tokens)
|
||||
case '.users':
|
||||
users = await self.client.get_participants(
|
||||
entity=event.message.peer_id)
|
||||
response = self.__list_users(users)
|
||||
case '.start':
|
||||
response = self.__start_response()
|
||||
self.responder.enable()
|
||||
case '.stop':
|
||||
response = self.__stop_response()
|
||||
self.responder.disable()
|
||||
case '.help':
|
||||
response = self.DOC
|
||||
case command if command.startswith('.add '):
|
||||
response = self.__add_response(sender.id, command[5:])
|
||||
except Exception as e:
|
||||
response = str(e)
|
||||
elif command.startswith('.adduser '):
|
||||
try:
|
||||
response = self.__add_user(sender.id, command[9:])
|
||||
except Exception as e:
|
||||
response = str(e)
|
||||
elif command == '.chat':
|
||||
response = str(chat_id)
|
||||
elif command == '.list':
|
||||
response = ', '.join(self.available_tokens)
|
||||
elif command.startswith('.regex '):
|
||||
response = self.__list_regex(command[7:].strip())
|
||||
elif command == '.users':
|
||||
users = await self.client.get_participants(
|
||||
entity=event.message.peer_id)
|
||||
response = self.__list_users(users)
|
||||
elif command == '.start':
|
||||
response = self.__start_response()
|
||||
self.responder.enable()
|
||||
elif command == '.stop':
|
||||
response = self.__stop_response()
|
||||
self.responder.disable()
|
||||
elif command == '.help':
|
||||
response = self.DOC
|
||||
if response:
|
||||
await self.client.send_message(event.message.peer_id, response)
|
||||
return None
|
||||
case command if command.startswith('.adduser '):
|
||||
try:
|
||||
response = self.__add_user(sender.id, command[9:])
|
||||
except Exception as e:
|
||||
response = str(e)
|
||||
case command if command.startswith('.regex '):
|
||||
response = self.__list_regex(command[7:].strip())
|
||||
case _:
|
||||
return None
|
||||
await self.client.send_message(event.message.peer_id, response)
|
||||
|
|
|
@ -30,6 +30,7 @@ class Cron:
|
|||
|
||||
def __mkjob(self, job: dict) -> callable:
|
||||
tokens = frozenset(job['tokens'].split(','))
|
||||
|
||||
async def send_message() -> None:
|
||||
if 'rand' in job:
|
||||
wait_seconds = random.randint(0, job['rand']) * 60
|
||||
|
|
|
@ -32,10 +32,14 @@ class MiscReactor:
|
|||
|
||||
def spawn_edited_handler(self,
|
||||
client: telethon.TelegramClient,
|
||||
trigger: telethon.events.common.EventBuilder) -> None:
|
||||
trigger: telethon.events.common.EventBuilder
|
||||
) -> None:
|
||||
@client.on(trigger)
|
||||
async def handle_edited_message(event):
|
||||
sender = await event.get_sender()
|
||||
sender_name = self.namegen.get_name(sender)
|
||||
chat_id = event.message.peer_id
|
||||
await client.send_message(chat_id, f'Я всё видел! {sender_name} совсем охуел, сообщения правит!')
|
||||
await client.send_message(
|
||||
chat_id,
|
||||
f'Я всё видел! {sender_name} совсем охуел, сообщения правит!'
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue