From 6ed18328bbdfb2f4202a0ba6ef89f1b27611c65e Mon Sep 17 00:00:00 2001 From: Richard Anthony Hein Date: Wed, 21 Aug 2024 20:06:00 +0000 Subject: [PATCH] removed most langchain dependencies for tighter swarms integration and rebuilt chatbot based on Agents --- playground/agents/use_cases/weather/main.py | 38 +++ .../weather/weather_agent_example.py | 29 ++ playground/demos/vLLM/vLLM_example.py | 2 +- swarms/models/popular_llms.py | 2 +- swarms/server/responses.py | 15 +- swarms/server/server.py | 254 ++++++++++-------- swarms/server/server_models.py | 12 +- swarms/structs/agent.py | 241 ++++++++++++++++- 8 files changed, 456 insertions(+), 137 deletions(-) create mode 100644 playground/agents/use_cases/weather/main.py create mode 100644 playground/agents/use_cases/weather/weather_agent_example.py diff --git a/playground/agents/use_cases/weather/main.py b/playground/agents/use_cases/weather/main.py new file mode 100644 index 00000000..4ccdccce --- /dev/null +++ b/playground/agents/use_cases/weather/main.py @@ -0,0 +1,38 @@ +import os +import asyncio +from swarms.models.popular_llms import OpenAIChatLLM +from weather_agent_example import WeatherAgent + +# Set the OpenAI environment to use vLLM +api_key = os.getenv("OPENAI_API_KEY") or "EMPTY" # for vllm +api_base = os.getenv("OPENAI_API_BASE") or "http://localhost:8000/v1" # for vllm +weather_api_key= "af6ef989b5c50a91ca068cc00df125b7", # Replace with your weather API key + +# Create an instance of the OpenAIChat class +llm = OpenAIChatLLM( + base_url=api_base, + api_key=api_key, + model="NousResearch/Meta-Llama-3-8B-Instruct", + temperature=0, + streaming=False +) + +agent = WeatherAgent( + weather_api_key=weather_api_key, + city_name="Nepean, Ontario", + agent_name="Weather Reporting Agent", + system_prompt="You are an exciting and professional weather reporting agent. Given a city you will summarize the weather every hour.", + llm=llm, + max_loops=3, + autosave=True, + dynamic_temperature_enabled=True, + dashboard=False, + verbose=True, + streaming_on=False, + saved_state_path="weather_agent_state.json", + user_name="RAH@EntangleIT.com", + retry_attempts=3, + context_length=200000, + ) + +agent.run("Summarize the following weather_data JSON for the average human. Translate the UNIX system time ('dt' in the JSON) to UTC. Note the temperature is listed in Kelvin in the JSON, so please translate Kelvin to Farheinheit and Celcius. Base your report only on the JSON and output the details in Markdown. \n") \ No newline at end of file diff --git a/playground/agents/use_cases/weather/weather_agent_example.py b/playground/agents/use_cases/weather/weather_agent_example.py new file mode 100644 index 00000000..347dc02d --- /dev/null +++ b/playground/agents/use_cases/weather/weather_agent_example.py @@ -0,0 +1,29 @@ +import time +import json + +from swarms import Agent +import requests + +class WeatherAgent(Agent): + + def __init__(self, weather_api_key, city_name, **kwargs): + super().__init__(city_name, **kwargs) + self.weather_api_key = weather_api_key + self.city_name = city_name + + def check_weather(self): + response = requests.get('https://api.openweathermap.org/data/2.5/weather', params={'q': self.city_name, 'appid': self.weather_api_key}) + if response.status_code == 200: + weather_data = response.json() + return weather_data + else: + print('Failed to retrieve weather data') + raise ValueError("Failed to retrieve weather data or got invalid weather JSON data.") + + def run(self, prompt): + while True: + weather_data = self.check_weather() + weather_data_json = json.dumps(weather_data) # Convert dict to JSON string + print("raw weather data: " + weather_data_json) + super().run(prompt + weather_data_json) + time.sleep(60 * 60) # Check weather every hour \ No newline at end of file diff --git a/playground/demos/vLLM/vLLM_example.py b/playground/demos/vLLM/vLLM_example.py index c36372e6..e4805838 100644 --- a/playground/demos/vLLM/vLLM_example.py +++ b/playground/demos/vLLM/vLLM_example.py @@ -48,7 +48,7 @@ agent = Agent( ) async def startup_event(): - agent.streaming( + agent.stream_reponse( "What are the components of a startups stock incentive equity plan" ) diff --git a/swarms/models/popular_llms.py b/swarms/models/popular_llms.py index 114d5cba..07a53a7b 100644 --- a/swarms/models/popular_llms.py +++ b/swarms/models/popular_llms.py @@ -1,7 +1,7 @@ from langchain_openai.chat_models.azure import ( AzureChatOpenAI, ) -from langchain_openai.chat_models import ( +from langchain_openai import ( ChatOpenAI as OpenAIChat, ) from langchain_community.llms.octoai_endpoint import OctoAIEndpoint diff --git a/swarms/server/responses.py b/swarms/server/responses.py index 89a03659..d04cd21d 100644 --- a/swarms/server/responses.py +++ b/swarms/server/responses.py @@ -1,7 +1,7 @@ """ Customized Langchain StreamingResponse for Server-Side Events (SSE) """ import asyncio from functools import partial -from typing import Any +from typing import Any, AsyncIterator from fastapi import status from langchain.chains.base import Chain @@ -21,17 +21,16 @@ class StreamingResponse(EventSourceResponse): def __init__( self, - *args: Any, - content: Any = iter(()), - **kwargs: dict[str, Any], + content: AsyncIterator[Any], ) -> None: """Constructor method. Args: content: The content to stream. """ - super().__init__(content=content, *args, **kwargs) - + super().__init__(content=content) + self.content = content + async def stream_response(self, send: Send) -> None: """Streams data chunks to client by iterating over `content`. @@ -50,7 +49,7 @@ class StreamingResponse(EventSourceResponse): ) try: - async for data in self.body_iterator: + async for data in self.content: chunk = ensure_bytes(data, self.sep) print(f"chunk: {chunk.decode()}") await send( @@ -78,7 +77,6 @@ class StreamingResponse(EventSourceResponse): def enable_compression(self, force: bool=False): raise NotImplementedError - class LangchainStreamingResponse(StreamingResponse): """StreamingResponse class for LangChain resources.""" @@ -178,3 +176,4 @@ class LangchainStreamingResponse(StreamingResponse): def enable_compression(self, force: bool=False): raise NotImplementedError + diff --git a/swarms/server/server.py b/swarms/server/server.py index d6eb2c83..dcf7f9c3 100644 --- a/swarms/server/server.py +++ b/swarms/server/server.py @@ -6,7 +6,8 @@ import os # import torch from contextlib import asynccontextmanager -import langchain +from typing import AsyncIterator +from swarms.structs.agent import Agent import tiktoken from dotenv import load_dotenv from fastapi import FastAPI, HTTPException, Request @@ -15,22 +16,8 @@ from fastapi.responses import FileResponse, JSONResponse from fastapi.routing import APIRouter from fastapi.staticfiles import StaticFiles from huggingface_hub import login -from langchain.chains.combine_documents.stuff import StuffDocumentsChain -from langchain.chains.conversational_retrieval.base import ( - ConversationalRetrievalChain, -) -from langchain.chains.llm import LLMChain -from langchain.memory import ConversationBufferMemory -from langchain.memory.chat_message_histories.in_memory import ( - ChatMessageHistory, -) -from langchain.prompts.prompt import PromptTemplate -from langchain_community.chat_models import ChatOpenAI -# from langchain_core.messages import AIMessage, HumanMessage, SystemMessage -# from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder - -from swarms.prompts.chat_prompt import Message +from swarms.prompts.chat_prompt import Message, Role from swarms.prompts.conversational_RAG import ( B_INST, B_SYS, @@ -38,11 +25,12 @@ from swarms.prompts.conversational_RAG import ( DOCUMENT_PROMPT_TEMPLATE, E_INST, E_SYS, - QA_PROMPT_TEMPLATE, + QA_PROMPT_TEMPLATE_STR, ) -from swarms.server.responses import LangchainStreamingResponse -from swarms.server.server_models import ChatRequest, Role +from swarms.server.responses import StreamingResponse +from swarms.server.server_models import ChatRequest from swarms.server.vector_store import VectorStorage +from swarms.models.popular_llms import OpenAIChatLLM # Explicitly specify the path to the .env file # Two folders above the current file's directory @@ -106,10 +94,6 @@ tiktoken.model.MODEL_TO_ENCODING.update( print("Logging in to huggingface.co...") login(token=hf_token) # login to huggingface.co -langchain.debug = True -langchain.verbose = True - - @asynccontextmanager async def lifespan(app: FastAPI): """Initializes the vector store in a background task.""" @@ -144,14 +128,14 @@ if not os.path.exists(uploads): vector_store = VectorStorage(directory=uploads, use_gpu=use_gpu) -async def create_chain( +async def create_chat( messages: list[Message], - prompt: PromptTemplate = QA_PROMPT_TEMPLATE, + prompt: str = QA_PROMPT_TEMPLATE_STR, ): - """Creates the RAG Langchain conversational retrieval chain.""" - print("Creating chain ...") + """Creates the RAG conversational retrieval chain.""" + print("Creating chat from history and relevant docs if any ...") - llm = ChatOpenAI( + llm = OpenAIChatLLM( api_key=openai_api_key, base_url=openai_api_base, model=model_name, @@ -159,68 +143,114 @@ async def create_chain( streaming=True, ) - # if llm is ALlamaCpp: - # llm.max_tokens = max_tokens_to_gen - # elif llm is AGPT4All: - # llm.n_predict = max_tokens_to_gen - # el - # if llm is AChatOllama: - # llm.max_tokens = max_tokens_to_gen - # if llm is VLLMAsync: - # llm.max_tokens = max_tokens_to_gen - retriever = await vector_store.get_retriever("swarms") - - chat_memory = ChatMessageHistory() + doc_retrieval_string = "" for message in messages: - if message.role == Role.USER: - chat_memory.add_user_message(message.content) - elif message.role == Role.ASSISTANT: - chat_memory.add_ai_message(message.content) - - memory = ConversationBufferMemory( - chat_memory=chat_memory, - memory_key="chat_history", - input_key="question", - output_key="answer", - return_messages=True, - ) + if message.role == Role.HUMAN: + doc_retrieval_string += f"{Role.HUMAN}: {message.content}\r\n" + elif message.role == Role.AI: + doc_retrieval_string += f"{Role.AI}: {message.content}\r\n" - question_generator = LLMChain( - llm=llm, - prompt=CONDENSE_PROMPT_TEMPLATE, - memory=memory, - verbose=True, - output_key="answer", - ) + docs = retriever.invoke(doc_retrieval_string) - stuff_chain = LLMChain( - llm=llm, - prompt=prompt, - verbose=True, - output_key="answer", - ) + # find {context} in prompt and replace it with the docs page_content. + # Concatenate the content of all documents + context = "\n".join(doc.page_content for doc in docs) - doc_chain = StuffDocumentsChain( - llm_chain=stuff_chain, - document_variable_name="context", - document_prompt=DOCUMENT_PROMPT_TEMPLATE, - verbose=True, - output_key="answer", - memory=memory, - ) + # Replace {context} in the prompt with the concatenated document content + prompt = prompt.replace("{context}", context) - return ConversationalRetrievalChain( - combine_docs_chain=doc_chain, - memory=memory, - retriever=retriever, - question_generator=question_generator, - return_generated_question=False, - return_source_documents=True, - output_key="answer", + # Replace {chat_history} in the prompt with doc_retrieval_string + prompt = prompt.replace("{chat_history}", doc_retrieval_string) + + # Replace {question} in the prompt with the last message. + prompt = prompt.replace("{question}", messages[-1].content) + + # Initialize the agent + agent = Agent( + agent_name="Swarms QA ChatBot", + system_prompt=prompt, + llm=llm, + max_loops=1, + autosave=True, + # dynamic_temperature_enabled=True, + dashboard=False, verbose=True, + streaming_on=True, + # interactive=True, # Set to False to disable interactive mode + dynamic_temperature_enabled=False, + saved_state_path="chatbot.json", + # tools=[#Add your functions here# ], + # stopping_token="Stop!", + # interactive=True, + # docs_folder="docs", # Enter your folder name + # pdf_path="docs/finance_agent.pdf", + # sop="Calculate the profit for a company.", + # sop_list=["Calculate the profit for a company."], + user_name="RAH@EntangleIT.com", + docs=[doc.page_content for doc in docs], + # # docs_folder="docs", + retry_attempts=3, + # context_length=1000, + # tool_schema = dict + context_length=200000, + # tool_schema= + # tools + # agent_ops_on=True, ) + for message in messages[:-1]: + if message.role == Role.HUMAN: + agent.add_message_to_memory(message.content) + elif message.role == Role.AI: + agent.add_message_to_memory(message.content) + + async for response in agent.run_async(messages[-1].content): + yield response + + # memory = ConversationBufferMemory( + # chat_memory=chat_memory, + # memory_key="chat_history", + # input_key="question", + # output_key="answer", + # return_messages=True, + # ) + + # question_generator = LLMChain( + # llm=llm, + # prompt=CONDENSE_PROMPT_TEMPLATE, + # memory=memory, + # verbose=True, + # output_key="answer", + # ) + + # stuff_chain = LLMChain( + # llm=llm, + # prompt=prompt, + # verbose=True, + # output_key="answer", + # ) + + # doc_chain = StuffDocumentsChain( + # llm_chain=stuff_chain, + # document_variable_name="context", + # document_prompt=DOCUMENT_PROMPT_TEMPLATE, + # verbose=True, + # output_key="answer", + # memory=memory, + # ) + + # return ConversationalRetrievalChain( + # combine_docs_chain=doc_chain, + # memory=memory, + # retriever=retriever, + # question_generator=question_generator, + # return_generated_question=False, + # return_source_documents=True, + # output_key="answer", + # verbose=True, + # ) + @app.post( "/chat", summary="Chatbot", @@ -228,29 +258,29 @@ async def create_chain( ) async def chat(request: ChatRequest): """ Handles chatbot chat POST requests """ - chain = await create_chain( - messages=request.messages[:-1], - prompt=PromptTemplate.from_template( - f"{B_INST}{B_SYS}{request.prompt.strip()}{E_SYS}{E_INST}" - ), - ) - - json_config = { - "question": request.messages[-1].content, - "chat_history": [ - message.content for message in request.messages[:-1] - ], - # "callbacks": [ - # StreamingStdOutCallbackHandler(), - # TokenStreamingCallbackHandler(output_key="answer"), - # SourceDocumentsStreamingCallbackHandler(), - # ], - } - return LangchainStreamingResponse( - chain=chain, - config=json_config, - run_mode="async" + response = create_chat( + messages=request.messages, + prompt=request.prompt.strip() ) + # return response + return StreamingResponse(content=response) + + # json_config = { + # "question": request.messages[-1].content, + # "chat_history": [ + # message.content for message in request.messages[:-1] + # ], + # # "callbacks": [ + # # StreamingStdOutCallbackHandler(), + # # TokenStreamingCallbackHandler(output_key="answer"), + # # SourceDocumentsStreamingCallbackHandler(), + # # ], + # } + # return LangchainStreamingResponse( + # chain=chain, + # config=json_config, + # run_mode="async" + # ) @app.get("/") def root(): @@ -274,12 +304,12 @@ def favicon(): logging.basicConfig(level=logging.ERROR) -# @app.exception_handler(HTTPException) -# async def http_exception_handler(r: Request, exc: HTTPException): -# """Log and return exception details in response.""" -# logging.error( -# "HTTPException: %s executing request: %s", exc.detail, r.base_url -# ) -# return JSONResponse( -# status_code=exc.status_code, content={"detail": exc.detail} -# ) +@app.exception_handler(HTTPException) +async def http_exception_handler(r: Request, exc: HTTPException): + """Log and return exception details in response.""" + logging.error( + "HTTPException: %s executing request: %s", exc.detail, r.base_url + ) + return JSONResponse( + status_code=exc.status_code, content={"detail": exc.detail} + ) diff --git a/swarms/server/server_models.py b/swarms/server/server_models.py index 4d307b59..8fe88206 100644 --- a/swarms/server/server_models.py +++ b/swarms/server/server_models.py @@ -1,4 +1,5 @@ """ Chatbot Server API Models """ +from swarms.prompts.chat_prompt import Role from strenum import StrEnum from pydantic import BaseModel @@ -32,13 +33,6 @@ class RAGFiles(BaseModel): files: list[RAGFile] -class Role(StrEnum): - """ The role of a message in a conversation. """ - SYSTEM = "system" - ASSISTANT = "assistant" - USER = "user" - - class Message(BaseModel): """ Defines the type of a Message with a role and content. """ role: Role @@ -55,8 +49,8 @@ class ChatRequest(BaseModel): tokenLimit=2048, ) messages: list[Message] = [ - Message(role=Role.SYSTEM, content="Hello, how may I help you?"), - Message(role=Role.USER, content=""), + Message(role=Role.AI, content="Hello, how may I help you?"), + Message(role=Role.HUMAN, content="What is Swarms?"), ] maxTokens: int = 2048 temperature: float = 0 diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index e33df302..91651da5 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -7,7 +7,7 @@ import random import sys import time import uuid -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, AsyncIterator, Callable, Dict, List, Optional, Tuple, Union import toml import yaml @@ -882,6 +882,237 @@ class Agent: f"Error running agent: {error} optimize your input parameters" ) raise error + + async def run_async( + self, + task: Optional[str] = None, + img: Optional[str] = None, + video: Optional[str] = None, + is_last: bool = False, + *args, + **kwargs, + ) -> Any: + """ + Run the autonomous agent loop + """ + try: + # self.agent_initialization() + + # Add task to memory + self.short_memory.add(role=self.user_name, content=task) + + # Set the loop count + loop_count = 0 + + # Clear the short memory + response = None + all_responses = [] + steps_pool = [] + + # if self.tokenizer is not None: + # self.check_available_tokens() + + while self.max_loops == "auto" or loop_count < self.max_loops: + loop_count += 1 + self.loop_count_print(loop_count, self.max_loops) + print("\n") + + # Dynamic temperature + if self.dynamic_temperature_enabled is True: + self.dynamic_temperature() + + # Task prompt + task_prompt = self.short_memory.return_history_as_string() + + # Parameters + attempt = 0 + success = False + while attempt < self.retry_attempts and not success: + try: + if self.long_term_memory is not None: + logger.info("Querying long term memory...") + self.memory_query(task_prompt) + + else: + response_args = ( + (task_prompt, *args) + if img is None + else (task_prompt, img, *args) + ) + response = self.llm(*response_args, **kwargs) + + # Conver to a str if the response is not a str + response = self.llm_output_parser(response) + + # Print + if self.streaming_on is True: + yield response + else: + print(response) + + # Add the response to the memory + self.short_memory.add( + role=self.agent_name, content=response + ) + + # Add to all responses + all_responses.append(response) + + # Log the step + out_step = self.log_step_metadata(response) + steps_pool.append(out_step) + + # TODO: Implement reliablity check + if self.tools is not None: + # self.parse_function_call_and_execute(response) + self.parse_and_execute_tools(response) + + if self.code_interpreter is True: + # Parse the code and execute + logger.info("Parsing code and executing...") + code = extract_code_from_markdown(response) + + output = self.code_executor.execute(code) + + # Add to memory + self.short_memory.add( + role=self.agent_name, content=output + ) + + # Run the llm on the output + response = self.llm( + self.short_memory.return_history_as_string() + ) + + # Add to all responses + all_responses.append(response) + self.short_memory.add( + role=self.agent_name, content=response + ) + + if self.evaluator: + logger.info("Evaluating response...") + evaluated_response = self.evaluator(response) + print( + "Evaluated Response:" + f" {evaluated_response}" + ) + self.short_memory.add( + role=self.agent_name, + content=evaluated_response, + ) + + # all_responses.append(evaluated_response) + + # Sentiment analysis + if self.sentiment_analyzer: + logger.info("Analyzing sentiment...") + self.sentiment_analysis_handler(response) + + # print(response) + + success = True # Mark as successful to exit the retry loop + + except Exception as e: + logger.error( + f"Attempt {attempt+1}: Error generating" + f" response: {e}" + ) + attempt += 1 + + if not success: + logger.error( + "Failed to generate a valid response after" + " retry attempts." + ) + break # Exit the loop if all retry attempts fail + + # # Check stopping conditions + # if self.stopping_token in response: + # break + if ( + self.stopping_condition is not None + and self._check_stopping_condition(response) + ): + logger.info("Stopping condition met.") + break + elif self.stopping_func is not None and self.stopping_func( + response + ): + logger.info("Stopping function met.") + break + + if self.interactive: + logger.info("Interactive mode enabled.") + user_input = colored(input("You: "), "red") + + # User-defined exit command + if ( + user_input.lower() + == self.custom_exit_command.lower() + ): + print("Exiting as per user request.") + break + + self.short_memory.add( + role=self.user_name, content=user_input + ) + + if self.loop_interval: + logger.info( + f"Sleeping for {self.loop_interval} seconds" + ) + time.sleep(self.loop_interval) + + if self.autosave is True: + logger.info("Autosaving agent state.") + self.save_state(self.saved_state_path) + + # Apply the cleaner function to the response + if self.output_cleaner is not None: + logger.info("Applying output cleaner to response.") + response = self.output_cleaner(response) + logger.info(f"Response after output cleaner: {response}") + + # print(response) + if self.agent_ops_on is True and is_last is True: + self.check_end_session_agentops() + + # final_response = " ".join(all_responses) + all_responses = [ + response + for response in all_responses + if response is not None + ] + final_response = " ".join(all_responses) + + # logger.info(f"Final Response: {final_response}") + if self.return_history: + yield self.short_memory.return_history_as_string() + + elif self.return_step_meta: + log = ManySteps( + agent_id=self.agent_id, + agent_name=self.agent_name, + task=task, + number_of_steps=self.max_loops, + steps=steps_pool, + full_history=self.short_memory.return_history_as_string(), + total_tokens=self.tokenizer.count_tokens( + self.short_memory.return_history_as_string() + ), + ) + + yield log.model_dump_json(indent=4) + + else: + yield final_response + + except Exception as error: + logger.info( + f"Error running agent: {error} optimize your input parameters" + ) + raise error def __call__(self, task: str = None, img: str = None, *args, **kwargs): """Call the agent @@ -1573,7 +1804,7 @@ class Agent: return response - def stream_response(self, response: str, delay: float = 0.001) -> None: + async def stream_response(self, response: str, delay: float = 0.001) -> AsyncIterator[str]: """ Streams the response token by token. @@ -1597,7 +1828,7 @@ class Agent: # Stream and print the response token by token for token in response.split(): print(token, end=" ", flush=True) - time.sleep(delay) + yield token print() # Ensure a newline after streaming except Exception as e: print(f"An error occurred during streaming: {e}") @@ -1884,9 +2115,7 @@ class Agent: full_memory = self.short_memory.return_history_as_string() prompt_tokens = self.tokenizer.count_tokens(full_memory) completion_tokens = self.tokenizer.count_tokens(response) - total_tokens = self.tokenizer.count_tokens( - prompt_tokens + completion_tokens - ) + total_tokens = prompt_tokens + completion_tokens logger.info("Logging step metadata...")