auto_chaos.base_system
🪐 Base system 🪐
1""" 2🪐 3Base system 4🪐 5""" 6import asyncio 7 8 9class BaseSystem: 10 """ 11 Defines the base system class, all systems must inherit from this 12 """ 13 14 def __init__(self) -> None: 15 self.results = [] 16 self.errors = [] 17 18 def do_action(self, data: str) -> tuple[list[str], list[str]]: 19 """ 20 Do the action defined in data. 21 22 Args: 23 data (str): ACTION_1 arg_1 arg_2, ACTION_2 arg_1 24 """ 25 self.results = [] 26 self.errors = [] 27 all_tasks = [] 28 29 # Remove quotes 30 if '"' in data: 31 data = data.replace('"', "") 32 if "'" in data: 33 data = data.replace("'", "") 34 # Parse the string and get the action 35 if "," in data: 36 datas = data.split(",") 37 elif "\n" in data: 38 datas = data.split("\n") 39 else: 40 datas = [data] 41 42 # Iterate over all actions and execute them in separate threads 43 for data in datas: 44 if not data: 45 continue 46 if data[0] == " ": 47 data = data[1:] 48 try: 49 all_tasks.append( 50 asyncio.to_thread( 51 # Parse the action string to get the action name (corresponding to a 52 # method name in lower case in the given system) and arguments 53 getattr(self, data.split(" ")[0].lower()), 54 data.split(" ")[1:], 55 ) 56 ) 57 except AttributeError: 58 pass 59 except Exception as error: 60 self.errors.append(error) 61 asyncio.get_event_loop().run_until_complete( 62 asyncio.gather(*all_tasks, return_exceptions=True) 63 ) 64 return self.results, self.errors 65 66 def default_action(self, args: list[str] = None): 67 """ 68 Actions must be defined like that default action 69 70 Args: 71 args (list[str], optional): List of arguments to use. Defaults to None. 72 """ 73 pass
class
BaseSystem:
10class BaseSystem: 11 """ 12 Defines the base system class, all systems must inherit from this 13 """ 14 15 def __init__(self) -> None: 16 self.results = [] 17 self.errors = [] 18 19 def do_action(self, data: str) -> tuple[list[str], list[str]]: 20 """ 21 Do the action defined in data. 22 23 Args: 24 data (str): ACTION_1 arg_1 arg_2, ACTION_2 arg_1 25 """ 26 self.results = [] 27 self.errors = [] 28 all_tasks = [] 29 30 # Remove quotes 31 if '"' in data: 32 data = data.replace('"', "") 33 if "'" in data: 34 data = data.replace("'", "") 35 # Parse the string and get the action 36 if "," in data: 37 datas = data.split(",") 38 elif "\n" in data: 39 datas = data.split("\n") 40 else: 41 datas = [data] 42 43 # Iterate over all actions and execute them in separate threads 44 for data in datas: 45 if not data: 46 continue 47 if data[0] == " ": 48 data = data[1:] 49 try: 50 all_tasks.append( 51 asyncio.to_thread( 52 # Parse the action string to get the action name (corresponding to a 53 # method name in lower case in the given system) and arguments 54 getattr(self, data.split(" ")[0].lower()), 55 data.split(" ")[1:], 56 ) 57 ) 58 except AttributeError: 59 pass 60 except Exception as error: 61 self.errors.append(error) 62 asyncio.get_event_loop().run_until_complete( 63 asyncio.gather(*all_tasks, return_exceptions=True) 64 ) 65 return self.results, self.errors 66 67 def default_action(self, args: list[str] = None): 68 """ 69 Actions must be defined like that default action 70 71 Args: 72 args (list[str], optional): List of arguments to use. Defaults to None. 73 """ 74 pass
Defines the base system class, all systems must inherit from this
def
do_action(self, data: str) -> tuple[list[str], list[str]]:
19 def do_action(self, data: str) -> tuple[list[str], list[str]]: 20 """ 21 Do the action defined in data. 22 23 Args: 24 data (str): ACTION_1 arg_1 arg_2, ACTION_2 arg_1 25 """ 26 self.results = [] 27 self.errors = [] 28 all_tasks = [] 29 30 # Remove quotes 31 if '"' in data: 32 data = data.replace('"', "") 33 if "'" in data: 34 data = data.replace("'", "") 35 # Parse the string and get the action 36 if "," in data: 37 datas = data.split(",") 38 elif "\n" in data: 39 datas = data.split("\n") 40 else: 41 datas = [data] 42 43 # Iterate over all actions and execute them in separate threads 44 for data in datas: 45 if not data: 46 continue 47 if data[0] == " ": 48 data = data[1:] 49 try: 50 all_tasks.append( 51 asyncio.to_thread( 52 # Parse the action string to get the action name (corresponding to a 53 # method name in lower case in the given system) and arguments 54 getattr(self, data.split(" ")[0].lower()), 55 data.split(" ")[1:], 56 ) 57 ) 58 except AttributeError: 59 pass 60 except Exception as error: 61 self.errors.append(error) 62 asyncio.get_event_loop().run_until_complete( 63 asyncio.gather(*all_tasks, return_exceptions=True) 64 ) 65 return self.results, self.errors
Do the action defined in data.
Arguments:
- data (str): ACTION_1 arg_1 arg_2, ACTION_2 arg_1
def
default_action(self, args: list[str] = None):
67 def default_action(self, args: list[str] = None): 68 """ 69 Actions must be defined like that default action 70 71 Args: 72 args (list[str], optional): List of arguments to use. Defaults to None. 73 """ 74 pass
Actions must be defined like that default action
Arguments:
- args (list[str], optional): List of arguments to use. Defaults to None.