auto_chaos.base_system

🪐 Base system 🪐

 1"""
 2🪐
 3Base system
 4🪐
 5"""
 6import asyncio
 7
 8
 9class BaseSystem:
10    """
11    Defines the base system class, all systems must inherit from this
12    """
13
14    def __init__(self) -> None:
15        self.results = []
16        self.errors = []
17
18    def do_action(self, data: str) -> tuple[list[str], list[str]]:
19        """
20        Do the action defined in data.
21
22        Args:
23            data (str): ACTION_1 arg_1 arg_2, ACTION_2 arg_1
24        """
25        self.results = []
26        self.errors = []
27        all_tasks = []
28
29        # Remove quotes
30        if '"' in data:
31            data = data.replace('"', "")
32        if "'" in data:
33            data = data.replace("'", "")
34        # Parse the string and get the action
35        if "," in data:
36            datas = data.split(",")
37        elif "\n" in data:
38            datas = data.split("\n")
39        else:
40            datas = [data]
41
42        # Iterate over all actions and execute them in separate threads
43        for data in datas:
44            if not data:
45                continue
46            if data[0] == " ":
47                data = data[1:]
48            try:
49                all_tasks.append(
50                    asyncio.to_thread(
51                        # Parse the action string to get the action name (corresponding to a
52                        # method name in lower case in the given system) and arguments
53                        getattr(self, data.split(" ")[0].lower()),
54                        data.split(" ")[1:],
55                    )
56                )
57            except AttributeError:
58                pass
59            except Exception as error:
60                self.errors.append(error)
61        asyncio.get_event_loop().run_until_complete(
62            asyncio.gather(*all_tasks, return_exceptions=True)
63        )
64        return self.results, self.errors
65
66    def default_action(self, args: list[str] = None):
67        """
68        Actions must be defined like that default action
69
70        Args:
71            args (list[str], optional): List of arguments to use. Defaults to None.
72        """
73        pass
class BaseSystem:
class BaseSystem:
    """
    Base class for all systems; every concrete system must inherit from it.

    An action is a method taking a single ``args`` list (see
    ``default_action``). ``do_action`` parses a command string and runs each
    named action in a worker thread.
    """

    def __init__(self) -> None:
        # Both lists are replaced by fresh ones at the start of every
        # do_action call, and do_action returns them.
        self.results = []
        self.errors = []

    def do_action(self, data: str) -> tuple[list[str], list[str]]:
        """
        Run every action described in ``data`` and wait for all of them.

        Quote characters are removed first. Actions may be separated by
        commas, newlines, or a mix of both. The first word of each action,
        lower-cased, is looked up as an attribute of this object and called
        in a worker thread with the remaining words as a list.

        Args:
            data (str): ACTION_1 arg_1 arg_2, ACTION_2 arg_1

        Returns:
            tuple: ``(self.results, self.errors)``. Exceptions raised by an
            action are appended to ``self.errors`` as exception objects.

        Raises:
            RuntimeError: If called from a thread that is already running an
                asyncio event loop.
        """
        self.results = []
        self.errors = []
        calls = []

        # Remove quotes
        cleaned = data.replace('"', "").replace("'", "")

        # Commas and newlines are both separators; normalising newlines to
        # commas lets a single split handle input that mixes the two.
        for chunk in cleaned.replace("\n", ",").split(","):
            # split() without arguments ignores leading, trailing and
            # repeated whitespace, so no empty-string arguments are produced.
            words = chunk.split()
            if not words:
                continue
            try:
                # The action name maps to a lower-case method name.
                method = getattr(self, words[0].lower())
            except AttributeError:
                # Unknown actions are skipped on purpose (best effort).
                continue
            except Exception as error:
                self.errors.append(error)
                continue
            calls.append((method, words[1:]))

        if not calls:
            return self.results, self.errors

        async def run_all():
            # Each action runs in its own worker thread; return_exceptions
            # keeps one failing action from cancelling the others.
            return await asyncio.gather(
                *(asyncio.to_thread(method, args) for method, args in calls),
                return_exceptions=True,
            )

        # A private loop avoids the deprecated implicit loop creation of
        # asyncio.get_event_loop() and leaves the caller's current-loop
        # setting untouched.
        loop = asyncio.new_event_loop()
        try:
            outcomes = loop.run_until_complete(run_all())
        finally:
            loop.close()

        # Surface failures instead of silently dropping them.
        self.errors.extend(
            outcome for outcome in outcomes if isinstance(outcome, BaseException)
        )
        return self.results, self.errors

    def default_action(self, args: list[str] = None):
        """
        Template action: every action must be defined with this signature.

        Args:
            args (list[str], optional): List of arguments to use. Defaults to None.
        """
        pass

Defines the base system class; all systems must inherit from it.

results
errors
def do_action(self, data: str) -> tuple[list[str], list[str]]:
19    def do_action(self, data: str) -> tuple[list[str], list[str]]:
20        """
21        Do the action defined in data.
22
23        Args:
24            data (str): ACTION_1 arg_1 arg_2, ACTION_2 arg_1
25        """
26        self.results = []
27        self.errors = []
28        all_tasks = []
29
30        # Remove quotes
31        if '"' in data:
32            data = data.replace('"', "")
33        if "'" in data:
34            data = data.replace("'", "")
35        # Parse the string and get the action
36        if "," in data:
37            datas = data.split(",")
38        elif "\n" in data:
39            datas = data.split("\n")
40        else:
41            datas = [data]
42
43        # Iterate over all actions and execute them in separate threads
44        for data in datas:
45            if not data:
46                continue
47            if data[0] == " ":
48                data = data[1:]
49            try:
50                all_tasks.append(
51                    asyncio.to_thread(
52                        # Parse the action string to get the action name (corresponding to a
53                        # method name in lower case in the given system) and arguments
54                        getattr(self, data.split(" ")[0].lower()),
55                        data.split(" ")[1:],
56                    )
57                )
58            except AttributeError:
59                pass
60            except Exception as error:
61                self.errors.append(error)
62        asyncio.get_event_loop().run_until_complete(
63            asyncio.gather(*all_tasks, return_exceptions=True)
64        )
65        return self.results, self.errors

Do the action defined in data.

Arguments:
  • data (str): ACTION_1 arg_1 arg_2, ACTION_2 arg_1
def default_action(self, args: list[str] = None):
67    def default_action(self, args: list[str] = None):
68        """
69        Actions must be defined like that default action
70
71        Args:
72            args (list[str], optional): List of arguments to use. Defaults to None.
73        """
74        pass

Actions must be defined like this default action.

Arguments:
  • args (list[str], optional): List of arguments to use. Defaults to None.