fix: resolve issues preventing tools from running

Former-commit-id: 58ed7826261ad0421f4066ed55f06949a56b6781
pull/160/head
Zack 1 year ago
parent 9c8ad849e8
commit cf32782a3d

@ -1,6 +1,6 @@
import gradio as gr
from bmtools.agent.tools_controller import MTQuestionAnswerer, load_valid_tools
from bmtools.agent.singletool import STQuestionAnswerer
from swarms.tools.tools_controller import MTQuestionAnswerer, load_valid_tools
from swarms.tools.singletool import STQuestionAnswerer
from langchain.schema import AgentFinish
import os
import requests

@ -30,6 +30,7 @@ from . import job_search
from . import gradio_tools
from . import travel
from . import walmart
from . import agent

@ -0,0 +1,317 @@
from collections import deque
from typing import Dict, List, Optional, Any
import re
from langchain import LLMChain, OpenAI, PromptTemplate, SerpAPIWrapper
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import BaseLLM
from langchain.vectorstores.base import VectorStore
from pydantic import BaseModel, Field
from langchain.chains.base import Chain
from langchain.vectorstores import FAISS
import faiss
from langchain.docstore import InMemoryDocstore
from langchain.agents import ZeroShotAgent, Tool, AgentExecutor
from bmtools.agent.executor import Executor, AgentExecutorWithTranslation
class ContextAwareAgent(ZeroShotAgent):
def get_full_inputs(
self, intermediate_steps, **kwargs: Any
) -> Dict[str, Any]:
"""Create the full inputs for the LLMChain from intermediate steps."""
thoughts = self._construct_scratchpad(intermediate_steps)
new_inputs = {"agent_scratchpad": thoughts, "stop": self._stop}
full_inputs = {**kwargs, **new_inputs}
return full_inputs
def _construct_scratchpad(self, intermediate_steps):
"""Construct the scratchpad that lets the agent continue its thought process."""
thoughts = ""
        # keep only the last two intermediate steps so the scratchpad stays short
        for action, observation in intermediate_steps[-2:]:
thoughts += action.log
thoughts += f"\n{self.observation_prefix}{observation}\n{self.llm_prefix}"
if "is not a valid tool, try another one" in observation:
thoughts += "You should select another tool rather than the invalid one.\n"
return thoughts
class TaskCreationChain(LLMChain):
"""Chain to generates tasks."""
@classmethod
def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
"""Get the response parser."""
task_creation_template = (
"You are an task creation AI that uses the result of an execution agent"
" to create new tasks with the following objective: {objective},"
" The last completed task has the result: {result}."
" This result was based on this task description: {task_description}."
" These are incomplete tasks: {incomplete_tasks}."
" Based on the result, create new tasks to be completed"
" by the AI system that do not overlap with incomplete tasks."
" For a simple objective, do not generate complex todo lists."
" Do not generate repetitive tasks (e.g., tasks that have already been completed)."
" If there is not futher task needed to complete the objective, return NO TASK."
" Now return the tasks as an array."
)
prompt = PromptTemplate(
template=task_creation_template,
input_variables=["result", "task_description", "incomplete_tasks", "objective"],
)
return cls(prompt=prompt, llm=llm, verbose=verbose)
class InitialTaskCreationChain(LLMChain):
"""Chain to generates tasks."""
@classmethod
def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
"""Get the response parser."""
task_creation_template = (
"You are a planner who is an expert at coming up with a todo list for a given objective. For a simple objective, do not generate a complex todo list. Generate the first (only one) task needed to do for this objective: {objective}"
)
prompt = PromptTemplate(
template=task_creation_template,
input_variables=["objective"],
)
return cls(prompt=prompt, llm=llm, verbose=verbose)
class TaskPrioritizationChain(LLMChain):
"""Chain to prioritize tasks."""
@classmethod
def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
"""Get the response parser."""
task_prioritization_template = (
"You are an task prioritization AI tasked with cleaning the formatting of and reprioritizing"
" the following tasks: {task_names}."
" Consider the ultimate objective of your team: {objective}."
" Do not make up any tasks, just reorganize the existing tasks."
" Do not remove any tasks. Return the result as a numbered list, like:"
" #. First task"
" #. Second task"
" Start the task list with number {next_task_id}. (e.g., 2. ***, 3. ***, etc.)"
)
prompt = PromptTemplate(
template=task_prioritization_template,
input_variables=["task_names", "next_task_id", "objective"],
)
return cls(prompt=prompt, llm=llm, verbose=verbose)
def get_next_task(task_creation_chain: LLMChain, result: Dict, task_description: str, task_list: List[str], objective: str) -> List[Dict]:
"""Get the next task."""
incomplete_tasks = ", ".join(task_list)
response = task_creation_chain.run(result=result, task_description=task_description, incomplete_tasks=incomplete_tasks, objective=objective)
    # parse numbered tasks with a regex instead of splitting on newlines
task_pattern = re.compile(r'\d+\. (.+?)\n')
new_tasks = task_pattern.findall(response)
return [{"task_name": task_name} for task_name in new_tasks if task_name.strip()]
def prioritize_tasks(task_prioritization_chain: LLMChain, this_task_id: int, task_list: List[Dict], objective: str) -> List[Dict]:
"""Prioritize tasks."""
task_names = [t["task_name"] for t in task_list]
next_task_id = int(this_task_id) + 1
response = task_prioritization_chain.run(task_names=task_names, next_task_id=next_task_id, objective=objective)
new_tasks = response.split('\n')
prioritized_task_list = []
for task_string in new_tasks:
if not task_string.strip():
continue
task_parts = task_string.strip().split(".", 1)
if len(task_parts) == 2:
task_id = task_parts[0].strip()
task_name = task_parts[1].strip()
prioritized_task_list.append({"task_id": task_id, "task_name": task_name})
return prioritized_task_list
def _get_top_tasks(vectorstore, query: str, k: int) -> List[str]:
"""Get the top k tasks based on the query."""
results = vectorstore.similarity_search_with_score(query, k=k)
if not results:
return []
sorted_results, _ = zip(*sorted(results, key=lambda x: x[1], reverse=True))
return [str(item.metadata['task']) for item in sorted_results]
def execute_task(vectorstore, execution_chain: LLMChain, objective: str, task: str, k: int = 5) -> str:
"""Execute a task."""
context = _get_top_tasks(vectorstore, query=objective, k=k)
return execution_chain.run(objective=objective, context=context, task=task)
class BabyAGI(Chain, BaseModel):
"""Controller model for the BabyAGI agent."""
task_list: deque = Field(default_factory=deque)
task_creation_chain: TaskCreationChain = Field(...)
task_prioritization_chain: TaskPrioritizationChain = Field(...)
initial_task_creation_chain: InitialTaskCreationChain = Field(...)
execution_chain: AgentExecutor = Field(...)
task_id_counter: int = Field(1)
vectorstore: VectorStore = Field(init=False)
max_iterations: Optional[int] = None
class Config:
"""Configuration for this pydantic object."""
arbitrary_types_allowed = True
def add_task(self, task: Dict):
self.task_list.append(task)
def print_task_list(self):
print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
for t in self.task_list:
print(str(t["task_id"]) + ": " + t["task_name"])
def print_next_task(self, task: Dict):
print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
print(str(task["task_id"]) + ": " + task["task_name"])
def print_task_result(self, result: str):
print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
print(result)
@property
def input_keys(self) -> List[str]:
return ["objective"]
@property
def output_keys(self) -> List[str]:
return []
def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""Run the agent."""
# not an elegant implementation, but it works for the first task
objective = inputs['objective']
first_task = inputs.get("first_task", self.initial_task_creation_chain.run(objective=objective))# self.task_creation_chain.llm(initial_task_prompt))
self.add_task({"task_id": 1, "task_name": first_task})
num_iters = 0
while True:
if self.task_list:
self.print_task_list()
# Step 1: Pull the first task
task = self.task_list.popleft()
self.print_next_task(task)
# Step 2: Execute the task
result = execute_task(
self.vectorstore, self.execution_chain, objective, task["task_name"]
)
this_task_id = int(task["task_id"])
self.print_task_result(result)
                # Step 3: Store the result in the vector store
result_id = f"result_{task['task_id']}"
self.vectorstore.add_texts(
texts=[result],
metadatas=[{"task": task["task_name"]}],
ids=[result_id],
)
# Step 4: Create new tasks and reprioritize task list
new_tasks = get_next_task(
self.task_creation_chain, result, task["task_name"], [t["task_name"] for t in self.task_list], objective
)
for new_task in new_tasks:
self.task_id_counter += 1
new_task.update({"task_id": self.task_id_counter})
self.add_task(new_task)
if len(self.task_list) == 0:
print("\033[91m\033[1m" + "\n*****NO TASK, ABORTING*****\n" + "\033[0m\033[0m")
break
self.task_list = deque(
prioritize_tasks(
self.task_prioritization_chain, this_task_id, list(self.task_list), objective
)
)
num_iters += 1
if self.max_iterations is not None and num_iters == self.max_iterations:
print("\033[91m\033[1m" + "\n*****TASK ENDING*****\n" + "\033[0m\033[0m")
break
return {}
@classmethod
def from_llm(
cls,
llm: BaseLLM,
prompt = None,
verbose: bool = False,
tools = None,
stream_output = None,
**kwargs
) -> "BabyAGI":
embeddings_model = OpenAIEmbeddings()
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
task_creation_chain = TaskCreationChain.from_llm(
llm, verbose=verbose
)
initial_task_creation_chain = InitialTaskCreationChain.from_llm(
llm, verbose=verbose
)
task_prioritization_chain = TaskPrioritizationChain.from_llm(
llm, verbose=verbose
)
llm_chain = LLMChain(llm=llm, prompt=prompt)
tool_names = [tool.name for tool in tools]
agent = ContextAwareAgent(llm_chain=llm_chain, allowed_tools=tool_names)
if stream_output:
agent_executor = Executor.from_agent_and_tools(agent=agent, tools=tools, verbose=True)
else:
agent_executor = AgentExecutorWithTranslation.from_agent_and_tools(agent=agent, tools=tools, verbose=True)
return cls(
task_creation_chain=task_creation_chain,
task_prioritization_chain=task_prioritization_chain,
initial_task_creation_chain=initial_task_creation_chain,
execution_chain=agent_executor,
vectorstore=vectorstore,
**kwargs
)
if __name__ == "__main__":
todo_prompt = PromptTemplate.from_template("You are a planner who is an expert at coming up with a todo list for a given objective. For a simple objective, do not generate a complex todo list. Come up with a todo list for this objective: {objective}")
todo_chain = LLMChain(llm=OpenAI(temperature=0), prompt=todo_prompt)
search = SerpAPIWrapper()
tools = [
Tool(
name = "Search",
func=search.run,
description="useful for when you need to answer questions about current events"
),
Tool(
name = "TODO",
func=todo_chain.run,
description="useful for when you need to come up with todo lists. Input: an objective to create a todo list for. Output: a todo list for that objective. Please be very clear what the objective is!"
)
]
prefix = """You are an AI who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}."""
suffix = """Question: {task}
{agent_scratchpad}"""
prompt = ZeroShotAgent.create_prompt(
tools,
prefix=prefix,
suffix=suffix,
input_variables=["objective", "task", "context","agent_scratchpad"]
)
OBJECTIVE = "Write a weather report for SF today"
llm = OpenAI(temperature=0)
# Logging of LLMChains
verbose=False
# If None, will keep on going forever
max_iterations: Optional[int] = 10
baby_agi = BabyAGI.from_llm(
llm=llm,
verbose=verbose,
max_iterations=max_iterations
)
baby_agi({"objective": OBJECTIVE})

@ -0,0 +1,152 @@
"""Interface for tools."""
from inspect import signature
from typing import Any, Awaitable, Callable, Optional, Union
from langchain.agents import Tool as LangChainTool
from langchain.tools.base import BaseTool
import requests
import json
import aiohttp
import http.client
http.client._MAXLINE = 655360
from bmtools import get_logger
logger = get_logger(__name__)
class Tool(LangChainTool):
tool_logo_md: str = ""
class RequestTool(BaseTool):
"""Tool that takes in function or coroutine directly."""
description: str = ""
func: Callable[[str], str]
    afunc: Callable[[str], Awaitable[str]]
    coroutine: Optional[Callable[[str], Awaitable[str]]] = None
    max_output_len: int = 4000
tool_logo_md: str = ""
def _run(self, tool_input: str) -> str:
"""Use the tool."""
return self.func(tool_input)
async def _arun(self, tool_input: str) -> str:
"""Use the tool asynchronously."""
ret = await self.afunc(tool_input)
return ret
    def convert_prompt(self, params):
lines = "Your input should be a json (args json schema): {{"
for p in params:
logger.debug(p)
optional = not p['required']
description = p.get('description', '')
if len(description) > 0:
description = "("+description+")"
lines += '"{name}" : {type}{desc}, '.format(name=p['name'],
type= p['schema']['type'],
optional=optional,
desc=description)
lines += "}}"
return lines
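    # Illustrative only (not part of the original diff): for a single required
    # string parameter, convert_prompt yields a schema hint such as
    #   Your input should be a json (args json schema): {{"city" : string(the city name), }}
    # the doubled braces stay literal so downstream PromptTemplate formatting
    # does not treat them as input variables.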
def __init__(self, root_url, func_url, method, request_info, **kwargs):
""" Store the function, description, and tool_name in a class to store the information
"""
url = root_url + func_url
def func(json_args):
if isinstance(json_args, str):
                try:
                    json_args = json.loads(json_args)
                except json.JSONDecodeError:
                    return "Your input cannot be parsed as JSON; please output a Thought instead."
if "tool_input" in json_args:
json_args = json_args["tool_input"]
# if it's post put patch, then we do json
if method.lower() in ['post', 'put', 'patch']:
response = getattr(requests, method.lower())(url, json=json_args)
else:
# for other methods, we use get, and use json_args as query params
response = requests.get(url, params=json_args)
if response.status_code == 200:
message = response.text
else:
message = f"Error code {response.status_code}. You can try (1) Change your input (2) Call another function. (If the same error code is produced more than 4 times, please use Thought: I can not use these APIs, so I will stop. Final Answer: No Answer, please check the APIs.)"
message = message[:self.max_output_len] # TODO: not rigorous, to improve
return message
def convert_openapi_to_params(request_body):
if not request_body:
return []
params = []
for content_type, content in request_body['content'].items():
schema = content['schema']
properties = schema.get('properties', {})
required = schema.get('required', [])
for key, value in properties.items():
param = {
'name': key,
'schema': value,
'required': key in required,
'description': value.get('description', '')
}
if content_type == 'multipart/form-data' and value.get('format') == 'binary':
param['type'] = 'file'
elif content_type in ['application/x-www-form-urlencoded', 'multipart/form-data']:
param['type'] = 'form'
else:
param['type'] = 'json'
params.append(param)
return params
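        # Illustrative only (hypothetical requestBody fragment): a required JSON
        # field becomes a param dict tagged with type 'json', e.g.
        #   convert_openapi_to_params({"content": {"application/json": {"schema": {
        #       "properties": {"query": {"type": "string"}}, "required": ["query"]}}}})
        #   -> [{"name": "query", "schema": {"type": "string"}, "required": True,
        #        "description": "", "type": "json"}]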
async def afunc(json_args):
if isinstance(json_args, str):
                try:
                    json_args = json.loads(json_args)
                except json.JSONDecodeError:
                    return "Your input cannot be parsed as JSON; please output a Thought instead."
if "tool_input" in json_args:
json_args = json_args["tool_input"]
async with aiohttp.ClientSession() as session:
async with session.get(url, params=json_args) as response:
if response.status == 200:
message = await response.text()
else:
message = f"Error code {response.status_code}. You can try (1) Change your input (2) Call another function. (If the same error code is produced more than 4 times, please use Thought: I can not use these APIs, so I will stop. Final Answer: No Answer, please check the APIs.)"
message = message[:self.max_output_len] # TODO: not rigorous, to improve
return message
tool_name = func_url.replace("/", ".").strip(".")
str_doc = ''
if 'parameters' in request_info[method]:
str_doc = self.convert_prompt(request_info[method]['parameters'])
if 'requestBody' in request_info[method]:
str_doc = str_doc + "\n" + self.convert_prompt(
convert_openapi_to_params(request_info[method]['requestBody']))
# description = f"- {tool_name}:\n" + \
# request_info[method].get('summary', '').replace("{", "{{").replace("}", "}}") \
description = request_info[method].get('description','').replace("{", "{{").replace("}", "}}") \
+ ". " \
+ str_doc \
+ f" The Action to trigger this API should be {tool_name} and the input parameters should be a json dict string. Pay attention to the type of parameters."
logger.info("API Name: {}".format(tool_name))
logger.info("API Description: {}".format(description))
super(RequestTool, self).__init__(
name=tool_name, func=func, afunc=afunc, description=description, **kwargs
)
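# Illustrative construction (hypothetical endpoint; in practice request_info comes
# from the tool's OpenAPI document, see singletool.import_all_apis):
#   info = {"get": {"description": "Get the weather", "parameters": [
#       {"name": "city", "required": True, "schema": {"type": "string"}}]}}
#   tool = RequestTool(root_url="http://127.0.0.1:8079/tools/weather",
#                      func_url="/get_weather", method="get", request_info=info)
#   tool.run('{"city": "San Francisco"}')  # GET request with city as a query param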

@ -0,0 +1,137 @@
from __future__ import annotations
from typing import List, Optional
from pydantic import ValidationError
from langchain.chains.llm import LLMChain
from langchain.chat_models.base import BaseChatModel
from langchain.experimental.autonomous_agents.autogpt.output_parser import (
AutoGPTOutputParser,
BaseAutoGPTOutputParser,
)
from .prompt import AutoGPTPrompt
from langchain.experimental.autonomous_agents.autogpt.prompt_generator import (
FINISH_NAME,
)
from langchain.schema import (
AIMessage,
BaseMessage,
Document,
HumanMessage,
SystemMessage,
)
from langchain.tools.base import BaseTool
from langchain.tools.human.tool import HumanInputRun
from langchain.vectorstores.base import VectorStoreRetriever
import json
class AutoGPT:
"""Agent class for interacting with Auto-GPT."""
def __init__(
self,
ai_name: str,
memory: VectorStoreRetriever,
chain: LLMChain,
output_parser: BaseAutoGPTOutputParser,
tools: List[BaseTool],
feedback_tool: Optional[HumanInputRun] = None,
):
self.ai_name = ai_name
self.memory = memory
self.full_message_history: List[BaseMessage] = []
self.next_action_count = 0
self.chain = chain
self.output_parser = output_parser
self.tools = tools
self.feedback_tool = feedback_tool
@classmethod
def from_llm_and_tools(
cls,
ai_name: str,
ai_role: str,
memory: VectorStoreRetriever,
tools: List[BaseTool],
llm: BaseChatModel,
human_in_the_loop: bool = False,
output_parser: Optional[BaseAutoGPTOutputParser] = None,
) -> AutoGPT:
prompt = AutoGPTPrompt(
ai_name=ai_name,
ai_role=ai_role,
tools=tools,
input_variables=["memory", "messages", "goals", "user_input"],
token_counter=llm.get_num_tokens,
)
human_feedback_tool = HumanInputRun() if human_in_the_loop else None
chain = LLMChain(llm=llm, prompt=prompt)
return cls(
ai_name,
memory,
chain,
output_parser or AutoGPTOutputParser(),
tools,
feedback_tool=human_feedback_tool,
)
def run(self, goals: List[str]) -> str:
user_input = (
"Determine which next command to use, "
"and respond using the format specified above:"
)
# Interaction Loop
loop_count = 0
while True:
# Discontinue if continuous limit is reached
loop_count += 1
# Send message to AI, get response
assistant_reply = self.chain.run(
goals=goals,
messages=self.full_message_history,
memory=self.memory,
user_input=user_input,
)
# Print Assistant thoughts
print(assistant_reply)
self.full_message_history.append(HumanMessage(content=user_input))
self.full_message_history.append(AIMessage(content=assistant_reply))
# Get command name and arguments
action = self.output_parser.parse(assistant_reply)
tools = {t.name: t for t in self.tools}
if action.name == FINISH_NAME:
return action.args["response"]
if action.name in tools:
tool = tools[action.name]
try:
                    # BMTools tools expect a string input, while default LangChain tools take JSON, so we serialize the args to a JSON string here
json_args = json.dumps(action.args)
observation = tool.run(json_args)
except ValidationError as e:
observation = f"Error in args: {str(e)}"
result = f"Command {tool.name} returned: {observation}"
elif action.name == "ERROR":
result = f"Error: {action.args}. "
else:
result = (
f"Unknown command '{action.name}'. "
f"Please refer to the 'COMMANDS' list for available "
f"commands and only respond in the specified JSON format."
)
memory_to_add = (
f"Assistant Reply: {assistant_reply} " f"\nResult: {result} "
)
if self.feedback_tool is not None:
feedback = f"\n{self.feedback_tool.run('Input: ')}"
if feedback in {"q", "stop"}:
print("EXITING")
return "EXITING"
memory_to_add += feedback
self.memory.add_documents([Document(page_content=memory_to_add)])
self.full_message_history.append(SystemMessage(content=result))

@ -0,0 +1,66 @@
import json
import re
from abc import abstractmethod
from typing import Dict, NamedTuple
from langchain.schema import BaseOutputParser
class AutoGPTAction(NamedTuple):
"""Action returned by AutoGPTOutputParser."""
name: str
args: Dict
class BaseAutoGPTOutputParser(BaseOutputParser):
"""Base Output parser for AutoGPT."""
@abstractmethod
def parse(self, text: str) -> AutoGPTAction:
"""Return AutoGPTAction"""
def preprocess_json_input(input_str: str) -> str:
"""Preprocesses a string to be parsed as json.
Replace single backslashes with double backslashes,
while leaving already escaped ones intact.
Args:
input_str: String to be preprocessed
Returns:
Preprocessed string
"""
corrected_str = re.sub(
r'(?<!\\)\\(?!["\\/bfnrt]|u[0-9a-fA-F]{4})', r"\\\\", input_str
)
return corrected_str
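# Illustrative only: a lone backslash (e.g. a Windows path) is doubled so
# json.loads can parse it, while valid escapes such as \n or \" are left intact:
#   preprocess_json_input(r'{"dir": "C:\projects"}') -> r'{"dir": "C:\\projects"}'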
class AutoGPTOutputParser(BaseAutoGPTOutputParser):
"""Output parser for AutoGPT."""
def parse(self, text: str) -> AutoGPTAction:
try:
parsed = json.loads(text, strict=False)
except json.JSONDecodeError:
preprocessed_text = preprocess_json_input(text)
try:
parsed = json.loads(preprocessed_text, strict=False)
except Exception:
return AutoGPTAction(
name="ERROR",
args={"error": f"Could not parse invalid json: {text}"},
)
try:
return AutoGPTAction(
name=parsed["command"]["name"],
args=parsed["command"]["args"],
)
except (KeyError, TypeError):
# If the command is null or incomplete, return an erroneous tool
return AutoGPTAction(
name="ERROR", args={"error": f"Incomplete command args: {parsed}"}
)
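# Illustrative only: a well-formed reply parses into an AutoGPTAction, while
# unparseable text degrades to an ERROR action instead of raising:
#   AutoGPTOutputParser().parse('{"command": {"name": "Search", "args": {"tool_input": "SF weather"}}}')
#   -> AutoGPTAction(name='Search', args={'tool_input': 'SF weather'})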

@ -0,0 +1,75 @@
import time
from typing import Any, Callable, List
from pydantic import BaseModel
from .prompt_generator import get_prompt
from langchain.prompts.chat import (
BaseChatPromptTemplate,
)
from langchain.schema import BaseMessage, HumanMessage, SystemMessage
from langchain.tools.base import BaseTool
from langchain.vectorstores.base import VectorStoreRetriever
class AutoGPTPrompt(BaseChatPromptTemplate, BaseModel):
ai_name: str
ai_role: str
tools: List[BaseTool]
token_counter: Callable[[str], int]
send_token_limit: int = 4196
def construct_full_prompt(self, goals: List[str]) -> str:
prompt_start = """Your decisions must always be made independently
without seeking user assistance. Play to your strengths
as an LLM and pursue simple strategies with no legal complications.
If you have completed all your tasks,
make sure to use the "finish" command."""
# Construct full prompt
full_prompt = (
f"You are {self.ai_name}, {self.ai_role}\n{prompt_start}\n\nGOALS:\n\n"
)
for i, goal in enumerate(goals):
full_prompt += f"{i+1}. {goal}\n"
full_prompt += f"\n\n{get_prompt(self.tools)}"
return full_prompt
def format_messages(self, **kwargs: Any) -> List[BaseMessage]:
base_prompt = SystemMessage(content=self.construct_full_prompt(kwargs["goals"]))
time_prompt = SystemMessage(
content=f"The current time and date is {time.strftime('%c')}"
)
used_tokens = self.token_counter(base_prompt.content) + self.token_counter(
time_prompt.content
)
memory: VectorStoreRetriever = kwargs["memory"]
previous_messages = kwargs["messages"]
relevant_docs = memory.get_relevant_documents(str(previous_messages[-10:]))
relevant_memory = [d.page_content for d in relevant_docs]
relevant_memory_tokens = sum(
[self.token_counter(doc) for doc in relevant_memory]
)
while used_tokens + relevant_memory_tokens > 2500:
relevant_memory = relevant_memory[:-1]
relevant_memory_tokens = sum(
[self.token_counter(doc) for doc in relevant_memory]
)
content_format = (
f"This reminds you of these events "
f"from your past:\n{relevant_memory}\n\n"
)
memory_message = SystemMessage(content=content_format)
        used_tokens += self.token_counter(memory_message.content)
historical_messages: List[BaseMessage] = []
for message in previous_messages[-10:][::-1]:
message_tokens = self.token_counter(message.content)
if used_tokens + message_tokens > self.send_token_limit - 1000:
break
historical_messages = [message] + historical_messages
input_message = HumanMessage(content=kwargs["user_input"])
messages: List[BaseMessage] = [base_prompt, time_prompt, memory_message]
messages += historical_messages
messages.append(input_message)
return messages

@ -0,0 +1,189 @@
import json
from typing import List
from langchain.tools.base import BaseTool
FINISH_NAME = "finish"
class PromptGenerator:
"""A class for generating custom prompt strings.
    Does so based on constraints, commands, resources, and performance evaluations.
"""
def __init__(self) -> None:
"""Initialize the PromptGenerator object.
Starts with empty lists of constraints, commands, resources,
and performance evaluations.
"""
self.constraints: List[str] = []
self.commands: List[BaseTool] = []
self.resources: List[str] = []
self.performance_evaluation: List[str] = []
self.response_format = {
"thoughts": {
"text": "thought",
"reasoning": "reasoning",
"plan": "- short bulleted\n- list that conveys\n- long-term plan",
"criticism": "constructive self-criticism",
"speak": "thoughts summary to say to user",
},
"command": {"name": "command name", "args": {"arg name": "value"}},
}
def add_constraint(self, constraint: str) -> None:
"""
Add a constraint to the constraints list.
Args:
constraint (str): The constraint to be added.
"""
self.constraints.append(constraint)
def add_tool(self, tool: BaseTool) -> None:
self.commands.append(tool)
def _generate_command_string(self, tool: BaseTool) -> str:
output = f"{tool.name}: {tool.description}"
# json_args = json.dumps(tool.args) if "tool_input" not in tool.args else tool.args[
# "tool_input"
# ]
# output += f", args json schema: {json_args}"
return output
def add_resource(self, resource: str) -> None:
"""
Add a resource to the resources list.
Args:
resource (str): The resource to be added.
"""
self.resources.append(resource)
def add_performance_evaluation(self, evaluation: str) -> None:
"""
Add a performance evaluation item to the performance_evaluation list.
Args:
evaluation (str): The evaluation item to be added.
"""
self.performance_evaluation.append(evaluation)
def _generate_numbered_list(self, items: list, item_type: str = "list") -> str:
"""
Generate a numbered list from given items based on the item_type.
Args:
items (list): A list of items to be numbered.
item_type (str, optional): The type of items in the list.
Defaults to 'list'.
Returns:
str: The formatted numbered list.
"""
if item_type == "command":
command_strings = [
f"{i + 1}. {self._generate_command_string(item)}"
for i, item in enumerate(items)
]
finish_description = (
"use this to signal that you have finished all your objectives"
)
finish_args = (
'"response": "final response to let '
'people know you have finished your objectives"'
)
finish_string = (
f"{len(items) + 1}. {FINISH_NAME}: "
f"{finish_description}, args: {finish_args}"
)
return "\n".join(command_strings + [finish_string])
else:
return "\n".join(f"{i+1}. {item}" for i, item in enumerate(items))
def generate_prompt_string(self) -> str:
"""Generate a prompt string.
Returns:
str: The generated prompt string.
"""
formatted_response_format = json.dumps(self.response_format, indent=4)
prompt_string = (
f"Constraints:\n{self._generate_numbered_list(self.constraints)}\n\n"
f"Commands:\n"
f"{self._generate_numbered_list(self.commands, item_type='command')}\n\n"
f"Resources:\n{self._generate_numbered_list(self.resources)}\n\n"
f"Performance Evaluation:\n"
f"{self._generate_numbered_list(self.performance_evaluation)}\n\n"
f"You should only respond in JSON format as described below "
f"\nResponse Format: \n{formatted_response_format} "
f"\nEnsure the response can be parsed by Python json.loads"
)
return prompt_string
def get_prompt(tools: List[BaseTool]) -> str:
"""This function generates a prompt string.
It includes various constraints, commands, resources, and performance evaluations.
Returns:
str: The generated prompt string.
"""
# Initialize the PromptGenerator object
prompt_generator = PromptGenerator()
# Add constraints to the PromptGenerator object
prompt_generator.add_constraint(
"~4000 word limit for short term memory. "
"Your short term memory is short, "
"so immediately save important information to files."
)
prompt_generator.add_constraint(
"If you are unsure how you previously did something "
"or want to recall past events, "
"thinking about similar events will help you remember."
)
prompt_generator.add_constraint("No user assistance")
prompt_generator.add_constraint(
'Exclusively use the commands listed in double quotes e.g. "command name"'
)
# Add commands to the PromptGenerator object
for tool in tools:
prompt_generator.add_tool(tool)
# Add resources to the PromptGenerator object
prompt_generator.add_resource(
"Internet access for searches and information gathering."
)
prompt_generator.add_resource("Long Term memory management.")
prompt_generator.add_resource(
"GPT-3.5 powered Agents for delegation of simple tasks."
)
prompt_generator.add_resource("File output.")
# Add performance evaluations to the PromptGenerator object
prompt_generator.add_performance_evaluation(
"Continuously review and analyze your actions "
"to ensure you are performing to the best of your abilities."
)
prompt_generator.add_performance_evaluation(
"Constructively self-criticize your big-picture behavior constantly."
)
prompt_generator.add_performance_evaluation(
"Reflect on past decisions and strategies to refine your approach."
)
prompt_generator.add_performance_evaluation(
"Every command has a cost, so be smart and efficient. "
"Aim to complete tasks in the least number of steps."
)
# Generate the prompt string
prompt_string = prompt_generator.generate_prompt_string()
return prompt_string
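# Illustrative usage (hypothetical echo tool): the returned string embeds the
# numbered Commands list plus the JSON response format:
#   from langchain.agents import Tool
#   echo = Tool(name="echo", func=lambda s: s, description="returns its input")
#   print(get_prompt([echo]))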

@ -0,0 +1,135 @@
from __future__ import annotations
from typing import List, Optional
from pydantic import ValidationError
from langchain.chains.llm import LLMChain
from langchain.chat_models.base import BaseChatModel
from .output_parser import (
AutoGPTOutputParser,
BaseAutoGPTOutputParser,
)
from .prompt import AutoGPTPrompt
from .prompt_generator import (
FINISH_NAME,
)
from langchain.schema import (
AIMessage,
BaseMessage,
Document,
HumanMessage,
SystemMessage,
)
from langchain.tools.base import BaseTool
from langchain.tools.human.tool import HumanInputRun
from langchain.vectorstores.base import VectorStoreRetriever
import json
class AutoGPT:
"""Agent class for interacting with Auto-GPT."""
def __init__(
self,
ai_name: str,
memory: VectorStoreRetriever,
chain: LLMChain,
output_parser: BaseAutoGPTOutputParser,
tools: List[BaseTool],
feedback_tool: Optional[HumanInputRun] = None,
):
self.ai_name = ai_name
self.memory = memory
self.full_message_history: List[BaseMessage] = []
self.next_action_count = 0
self.chain = chain
self.output_parser = output_parser
self.tools = tools
self.feedback_tool = feedback_tool
@classmethod
def from_llm_and_tools(
cls,
ai_name: str,
ai_role: str,
memory: VectorStoreRetriever,
tools: List[BaseTool],
llm: BaseChatModel,
human_in_the_loop: bool = False,
output_parser: Optional[BaseAutoGPTOutputParser] = None,
) -> AutoGPT:
prompt = AutoGPTPrompt(
ai_name=ai_name,
ai_role=ai_role,
tools=tools,
input_variables=["memory", "messages", "goals", "user_input"],
token_counter=llm.get_num_tokens,
)
human_feedback_tool = HumanInputRun() if human_in_the_loop else None
chain = LLMChain(llm=llm, prompt=prompt)
return cls(
ai_name,
memory,
chain,
output_parser or AutoGPTOutputParser(),
tools,
feedback_tool=human_feedback_tool,
)
def __call__(self, goals: List[str]) -> str:
user_input = (
"Determine which next command to use, "
"and respond using the format specified above:"
)
# Interaction Loop
loop_count = 0
history_rec = []
while True:
# Discontinue if continuous limit is reached
loop_count += 1
# Send message to AI, get response
assistant_reply = self.chain.run(
goals=goals,
messages=self.full_message_history,
memory=self.memory,
user_input=user_input,
)
            # strip any text preceding the first JSON brace so the reply parses cleanly
            pos = assistant_reply.find('{')
            if pos > 0:
                assistant_reply = assistant_reply[pos:]
# Print Assistant thoughts
print(assistant_reply)
self.full_message_history.append(HumanMessage(content=user_input))
self.full_message_history.append(AIMessage(content=assistant_reply))
# Get command name and arguments
action = self.output_parser.parse(assistant_reply)
tools = {t.name: t for t in self.tools}
if action.name == FINISH_NAME:
return action.args["response"]
if action.name in tools:
tool = tools[action.name]
try:
                    # BMTools tools expect a string input, while default LangChain tools take JSON, so we serialize the args to a JSON string here
                    tmp_json = action.args.copy()
                    tmp_json["history context"] = str(history_rec[-5:])[-500:]
                    tmp_json["user message"] = goals[0]
                    json_args = json.dumps(tmp_json)
observation = tool.run(json_args)
except ValidationError as e:
observation = f"Error in args: {str(e)}"
result = f"Command {tool.name} returned: {observation}"
                if 'using the given APIs' not in result and 'no answer' not in result.lower():
history_rec.append(f"Tool {action.name} returned: {observation}")
elif action.name == "ERROR":
result = f"Error: {action.args}. "
else:
result = (
f"Unknown command '{action.name}'. "
f"Please refer to the 'COMMANDS' list for available "
f"commands and only respond in the specified JSON format."
)
self.full_message_history.append(SystemMessage(content=result))

@ -0,0 +1,66 @@
import json
import re
from abc import abstractmethod
from typing import Dict, NamedTuple
from langchain.schema import BaseOutputParser
class AutoGPTAction(NamedTuple):
"""Action returned by AutoGPTOutputParser."""
name: str
args: Dict
class BaseAutoGPTOutputParser(BaseOutputParser):
"""Base Output parser for AutoGPT."""
@abstractmethod
def parse(self, text: str) -> AutoGPTAction:
"""Return AutoGPTAction"""
def preprocess_json_input(input_str: str) -> str:
"""Preprocesses a string to be parsed as json.
Replace single backslashes with double backslashes,
while leaving already escaped ones intact.
Args:
input_str: String to be preprocessed
Returns:
Preprocessed string
"""
corrected_str = re.sub(
r'(?<!\\)\\(?!["\\/bfnrt]|u[0-9a-fA-F]{4})', r"\\\\", input_str
)
return corrected_str
class AutoGPTOutputParser(BaseAutoGPTOutputParser):
"""Output parser for AutoGPT."""
def parse(self, text: str) -> AutoGPTAction:
try:
parsed = json.loads(text, strict=False)
except json.JSONDecodeError:
preprocessed_text = preprocess_json_input(text)
try:
parsed = json.loads(preprocessed_text, strict=False)
except Exception:
return AutoGPTAction(
name="ERROR",
args={"error": f"Could not parse invalid json: {text}"},
)
try:
return AutoGPTAction(
name=parsed["command"]["name"],
args=parsed["command"]["args"],
)
except (KeyError, TypeError):
# If the command is null or incomplete, return an erroneous tool
return AutoGPTAction(
name="ERROR", args={"error": f"Incomplete command args: {parsed}"}
)

@ -0,0 +1,68 @@
import time
from typing import Any, Callable, List
from pydantic import BaseModel
from .prompt_generator import get_prompt
from langchain.prompts.chat import (
BaseChatPromptTemplate,
)
from langchain.schema import BaseMessage, HumanMessage, SystemMessage
from langchain.tools.base import BaseTool
from langchain.vectorstores.base import VectorStoreRetriever
class AutoGPTPrompt(BaseChatPromptTemplate, BaseModel):
ai_name: str
ai_role: str
tools: List[BaseTool]
token_counter: Callable[[str], int]
send_token_limit: int = 4196
def construct_full_prompt(self, goals: List[str]) -> str:
prompt_start = """Your decisions must always be made independently
without seeking user assistance. Play to your strengths
as an LLM and pursue simple strategies with no legal complications.
Once you have completed your goal or have found that it cannot be finished with the current commands,
make sure to use the "finish" command immediately."""
# Construct full prompt
full_prompt = (
f"You are {self.ai_name}, {self.ai_role}\n{prompt_start}\n\nGOALS:\n\n"
)
if isinstance(goals, list):
for i, goal in enumerate(goals):
full_prompt += f"{i+1}. {goal}\n"
else:
full_prompt += f"{goals}\n"
full_prompt += f"\n\n{get_prompt(self.tools)}"
return full_prompt
def format_messages(self, **kwargs: Any) -> List[BaseMessage]:
base_prompt = SystemMessage(content=self.construct_full_prompt(kwargs["goals"]))
time_prompt = SystemMessage(
content=f"The current time and date is {time.strftime('%c')}"
)
used_tokens = self.token_counter(base_prompt.content) + self.token_counter(
time_prompt.content
)
memory: VectorStoreRetriever = kwargs["memory"]
previous_messages = kwargs["messages"]
        content_format = (
            "These are commands you have already used; "
            "NEVER issue repeated or unrelated commands:\n"
        )
memory_message = SystemMessage(content=content_format)
        used_tokens += self.token_counter(memory_message.content)
historical_messages: List[BaseMessage] = []
for message in previous_messages[-10:][::-1]:
message_tokens = self.token_counter(message.content)
if used_tokens + message_tokens > self.send_token_limit - 1000:
break
historical_messages = [message] + historical_messages
input_message = HumanMessage(content=kwargs["user_input"])
messages: List[BaseMessage] = [base_prompt, time_prompt, memory_message]
messages += historical_messages
messages.append(input_message)
return messages

@ -0,0 +1,186 @@
import json
from typing import List
from langchain.tools.base import BaseTool
FINISH_NAME = "finish"
class PromptGenerator:
"""A class for generating custom prompt strings.
    Does so based on constraints, commands, resources, and performance evaluations.
"""
def __init__(self) -> None:
"""Initialize the PromptGenerator object.
Starts with empty lists of constraints, commands, resources,
and performance evaluations.
"""
self.constraints: List[str] = []
self.commands: List[BaseTool] = []
self.resources: List[str] = []
self.performance_evaluation: List[str] = []
self.response_format = {
"thoughts": {
"text": "thought",
"reasoning": "reasoning",
},
"command": {"name": "command name", "args": {"goal": "the detailed description and necessary information of the subtask that you hope current command can achieve"}},
}
def add_constraint(self, constraint: str) -> None:
"""
Add a constraint to the constraints list.
Args:
constraint (str): The constraint to be added.
"""
self.constraints.append(constraint)
def add_tool(self, tool: BaseTool) -> None:
self.commands.append(tool)
def _generate_command_string(self, tool: BaseTool) -> str:
output = f"{tool.name}: {tool.description}"
# json_args = json.dumps(tool.args) if "tool_input" not in tool.args else tool.args[
# "tool_input"
# ]
# output += f", args json schema: {json_args}"
return output
def add_resource(self, resource: str) -> None:
"""
Add a resource to the resources list.
Args:
resource (str): The resource to be added.
"""
self.resources.append(resource)
def add_performance_evaluation(self, evaluation: str) -> None:
"""
Add a performance evaluation item to the performance_evaluation list.
Args:
evaluation (str): The evaluation item to be added.
"""
self.performance_evaluation.append(evaluation)
def _generate_numbered_list(self, items: list, item_type: str = "list") -> str:
"""
Generate a numbered list from given items based on the item_type.
Args:
items (list): A list of items to be numbered.
item_type (str, optional): The type of items in the list.
Defaults to 'list'.
Returns:
str: The formatted numbered list.
"""
if item_type == "command":
command_strings = [
f"{i + 1}. {self._generate_command_string(item)}"
for i, item in enumerate(items)
]
finish_description = (
"use this to signal that you have finished all your objectives"
)
finish_args = (
'"response": "final response to let '
'people know you have finished your objectives"'
)
finish_string = (
f"{len(items) + 1}. {FINISH_NAME}: "
f"{finish_description}, args: {finish_args}"
)
return "\n".join(command_strings + [finish_string])
else:
return "\n".join(f"{i+1}. {item}" for i, item in enumerate(items))
def generate_prompt_string(self) -> str:
"""Generate a prompt string.
Returns:
str: The generated prompt string.
"""
formatted_response_format = json.dumps(self.response_format, indent=4)
prompt_string = (
#f"Constraints:\n{self._generate_numbered_list(self.constraints)}\n\n"
f"Commands:\n"
f"{self._generate_numbered_list(self.commands, item_type='command')}\n\n"
#f"Resources:\n{self._generate_numbered_list(self.resources)}\n\n"
f"Performance Evaluation:\n"
f"{self._generate_numbered_list(self.performance_evaluation)}\n\n"
f"You should only respond in JSON format as described below "
f"\nResponse Format: \n{formatted_response_format} "
f"\nEnsure the response can be parsed by Python json.loads"
)
return prompt_string
def get_prompt(tools: List[BaseTool]) -> str:
"""This function generates a prompt string.
It includes various constraints, commands, resources, and performance evaluations.
Returns:
str: The generated prompt string.
"""
# Initialize the PromptGenerator object
prompt_generator = PromptGenerator()
# Add constraints to the PromptGenerator object
prompt_generator.add_constraint(
"~4000 word limit for short term memory. "
"Your short term memory is short, "
"so immediately save important information to files."
)
prompt_generator.add_constraint(
"If you are unsure how you previously did something "
"or want to recall past events, "
"thinking about similar events will help you remember."
)
prompt_generator.add_constraint("No user assistance")
prompt_generator.add_constraint(
'Exclusively use the commands listed in double quotes e.g. "command name"'
)
# Add commands to the PromptGenerator object
for tool in tools:
prompt_generator.add_tool(tool)
# Add resources to the PromptGenerator object
prompt_generator.add_resource(
"Internet access for searches and information gathering."
)
prompt_generator.add_resource("Long Term memory management.")
prompt_generator.add_resource(
"GPT-3.5 powered Agents for delegation of simple tasks."
)
prompt_generator.add_resource("File output.")
# Add performance evaluations to the PromptGenerator object
prompt_generator.add_performance_evaluation(
"Continuously review and analyze your actions "
"to ensure you are performing to the best of your abilities."
)
prompt_generator.add_performance_evaluation(
"Constructively self-criticize your big-picture behavior constantly."
)
prompt_generator.add_performance_evaluation(
"Reflect on past decisions and strategies to refine your approach."
)
prompt_generator.add_performance_evaluation(
"Every command has a cost, so be smart and efficient. "
"Aim to complete tasks in the least number of steps."
)
# Generate the prompt string
prompt_string = prompt_generator.generate_prompt_string()
return prompt_string

@ -0,0 +1,114 @@
import time
import types
from typing import Any, Dict, List, Tuple, Union
from langchain.agents import AgentExecutor
from langchain.input import get_color_mapping
from langchain.schema import AgentAction, AgentFinish
from bmtools.agent.translator import Translator
class AgentExecutorWithTranslation(AgentExecutor):
translator: Translator = Translator()
def prep_outputs(
self,
inputs: Dict[str, str],
outputs: Dict[str, str],
return_only_outputs: bool = False,
) -> Dict[str, str]:
try:
outputs = super().prep_outputs(inputs, outputs, return_only_outputs)
except ValueError as e:
return outputs
else:
if "input" in outputs:
outputs = self.translator(outputs)
return outputs
class Executor(AgentExecutorWithTranslation):
def _call(self, inputs: Dict[str, str]) -> Dict[str, Any]:
"""Run text through and get agent response."""
# Construct a mapping of tool name to tool for easy lookup
name_to_tool_map = {tool.name: tool for tool in self.tools}
# We construct a mapping from each tool to a color, used for logging.
color_mapping = get_color_mapping(
[tool.name for tool in self.tools], excluded_colors=["green"]
)
intermediate_steps: List[Tuple[AgentAction, str]] = []
# Let's start tracking the iterations the agent has gone through
iterations = 0
time_elapsed = 0.0
start_time = time.time()
# We now enter the agent loop (until it returns something).
while self._should_continue(iterations, time_elapsed):
next_step_output = self._take_next_step(
name_to_tool_map, color_mapping, inputs, intermediate_steps
)
if isinstance(next_step_output, AgentFinish):
yield self._return(next_step_output, intermediate_steps)
return
for i, output in enumerate(next_step_output):
agent_action = output[0]
tool_logo = None
for tool in self.tools:
if tool.name == agent_action.tool:
tool_logo = tool.tool_logo_md
if isinstance(output[1], types.GeneratorType):
logo = f"{tool_logo}" if tool_logo is not None else ""
yield (AgentAction("", agent_action.tool_input, agent_action.log), f"Further use other tool {logo} to answer the question.")
for out in output[1]:
yield out
next_step_output[i] = (agent_action, out)
else:
for tool in self.tools:
if tool.name == agent_action.tool:
yield (AgentAction(tool_logo, agent_action.tool_input, agent_action.log), output[1])
intermediate_steps.extend(next_step_output)
if len(next_step_output) == 1:
next_step_action = next_step_output[0]
# See if tool should return directly
tool_return = self._get_tool_return(next_step_action)
if tool_return is not None:
yield self._return(tool_return, intermediate_steps)
return
iterations += 1
time_elapsed = time.time() - start_time
output = self.agent.return_stopped_response(
self.early_stopping_method, intermediate_steps, **inputs
)
yield self._return(output, intermediate_steps)
return
def __call__(
self, inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False
) -> Dict[str, Any]:
"""Run the logic of this chain and add to output if desired.
Args:
inputs: Dictionary of inputs, or single input if chain expects
only one param.
return_only_outputs: boolean for whether to return only outputs in the
response. If True, only new keys generated by this chain will be
returned. If False, both input keys and new keys generated by this
chain will be returned. Defaults to False.
"""
inputs = self.prep_inputs(inputs)
self.callback_manager.on_chain_start(
{"name": self.__class__.__name__},
inputs,
verbose=self.verbose,
)
try:
for output in self._call(inputs):
                if isinstance(output, dict):
output = self.prep_outputs(inputs, output, return_only_outputs)
yield output
except (KeyboardInterrupt, Exception) as e:
self.callback_manager.on_chain_error(e, verbose=self.verbose)
raise e
self.callback_manager.on_chain_end(output, verbose=self.verbose)
# return self.prep_outputs(inputs, output, return_only_outputs)
return output

@ -0,0 +1,233 @@
from langchain.llms import OpenAI
from langchain import OpenAI, LLMChain, PromptTemplate, SerpAPIWrapper
from langchain.agents import ZeroShotAgent, AgentExecutor, initialize_agent, Tool
import importlib
import json
import os
import requests
import yaml
from bmtools.agent.apitool import RequestTool
from bmtools.agent.executor import Executor, AgentExecutorWithTranslation
from bmtools import get_logger
from bmtools.agent.BabyagiTools import BabyAGI
# from bmtools.models.customllm import CustomLLM
logger = get_logger(__name__)
def import_all_apis(tool_json):
    '''Import all APIs exposed by a tool.
    '''
doc_url = tool_json['api']['url']
response = requests.get(doc_url)
logger.info("Doc string URL: {}".format(doc_url))
if doc_url.endswith('yaml') or doc_url.endswith('yml'):
plugin = yaml.safe_load(response.text)
else:
plugin = json.loads(response.text)
server_url = plugin['servers'][0]['url']
if server_url.startswith("/"):
server_url = "http://127.0.0.1:8079" + server_url
logger.info("server_url {}".format(server_url))
all_apis = []
for key in plugin['paths']:
value = plugin['paths'][key]
for method in value:
api = RequestTool(root_url=server_url, func_url=key, method=method, request_info=value)
all_apis.append(api)
return all_apis
def load_single_tools(tool_name, tool_url):
# tool_name, tool_url = "datasette", "https://datasette.io/"
# tool_name, tool_url = "klarna", "https://www.klarna.com/"
# tool_name, tool_url = 'chemical-prop', "http://127.0.0.1:8079/tools/chemical-prop/"
# tool_name, tool_url = 'douban-film', "http://127.0.0.1:8079/tools/douban-film/"
# tool_name, tool_url = 'weather', "http://127.0.0.1:8079/tools/weather/"
# tool_name, tool_url = 'wikipedia', "http://127.0.0.1:8079/tools/wikipedia/"
# tool_name, tool_url = 'wolframalpha', "http://127.0.0.1:8079/tools/wolframalpha/"
# tool_name, tool_url = 'klarna', "https://www.klarna.com/"
get_url = tool_url +".well-known/ai-plugin.json"
response = requests.get(get_url)
if response.status_code == 200:
tool_config_json = response.json()
else:
raise RuntimeError("Your URL of the tool is invalid.")
return tool_name, tool_config_json
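# Illustrative usage (assumes a BMTools server running locally, as in the
# commented examples above):
#   name, config = load_single_tools('wolframalpha', 'http://127.0.0.1:8079/tools/wolframalpha/')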
class STQuestionAnswerer:
def __init__(self, openai_api_key = "", stream_output=False, llm='ChatGPT'):
if len(openai_api_key) < 3: # not valid key (TODO: more rigorous checking)
openai_api_key = os.environ.get('OPENAI_API_KEY')
self.openai_api_key = openai_api_key
self.llm_model = llm
self.set_openai_api_key(openai_api_key)
self.stream_output = stream_output
def set_openai_api_key(self, key):
logger.info("Using {}".format(self.llm_model))
if self.llm_model == "GPT-3.5":
            self.llm = OpenAI(temperature=0.0, openai_api_key=key)  # use text-davinci
elif self.llm_model == "ChatGPT":
self.llm = OpenAI(model_name="gpt-3.5-turbo", temperature=0.0, openai_api_key=key) # use chatgpt
else:
raise RuntimeError("Your model is not available.")
def load_tools(self, name, meta_info, prompt_type="react-with-tool-description", return_intermediate_steps=True):
self.all_tools_map = {}
self.all_tools_map[name] = import_all_apis(meta_info)
logger.info("Tool [{}] has the following apis: {}".format(name, self.all_tools_map[name]))
if prompt_type == "zero-shot-react-description":
subagent = initialize_agent(self.all_tools_map[name], self.llm, agent="zero-shot-react-description", verbose=True, return_intermediate_steps=return_intermediate_steps)
elif prompt_type == "react-with-tool-description":
# customllm = CustomLLM()
description_for_model = meta_info['description_for_model'].replace("{", "{{").replace("}", "}}").strip()
prefix = f"""Answer the following questions as best you can. General instructions are: {description_for_model}. Specifically, you have access to the following APIs:"""
#suffix = """Begin! Remember: (1) Follow the format, i.e,\nThought:\nAction:\nAction Input:\nObservation:\nFinal Answer:\n (2) Provide as much as useful information in your Final Answer. (3) YOU MUST INCLUDE all relevant IMAGES in your Final Answer using format ![img](url), and include relevant links. (3) Do not make up anything, and if your Observation has no link, DO NOT hallucihate one. (4) If you have enough information, please use \nThought: I have got enough information\nFinal Answer: \n\nQuestion: {input}\n{agent_scratchpad}"""
suffix = """Begin! Remember: (1) Follow the format, i.e,\nThought:\nAction:\nAction Input:\nObservation:\nFinal Answer:\n. The action you generate must be exact one of the given API names instead of a sentence or any other redundant text. The action input is one json format dict without any redundant text or bracket descriptions . (2) Provide as much as useful information (such as useful values/file paths in your observation) in your Final Answer. Do not describe the process you achieve the goal, but only provide the detailed answer or response to the task goal. (3) Do not make up anything. DO NOT generate observation content by yourself. (4) Read the observation carefully, and pay attention to the messages even if an error occurs. (5) Once you have enough information, please immediately use \nThought: I have got enough information\nFinal Answer: \n\nTask: {input}\n{agent_scratchpad}"""
prompt = ZeroShotAgent.create_prompt(
self.all_tools_map[name],
prefix=prefix,
suffix=suffix,
input_variables=["input", "agent_scratchpad"]
)
llm_chain = LLMChain(llm=self.llm, prompt=prompt)
# llm_chain = LLMChain(llm=customllm, prompt=prompt)
logger.info("Full prompt template: {}".format(prompt.template))
tool_names = [tool.name for tool in self.all_tools_map[name] ]
agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names)
if self.stream_output:
agent_executor = Executor.from_agent_and_tools(agent=agent, tools=self.all_tools_map[name] , verbose=True, return_intermediate_steps=return_intermediate_steps)
else:
agent_executor = AgentExecutorWithTranslation.from_agent_and_tools(agent=agent, tools=self.all_tools_map[name], verbose=True, return_intermediate_steps=return_intermediate_steps)
return agent_executor
elif prompt_type == "babyagi":
# customllm = CustomLLM()
tool_str = "; ".join([t.name for t in self.all_tools_map[name]])
prefix = """You are an AI who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}.\n You have access to the following APIs:"""
suffix = """YOUR CONSTRAINTS: (1) YOU MUST follow this format:
            \nThought:\nAction:\nAction Input: \n or \nThought:\nFinal Answer:\n (2) Do not make up anything, and if your Observation has no link, DO NOT hallucinate one. (3) The Action: MUST be one of the following: """ + tool_str + """\nQuestion: {task}\n Agent scratchpad (history actions): {agent_scratchpad}."""
prompt = ZeroShotAgent.create_prompt(
self.all_tools_map[name],
prefix=prefix,
suffix=suffix,
input_variables=["objective", "task", "context","agent_scratchpad"]
)
logger.info("Full prompt template: {}".format(prompt.template))
# specify the maximum number of iterations you want babyAGI to perform
max_iterations = 10
baby_agi = BabyAGI.from_llm(
llm=self.llm,
# llm=customllm,
prompt=prompt,
verbose=False,
tools=self.all_tools_map[name],
stream_output=self.stream_output,
return_intermediate_steps=return_intermediate_steps,
max_iterations=max_iterations,
)
return baby_agi
elif prompt_type == "autogpt":
from langchain.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.tools.file_management.write import WriteFileTool
from langchain.tools.file_management.read import ReadFileTool
# Define your embedding model
embeddings_model = OpenAIEmbeddings()
# Initialize the vectorstore as empty
import faiss
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
from .autogpt.agent import AutoGPT
from langchain.chat_models import ChatOpenAI
from langchain.schema import (
AIMessage,
ChatGeneration,
ChatMessage,
ChatResult,
HumanMessage,
SystemMessage,
)
# customllm = CustomLLM()
# class MyChatOpenAI(ChatOpenAI):
# def _create_chat_result(self, response):
# generations = []
# for res in response["choices"]:
# message = self._convert_dict_to_message(res["message"])
# gen = ChatGeneration(message=message)
# generations.append(gen)
# llm_output = {"token_usage": response["usage"], "model_name": self.model_name}
# return ChatResult(generations=generations, llm_output=llm_output)
# def _generate(self, messages, stop):
# message_dicts, params = self._create_message_dicts(messages, stop)
# response = customllm(message_dicts)
# response = json.loads(response)
# # response = self.completion_with_retry(messages=message_dicts, **params)
# return self._create_chat_result(response)
# def _convert_dict_to_message(self, _dict: dict):
# role = _dict["role"]
# if role == "user":
# return HumanMessage(content=_dict["content"])
# elif role == "assistant":
# return AIMessage(content=_dict["content"])
# elif role == "system":
# return SystemMessage(content=_dict["content"])
# else:
# return ChatMessage(content=_dict["content"], role=role)
# should integrate WriteFile and ReadFile into tools, will fix later.
# for tool in [WriteFileTool(), ReadFileTool()]:
# self.all_tools_map[name].append(tool)
agent = AutoGPT.from_llm_and_tools(
ai_name="Tom",
ai_role="Assistant",
tools=self.all_tools_map[name],
llm=ChatOpenAI(temperature=0),
# llm=MyChatOpenAI(temperature=0),
memory=vectorstore.as_retriever()
)
# Set verbose to be true
agent.chain.verbose = True
return agent
if __name__ == "__main__":
    tools_name, tools_config = load_single_tools('wolframalpha', 'http://127.0.0.1:8079/tools/wolframalpha/')
print(tools_name, tools_config)
qa = STQuestionAnswerer()
agent = qa.load_tools(tools_name, tools_config)
agent("Calc integral of sin(x)+2x^2+3x+1 from 0 to 1")

@ -0,0 +1,133 @@
from langchain.llms import OpenAI
from langchain import OpenAI, LLMChain
from langchain.agents import ZeroShotAgent, AgentExecutor
import importlib
import json
import os
import requests
import yaml
from swarms.tools.apitool import Tool
from swarms.tools.singletool import STQuestionAnswerer
from swarms.tools.executor import Executor, AgentExecutorWithTranslation
from swarms.tools import get_logger
from swarms.tools.models.customllm import CustomLLM
logger = get_logger(__name__)
def load_valid_tools(tools_mappings):
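# Probe each tool's .well-known/ai-plugin.json manifest and keep only the tools that respond with HTTP 200.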
tools_to_config = {}
for key in tools_mappings:
get_url = tools_mappings[key]+".well-known/ai-plugin.json"
response = requests.get(get_url)
if response.status_code == 200:
tools_to_config[key] = response.json()
else:
logger.warning("Load tool {} error, status code {}".format(key, response.status_code))
return tools_to_config
class MTQuestionAnswerer:
"""Use multiple tools to answer a question. Basically pass a natural question to
"""
def __init__(self, openai_api_key, all_tools, stream_output=False, llm='ChatGPT'):
if len(openai_api_key) < 3: # not valid key (TODO: more rigorous checking)
openai_api_key = os.environ.get('OPENAI_API_KEY')
self.openai_api_key = openai_api_key
self.stream_output = stream_output
self.llm_model = llm
self.set_openai_api_key(openai_api_key)
self.load_tools(all_tools)
def set_openai_api_key(self, key):
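# Instantiate the backing LLM for the chosen model family: the default OpenAI completion model for "GPT-3.5", gpt-3.5-turbo for "ChatGPT".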
logger.info("Using {}".format(self.llm_model))
if self.llm_model == "GPT-3.5":
self.llm = OpenAI(temperature=0.0, openai_api_key=key) # use text-davinci
elif self.llm_model == "ChatGPT":
self.llm = OpenAI(model_name="gpt-3.5-turbo", temperature=0.0, openai_api_key=key) # use chatgpt
else:
raise RuntimeError("Your model is not available.")
def load_tools(self, all_tools):
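# Wrap each remote tool in a single-tool ReAct subagent, then expose that subagent to the planner as one callable Tool.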
logger.info("All tools: {}".format(all_tools))
self.all_tools_map = {}
self.tools_pool = []
for name in all_tools:
meta_info = all_tools[name]
question_answer = STQuestionAnswerer(self.openai_api_key, stream_output=self.stream_output, llm=self.llm_model)
subagent = question_answer.load_tools(name, meta_info, prompt_type="react-with-tool-description", return_intermediate_steps=False)
tool_logo_md = f'<img src="{meta_info["logo_url"]}" width="32" height="32" style="display:inline-block">'
for tool in subagent.tools:
tool.tool_logo_md = tool_logo_md
tool = Tool(
name=meta_info['name_for_model'],
description=meta_info['description_for_model'].replace("{", "{{").replace("}", "}}"),
func=subagent,
)
tool.tool_logo_md = tool_logo_md
self.tools_pool.append(tool)
def build_runner(self):
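# Build an AutoGPT-style executor over the pooled tools, backed by a FAISS vector store as retrieval memory.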
from langchain.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
embeddings_model = OpenAIEmbeddings()
import faiss
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
from .autogptmulti.agent import AutoGPT
from langchain.chat_models import ChatOpenAI
agent_executor = AutoGPT.from_llm_and_tools(
ai_name="Tom",
ai_role="Assistant",
tools=self.tools_pool,
llm=ChatOpenAI(temperature=0),
memory=vectorstore.as_retriever()
)
'''
# You can modify the prompt to improve the model's performance, or modify the tool's doc
prefix = """Answer the following questions as best you can. In this level, you are calling the tools in natural language format, since the tools are actually an intelligent agent like you, but they expert only in one area. Several things to remember. (1) Remember to follow the format of passing natural language as the Action Input. (2) DO NOT use your imagination, only use concrete information given by the tools. (3) If the observation contains images or urls which has useful information, YOU MUST INCLUDE ALL USEFUL IMAGES and links in your Answer and Final Answers using format ![img](url). BUT DO NOT provide any imaginary links. (4) The information in your Final Answer should include ALL the informations returned by the tools. (5) If a user's query is a language other than English, please translate it to English without tools, and translate it back to the source language in Final Answer. You have access to the following tools (Only use these tools we provide you):"""
suffix = """\nBegin! Remember to . \nQuestion: {input}\n{agent_scratchpad}"""
prompt = ZeroShotAgent.create_prompt(
self.tools_pool,
prefix=prefix,
suffix=suffix,
input_variables=["input", "agent_scratchpad"]
)
llm_chain = LLMChain(llm=self.llm, prompt=prompt)
logger.info("Full Prompt Template:\n {}".format(prompt.template))
tool_names = [tool.name for tool in self.tools_pool]
agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names)
if self.stream_output:
agent_executor = Executor.from_agent_and_tools(agent=agent, tools=self.tools_pool, verbose=True, return_intermediate_steps=True)
else:
agent_executor = AgentExecutorWithTranslation.from_agent_and_tools(agent=agent, tools=self.tools_pool, verbose=True, return_intermediate_steps=True)
'''
return agent_executor
if __name__ == "__main__":
tools_mappings = {
"klarna": "https://www.klarna.com/",
"chemical-prop": "http://127.0.0.1:8079/tools/chemical-prop/",
"wolframalpha": "http://127.0.0.1:8079/tools/wolframalpha/",
"weather": "http://127.0.0.1:8079/tools/weather/",
}
tools = load_valid_tools(tools_mappings)
qa = MTQuestionAnswerer(openai_api_key='', all_tools=tools)
agent = qa.build_runner()
agent("How many carbon elements are there in CH3COOH? How many people are there in China?")

@ -0,0 +1,122 @@
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
import py3langid as langid
from iso639 import languages
from typing import Dict
from copy import deepcopy
import os
def detect_lang(text: str):
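# langid.classify returns an ISO 639-1 code; iso639 maps it to the full language name.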
lang_code = langid.classify(text)[0]
lang_name = languages.get(part1=lang_code[:2]).name
return lang_name
class Translator:
def __init__(self,
openai_api_key: str = None,
model_name: str = "gpt-3.5-turbo"):
self.openai_api_key = openai_api_key
self.model_name = model_name
self.init_flag = False
def init_model(self):
llm = self.create_openai_model(self.openai_api_key, self.model_name)
prompt = self.create_prompt()
self.chain = LLMChain(llm=llm, prompt=prompt)
self.init_flag = True
def __call__(self, inputs: Dict[str, str]) -> Dict[str, str]:
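# Lazily build the LLM chain, then translate the answer into the question's language only when the two differ.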
if not self.init_flag:
self.init_model()
question = inputs["input"]
answer = inputs["output"]
src_lang = detect_lang(answer)
tgt_lang = detect_lang(question)
if src_lang != tgt_lang:
translated_answer = self.chain.run(text=answer, language=tgt_lang)
outputs = deepcopy(inputs)
outputs["output"] = translated_answer
return outputs
else:
return inputs
def create_openai_model(self, openai_api_key: str, model_name: str) -> OpenAI:
if openai_api_key is None:
openai_api_key = os.environ.get('OPENAI_API_KEY')
llm = OpenAI(model_name=model_name,
temperature=0.0,
openai_api_key=openai_api_key)
return llm
def create_prompt(self) -> PromptTemplate:
template = """
Translate to {language}: {text} =>
"""
prompt = PromptTemplate(
input_variables=["text", "language"],
template=template
)
return prompt
if __name__ == "__main__":
lang = {
"zh": {
"question": "帮我介绍下《深海》这部电影",
"answer": "《深海》是一部中国大陆的动画、奇幻电影,由田晓鹏导演,苏鑫、王亭文、滕奎兴等人主演。剧情简介是在大海的最深处,藏着所有秘密。一位现代少女(参宿)误入梦幻的 深海世界,却因此邂逅了一段独特的生命旅程。![img](https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2635450820.webp)",
},
"ja": {
"question": "映画「深海」について教えてください",
"answer": "「深海」は、中国本土のアニメーションおよびファンタジー映画で、Tian Xiaopeng が監督し、Su Xin、Wang Tingwen、Teng Kuixing などが出演しています。 あらすじは、海の最深部にはすべての秘密が隠されているというもの。 夢のような深海の世界に迷い込んだ現代少女(さんすけ)は、それをきっかけに独特の人生の旅に出くわす。 ![img](https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2635450820.webp)",
},
"ko": {
"question": "영화 딥씨에 대해 알려주세요",
"answer": "\"Deep Sea\"는 Tian Xiaopeng 감독, Su Xin, Wang Tingwen, Teng Kuixing 등이 출연한 중국 본토의 애니메이션 및 판타지 영화입니다. 시놉시스는 바다 가장 깊은 곳에 모든 비밀이 숨겨져 있다는 것입니다. 현대 소녀(산스케)는 꿈 같은 심해 세계로 방황하지만 그것 때문에 독특한 삶의 여정을 만난다. ![img](https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2635450820.webp)",
},
"en": {
"question": "Tell me about the movie '深海'",
"answer": "\"Deep Sea\" is an animation and fantasy film in mainland China, directed by Tian Xiaopeng, starring Su Xin, Wang Tingwen, Teng Kuixing and others. The synopsis is that in the deepest part of the sea, all secrets are hidden. A modern girl (Sansuke) strays into the dreamy deep sea world, but encounters a unique journey of life because of it. ![img](https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2635450820.webp)",
},
"de": {
"question": "Erzähl mir von dem Film '深海'",
"answer": "\"Deep Sea\" ist ein Animations- und Fantasyfilm in Festlandchina unter der Regie von Tian Xiaopeng mit Su Xin, Wang Tingwen, Teng Kuixing und anderen in den Hauptrollen. Die Zusammenfassung ist, dass im tiefsten Teil des Meeres alle Geheimnisse verborgen sind. Ein modernes Mädchen (Sansuke) verirrt sich in die verträumte Tiefseewelt, trifft dabei aber auf eine einzigartige Lebensreise. ![img](https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2635450820.webp)",
},
"fr": {
"question": "Parlez-moi du film 'Deep Sea'",
"answer": "\"Deep Sea\" est un film d'animation et fantastique en Chine continentale, réalisé par Tian Xiaopeng, avec Su Xin, Wang Tingwen, Teng Kuixing et d'autres. Le synopsis est que dans la partie la plus profonde de la mer, tous les secrets sont cachés. Une fille moderne (Sansuke) s'égare dans le monde onirique des profondeurs marines, mais rencontre un voyage de vie unique à cause de cela. ![img](https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2635450820.webp)",
},
"ru": {
"question": "Расскажите о фильме 'Глубокое море'",
"answer": "«Глубокое море» — это анимационный и фэнтезийный фильм в материковом Китае, снятый Тянь Сяопином, в главных ролях Су Синь, Ван Тинвэнь, Тэн Куйсин и другие. Суть в том, что в самой глубокой части моря скрыты все секреты. Современная девушка (Сансукэ) заблудилась в мечтательном глубоководном мире, но из-за этого столкнулась с уникальным жизненным путешествием. ![img](https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2635450820.webp)",
},
}
translator = Translator()
for source in lang:
for target in lang:
print(source, "=>", target, end=":\t")
question = lang[target]["question"]
answer = lang[source]["answer"]
inputs = {
"input": question,
"output": answer
}
result = translator(inputs)
translated_answer = result["output"]
if detect_lang(question) == detect_lang(translated_answer) == languages.get(part1=target).name:
print("Y")
else:
print("N")
print("====================")
print("Question:\t", detect_lang(question), " - ", question)
print("Answer:\t", detect_lang(answer), " - ", answer)
print("Translated Anser:\t", detect_lang(translated_answer), " - ", translated_answer)
print("====================")

@ -26,6 +26,7 @@ class AgentExecutorWithTranslation(AgentExecutor):
return outputs
class Executor(AgentExecutorWithTranslation):
def _call(self, inputs: Dict[str, str]) -> Dict[str, Any]:
"""Run text through and get agent response."""
# Construct a mapping of tool name to tool for easy lookup
@ -84,6 +85,8 @@ class Executor(AgentExecutorWithTranslation):
def __call__(
self, inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False
) -> Dict[str, Any]:
"""Run the logic of this chain and add to output if desired.
Args:
@ -96,19 +99,13 @@ class Executor(AgentExecutorWithTranslation):
"""
inputs = self.prep_inputs(inputs)
self.callback_manager.on_chain_start(
{"name": self.__class__.__name__},
inputs,
verbose=self.verbose,
)
try:
for output in self._call(inputs):
if type(output) is dict:
output = self.prep_outputs(inputs, output, return_only_outputs)
yield output
except (KeyboardInterrupt, Exception) as e:
self.callback_manager.on_chain_error(e, verbose=self.verbose)
raise e
self.callback_manager.on_chain_end(output, verbose=self.verbose)
# return self.prep_outputs(inputs, output, return_only_outputs)
return self.prep_outputs(inputs, output, return_only_outputs)
return output

@ -166,7 +166,7 @@ class STQuestionAnswerer:
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
#TODO refactor to use the flow class
from .autogpt.agent import AutoGPT
from swarms.tools.agent.autogpt.agent import AutoGPT
from langchain.chat_models import ChatOpenAI
from langchain.schema import (
AIMessage,

@ -81,7 +81,7 @@ class MTQuestionAnswerer:
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
# TODO refactor to use the flow
from swarms.tools.autogptmulti.agent import AutoGPT
from swarms.tools.agent.autogptmulti.agent import AutoGPT
from langchain.chat_models import ChatOpenAI
agent_executor = AutoGPT.from_llm_and_tools(
ai_name="Tom",
@ -91,7 +91,7 @@ class MTQuestionAnswerer:
memory=vectorstore.as_retriever()
)
'''
# 可以修改prompt来让模型表现更好也可以修改tool的doc
# You can modify the prompt to improve the model's performance, or modify the tool's doc
prefix = """Answer the following questions as best you can. In this level, you are calling the tools in natural language format, since the tools are actually an intelligent agent like you, but they expert only in one area. Several things to remember. (1) Remember to follow the format of passing natural language as the Action Input. (2) DO NOT use your imagination, only use concrete information given by the tools. (3) If the observation contains images or urls which has useful information, YOU MUST INCLUDE ALL USEFUL IMAGES and links in your Answer and Final Answers using format ![img](url). BUT DO NOT provide any imaginary links. (4) The information in your Final Answer should include ALL the informations returned by the tools. (5) If a user's query is a language other than English, please translate it to English without tools, and translate it back to the source language in Final Answer. You have access to the following tools (Only use these tools we provide you):"""
suffix = """\nBegin! Remember to . \nQuestion: {input}\n{agent_scratchpad}"""

@ -0,0 +1,185 @@
import gradio as gr
from swarms.tools.tools_controller import MTQuestionAnswerer, load_valid_tools
from swarms.tools.singletool import STQuestionAnswerer
from langchain.schema import AgentFinish
import os
import requests
available_models = ["ChatGPT", "GPT-3.5"]
DEFAULTMODEL = "ChatGPT" # "GPT-3.5"
tools_mappings = {
"klarna": "https://www.klarna.com/",
"weather": "http://127.0.0.1:8079/tools/weather/",
# "database": "http://127.0.0.1:8079/tools/database/",
# "db_diag": "http://127.0.0.1:8079/tools/db_diag/",
"chemical-prop": "http://127.0.0.1:8079/tools/chemical-prop/",
"douban-film": "http://127.0.0.1:8079/tools/douban-film/",
"wikipedia": "http://127.0.0.1:8079/tools/wikipedia/",
# "wikidata": "http://127.0.0.1:8079/tools/wikidata/",
"wolframalpha": "http://127.0.0.1:8079/tools/wolframalpha/",
"bing_search": "http://127.0.0.1:8079/tools/bing_search/",
"office-ppt": "http://127.0.0.1:8079/tools/office-ppt/",
"stock": "http://127.0.0.1:8079/tools/stock/",
"bing_map": "http://127.0.0.1:8079/tools/bing_map/",
# "baidu_map": "http://127.0.0.1:8079/tools/baidu_map/",
"zillow": "http://127.0.0.1:8079/tools/zillow/",
"airbnb": "http://127.0.0.1:8079/tools/airbnb/",
"job_search": "http://127.0.0.1:8079/tools/job_search/",
# "baidu-translation": "http://127.0.0.1:8079/tools/baidu-translation/",
# "nllb-translation": "http://127.0.0.1:8079/tools/nllb-translation/",
"tutorial": "http://127.0.0.1:8079/tools/tutorial/",
"file_operation": "http://127.0.0.1:8079/tools/file_operation/",
"meta_analysis": "http://127.0.0.1:8079/tools/meta_analysis/",
"code_interpreter": "http://127.0.0.1:8079/tools/code_interpreter/",
"arxiv": "http://127.0.0.1:8079/tools/arxiv/",
"google_places": "http://127.0.0.1:8079/tools/google_places/",
"google_serper": "http://127.0.0.1:8079/tools/google_serper/",
"google_scholar": "http://127.0.0.1:8079/tools/google_scholar/",
"python": "http://127.0.0.1:8079/tools/python/",
"sceneXplain": "http://127.0.0.1:8079/tools/sceneXplain/",
"shell": "http://127.0.0.1:8079/tools/shell/",
"image_generation": "http://127.0.0.1:8079/tools/image_generation/",
"hugging_tools": "http://127.0.0.1:8079/tools/hugging_tools/",
"gradio_tools": "http://127.0.0.1:8079/tools/gradio_tools/",
}
valid_tools_info = load_valid_tools(tools_mappings)
print(valid_tools_info)
all_tools_list = sorted(list(valid_tools_info.keys()))
gr.close_all()
MAX_TURNS = 30
MAX_BOXES = MAX_TURNS * 2
return_msg = []
chat_history = ""
def show_avatar_imgs(tools_chosen):
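# Render each chosen tool's avatar as an inline HTML image; default to all tools when none are chosen.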
if len(tools_chosen) == 0:
tools_chosen = list(valid_tools_info.keys())
img_template = '<a href="{}" style="float: left"> <img style="margin:5px" src="{}.png" width="24" height="24" alt="avatar" /> {} </a>'
imgs = [valid_tools_info[tool]['avatar'] for tool in tools_chosen if valid_tools_info[tool]['avatar'] is not None]
imgs = ' '.join([img_template.format(img, img, tool) for img, tool in zip(imgs, tools_chosen)])
return [gr.update(value='<span class="">'+imgs+'</span>', visible=True), gr.update(visible=True)]
def answer_by_tools(question, tools_chosen, model_chosen):
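# Generator: streams each intermediate thought/action/observation to the chatbot as the agent runs.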
global return_msg
return_msg += [(question, None), (None, '...')]
yield [gr.update(visible=True, value=return_msg), gr.update(), gr.update()]
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY', '')
if len(tools_chosen) == 0:  # if no tools are chosen, use them all (TODO: what if the pool is too large?)
tools_chosen = list(valid_tools_info.keys())
if len(tools_chosen) == 1:
answerer = STQuestionAnswerer(OPENAI_API_KEY.strip(), stream_output=True, llm=model_chosen)
agent_executor = answerer.load_tools(tools_chosen[0], valid_tools_info[tools_chosen[0]], prompt_type="react-with-tool-description", return_intermediate_steps=True)
else:
answerer = MTQuestionAnswerer(OPENAI_API_KEY.strip(), load_valid_tools({k: tools_mappings[k] for k in tools_chosen}), stream_output=True, llm=model_chosen)
agent_executor = answerer.build_runner()
global chat_history
chat_history += "Question: " + question + "\n"
print(chat_history)
question = chat_history
for inter in agent_executor(question):
if isinstance(inter, AgentFinish): continue
result_str = []
return_msg.pop()
if isinstance(inter, dict):
result_str.append("<font color=red>Answer:</font> {}".format(inter['output']))
chat_history += "Answer:" + inter['output'] + "\n"
result_str.append("...")
else:
not_observation = inter[0].log
if not not_observation.startswith('Thought:'):
not_observation = "Thought: " + not_observation
chat_history += not_observation
not_observation = not_observation.replace('Thought:', '<font color=green>Thought: </font>')
not_observation = not_observation.replace('Action:', '<font color=purple>Action: </font>')
not_observation = not_observation.replace('Action Input:', '<font color=purple>Action Input: </font>')
result_str.append("{}".format(not_observation))
result_str.append("<font color=blue>Action output:</font>\n{}".format(inter[1]))
chat_history += "\nAction output:" + inter[1] + "\n"
result_str.append("...")
return_msg += [(None, result) for result in result_str]
yield [gr.update(visible=True, value=return_msg), gr.update(), gr.update()]
return_msg.pop()
if return_msg[-1][1].startswith("<font color=red>Answer:</font> "):
return_msg[-1] = (return_msg[-1][0], return_msg[-1][1].replace("<font color=red>Answer:</font> ", "<font color=green>Final Answer:</font> "))
yield [gr.update(visible=True, value=return_msg), gr.update(visible=True), gr.update(visible=False)]
def retrieve(tools_search):
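# Ask the local retrieval endpoint for tools matching the search text; an empty query restores the full list.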
if tools_search == "":
return gr.update(choices=all_tools_list)
else:
url = "http://127.0.0.1:8079/retrieve"
param = {
"query": tools_search
}
response = requests.post(url, json=param)
result = response.json()
retrieved_tools = result["tools"]
return gr.update(choices=retrieved_tools)
def clear_retrieve():
return [gr.update(value=""), gr.update(choices=all_tools_list)]
def clear_history():
global return_msg
global chat_history
return_msg = []
chat_history = ""
yield gr.update(visible=True, value=return_msg)
with gr.Blocks() as demo:
with gr.Row():
with gr.Column(scale=14):
gr.Markdown("<h1 align='left'> Swarm Tools </h1>")
with gr.Column(scale=1):
gr.Image('images/swarmslogobanner.png', show_download_button=False, show_label=False )
# gr.Markdown('<img src="../../images/swarmslogobanner.png" alt="swarms">')
with gr.Row():
with gr.Column(scale=4):
with gr.Row():
with gr.Column(scale=0.85):
txt = gr.Textbox(show_label=False, placeholder="Question here. Use Shift+Enter to add a new line.", lines=1).style(container=False)
with gr.Column(scale=0.15, min_width=0):
buttonChat = gr.Button("Chat")
chatbot = gr.Chatbot(show_label=False, visible=True).style(height=600)
buttonClear = gr.Button("Clear History")
buttonStop = gr.Button("Stop", visible=False)
with gr.Column(scale=1):
model_chosen = gr.Dropdown(
list(available_models), value=DEFAULTMODEL, multiselect=False, label="Model provided",
info="Choose the model to solve your question, Default means ChatGPT."
)
with gr.Row():
tools_search = gr.Textbox(
lines=1,
label="Tools Search",
placeholder="Please input some text to search tools.",
)
buttonSearch = gr.Button("Reset search condition")
tools_chosen = gr.CheckboxGroup(
choices=all_tools_list,
value=["chemical-prop"],
label="Tools provided",
info="Choose the tools to solve your question.",
)
tools_search.change(retrieve, tools_search, tools_chosen)
buttonSearch.click(clear_retrieve, [], [tools_search, tools_chosen])
txt.submit(lambda : [gr.update(value=''), gr.update(visible=False), gr.update(visible=True)], [], [txt, buttonClear, buttonStop])
inference_event = txt.submit(answer_by_tools, [txt, tools_chosen, model_chosen], [chatbot, buttonClear, buttonStop])
buttonChat.click(answer_by_tools, [txt, tools_chosen, model_chosen], [chatbot, buttonClear, buttonStop])
buttonStop.click(lambda : [gr.update(visible=True), gr.update(visible=False)], [], [buttonClear, buttonStop], cancels=[inference_event])
buttonClear.click(clear_history, [], chatbot)
demo.queue().launch(share=False, inbrowser=True, server_name="127.0.0.1", server_port=7001)