parent 5bce72cedc
commit 780b04925e
@@ -0,0 +1,9 @@
"""
Plan -> act in a loop until observation is met


# Tools
- Terminal
- Text Editor
- Browser
"""
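The docstring above sketches an agent that plans, acts with the listed tools, and loops until the observation is met. Below is a minimal sketch of that control flow; the plan, act, and observation_met helpers and the tool names are placeholders assumed for illustration and are not code from this commit.

# Hypothetical sketch of the plan -> act loop described above.
TOOLS = ["terminal", "text_editor", "browser"]


def plan(goal: str, history: list) -> dict:
    # Placeholder: pick a tool and an action that moves toward the goal.
    return {"tool": TOOLS[len(history) % len(TOOLS)], "action": f"work on: {goal}"}


def act(step: dict) -> str:
    # Placeholder: execute the chosen tool action and return an observation.
    return f"ran {step['tool']}: {step['action']}"


def observation_met(observation: str, goal: str) -> bool:
    # Placeholder success check.
    return goal in observation


def run(goal: str, max_steps: int = 10) -> list:
    history = []
    for _ in range(max_steps):
        step = plan(goal, history)  # plan
        observation = act(step)  # act
        history.append(observation)
        if observation_met(observation, goal):  # stop once the observation is met
            break
    return history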
@@ -0,0 +1,59 @@
def test_create_graph():
    """
    Tests that a graph can be created.
    """
    graph = create_graph()
    assert isinstance(graph, dict)


def test_weight_edges():
    """
    Tests that the edges of a graph can be weighted.
    """
    graph = create_graph()
    weight_edges(graph)
    for edge in graph.edges:
        assert isinstance(edge.weight, int)


def test_create_user_list():
    """
    Tests that a list of all the podcasts that the user has listened to can be created.
    """
    user_list = create_user_list()
    assert isinstance(user_list, list)


def test_find_most_similar_podcasts():
    """
    Tests that the most similar podcasts to a given podcast can be found.
    """
    graph = create_graph()
    weight_edges(graph)
    user_list = create_user_list()
    most_similar_podcasts = find_most_similar_podcasts(
        graph, user_list
    )
    assert isinstance(most_similar_podcasts, list)


def test_add_most_similar_podcasts():
    """
    Tests that the most similar podcasts to a given podcast can be added to the user's list.
    """
    graph = create_graph()
    weight_edges(graph)
    user_list = create_user_list()
    add_most_similar_podcasts(graph, user_list)
    assert len(user_list) > 0


def test_repeat_steps():
    """
    Tests that steps 5-6 can be repeated until the user's list contains the desired number of podcasts.
    """
    graph = create_graph()
    weight_edges(graph)
    user_list = create_user_list()
    repeat_steps(graph, user_list)
    assert len(user_list) == 10
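The tests above call create_graph, weight_edges, create_user_list, find_most_similar_podcasts, add_most_similar_podcasts, and repeat_steps without importing them, so the module under test is not part of this diff. The sketch below is one hypothetical shape those helpers could take that would satisfy the assertions; every name and behavior in it is an assumption, including a dict-based graph that also carries an edges list so both isinstance(graph, dict) and graph.edges hold.

# Hypothetical sketch of the module these tests exercise; nothing below is in the diff.
import random
from dataclasses import dataclass


@dataclass
class Edge:
    source: str
    target: str
    weight: int = 0


class PodcastGraph(dict):
    # A dict of podcast -> metadata that also exposes an `edges` list.
    def __init__(self):
        super().__init__()
        self.edges = []


def create_graph() -> PodcastGraph:
    graph = PodcastGraph()
    graph["podcast_a"] = {"topic": "tech"}
    graph["podcast_b"] = {"topic": "sports"}
    graph.edges.append(Edge("podcast_a", "podcast_b"))
    return graph


def weight_edges(graph: PodcastGraph) -> None:
    for edge in graph.edges:
        edge.weight = random.randint(1, 10)


def create_user_list() -> list:
    return ["podcast_a"]  # podcasts the user has listened to


def find_most_similar_podcasts(graph: PodcastGraph, user_list: list) -> list:
    # Neighbours of listened podcasts, ranked by edge weight.
    neighbours = [
        (edge.weight, edge.target) for edge in graph.edges if edge.source in user_list
    ]
    return [name for _, name in sorted(neighbours, reverse=True)]


def add_most_similar_podcasts(graph: PodcastGraph, user_list: list) -> None:
    user_list.extend(find_most_similar_podcasts(graph, user_list))


def repeat_steps(graph: PodcastGraph, user_list: list, target: int = 10) -> None:
    while len(user_list) < target:
        add_most_similar_podcasts(graph, user_list)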
@@ -0,0 +1,46 @@
import pytest


def test_create_youtube_account():
    # Arrange
    # Act
    # Assert
    pass


def test_install_video_editing_software():
    # Arrange
    # Act
    # Assert
    pass


def test_write_script():
    # Arrange
    # Act
    # Assert
    pass


def test_gather_footage():
    # Arrange
    # Act
    # Assert
    pass


def test_edit_video():
    # Arrange
    # Act
    # Assert
    pass


def test_export_video():
    # Arrange
    # Act
    # Assert
    pass


def test_upload_video_to_youtube():
    # Arrange
    # Act
    # Assert
    pass


def test_optimize_video_for_search():
    # Arrange
    # Act
    # Assert
    pass


def test_share_video():
    # Arrange
    # Act
    # Assert
    pass
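The stubs above only mark out the Arrange / Act / Assert sections. For illustration, here is how one of them might be filled in; write_script here is a local stand-in defined only for this example and is not part of the diff.

def write_script(topic: str) -> str:
    # Stand-in for a real script generator.
    return f"Intro about {topic}. Main points. Outro."


def test_write_script_example():
    # Arrange
    topic = "swarm intelligence"
    # Act
    script = write_script(topic)
    # Assert
    assert topic in script
    assert script.endswith("Outro.")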
@@ -0,0 +1,253 @@
import concurrent.futures
import csv

from swarms import Agent, OpenAIChat
from swarms.memory import ChromaDB
from dotenv import load_dotenv
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.file_processing import create_file
from swarms.utils.loguru_logger import logger


# Load ENV
load_dotenv()

# Gemini
gemini = OpenAIChat()

# Memory
memory = ChromaDB(output_dir="swarm_hackathon")


def execute_concurrently(callable_functions: list, max_workers: int = 5):
    """
    Executes callable functions concurrently using multithreading.

    Parameters:
    - callable_functions: A list of tuples, each containing the callable function and its arguments.
      For example: [(function1, (arg1, arg2), {'kwarg1': val1}), (function2, (), {})]
    - max_workers: The maximum number of threads to use.

    Returns:
    - results: A list of results returned by the callable functions. If an error occurs in any function,
      the exception object will be placed at the corresponding index in the list.
    """
    results = [None] * len(callable_functions)

    def worker(fn, args, kwargs, index):
        try:
            result = fn(*args, **kwargs)
            results[index] = result
        except Exception as e:
            results[index] = e

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers
    ) as executor:
        futures = []
        for i, (fn, args, kwargs) in enumerate(callable_functions):
            futures.append(
                executor.submit(worker, fn, args, kwargs, i)
            )

        # Wait for all threads to complete
        concurrent.futures.wait(futures)

    return results


# Adjusting the function to extract specific column values
def extract_and_create_agents(
    csv_file_path: str, target_columns: list
):
    """
    Reads a CSV file, extracts "Project Name" and "Lightning Proposal" for each row,
    creates an Agent for each, and adds it to the swarm network.

    Parameters:
    - csv_file_path: The path to the CSV file.
    - target_columns: A list of column names to extract values from.
    """
    try:
        agents = []
        with open(csv_file_path, mode="r", encoding="utf-8") as file:
            reader = csv.DictReader(file)
            for row in reader:
                project_name = row[target_columns[0]]
                lightning_proposal = row[target_columns[1]]

                # Create an agent based on the project name and lightning proposal
                agent_name = f"{project_name} agent"
                print(agent_name)  # For demonstration

                # Create the agents
                logger.info("Creating agents...")

                # Design agent: turns the app idea into pseudocode
                logger.info("Creating design agent...")
                design_agent = Agent(
                    llm=gemini,
                    agent_name="Design Agent",
                    max_loops=1,
                    stopping_token="<DONE?>",
                    sop=None,
                    system_prompt=(
                        "Transform an app idea into step by step very"
                        " simple algorithmic pseudocode so it can be"
                        " implemented simply."
                    ),
                    long_term_memory=memory,
                )

                # Code agent: turns the pseudocode into a Python app
                logger.info(
                    f"Code agent created: {agent_name} with long term"
                    " memory"
                )
                agent = Agent(
                    llm=gemini,
                    agent_name=agent_name,
                    max_loops=1,
                    code_interpreter=True,
                    stopping_token="<DONE?>",
                    sop=None,
                    system_prompt=(
                        "Transform an app idea into a very simple"
                        " python app in markdown. Return all the"
                        " python code in a single markdown file."
                        " Return only code and nothing else."
                    ),
                    long_term_memory=memory,
                )

                # Testing agent: writes pytest unit tests for the generated code
                logger.info(f"Creating testing agent for: {agent_name}")
                testing_agent = Agent(
                    llm=gemini,
                    agent_name=agent_name + " testing",
                    max_loops=1,
                    stopping_token="<DONE?>",
                    sop=None,
                    system_prompt=(
                        "Create unit tests using pytest based on the"
                        " code you see, only return unit test code in"
                        " python using markdown, only return the code"
                        " and nothing else."
                    ),
                    long_term_memory=memory,
                )

                # Log the agent
                logger.info(
                    f"Agent created: {agent_name} with long term"
                    " memory"
                )
                agents.append(agent)

                # Run the design agent to produce the pseudocode
                design_agent_output = design_agent.run(
                    (
                        "Create the algorithmic pseudocode for the"
                        f" {lightning_proposal} in markdown and"
                        " return it"
                    ),
                    None,
                )

                logger.info(
                    "Algorithmic pseudocode created:"
                    f" {design_agent_output}"
                )

                # Create the code for each project
                output = agent.run(
                    (
                        "Create the code for the"
                        f" {lightning_proposal} in python using the"
                        " algorithmic pseudocode"
                        f" {design_agent_output} and wrap it in"
                        " markdown and return it"
                    ),
                    None,
                )
                print(output)

                # Parse the output and write it to a file
                output = extract_code_from_markdown(output)
                output = create_file(output, f"{project_name}.py")

                # Run the testing agent against the generated code
                testing_agent_output = testing_agent.run(
                    (
                        "Create the unit tests for the"
                        f" {lightning_proposal} in python using the"
                        f" code {output} and wrap it in markdown and"
                        " return it"
                    ),
                    None,
                )
                print(testing_agent_output)

                # Parse the output and write the tests to a file
                testing_agent_output = extract_code_from_markdown(
                    testing_agent_output
                )
                testing_agent_output = create_file(
                    testing_agent_output, f"test_{project_name}.py"
                )

                # Log the project created
                logger.info(
                    f"Project {project_name} created: {output} at"
                    f" file path {project_name}.py"
                )
                print(output)

                # Log the unit tests created
                logger.info(
                    f"Unit tests for {project_name} created:"
                    f" {testing_agent_output} at file path"
                    f" test_{project_name}.py"
                )

                print(
                    f"Agent {agent_name} created and added to the"
                    " swarm network"
                )

        return agents

    except Exception as e:
        logger.error(
            "An error occurred while extracting and creating"
            f" agents: {e}"
        )
        return None


# CSV
csv_file = "presentation.csv"

# Specific columns to extract
target_columns = ["Project Name", "Project Description"]

# Use the adjusted function
specific_column_values = extract_and_create_agents(
    csv_file, target_columns
)

# Display the extracted column values
print(specific_column_values)


# Concurrently execute the function
logger.info(
    "Concurrently executing the swarm for each hackathon project..."
)
output = execute_concurrently(
    [
        (extract_and_create_agents, (csv_file, target_columns), {}),
    ],
    max_workers=5,
)
print(output)
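For reference, execute_concurrently expects a list of (fn, args, kwargs) tuples and stores any exception at that function's index instead of raising. A small hypothetical usage, separate from the hackathon run above; double and always_fails are placeholders defined only for this example.

def double(x):
    return 2 * x


def always_fails():
    raise ValueError("boom")


results = execute_concurrently(
    [
        (double, (21,), {}),
        (always_fails, (), {}),
    ],
    max_workers=2,
)
# results[0] == 42; results[1] is the ValueError instance captured by the worker.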
@@ -0,0 +1,86 @@
class MockApp:
    def __init__(self):
        self.running = True
        self.session = None
        self.slides = []

    def main_menu(self):
        return input("Choose option: 1. Start, 2. Load, 3. Exit ")

    def start_new_talk(self, title):
        self.session = title
        self.slides = []

    def add_slide(self, content):
        self.slides.append(content)

    def edit_slide(self, index, content):
        self.slides[index] = content

    def delete_slide(self, index):
        del self.slides[index]

    def reorder_slides(self, new_order):
        self.slides = [self.slides[i] for i in new_order]

    def get_number_of_slides(self):
        return len(self.slides)

    # Function to simulate user actions
    def simulate_user_action(self, action):
        # Placeholder function to simulate user interaction, not part of the actual app code
        pass


# Testing starting a new talk
def test_start_new_talk():
    app = MockApp()
    app.start_new_talk("My New Talk")
    assert app.session == "My New Talk"
    assert app.slides == []


# Testing adding a slide
def test_add_slide():
    app = MockApp()
    app.start_new_talk("Talk 1")
    app.add_slide("Slide Content 1")
    assert app.slides == ["Slide Content 1"]


# Testing editing a slide
def test_edit_slide():
    app = MockApp()
    app.start_new_talk("Talk 1")
    app.add_slide("Slide Content 1")
    app.edit_slide(0, "Updated Slide Content 1")
    assert app.slides == ["Updated Slide Content 1"]


# Testing deleting a slide
def test_delete_slide():
    app = MockApp()
    app.start_new_talk("Talk 1")
    app.add_slide("Slide Content 1")
    app.add_slide("Slide Content 2")
    app.delete_slide(0)
    assert app.slides == ["Slide Content 2"]


# Testing reordering slides
def test_reorder_slides():
    app = MockApp()
    app.start_new_talk("Talk 1")
    app.add_slide("Slide Content 1")
    app.add_slide("Slide Content 2")
    app.reorder_slides([1, 0])
    assert app.slides == ["Slide Content 2", "Slide Content 1"]


# Testing the number of slides
def test_slide_count():
    app = MockApp()
    app.start_new_talk("Talk 1")
    app.add_slide("Slide Content 1")
    app.add_slide("Slide Content 2")
    assert app.get_number_of_slides() == 2
@@ -0,0 +1,38 @@
from ai_acceleerated_learning.Vocal import Vocal

vocal = Vocal()


def test_pass():
    assert (
        vocal.generate_video(
            "I love to play basketball, and I am a very good player.",
            "basketball",
        )
        == "Successfully generated a YouTube video for your prompt: I"
        " love to play basketball, and I am a very good player."
    )


def test_invalid_sports():
    assert (
        vocal.generate_video(
            "I just ate some delicious tacos", "tacos"
        )
        == "Invalid sports entered!! Please enter a valid sport."
    )


def test_invalid_prompt():
    assert (
        vocal.generate_video(987, "basketball")
        == "Invalid prompt entered!! Please enter a valid prompt."
    )


def test_not_string():
    assert (
        vocal.generate_video(789, 234)
        == "Invalid prompt and sports entered!! Please enter valid"
        " prompt and sport."
    )
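These tests import Vocal from a module that is not included in this diff, and they pin exact return strings. Below is a hypothetical sketch of a generate_video method that would satisfy them; the valid-sport set and all behavior are assumptions made for illustration.

# Hypothetical stand-in for ai_acceleerated_learning.Vocal; not code from this commit.
VALID_SPORTS = {"basketball", "football", "tennis"}


class Vocal:
    def generate_video(self, prompt, sport):
        prompt_ok = isinstance(prompt, str)
        sport_ok = isinstance(sport, str) and sport in VALID_SPORTS
        if not prompt_ok and not sport_ok:
            return (
                "Invalid prompt and sports entered!! Please enter valid"
                " prompt and sport."
            )
        if not prompt_ok:
            return "Invalid prompt entered!! Please enter a valid prompt."
        if not sport_ok:
            return "Invalid sports entered!! Please enter a valid sport."
        return (
            "Successfully generated a YouTube video for your prompt:"
            f" {prompt}"
        )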
@@ -0,0 +1,86 @@
# test_presentation_assistant.py

import pytest
from presentation_assistant import (
    PresentationAssistant,
    SlideNotFoundError,
)


@pytest.fixture
def assistant():
    slides = [
        "Welcome to our presentation!",
        "Here is the agenda for today.",
        "Let's dive into the first topic.",
        "Thank you for attending.",
    ]
    return PresentationAssistant(slides)


def test_init():
    slides = ["Slide 1", "Slide 2"]
    pa = PresentationAssistant(slides)
    assert pa.slides == slides
    assert pa.current_slide == 0


def test_next_slide(assistant):
    assistant.next_slide()
    assert assistant.current_slide == 1
    assistant.next_slide()
    assert assistant.current_slide == 2


def test_previous_slide(assistant):
    assistant.current_slide = 2
    assistant.previous_slide()
    assert assistant.current_slide == 1
    assistant.previous_slide()
    assert assistant.current_slide == 0


def test_next_slide_at_end(assistant):
    assistant.current_slide = len(assistant.slides) - 1
    with pytest.raises(SlideNotFoundError):
        assistant.next_slide()


def test_previous_slide_at_start(assistant):
    with pytest.raises(SlideNotFoundError):
        assistant.previous_slide()


def test_go_to_slide(assistant):
    assistant.go_to_slide(2)
    assert assistant.current_slide == 2


def test_go_to_slide_out_of_range(assistant):
    with pytest.raises(SlideNotFoundError):
        assistant.go_to_slide(len(assistant.slides))


def test_go_to_slide_negative(assistant):
    with pytest.raises(SlideNotFoundError):
        assistant.go_to_slide(-1)


def test_current_slide_content(assistant):
    content = assistant.current_slide_content()
    assert content == assistant.slides[0]
    assistant.next_slide()
    content = assistant.current_slide_content()
    assert content == assistant.slides[1]


def test_show_slide(
    assistant, capsys
):  # capsys is a pytest fixture to capture stdout and stderr
    assistant.show_slide()
    captured = capsys.readouterr()
    assert captured.out.strip() == assistant.slides[0]
    assistant.next_slide()
    assistant.show_slide()
    captured = capsys.readouterr()
    assert captured.out.strip() == assistant.slides[1]
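The presentation_assistant module these tests import is not included in this diff. Below is a minimal hypothetical sketch of PresentationAssistant and SlideNotFoundError that is consistent with the assertions above; the implementation details are assumptions.

# Hypothetical stand-in for presentation_assistant; not code from this commit.
class SlideNotFoundError(Exception):
    """Raised when navigation moves outside the slide deck."""


class PresentationAssistant:
    def __init__(self, slides):
        self.slides = slides
        self.current_slide = 0  # index into self.slides

    def next_slide(self):
        if self.current_slide >= len(self.slides) - 1:
            raise SlideNotFoundError("Already at the last slide.")
        self.current_slide += 1

    def previous_slide(self):
        if self.current_slide <= 0:
            raise SlideNotFoundError("Already at the first slide.")
        self.current_slide -= 1

    def go_to_slide(self, index):
        if not 0 <= index < len(self.slides):
            raise SlideNotFoundError(f"No slide at index {index}.")
        self.current_slide = index

    def current_slide_content(self):
        return self.slides[self.current_slide]

    def show_slide(self):
        print(self.current_slide_content())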
@@ -0,0 +1,29 @@
from swarms import Agent
from swarms.models.base_llm import AbstractLLM


class ExampleLLM(AbstractLLM):
    def __init__(self):
        pass

    def run(self, task: str, *args, **kwargs):
        pass


# Initialize the workflow
agent = Agent(
    llm=ExampleLLM(),
    max_loops="auto",
    autosave=True,
    dashboard=False,
    streaming_on=True,
    verbose=True,
    stopping_token="<DONE>",
    interactive=True,
)

# Run the workflow on a task
agent(
    "Generate a transcript for a youtube video on what swarms are!"
    " Output a <DONE> token when done."
)