import re import telethon import yaml import pgbotlib.api import pgbotlib.dbstuff import pgbotlib.misc def get_token(token_name: str, token_regex: list) -> tuple: regex = [] for i in token_regex: regex.append(re.compile(i)) return token_name, regex def get_tokens(path: str) -> list: with open(path, 'r', encoding='utf-8') as data: tokens = yaml.safe_load(data.read()) return [get_token(i, tokens[i]) for i in tokens] class Responder: def __init__(self, config: dict, client: telethon.TelegramClient, db_connection: pgbotlib.dbstuff.DBConn, namegen: pgbotlib.misc.NameGenerator) -> None: # apiregex matches "{apiname}optional data" # message itself is also passed to the api call method self.enabled = True self.apiregex = re.compile(r'^\{(\w+)\}(.+)?$') self.namegen = pgbotlib.misc.NameGenerator(config, db_connection) self.tokens = get_tokens( config.get('response_tokens', pgbotlib.misc.Paths.TOKENS) ) self.chats = config['chats'] self.api = pgbotlib.api.ApiWrapper(self.tokens, db_connection) self.db_connection = db_connection self.client = client def tokenize(self, message: str) -> frozenset: tokens = set() for token, regexi in self.tokens: for regex in regexi: if regex.search(message): tokens.add(token) break return frozenset(tokens) def __get_keys(self) -> dict: result = {} query = 'SELECT DISTINCT tokens FROM responses' for i in self.db_connection.query_raw(query, tuple()): result[frozenset(i[0].split(','))] = i[0] return result def __response_choice(self, key: str) -> str: return self.db_connection.query_random( "SELECT response FROM responses WHERE tokens = %s", (key,)) def enable(self) -> None: self.enabled = True def disable(self) -> None: self.enabled = False def is_enabled(self) -> bool: return self.enabled def get_response(self, tokens: frozenset) -> str: counter = 0 keys = self.__get_keys() for items, string in keys.items(): if items <= tokens: # check for priority tokens for token in items: if token.startswith('!'): return self.__response_choice(string) match_length = len(items & tokens) if match_length > counter: counter = match_length key = string if not counter: return None return self.__response_choice(key) def api_match(self, response: str, message: str) -> str: match = self.apiregex.search(response) if not match: return response api_spec = match.groups() return self.api.call(*api_spec, message) async def username(self, response: str, event: telethon.events.common.EventBuilder) -> str: if '' not in response: return response sender = await event.get_sender() username = self.namegen.get_name(sender) return response.replace('', username) async def respond(self, event: telethon.events.common.EventBuilder) -> None: if not self.enabled: return None chat_id = telethon.utils.get_peer_id(event.message.peer_id) if chat_id not in self.chats: return None message = event.message.text.lower() tokens = self.tokenize(message) response = self.get_response(tokens) if not response: return None response = self.api_match(response, message) response = await self.username(response, event) await self.client.send_message(event.message.peer_id, response)