gpt_enterprise.enterprise
🏢 Enterprise 🏢
"""
\U0001F3E2
Enterprise
\U0001F3E2
"""

import os

from gpt_enterprise.team_leader import TeamLeader
from gpt_enterprise.scrum_master import ScrumMaster


class Enterprise:
    """
    An enterprise is composed of several employees driven by managers and a CEO (you)
    """

    def __init__(
        self,
        guidelines: str,
        output_directory: str,
        manager_retry: int,
        company_name: str = "GPTenterprise",
        interactive: bool = False,
        asynchronous: bool = True,
    ):
        """
        Create an enterprise with CEO guidelines and hire:
        - a team leader that will be responsible for managing employees
        - a scrum master that will be responsible for managing tasks

        Args:
            guidelines (str): CEO guidelines, printed once and forwarded to both managers
            output_directory (str): Output directory, created when it does not exist yet
            manager_retry (int): How many times manager will retry to do the plan
            company_name (str, optional): Enterprise's name. Defaults to "GPTenterprise".
            interactive (bool): Forwarded to both managers; when True, run_enterprise
                also asks the CEO to validate the final result. Defaults to False
            asynchronous (bool): When True, run_enterprise awaits the scrum master's
                do_plan_async instead of calling do_plan. Defaults to True
        """
        self.company_name = company_name
        self.employees = {}
        self.tasks_board = []
        self.interactive = interactive
        self.asynchronous = asynchronous
        self.ceo_guidelines = guidelines

        print(f"\U0001F468 {self.ceo_guidelines}")

        # Create output directory if not exists
        if not os.path.exists(output_directory):
            os.makedirs(output_directory, exist_ok=True)

        # Create the team leader
        self.team_leader = TeamLeader(
            ceo_guidelines=guidelines,
            manager_retry=manager_retry,
            output_directory=output_directory,
            interactive=interactive,
        )
        # Create the scrum master
        self.scrum_master = ScrumMaster(
            ceo_guidelines=guidelines,
            manager_retry=manager_retry,
            output_directory=output_directory,
            interactive=interactive,
        )

    @staticmethod
    def _is_yes(answer: str) -> bool:
        """
        Tell whether the answer to a (y/n) question is affirmative.

        Only the start of the trimmed, lowercased answer is checked, so that
        answers such as "nay" or "not really" are not mistaken for a yes.
        """
        return answer.strip().lower().startswith("y")

    async def run_enterprise(self) -> dict:
        """
        Run the enterprise:
        - Ask the team leader to hire employees
        - Ask the scrum master to plan tasks
        - Ask the scrum master to do tasks
        - Return the employees production

        In interactive mode the CEO is asked to validate the final result and,
        if it is rejected, whether the whole run should be retried.

        Returns:
            dict: Production with the keys "ceo_guidelines", "tasks" (the tasks
            board returned by the scrum master) and "final_result" (the "result"
            of the last task, or None when the board is empty)
        """
        production = {}
        # Hire needed employees
        self.employees = self.team_leader.hire_employees()
        # Extract tasks from manager plan
        self.tasks_board = self.scrum_master.plan_tasks(list(self.employees.values()))
        # Do the plan, synchronously or asynchronously depending on the configuration
        if not self.asynchronous:
            production_tasks_board = self.scrum_master.do_plan(
                tasks=self.tasks_board, employees=self.employees
            )
        else:
            production_tasks_board = await self.scrum_master.do_plan_async(
                tasks=self.tasks_board, employees=self.employees
            )
        # Add CEO guidelines to the plan
        production["ceo_guidelines"] = self.ceo_guidelines
        # Return the result of the plan
        production["tasks"] = production_tasks_board
        # Add final result; an empty board has no last task to read it from
        production["final_result"] = (
            production_tasks_board[-1]["result"] if production_tasks_board else None
        )
        if self.interactive:
            verdict = input(
                f"{self.scrum_master.emoji} Here is the final result: \n {production['final_result']} \n Is this final result correct? (y/n) \n \U0001F468"
            )
            if self._is_yes(verdict):
                print(
                    f"{self.scrum_master.emoji} Great ! Nice to have worked with you !"
                )
            elif self._is_yes(input("Do you want me to retry ? (y/n)\n \U0001F468")):
                # Start over from hiring; the retried production replaces this one
                return await self.run_enterprise()

        return production
class
Enterprise:
class Enterprise:
    """
    An enterprise is composed of several employees driven by managers and a CEO (you)
    """

    def __init__(
        self,
        guidelines: str,
        output_directory: str,
        manager_retry: int,
        company_name: str = "GPTenterprise",
        interactive: bool = False,
        asynchronous: bool = True,
    ):
        """
        Create an enterprise with CEO guidelines and hire:
        - a team leader that will be responsible for managing employees
        - a scrum master that will be responsible for managing tasks

        Args:
            guidelines (str): CEO guidelines, printed once and forwarded to both managers
            output_directory (str): Output directory, created when it does not exist yet
            manager_retry (int): How many times manager will retry to do the plan
            company_name (str, optional): Enterprise's name. Defaults to "GPTenterprise".
            interactive (bool): Forwarded to both managers; when True, run_enterprise
                also asks the CEO to validate the final result. Defaults to False
            asynchronous (bool): When True, run_enterprise awaits the scrum master's
                do_plan_async instead of calling do_plan. Defaults to True
        """
        self.company_name = company_name
        self.employees = {}
        self.tasks_board = []
        self.interactive = interactive
        self.asynchronous = asynchronous
        self.ceo_guidelines = guidelines

        print(f"\U0001F468 {self.ceo_guidelines}")

        # Create output directory if not exists
        if not os.path.exists(output_directory):
            os.makedirs(output_directory, exist_ok=True)

        # Create the team leader
        self.team_leader = TeamLeader(
            ceo_guidelines=guidelines,
            manager_retry=manager_retry,
            output_directory=output_directory,
            interactive=interactive,
        )
        # Create the scrum master
        self.scrum_master = ScrumMaster(
            ceo_guidelines=guidelines,
            manager_retry=manager_retry,
            output_directory=output_directory,
            interactive=interactive,
        )

    @staticmethod
    def _is_yes(answer: str) -> bool:
        """
        Tell whether the answer to a (y/n) question is affirmative.

        Only the start of the trimmed, lowercased answer is checked, so that
        answers such as "nay" or "not really" are not mistaken for a yes.
        """
        return answer.strip().lower().startswith("y")

    async def run_enterprise(self) -> dict:
        """
        Run the enterprise:
        - Ask the team leader to hire employees
        - Ask the scrum master to plan tasks
        - Ask the scrum master to do tasks
        - Return the employees production

        In interactive mode the CEO is asked to validate the final result and,
        if it is rejected, whether the whole run should be retried.

        Returns:
            dict: Production with the keys "ceo_guidelines", "tasks" (the tasks
            board returned by the scrum master) and "final_result" (the "result"
            of the last task, or None when the board is empty)
        """
        production = {}
        # Hire needed employees
        self.employees = self.team_leader.hire_employees()
        # Extract tasks from manager plan
        self.tasks_board = self.scrum_master.plan_tasks(list(self.employees.values()))
        # Do the plan, synchronously or asynchronously depending on the configuration
        if not self.asynchronous:
            production_tasks_board = self.scrum_master.do_plan(
                tasks=self.tasks_board, employees=self.employees
            )
        else:
            production_tasks_board = await self.scrum_master.do_plan_async(
                tasks=self.tasks_board, employees=self.employees
            )
        # Add CEO guidelines to the plan
        production["ceo_guidelines"] = self.ceo_guidelines
        # Return the result of the plan
        production["tasks"] = production_tasks_board
        # Add final result; an empty board has no last task to read it from
        production["final_result"] = (
            production_tasks_board[-1]["result"] if production_tasks_board else None
        )
        if self.interactive:
            verdict = input(
                f"{self.scrum_master.emoji} Here is the final result: \n {production['final_result']} \n Is this final result correct? (y/n) \n \U0001F468"
            )
            if self._is_yes(verdict):
                print(
                    f"{self.scrum_master.emoji} Great ! Nice to have worked with you !"
                )
            elif self._is_yes(input("Do you want me to retry ? (y/n)\n \U0001F468")):
                # Start over from hiring; the retried production replaces this one
                return await self.run_enterprise()

        return production
An enterprise is composed of several employees driven by managers and a CEO (you)
Enterprise( guidelines: str, output_directory: str, manager_retry: int, company_name: str = 'GPTenterprise', interactive: bool = False, asynchronous: bool = True)
def __init__(
    self,
    guidelines: str,
    output_directory: str,
    manager_retry: int,
    company_name: str = "GPTenterprise",
    interactive: bool = False,
    asynchronous: bool = True,
):
    """
    Set up the enterprise from the CEO guidelines and hire its two managers:
    - a team leader in charge of the employees
    - a scrum master in charge of the tasks

    Args:
        guidelines (str): CEO guidelines
        output_directory (str): Output directory
        manager_retry (int): How many times manager will retry to do the plan
        company_name (str, optional): Enterprise's name. Defaults to "GPTenterprise".
        interactive (bool): Defaults to False
        asynchronous (bool): Defaults to True
    """
    # Plain state: nobody is hired and nothing is planned yet
    self.ceo_guidelines = guidelines
    self.company_name = company_name
    self.interactive = interactive
    self.asynchronous = asynchronous
    self.employees = {}
    self.tasks_board = []

    print(f"\U0001F468 {self.ceo_guidelines}")

    # Make sure the output directory is there before the managers are created
    directory_missing = not os.path.exists(output_directory)
    if directory_missing:
        os.makedirs(output_directory, exist_ok=True)

    # Both managers are configured with exactly the same settings
    manager_settings = {
        "ceo_guidelines": guidelines,
        "manager_retry": manager_retry,
        "output_directory": output_directory,
        "interactive": interactive,
    }
    # Team leader first, scrum master second
    self.team_leader = TeamLeader(**manager_settings)
    self.scrum_master = ScrumMaster(**manager_settings)
Create an enterprise with CEO guidelines and hire:
- a team leader that will be responsible for managing employees
- a scrum master that will be responsible for managing tasks
Arguments:
- guidelines (str): CEO guidelines
- output_directory (str): Output directory
- manager_retry (int): How many times manager will retry to do the plan
- company_name (str, optional): Enterprise's name. Defaults to "GPTenterprise".
- interactive (bool): Defaults to False
- asynchronous (bool): Defaults to True
async def
run_enterprise(self) -> dict:
async def run_enterprise(self) -> dict:
    """
    Run the enterprise:
    - Ask the team leader to hire employees
    - Ask the scrum master to plan tasks
    - Ask the scrum master to do tasks
    - Return the employees production

    In interactive mode the CEO is asked to validate the final result and,
    if it is rejected, whether the whole run should be retried.

    Returns:
        dict: Production with the keys "ceo_guidelines", "tasks" (the tasks
        board returned by the scrum master) and "final_result" (the "result"
        of the last task, or None when the board is empty)
    """
    production = {}
    # Hire needed employees
    self.employees = self.team_leader.hire_employees()
    # Extract tasks from manager plan
    self.tasks_board = self.scrum_master.plan_tasks(list(self.employees.values()))
    # Do the plan, synchronously or asynchronously depending on the configuration
    if not self.asynchronous:
        production_tasks_board = self.scrum_master.do_plan(
            tasks=self.tasks_board, employees=self.employees
        )
    else:
        production_tasks_board = await self.scrum_master.do_plan_async(
            tasks=self.tasks_board, employees=self.employees
        )
    # Add CEO guidelines to the plan
    production["ceo_guidelines"] = self.ceo_guidelines
    # Return the result of the plan
    production["tasks"] = production_tasks_board
    # Add final result; an empty board has no last task to read it from
    production["final_result"] = (
        production_tasks_board[-1]["result"] if production_tasks_board else None
    )
    if self.interactive:
        # Only the start of the answer counts, so "nay" is not read as a yes
        verdict = input(
            f"{self.scrum_master.emoji} Here is the final result: \n {production['final_result']} \n Is this final result correct? (y/n) \n \U0001F468"
        )
        if verdict.strip().lower().startswith("y"):
            print(
                f"{self.scrum_master.emoji} Great ! Nice to have worked with you !"
            )
        else:
            retry = input("Do you want me to retry ? (y/n)\n \U0001F468")
            if retry.strip().lower().startswith("y"):
                # Start over from hiring; the retried production replaces this one
                return await self.run_enterprise()

    return production
Run the enterprise:
- Ask the team leader to hire employees
- Ask the scrum master to plan tasks
- Ask the scrum master to do tasks
- Return the employees production
Returns:
dict: Production with the employees' work and the tasks updated with their results