Merge branch 'master' into master

pull/766/merge^2
nathanogaga118 committed 3 months ago via GitHub
commit 393988cebd

@@ -7,7 +7,7 @@ Before we dive into the code, let's briefly introduce the Swarms framework. Swar
 For more information and to contribute to the project, visit the [Swarms GitHub repository](https://github.com/kyegomez/swarms). We highly recommend exploring the documentation for a deeper understanding of Swarms' capabilities.
 Additional resources:
-- [Swarms Discord](https://discord.com/servers/agora-999382051935506503) for community discussions
+- [Swarms Discord](https://discord.gg/swarms) for community discussions
 - [Swarms Twitter](https://x.com/swarms_corp) for updates
 - [Swarms Spotify](https://open.spotify.com/show/2HLiswhmUaMdjHC8AUHcCF?si=c831ef10c5ef4994) for podcasts
 - [Swarms Blog](https://medium.com/@kyeg) for in-depth articles
@@ -460,7 +460,7 @@ This system provides a powerful foundation for financial analysis, but there's a
 Remember, the Swarms framework is a powerful and flexible tool that can be adapted to a wide range of complex tasks beyond just financial analysis. We encourage you to explore the [Swarms GitHub repository](https://github.com/kyegomez/swarms) for more examples and inspiration.
-For more in-depth discussions and community support, consider joining the [Swarms Discord](https://discord.com/servers/agora-999382051935506503). You can also stay updated with the latest developments by following [Swarms on Twitter](https://x.com/swarms_corp).
+For more in-depth discussions and community support, consider joining the [Swarms Discord](https://discord.gg/swarms). You can also stay updated with the latest developments by following [Swarms on Twitter](https://x.com/swarms_corp).
 If you're interested in learning more about AI and its applications in various fields, check out the [Swarms Spotify podcast](https://open.spotify.com/show/2HLiswhmUaMdjHC8AUHcCF?si=c831ef10c5ef4994) and the [Swarms Blog](https://medium.com/@kyeg) for insightful articles and discussions.
@@ -474,7 +474,7 @@ By leveraging the power of multi-agent AI systems, you're well-equipped to navig
 * [Swarms Github](https://github.com/kyegomez/swarms)
-* [Swarms Discord](https://discord.com/servers/agora-999382051935506503)
+* [Swarms Discord](https://discord.gg/swarms)
 * [Swarms Twitter](https://x.com/swarms_corp)
 * [Swarms Spotify](https://open.spotify.com/show/2HLiswhmUaMdjHC8AUHcCF?si=c831ef10c5ef4994)
 * [Swarms Blog](https://medium.com/@kyeg)

@@ -261,7 +261,7 @@ The table below summarizes the estimated savings for each use case:
 - [book a call](https://cal.com/swarms)
-- Swarms Discord: https://discord.com/servers/agora-999382051935506503
+- Swarms Discord: https://discord.gg/swarms
 - Swarms Twitter: https://x.com/swarms_corp

@@ -39,7 +39,7 @@ Here you'll find references about the Swarms framework, marketplace, community,
 ## Community
 | Section | Links |
 |----------------------|--------------------------------------------------------------------------------------------|
-| Community | [Discord](https://discord.com/servers/agora-999382051935506503) |
+| Community | [Discord](https://discord.gg/swarms) |
 | Blog | [Blog](https://medium.com/@kyeg) |
 | Event Calendar | [LUMA](https://lu.ma/swarms_calendar) |
 | Twitter | [Twitter](https://x.com/swarms_corp) |

@@ -50,7 +50,7 @@ extra:
     - icon: fontawesome/brands/twitter
       link: https://x.com/swarms_corp
     - icon: fontawesome/brands/discord
-      link: https://discord.com/servers/agora-999382051935506503
+      link: https://discord.gg/swarms
   analytics:
     provider: google
@@ -219,6 +219,8 @@ nav:
     - Meme Agents:
       - Bob The Builder: "swarms/examples/bob_the_builder.md"
       - Meme Agent Builder: "swarms/examples/meme_agents.md"
+    - Multi-Agent Collaboration:
+      - Swarms DAO: "swarms/examples/swarms_dao.md"
     - Swarm Models:
       - Overview: "swarms/models/index.md"
       # - Models Available: "swarms/models/index.md"
@@ -242,6 +244,7 @@ nav:
     - Swarms Cloud API:
       # - Overview: "swarms_cloud/main.md"
       - Overview: "swarms_cloud/vision.md"
+      - Deploying Swarms on Google Cloud Run: "swarms_cloud/cloud_run.md"
       # - Swarms Cloud CLI: "swarms_cloud/cli.md"
       - Swarm APIs:
         - MCS API: "swarms_cloud/mcs_api.md"

@@ -54,7 +54,7 @@ class Lumo:
 Agent(
     agent_name="Solana-Analysis-Agent",
-    model_name=Lumo(),
+    llm=Lumo(),
     max_loops="auto",
     interactive=True,
     streaming_on=True,

@@ -0,0 +1,237 @@
# Swarms DAO Example
This example demonstrates how to create a swarm of agents that collaborate on a task. The agents work together to produce a comprehensive strategy for a DAO focused on decentralized governance for climate action.
You can customize the agents and their system prompts to fit your specific needs.
This example uses the `deepseek-reasoner` model, a large language model optimized for reasoning tasks.
## Todo
- Add tools to check the treasury wallet and its balance (see the sketch after this list)
- Add tools to check the token's price
- Add tools to compare the token's price across different exchanges
- Add tools to compare the token's price across different chains
- Add tools to fetch Twitter posts and analyze their sentiment
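As a starting point for the first item, here is a minimal sketch of what such a tool could look like. This is a hypothetical helper, not part of the example below: it assumes the `requests` library, the public Solana mainnet JSON-RPC endpoint, and a treasury held in a Solana wallet. A plain callable like this can typically be registered with an agent through its `tools` parameter.

```python
import requests


def check_treasury_balance(wallet_address: str) -> str:
    """Return the SOL balance of the DAO treasury wallet.

    Hypothetical sketch: queries the public Solana mainnet RPC
    endpoint; swap in your own RPC provider for production use.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getBalance",  # standard Solana JSON-RPC method
        "params": [wallet_address],
    }
    response = requests.post(
        "https://api.mainnet-beta.solana.com",
        json=payload,
        timeout=10,
    )
    response.raise_for_status()
    lamports = response.json()["result"]["value"]
    # 1 SOL = 1_000_000_000 lamports
    return f"Treasury balance: {lamports / 1_000_000_000:.4f} SOL"
```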
```python
import random
from swarms import Agent
# System prompts for each agent
MARKETING_AGENT_SYS_PROMPT = """
You are the Marketing Strategist Agent for a DAO. Your role is to develop, implement, and optimize all marketing and branding strategies to align with the DAO's mission and vision. The DAO is focused on decentralized governance for climate action, funding projects aimed at reducing carbon emissions, and incentivizing community participation through its native token.
### Objectives:
1. **Brand Awareness**: Build a globally recognized and trusted brand for the DAO.
2. **Community Growth**: Expand the DAO's community by onboarding individuals passionate about climate action and blockchain technology.
3. **Campaign Execution**: Launch high-impact marketing campaigns on platforms like Twitter, Discord, and YouTube to engage and retain community members.
4. **Partnerships**: Identify and build partnerships with like-minded organizations, NGOs, and influencers.
5. **Content Strategy**: Design educational and engaging content, including infographics, blog posts, videos, and AMAs.
### Instructions:
- Thoroughly analyze the product description and DAO mission.
- Collaborate with the Growth, Product, Treasury, and Operations agents to align marketing strategies with overall goals.
- Create actionable steps for social media growth, community engagement, and brand storytelling.
- Leverage analytics to refine marketing strategies, focusing on measurable KPIs like engagement, conversion rates, and member retention.
- Suggest innovative methods to make the DAO's mission resonate with a broader audience (e.g., gamified incentives, contests, or viral campaigns).
- Ensure every strategy emphasizes transparency, sustainability, and long-term impact.
"""
PRODUCT_AGENT_SYS_PROMPT = """
You are the Product Manager Agent for a DAO focused on decentralized governance for climate action. Your role is to design, manage, and optimize the DAO's product roadmap. This includes defining key features, prioritizing user needs, and ensuring product alignment with the DAO's mission of reducing carbon emissions and incentivizing community participation.
### Objectives:
1. **User-Centric Design**: Identify the DAO community's needs and design features to enhance their experience.
2. **Roadmap Prioritization**: Develop a prioritized product roadmap based on community feedback and alignment with climate action goals.
3. **Integration**: Suggest technical solutions and tools for seamless integration with other platforms and blockchains.
4. **Continuous Improvement**: Regularly evaluate product features and recommend optimizations to improve usability, engagement, and adoption.
### Instructions:
- Collaborate with the Marketing and Growth agents to understand user feedback and market trends.
- Engage the Treasury Agent to ensure product development aligns with budget constraints and revenue goals.
- Suggest mechanisms for incentivizing user engagement, such as staking rewards or gamified participation.
- Design systems that emphasize decentralization, transparency, and scalability.
- Provide detailed feature proposals, technical specifications, and timelines for implementation.
- Ensure all features are optimized for both experienced blockchain users and newcomers to Web3.
"""
GROWTH_AGENT_SYS_PROMPT = """
You are the Growth Strategist Agent for a DAO focused on decentralized governance for climate action. Your primary role is to identify and implement growth strategies to increase the DAO's user base and engagement.
### Objectives:
1. **User Acquisition**: Identify effective strategies to onboard more users to the DAO.
2. **Retention**: Suggest ways to improve community engagement and retain active members.
3. **Data-Driven Insights**: Leverage data analytics to identify growth opportunities and areas of improvement.
4. **Collaborative Growth**: Work with other agents to align growth efforts with marketing, product development, and treasury goals.
### Instructions:
- Collaborate with the Marketing Agent to optimize campaigns for user acquisition.
- Analyze user behavior and suggest actionable insights to improve retention.
- Recommend partnerships with influential figures or organizations to enhance the DAO's visibility.
- Propose growth experiments (A/B testing, new incentives, etc.) and analyze their effectiveness.
- Suggest tools for data collection and analysis, ensuring privacy and transparency.
- Ensure growth strategies align with the DAO's mission of sustainability and climate action.
"""
TREASURY_AGENT_SYS_PROMPT = """
You are the Treasury Management Agent for a DAO focused on decentralized governance for climate action. Your role is to oversee the DAO's financial operations, including budgeting, funding allocation, and financial reporting.
### Objectives:
1. **Financial Transparency**: Maintain clear and detailed reports of the DAO's financial status.
2. **Budget Management**: Allocate funds strategically to align with the DAO's goals and priorities.
3. **Fundraising**: Identify and recommend strategies for fundraising to ensure the DAO's financial sustainability.
4. **Cost Optimization**: Suggest ways to reduce operational costs without sacrificing quality.
### Instructions:
- Collaborate with all other agents to align funding with the DAO's mission and strategic goals.
- Propose innovative fundraising campaigns (e.g., NFT drops, token sales) to generate revenue.
- Analyze financial risks and suggest mitigation strategies.
- Ensure all recommendations prioritize the DAO's mission of reducing carbon emissions and driving global climate action.
- Provide periodic financial updates and propose budget reallocations based on current needs.
"""
OPERATIONS_AGENT_SYS_PROMPT = """
You are the Operations Coordinator Agent for a DAO focused on decentralized governance for climate action. Your role is to ensure smooth day-to-day operations, coordinate workflows, and manage governance processes.
### Objectives:
1. **Workflow Optimization**: Streamline operational processes to maximize efficiency and effectiveness.
2. **Task Coordination**: Manage and delegate tasks to ensure timely delivery of goals.
3. **Governance**: Oversee governance processes, including proposal management and voting mechanisms.
4. **Communication**: Ensure seamless communication between all agents and community members.
### Instructions:
- Collaborate with other agents to align operations with DAO objectives.
- Facilitate communication and task coordination between Marketing, Product, Growth, and Treasury agents.
- Create efficient workflows to handle DAO proposals and governance activities.
- Suggest tools or platforms to improve operational efficiency.
- Provide regular updates on task progress and flag any blockers or risks.
"""
# Initialize agents
marketing_agent = Agent(
agent_name="Marketing-Agent",
system_prompt=MARKETING_AGENT_SYS_PROMPT,
model_name="deepseek/deepseek-reasoner",
autosave=True,
dashboard=False,
verbose=True,
)
product_agent = Agent(
agent_name="Product-Agent",
system_prompt=PRODUCT_AGENT_SYS_PROMPT,
model_name="deepseek/deepseek-reasoner",
autosave=True,
dashboard=False,
verbose=True,
)
growth_agent = Agent(
agent_name="Growth-Agent",
system_prompt=GROWTH_AGENT_SYS_PROMPT,
model_name="deepseek/deepseek-reasoner",
autosave=True,
dashboard=False,
verbose=True,
)
treasury_agent = Agent(
agent_name="Treasury-Agent",
system_prompt=TREASURY_AGENT_SYS_PROMPT,
model_name="deepseek/deepseek-reasoner",
autosave=True,
dashboard=False,
verbose=True,
)
operations_agent = Agent(
agent_name="Operations-Agent",
system_prompt=OPERATIONS_AGENT_SYS_PROMPT,
model_name="deepseek/deepseek-reasoner",
autosave=True,
dashboard=False,
verbose=True,
)
agents = [marketing_agent, product_agent, growth_agent, treasury_agent, operations_agent]
class DAOSwarmRunner:
"""
A class to manage and run a swarm of agents in a discussion.
"""
def __init__(self, agents: list, max_loops: int = 5, shared_context: str = "") -> None:
"""
Initializes the DAO Swarm Runner.
Args:
agents (list): A list of agents in the swarm.
max_loops (int, optional): The maximum number of discussion loops between agents. Defaults to 5.
shared_context (str, optional): The shared context for all agents to base their discussion on. Defaults to an empty string.
"""
self.agents = agents
self.max_loops = max_loops
self.shared_context = shared_context
self.discussion_history = []
def run(self, task: str) -> str:
"""
Runs the swarm in a random discussion.
Args:
task (str): The task or context that agents will discuss.
Returns:
str: The final discussion output after all loops.
"""
print(f"Task: {task}")
print("Initializing Random Discussion...")
# Initialize the discussion with the shared context
current_message = f"Task: {task}\nContext: {self.shared_context}"
self.discussion_history.append(current_message)
# Run the agents in a randomized discussion
for loop in range(self.max_loops):
print(f"\n--- Loop {loop + 1}/{self.max_loops} ---")
# Choose a random agent
agent = random.choice(self.agents)
print(f"Agent {agent.agent_name} is responding...")
# Run the agent and get a response
response = agent.run(current_message)
print(f"Agent {agent.agent_name} says:\n{response}\n")
# Append the response to the discussion history
self.discussion_history.append(f"{agent.agent_name}: {response}")
# Update the current message for the next agent
current_message = response
print("\n--- Discussion Complete ---")
return "\n".join(self.discussion_history)
swarm = DAOSwarmRunner(agents=agents, max_loops=1, shared_context="")
# User input for product description
product_description = """
The DAO is focused on decentralized governance for climate action.
It funds projects aimed at reducing carbon emissions and incentivizes community participation with a native token.
"""
# Assign a shared context for all agents
swarm.shared_context = product_description
# Run the swarm
task = """
Analyze the product description and create a collaborative strategy for marketing, product, growth, treasury, and operations. Ensure all recommendations align with the DAO's mission of reducing carbon emissions.
"""
output = swarm.run(task)
# Print the swarm output
print("Collaborative Strategy Output:\n", output)
```

@@ -0,0 +1,254 @@
# Hosting Agents on Google Cloud Run
This documentation provides a detailed, step-by-step guide to hosting your agents on Google Cloud Run. It uses a well-structured project setup that includes a Dockerfile at the root level, a folder dedicated to your API file, and a `requirements.txt` file to manage all dependencies. This structure keeps your deployment scalable, efficient, and easy to maintain.
---
## **Project Structure**
Your project directory should adhere to the following structure to ensure compatibility and ease of deployment:
```
.
├── Dockerfile
├── requirements.txt
└── api/
└── api.py
```
Each component serves a specific purpose in the deployment pipeline, ensuring modularity and maintainability.
---
## **Step 1: Prerequisites**
Before you begin, make sure to satisfy the following prerequisites to avoid issues during deployment:
1. **Google Cloud Account**:
- Create a Google Cloud account at [Google Cloud Console](https://console.cloud.google.com/).
- Enable billing for your project. Billing is necessary for accessing Cloud Run services.
2. **Install Google Cloud SDK**:
- Follow the [installation guide](https://cloud.google.com/sdk/docs/install) to set up the Google Cloud SDK on your local machine.
3. **Install Docker**:
- Download and install Docker by following the [official Docker installation guide](https://docs.docker.com/get-docker/). Docker is crucial for containerizing your application.
4. **Create a Google Cloud Project**:
- Navigate to the Google Cloud Console and create a new project. Assign it a meaningful name and note the **Project ID**, as it will be used throughout this guide.
5. **Enable Required APIs**:
- Visit the [API Library](https://console.cloud.google.com/apis/library) and enable the following APIs:
- Cloud Run API
- Cloud Build API
- Artifact Registry API
- These APIs are essential for deploying and managing your application in Cloud Run.
---
## **Step 2: Creating the Files**
### 1. **`api/api.py`**
This is the main Python script where you define your Swarms agents and expose an API endpoint for interacting with them. Here's an example:
```python
from flask import Flask, request, jsonify
from swarms import Agent # Assuming `swarms` is the framework you're using
app = Flask(__name__)
# Example Swarm agent
agent = Agent(
agent_name="Stock-Analysis-Agent",
model_name="gpt-4o-mini",
max_loops="auto",
interactive=True,
streaming_on=True,
)
@app.route('/run-agent', methods=['POST'])
def run_agent():
data = request.json
task = data.get('task', '')
result = agent.run(task)
return jsonify({"result": result})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8080)
```
This example sets up a basic API that listens for POST requests, processes a task using a Swarm agent, and returns the result as a JSON response. Customize it based on your agent's functionality.
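Before containerizing, you can sanity-check the endpoint locally. This assumes the credentials your agent's model needs (for example, an `OPENAI_API_KEY` for the `gpt-4o-mini` model above) are already exported in your shell:

```bash
# Start the API in the background
python api/api.py &

# Send a test task to the endpoint
curl -X POST http://localhost:8080/run-agent \
  -H "Content-Type: application/json" \
  -d '{"task": "Summarize the outlook for large-cap tech stocks"}'
```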
---
### 2. **`requirements.txt`**
This file lists all Python dependencies required for your project. Example:
```
flask
swarms
# add any other dependencies here
```
Be sure to include any additional libraries your agents rely on. Keeping this file up to date ensures smooth dependency management during deployment.
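For reproducible builds, consider pinning versions. The version numbers below are illustrative only; pin whatever you have actually tested against:

```
flask==3.0.3
swarms==7.0.0
```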
---
### 3. **`Dockerfile`**
The Dockerfile specifies how your application is containerized. Below is a sample Dockerfile for your setup:
```dockerfile
# Use an official Python runtime as the base image
FROM python:3.10-slim
# Set the working directory
WORKDIR /app
# Copy requirements.txt and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code
COPY api/ ./api/
# Expose port 8080 (Cloud Run default port)
EXPOSE 8080
# Run the application
CMD ["python", "api/api.py"]
```
This Dockerfile ensures your application is containerized with minimal overhead, focusing on slim images for efficiency.
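You can smoke-test the image locally before pushing it. The `-e` flag is only needed if your agent requires an API key; use whatever variable name your model provider expects:

```bash
docker build -t swarms-agent-api .
docker run -p 8080:8080 -e OPENAI_API_KEY=$OPENAI_API_KEY swarms-agent-api
```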
---
## **Step 3: Deploying to Google Cloud Run**
### 1. **Authenticate with Google Cloud**
Log in to your Google Cloud account by running:
```bash
gcloud auth login
```
Set the active project to match your deployment target:
```bash
gcloud config set project [PROJECT_ID]
```
Replace `[PROJECT_ID]` with your actual Project ID.
---
### 2. **Build the Docker Image**
Use Google Cloud's Artifact Registry to store and manage your Docker image. Follow these steps:
1. **Create a Repository**:
```bash
gcloud artifacts repositories create my-repo --repository-format=docker --location=us-central1
```
2. **Authenticate Docker with Google Cloud**:
```bash
gcloud auth configure-docker us-central1-docker.pkg.dev
```
3. **Build and Tag the Image**:
```bash
docker build -t us-central1-docker.pkg.dev/[PROJECT_ID]/my-repo/my-image .
```
4. **Push the Image**:
```bash
docker push us-central1-docker.pkg.dev/[PROJECT_ID]/my-repo/my-image
```
---
### 3. **Deploy to Cloud Run**
Deploy the application to Cloud Run with the following command:
```bash
gcloud run deploy my-agent-service \
--image us-central1-docker.pkg.dev/[PROJECT_ID]/my-repo/my-image \
--platform managed \
--region us-central1 \
--allow-unauthenticated
```
Key points:
- Replace `[PROJECT_ID]` with your actual Project ID.
- The `--allow-unauthenticated` flag makes the service publicly accessible. Exclude it to restrict access.
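If you exclude the flag, grant invocation rights to specific identities instead. For example, to let a particular service account (a placeholder name here) call the service:

```bash
gcloud run services add-iam-policy-binding my-agent-service \
  --region us-central1 \
  --member="serviceAccount:caller@[PROJECT_ID].iam.gserviceaccount.com" \
  --role="roles/run.invoker"
```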
---
## **Step 4: Testing the Deployment**
Once the deployment is complete, test the service:
1. Note the URL provided by Cloud Run.
2. Use `curl` or Postman to send a request. Example:
```bash
curl -X POST [CLOUD_RUN_URL]/run-agent \
-H "Content-Type: application/json" \
-d '{"task": "example task"}'
```
This tests whether your agent processes the task correctly and returns the expected output.
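With the Flask handler shown earlier, a successful call returns a JSON body along these lines (the `result` text will vary with your agent and model):

```json
{
  "result": "Here is the analysis you requested..."
}
```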
---
## **Step 5: Updating the Service**
To apply changes to your application:
1. Edit the necessary files.
2. Rebuild and push the updated Docker image:
```bash
docker build -t us-central1-docker.pkg.dev/[PROJECT_ID]/my-repo/my-image .
docker push us-central1-docker.pkg.dev/[PROJECT_ID]/my-repo/my-image
```
3. Redeploy the service:
```bash
gcloud run deploy my-agent-service \
--image us-central1-docker.pkg.dev/[PROJECT_ID]/my-repo/my-image
```
This ensures the latest version of your application is live.
---
## **Troubleshooting**
- **Permission Errors**:
Ensure your account has roles such as Cloud Run Admin and Artifact Registry Reader; see the granting example after the logs command below.
- **Port Issues**:
Confirm the application listens on port 8080. Cloud Run expects this port by default.
- **Logs**:
Use the Google Cloud Console or CLI to review logs for debugging:
```bash
gcloud logging read "resource.type=cloud_run_revision" --project [PROJECT_ID]
```
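For the permission errors above, granting the required roles from the CLI looks like this (the member value is a placeholder for your own account):

```bash
gcloud projects add-iam-policy-binding [PROJECT_ID] \
  --member="user:you@example.com" \
  --role="roles/run.admin"
```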
---
## **Conclusion**
By following this comprehensive guide, you can deploy your agents on Google Cloud Run with ease. This method leverages Docker for containerization and Google Cloud services for seamless scalability and management. With a robust setup like this, you can focus on enhancing your agents' capabilities rather than worrying about deployment challenges.

@@ -113,9 +113,9 @@ To further enhance your understanding and usage of the Swarms Platform, explore
 ### Links
 - [API Documentation](https://docs.swarms.world)
-- [Community Forums](https://discord.com/servers/agora-999382051935506503)
+- [Community Forums](https://discord.gg/swarms)
 - [Tutorials and Guides](https://docs.swarms.world)
-- [Support](https://discord.com/servers/agora-999382051935506503)
+- [Support](https://discord.gg/swarms)

 ## Conclusion

@@ -0,0 +1,233 @@
import random
from swarms import Agent
# System prompts for each agent
MARKETING_AGENT_SYS_PROMPT = """
You are the Marketing Strategist Agent for a DAO. Your role is to develop, implement, and optimize all marketing and branding strategies to align with the DAO's mission and vision. The DAO is focused on decentralized governance for climate action, funding projects aimed at reducing carbon emissions, and incentivizing community participation through its native token.
### Objectives:
1. **Brand Awareness**: Build a globally recognized and trusted brand for the DAO.
2. **Community Growth**: Expand the DAO's community by onboarding individuals passionate about climate action and blockchain technology.
3. **Campaign Execution**: Launch high-impact marketing campaigns on platforms like Twitter, Discord, and YouTube to engage and retain community members.
4. **Partnerships**: Identify and build partnerships with like-minded organizations, NGOs, and influencers.
5. **Content Strategy**: Design educational and engaging content, including infographics, blog posts, videos, and AMAs.
### Instructions:
- Thoroughly analyze the product description and DAO mission.
- Collaborate with the Growth, Product, Treasury, and Operations agents to align marketing strategies with overall goals.
- Create actionable steps for social media growth, community engagement, and brand storytelling.
- Leverage analytics to refine marketing strategies, focusing on measurable KPIs like engagement, conversion rates, and member retention.
- Suggest innovative methods to make the DAO's mission resonate with a broader audience (e.g., gamified incentives, contests, or viral campaigns).
- Ensure every strategy emphasizes transparency, sustainability, and long-term impact.
"""
PRODUCT_AGENT_SYS_PROMPT = """
You are the Product Manager Agent for a DAO focused on decentralized governance for climate action. Your role is to design, manage, and optimize the DAO's product roadmap. This includes defining key features, prioritizing user needs, and ensuring product alignment with the DAO's mission of reducing carbon emissions and incentivizing community participation.
### Objectives:
1. **User-Centric Design**: Identify the DAO community's needs and design features to enhance their experience.
2. **Roadmap Prioritization**: Develop a prioritized product roadmap based on community feedback and alignment with climate action goals.
3. **Integration**: Suggest technical solutions and tools for seamless integration with other platforms and blockchains.
4. **Continuous Improvement**: Regularly evaluate product features and recommend optimizations to improve usability, engagement, and adoption.
### Instructions:
- Collaborate with the Marketing and Growth agents to understand user feedback and market trends.
- Engage the Treasury Agent to ensure product development aligns with budget constraints and revenue goals.
- Suggest mechanisms for incentivizing user engagement, such as staking rewards or gamified participation.
- Design systems that emphasize decentralization, transparency, and scalability.
- Provide detailed feature proposals, technical specifications, and timelines for implementation.
- Ensure all features are optimized for both experienced blockchain users and newcomers to Web3.
"""
GROWTH_AGENT_SYS_PROMPT = """
You are the Growth Strategist Agent for a DAO focused on decentralized governance for climate action. Your primary role is to identify and implement growth strategies to increase the DAO's user base and engagement.
### Objectives:
1. **User Acquisition**: Identify effective strategies to onboard more users to the DAO.
2. **Retention**: Suggest ways to improve community engagement and retain active members.
3. **Data-Driven Insights**: Leverage data analytics to identify growth opportunities and areas of improvement.
4. **Collaborative Growth**: Work with other agents to align growth efforts with marketing, product development, and treasury goals.
### Instructions:
- Collaborate with the Marketing Agent to optimize campaigns for user acquisition.
- Analyze user behavior and suggest actionable insights to improve retention.
- Recommend partnerships with influential figures or organizations to enhance the DAO's visibility.
- Propose growth experiments (A/B testing, new incentives, etc.) and analyze their effectiveness.
- Suggest tools for data collection and analysis, ensuring privacy and transparency.
- Ensure growth strategies align with the DAO's mission of sustainability and climate action.
"""
TREASURY_AGENT_SYS_PROMPT = """
You are the Treasury Management Agent for a DAO focused on decentralized governance for climate action. Your role is to oversee the DAO's financial operations, including budgeting, funding allocation, and financial reporting.
### Objectives:
1. **Financial Transparency**: Maintain clear and detailed reports of the DAO's financial status.
2. **Budget Management**: Allocate funds strategically to align with the DAO's goals and priorities.
3. **Fundraising**: Identify and recommend strategies for fundraising to ensure the DAO's financial sustainability.
4. **Cost Optimization**: Suggest ways to reduce operational costs without sacrificing quality.
### Instructions:
- Collaborate with all other agents to align funding with the DAO's mission and strategic goals.
- Propose innovative fundraising campaigns (e.g., NFT drops, token sales) to generate revenue.
- Analyze financial risks and suggest mitigation strategies.
- Ensure all recommendations prioritize the DAO's mission of reducing carbon emissions and driving global climate action.
- Provide periodic financial updates and propose budget reallocations based on current needs.
"""
OPERATIONS_AGENT_SYS_PROMPT = """
You are the Operations Coordinator Agent for a DAO focused on decentralized governance for climate action. Your role is to ensure smooth day-to-day operations, coordinate workflows, and manage governance processes.
### Objectives:
1. **Workflow Optimization**: Streamline operational processes to maximize efficiency and effectiveness.
2. **Task Coordination**: Manage and delegate tasks to ensure timely delivery of goals.
3. **Governance**: Oversee governance processes, including proposal management and voting mechanisms.
4. **Communication**: Ensure seamless communication between all agents and community members.
### Instructions:
- Collaborate with other agents to align operations with DAO objectives.
- Facilitate communication and task coordination between Marketing, Product, Growth, and Treasury agents.
- Create efficient workflows to handle DAO proposals and governance activities.
- Suggest tools or platforms to improve operational efficiency.
- Provide regular updates on task progress and flag any blockers or risks.
"""
# Initialize agents
marketing_agent = Agent(
agent_name="Marketing-Agent",
system_prompt=MARKETING_AGENT_SYS_PROMPT,
model_name="deepseek/deepseek-reasoner",
autosave=True,
dashboard=False,
verbose=True,
)
product_agent = Agent(
agent_name="Product-Agent",
system_prompt=PRODUCT_AGENT_SYS_PROMPT,
model_name="deepseek/deepseek-reasoner",
autosave=True,
dashboard=False,
verbose=True,
)
growth_agent = Agent(
agent_name="Growth-Agent",
system_prompt=GROWTH_AGENT_SYS_PROMPT,
model_name="deepseek/deepseek-reasoner",
autosave=True,
dashboard=False,
verbose=True,
)
treasury_agent = Agent(
agent_name="Treasury-Agent",
system_prompt=TREASURY_AGENT_SYS_PROMPT,
model_name="deepseek/deepseek-reasoner",
autosave=True,
dashboard=False,
verbose=True,
)
operations_agent = Agent(
agent_name="Operations-Agent",
system_prompt=OPERATIONS_AGENT_SYS_PROMPT,
model_name="deepseek/deepseek-reasoner",
autosave=True,
dashboard=False,
verbose=True,
)
agents = [
marketing_agent,
product_agent,
growth_agent,
treasury_agent,
operations_agent,
]
class DAOSwarmRunner:
"""
A class to manage and run a swarm of agents in a discussion.
"""
def __init__(
self,
agents: list,
max_loops: int = 5,
shared_context: str = "",
) -> None:
"""
Initializes the DAO Swarm Runner.
Args:
agents (list): A list of agents in the swarm.
max_loops (int, optional): The maximum number of discussion loops between agents. Defaults to 5.
shared_context (str, optional): The shared context for all agents to base their discussion on. Defaults to an empty string.
"""
self.agents = agents
self.max_loops = max_loops
self.shared_context = shared_context
self.discussion_history = []
def run(self, task: str) -> str:
"""
Runs the swarm in a random discussion.
Args:
task (str): The task or context that agents will discuss.
Returns:
str: The final discussion output after all loops.
"""
print(f"Task: {task}")
print("Initializing Random Discussion...")
# Initialize the discussion with the shared context
current_message = (
f"Task: {task}\nContext: {self.shared_context}"
)
self.discussion_history.append(current_message)
# Run the agents in a randomized discussion
for loop in range(self.max_loops):
print(f"\n--- Loop {loop + 1}/{self.max_loops} ---")
# Choose a random agent
agent = random.choice(self.agents)
print(f"Agent {agent.agent_name} is responding...")
# Run the agent and get a response
response = agent.run(current_message)
print(f"Agent {agent.agent_name} says:\n{response}\n")
# Append the response to the discussion history
self.discussion_history.append(
f"{agent.agent_name}: {response}"
)
# Update the current message for the next agent
current_message = response
print("\n--- Discussion Complete ---")
return "\n".join(self.discussion_history)
swarm = DAOSwarmRunner(agents=agents, max_loops=1, shared_context="")
# User input for product description
product_description = """
The DAO is focused on decentralized governance for climate action.
It funds projects aimed at reducing carbon emissions and incentivizes community participation with a native token.
"""
# Assign a shared context for all agents
swarm.shared_context = product_description
# Run the swarm
task = """
Analyze the product description and create a collaborative strategy for marketing, product, growth, treasury, and operations. Ensure all recommendations align with the DAO's mission of reducing carbon emissions.
"""
output = swarm.run(task)
# Print the swarm output
print("Collaborative Strategy Output:\n", output)

@@ -0,0 +1,9 @@
from swarms import Agent
Agent(
agent_name="Stock-Analysis-Agent",
model_name="deepseek/deepseek-reasoner",
max_loops="auto",
interactive=True,
streaming_on=True,
).run("What are 5 hft algorithms")

@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
 [tool.poetry]
 name = "swarms"
-version = "6.9.8"
+version = "7.0.0"
 description = "Swarms - TGSC"
 license = "MIT"
 authors = ["Kye Gomez <kye@apac.ai>"]

@@ -78,25 +78,11 @@ rich = "*"
 # sentence-transformers = "*"

-# [tool.poetry.extras]
-# # Extra for NLP-related functionalities
-# nlp = [
-#     "torch>=2.1.1,<3.0",
-#     "transformers>=4.39.0,<5.0.0",
-#     "sentence-transformers",
-#     "swarm-models",
-# ]

-# # Extra for database-related functionalities
-# db = ["chromadb"]

 # # All optional dependencies for convenience
 # all = [
-#     "torch>=2.1.1,<3.0",
-#     "transformers>=4.39.0,<5.0.0",
-#     "sentence-transformers",
-#     "chromadb",
-#     "swarm-models"
+#     "torch",
+#     "transformers",
+#     "litellm"
 # ]

@@ -2,7 +2,6 @@ from swarms.tools.tool_utils import (
     scrape_tool_func_docs,
     tool_find_by_name,
 )
-from swarms.tools.func_calling_executor import openai_tool_executor
 from swarms.tools.pydantic_to_json import (
     _remove_a_key,
     base_model_to_openai_function,

@@ -34,7 +33,6 @@ from swarms.tools.json_utils import base_model_to_json
 __all__ = [
     "scrape_tool_func_docs",
     "tool_find_by_name",
-    "openai_tool_executor",
     "_remove_a_key",
     "base_model_to_openai_function",
     "multi_base_model_to_openai_function",

@@ -1,238 +0,0 @@
import concurrent.futures
from typing import Callable, Any, Dict, List
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="func_calling_executor")
# def openai_tool_executor(
# tools: List[Dict[str, Any]],
# function_map: Dict[str, Callable],
# verbose: bool = True,
# return_as_string: bool = False,
# *args,
# **kwargs,
# ) -> Callable:
# """
# Creates a function that dynamically and concurrently executes multiple functions based on parameters specified
# in a list of tool dictionaries, with extensive error handling and validation.
# Args:
# tools (List[Dict[str, Any]]): A list of dictionaries, each containing configuration for a tool, including parameters.
# function_map (Dict[str, Callable]): A dictionary mapping function names to their corresponding callable functions.
# verbose (bool): If True, enables verbose logging.
# return_as_string (bool): If True, returns the results as a concatenated string.
# Returns:
# Callable: A function that, when called, executes the specified functions concurrently with the parameters given.
# Examples:
# >>> def test_function(param1: int, param2: str) -> str:
# ... return f"Test function called with parameters: {param1}, {param2}"
# >>> tool_executor = openai_tool_executor(
# ... tools=[
# ... {
# ... "type": "function",
# ... "function": {
# ... "name": "test_function",
# ... "parameters": {
# ... "param1": 1,
# ... "param2": "example"
# ... }
# ... }
# ... }
# ... ],
# ... function_map={
# ... "test_function": test_function
# ... },
# ... return_as_string=True
# ... )
# >>> results = tool_executor()
# >>> print(results)
# """
# def tool_executor():
# # Prepare tasks for concurrent execution
# results = []
# logger.info(f"Executing {len(tools)} tools concurrently.")
# with concurrent.futures.ThreadPoolExecutor() as executor:
# futures = []
# for tool in tools:
# if tool.get("type") != "function":
# continue # Skip non-function tool entries
# function_info = tool.get("function", {})
# func_name = function_info.get("name")
# logger.info(f"Executing function: {func_name}")
# # Check if the function name is mapped to an actual function
# if func_name not in function_map:
# error_message = f"Function '{func_name}' not found in function map."
# logger.error(error_message)
# results.append(error_message)
# continue
# # Validate parameters
# params = function_info.get("parameters", {})
# if not params:
# error_message = f"No parameters specified for function '{func_name}'."
# logger.error(error_message)
# results.append(error_message)
# continue
# # Submit the function for execution
# try:
# future = executor.submit(
# function_map[func_name], **params
# )
# futures.append((func_name, future))
# except Exception as e:
# error_message = f"Failed to submit the function '{func_name}' for execution: {e}"
# logger.error(error_message)
# results.append(error_message)
# # Gather results from all futures
# for func_name, future in futures:
# try:
# result = future.result() # Collect result from future
# results.append(f"{func_name}: {result}")
# except Exception as e:
# error_message = f"Error during execution of function '{func_name}': {e}"
# logger.error(error_message)
# results.append(error_message)
# if return_as_string:
# return "\n".join(results)
# logger.info(f"Results: {results}")
# return results
# return tool_executor
def openai_tool_executor(
tools: List[Dict[str, Any]],
function_map: Dict[str, Callable],
verbose: bool = True,
return_as_string: bool = False,
*args,
**kwargs,
) -> Callable:
def tool_executor():
results = []
logger.info(f"Executing {len(tools)} tools concurrently.")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for tool in tools:
if tool.get("type") != "function":
continue
function_info = tool.get("function", {})
func_name = function_info.get("name")
logger.info(f"Executing function: {func_name}")
if func_name not in function_map:
error_message = f"Function '{func_name}' not found in function map."
logger.error(error_message)
results.append(error_message)
continue
params = function_info.get("parameters", {})
if not params:
error_message = f"No parameters specified for function '{func_name}'."
logger.error(error_message)
results.append(error_message)
continue
if (
"name" in params
and params["name"] in function_map
):
try:
result = function_map[params["name"]](
**params
)
results.append(f"{params['name']}: {result}")
except Exception as e:
error_message = f"Failed to execute the function '{params['name']}': {e}"
logger.error(error_message)
results.append(error_message)
continue
try:
future = executor.submit(
function_map[func_name], **params
)
futures.append((func_name, future))
except Exception as e:
error_message = f"Failed to submit the function '{func_name}' for execution: {e}"
logger.error(error_message)
results.append(error_message)
for func_name, future in futures:
try:
result = future.result()
results.append(f"{func_name}: {result}")
except Exception as e:
error_message = f"Error during execution of function '{func_name}': {e}"
logger.error(error_message)
results.append(error_message)
if return_as_string:
return "\n".join(results)
logger.info(f"Results: {results}")
return results
return tool_executor
# function_schema = {
# "name": "execute",
# "description": "Executes code on the user's machine **in the users local environment** and returns the output",
# "parameters": {
# "type": "object",
# "properties": {
# "language": {
# "type": "string",
# "description": "The programming language (required parameter to the `execute` function)",
# "enum": [
# # This will be filled dynamically with the languages OI has access to.
# ],
# },
# "code": {
# "type": "string",
# "description": "The code to execute (required)",
# },
# },
# "required": ["language", "code"],
# },
# }
# def execute(language: str, code: str):
# """
# Executes code on the user's machine **in the users local environment** and returns the output
# Args:
# language (str): The programming language (required parameter to the `execute` function)
# code (str): The code to execute (required)
# Returns:
# str: The output of the code execution
# """
# # This function will be implemented by the user
# return "Code execution not implemented yet"
# # Example execution
# out = openai_tool_executor(
# tools=[function_schema],
# function_map={
# "execute": execute,
# },
# return_as_string=True,
# )
# print(out)

@@ -1,16 +1,24 @@
 import inspect
+from typing import Callable, Type, Union


-def process_tool_docs(item):
+def process_tool_docs(item: Union[Callable, Type]) -> str:
     """
-    Process the documentation for a given item.
+    Process the documentation for a given item, which can be a function or a class.

     Args:
-        item: The item to process the documentation for.
+        item: The item to process the documentation for. It can be a function or a class.

     Returns:
-        metadata: The processed metadata containing the item's name, documentation, and source code.
+        str: The processed metadata containing the item's name, documentation, and source code.
+
+    Raises:
+        TypeError: If the item is not a function or a class.
     """
+    # Check if item is a function or a class
+    if not inspect.isfunction(item) and not inspect.isclass(item):
+        raise TypeError("Item must be a function or a class.")
+
     # If item is an instance of a class, get its class
     if not inspect.isclass(item) and hasattr(item, "__class__"):
         item = item.__class__

@@ -144,3 +144,7 @@ def auto_check_and_download_package(
         success = False

     return success
+
+
+if __name__ == "__main__":
+    print(auto_check_and_download_package("torch"))

@@ -1,263 +0,0 @@
"""
Lazy Package Loader
This module provides utilities for lazy loading Python packages to improve startup time
and reduce memory usage by only importing packages when they are actually used.
Features:
- Type-safe lazy loading of packages
- Support for nested module imports
- Auto-completion support in IDEs
- Thread-safe implementation
- Comprehensive test coverage
"""
from types import ModuleType
from typing import (
Optional,
Dict,
Any,
Callable,
Type,
TypeVar,
Union,
cast,
)
import importlib
import functools
import threading
from importlib.util import find_spec
from swarms.utils.auto_download_check_packages import (
auto_check_and_download_package,
)
T = TypeVar("T")
C = TypeVar("C")
class ImportError(Exception):
"""Raised when a lazy import fails."""
pass
class LazyLoader:
"""
A thread-safe lazy loader for Python packages that only imports them when accessed.
Attributes:
_module_name (str): The name of the module to be lazily loaded
_module (Optional[ModuleType]): The cached module instance once loaded
_lock (threading.Lock): Thread lock for safe concurrent access
Examples:
>>> np = LazyLoader('numpy')
>>> # numpy is not imported yet
>>> result = np.array([1, 2, 3])
>>> # numpy is imported only when first used
"""
def __init__(self, module_name: str) -> None:
"""
Initialize the lazy loader with a module name.
Args:
module_name: The fully qualified name of the module to lazily load
Raises:
ImportError: If the module cannot be found in sys.path
"""
self._module_name = module_name
self._module: Optional[ModuleType] = None
self._lock = threading.Lock()
auto_check_and_download_package(
module_name, package_manager="pip"
)
# Verify module exists without importing it
if find_spec(module_name) is None:
raise ImportError(
f"Module '{module_name}' not found in sys.path"
)
def _load_module(self) -> ModuleType:
"""
Thread-safe module loading.
Returns:
ModuleType: The loaded module
Raises:
ImportError: If module import fails
"""
if self._module is None:
with self._lock:
# Double-check pattern
if self._module is None:
try:
self._module = importlib.import_module(
self._module_name
)
except Exception as e:
raise ImportError(
f"Failed to import '{self._module_name}': {str(e)}"
)
return cast(ModuleType, self._module)
def __getattr__(self, name: str) -> Any:
"""
Intercepts attribute access to load the module if needed.
Args:
name: The attribute name being accessed
Returns:
Any: The requested attribute from the loaded module
Raises:
AttributeError: If the attribute doesn't exist in the module
"""
module = self._load_module()
try:
return getattr(module, name)
except AttributeError:
raise AttributeError(
f"Module '{self._module_name}' has no attribute '{name}'"
)
def __dir__(self) -> list[str]:
"""
Returns list of attributes for autocomplete support.
Returns:
List[str]: Available attributes in the module
"""
return dir(self._load_module())
def is_loaded(self) -> bool:
"""
Check if the module has been loaded.
Returns:
bool: True if module is loaded, False otherwise
"""
return self._module is not None
class LazyLoaderMetaclass(type):
"""Metaclass to handle lazy loading behavior"""
def __call__(cls, *args, **kwargs):
if hasattr(cls, "_lazy_loader"):
return super().__call__(*args, **kwargs)
return super().__call__(*args, **kwargs)
class LazyClassLoader:
"""
A descriptor that creates the actual class only when accessed,
with proper inheritance support.
"""
def __init__(
self, class_name: str, bases: tuple, namespace: Dict[str, Any]
):
self.class_name = class_name
self.bases = bases
self.namespace = namespace
self._real_class: Optional[Type] = None
self._lock = threading.Lock()
def _create_class(self) -> Type:
"""Creates the actual class if it hasn't been created yet."""
if self._real_class is None:
with self._lock:
if self._real_class is None:
# Update namespace to include metaclass
namespace = dict(self.namespace)
namespace["__metaclass__"] = LazyLoaderMetaclass
# Create the class with metaclass
new_class = LazyLoaderMetaclass(
self.class_name, self.bases, namespace
)
# Store reference to this loader
new_class._lazy_loader = self
self._real_class = new_class
return cast(Type, self._real_class)
def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""Creates an instance of the lazy loaded class."""
real_class = self._create_class()
# Use the metaclass __call__ method
return real_class(*args, **kwargs)
def __instancecheck__(self, instance: Any) -> bool:
"""Support for isinstance() checks"""
real_class = self._create_class()
return isinstance(instance, real_class)
def __subclasscheck__(self, subclass: Type) -> bool:
"""Support for issubclass() checks"""
real_class = self._create_class()
return issubclass(subclass, real_class)
def lazy_import(*names: str) -> Dict[str, LazyLoader]:
"""
Create multiple lazy loaders at once.
Args:
*names: Module names to create lazy loaders for
Returns:
Dict[str, LazyLoader]: Dictionary mapping module names to their lazy loaders
Examples:
>>> modules = lazy_import('numpy', 'pandas', 'matplotlib.pyplot')
>>> np = modules['numpy']
>>> pd = modules['pandas']
>>> plt = modules['matplotlib.pyplot']
"""
return {name.split(".")[-1]: LazyLoader(name) for name in names}
def lazy_import_decorator(
target: Union[Callable[..., T], Type[C]]
) -> Union[Callable[..., T], Type[C], LazyClassLoader]:
"""
Enhanced decorator that supports both lazy imports and lazy class loading.
"""
if isinstance(target, type):
# Store the original class details
namespace = {
name: value
for name, value in target.__dict__.items()
if not name.startswith("__")
or name in ("__init__", "__new__")
}
# Create lazy loader
loader = LazyClassLoader(
target.__name__, target.__bases__, namespace
)
# Preserve class metadata
loader.__module__ = target.__module__
loader.__doc__ = target.__doc__
# Add reference to original class
loader._original_class = target
return loader
else:
# Handle function decoration
@functools.wraps(target)
def wrapper(*args: Any, **kwargs: Any) -> T:
return target(*args, **kwargs)
return wrapper

@@ -1,6 +1,16 @@
-from litellm import encode
+import subprocess


 def count_tokens(text: str, model: str = "gpt-4o") -> int:
     """Count the number of tokens in the given text."""
+    try:
+        from litellm import encode
+    except ImportError:
+        subprocess.run(["pip", "install", "litellm"])
+        from litellm import encode
+
     return len(encode(model=model, text=text))
+
+
+# if __name__ == "__main__":
+#     print(count_tokens("Hello, how are you?"))

@@ -1,3 +1,4 @@
+import platform
 from typing import Any

@@ -53,6 +54,14 @@ def exec_callable_with_clusterops(
         logger.info(f"Attempting to run on device: {device}")

     device = device.lower()

+    # Check if the platform is Windows and do nothing if true
+    if platform.system() == "Windows":
+        if enable_logging:
+            logger.info(
+                "Platform is Windows, not executing on device."
+            )
+        return None
+
     if device == "cpu":
         if enable_logging:
             logger.info("Device set to CPU")

@@ -0,0 +1,104 @@
from swarms.utils.auto_download_check_packages import (
auto_check_and_download_package,
check_and_install_package,
)
def test_check_and_install_package_pip():
result = check_and_install_package("numpy", package_manager="pip")
print(f"Test result for 'numpy' installation using pip: {result}")
assert result, "Failed to install or verify 'numpy' using pip"
def test_check_and_install_package_conda():
result = check_and_install_package(
"numpy", package_manager="conda"
)
print(
f"Test result for 'numpy' installation using conda: {result}"
)
assert result, "Failed to install or verify 'numpy' using conda"
def test_check_and_install_specific_version():
result = check_and_install_package(
"numpy", package_manager="pip", version="1.21.0"
)
print(
f"Test result for specific version of 'numpy' installation using pip: {result}"
)
assert (
result
), "Failed to install or verify specific version of 'numpy' using pip"
def test_check_and_install_with_upgrade():
result = check_and_install_package(
"numpy", package_manager="pip", upgrade=True
)
print(f"Test result for 'numpy' upgrade using pip: {result}")
assert result, "Failed to upgrade 'numpy' using pip"
def test_auto_check_and_download_single_package():
result = auto_check_and_download_package(
"scipy", package_manager="pip"
)
print(f"Test result for 'scipy' installation using pip: {result}")
assert result, "Failed to install or verify 'scipy' using pip"
def test_auto_check_and_download_multiple_packages():
packages = ["scipy", "pandas"]
result = auto_check_and_download_package(
packages, package_manager="pip"
)
print(
f"Test result for multiple packages installation using pip: {result}"
)
assert (
result
), f"Failed to install or verify one or more packages in {packages} using pip"
def test_auto_check_and_download_multiple_packages_with_versions():
packages = ["numpy:1.21.0", "pandas:1.3.0"]
result = auto_check_and_download_package(
packages, package_manager="pip"
)
print(
f"Test result for multiple packages with versions installation using pip: {result}"
)
assert (
result
), f"Failed to install or verify one or more packages in {packages} with specific versions using pip"
# Example of running tests
if __name__ == "__main__":
try:
test_check_and_install_package_pip()
print("test_check_and_install_package_pip passed")
test_check_and_install_package_conda()
print("test_check_and_install_package_conda passed")
test_check_and_install_specific_version()
print("test_check_and_install_specific_version passed")
test_check_and_install_with_upgrade()
print("test_check_and_install_with_upgrade passed")
test_auto_check_and_download_single_package()
print("test_auto_check_and_download_single_package passed")
test_auto_check_and_download_multiple_packages()
print("test_auto_check_and_download_multiple_packages passed")
test_auto_check_and_download_multiple_packages_with_versions()
print(
"test_auto_check_and_download_multiple_packages_with_versions passed"
)
except AssertionError as e:
print(f"Test failed: {str(e)}")