Merge branch 'master' of https://github.com/kyegomez/swarms into memory

Branch: memory
Author: Sashin (1 year ago)
Commit: 285d36ca6f

@@ -65,7 +65,7 @@ jobs:
       # https://github.com/docker/metadata-action
       - name: Extract Docker metadata
         id: meta
-        uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0
+        uses: docker/metadata-action@31cebacef4805868f9ce9a0cb03ee36c32df2ac4 # v5.3.0
         with:
           images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}

@@ -26,7 +26,7 @@ jobs:
       - name: Build package
         run: python -m build
       - name: Publish package
-        uses: pypa/gh-action-pypi-publish@b7f401de30cb6434a1e19f805ff006643653240e
+        uses: pypa/gh-action-pypi-publish@2f6f737ca5f74c637829c0f5c3acd0e29ea5e8bf
         with:
          user: __token__
          password: ${{ secrets.PYPI_API_TOKEN }}

@@ -12,7 +12,7 @@ jobs:
     permissions: write-all
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/first-interaction@v1.2.0
+      - uses: actions/first-interaction@v1.3.0
        with:
          repo-token: ${{ secrets.GITHUB_TOKEN }}
          issue-message: "Hello there, thank you for opening an Issue ! 🙏🏻 The team was notified and they will get back to you asap."

.gitignore

@@ -9,6 +9,7 @@ video/
 dataframe/
 static/generated
+runs
 swarms/__pycache__
 venv
 .DS_Store

@@ -33,7 +33,6 @@ Run example in Collab: <a target="_blank" href="https://colab.research.google.co
 - Enterprise Grade + Production Grade: `Agent` is designed and optimized for automating real-world tasks at scale!
 ```python
 import os
 from dotenv import load_dotenv
@@ -55,12 +54,12 @@ llm = OpenAIChat(
 )
-## Initialize the Agent
-agent = Agent(llm=llm, max_loops=1, dashboard=True)
-# Run the Agent on a task
+## Initialize the workflow
+agent = Agent(llm=llm, max_loops=1, autosave=True, dashboard=True)
+# Run the workflow on a task
 out = agent.run("Generate a 10,000 word blog on health and wellness.")
+print(out)
 ```

@@ -20,7 +20,12 @@ llm = OpenAIChat(
 ## Initialize the workflow
-agent = Agent(llm=llm, max_loops=1, dashboard=True)
+agent = Agent(
+    llm=llm,
+    max_loops=1,
+    autosave=True,
+    dashboard=True,
+)
 # Run the workflow on a task
 out = agent.run("Generate a 10,000 word blog on health and wellness.")

@@ -10,43 +10,62 @@ openai_api_key = os.getenv("OPENAI_API_KEY")
 stability_api_key = os.getenv("STABILITY_API_KEY")
 # Initialize the language model and image generation model
-llm = OpenAIChat(openai_api_key=openai_api_key, temperature=0.5, max_tokens=3000)
+llm = OpenAIChat(
+    openai_api_key=openai_api_key, temperature=0.5, max_tokens=3000
+)
 sd_api = StableDiffusion(api_key=stability_api_key)
 # Creative Concept Generator for Product Ads
 class ProductAdConceptGenerator:
     def __init__(self, product_name):
         self.product_name = product_name
         self.themes = [
-            "futuristic", "rustic", "luxurious", "minimalistic", "vibrant", "elegant",
-            "retro", "urban", "ethereal", "surreal", "artistic", "tech-savvy",
-            "vintage", "natural", "sophisticated", "playful", "dynamic", "serene", "lasers," "lightning"
+            "futuristic",
+            "rustic",
+            "luxurious",
+            "minimalistic",
+            "vibrant",
+            "elegant",
+            "retro",
+            "urban",
+            "ethereal",
+            "surreal",
+            "artistic",
+            "tech-savvy",
+            "vintage",
+            "natural",
+            "sophisticated",
+            "playful",
+            "dynamic",
+            "serene",
+            "lasers,lightning",
         ]
         self.contexts = [
-            "in an everyday setting", "in a rave setting", "in an abstract environment",
-            "in an adventurous context", "surrounded by nature", "in a high-tech setting",
-            "in a historical context", "in a busy urban scene", "in a tranquil and peaceful setting",
-            "against a backdrop of city lights", "in a surreal dreamscape", "in a festive atmosphere",
-            "in a luxurious setting", "in a playful and colorful background", "in an ice cave setting",
-            "in a serene and calm landscape"
+            "in an everyday setting",
+            "in a rave setting",
+            "in an abstract environment",
+            "in an adventurous context",
+            "surrounded by nature",
+            "in a high-tech setting",
+            "in a historical context",
+            "in a busy urban scene",
+            "in a tranquil and peaceful setting",
+            "against a backdrop of city lights",
+            "in a surreal dreamscape",
+            "in a festive atmosphere",
+            "in a luxurious setting",
+            "in a playful and colorful background",
+            "in an ice cave setting",
+            "in a serene and calm landscape",
         ]
-<<<<<<< HEAD
-=======
         self.contexts = [
             "high realism product ad (extremely creative)"
         ]
->>>>>>> 831147e ([CODE QUALITY])
     def generate_concept(self):
         theme = random.choice(self.themes)
         context = random.choice(self.contexts)
-<<<<<<< HEAD
-        return f"An ad for {self.product_name} that embodies a {theme} theme {context}"
-# User input
-product_name = input("Enter a product name for ad creation (e.g., 'PS5', 'AirPods', 'Kirkland Vodka'): ")
-social_media_platform = input("Enter a social media platform (e.g., 'Facebook', 'Twitter', 'Instagram'): ")
-=======
         return (
             f"{theme} inside a {style} {self.product_name}, {context}"
         )
@@ -57,7 +76,6 @@ product_name = input(
     "Enter a product name for ad creation (e.g., 'PS5', 'AirPods',"
     " 'Kirkland Vodka'): "
 )
->>>>>>> 831147e ([CODE QUALITY])
 # Generate creative concept
 concept_generator = ProductAdConceptGenerator(product_name)
@@ -68,15 +86,13 @@ image_paths = sd_api.run(creative_concept)
 # Generate ad copy
 ad_copy_agent = Agent(llm=llm, max_loops=1)
-ad_copy_prompt = f"Write a compelling {social_media_platform} ad copy for a product photo showing {product_name} {creative_concept}."
+ad_copy_prompt = (
+    f"Write a compelling {social_media_platform} ad copy for a"
+    f" product photo showing {product_name} {creative_concept}."
+)
 ad_copy = ad_copy_agent.run(task=ad_copy_prompt)
 # Output the results
-<<<<<<< HEAD
-print("Creative Concept:", creative_concept)
-print("Image Path:", image_paths[0] if image_paths else "No image generated")
-print("Ad Copy:", ad_copy)
-=======
 print("Creative Concept:", concept_result)
 print("Design Ideas:", design_result)
 print("Ad Copy:", copywriting_result)
@@ -84,4 +100,3 @@ print(
     "Image Path:",
     image_paths[0] if image_paths else "No image generated",
 )
->>>>>>> 831147e ([CODE QUALITY])

@@ -0,0 +1,76 @@
import os
from dotenv import load_dotenv
from swarms.models import OpenAIChat
from swarms.models.stable_diffusion import StableDiffusion
from swarms.structs import Agent, SequentialWorkflow
import swarms.prompts.education as edu_prompts
# Load environment variables
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
stability_api_key = os.getenv("STABILITY_API_KEY")
# Initialize language model
llm = OpenAIChat(
openai_api_key=api_key, temperature=0.5, max_tokens=3000
)
# Initialize Stable Diffusion
sd_api = StableDiffusion(api_key=stability_api_key)
# User preferences (can be dynamically set in a real application)
user_preferences = {
"subjects": "Cognitive Architectures",
"learning_style": "Visual",
"challenge_level": "Moderate",
}
# Formatted prompts from user preferences
curriculum_prompt = edu_prompts.CURRICULUM_DESIGN_PROMPT.format(
**user_preferences
)
interactive_prompt = edu_prompts.INTERACTIVE_LEARNING_PROMPT.format(
**user_preferences
)
sample_prompt = edu_prompts.SAMPLE_TEST_PROMPT.format(
**user_preferences
)
image_prompt = edu_prompts.IMAGE_GENERATION_PROMPT.format(
**user_preferences
)
# Initialize agents for different educational tasks
curriculum_agent = Agent(llm=llm, max_loops=1, sop=curriculum_prompt)
interactive_learning_agent = Agent(
llm=llm, max_loops=1, sop=interactive_prompt
)
sample_lesson_agent = Agent(llm=llm, max_loops=1, sop=sample_prompt)
# Create Sequential Workflow
workflow = SequentialWorkflow(max_loops=1)
# Add tasks to workflow with personalized prompts
workflow.add(curriculum_agent, "Generate a curriculum")
workflow.add(
interactive_learning_agent, "Generate an interactive lesson"
)
workflow.add(sample_lesson_agent, "Generate a practice test")
# Execute the workflow for text-based tasks
workflow.run()
# Generate an image using Stable Diffusion
image_result = sd_api.run(image_prompt)
# Output results for each task
for task in workflow.tasks:
print(
f"Task Description: {task.description}\nResult:"
f" {task.result}\n"
)
# Output image result
print(
"Image Generation Task: Generate an image for the interactive"
f" lesson\nResult: {image_result}"
)

@@ -0,0 +1,149 @@
import os
from dotenv import load_dotenv
from swarms.models import OpenAIChat
from swarms.prompts.code_interpreter import CODE_INTERPRETER
from swarms.structs import Agent
from swarms.prompts.programming import TEST_SOP, DOCUMENTATION_SOP
from termcolor import colored
load_dotenv()
FEATURE = (
"Implement an all-new signup system in typescript using supabase"
)
CODEBASE = """
import React, { useState } from 'react';
import UpperPanel from './UpperPanel';
import LowerPanel from './LowerPanel';
const MainPanel = () => {
const [promptInstructionForLowerPanel, setPromptInstructionForLowerPanel] = useState('');
const [formData, setFormData] = useState('');
const [isLoading, setIsLoading] = useState(false);
return (
<div className="flex h-screen">
<UpperPanel setPromptInstructionForLowerPanel={setPromptInstructionForLowerPanel}
isLoading={isLoading}
setIsLoading={setIsLoading}
/>
<LowerPanel promptInstruction={promptInstructionForLowerPanel} isLoading={isLoading} />
</div>
);
};
export default MainPanel;
"""
# Load the environment variables
api_key = os.getenv("OPENAI_API_KEY")
# Initialize the language agent
llm = OpenAIChat(
model_name="gpt-4",
openai_api_key=api_key,
temperature=0.5,
max_tokens=4000,
)
# Product Manager Agent init
product_manager_agent = Agent(
llm=llm, max_loops=1, sop=CODE_INTERPRETER, autosave=True
)
# Initialize the agent with the language agent
feature_implementer_frontend = Agent(
llm=llm, max_loops=1, sop=CODE_INTERPRETER, autosave=True
)
# Create another agent for a different task
feature_implementer_backend = Agent(
llm=llm, max_loops=1, sop=CODE_INTERPRETER, autosave=True
)
# Create another agent for a different task
tester_agent = Agent(
llm=llm, max_loops=1, sop=TEST_SOP, autosave=True
)
# Create another agent for a different task
documenting_agent = Agent(
llm=llm, max_loops=1, sop=DOCUMENTATION_SOP, autosave=True
)
# Product Agent prompt
def feature_codebase_product_agentprompt(
feature: str, codebase: str
) -> str:
prompt = (
"Create an algorithmic pseudocode for an all-new feature:"
f" {feature} based on this codebase: {codebase}"
)
return prompt
# Product Manager Agent
product_manager_out = product_manager_agent.run(
feature_codebase_product_agentprompt(FEATURE, CODEBASE)
)
print(
colored(
(
"---------------------------- Product Manager Plan:"
f" {product_manager_out}"
),
"cyan",
)
)
# Feature Implementer Agent
agent1_out = feature_implementer_frontend.run(
f"Create the backend code for {FEATURE} in markdown based off of"
f" this algorithmic pseudocode: {product_manager_out} the logic"
f" based on the following codebase: {CODEBASE}"
)
print(
colored(
(
"--------------------- Feature Implementer Code logic:"
f" {agent1_out}"
),
"cyan",
)
)
# Tester agent
tester_agent_out = tester_agent.run(
f"Create tests for the following code: {agent1_out}"
)
print(
colored(
(
"---------------------------- Tests for the logic:"
f" {tester_agent_out}"
),
"green",
)
)
# Documentation Agent
documenter_agent_out = documenting_agent.run(
f"Document the following code: {agent1_out}"
)
print(
colored(
(
"---------------------------- Documentation for the"
f" logic: {documenter_agent_out}"
),
"yellow",
)
)

@@ -0,0 +1,13 @@
#!/bin/bash
# Define the base URL
base_url="http://localhost:8000"
# Define the JSON payload
payload='{"feature": "login system", "codebase": "existing codebase here"}'
# Send POST request
echo "Sending request to /agent/ endpoint..."
response=$(curl -s -X POST "$base_url/agent/" -H "Content-Type: application/json" -d "$payload")
echo "Response: $response"

@@ -0,0 +1,13 @@
#!/bin/bash
# Define the base URL
base_url="http://localhost:8000"
# Define the JSON payload
payload='{"feature": "login system", "codebase": "existing codebase here"}'
# Send POST request
echo "Sending request to /agent/ endpoint..."
response=$(curl -s -X POST "$base_url/agent/" -H "Content-Type: application/json" -d "$payload")
echo "Response: $response"

@@ -1,96 +0,0 @@
import os
import datetime
from dotenv import load_dotenv
from swarms.models.stable_diffusion import StableDiffusion
from swarms.models.gpt4_vision_api import GPT4VisionAPI
from swarms.models import OpenAIChat
from swarms.structs import Agent
# Load environment variables
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
stability_api_key = os.getenv("STABILITY_API_KEY")
# Initialize the models
vision_api = GPT4VisionAPI(api_key=openai_api_key)
sd_api = StableDiffusion(api_key=stability_api_key)
gpt_api = OpenAIChat(openai_api_key=openai_api_key)
class Idea2Image(Agent):
def __init__(self, llm, vision_api):
super().__init__(llm=llm)
self.vision_api = vision_api
def run(self, initial_prompt, num_iterations, run_folder):
current_prompt = initial_prompt
for i in range(num_iterations):
print(f"Iteration {i}: Image generation and analysis")
if i == 0:
current_prompt = self.enrich_prompt(current_prompt)
print(f"Enriched Prompt: {current_prompt}")
img = sd_api.generate_and_move_image(
current_prompt, i, run_folder
)
if not img:
print("Failed to generate image")
break
print(f"Generated image at: {img}")
analysis = (
self.vision_api.run(img, current_prompt)
if img
else None
)
if analysis:
current_prompt += (
". " + analysis[:500]
) # Ensure the analysis is concise
print(f"Image Analysis: {analysis}")
else:
print(f"Failed to analyze image at: {img}")
def enrich_prompt(self, prompt):
enrichment_task = (
"Create a concise and effective image generation prompt"
" within 400 characters or less, based on Stable"
" Diffusion and Dalle best practices. Starting prompt:"
f" \n\n'{prompt}'\n\nImprove the prompt with any"
" applicable details or keywords by considering the"
" following aspects: \n1. Subject details (like actions,"
" emotions, environment) \n2. Artistic style (such as"
" surrealism, hyperrealism) \n3. Medium (digital"
" painting, oil on canvas) \n4. Color themes and"
" lighting (like warm colors, cinematic lighting) \n5."
" Composition and framing (close-up, wide-angle) \n6."
" Additional elements (like a specific type of"
" background, weather conditions) \n7. Any other"
" artistic or thematic details that can make the image"
" more vivid and compelling."
)
llm_result = self.llm.generate([enrichment_task])
return (
llm_result.generations[0][0].text[:500]
if llm_result.generations
else None
)
# User input and setup
user_prompt = input("Prompt for image generation: ")
num_iterations = int(
input("Enter the number of iterations for image improvement: ")
)
run_folder = os.path.join(
"runs", datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
)
os.makedirs(run_folder, exist_ok=True)
# Initialize and run the agent
idea2image_agent = Idea2Image(gpt_api, vision_api)
idea2image_agent.run(user_prompt, num_iterations, run_folder)
print("Image improvement process completed.")

@@ -1,7 +0,0 @@
"""
Idea 2 img
task -> gpt4 text -> dalle3 img -> gpt4vision img + text analyze img -> dalle3 img -> loop
"""
from swarms.models.gpt4_vision_api import GPT4VisionAPI

@@ -0,0 +1,185 @@
import datetime
import os
import streamlit as st
from dotenv import load_dotenv
from swarms.models import OpenAIChat
from swarms.models.gpt4_vision_api import GPT4VisionAPI
from swarms.models.stable_diffusion import StableDiffusion
from swarms.structs import Agent
# Load environment variables
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
stability_api_key = os.getenv("STABLE_API_KEY")
# Initialize the models
vision_api = GPT4VisionAPI(api_key=openai_api_key)
sd_api = StableDiffusion(api_key=stability_api_key)
gpt_api = OpenAIChat(openai_api_key=openai_api_key)
class Idea2Image(Agent):
def __init__(self, llm, vision_api):
super().__init__(llm=llm)
self.vision_api = vision_api
def run(self, initial_prompt, num_iterations, run_folder):
current_prompt = initial_prompt
for i in range(num_iterations):
print(f"Iteration {i}: Image generation and analysis")
if i == 0:
current_prompt = self.enrich_prompt(current_prompt)
print(f"Enriched Prompt: {current_prompt}")
img = sd_api.generate_and_move_image(
current_prompt, i, run_folder
)
if not img:
print("Failed to generate image")
break
print(f"Generated image at: {img}")
analysis = (
self.vision_api.run(img, current_prompt)
if img
else None
)
if analysis:
current_prompt += (
". " + analysis[:500]
) # Ensure the analysis is concise
print(f"Image Analysis: {analysis}")
else:
print(f"Failed to analyze image at: {img}")
def enrich_prompt(self, prompt):
enrichment_task = (
"Create a concise and effective image generation prompt"
" within 400 characters or less, based on Stable"
" Diffusion and Dalle best practices to help it create"
" much better images. Starting prompt:"
f" \n\n'{prompt}'\n\nImprove the prompt with any"
" applicable details or keywords by considering the"
" following aspects: \n1. Subject details (like actions,"
" emotions, environment) \n2. Artistic style (such as"
" surrealism, hyperrealism) \n3. Medium (digital"
" painting, oil on canvas) \n4. Color themes and"
" lighting (like warm colors, cinematic lighting) \n5."
" Composition and framing (close-up, wide-angle) \n6."
" Additional elements (like a specific type of"
" background, weather conditions) \n7. Any other"
" artistic or thematic details that can make the image"
" more vivid and compelling. Help the image generator"
" create better images by enriching the prompt."
)
llm_result = self.llm.generate([enrichment_task])
return (
llm_result.generations[0][0].text[:500]
if llm_result.generations
else None
)
def run_gradio(self, initial_prompt, num_iterations, run_folder):
results = []
current_prompt = initial_prompt
for i in range(num_iterations):
enriched_prompt = (
self.enrich_prompt(current_prompt)
if i == 0
else current_prompt
)
img_path = sd_api.generate_and_move_image(
enriched_prompt, i, run_folder
)
analysis = (
self.vision_api.run(img_path, enriched_prompt)
if img_path
else None
)
if analysis:
current_prompt += (
". " + analysis[:500]
) # Ensuring the analysis is concise
results.append((enriched_prompt, img_path, analysis))
return results
# print(
# colored("---------------------------------------- MultiModal Tree of Thought agents for Image Generation", "cyan", attrs=["bold"])
# )
# # User input and setup
# user_prompt = input("Prompt for image generation: ")
# num_iterations = int(
# input("Enter the number of iterations for image improvement: ")
# )
# run_folder = os.path.join(
# "runs", datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# )
# os.makedirs(run_folder, exist_ok=True)
# print(
# colored(
# f"---------------------------------- Running Multi-Modal Tree of thoughts agent with {num_iterations} iterations", "green"
# )
# )
# # Initialize and run the agent
# idea2image_agent = Idea2Image(gpt_api, vision_api)
# idea2image_agent.run(user_prompt, num_iterations, run_folder)
# print("Idea space has been traversed.")
# Load environment variables and initialize the models
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
stability_api_key = os.getenv("STABLE_API_KEY")
vision_api = GPT4VisionAPI(api_key=openai_api_key)
sd_api = StableDiffusion(api_key=stability_api_key)
gpt_api = OpenAIChat(openai_api_key=openai_api_key)
# Define the modified Idea2Image class here
# Streamlit UI layout
st.title(
"Explore the infinite Multi-Modal Idea Space with Idea2Image"
)
user_prompt = st.text_input("Prompt for image generation:")
num_iterations = st.number_input(
"Enter the number of iterations for image improvement:",
min_value=1,
step=1,
)
if st.button("Generate Image"):
run_folder = os.path.join(
"runs", datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
)
os.makedirs(run_folder, exist_ok=True)
idea2image_agent = Idea2Image(gpt_api, vision_api)
results = idea2image_agent.run_gradio(
user_prompt, num_iterations, run_folder
)
for i, (enriched_prompt, img_path, analysis) in enumerate(
results
):
st.write(f"Iteration {i+1}:")
st.write("Enriched Prompt:", enriched_prompt)
if img_path:
st.image(img_path, caption="Generated Image")
else:
st.error("Failed to generate image")
if analysis:
st.write("Image Analysis:", analysis)
st.success("Idea space has been traversed.")
# [Add any additional necessary code adjustments]

@@ -0,0 +1,114 @@
"""
Multi Modal tree of thoughts that leverages the GPT-4 language model and the
Stable Diffusion model to generate a multimodal output and evaluate the
output based on a metric from 0.0 to 1.0, then run a search algorithm (DFS/BFS) and return the best output.
task: Generate an image of a swarm of bees -> Image generator -> GPT4V evaluates the img from 0.0 to 1.0 -> DFS/BFS -> return the best output
- GPT4Vision will evaluate the image from 0.0 to 1.0 based on how likely it accomplishes the task
- DFS/BFS will search for the best output based on the evaluation from GPT4Vision
- The output will be a multimodal output that is a combination of the image and the text
- The output will be evaluated by GPT4Vision
- The prompt to the image generator will be optimized from the output of GPT4Vision and the search
"""
import os
from dotenv import load_dotenv
from swarms.models.gpt4_vision_api import GPT4VisionAPI
from swarms.models.stable_diffusion import StableDiffusion
from termcolor import colored
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
stable_api_key = os.environ.get("STABLE_API_KEY")
# Initialize the language model
llm = GPT4VisionAPI(
openai_api_key=api_key,
max_tokens=500,
)
# IMG Generator
img_generator = StableDiffusion(api_key=stable_api_key)
# Initialize the language model
task = "Garden of Eden futuristic city graphic art"
def evaluate_img(llm, task: str, img: str):
EVAL_IMG = f"""
Evaluate the image: {img} on a scale from 0.0 to 1.0 based on how likely it accomplishes the task: {task}. Output nothing other than the float representing the evaluated img.
"""
out = llm.run(task=EVAL_IMG, img=img)
out = float(out)
return out
def enrichment_prompt(starting_prompt: str, evaluated_img: str):
enrichment_task = (
"Create a concise and effective image generation prompt"
" within 400 characters or less, based on Stable Diffusion"
" and Dalle best practices. Starting prompt:"
f" \n\n'{starting_prompt}'\n\nImprove the prompt with any"
" applicable details or keywords by considering the"
" following aspects: \n1. Subject details (like actions,"
" emotions, environment) \n2. Artistic style (such as"
" surrealism, hyperrealism) \n3. Medium (digital painting,"
" oil on canvas) \n4. Color themes and lighting (like warm"
" colors, cinematic lighting) \n5. Composition and framing"
" (close-up, wide-angle) \n6. Additional elements (like a"
" specific type of background, weather conditions) \n7. Any"
" other artistic or thematic details that can make the image"
" more vivid and compelling. 8. Based on the evaluation of"
" the first generated prompt used by the first prompt:"
f" {evaluated_img} Enrich the prompt to generate a more"
" compelling image. Output only a new prompt to create a"
" better image"
)
return enrichment_task
# Main loop
max_iterations = 10 # Define the maximum number of iterations
best_score = 0
best_image_path = None  # Path of the best image found so far (used in the final print below)
for _ in range(max_iterations):
# Generate an image and get its path
print(colored(f"Generating img for Task: {task}", "purple"))
img_path = img_generator.run(
task=task
) # This should return the file path of the generated image
img_path = img_path[0]
print(colored(f"Generated Image Path: {img_path}", "green"))
# Evaluate the image by passing the file path
score = evaluate_img(llm, task, img_path)
print(
colored(
f"Evaluated Image Score: {score} for {img_path}", "cyan"
)
)
# Update the best score and image path if necessary
if score > best_score:
best_score = score
best_image_path = img_path
# Enrich the prompt based on the evaluation
prompt = enrichment_prompt(task, score)
print(colored(f"Enrichment Prompt: {prompt}", "yellow"))
# Output the best result
print("Best Image Path:", best_image_path)
print("Best Score:", best_score)

@@ -13,16 +13,19 @@ Efficiency agent: Agent that monitors the efficiency of the factory: input image
 Agent:
 health security agent -> quality control agent -> productivity agent -> safety agent -> security agent -> sustainability agent -> efficiency agent
 """
-from swarms.structs import Agent
 import os
 from dotenv import load_dotenv
+from termcolor import colored
 from swarms.models import GPT4VisionAPI
+from swarms.structs import Agent
 load_dotenv()
 api_key = os.getenv("OPENAI_API_KEY")
-# GPT4VisionAPI
-llm = GPT4VisionAPI(openai_api_key=api_key)
+llm = GPT4VisionAPI(openai_api_key=api_key, max_tokens=2000)
 assembly_line = (
     "playground/demos/swarm_of_mma_manufacturing/assembly_line.jpg"
@@ -81,41 +84,73 @@ efficiency_prompt = tasks["efficiency"]
 health_security_agent = Agent(
     llm=llm,
     sop_list=health_safety_prompt,
-    max_loops=2,
+    max_loops=1,
     multi_modal=True,
 )
 # Quality control agent
 productivity_check_agent = Agent(
-    llm=llm, sop=productivity_prompt, max_loops=2, multi_modal=True
+    llm=llm,
+    sop=productivity_prompt,
+    max_loops=1,
+    multi_modal=True,
+    autosave=True,
 )
 # Security agent
 security_check_agent = Agent(
-    llm=llm, sop=security_prompt, max_loops=2, multi_modal=True
+    llm=llm,
+    sop=security_prompt,
+    max_loops=1,
+    multi_modal=True,
+    autosave=True,
 )
 # Efficiency agent
 efficiency_check_agent = Agent(
-    llm=llm, sop=efficiency_prompt, max_loops=2, multi_modal=True
+    llm=llm,
+    sop=efficiency_prompt,
+    max_loops=1,
+    multi_modal=True,
+    autosave=True,
 )
+print(colored("Running the agents...", "green"))
+print(colored("Running health check agent initializing...", "cyan"))
 # Add the first task to the health_security_agent
 health_check = health_security_agent.run(
     "Analyze the safety of this factory", robots
 )
+print(
+    colored(
+        "--------------- Productivity agents initializing...", "green"
+    )
+)
 # Add the third task to the productivity_check_agent
 productivity_check = productivity_check_agent.run(
     health_check, assembly_line
 )
+print(
+    colored(
+        "--------------- Security agents initializing...", "green"
+    )
+)
 # Add the fourth task to the security_check_agent
-security_check = security_check_agent.add(
+security_check = security_check_agent.run(
     productivity_check, red_robots
 )
+print(
+    colored(
+        "--------------- Efficiency agents initializing...", "cyan"
+    )
+)
 # Add the fifth task to the efficiency_check_agent
 efficiency_check = efficiency_check_agent.run(
     security_check, tesla_assembly_line

Binary file not shown (image added: 451 KiB).

@@ -0,0 +1,47 @@
import os
from dotenv import load_dotenv
from swarms.models import OpenAIChat, GPT4VisionAPI
from swarms.structs import Agent, SequentialWorkflow
import swarms.prompts.urban_planning as upp
# Load environment variables
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
stability_api_key = os.getenv("STABILITY_API_KEY")
# Initialize language model
llm = OpenAIChat(openai_api_key=api_key, temperature=0.5, max_tokens=3000)
# Initialize Vision model
vision_api = GPT4VisionAPI(api_key=api_key)
# Initialize agents for urban planning tasks
architecture_analysis_agent = Agent(llm=llm, max_loops=1, sop=upp.ARCHITECTURE_ANALYSIS_PROMPT)
infrastructure_evaluation_agent = Agent(llm=llm, max_loops=1, sop=upp.INFRASTRUCTURE_EVALUATION_PROMPT)
traffic_flow_analysis_agent = Agent(llm=llm, max_loops=1, sop=upp.TRAFFIC_FLOW_ANALYSIS_PROMPT)
environmental_impact_assessment_agent = Agent(llm=llm, max_loops=1, sop=upp.ENVIRONMENTAL_IMPACT_ASSESSMENT_PROMPT)
public_space_utilization_agent = Agent(llm=llm, max_loops=1, sop=upp.PUBLIC_SPACE_UTILIZATION_PROMPT)
socioeconomic_impact_analysis_agent = Agent(llm=llm, max_loops=1, sop=upp.SOCIOECONOMIC_IMPACT_ANALYSIS_PROMPT)
# Initialize the final planning agent
final_plan_agent = Agent(llm=llm, max_loops=1, sop=upp.FINAL_URBAN_IMPROVEMENT_PLAN_PROMPT)
# Create Sequential Workflow
workflow = SequentialWorkflow(max_loops=1)
# Add tasks to workflow with personalized prompts
workflow.add(architecture_analysis_agent, "Architecture Analysis")
workflow.add(infrastructure_evaluation_agent, "Infrastructure Evaluation")
workflow.add(traffic_flow_analysis_agent, "Traffic Flow Analysis")
workflow.add(environmental_impact_assessment_agent, "Environmental Impact Assessment")
workflow.add(public_space_utilization_agent, "Public Space Utilization")
workflow.add(socioeconomic_impact_analysis_agent, "Socioeconomic Impact Analysis")
workflow.add(final_plan_agent, "Generate the final urban improvement plan based on all previous agents' findings")
# Run the workflow for individual analysis tasks
# Execute the workflow for the final planning
workflow.run()
# Output results for each task and the final plan
for task in workflow.tasks:
print(f"Task Description: {task.description}\nResult: {task.result}\n")

@@ -1,67 +0,0 @@
from swarms.models import Anthropic
from swarms.structs import Agent
from swarms.tools.tool import tool
import asyncio
llm = Anthropic(
anthropic_api_key="",
)
async def async_load_playwright(url: str) -> str:
"""Load the specified URLs using Playwright and parse using BeautifulSoup."""
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright
results = ""
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
try:
page = await browser.new_page()
await page.goto(url)
page_source = await page.content()
soup = BeautifulSoup(page_source, "html.parser")
for script in soup(["script", "style"]):
script.extract()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (
phrase.strip()
for line in lines
for phrase in line.split(" ")
)
results = "\n".join(chunk for chunk in chunks if chunk)
except Exception as e:
results = f"Error: {e}"
await browser.close()
return results
def run_async(coro):
event_loop = asyncio.get_event_loop()
return event_loop.run_until_complete(coro)
@tool
def browse_web_page(url: str) -> str:
"""Verbose way to scrape a whole webpage. Likely to cause issues parsing."""
return run_async(async_load_playwright(url))
## Initialize the workflow
agent = Agent(
llm=llm,
max_loops=5,
tools=[browse_web_page],
dashboard=True,
)
out = agent.run(
"Generate a 10,000 word blog on mental clarity and the benefits"
" of meditation."
)

@@ -0,0 +1,19 @@
from swarms.tools.tool import tool
from swarms.tools.tool_func_doc_scraper import scrape_tool_func_docs
@tool
def search_api(query: str) -> str:
"""Search API
Args:
query (str): _description_
Returns:
str: _description_
"""
print(f"Searching API for {query}")
tool_docs = scrape_tool_func_docs(search_api)
print(tool_docs)

@@ -0,0 +1,39 @@
import os
from swarms.models import OpenAIChat
from swarms.structs import Agent
from swarms.tools.tool import tool
from dotenv import load_dotenv
load_dotenv()
api_key = os.environ.get("OPENAI_API_KEY")
llm = OpenAIChat(api_key=api_key)
# @tool
# def search_api(query: str) -> str:
# """Search API
# Args:
# query (str): _description_
# Returns:
# str: _description_
# """
# print(f"Searching API for {query}")
## Initialize the workflow
agent = Agent(
llm=llm,
max_loops=5,
# tools=[search_api],
dashboard=True,
)
out = agent.run(
"Use the search api to find the best restaurants in New York"
" City."
)
print(out)

@@ -0,0 +1,22 @@
from swarms.tools.tool import tool
from swarms.tools.tool_func_doc_scraper import scrape_tool_func_docs
# Define a tool by decorating a function with the tool decorator and providing a docstring
@tool(return_direct=True)
def search_api(query: str):
"""Search the web for the query
Args:
query (str): _description_
Returns:
_type_: _description_
"""
return f"Search results for {query}"
# Scrape the tool func docs to prepare for injection into the agent prompt
out = scrape_tool_func_docs(search_api)
print(out)

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
 [tool.poetry]
 name = "swarms"
-version = "2.4.9"
+version = "2.5.7"
 description = "Swarms - Pytorch"
 license = "MIT"
 authors = ["Kye Gomez <kye@apac.ai>"]
@@ -52,7 +52,7 @@ ratelimit = "*"
 beautifulsoup4 = "*"
 cohere = "*"
 huggingface-hub = "*"
-pydantic = "*"
+pydantic = "1.10.12"
 tenacity = "*"
 Pillow = "*"
 chromadb = "*"
@@ -67,7 +67,7 @@ torchvision = "*"
 rich = "*"
 [tool.poetry.group.lint.dependencies]
-ruff = "^0.0.249"
+ruff = ">=0.0.249,<0.1.7"
 types-toml = "^0.10.8.1"
 types-redis = "^4.3.21.6"
 types-pytz = "^2023.3.0.0"

@@ -14,9 +14,10 @@ beautifulsoup4
 google-search-results==2.4.2
 Pillow
 faiss-cpu
-openai
+openai==0.28.0
 attrs
 datasets
+pydantic==1.10.12
 soundfile
 huggingface-hub
 google-generativeai
@@ -64,4 +65,4 @@ rich
 mkdocs
 mkdocs-material
 mkdocs-glightbox
-pre-commit
\ No newline at end of file
+pre-commit

@@ -14,12 +14,15 @@ api_key = os.getenv("OPENAI_API_KEY")
 llm = OpenAIChat(
     openai_api_key=api_key,
     temperature=0.5,
-    max_tokens=3000,
+    max_tokens=2000,
 )
 # Initialize the agent with the language agent
-agent1 = Agent(llm=llm, max_loops=1)
+agent1 = Agent(
+    llm=llm,
+    max_loops=1,
+)
 # Create another agent for a different task
 agent2 = Agent(llm=llm, max_loops=1)

@@ -3,7 +3,7 @@ from concurrent import futures
 from dataclasses import dataclass
 from typing import Optional, Any
 from attr import define, field, Factory
-from swarms.utils.futures import execute_futures_dict
+from swarms.utils.execute_futures import execute_futures_dict
 from griptape.artifacts import TextArtifact

@@ -0,0 +1,60 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
class VectorDatabase(ABC):
@abstractmethod
def add(
self, vector: Dict[str, Any], metadata: Dict[str, Any]
) -> None:
"""
Add a vector into the database.
Args:
vector (Dict[str, Any]): The vector to add.
metadata (Dict[str, Any]): Metadata associated with the vector.
"""
pass
@abstractmethod
def query(
self, vector: Dict[str, Any], num_results: int
) -> Dict[str, Any]:
"""
Query the database for vectors similar to the given vector.
Args:
vector (Dict[str, Any]): The vector to compare against.
num_results (int): The number of similar vectors to return.
Returns:
Dict[str, Any]: The most similar vectors and their associated metadata.
"""
pass
@abstractmethod
def delete(self, vector_id: str) -> None:
"""
Delete a vector from the database.
Args:
vector_id (str): The ID of the vector to delete.
"""
pass
@abstractmethod
def update(
self,
vector_id: str,
vector: Dict[str, Any],
metadata: Dict[str, Any],
) -> None:
"""
Update a vector in the database.
Args:
vector_id (str): The ID of the vector to update.
vector (Dict[str, Any]): The new vector.
metadata (Dict[str, Any]): The new metadata.
"""
pass
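
For illustration, a toy in-memory implementation of this interface might look like the sketch below. The `values` key on vectors and the `id` key on metadata are assumptions made for the example, not part of the abstract contract.

# Naive reference implementation: cosine similarity over a dict store.
import math
from typing import Any, Dict

class InMemoryVectorDatabase(VectorDatabase):
    def __init__(self) -> None:
        self._store: Dict[str, Dict[str, Any]] = {}

    def add(self, vector: Dict[str, Any], metadata: Dict[str, Any]) -> None:
        # Assumes callers put a unique "id" in the metadata
        self._store[metadata["id"]] = {"vector": vector, "metadata": metadata}

    def query(self, vector: Dict[str, Any], num_results: int) -> Dict[str, Any]:
        def cosine(a, b):
            dot = sum(x * y for x, y in zip(a, b))
            norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
            return dot / norm if norm else 0.0

        ranked = sorted(
            self._store.values(),
            key=lambda e: cosine(vector["values"], e["vector"]["values"]),
            reverse=True,
        )
        return {"results": ranked[:num_results]}

    def delete(self, vector_id: str) -> None:
        self._store.pop(vector_id, None)

    def update(self, vector_id: str, vector: Dict[str, Any], metadata: Dict[str, Any]) -> None:
        self._store[vector_id] = {"vector": vector, "metadata": metadata}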

@@ -0,0 +1,172 @@
import os
from termcolor import colored
import logging
from typing import Dict, List, Optional
import chromadb
import tiktoken as tiktoken
from chromadb.config import Settings
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
from dotenv import load_dotenv
from swarms.utils.token_count_tiktoken import limit_tokens_from_string
load_dotenv()
# ChromaDB settings
client = chromadb.Client(Settings(anonymized_telemetry=False))
# ChromaDB client
def get_chromadb_client():
return client
# OpenAI API key
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Results storage using local ChromaDB
class ChromaDB:
"""
ChromaDB database
Args:
metric (str): _description_
RESULTS_STORE_NAME (str): _description_
LLM_MODEL (str): _description_
openai_api_key (str): _description_
Methods:
add: _description_
query: _description_
Examples:
>>> chromadb = ChromaDB(
>>> metric="cosine",
>>> RESULTS_STORE_NAME="results",
>>> LLM_MODEL="gpt3",
>>> openai_api_key=OPENAI_API_KEY,
>>> )
>>> chromadb.add(task, result, result_id)
>>> chromadb.query(query, top_results_num)
"""
def __init__(
self,
metric: str,
RESULTS_STORE_NAME: str,
LLM_MODEL: str,
openai_api_key: str = OPENAI_API_KEY,
top_results_num: int = 3,
limit_tokens: Optional[int] = 1000,
):
self.metric = metric
self.RESULTS_STORE_NAME = RESULTS_STORE_NAME
self.LLM_MODEL = LLM_MODEL
self.openai_api_key = openai_api_key
self.top_results_num = top_results_num
self.limit_tokens = limit_tokens
# Disable ChromaDB logging
logging.getLogger("chromadb").setLevel(logging.ERROR)
# Create Chroma collection
chroma_persist_dir = "chroma"
chroma_client = chromadb.PersistentClient(
settings=chromadb.config.Settings(
persist_directory=chroma_persist_dir,
)
)
# Create embedding function
embedding_function = OpenAIEmbeddingFunction(
api_key=openai_api_key
)
# Create Chroma collection
self.collection = chroma_client.get_or_create_collection(
name=RESULTS_STORE_NAME,
metadata={"hnsw:space": metric},
embedding_function=embedding_function,
)
def add(self, task: Dict, result: str, result_id: str):
"""Adds a result to the ChromaDB collection
Args:
task (Dict): _description_
result (str): _description_
result_id (str): _description_
"""
try:
# Embed the result
embeddings = (
self.collection.embedding_function.embed([result])[0]
.tolist()
.copy()
)
# If the result is a list, flatten it
if (
len(
self.collection.get(ids=[result_id], include=[])[
"ids"
]
)
> 0
): # Check if the result already exists
self.collection.update(
ids=result_id,
embeddings=embeddings,
documents=result,
metadatas={
"task": task["task_name"],
"result": result,
},
)
# If the result is not a list, add it
else:
self.collection.add(
ids=result_id,
embeddings=embeddings,
documents=result,
metadatas={
"task": task["task_name"],
"result": result,
},
)
except Exception as error:
print(
colored(f"Error adding to ChromaDB: {error}", "red")
)
def query(
self,
query: str,
) -> List[dict]:
"""Queries the ChromaDB collection with a query for the top results
Args:
query (str): _description_
top_results_num (int): _description_
Returns:
List[dict]: _description_
"""
try:
count: int = self.collection.count()
if count == 0:
return []
results = self.collection.query(
query_texts=query,
n_results=min(self.top_results_num, count),
include=["metadatas"],
)
out = [item["task"] for item in results["metadatas"][0]]
out = limit_tokens_from_string(
out, "gpt-4", self.limit_tokens
)
return out
except Exception as error:
print(colored(f"Error querying ChromaDB: {error}", "red"))

@@ -117,19 +117,13 @@ class GPT4VisionAPI:
         pass
     # Function to handle vision tasks
-    def run(
-        self,
-        task: Optional[str] = None,
-        img: Optional[str] = None,
-        *args,
-        **kwargs,
-    ):
+    def run(self, img, task):
         """Run the model."""
         try:
             base64_image = self.encode_image(img)
             headers = {
                 "Content-Type": "application/json",
-                "Authorization": f"Bearer {openai_api_key}",
+                "Authorization": f"Bearer {self.openai_api_key}",
             }
             payload = {
                 "model": self.model_name,
@@ -154,28 +148,24 @@ class GPT4VisionAPI:
                 "max_tokens": self.max_tokens,
             }
             response = requests.post(
-                self.openai_proxy,
-                headers=headers,
-                json=payload,
+                self.openai_proxy, headers=headers, json=payload
             )
             out = response.json()
-            content = out["choices"][0]["message"]["content"]
-            if self.streaming_enabled:
-                content = self.stream_response(content)
+            if "choices" in out and out["choices"]:
+                content = (
+                    out["choices"][0]
+                    .get("message", {})
+                    .get("content", None)
+                )
+                return content
             else:
-                pass
-            if self.beautify:
-                content = colored(content, "cyan")
-                print(content)
-            else:
-                print(content)
+                print("No valid response in 'choices'")
+                return None
         except Exception as error:
             print(f"Error with the request: {error}")
-            raise error
+            return None
     def video_prompt(self, frames):
         """

@@ -8,6 +8,8 @@ from typing import List
 load_dotenv()
+stable_api_key = os.environ.get("STABLE_API_KEY")
 class StableDiffusion:
     """
@@ -45,7 +47,7 @@ class StableDiffusion:
     def __init__(
         self,
-        api_key: str,
+        api_key: str = stable_api_key,
         api_host: str = "https://api.stability.ai",
         cfg_scale: int = 7,
         height: int = 1024,

@@ -67,14 +67,10 @@ def agent_system_prompt_2(name: str):
     return AGENT_SYSTEM_PROMPT_2
-def agent_system_prompt_3(agent_name: str = None, sop: str = None):
-    AGENT_SYSTEM_PROMPT_3 = f"""
-    You are {agent_name}, an fully autonomous agent LLM backed agent.
-    for a specific use case. Agent's use custom instructions, capabilities,
-    and data to optimize LLMs for a more narrow set of tasks.
-    Here are instructions from the user outlining your goals and how you should respond:
-    {sop}
+AGENT_SYSTEM_PROMPT_3 = f"""
+You are a fully autonomous agent serving the user in automating tasks, workflows, and activities.
+Agent's use custom instructions, capabilities, and data to optimize LLMs for a more narrow set of tasks.
+You yourself are an agent created by a user,
+and your name is {agent_name}.
+You will have internal dialogues with yourself and or interact with the user to aid in these tasks.
+Your responses should be coherent, contextually relevant, and tailored to the task at hand.
 """
-    return AGENT_SYSTEM_PROMPT_3

@@ -0,0 +1,34 @@
user_preferences = {
"subjects": "AI Cognitive Architectures",
"learning_style": "Visual",
"challenge_level": "Moderate",
}
# Extracting individual preferences
subjects = user_preferences["subjects"]
learning_style = user_preferences["learning_style"]
challenge_level = user_preferences["challenge_level"]
# Curriculum Design Prompt
CURRICULUM_DESIGN_PROMPT = f"""
Develop a semester-long curriculum tailored to student interests in {subjects}. Focus on incorporating diverse teaching methods suitable for a {learning_style} learning style.
The curriculum should challenge students at a {challenge_level} level, integrating both theoretical knowledge and practical applications. Provide a detailed structure, including
weekly topics, key objectives, and essential resources needed.
"""
# Interactive Learning Session Prompt
INTERACTIVE_LEARNING_PROMPT = f"""
Based on the curriculum, generate an interactive lesson plan for a student of {subjects} that caters to a {learning_style} learning style. Incorporate engaging elements and hands-on activities.
"""
# Sample Lesson Prompt
SAMPLE_TEST_PROMPT = f"""
Create a comprehensive sample test for the first week of the {subjects} curriculum, tailored for a {learning_style} learning style and at a {challenge_level} challenge level.
"""
# Image Generation for Education Prompt
IMAGE_GENERATION_PROMPT = f"""
Develop a stable diffusion prompt for an educational image/visual aid that aligns with the {subjects} curriculum, specifically designed to enhance understanding for students with a {learning_style}
learning style. This might include diagrams, infographics, and illustrative representations to simplify complex concepts. Ensure you output a 10/10 descriptive image generation prompt only.
"""

@@ -46,7 +46,7 @@ commands: {
     }
     "tool2: "tool_name",
     "params": {
-        "tool1": "inputs",
+        "parameter": "inputs",
         "tool1": "inputs"
     }
     "tool3: "tool_name",
@@ -58,3 +58,73 @@ commands: {
     }
 """
+def tools_prompt_prep(
+    tool_docs: str = None, tool_few_shot_examples: str = None
+):
+    """
+    Tools prompt prep
+    Args:
+        docs (str, optional): _description_. Defaults to None.
+        scenarios (str, optional): _description_. Defaults to None.
+    Returns:
+        _type_: _description_
+    """
+    PROMPT = f"""
+    # Task
+    You will be provided with a list of APIs. These APIs will have a
+    description and a list of parameters and return types for each tool. Your
+    task involves creating varied, complex, and detailed user scenarios
+    that require API calls. You must select what api to call based on
+    the context of the task and the scenario.
+    For instance, given the APIs: SearchHotels, BookHotel, CancelBooking,
+    GetNFLNews. Given that GetNFLNews is explicitly provided, your scenario
+    should articulate something akin to:
+    "The user wants to see if the Broncos won their last game (GetNFLNews).
+    They then want to see if that qualifies them for the playoffs and who
+    they will be playing against (GetNFLNews). The Broncos did make it into
+    the playoffs, so the user wants watch the game in person. They want to
+    look for hotels where the playoffs are occurring (GetNBANews +
+    SearchHotels). After looking at the options, the user chooses to book a
+    3-day stay at the cheapest 4-star option (BookHotel)."
+    This scenario exemplifies a scenario using 5 API calls. The scenario is
+    complex, detailed, and concise as desired. The scenario also includes two
+    APIs used in tandem, the required API, GetNBANews to search for the
+    playoffs location and SearchHotels to find hotels based on the returned
+    location. Usage of multiple APIs in tandem is highly desirable and will
+    receive a higher score. Ideally each scenario should contain one or more
+    instances of multiple APIs being used in tandem.
+    Note that this scenario does not use all the APIs given and re-uses the "
+    GetNBANews" API. Re-using APIs is allowed, but each scenario should
+    involve as many different APIs as the user demands. Note that API usage is also included
+    in the scenario, but exact parameters are not necessary. You must use a
+    different combination of APIs for each scenario. All APIs must be used in
+    at least one scenario. You can only use the APIs provided in the APIs
+    section.
+    Note that API calls are not explicitly mentioned and their uses are
+    included in parentheses. This behaviour should be mimicked in your
+    response.
+    Output the tool usage in a strict json format with the function name and input to
+    the function. For example, Deliver your response in this format:
+    {tool_few_shot_examples}
+    # APIs
+    {tool_docs}
+    # Response
+    """
+    return PROMPT
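
A sketch of how this helper might be wired together with `scrape_tool_func_docs` and the `search_api` example tool from earlier in this commit; the few-shot string is invented for illustration.

tool_docs = scrape_tool_func_docs(search_api)
few_shot = '{"function": "search_api", "params": {"query": "Broncos score"}}'
tools_system_prompt = tools_prompt_prep(
    tool_docs=tool_docs, tool_few_shot_examples=few_shot
)
print(tools_system_prompt)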

@@ -0,0 +1,40 @@
# urban_planning_prompts.py
# Architecture Analysis Prompt
ARCHITECTURE_ANALYSIS_PROMPT = """
Analyze the architectural styles, building designs, and construction materials visible in the urban area image provided. Provide insights on the historical influences, modern trends, and architectural diversity observed.
"""
# Infrastructure Evaluation Prompt
INFRASTRUCTURE_EVALUATION_PROMPT = """
Evaluate the infrastructure in the urban area image, focusing on roads, bridges, public transport, utilities, and communication networks. Assess their condition, capacity, and how they meet the needs of the urban population.
"""
# Traffic Flow Analysis Prompt
TRAFFIC_FLOW_ANALYSIS_PROMPT = """
Analyze the traffic flow and transportation systems visible in the urban area image. Identify key traffic routes, congestion points, and the effectiveness of public transportation in addressing urban mobility.
"""
# Environmental Impact Assessment Prompt
ENVIRONMENTAL_IMPACT_ASSESSMENT_PROMPT = """
Assess the environmental impact of the urban area shown in the image. Look for green spaces, pollution sources, and sustainability practices. Provide insights into the balance between urban development and environmental conservation.
"""
# Public Space Utilization Prompt
PUBLIC_SPACE_UTILIZATION_PROMPT = """
Evaluate the public spaces in the urban area, such as parks, squares, and recreational areas, as shown in the image. Assess their accessibility, condition, and how they contribute to the community's quality of life.
"""
# Socioeconomic Impact Analysis Prompt
SOCIOECONOMIC_IMPACT_ANALYSIS_PROMPT = """
Analyze the socioeconomic impact of the urban environment as depicted in the image. Consider factors such as housing, employment opportunities, commercial activities, and social disparities.
"""
# Final Urban Improvement Plan Prompt
FINAL_URBAN_IMPROVEMENT_PLAN_PROMPT = """
Based on the architecture analysis, infrastructure evaluation, traffic flow analysis, environmental impact assessment, public space utilization, and socioeconomic impact analysis provided by the previous agents, develop a comprehensive urban improvement plan. The plan should address key issues identified, propose practical solutions, and outline strategies to enhance the overall quality of life, sustainability, and efficiency of the urban area.
"""
# Additional or custom prompts can be added below as needed.

@ -10,8 +10,10 @@ from typing import Any, Callable, Dict, List, Optional, Tuple
from termcolor import colored from termcolor import colored
from swarms.memory.base_vector_db import VectorDatabase
from swarms.prompts.agent_system_prompts import ( from swarms.prompts.agent_system_prompts import (
FLOW_SYSTEM_PROMPT, FLOW_SYSTEM_PROMPT,
AGENT_SYSTEM_PROMPT_3,
agent_system_prompt_2, agent_system_prompt_2,
) )
from swarms.prompts.multi_modal_autonomous_instruction_prompt import ( from swarms.prompts.multi_modal_autonomous_instruction_prompt import (
@ -21,11 +23,13 @@ from swarms.prompts.tools import (
SCENARIOS, SCENARIOS,
) )
from swarms.tools.tool import BaseTool from swarms.tools.tool import BaseTool
from swarms.tools.tool_func_doc_scraper import scrape_tool_func_docs
from swarms.utils.code_interpreter import SubprocessCodeInterpreter from swarms.utils.code_interpreter import SubprocessCodeInterpreter
from swarms.utils.parse_code import ( from swarms.utils.parse_code import (
extract_code_in_backticks_in_string, extract_code_in_backticks_in_string,
) )
from swarms.utils.pdf_to_text import pdf_to_text from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.token_count_tiktoken import limit_tokens_from_string
# Utils # Utils
@ -35,11 +39,13 @@ def stop_when_repeats(response: str) -> bool:
return "stop" in response.lower() return "stop" in response.lower()
# Parse done token
def parse_done_token(response: str) -> bool: def parse_done_token(response: str) -> bool:
"""Parse the response to see if the done token is present""" """Parse the response to see if the done token is present"""
return "<DONE>" in response return "<DONE>" in response
# Agent ID generator
def agent_id(): def agent_id():
"""Generate an agent id""" """Generate an agent id"""
return str(uuid.uuid4()) return str(uuid.uuid4())
@ -58,16 +64,40 @@ class Agent:
* Ability to provide a loop interval * Ability to provide a loop interval
Args: Args:
id (str): The id of the agent
llm (Any): The language model to use llm (Any): The language model to use
max_loops (int): The maximum number of loops to run template (Optional[str]): The template to use
stopping_condition (Optional[Callable[[str], bool]]): A stopping condition max_loops (int): The maximum number of loops
loop_interval (int): The interval between loops stopping_condition (Optional[Callable[[str], bool]]): The stopping condition
retry_attempts (int): The number of retry attempts loop_interval (int): The loop interval
retry_interval (int): The interval between retry attempts retry_attempts (int): The retry attempts
interactive (bool): Whether or not to run in interactive mode retry_interval (int): The retry interval
dashboard (bool): Whether or not to print the dashboard return_history (bool): Return the history
dynamic_temperature_enabled(bool): Dynamical temperature handling stopping_token (str): The stopping token
**kwargs (Any): Any additional keyword arguments dynamic_loops (Optional[bool]): Dynamic loops
interactive (bool): Interactive mode
dashboard (bool): Dashboard mode
agent_name (str): The name of the agent
agent_description (str): The description of the agent
system_prompt (str): The system prompt
tools (List[BaseTool]): The tools
dynamic_temperature_enabled (Optional[bool]): Dynamic temperature enabled
sop (Optional[str]): The standard operating procedure
sop_list (Optional[List[str]]): The standard operating procedure list
saved_state_path (Optional[str]): The saved state path
autosave (Optional[bool]): Autosave
context_length (Optional[int]): The context length
user_name (str): The user name
self_healing_enabled (Optional[bool]): Self healing enabled
code_interpreter (Optional[bool]): Code interpreter
multi_modal (Optional[bool]): Multi modal
pdf_path (Optional[str]): The pdf path
list_of_pdf (Optional[str]): The list of pdf
tokenizer (Optional[Any]): The tokenizer
memory (Optional[VectorDatabase]): The memory
preset_stopping_token (Optional[bool]): Preset stopping token
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Methods: Methods:
run(task: str, **kwargs: Any): Run the agent on a task run(task: str, **kwargs: Any): Run the agent on a task
@ -143,15 +173,14 @@ class Agent:
dynamic_loops: Optional[bool] = False,
interactive: bool = False,
dashboard: bool = False,
agent_name: str = "Autonomous-Agent-XYZ1B",
agent_description: str = None,
system_prompt: str = AGENT_SYSTEM_PROMPT_3,
tools: List[BaseTool] = None,
dynamic_temperature_enabled: Optional[bool] = False,
sop: Optional[str] = None,
sop_list: Optional[List[str]] = None,
saved_state_path: Optional[str] = None,
autosave: Optional[bool] = False,
context_length: Optional[int] = 8192,
user_name: str = "Human:",
@ -161,6 +190,8 @@ class Agent:
pdf_path: Optional[str] = None,
list_of_pdf: Optional[str] = None,
tokenizer: Optional[Any] = None,
memory: Optional[VectorDatabase] = None,
preset_stopping_token: Optional[bool] = False,
*args,
**kwargs: Any,
):
@ -183,11 +214,14 @@ class Agent:
self.context_length = context_length
self.sop = sop
self.sop_list = sop_list or []
self.tools = tools or []
self.tool_docs = []
self.system_prompt = system_prompt
self.agent_name = agent_name
self.agent_description = agent_description
self.saved_state_path = saved_state_path or f"{self.agent_name}_state.json"
self.autosave = autosave
self.response_filters = []
self.self_healing_enabled = self_healing_enabled
@ -196,6 +230,10 @@ class Agent:
self.pdf_path = pdf_path
self.list_of_pdf = list_of_pdf
self.tokenizer = tokenizer
self.memory = memory
self.preset_stopping_token = preset_stopping_token
# max_loops is set dynamically if dynamic_loops is enabled
if self.dynamic_loops:
@ -211,11 +249,33 @@ class Agent:
# Memory
self.feedback = []
self.short_memory = []

# Initialize the code executor
self.code_executor = SubprocessCodeInterpreter()
# If the preset stopping token is enabled then set the stopping token to the preset stopping token
if preset_stopping_token:
self.stopping_token = "<DONE>"
# If memory then add the json to the memory vector database
if memory:
# Add all of the state to the memory
self.add_message_to_memory_db(
{"message": self.state_to_str()},
{"agent_id": self.id},
)
# If tools exist then scrape their docs and add the tool usage prompt to the sop
if self.tools:
self.parse_tool_docs()
self.sop_list.append(
self.tools_prompt_prep(self.tool_docs, SCENARIOS)
)
def set_system_prompt(self, system_prompt: str):
"""Set the system prompt"""
self.system_prompt = system_prompt
def provide_feedback(self, feedback: str) -> None:
"""Allow users to provide feedback on the responses."""
self.feedback.append(feedback)
@ -223,9 +283,17 @@ class Agent:
def _check_stopping_condition(self, response: str) -> bool:
"""Check if the stopping condition is met."""
try:
if self.stopping_condition:
return self.stopping_condition(response)
return False
except Exception as error:
print(
colored(
f"Error checking stopping condition: {error}",
"red",
)
)
return False
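# Illustrative usage (assumption: any str -> bool callable works as a stopping condition):
#   agent = Agent(llm=llm, stopping_condition=lambda r: "<DONE>" in r)
#   agent._check_stopping_condition("done <DONE>")  # -> True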
def dynamic_temperature(self):
"""
@ -234,12 +302,19 @@ class Agent:
3. If the temperature is present, then dynamically change the temperature
4. for every loop you can randomly change the temperature on a scale from 0.0 to 1.0
"""
try:
if hasattr(self.llm, "temperature"):
# Randomly change the temperature attribute of self.llm object
self.llm.temperature = random.uniform(0.0, 1.0)
else:
# Use a default temperature
self.llm.temperature = 0.7
except Exception as error:
print(
colored(
f"Error dynamically changing temperature: {error}"
)
)
def format_prompt(self, template, **kwargs: Any) -> str:
"""Format the template with the provided kwargs using f-string interpolation."""
@ -288,19 +363,6 @@ class Agent:
for tool in self.tools:
if tool.name == name:
return tool
def extract_tool_commands(self, text: str):
"""
@ -329,14 +391,6 @@ class Agent:
except Exception as error:
print(f"Error parsing JSON command: {error}")
def execute_tools(self, tool_name, params):
"""Execute the tool with the provided params"""
tool = self.tool_find_by_name(tool_name)
@ -345,26 +399,63 @@ class Agent:
tool_result = tool.run(**params)
print(tool_result)
def parse_and_execute_tools(self, response: str):
"""Parse and execute the tools"""
json_commands = self.extract_tool_commands(response)
for command in json_commands:
tool_name = command.get("tool")
params = command.get("params", {})
self.execute_tools(tool_name, params)
def truncate_history(self):
"""
Take the history and truncate it to fit into the model context length
"""
truncated_history = self.short_memory[-1][
-self.context_length :
]
text = "\n".join(truncated_history)
out = limit_tokens_from_string(text, "gpt-4")
return out
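# Illustrative behavior (assumption: short_memory[-1] holds the latest loop's messages):
#   agent.short_memory = [["Human: hi", "AI: hello"]]
#   agent.truncate_history()  # keeps the last `context_length` entries, then
#                             # token-limits the joined text with the "gpt-4" tokenizer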
def add_task_to_memory(self, task: str):
"""Add the task to the memory"""
try:
self.short_memory.append([f"{self.user_name}: {task}"])
except Exception as error:
print(
colored(
f"Error adding task to memory: {error}", "red"
)
)
def add_message_to_memory(self, message: str):
"""Add the message to the memory"""
try:
self.short_memory[-1].append(message)
except Exception as error:
print(
colored(
f"Error adding message to memory: {error}", "red"
)
)
def add_message_to_memory_and_truncate(self, message: str):
"""Add the message to the memory and truncate"""
self.short_memory[-1].append(message)
self.truncate_history()
def parse_tool_docs(self):
"""Parse the tool docs"""
for tool in self.tools:
self.tool_docs.append(scrape_tool_func_docs(tool))
return str(self.tool_docs)
def print_dashboard(self, task: str):
"""Print dashboard"""
model_config = self.get_llm_init_params()
@ -405,16 +496,18 @@ class Agent:
)
)
def activate_autonomous_agent(self):
"""Print the autonomous agent activation message"""
try:
print(
colored(
(
"Initializing Autonomous Agent"
f" {self.agent_name}..."
),
"yellow",
)
)
print(
colored(
"Autonomous Agent Activated.",
@ -539,7 +632,7 @@ class Agent:
# Preparing the prompt
task = self.agent_history_prompt(
AGENT_SYSTEM_PROMPT_3, response
)

attempt = 0
@ -589,18 +682,20 @@ class Agent:
time.sleep(self.loop_interval)

# Add the history to the memory
self.short_memory.append(history)

# If autosave is enabled then save the state
if self.autosave:
print(
colored(
(
"Autosaving agent state to"
f" {self.saved_state_path}"
),
"green",
)
)
self.save_state(self.saved_state_path)
# If return history is enabled then return the response and history
if self.return_history:
@ -611,104 +706,19 @@ class Agent:
print(f"Error running agent: {error}") print(f"Error running agent: {error}")
raise raise
def _run(self, **kwargs: Any) -> str:
"""Generate a result using the provided keyword args."""
try:
task = self.format_prompt(**kwargs)
response, history = self._generate(task, task)
logging.info(f"Message history: {history}")
return response
except Exception as error:
print(colored(f"Error running agent: {error}", "red"))
def agent_history_prompt(
self,
system_prompt: str = AGENT_SYSTEM_PROMPT_3,
history=None,
):
"""
@ -752,15 +762,29 @@ class Agent:
Args:
tasks (List[str]): A list of tasks to run.
"""
try:
task_coroutines = [
self.run_async(task, **kwargs) for task in tasks
]
completed_tasks = await asyncio.gather(*task_coroutines)
return completed_tasks
except Exception as error:
print(
colored(
(
f"Error running agent: {error} while running"
" concurrently"
),
"red",
)
)
def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]:
"""Generate responses for multiple input sets."""
try:
return [self.run(**input_data) for input_data in inputs]
except Exception as error:
print(colored(f"Error running bulk run: {error}", "red"))
@staticmethod
def from_llm_and_template(llm: Any, template: str) -> "Agent":
@ -777,9 +801,19 @@ class Agent:
return Agent(llm=llm, template=template)
def save(self, file_path) -> None:
"""Save the agent history to a file.

Args:
file_path (str): The path to save the history to.
"""
try:
with open(file_path, "w") as f:
json.dump(self.short_memory, f)
except Exception as error:
print(
colored(f"Error saving agent history: {error}", "red")
)
def load(self, file_path: str):
"""
@ -789,7 +823,7 @@ class Agent:
file_path (str): The path to the file containing the saved agent history.
"""
with open(file_path, "r") as f:
self.short_memory = json.load(f)
print(f"Loaded agent history from {file_path}")
def validate_response(self, response: str) -> bool:
@ -814,7 +848,9 @@ class Agent:
"========================", "cyan", attrs=["bold"] "========================", "cyan", attrs=["bold"]
) )
) )
for loop_index, history in enumerate(self.memory, start=1): for loop_index, history in enumerate(
self.short_memory, start=1
):
print( print(
colored( colored(
f"\nLoop {loop_index}:", "yellow", attrs=["bold"] f"\nLoop {loop_index}:", "yellow", attrs=["bold"]
@ -857,10 +893,10 @@ class Agent:
# Update the agent's history with the new interaction
if self.interactive:
self.short_memory.append(f"AI: {response}")
self.short_memory.append(f"Human: {task}")
else:
self.short_memory.append(f"AI: {response}")

return response

except Exception as error:
@ -904,14 +940,14 @@ class Agent:
print(message)
"""
if len(self.short_memory) < 2:
return None, None

# Remove the last response
self.short_memory.pop()

# Get the previous state
previous_state = self.short_memory[-1][-1]
return previous_state, f"Restored to {previous_state}"
# Response Filtering
@ -931,7 +967,6 @@ class Agent:
""" """
Apply the response filters to the response Apply the response filters to the response
""" """
for word in self.response_filters: for word in self.response_filters:
response = response.replace(word, "[FILTERED]") response = response.replace(word, "[FILTERED]")
@ -1029,22 +1064,70 @@ class Agent:
Example:
>>> agent.save_state('saved_flow.json')
"""
try:
state = {
"agent_id": str(self.id),
"agent_name": self.agent_name,
"agent_description": self.agent_description,
"system_prompt": self.system_prompt,
"sop": self.sop,
"short_memory": self.short_memory,
"loop_interval": self.loop_interval,
"retry_attempts": self.retry_attempts,
"retry_interval": self.retry_interval,
"interactive": self.interactive,
"dashboard": self.dashboard,
"dynamic_temperature": (
self.dynamic_temperature_enabled
),
"autosave": self.autosave,
"saved_state_path": self.saved_state_path,
"max_loops": self.max_loops,
}

with open(file_path, "w") as f:
json.dump(state, f, indent=4)

saved = colored(
f"Saved agent state to: {file_path}", "green"
)
print(saved)
except Exception as error:
print(
colored(f"Error saving agent state: {error}", "red")
)
def state_to_str(self):
"""Transform the JSON into a string"""
try:
state = {
"agent_id": str(self.id),
"agent_name": self.agent_name,
"agent_description": self.agent_description,
"system_prompt": self.system_prompt,
"sop": self.sop,
"short_memory": self.short_memory,
"loop_interval": self.loop_interval,
"retry_attempts": self.retry_attempts,
"retry_interval": self.retry_interval,
"interactive": self.interactive,
"dashboard": self.dashboard,
"dynamic_temperature": (
self.dynamic_temperature_enabled
),
"autosave": self.autosave,
"saved_state_path": self.saved_state_path,
"max_loops": self.max_loops,
}
out = str(state)
return out
except Exception as error:
print(
colored(
f"Error transforming state to string: {error}",
"red",
)
)
def load_state(self, file_path: str):
"""
@ -1061,7 +1144,16 @@ class Agent:
state = json.load(f)

# Restore other saved attributes
self.id = state.get("agent_id", self.id)
self.agent_name = state.get("agent_name", self.agent_name)
self.agent_description = state.get(
"agent_description", self.agent_description
)
self.system_prompt = state.get(
"system_prompt", self.system_prompt
)
self.sop = state.get("sop", self.sop)
self.short_memory = state.get("short_memory", [])
self.max_loops = state.get("max_loops", 5)
self.loop_interval = state.get("loop_interval", 1)
self.retry_attempts = state.get("retry_attempts", 3)
@ -1093,8 +1185,6 @@ class Agent:
SYSTEM_PROMPT: {self.system_prompt}

History: {history}
"""
response = self.llm(prompt, **kwargs)
return {"role": self.agent_name, "content": response}
@ -1121,7 +1211,7 @@ class Agent:
def reset(self):
"""Reset the agent"""
self.short_memory = []
def run_code(self, code: str):
"""
@ -1144,7 +1234,7 @@ class Agent:
text = pdf_to_text(pdf)
return text
def pdf_chunker(self, text: str = None, num_limits: int = 1000):
"""Chunk the pdf into sentences

Args:
@ -1154,22 +1244,29 @@ class Agent:
str: The chunked, token-limited text.
"""
text = text or self.pdf_connector()
text = limit_tokens_from_string(text, limit=num_limits)
return text
def tools_prompt_prep(
self, docs: str = None, scenarios: str = SCENARIOS
):
"""
Prepare the tools prompt

Args:
docs (str, optional): The tool documentation. Defaults to None.
scenarios (str, optional): Example scenarios to include. Defaults to SCENARIOS.

Returns:
str: The prepared tools prompt.
"""
PROMPT = f""" PROMPT = f"""
# Task # Task
You will be provided with a list of APIs. These APIs will have a You will be provided with a list of APIs. These APIs will have a
description and a list of parameters and return types for each tool. Your description and a list of parameters and return types for each tool. Your
task involves creating 3 varied, complex, and detailed user scenarios task involves creating varied, complex, and detailed user scenarios
that require at least 5 API calls to complete involving at least 3 that require to call API calls. You must select what api to call based on
different APIs. One of these APIs will be explicitly provided and the the context of the task and the scenario.
other two will be chosen by you.
For instance, given the APIs: SearchHotels, BookHotel, CancelBooking,
GetNFLNews. Given that GetNFLNews is explicitly provided, your scenario
@ -1194,8 +1291,8 @@ class Agent:
Note that this scenario does not use all the APIs given and re-uses the "
GetNBANews" API. Re-using APIs is allowed, but each scenario should
involve as many different APIs as the user demands. Note that API usage is also included
in the scenario, but exact parameters are necessary. You must use a
different combination of APIs for each scenario. All APIs must be used in
at least one scenario. You can only use the APIs provided in the APIs
section.
@ -1203,7 +1300,10 @@ class Agent:
Note that API calls are not explicitly mentioned and their uses are
included in parentheses. This behaviour should be mimicked in your
response.

Output the tool usage in a strict json format with the function name and input to
the function. For example, deliver your response in this format:
{scenarios}
@ -1214,62 +1314,4 @@ class Agent:
# Response
"""
return PROMPT

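A quick sketch of how the renamed state APIs above fit together, assuming an initialized `llm` instance (for example `OpenAIChat`); the file name is illustrative:

```python
# Sketch only: `llm` is assumed to be an initialized model instance.
from swarms.structs import Agent

agent = Agent(llm=llm, max_loops=1, autosave=True)

# History accumulates in agent.short_memory as the loop runs
out = agent.run("Summarize the key risks in this contract.")

# Persist and restore the full agent state (illustrative file name)
agent.save_state("agent_state.json")
agent.load_state("agent_state.json")
```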
@ -1,18 +1,4 @@
""" import concurrent.futures
TODO:
- Add a method to update the arguments of a task
- Add a method to get the results of each task
- Add a method to get the results of a specific task
- Add a method to get the results of the workflow
- Add a method to get the results of the workflow as a dataframe
- Add a method to run the workflow in parallel with a pool of workers and a queue and a dashboard
- Add a dashboard to visualize the workflow
- Add async support
- Add context manager support
- Add workflow history
"""
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Union
@ -57,6 +43,7 @@ class Task:
kwargs: Dict[str, Any] = field(default_factory=dict)
result: Any = None
history: List[Any] = field(default_factory=list)
def execute(self):
"""
@ -118,11 +105,11 @@ class SequentialWorkflow:
""" """
tasks: List[Task] = field(default_factory=list)
max_loops: int = 1
autosave: bool = False
name: str = None
description: str = None
saved_state_filepath: Optional[str] = (
"sequential_workflow_state.json"
)
@ -146,26 +133,38 @@ class SequentialWorkflow:
*args: Additional arguments to pass to the task execution.
**kwargs: Additional keyword arguments to pass to the task execution.
"""
try:
# If the agent is an Agent instance, we include the task in kwargs for Agent.run()
if isinstance(agent, Agent):
kwargs["task"] = (
task  # Set the task as a keyword argument for Agent
)

# Append the task to the tasks list
self.tasks.append(
Task(
description=task,
agent=agent,
args=list(args),
kwargs=kwargs,
)
)
except Exception as error:
print(
colored(
f"Error adding task to workflow: {error}", "red"
),
)
def reset_workflow(self) -> None:
"""Resets the workflow by clearing the results of each task."""
try:
for task in self.tasks:
task.result = None
except Exception as error:
print(
colored(f"Error resetting workflow: {error}", "red"),
)
def get_task_results(self) -> Dict[str, Any]:
"""
@ -174,13 +173,32 @@ class SequentialWorkflow:
Returns:
Dict[str, Any]: The results of each task in the workflow
"""
try:
return {
task.description: task.result for task in self.tasks
}
except Exception as error:
print(
colored(
f"Error getting task results: {error}", "red"
),
)
def remove_task(self, task: str) -> None:
"""Remove tasks from sequential workflow"""
try:
# Use a distinct loop variable so the `task` argument is not shadowed
self.tasks = [
t for t in self.tasks if t.description != task
]
except Exception as error:
print(
colored(
f"Error removing task from workflow: {error}",
"red",
),
)
def update_task(self, task: str, **updates) -> None:
"""
@ -205,12 +223,95 @@ class SequentialWorkflow:
{'max_tokens': 1000}
"""
try:
# Use a distinct loop variable so the `task` argument is not shadowed
for t in self.tasks:
if t.description == task:
t.kwargs.update(updates)
break
else:
raise ValueError(
f"Task {task} not found in workflow."
)
except Exception as error:
print(
colored(
f"Error updating task in workflow: {error}", "red"
),
)
def delete_task(self, task: str) -> None:
"""
Delete a task from the workflow.
Args:
task (str): The description of the task to delete.
Raises:
ValueError: If the task is not found in the workflow.
Examples:
>>> from swarms.models import OpenAIChat
>>> from swarms.structs import SequentialWorkflow
>>> llm = OpenAIChat(openai_api_key="")
>>> workflow = SequentialWorkflow(max_loops=1)
>>> workflow.add("What's the weather in miami", llm)
>>> workflow.add("Create a report on these metrics", llm)
>>> workflow.delete_task("What's the weather in miami")
>>> workflow.tasks
[Task(description='Create a report on these metrics', agent=Agent(llm=OpenAIChat(openai_api_key=''), max_loops=1, dashboard=False), args=[], kwargs={}, result=None, history=[])]
"""
try:
# Use a distinct loop variable so the `task` argument is not shadowed
for t in self.tasks:
if t.description == task:
self.tasks.remove(t)
break
else:
raise ValueError(
f"Task {task} not found in workflow."
)
except Exception as error:
print(
colored(
f"Error deleting task from workflow: {error}",
"red",
),
)
def concurrent_run(self):
"""
Concurrently run the workflow using a pool of workers.
Examples:
>>> from swarms.models import OpenAIChat
>>> from swarms.structs import SequentialWorkflow
>>> llm = OpenAIChat(openai_api_key="")
>>> workflow = SequentialWorkflow(max_loops=1)
"""
try:
with concurrent.futures.ThreadPoolExecutor() as executor:
# Task defines execute() above, so submit the bound method
futures_to_task = {
executor.submit(task.execute): task
for task in self.tasks
}
results = []
for future in concurrent.futures.as_completed(
futures_to_task
):
task = futures_to_task[future]
try:
result = future.result()
except Exception as error:
print(f"Error running workflow: {error}")
else:
results.append(result)
print(
f"Task {task} completed successfully with"
f" result: {result}"
)
except Exception as error:
print(colored(f"Error running workflow: {error}", "red"))
def save_workflow_state(
self,
@ -232,26 +333,35 @@ class SequentialWorkflow:
>>> workflow.add("Create a report on these metrics", llm) >>> workflow.add("Create a report on these metrics", llm)
>>> workflow.save_workflow_state("sequential_workflow_state.json") >>> workflow.save_workflow_state("sequential_workflow_state.json")
""" """
filepath = filepath or self.saved_state_filepath try:
filepath = filepath or self.saved_state_filepath
with open(filepath, "w") as f:
# Saving the state as a json for simplicuty with open(filepath, "w") as f:
state = { # Saving the state as a json for simplicuty
"tasks": [ state = {
{ "tasks": [
"description": task.description, {
"args": task.args, "description": task.description,
"kwargs": task.kwargs, "args": task.args,
"result": task.result, "kwargs": task.kwargs,
"history": task.history, "result": task.result,
} "history": task.history,
for task in self.tasks }
], for task in self.tasks
"max_loops": self.max_loops, ],
} "max_loops": self.max_loops,
json.dump(state, f, indent=4) }
json.dump(state, f, indent=4)
except Exception as error:
print(
colored(
f"Error saving workflow state: {error}",
"red",
)
)
def workflow_bootup(self, **kwargs) -> None:
"""Boots up the workflow."""
print(
colored(
"""
@ -312,22 +422,30 @@ class SequentialWorkflow:
def add_objective_to_workflow(self, task: str, **kwargs) -> None:
"""Adds an objective to the workflow."""
try:
print(
colored(
"""
Adding Objective to Workflow...""",
"green",
attrs=["bold", "underline"],
)
)

task = Task(
description=task,
agent=kwargs["agent"],
args=list(kwargs["args"]),
kwargs=kwargs["kwargs"],
)
self.tasks.append(task)
except Exception as error:
print(
colored(
f"Error adding objective to workflow: {error}",
"red",
)
)
def load_workflow_state(
self, filepath: str = None, **kwargs
@ -349,22 +467,30 @@ class SequentialWorkflow:
>>> workflow.load_workflow_state("sequential_workflow_state.json")
"""
try:
filepath = filepath or self.restore_state_filepath

with open(filepath, "r") as f:
state = json.load(f)
self.max_loops = state["max_loops"]
self.tasks = []
for task_state in state["tasks"]:
task = Task(
description=task_state["description"],
agent=task_state["agent"],
args=task_state["args"],
kwargs=task_state["kwargs"],
result=task_state["result"],
history=task_state["history"],
)
self.tasks.append(task)
except Exception as error:
print(
colored(
f"Error loading workflow state: {error}",
"red",
)
)
def run(self) -> None:
"""
@ -439,43 +565,58 @@ class SequentialWorkflow:
ValueError: If an Agent instance is used as a task and the 'task' argument is not provided.
"""
try:
for _ in range(self.max_loops):
for task in self.tasks:
# Check if the current task can be executed
if task.result is None:
# Check if the agent is an Agent and a 'task' argument is needed
if isinstance(task.agent, Agent):
# Ensure that 'task' is provided in the kwargs
if "task" not in task.kwargs:
raise ValueError(
"The 'task' argument is required"
" for the Agent agent execution"
f" in '{task.description}'"
)
# Separate the 'task' argument from other kwargs
flow_task_arg = task.kwargs.pop("task")
task.result = await task.agent.arun(
flow_task_arg,
*task.args,
**task.kwargs,
)
else:
# If it's not an Agent instance, call the agent directly
task.result = await task.agent(
*task.args, **task.kwargs
)

# Pass the result as an argument to the next task if it exists
next_task_index = self.tasks.index(task) + 1
if next_task_index < len(self.tasks):
next_task = self.tasks[next_task_index]
if isinstance(next_task.agent, Agent):
# For Agent flows, 'task' should be a keyword argument
next_task.kwargs["task"] = task.result
else:
# For other callable flows, the result is added to args
next_task.args.insert(0, task.result)

# Autosave the workflow state
if self.autosave:
self.save_workflow_state(
"sequential_workflow_state.json"
)
except Exception as e:
print(
colored(
(
"Error initializing the Sequential workflow:"
f" {e} try optimizing your inputs like the"
" agent class and task description"
),
"red",
attrs=["bold", "underline"],
)
)

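Pieced together from the doctests above, a minimal end-to-end sketch of the workflow API; the API key and task strings are placeholders:

```python
# Sketch based on the doctests above; the API key is a placeholder.
from swarms.models import OpenAIChat
from swarms.structs import SequentialWorkflow

llm = OpenAIChat(openai_api_key="sk-...")
workflow = SequentialWorkflow(max_loops=1, autosave=True)

# Each task's result is fed into the next task as its 'task' argument
workflow.add("What's the weather in miami", llm)
workflow.add("Create a report on these metrics", llm)

workflow.run()
print(workflow.get_task_results())
workflow.save_workflow_state("sequential_workflow_state.json")
```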
@ -0,0 +1,3 @@
from swarms.tools.tool_func_doc_scraper import scrape_tool_func_docs
__all__ = ["scrape_tool_func_docs"]

@ -0,0 +1,45 @@
import inspect
from typing import Callable
from termcolor import colored
def scrape_tool_func_docs(fn: Callable) -> str:
"""
Scrape the docstrings and parameters of a function decorated with `tool` and return a formatted string.
Args:
fn (Callable): The function to scrape.
Returns:
str: A string containing the function's name, documentation string, and a list of its parameters. Each parameter is represented as a line containing the parameter's name, default value, and annotation.
"""
try:
# If the function is a tool, get the original function
if hasattr(fn, "func"):
fn = fn.func
signature = inspect.signature(fn)
parameters = []
for name, param in signature.parameters.items():
parameters.append(
f"Name: {name}, Default:"
f" {param.default if param.default is not param.empty else 'None'},"
" Annotation:"
f" {param.annotation if param.annotation is not param.empty else 'None'}"
)
parameters_str = "\n".join(parameters)
return (
f"Function: {fn.__name__}\nDocstring:"
f" {inspect.getdoc(fn)}\nParameters:\n{parameters_str}"
)
except Exception as error:
print(
colored(
(
f"Error scraping tool function docs {error} try"
" optimizing your inputs with different"
" variables and attempt once more."
),
"red",
)
)

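A small illustration of what the scraper returns; the tool function below is hypothetical:

```python
# Hypothetical tool function used only to demonstrate scrape_tool_func_docs.
from swarms.tools.tool_func_doc_scraper import scrape_tool_func_docs

def search_hotels(city: str, nights: int = 1) -> str:
    """Search for hotels in a city."""
    return f"Hotels in {city} for {nights} night(s)"

print(scrape_tool_func_docs(search_hotels))
# Roughly prints:
# Function: search_hotels
# Docstring: Search for hotels in a city.
# Parameters:
# Name: city, Default: None, Annotation: <class 'str'>
# Name: nights, Default: 1, Annotation: <class 'int'>
```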
@ -0,0 +1,48 @@
import re
import json
def extract_tool_commands(text: str):
"""
Extract the tool commands from the text
Example:
```json
{
"tool": "tool_name",
"params": {
"tool1": "inputs",
"param2": "value2"
}
}
```
"""
# Regex to find JSON like strings
pattern = r"```json(.+?)```"
matches = re.findall(pattern, text, re.DOTALL)
json_commands = []
for match in matches:
try:
# Parse each fenced JSON block into its own command dict
command = json.loads(match)
json_commands.append(command)
except Exception as error:
print(f"Error parsing JSON command: {error}")
return json_commands
def parse_and_execute_tools(response: str):
"""Parse and execute the tools"""
json_commands = extract_tool_commands(response)
for command in json_commands:
tool_name = command.get("tool")
params = command.get("params", {})
execute_tools(tool_name, params)
def execute_tools(self, tool_name, params):
"""Execute the tool with the provided params"""
tool = self.tool_find_by_name(tool_name)
if tool:
# Execute the tool with the provided parameters
tool_result = tool.run(**params)
print(tool_result)

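An illustrative round trip through these helpers; the response text is made up, and the fence is built programmatically only to avoid nesting code fences here:

```python
# Made-up model response containing a fenced JSON tool call.
fence = "`" * 3
response = (
    "Here is the call:\n"
    f"{fence}json\n"
    '{"tool": "SearchHotels", "params": {"city": "Miami"}}\n'
    f"{fence}\n"
)
print(extract_tool_commands(response))
# -> [{'tool': 'SearchHotels', 'params': {'city': 'Miami'}}]
```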
@ -1,5 +1,4 @@
from swarms.utils.markdown_message import display_markdown_message
from swarms.utils.code_interpreter import SubprocessCodeInterpreter
from swarms.utils.parse_code import (
extract_code_in_backticks_in_string,
@ -8,7 +7,6 @@ from swarms.utils.pdf_to_text import pdf_to_text
__all__ = [
"display_markdown_message",
"SubprocessCodeInterpreter",
"extract_code_in_backticks_in_string",
"pdf_to_text",

@ -0,0 +1,42 @@
from concurrent import futures
from concurrent.futures import Future
from typing import TypeVar, Dict
T = TypeVar("T")
def execute_futures_dict(
fs_dict: Dict[str, Future[T]]
) -> Dict[str, T]:
"""Execute a dictionary of futures and return the results.
Args:
fs_dict (dict[str, futures.Future[T]]): A mapping of keys to futures.

Returns:
dict[str, T]: A mapping of the same keys to each future's result.
Example:
>>> import concurrent.futures
>>> import time
>>> import random
>>> import swarms.utils.futures
>>> def f(x):
... time.sleep(random.random())
... return x
>>> with concurrent.futures.ThreadPoolExecutor() as executor:
... fs_dict = {
... str(i): executor.submit(f, i)
... for i in range(10)
... }
... print(swarms.utils.futures.execute_futures_dict(fs_dict))
{'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9}
"""
futures.wait(
fs_dict.values(),
timeout=None,
return_when=futures.ALL_COMPLETED,
)
return {key: future.result() for key, future in fs_dict.items()}

@ -1,16 +0,0 @@

@ -1,15 +1,31 @@
import re


def extract_code_in_backticks_in_string(s: str) -> str:
"""
Extracts code blocks from a markdown string.

Args:
s (str): The markdown string to extract code from.

Returns:
str: A string containing all the code blocks.
"""
# Require a newline after the language tag so the tag group
# does not swallow the start of the code itself
pattern = r"```([\w\+\#\-\.\s]*)\n(.*?)```"
matches = re.findall(pattern, s, re.DOTALL)
return "\n".join(match[1].strip() for match in matches)

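For instance, a markdown string with one fenced Python block reduces to just the code (the fence is built programmatically here to avoid nesting):

```python
# Illustrative input for extract_code_in_backticks_in_string.
fence = "`" * 3
md = f"Intro\n{fence}python\nprint('hi')\n{fence}\nOutro"
print(extract_code_in_backticks_in_string(md))
# -> print('hi')
```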
@ -0,0 +1,27 @@
import tiktoken
def limit_tokens_from_string(
string: str, model: str = "gpt-4", limit: int = 500
) -> str:
"""Limits the number of tokens in a string
Args:
string (str): _description_
model (str): _description_
limit (int): _description_
Returns:
str: _description_
"""
try:
encoding = tiktoken.encoding_for_model(model)
except Exception:
encoding = tiktoken.encoding_for_model(
"gpt2"
) # Fallback for others.
encoded = encoding.encode(string)
out = encoding.decode(encoded[:limit])
return out
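Usage is straightforward; the exact cutoff depends on the model's tokenizer:

```python
# Truncate a long string to at most 50 gpt-4 tokens (illustrative).
text = "word " * 1000
short = limit_tokens_from_string(text, model="gpt-4", limit=50)
```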