Removed most LangChain dependencies for tighter Swarms integration and rebuilt the chatbot on top of Agents

pull/570/head
Richard Anthony Hein 8 months ago
parent 797a48291f
commit 6ed18328bb

@@ -0,0 +1,38 @@
import os
import asyncio
from swarms.models.popular_llms import OpenAIChatLLM
from weather_agent_example import WeatherAgent

# Set the OpenAI environment to use vLLM
api_key = os.getenv("OPENAI_API_KEY") or "EMPTY"  # for vllm
api_base = os.getenv("OPENAI_API_BASE") or "http://localhost:8000/v1"  # for vllm

weather_api_key = "af6ef989b5c50a91ca068cc00df125b7"  # Replace with your weather API key

# Create an instance of the OpenAIChatLLM class
llm = OpenAIChatLLM(
    base_url=api_base,
    api_key=api_key,
    model="NousResearch/Meta-Llama-3-8B-Instruct",
    temperature=0,
    streaming=False,
)

agent = WeatherAgent(
    weather_api_key=weather_api_key,
    city_name="Nepean, Ontario",
    agent_name="Weather Reporting Agent",
    system_prompt="You are an exciting and professional weather reporting agent. Given a city you will summarize the weather every hour.",
    llm=llm,
    max_loops=3,
    autosave=True,
    dynamic_temperature_enabled=True,
    dashboard=False,
    verbose=True,
    streaming_on=False,
    saved_state_path="weather_agent_state.json",
    user_name="RAH@EntangleIT.com",
    retry_attempts=3,
    context_length=200000,
)

agent.run("Summarize the following weather_data JSON for the average human. Translate the UNIX system time ('dt' in the JSON) to UTC. Note the temperature is listed in Kelvin in the JSON, so please translate Kelvin to Fahrenheit and Celsius. Base your report only on the JSON and output the details in Markdown. \n")

@@ -0,0 +1,29 @@
import time
import json
from swarms import Agent
import requests


class WeatherAgent(Agent):
    def __init__(self, weather_api_key, city_name, **kwargs):
        super().__init__(city_name, **kwargs)
        self.weather_api_key = weather_api_key
        self.city_name = city_name

    def check_weather(self):
        response = requests.get(
            'https://api.openweathermap.org/data/2.5/weather',
            params={'q': self.city_name, 'appid': self.weather_api_key},
        )
        if response.status_code == 200:
            weather_data = response.json()
            return weather_data
        else:
            print('Failed to retrieve weather data')
            raise ValueError("Failed to retrieve weather data or got invalid weather JSON data.")

    def run(self, prompt):
        while True:
            weather_data = self.check_weather()
            weather_data_json = json.dumps(weather_data)  # Convert dict to JSON string
            print("raw weather data: " + weather_data_json)
            super().run(prompt + weather_data_json)
            time.sleep(60 * 60)  # Check weather every hour
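WeatherAgent.run intentionally loops forever, fetching and re-summarizing once per hour. For a single report (handy when testing), the same pieces can be driven once by hand; a minimal sketch, assuming `agent` is the WeatherAgent instance built in the example file above, and bypassing the overridden run loop by calling the base Agent.run directly:

import json
from swarms import Agent

# One-shot variant: fetch once, summarize once, no hourly loop.
weather_json = json.dumps(agent.check_weather())
Agent.run(agent, "Summarize this weather JSON in Markdown:\n" + weather_json)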

@@ -48,7 +48,7 @@ agent = Agent(
 )
 async def startup_event():
-    agent.streaming(
+    agent.stream_reponse(
         "What are the components of a startups stock incentive equity plan"
     )

@@ -1,7 +1,7 @@
 from langchain_openai.chat_models.azure import (
     AzureChatOpenAI,
 )
-from langchain_openai.chat_models import (
+from langchain_openai import (
     ChatOpenAI as OpenAIChat,
 )
 from langchain_community.llms.octoai_endpoint import OctoAIEndpoint

@@ -1,7 +1,7 @@
 """ Customized Langchain StreamingResponse for Server-Side Events (SSE) """
 import asyncio
 from functools import partial
-from typing import Any
+from typing import Any, AsyncIterator
 from fastapi import status
 from langchain.chains.base import Chain

@@ -21,17 +21,16 @@ class StreamingResponse(EventSourceResponse):
     def __init__(
         self,
-        *args: Any,
-        content: Any = iter(()),
-        **kwargs: dict[str, Any],
+        content: AsyncIterator[Any],
     ) -> None:
         """Constructor method.

         Args:
             content: The content to stream.
         """
-        super().__init__(content=content, *args, **kwargs)
+        super().__init__(content=content)
+        self.content = content

     async def stream_response(self, send: Send) -> None:
         """Streams data chunks to client by iterating over `content`.

@@ -50,7 +49,7 @@ class StreamingResponse(EventSourceResponse):
         )
         try:
-            async for data in self.body_iterator:
+            async for data in self.content:
                 chunk = ensure_bytes(data, self.sep)
                 print(f"chunk: {chunk.decode()}")
                 await send(

@@ -78,7 +77,6 @@ class StreamingResponse(EventSourceResponse):
     def enable_compression(self, force: bool=False):
         raise NotImplementedError

 class LangchainStreamingResponse(StreamingResponse):
     """StreamingResponse class for LangChain resources."""

@@ -178,3 +176,4 @@ class LangchainStreamingResponse(StreamingResponse):
     def enable_compression(self, force: bool=False):
         raise NotImplementedError
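With this change, StreamingResponse wraps an async iterator directly instead of accepting arbitrary *args/**kwargs, so a FastAPI route can hand it any async generator. A minimal usage sketch under that assumption; the route path and generator are illustrative only:

from fastapi import FastAPI
from swarms.server.responses import StreamingResponse

app = FastAPI()

async def token_stream():
    # Each yielded item is encoded by stream_response() and sent as an SSE chunk.
    for token in ["hello", "from", "swarms"]:
        yield token

@app.get("/demo-stream")
async def demo_stream():
    return StreamingResponse(content=token_stream())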

@@ -6,7 +6,8 @@ import os
 # import torch
 from contextlib import asynccontextmanager
-import langchain
+from typing import AsyncIterator
+from swarms.structs.agent import Agent
 import tiktoken
 from dotenv import load_dotenv
 from fastapi import FastAPI, HTTPException, Request
@@ -15,22 +16,8 @@ from fastapi.responses import FileResponse, JSONResponse
 from fastapi.routing import APIRouter
 from fastapi.staticfiles import StaticFiles
 from huggingface_hub import login
-from langchain.chains.combine_documents.stuff import StuffDocumentsChain
-from langchain.chains.conversational_retrieval.base import (
-    ConversationalRetrievalChain,
-)
-from langchain.chains.llm import LLMChain
-from langchain.memory import ConversationBufferMemory
-from langchain.memory.chat_message_histories.in_memory import (
-    ChatMessageHistory,
-)
-from langchain.prompts.prompt import PromptTemplate
-from langchain_community.chat_models import ChatOpenAI
-# from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
-# from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
-from swarms.prompts.chat_prompt import Message
+from swarms.prompts.chat_prompt import Message, Role
 from swarms.prompts.conversational_RAG import (
     B_INST,
     B_SYS,
@@ -38,11 +25,12 @@ from swarms.prompts.conversational_RAG import (
     DOCUMENT_PROMPT_TEMPLATE,
     E_INST,
     E_SYS,
-    QA_PROMPT_TEMPLATE,
+    QA_PROMPT_TEMPLATE_STR,
 )
-from swarms.server.responses import LangchainStreamingResponse
-from swarms.server.server_models import ChatRequest, Role
+from swarms.server.responses import StreamingResponse
+from swarms.server.server_models import ChatRequest
 from swarms.server.vector_store import VectorStorage
+from swarms.models.popular_llms import OpenAIChatLLM

 # Explicitly specify the path to the .env file
 # Two folders above the current file's directory
@@ -106,10 +94,6 @@ tiktoken.model.MODEL_TO_ENCODING.update(
 print("Logging in to huggingface.co...")
 login(token=hf_token)  # login to huggingface.co

-langchain.debug = True
-langchain.verbose = True

 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """Initializes the vector store in a background task."""
@@ -144,14 +128,14 @@ if not os.path.exists(uploads):
 vector_store = VectorStorage(directory=uploads, use_gpu=use_gpu)

-async def create_chain(
+async def create_chat(
     messages: list[Message],
-    prompt: PromptTemplate = QA_PROMPT_TEMPLATE,
+    prompt: str = QA_PROMPT_TEMPLATE_STR,
 ):
-    """Creates the RAG Langchain conversational retrieval chain."""
-    print("Creating chain ...")
-    llm = ChatOpenAI(
+    """Creates the RAG conversational retrieval chain."""
+    print("Creating chat from history and relevant docs if any ...")
+    llm = OpenAIChatLLM(
         api_key=openai_api_key,
         base_url=openai_api_base,
         model=model_name,
@@ -159,68 +143,114 @@ async def create_chain(
         streaming=True,
     )

-    # if llm is ALlamaCpp:
-    #     llm.max_tokens = max_tokens_to_gen
-    # elif llm is AGPT4All:
-    #     llm.n_predict = max_tokens_to_gen
-    # el
-    # if llm is AChatOllama:
-    #     llm.max_tokens = max_tokens_to_gen
-    # if llm is VLLMAsync:
-    #     llm.max_tokens = max_tokens_to_gen
     retriever = await vector_store.get_retriever("swarms")

-    chat_memory = ChatMessageHistory()
+    doc_retrieval_string = ""
     for message in messages:
-        if message.role == Role.USER:
-            chat_memory.add_user_message(message.content)
-        elif message.role == Role.ASSISTANT:
-            chat_memory.add_ai_message(message.content)
+        if message.role == Role.HUMAN:
+            doc_retrieval_string += f"{Role.HUMAN}: {message.content}\r\n"
+        elif message.role == Role.AI:
+            doc_retrieval_string += f"{Role.AI}: {message.content}\r\n"

-    memory = ConversationBufferMemory(
-        chat_memory=chat_memory,
-        memory_key="chat_history",
-        input_key="question",
-        output_key="answer",
-        return_messages=True,
-    )
+    docs = retriever.invoke(doc_retrieval_string)

-    question_generator = LLMChain(
-        llm=llm,
-        prompt=CONDENSE_PROMPT_TEMPLATE,
-        memory=memory,
-        verbose=True,
-        output_key="answer",
-    )
+    # find {context} in prompt and replace it with the docs page_content.
+    # Concatenate the content of all documents
+    context = "\n".join(doc.page_content for doc in docs)

-    stuff_chain = LLMChain(
-        llm=llm,
-        prompt=prompt,
-        verbose=True,
-        output_key="answer",
-    )
+    # Replace {context} in the prompt with the concatenated document content
+    prompt = prompt.replace("{context}", context)

-    doc_chain = StuffDocumentsChain(
-        llm_chain=stuff_chain,
-        document_variable_name="context",
-        document_prompt=DOCUMENT_PROMPT_TEMPLATE,
-        verbose=True,
-        output_key="answer",
-        memory=memory,
-    )
+    # Replace {chat_history} in the prompt with doc_retrieval_string
+    prompt = prompt.replace("{chat_history}", doc_retrieval_string)

-    return ConversationalRetrievalChain(
-        combine_docs_chain=doc_chain,
-        memory=memory,
-        retriever=retriever,
-        question_generator=question_generator,
-        return_generated_question=False,
-        return_source_documents=True,
-        output_key="answer",
-        verbose=True,
-    )
+    # Replace {question} in the prompt with the last message.
+    prompt = prompt.replace("{question}", messages[-1].content)
+
+    # Initialize the agent
+    agent = Agent(
+        agent_name="Swarms QA ChatBot",
+        system_prompt=prompt,
+        llm=llm,
+        max_loops=1,
+        autosave=True,
+        # dynamic_temperature_enabled=True,
+        dashboard=False,
+        verbose=True,
+        streaming_on=True,
+        # interactive=True, # Set to False to disable interactive mode
+        dynamic_temperature_enabled=False,
+        saved_state_path="chatbot.json",
+        # tools=[#Add your functions here# ],
+        # stopping_token="Stop!",
+        # interactive=True,
+        # docs_folder="docs", # Enter your folder name
+        # pdf_path="docs/finance_agent.pdf",
+        # sop="Calculate the profit for a company.",
+        # sop_list=["Calculate the profit for a company."],
+        user_name="RAH@EntangleIT.com",
+        docs=[doc.page_content for doc in docs],
+        # # docs_folder="docs",
+        retry_attempts=3,
+        # context_length=1000,
+        # tool_schema = dict
+        context_length=200000,
+        # tool_schema=
+        # tools
+        # agent_ops_on=True,
+    )
+
+    for message in messages[:-1]:
+        if message.role == Role.HUMAN:
+            agent.add_message_to_memory(message.content)
+        elif message.role == Role.AI:
+            agent.add_message_to_memory(message.content)
+
+    async for response in agent.run_async(messages[-1].content):
+        yield response
+
+    # memory = ConversationBufferMemory(
+    #     chat_memory=chat_memory,
+    #     memory_key="chat_history",
+    #     input_key="question",
+    #     output_key="answer",
+    #     return_messages=True,
+    # )
+    # question_generator = LLMChain(
+    #     llm=llm,
+    #     prompt=CONDENSE_PROMPT_TEMPLATE,
+    #     memory=memory,
+    #     verbose=True,
+    #     output_key="answer",
+    # )
+    # stuff_chain = LLMChain(
+    #     llm=llm,
+    #     prompt=prompt,
+    #     verbose=True,
+    #     output_key="answer",
+    # )
+    # doc_chain = StuffDocumentsChain(
+    #     llm_chain=stuff_chain,
+    #     document_variable_name="context",
+    #     document_prompt=DOCUMENT_PROMPT_TEMPLATE,
+    #     verbose=True,
+    #     output_key="answer",
+    #     memory=memory,
+    # )
+    # return ConversationalRetrievalChain(
+    #     combine_docs_chain=doc_chain,
+    #     memory=memory,
+    #     retriever=retriever,
+    #     question_generator=question_generator,
+    #     return_generated_question=False,
+    #     return_source_documents=True,
+    #     output_key="answer",
+    #     verbose=True,
+    # )

 @app.post(
     "/chat",
     summary="Chatbot",
@@ -228,29 +258,29 @@ async def create_chain(
 )
 async def chat(request: ChatRequest):
     """ Handles chatbot chat POST requests """
-    chain = await create_chain(
-        messages=request.messages[:-1],
-        prompt=PromptTemplate.from_template(
-            f"{B_INST}{B_SYS}{request.prompt.strip()}{E_SYS}{E_INST}"
-        ),
-    )
-
-    json_config = {
-        "question": request.messages[-1].content,
-        "chat_history": [
-            message.content for message in request.messages[:-1]
-        ],
-        # "callbacks": [
-        #     StreamingStdOutCallbackHandler(),
-        #     TokenStreamingCallbackHandler(output_key="answer"),
-        #     SourceDocumentsStreamingCallbackHandler(),
-        # ],
-    }
-
-    return LangchainStreamingResponse(
-        chain=chain,
-        config=json_config,
-        run_mode="async"
+    response = create_chat(
+        messages=request.messages,
+        prompt=request.prompt.strip()
     )
+    # return response
+    return StreamingResponse(content=response)
+    # json_config = {
+    #     "question": request.messages[-1].content,
+    #     "chat_history": [
+    #         message.content for message in request.messages[:-1]
+    #     ],
+    #     # "callbacks": [
+    #     #     StreamingStdOutCallbackHandler(),
+    #     #     TokenStreamingCallbackHandler(output_key="answer"),
+    #     #     SourceDocumentsStreamingCallbackHandler(),
+    #     # ],
+    # }
+    # return LangchainStreamingResponse(
+    #     chain=chain,
+    #     config=json_config,
+    #     run_mode="async"
+    # )

 @app.get("/")
 def root():
@@ -274,12 +304,12 @@ def favicon():
 logging.basicConfig(level=logging.ERROR)

-# @app.exception_handler(HTTPException)
-# async def http_exception_handler(r: Request, exc: HTTPException):
-#     """Log and return exception details in response."""
-#     logging.error(
-#         "HTTPException: %s executing request: %s", exc.detail, r.base_url
-#     )
-#     return JSONResponse(
-#         status_code=exc.status_code, content={"detail": exc.detail}
-#     )
+@app.exception_handler(HTTPException)
+async def http_exception_handler(r: Request, exc: HTTPException):
+    """Log and return exception details in response."""
+    logging.error(
+        "HTTPException: %s executing request: %s", exc.detail, r.base_url
+    )
+    return JSONResponse(
+        status_code=exc.status_code, content={"detail": exc.detail}
+    )

@@ -1,4 +1,5 @@
 """ Chatbot Server API Models """
+from swarms.prompts.chat_prompt import Role
 from strenum import StrEnum
 from pydantic import BaseModel

@@ -32,13 +33,6 @@ class RAGFiles(BaseModel):
     files: list[RAGFile]

-class Role(StrEnum):
-    """ The role of a message in a conversation. """
-    SYSTEM = "system"
-    ASSISTANT = "assistant"
-    USER = "user"

 class Message(BaseModel):
     """ Defines the type of a Message with a role and content. """
     role: Role

@@ -55,8 +49,8 @@ class ChatRequest(BaseModel):
         tokenLimit=2048,
     )
     messages: list[Message] = [
-        Message(role=Role.SYSTEM, content="Hello, how may I help you?"),
-        Message(role=Role.USER, content=""),
+        Message(role=Role.AI, content="Hello, how may I help you?"),
+        Message(role=Role.HUMAN, content="What is Swarms?"),
     ]
     maxTokens: int = 2048
     temperature: float = 0
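Putting the pieces together: /chat now accepts a ChatRequest like the defaults above and streams the agent's answer back as server-sent events via StreamingResponse. A rough client sketch using httpx; the host/port, the exact set of request fields, and the role string values are assumptions inferred from the models shown here, not confirmed by the diff:

import httpx

payload = {
    "prompt": "You are a helpful assistant for the Swarms docs.",  # assumed field
    "messages": [{"role": "human", "content": "What is Swarms?"}],  # role value assumed
    "maxTokens": 2048,
    "temperature": 0,
}

# Server URL is an assumption; adjust to wherever the chatbot server is running.
with httpx.stream("POST", "http://localhost:8888/chat", json=payload, timeout=None) as resp:
    for line in resp.iter_lines():
        if line:
            print(line)  # raw SSE lines emitted by StreamingResponse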

@@ -7,7 +7,7 @@ import random
 import sys
 import time
 import uuid
-from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+from typing import Any, AsyncIterator, Callable, Dict, List, Optional, Tuple, Union
 import toml
 import yaml
@@ -882,6 +882,237 @@ class Agent:
                f"Error running agent: {error} optimize your input parameters"
            )
            raise error
    async def run_async(
        self,
        task: Optional[str] = None,
        img: Optional[str] = None,
        video: Optional[str] = None,
        is_last: bool = False,
        *args,
        **kwargs,
    ) -> Any:
        """
        Run the autonomous agent loop
        """
        try:
            # self.agent_initialization()

            # Add task to memory
            self.short_memory.add(role=self.user_name, content=task)

            # Set the loop count
            loop_count = 0

            # Clear the short memory
            response = None
            all_responses = []
            steps_pool = []

            # if self.tokenizer is not None:
            #     self.check_available_tokens()

            while self.max_loops == "auto" or loop_count < self.max_loops:
                loop_count += 1
                self.loop_count_print(loop_count, self.max_loops)
                print("\n")

                # Dynamic temperature
                if self.dynamic_temperature_enabled is True:
                    self.dynamic_temperature()

                # Task prompt
                task_prompt = self.short_memory.return_history_as_string()

                # Parameters
                attempt = 0
                success = False
                while attempt < self.retry_attempts and not success:
                    try:
                        if self.long_term_memory is not None:
                            logger.info("Querying long term memory...")
                            self.memory_query(task_prompt)
                        else:
                            response_args = (
                                (task_prompt, *args)
                                if img is None
                                else (task_prompt, img, *args)
                            )
                            response = self.llm(*response_args, **kwargs)

                            # Convert to a str if the response is not a str
                            response = self.llm_output_parser(response)

                            # Print
                            if self.streaming_on is True:
                                yield response
                            else:
                                print(response)

                            # Add the response to the memory
                            self.short_memory.add(
                                role=self.agent_name, content=response
                            )

                            # Add to all responses
                            all_responses.append(response)

                            # Log the step
                            out_step = self.log_step_metadata(response)
                            steps_pool.append(out_step)

                        # TODO: Implement reliability check

                        if self.tools is not None:
                            # self.parse_function_call_and_execute(response)
                            self.parse_and_execute_tools(response)

                        if self.code_interpreter is True:
                            # Parse the code and execute
                            logger.info("Parsing code and executing...")
                            code = extract_code_from_markdown(response)

                            output = self.code_executor.execute(code)

                            # Add to memory
                            self.short_memory.add(
                                role=self.agent_name, content=output
                            )

                            # Run the llm on the output
                            response = self.llm(
                                self.short_memory.return_history_as_string()
                            )

                            # Add to all responses
                            all_responses.append(response)
                            self.short_memory.add(
                                role=self.agent_name, content=response
                            )

                        if self.evaluator:
                            logger.info("Evaluating response...")
                            evaluated_response = self.evaluator(response)
                            print(
                                "Evaluated Response:"
                                f" {evaluated_response}"
                            )
                            self.short_memory.add(
                                role=self.agent_name,
                                content=evaluated_response,
                            )
                            # all_responses.append(evaluated_response)

                        # Sentiment analysis
                        if self.sentiment_analyzer:
                            logger.info("Analyzing sentiment...")
                            self.sentiment_analysis_handler(response)

                        # print(response)

                        success = True  # Mark as successful to exit the retry loop

                    except Exception as e:
                        logger.error(
                            f"Attempt {attempt+1}: Error generating"
                            f" response: {e}"
                        )
                        attempt += 1

                if not success:
                    logger.error(
                        "Failed to generate a valid response after"
                        " retry attempts."
                    )
                    break  # Exit the loop if all retry attempts fail

                # # Check stopping conditions
                # if self.stopping_token in response:
                #     break

                if (
                    self.stopping_condition is not None
                    and self._check_stopping_condition(response)
                ):
                    logger.info("Stopping condition met.")
                    break
                elif self.stopping_func is not None and self.stopping_func(
                    response
                ):
                    logger.info("Stopping function met.")
                    break

                if self.interactive:
                    logger.info("Interactive mode enabled.")
                    user_input = colored(input("You: "), "red")

                    # User-defined exit command
                    if (
                        user_input.lower()
                        == self.custom_exit_command.lower()
                    ):
                        print("Exiting as per user request.")
                        break

                    self.short_memory.add(
                        role=self.user_name, content=user_input
                    )

                if self.loop_interval:
                    logger.info(
                        f"Sleeping for {self.loop_interval} seconds"
                    )
                    time.sleep(self.loop_interval)

            if self.autosave is True:
                logger.info("Autosaving agent state.")
                self.save_state(self.saved_state_path)

            # Apply the cleaner function to the response
            if self.output_cleaner is not None:
                logger.info("Applying output cleaner to response.")
                response = self.output_cleaner(response)
                logger.info(f"Response after output cleaner: {response}")

            # print(response)

            if self.agent_ops_on is True and is_last is True:
                self.check_end_session_agentops()

            # final_response = " ".join(all_responses)
            all_responses = [
                response
                for response in all_responses
                if response is not None
            ]
            final_response = " ".join(all_responses)

            # logger.info(f"Final Response: {final_response}")
            if self.return_history:
                yield self.short_memory.return_history_as_string()
            elif self.return_step_meta:
                log = ManySteps(
                    agent_id=self.agent_id,
                    agent_name=self.agent_name,
                    task=task,
                    number_of_steps=self.max_loops,
                    steps=steps_pool,
                    full_history=self.short_memory.return_history_as_string(),
                    total_tokens=self.tokenizer.count_tokens(
                        self.short_memory.return_history_as_string()
                    ),
                )
                yield log.model_dump_json(indent=4)
            else:
                yield final_response

        except Exception as error:
            logger.info(
                f"Error running agent: {error} optimize your input parameters"
            )
            raise error
    def __call__(self, task: str = None, img: str = None, *args, **kwargs):
        """Call the agent

@@ -1573,7 +1804,7 @@ class Agent:
         return response

-    def stream_response(self, response: str, delay: float = 0.001) -> None:
+    async def stream_response(self, response: str, delay: float = 0.001) -> AsyncIterator[str]:
         """
         Streams the response token by token.

@@ -1597,7 +1828,7 @@ class Agent:
             # Stream and print the response token by token
             for token in response.split():
                 print(token, end=" ", flush=True)
-                time.sleep(delay)
+                yield token
             print()  # Ensure a newline after streaming
         except Exception as e:
             print(f"An error occurred during streaming: {e}")
@@ -1884,9 +2115,7 @@ class Agent:
         full_memory = self.short_memory.return_history_as_string()
         prompt_tokens = self.tokenizer.count_tokens(full_memory)
         completion_tokens = self.tokenizer.count_tokens(response)
-        total_tokens = self.tokenizer.count_tokens(
-            prompt_tokens + completion_tokens
-        )
+        total_tokens = prompt_tokens + completion_tokens

         logger.info("Logging step metadata...")
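Since run_async is an async generator (it yields streamed chunks, the final joined response, the history string, or step metadata depending on the agent's flags), callers iterate it with async for rather than awaiting a single value. A minimal consumption sketch, assuming an Agent already configured as in the chatbot server above:

import asyncio

async def consume(agent):
    # Print each yielded chunk as it arrives from the agent loop.
    async for chunk in agent.run_async("Summarize the Swarms framework in one paragraph."):
        print(chunk, end="", flush=True)

# asyncio.run(consume(agent))  # with `agent` constructed elsewhere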
