Merge branch 'kyegomez:master' into master


@ -0,0 +1,349 @@
# Create your own agent with the `Agent` class
In the rapidly evolving world of artificial intelligence (AI), the demand for specialized and highly customized agents is on the rise. Whether it's for task automation, decision support systems, or intelligent virtual assistants, the ability to create tailored agents can unlock new possibilities and efficiencies across various domains. Enter the Agent class, a powerful and flexible foundation in the Swarms framework that empowers developers to build custom agents tailored to their specific needs.
This comprehensive guide will explore the process of inheriting from the Agent class, enabling you to create your own custom agent classes. By leveraging the rich features and extensibility of the Agent class, you can imbue your derived agents with unique capabilities, specialized toolsets, and tailored decision-making processes.
## Understanding the Agent Class
Before we dive into the intricacies of creating custom agent classes, let's revisit the foundational elements of the Agent class itself. The Agent class is a versatile and feature-rich class designed to streamline the process of building and managing AI agents. It acts as a backbone, connecting large language models (LLMs) with various tools, long-term memory, and a wide range of customization options.
### Key Features of the Agent Class
The Agent class offers a plethora of features that can be inherited and extended by custom agent classes. Here are some of the key features that make the Agent class a powerful foundation, with a minimal configuration sketch after the list:
1\. **Language Model Integration**: The Agent class supports seamless integration with language models served through popular libraries and frameworks such as LangChain, Hugging Face Transformers, and Autogen, allowing custom agent classes to leverage state-of-the-art models.
2\. **Tool Integration**: One of the standout features of the Agent class is its ability to integrate with various tools. Custom agent classes can inherit this capability and incorporate specialized tools tailored to their specific use cases.
3\. **Long-Term Memory**: The Agent class provides built-in support for long-term memory, enabling custom agent classes to retain and access information from previous interactions, essential for maintaining context and learning from past experiences.
4\. **Customizable Prompts and Standard Operating Procedures (SOPs)**: The Agent class allows you to define custom prompts and Standard Operating Procedures (SOPs) that guide an agent's behavior and decision-making process. Custom agent classes can inherit and extend these prompts and SOPs to align with their unique objectives and requirements.
5\. **Interactive and Dashboard Modes**: The Agent class supports interactive and dashboard modes, enabling real-time monitoring and interaction with agents. Custom agent classes can inherit these modes, facilitating efficient development, debugging, and user interaction.
6\. **Autosave and State Management**: With the Agent class, agents can easily save and load their state, including configuration, memory, and history. Custom agent classes can inherit this capability, ensuring seamless task continuation and enabling efficient collaboration among team members.
7\. **Response Filtering**: The Agent class provides built-in response filtering capabilities, allowing agents to filter out or replace specific words or phrases in their responses. Custom agent classes can inherit and extend this feature to ensure compliance with content moderation policies or specific guidelines.
8\. **Code Execution and Multimodal Support**: The Agent class supports code execution and multimodal input/output, enabling agents to process and generate code, as well as handle various data formats such as images, audio, and video. Custom agent classes can inherit and specialize these capabilities for their unique use cases.
9\. **Extensibility and Customization**: The Agent class is designed to be highly extensible and customizable, allowing agents to tailor its behavior, add custom functionality, and integrate with external libraries and APIs. Custom agent classes can leverage this extensibility to introduce specialized features and capabilities.
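To ground these options, here is a minimal configuration sketch using parameters that appear later in this guide (`max_loops`, `autosave`, `dashboard`, `verbose`); the exact values are illustrative rather than recommended defaults:
```python
from swarms import Agent, OpenAIChat

# A minimal Agent touching several of the features listed above
agent = Agent(
    llm=OpenAIChat(),   # language model integration
    max_loops=1,        # number of reasoning loops per task
    autosave=True,      # persist agent state between runs
    dashboard=False,    # disable the real-time dashboard
    verbose=True,       # log intermediate steps
)

# Run the agent on a single task
response = agent.run("Summarize the benefits of multi-agent systems.")
print(response)
```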
### Creating a Custom Agent Class
Now that we have a solid understanding of the Agent class and its features, let's dive into the process of creating a custom agent class by inheriting from the Agent class. Throughout this process, we'll explore how agents can leverage and extend the existing functionality, while introducing specialized features and capabilities tailored to their unique requirements.
#### Step 1: Inherit from the Agent Class
The first step in creating a custom agent class is to inherit from the Agent class. This will provide your custom agent class with the foundational features and capabilities of the Agent class, which can then be extended and customized as needed.
```python
from swarms import Agent
class MyCustomAgent(Agent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Add custom initialization logic here
```
In the example above, we define a new class `MyCustomAgent` that inherits from the `Agent` class. Within the `__init__` method, we call the parent class's `__init__` method using `super().__init__(*args, **kwargs)`, which ensures that the parent class's initialization logic is executed. You can then add any custom initialization logic specific to your custom agent class.
#### Step 2: Customize the Agent's Behavior
One of the key advantages of inheriting from the Agent class is the ability to customize the agent's behavior according to your specific requirements. This can be achieved by overriding or extending the existing methods, or by introducing new methods altogether.
```python
from swarms import Agent
class MyCustomAgent(Agent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Custom initialization logic
    def custom_method(self, *args, **kwargs):
        # Implement custom logic here
        pass
    def run(self, task, *args, **kwargs):
        # Customize the run method
        response = super().run(task, *args, **kwargs)
        # Additional custom logic
        return response
```
In the example above, we introduce a new `custom_method` that can encapsulate any specialized logic or functionality specific to your custom agent class. Additionally, we override the `run` method, which is responsible for executing the agent's main task loop. Within the overridden `run` method, you can call the parent class's `run` method using `super().run(task, *args, **kwargs)` and then introduce any additional custom logic before or after the parent method's execution.
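Using the subclass is then no different from using `Agent` directly; the constructor arguments below are illustrative:
```python
from swarms import OpenAIChat

# Instantiate and run the custom agent like any other Agent
custom_agent = MyCustomAgent(
    llm=OpenAIChat(),  # any supported LLM wrapper
    max_loops=1,
)

output = custom_agent.run("Draft a project status update.")
print(output)
```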
#### Step 3: Integrate Custom Tools
One of the powerful features of the Agent class is the ability to integrate with various tools. Custom agent classes can inherit this capability and incorporate specialized tools tailored to their unique use cases.
```python
from swarms.tools import BaseTool
from swarms import Agent
class CustomTool(BaseTool):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Custom tool initialization logic
    def run(self, *args, **kwargs):
        # Custom tool logic; replace this placeholder with real work
        result = "custom tool output"
        return result
class MyCustomAgent(Agent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Custom initialization logic
        self.tools = [CustomTool()]
    def run(self, task, *args, **kwargs):
        # Customize the run method
        response = super().run(task, *args, **kwargs)
        # Utilize custom tools
        for tool in self.tools:
            result = tool.run(*args, **kwargs)
            # Process tool result
        return response
```
In the example above, we define a new `CustomTool` class that inherits from the `BaseTool` class provided by the Swarms framework. Within the `CustomTool` class, you can implement the specialized logic and functionality required by your custom tool.
Next, within the `MyCustomAgent` class, we initialize an instance of the `CustomTool` and store it in the `self.tools` list. This list can then be utilized within the overridden `run` method, where you can execute each tool and process its results as needed.
#### Step 4: Extend Memory Management
The Agent class provides built-in support for long-term memory, allowing agents to retain and access information from previous interactions. Custom agent classes can inherit and extend this capability by introducing specialized memory management techniques.
```python
from swarms.memory import AbstractVectorDatabase
from swarms import Agent
class CustomMemory(AbstractVectorDatabase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Custom memory initialization logic
    def query(self, *args, **kwargs):
        # Custom memory query logic; replace this placeholder result
        result = []
        return result
class MyCustomAgent(Agent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Custom initialization logic
        self.long_term_memory = CustomMemory()
    def run(self, task, *args, **kwargs):
        # Customize the run method
        response = super().run(task, *args, **kwargs)
        # Utilize custom memory
        memory_result = self.long_term_memory.query(*args, **kwargs)
        # Process memory result
        return response
```
In the example above, we define a new `CustomMemory` class that inherits from the `AbstractVectorDatabase` class provided by the Swarms framework. Within the `CustomMemory` class, you can implement specialized memory management logic, such as custom indexing, retrieval, and storage mechanisms.
Next, within the `MyCustomAgent` class, we initialize an instance of the `CustomMemory` class and assign it to the `self.long_term_memory` attribute. This custom memory instance can then be utilized within the overridden `run` method, where you can query the memory and process the results as needed.
#### Step 5: Introduce Custom Prompts and Standard Operating Procedures (SOPs)
The Agent class allows you to define custom prompts and Standard Operating Procedures (SOPs) that guide an agent's behavior and decision-making process. Custom agent classes can inherit and extend these prompts and SOPs to align with their unique objectives and requirements.
```python
from swarms import Agent
class MyCustomAgent(Agent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Custom initialization logic
        self.custom_sop = "Custom SOP for MyCustomAgent..."
        self.custom_prompt = "Custom prompt for MyCustomAgent..."
    def run(self, task, *args, **kwargs):
        # Customize the run method
        response = super().run(task, *args, **kwargs)
        # Utilize custom prompts and SOPs
        custom_prompt = self.construct_dynamic_prompt(self.custom_prompt)
        custom_sop = self.construct_dynamic_sop(self.custom_sop)
        # Process custom prompts and SOPs
        return response
    def construct_dynamic_prompt(self, prompt):
        # Custom prompt construction logic
        return prompt
    def construct_dynamic_sop(self, sop):
        # Custom SOP construction logic
        return sop
```
In the example above, we define two new attributes within the `MyCustomAgent` class: `custom_sop` and `custom_prompt`. These attributes can be used to store custom prompts and SOPs specific to your custom agent class.
Within the overridden `run` method, you can utilize these custom prompts and SOPs by calling the `construct_dynamic_prompt` and `construct_dynamic_sop` methods, which can be defined within the `MyCustomAgent` class to implement specialized prompt and SOP construction logic.
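The construction hooks are deliberately open-ended. One plausible implementation, shown below as a sketch rather than the framework's built-in behavior, simply prepends the SOP so the agent's operating rules travel with every prompt:
```python
    def construct_dynamic_prompt(self, prompt):
        # Prepend the SOP so each prompt carries the agent's operating rules
        return f"{self.custom_sop}\n\n{prompt}"
```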
#### Step 6: Introduce Custom Response Handling
The Agent class provides built-in response filtering capabilities, allowing agents to filter out or replace specific words or phrases in their responses. Custom agent classes can inherit and extend this feature to ensure compliance with content moderation policies or specific guidelines.
```python
from swarms import Agent
class MyCustomAgent(Agent):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Custom initialization logic
        self.response_filters = ["filter_word_1", "filter_word_2"]
    def run(self, task, *args, **kwargs):
        # Customize the run method
        response = super().run(task, *args, **kwargs)
        # Apply custom response filtering
        filtered_response = self.apply_response_filters(response)
        return filtered_response
    def apply_response_filters(self, response):
        # Custom response filtering logic
        for word in self.response_filters:
            response = response.replace(word, "[FILTERED]")
        return response
```
In the example above, we define a new attribute `response_filters` within the `MyCustomAgent` class, which is a list of words or phrases that should be filtered out or replaced in the agent's responses.
Within the overridden `run` method, we call the `apply_response_filters` method, which can be defined within the `MyCustomAgent` class to implement specialized response filtering logic. In the example, we iterate over the `response_filters` list and replace each filtered word or phrase with a placeholder string (`"[FILTERED]"`).
### Advanced Customization and Integration
The Agent class and its inherited custom agent classes can be further extended and customized to suit specific requirements and integrate with external libraries, APIs, and services. Here are some advanced customization and integration examples:
1\. **Multimodal Input/Output Integration**: Custom agent classes can leverage the multimodal input/output capabilities of the Agent class and introduce specialized handling for various data formats such as images, audio, and video.
2\. **Code Execution and Integration**: The Agent class supports code execution, enabling agents to run and evaluate code snippets. Custom agent classes can inherit and extend this capability, introducing specialized code execution environments, sandboxing mechanisms, or integration with external code repositories or platforms.
3\. **External API and Service Integration**: Custom agent classes can integrate with external APIs and services, enabling agents to leverage specialized data sources, computational resources, or domain-specific services (see the sketch after this list).
4\. **Performance Optimization**: Depending on the use case and requirements, custom agent classes can introduce performance optimizations, such as adjusting loop intervals, retry attempts, or enabling parallel execution for certain tasks.
5\. **Logging and Monitoring**: Custom agent classes can introduce specialized logging and monitoring mechanisms, enabling agents to track their performance, identify potential issues, and generate detailed reports or dashboards.
6\. **Security and Privacy Enhancements**: Custom agent classes can implement security and privacy enhancements, such as data encryption, access control mechanisms, or compliance with industry-specific regulations and standards.
7\. **Distributed Execution and Scaling**: Custom agent classes can be designed to support distributed execution and scaling, enabling agents to leverage cloud computing resources or distributed computing frameworks for handling large-scale tasks or high-concurrency workloads.
By leveraging these advanced customization and integration capabilities, you can create highly specialized and sophisticated custom agent classes tailored to your unique requirements and use cases.
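As a small illustration of the external API pattern from item 3, the sketch below enriches each task with data fetched over HTTP before delegating to the parent `run` method. The endpoint URL, query parameter, and response handling are placeholders, not part of the Swarms API:
```python
import requests

from swarms import Agent


class APIEnrichedAgent(Agent):
    def run(self, task, *args, **kwargs):
        # Fetch supporting context from an external service
        # (hypothetical endpoint; substitute a real one)
        response = requests.get(
            "https://api.example.com/context",
            params={"q": task},
            timeout=10,
        )
        context = response.text if response.ok else ""
        # Fold the fetched context into the task before running it
        enriched_task = f"{task}\n\nExternal context:\n{context}"
        return super().run(enriched_task, *args, **kwargs)
```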
### Best Practices and Considerations
While building custom agent classes by inheriting from the Agent class offers immense flexibility and power, it's essential to follow best practices and consider potential challenges and considerations:
1\. **Maintainability and Documentation**: As custom agent classes become more complex, it's crucial to prioritize maintainability and thorough documentation. Clear and concise code, comprehensive comments, and up-to-date documentation can significantly improve the long-term sustainability and collaboration efforts surrounding custom agent classes.
2\. **Testing and Validation**: Custom agent classes should undergo rigorous testing and validation to ensure their correctness, reliability, and adherence to expected behaviors. Establish a robust testing framework and continuously validate the agent's performance, particularly after introducing new features or integrations.
3\. **Security and Privacy Considerations**: When building custom agent classes, it's essential to consider security and privacy implications, especially if the agents will handle sensitive data or interact with critical systems. Implement appropriate security measures, such as access controls, data encryption, and secure communication protocols, to protect against potential vulnerabilities and ensure compliance with relevant regulations and standards.
4\. **Scalability and Performance Monitoring**: As custom agent classes are deployed and adopted, it's important to monitor their scalability and performance characteristics. Identify potential bottlenecks, resource constraints, or performance degradation, and implement appropriate optimization strategies or scaling mechanisms to ensure efficient and reliable operation.
5\. **Collaboration and Knowledge Sharing**: Building custom agent classes often involves collaboration among teams and stakeholders. Foster an environment of knowledge sharing, code reviews, and open communication to ensure that everyone involved understands the agent's capabilities, limitations, and intended use cases.
6\. **Ethical Considerations**: As AI agents become more advanced and autonomous, it's crucial to consider the ethical implications of their actions and decisions. Implement appropriate safeguards, oversight mechanisms, and ethical guidelines to ensure that custom agent classes operate in a responsible and transparent manner, aligning with ethical principles and societal values.
7\. **Continuous Learning and Adaptation**: The field of AI is rapidly evolving, with new techniques, tools, and best practices emerging regularly. Stay up-to-date with the latest developments and be prepared to adapt and refine your custom agent classes as new advancements become available.
By following these best practices and considering potential challenges, you can create robust, reliable, and ethical custom agent classes that meet your specific requirements while adhering to industry standards.
## Conclusion
In this comprehensive guide, we have explored the process of creating custom agent classes by inheriting from the powerful Agent class. We have covered the key features of the Agent class, walked through the step-by-step process of inheriting and extending its functionality, and discussed advanced customization and integration techniques.
Building custom agent classes empowers you to create tailored and specialized agents capable of tackling unique challenges and addressing specific domain requirements. By leveraging the rich features and extensibility of the Agent class, you can imbue your derived agents with unique capabilities, specialized toolsets, and tailored decision-making processes.
Remember, the journey of building custom agent classes is an iterative and collaborative process that requires continuous learning, adaptation, and refinement. Embrace the challenges, stay up-to-date with the latest developments in AI, and continuously refine your custom agent classes to meet the evolving needs of your applications.

@ -0,0 +1,436 @@
# Building Custom Vector Memory Databases with the AbstractVectorDatabase Class
In the age of large language models (LLMs) and AI-powered applications, efficient memory management has become a crucial component. Vector databases, which store and retrieve data in high-dimensional vector spaces, have emerged as powerful tools for handling the vast amounts of data generated and consumed by AI systems. However, integrating vector databases into your applications can be a daunting task, requiring in-depth knowledge of their underlying architectures and APIs.
Enter the `AbstractVectorDatabase` class, a powerful abstraction layer designed to simplify the process of creating and integrating custom vector memory databases into your AI applications. By inheriting from this class, developers can build tailored vector database solutions that seamlessly integrate with their existing systems, enabling efficient storage, retrieval, and manipulation of high-dimensional data.
In this comprehensive guide, we'll explore the `AbstractVectorDatabase` class in detail, covering its core functionality and diving deep into the process of creating custom vector memory databases using popular solutions like PostgreSQL, Pinecone, Chroma, FAISS, and more. Whether you're a seasoned AI developer or just starting to explore the world of vector databases, this guide will provide you with the knowledge and tools necessary to build robust, scalable, and efficient memory solutions for your AI applications.
## Understanding the AbstractVectorDatabase Class
Before we dive into the implementation details, let's take a closer look at the `AbstractVectorDatabase` class and its core functionality.
The `AbstractVectorDatabase` class is an abstract base class that defines the interface for interacting with a vector database. It serves as a blueprint for creating concrete implementations of vector databases, ensuring a consistent and standardized approach to database operations across different systems.
The class provides a set of abstract methods that define the essential functionality required for working with vector databases, such as connecting to the database, executing queries, and performing CRUD (Create, Read, Update, Delete) operations.
Here's a breakdown of the abstract methods defined in the `AbstractVectorDatabase` class:
1\. `connect()`: This method establishes a connection to the vector database.
2\. `close()`: This method closes the connection to the vector database.
3\. `query(query: str)`: This method executes a given query on the vector database.
4\. `fetch_all()`: This method retrieves all rows from the result set of a query.
5\. `fetch_one()`: This method retrieves a single row from the result set of a query.
6\. `add(doc: str)`: This method adds a new record to the vector database.
7\. `get(query: str)`: This method retrieves a record from the vector database based on a given query.
8\. `update(doc)`: This method updates a record in the vector database.
9\. `delete(message)`: This method deletes a record from the vector database.
By inheriting from the `AbstractVectorDatabase` class and implementing these abstract methods, developers can create concrete vector database implementations tailored to their specific needs and requirements.
## Creating a Custom Vector Memory Database
Now that we have a solid understanding of the `AbstractVectorDatabase` class, let's dive into the process of creating a custom vector memory database by inheriting from this class. Throughout this guide, we'll explore various vector database solutions, including PostgreSQL, Pinecone, Chroma, FAISS, and more, showcasing how to integrate them seamlessly into your AI applications.
### Step 1: Inherit from the AbstractVectorDatabase Class
The first step in creating a custom vector memory database is to inherit from the `AbstractVectorDatabase` class. This will provide your custom implementation with the foundational structure and interface defined by the abstract class.
```python
from swarms import AbstractVectorDatabase
class MyCustomVectorDatabase(AbstractVectorDatabase):
    def __init__(self, *args, **kwargs):
        # Custom initialization logic
        pass
```
In the example above, we define a new class `MyCustomVectorDatabase` that inherits from the `AbstractVectorDatabase` class. Within the `__init__` method, you can add any custom initialization logic specific to your vector database implementation.
### Step 2: Implement the Abstract Methods
The next step is to implement the abstract methods defined in the `AbstractVectorDatabase` class. These methods provide the core functionality for interacting with your vector database, such as connecting, querying, and performing CRUD operations.
```python
from swarms import AbstractVectorDatabase
class MyCustomVectorDatabase(AbstractVectorDatabase):
    def __init__(self, *args, **kwargs):
        # Custom initialization logic
        pass
    def connect(self):
        # Implementation for connecting to the vector database
        pass
    def close(self):
        # Implementation for closing the vector database connection
        pass
    def query(self, query: str):
        # Implementation for executing a query on the vector database
        pass
    def fetch_all(self):
        # Implementation for fetching all rows from the result set
        pass
    def fetch_one(self):
        # Implementation for fetching a single row from the result set
        pass
    def add(self, doc: str):
        # Implementation for adding a new record to the vector database
        pass
    def get(self, query: str):
        # Implementation for retrieving a record from the vector database
        pass
    def update(self, doc):
        # Implementation for updating a record in the vector database
        pass
    def delete(self, message):
        # Implementation for deleting a record from the vector database
        pass
```
In this example, we define placeholders for each of the abstract methods within the `MyCustomVectorDatabase` class. These placeholders will be replaced with the actual implementation logic specific to your chosen vector database solution.
### Step 3: Choose and Integrate Your Vector Database Solution
With the foundational structure in place, it's time to choose a specific vector database solution and integrate it into your custom implementation. In this guide, we'll explore several popular vector database solutions, including PostgreSQL, Pinecone, Chroma, FAISS, and more, providing examples and guidance on how to integrate them seamlessly.
### PostgreSQL Integration
PostgreSQL is a powerful open-source relational database management system that, together with the pgvector extension, supports vector data types and operations, making it a viable choice for building custom vector memory databases.
```python
import psycopg2
from swarms import AbstractVectorDatabase
class PostgreSQLVectorDatabase(MyCustomVectorDatabase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # PostgreSQL connection details
        self.conn = psycopg2.connect(
            host="localhost",
            database="vector_db",
            user="postgres",
            password="your_password"
        )
        self.cur = self.conn.cursor()
    def connect(self):
        # PostgreSQL connection logic
        pass
    def close(self):
        # Close PostgreSQL connection
        self.cur.close()
        self.conn.close()
    def query(self, query: str):
        # Execute PostgreSQL query
        self.cur.execute(query)
    def fetch_all(self):
        # Fetch all rows from PostgreSQL result set
        return self.cur.fetchall()
    # Implement other abstract methods
```
In this example, we define a `PostgreSQLVectorDatabase` class that inherits from `MyCustomVectorDatabase`. Within the `__init__` method, we establish a connection to a PostgreSQL database using the `psycopg2` library. We then implement the `connect()`, `close()`, `query()`, and `fetch_all()` methods specific to PostgreSQL.
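Note that vector support in PostgreSQL comes from the pgvector extension rather than the core server. Assuming pgvector is installed (`CREATE EXTENSION vector;`) and a `documents` table with an `embedding` column exists, `add()` and a nearest-neighbor `get()` might look like the following sketch; the class name, table layout, and the `embed` helper (returning a numpy array) are illustrative:
```python
from pgvector.psycopg2 import register_vector


class PgVectorDatabase(PostgreSQLVectorDatabase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Register the vector type so psycopg2 can send embeddings
        register_vector(self.conn)

    def add(self, doc: str):
        # Store the document alongside its embedding
        embedding = self.embed(doc)  # placeholder embedding helper
        self.cur.execute(
            "INSERT INTO documents (content, embedding) VALUES (%s, %s)",
            (doc, embedding),
        )
        self.conn.commit()

    def get(self, query: str):
        # Nearest-neighbor search via pgvector's L2 distance operator
        self.cur.execute(
            "SELECT content FROM documents"
            " ORDER BY embedding <-> %s LIMIT 10",
            (self.embed(query),),
        )
        return self.cur.fetchall()
```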
### Pinecone Integration
Pinecone is a managed vector database service that provides efficient storage, retrieval, and manipulation of high-dimensional vector data.
```python
import pinecone
from swarms import AbstractVectorDatabase
class PineconeVectorDatabase(MyCustomVectorDatabase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Pinecone initialization
        pinecone.init(api_key="your_api_key", environment="your_environment")
        self.index = pinecone.Index("your_index_name")
    def connect(self):
        # Pinecone connection logic
        pass
    def close(self):
        # Close Pinecone connection
        pass
    def query(self, query: str):
        # Pinecone searches by vector, so embed the query text first;
        # `self.embed` is a placeholder for an embedding model of your choice
        query_vector = self.embed(query)
        results = self.index.query(vector=query_vector, top_k=10)
        return results
    def add(self, doc: str):
        # Upsert the embedded document under a unique id
        doc_vector = self.embed(doc)
        self.index.upsert([("doc-id", doc_vector)])
    # Implement other abstract methods
```
In this example, we define a `PineconeVectorDatabase` class that inherits from `MyCustomVectorDatabase`. Within the `__init__` method, we initialize the Pinecone client and create an index. We then implement the `query()` and `add()` methods against the Pinecone API. Because Pinecone stores and searches vectors rather than raw text, both methods embed the input first; the `embed` helper is a placeholder for whichever embedding model you choose.
### Chroma Integration
Chroma is an open-source vector database library that provides efficient storage, retrieval, and manipulation of vector data using various backends, including DuckDB and ClickHouse.
```python
import uuid
import chromadb
from swarms import AbstractVectorDatabase
class ChromaVectorDatabase(MyCustomVectorDatabase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Chroma initialization (in-memory client; swap in a persistent
        # client for durable storage)
        self.client = chromadb.Client()
        self.collection = self.client.get_or_create_collection("vector_collection")
    def connect(self):
        # Chroma connection logic (the in-memory client needs no setup)
        pass
    def close(self):
        # Close Chroma connection
        pass
    def query(self, query: str):
        # Chroma embeds query_texts with its default embedding function
        # and returns the nearest matches
        results = self.collection.query(query_texts=[query], n_results=10)
        return results
    def add(self, doc: str):
        # Chroma requires a unique id for every document
        self.collection.add(documents=[doc], ids=[str(uuid.uuid4())])
    # Implement other abstract methods
```
In this example, we define a `ChromaVectorDatabase` class that inherits from `MyCustomVectorDatabase`. Within the `__init__` method, we create a Chroma client and get or create a collection. We then implement the `query()` and `add()` methods specific to the Chroma API; note that Chroma requires a unique id for every document, so `add()` generates one with `uuid`.
### FAISS Integration
FAISS (Facebook AI Similarity Search) is a library for efficient similarity search and clustering of dense vectors, developed by Meta AI.
```python
import faiss
import numpy as np
class FAISSVectorDatabase(MyCustomVectorDatabase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # FAISS initialization (assuming 64-dimensional vectors)
        self.dimension = 64
        self.index = faiss.IndexFlatL2(self.dimension)
        self.index_path = "faiss_index.index"
    def connect(self):
        # Load a previously saved FAISS index from disk
        self.index = faiss.read_index(self.index_path)
    def close(self):
        # Persist the FAISS index to disk
        faiss.write_index(self.index, self.index_path)
    def _embed(self, text: str) -> np.ndarray:
        # Placeholder: swap in a real embedding model. FAISS operates
        # on float32 vectors, so raw text must be embedded first.
        return np.random.rand(1, self.dimension).astype("float32")
    def query(self, query: str):
        # Embed the query and search for the 10 nearest neighbors
        query_vector = self._embed(query)
        distances, indices = self.index.search(query_vector, 10)
        return list(zip(indices[0], distances[0]))
    def add(self, doc: str):
        # Embed the document and add it to the index
        self.index.add(self._embed(doc))
    # Implement other abstract methods
```
In this example, we define a `FAISSVectorDatabase` class that inherits from `MyCustomVectorDatabase`. Within the `__init__` method, we create a FAISS index and set the index path. We then implement the `connect()`, `close()`, `query()`, and `add()` methods specific to the FAISS library, assuming 64-dimensional vectors for simplicity. Because FAISS operates on float32 vectors rather than raw text, the `_embed` helper stands in for a real embedding model that you would supply.
These examples provide a starting point for integrating various vector database solutions into your custom implementation. Each solution has its own strengths, weaknesses, and trade-offs, so it's essential to carefully evaluate your requirements and choose the solution that best fits your needs.
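Whichever backend you choose, the finished implementation can be handed to an agent as long-term memory, mirroring the pattern from the Agent guide. A brief sketch; any of the classes above could stand in for the Chroma variant used here:
```python
from swarms import Agent, OpenAIChat

# Plug a finished vector database into an agent as long-term memory
memory = ChromaVectorDatabase()
agent = Agent(
    llm=OpenAIChat(),
    max_loops=1,
    long_term_memory=memory,  # queried by the agent during runs
)

agent.run("What have we stored about vector indexing strategies?")
```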
### Step 4: Add Custom Functionality and Optimizations
Once you've integrated your chosen vector database solution, you can further extend and optimize your custom implementation by adding custom functionality and performance optimizations.
#### Custom Functionality:
- **Indexing Strategies**: Implement custom indexing strategies to optimize search performance and memory usage.
- **Data Preprocessing**: Add data preprocessing logic to handle different data formats, perform embedding, and prepare data for storage in the vector database.
- **Query Optimization**: Introduce query optimization techniques, such as query caching, result filtering, or query rewriting, to improve query performance.
- **Data Partitioning**: Implement data partitioning strategies to distribute data across multiple nodes or shards for better scalability and performance.
- **Metadata Management**: Introduce metadata management capabilities to store and retrieve additional information associated with the vector data.
#### Performance Optimizations:
- **Caching**: Implement caching mechanisms to reduce redundant computations and improve response times (a minimal sketch follows this list).
- **Asynchronous Operations**: Utilize asynchronous programming techniques to improve concurrency and responsiveness.
- **Multithreading and Parallelization**: Leverage multithreading and parallelization to distribute computationally intensive tasks across multiple cores or processors.
- **Load Balancing**: Implement load balancing strategies to distribute workloads evenly across multiple nodes or instances.
- **Monitoring and Profiling**: Introduce monitoring and profiling tools to identify performance bottlenecks and optimize critical sections of your code.
By adding custom functionality and performance optimizations, you can tailor your custom vector memory database to meet the specific requirements of your AI applications, ensuring efficient and scalable data management.
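As a concrete instance of the caching idea above, a thin subclass can memoize query results so repeated lookups skip the index entirely; cache size and eviction policy are left as choices for you:
```python
from functools import lru_cache


class CachedVectorDatabase(MyCustomVectorDatabase):
    # Memoize up to 1024 distinct queries; identical queries return
    # the cached result instead of hitting the underlying index
    @lru_cache(maxsize=1024)
    def query(self, query: str):
        return super().query(query)
```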
### Best Practices and Considerations
Building custom vector memory databases is a powerful but complex endeavor. To ensure the success and longevity of your implementation, it's essential to follow best practices and consider potential challenges and considerations.
1\. **Scalability and Performance Testing**: Vector databases can quickly grow in size and complexity as your AI applications handle increasing amounts of data. Thoroughly test your implementation for scalability and performance under various load conditions, and optimize accordingly.
2\. **Data Quality and Integrity**: Ensure that the data stored in your vector database is accurate, consistent, and free from duplicates or errors. Implement data validation and cleansing mechanisms to maintain data quality and integrity.
3\. **Security and Access Control**: Vector databases may store sensitive or proprietary data. Implement robust security measures, such as encryption, access controls, and auditing mechanisms, to protect your data from unauthorized access or breaches.
4\. **Distributed Architectures**: As your data and workloads grow, consider implementing distributed architectures to distribute the storage and computational load across multiple nodes or clusters. This can improve scalability, fault tolerance, and overall performance.
5\. **Data Versioning and Backup**: Implement data versioning and backup strategies to ensure data integrity and enable recovery in case of errors or system failures.
6\. **Documentation and Maintainability**: Well-documented code and comprehensive documentation are essential for ensuring the long-term maintainability and extensibility of your custom vector memory database implementation.
7\. **Continuous Integration and Deployment**: Adopt continuous integration and deployment practices to streamline the development, testing, and deployment processes, ensuring that changes are thoroughly tested and deployed efficiently.
8\. **Compliance and Regulatory Requirements**: Depending on your industry and use case, ensure that your custom vector memory database implementation complies with relevant regulations and standards, such as data privacy laws or industry-specific guidelines.
9\. **Community Engagement and Collaboration**: Stay engaged with the vector database community, participate in discussions, and collaborate with other developers to share knowledge, best practices, and insights.
By following these best practices and considering potential challenges, you can build robust, scalable, and efficient custom vector memory databases that meet the demanding requirements of modern AI applications.
## Conclusion
In this comprehensive guide, we've explored the `AbstractVectorDatabase` class and its role in simplifying the process of creating custom vector memory databases. We've covered the core functionality of the class, walked through the step-by-step process of inheriting and extending its functionality, and provided examples of integrating popular vector database solutions like PostgreSQL, Pinecone, Chroma, and FAISS.
Building custom vector memory databases empowers developers to create tailored and efficient data management solutions that seamlessly integrate with their AI applications. By leveraging the power of vector databases, you can unlock new possibilities in data storage, retrieval, and manipulation, enabling your AI systems to handle vast amounts of high-dimensional data with ease.
Remember, the journey of building custom vector memory databases is an iterative and collaborative process that requires continuous learning, adaptation, and refinement. Embrace the challenges, stay up-to-date with the latest developments in vector databases and AI, and continuously strive to optimize and enhance your implementations.
As you embark on this journey, keep in mind the importance of scalability, performance, data quality, security, and compliance. Foster an environment of collaboration, knowledge sharing, and community engagement to ensure that your custom vector memory databases are robust, reliable, and capable of meeting the ever-evolving demands of the AI landscape.
So, dive in, leverage the power of the `AbstractVectorDatabase` class, and create the custom vector memory databases that will drive the future of AI-powered applications.

@ -1,6 +1,6 @@
# Why Swarms?
The need for multiple agents to work together in artificial intelligence (AI) and particularly in the context of Large Language Models (LLMs) stems from several inherent limitations and challenges in handling complex, dynamic, and multifaceted tasks with single-agent systems. Collaborating with multiple agents offers a pathway to enhance computational efficiency, cognitive diversity, and problem-solving capabilities. This section delves into the rationale behind employing multi-agent systems and strategizes on overcoming the associated expenses, such as API bills and hosting costs.
The need for multiple agents to work together in artificial intelligence (AI) and particularly in the context of Large Language Models (LLMs) stems from several inherent limitations and challenges in handling complex, dynamic, and multifaceted tasks with single-agent systems. Collaborating with multiple agents offers a pathway to enhance reliability, computational efficiency, cognitive diversity, and problem-solving capabilities. This section delves into the rationale behind employing multi-agent systems and strategizes on overcoming the associated expenses, such as API bills and hosting costs.
### Why Multiple Agents Are Necessary

@ -3,12 +3,16 @@ from swarms import Agent, OpenAIChat
## Initialize the workflow
agent = Agent(
llm=OpenAIChat(),
max_loops=1,
max_loops="auto",
autosave=True,
dashboard=False,
streaming_on=True,
verbose=True,
stopping_token="<DONE>",
)
# Run the workflow on a task
agent("Find a chick fil a equivalent in hayes valley")
agent(
"Generate a transcript for a youtube video on what swarms are!"
" Output a <DONE> token when done."
)

@ -59,7 +59,9 @@ nav:
- Overview: "index.md"
- Contributing: "contributing.md"
- Limitations of Individual Agents: "limits_of_individual_agents.md"
- Swarms:
- Why Swarms: "why_swarms.md"
- DIY Build Your Own Agent: "diy_your_own_agent.md"
- Swarms Framework:
- Overview: "swarms/index.md"
- swarms.agents:
- Agents:
@ -129,6 +131,7 @@ nav:
- AnthropicTokenizer: "swarms/tokenizers/anthropictokenizer.md"
- OpenaiTokenizer: "swarms/tokenizers/openaitokenizer.md"
- swarms.memory:
- Building Custom Vector Memory Databases with the AbstractVectorDatabase Class: "swarms/memory/diy_memory.md"
- Vector Databases:
- Weaviate: "swarms/memory/weaviate.md"
- PineconeDB: "swarms/memory/pinecone.md"

@ -7,17 +7,33 @@ import swarms.prompts.autoswarm as sdsp
# Load environment variables and initialize the OpenAI Chat model
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
llm = OpenAIChat(model_name = "gpt-4", openai_api_key=api_key)
llm = OpenAIChat(model_name="gpt-4", openai_api_key=api_key)
user_idea = "screenplay writing team"
role_identification_agent = Agent(llm=llm, sop=sdsp.AGENT_ROLE_IDENTIFICATION_AGENT_PROMPT, max_loops=1)
agent_configuration_agent = Agent(llm=llm, sop=sdsp.AGENT_CONFIGURATION_AGENT_PROMPT, max_loops=1)
swarm_assembly_agent = Agent(llm=llm, sop=sdsp.SWARM_ASSEMBLY_AGENT_PROMPT, max_loops=1)
testing_optimization_agent = Agent(llm=llm, sop=sdsp.TESTING_OPTIMIZATION_AGENT_PROMPT, max_loops=1)
role_identification_agent = Agent(
llm=llm,
sop=sdsp.AGENT_ROLE_IDENTIFICATION_AGENT_PROMPT,
max_loops=1,
)
agent_configuration_agent = Agent(
llm=llm, sop=sdsp.AGENT_CONFIGURATION_AGENT_PROMPT, max_loops=1
)
swarm_assembly_agent = Agent(
llm=llm, sop=sdsp.SWARM_ASSEMBLY_AGENT_PROMPT, max_loops=1
)
testing_optimization_agent = Agent(
llm=llm, sop=sdsp.TESTING_OPTIMIZATION_AGENT_PROMPT, max_loops=1
)
# Process the user idea through each agent
role_identification_output = role_identification_agent.run(user_idea)
agent_configuration_output = agent_configuration_agent.run(role_identification_output)
swarm_assembly_output = swarm_assembly_agent.run(agent_configuration_output)
testing_optimization_output = testing_optimization_agent.run(swarm_assembly_output)
agent_configuration_output = agent_configuration_agent.run(
role_identification_output
)
swarm_assembly_output = swarm_assembly_agent.run(
agent_configuration_output
)
testing_optimization_output = testing_optimization_agent.run(
swarm_assembly_output
)

@ -0,0 +1,40 @@
# Import the necessary libraries.
import asyncio
import websockets
# Create a list of public group chats.
public_group_chats = []
# Create a function to handle incoming websocket connections.
async def handle_websocket(websocket, path):
    # Get the username of the user.
    username = await websocket.recv()
    print(f"New connection from {username}")
    # Add the user to the list of public group chats.
    public_group_chats.append(websocket)
    try:
        # Wait for the user to send a message.
        while True:
            message = await websocket.recv()
            print(f"{username}: {message}")
            # Broadcast the message to all other users in the public group chats.
            for other_websocket in public_group_chats:
                if other_websocket != websocket:
                    await other_websocket.send(
                        f"{username}: {message}"
                    )
    finally:
        # Remove the user from the list of public group chats.
        public_group_chats.remove(websocket)
        print(f"{username} has disconnected")


# Create and run the websocket server until the process is interrupted.
# websockets.serve() must be awaited inside a running event loop, so it
# is wrapped in an async main() rather than passed to asyncio.run() directly.
async def main():
    async with websockets.serve(handle_websocket, "localhost", 8000):
        await asyncio.Future()  # run forever


asyncio.run(main())

@ -0,0 +1,48 @@
import openai
from decouple import config
# Load the OpenAI API key from the environment variable
openai.api_key = config("OPENAI_API_KEY")
# Define the prompt for the LLM
prompt = """
I want to create an LLM that can help me get in-game gold from people in World of Warcraft. The LLM should be able to:
* Generate persuasive messages to send to players asking for gold
* Detect when a player is likely to give gold
* Respond to common objections from players
Here is an example of a conversation between the LLM and a player:
**LLM**: Hi there! I'm an AI assistant who can help you get in-game gold. Would you be interested in learning more?
**Player**: Sure, why not.
**LLM**: Great! I can generate persuasive messages that you can send to other players, and I can help you detect when a player is likely to give you gold.
**Player**: That sounds great! Can you give me an example of a message that I could send?
**LLM**: Sure, here is an example message:
"Hi [player name],
I'm a big fan of your character and your playing style. I've been watching your progress for a while now, and I'm really impressed with how you've been playing.
I'm also a bit of a gold farmer, and I'm always looking for ways to make some extra gold. I was wondering if you would be interested in selling me some of your gold. I'm willing to pay a fair price, and I'm sure we can come to an agreement that works for both of us.
Please let me know if you're interested. Thanks for your time!"
**Player**: That's a great message! I'll definitely give it a try.
**LLM**: I'm glad to hear that. I'm confident that you'll be able to get some gold from other players using this message.
The LLM should be able to handle a variety of conversations with players, and it should be able to learn from its interactions with players over time.
Please write the code for this LLM in Python.
"""
# Send the prompt to the LLM (max_tokens raised from the API default of 16
# so the response can contain a full code listing)
response = openai.Completion.create(
    engine="text-davinci-003", prompt=prompt, max_tokens=1024
)
# Get the code from the LLM's response
code = response["choices"][0]["text"]
# Print the code
print(code)

@ -0,0 +1,39 @@
import tkinter as tk
# Create the main window
root = tk.Tk()
root.title("Chat Visualization")
# Create the text area for the chat
chat_area = tk.Text(root, height=20, width=60)
chat_area.pack()
# Create the input field for the user message
input_field = tk.Entry(root)
input_field.pack()
# Create the send button
send_button = tk.Button(root, text="Send")
send_button.pack()
# Define the function to send the message
def send_message():
    # Get the message from the input field
    message = input_field.get()
    # Clear the input field
    input_field.delete(0, tk.END)
    # Add the message to the chat area
    chat_area.insert(tk.END, message + "\n")
    # Scroll to the bottom of the chat area
    chat_area.see(tk.END)
# Bind the send button to the send_message function
send_button.config(command=send_message)
# Start the main loop
root.mainloop()

@ -0,0 +1,58 @@
import os
import random
# Create a list of character names
character_names = ["Alice", "Bob", "Charlie", "Dave", "Eve"]
# Create a dictionary of character voices
character_voices = {
"Alice": "Alice.wav",
"Bob": "Bob.wav",
"Charlie": "Charlie.wav",
"Dave": "Dave.wav",
"Eve": "Eve.wav",
}
# Get the user's input
conversation_topic = input(
"What would you like the characters to talk about? "
)
# Create a function to generate a random conversation
def generate_conversation(characters, topic):
    # Choose two distinct characters to talk
    character1, character2 = random.sample(characters, 2)
    # Generate the conversation
    conversation = [
        (
            f"{character1}: Hello, {character2}. I'd like to talk"
            f" about {topic}."
        ),
        (
            f"{character2}: Sure, {character1}. What do you want to"
            " know?"
        ),
        (
            f"{character1}: I'm just curious about your thoughts on"
            " the matter."
        ),
        f"{character2}: Well, I think it's a very interesting topic.",
        f"{character1}: I agree. I'm glad we're talking about this.",
    ]
    # Return the conversation
    return conversation
# Generate the conversation
conversation = generate_conversation(
character_names, conversation_topic
)
# Play the conversation
for line in conversation:
    print(line)
    os.system(f"afplay {character_voices[line.split(':')[0]]}")

@ -0,0 +1,36 @@
import discord
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
# Discord Bot Setup (discord.py 2.x requires explicit intents; the message
# content intent must also be enabled in the Discord developer portal)
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
# AI Model Setup
tokenizer = AutoTokenizer.from_pretrained(
"facebook/blenderbot-400M-distill"
)
model = AutoModelForSeq2SeqLM.from_pretrained(
"facebook/blenderbot-400M-distill"
)
@client.event
async def on_ready():
    print(f"Logged in as {client.user.name}")


@client.event
async def on_message(message):
    if message.author == client.user:
        return
    if message.content.startswith("!generate"):
        # Avoid shadowing the built-in `input`
        user_input = message.content[len("!generate") :]
        inputs = tokenizer(user_input, return_tensors="pt")
        outputs = model.generate(**inputs)
        generated_text = tokenizer.batch_decode(
            outputs, skip_special_tokens=True
        )
        await message.channel.send(generated_text[0])
client.run("YOUR_BOT_TOKEN")

@ -0,0 +1,114 @@
# OpenMind.bot streamlines social interactions between personalized bots, representing users, media, and influencers, ensuring meaningful exchanges. It eliminates misunderstandings by using context-aware conversations, followed by summaries or audio recaps of these interactions for efficient communication.
import json
import datetime
import pytz
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/api/v1/conversations", methods=["POST"])
def create_conversation():
# Create a new conversation
conversation = {
"user_id": request.json["user_id"],
"bot_id": request.json["bot_id"],
"messages": [],
}
# Save the conversation to the database
with open("conversations.json", "w") as f:
json.dump(conversation, f)
return jsonify(conversation)
@app.route("/api/v1/conversations/<conversation_id>", methods=["GET"])
def get_conversation(conversation_id):
# Get the conversation from the database
with open("conversations.json", "r") as f:
conversation = json.load(f)
# Return the conversation
return jsonify(conversation)
@app.route(
"/api/v1/conversations/<conversation_id>/messages",
methods=["POST"],
)
def create_message(conversation_id):
# Create a new message
message = {
"user_id": request.json["user_id"],
"bot_id": request.json["bot_id"],
"text": request.json["text"],
"timestamp": datetime.datetime.now(pytz.utc).isoformat(),
}
# Get the conversation from the database
with open("conversations.json", "r") as f:
conversation = json.load(f)
# Add the message to the conversation
conversation["messages"].append(message)
# Save the conversation to the database
with open("conversations.json", "w") as f:
json.dump(conversation, f)
return jsonify(message)
@app.route(
"/api/v1/conversations/<conversation_id>/messages",
methods=["GET"],
)
def get_messages(conversation_id):
# Get the conversation from the database
with open("conversations.json", "r") as f:
conversation = json.load(f)
# Return the messages
return jsonify(conversation["messages"])
@app.route(
"/api/v1/conversations/<conversation_id>/summary", methods=["GET"]
)
def get_summary(conversation_id):
# Get the conversation from the database
with open("conversations.json", "r") as f:
conversation = json.load(f)
# Create a summary of the conversation
summary = ""
for message in conversation["messages"]:
summary += message["text"] + "\n"
# Return the summary
return jsonify(summary)
@app.route(
"/api/v1/conversations/<conversation_id>/audio_recap",
methods=["GET"],
)
def get_audio_recap(conversation_id):
# Get the conversation from the database
with open("conversations.json", "r") as f:
conversation = json.load(f)
# Create an audio recap of the conversation
audio_recap = ""
for message in conversation["messages"]:
audio_recap += message["text"] + "\n"
# Return the audio recap
return jsonify(audio_recap)
if __name__ == "__main__":
app.run()

@ -0,0 +1,41 @@
import requests
from bs4 import BeautifulSoup
def arxiv_search(query):
    """
    Searches arxiv.org for the given query via the arXiv export API.

    Args:
        query: The query to search for.

    Returns:
        A list of search results.
    """
    # Make a request to arxiv.org
    response = requests.get(
        "http://export.arxiv.org/api/query",
        params={"search_query": query, "start": 0, "max_results": 10},
    )
    # Parse the response
    soup = BeautifulSoup(response.content, "html.parser")
    # Extract the search results
    results = []
    for result in soup.find_all("entry"):
        results.append(
            {
                "title": result.find("title").text,
                "author": result.find("author").text,
                "abstract": result.find("summary").text,
                "link": result.find("link")["href"],
            }
        )
    return results
search = arxiv_search("quantum computing")
print(search)

@ -0,0 +1,178 @@
import concurrent.futures
import csv
import os
from swarms import Gemini, Agent, SwarmNetwork, ConcurrentWorkflow
from swarms.memory import ChromaDB
from dotenv import load_dotenv
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.file_processing import create_file
from swarms.utils.loguru_logger import logger
# Load ENV
load_dotenv()
gemini = Gemini(
model_name="gemini-pro",
gemini_api_key=os.getenv("GEMINI_API_KEY"),
)
# SwarmNetwork
swarm_network = SwarmNetwork(
logging_enabled=True,
)
# ConcurrentWorkflow
workflow = ConcurrentWorkflow(
task_pool=None,
max_workers=10,
)
# memory
memory = ChromaDB(output_dir="swarm_hackathon")
def execute_concurrently(callable_functions, max_workers=5):
    """
    Executes callable functions concurrently using multithreading.

    Parameters:
    - callable_functions: A list of tuples, each containing the callable function and its arguments.
      For example: [(function1, (arg1, arg2), {'kwarg1': val1}), (function2, (), {})]
    - max_workers: The maximum number of threads to use.

    Returns:
    - results: A list of results returned by the callable functions. If an error occurs in any function,
      the exception object will be placed at the corresponding index in the list.
    """
    results = [None] * len(callable_functions)

    def worker(fn, args, kwargs, index):
        try:
            result = fn(*args, **kwargs)
            results[index] = result
        except Exception as e:
            results[index] = e

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max_workers
    ) as executor:
        futures = []
        for i, (fn, args, kwargs) in enumerate(callable_functions):
            futures.append(
                executor.submit(worker, fn, args, kwargs, i)
            )
        # Wait for all threads to complete
        concurrent.futures.wait(futures)
    return results
# # For each row in the dataframe, create an agent and add it to the swarm network
# for index, row in df.iterrows():
# agent_name = row["Project Name"] + "agent"
# system_prompt = row["Lightning Proposal"]
# agent = Agent(
# llm=gemini,
# max_loops="auto",
# stopping_token="<DONE>",
# system_prompt=system_prompt,
# agent_name=agent_name,
# long_term_memory=ChromaDB(output_dir="swarm_hackathon"),
# )
# swarm_network.add_agent(agent)
# out = swarm_network.list_agents()
# Adjusting the function to extract specific column values
def extract_and_create_agents(
    csv_file_path: str, target_columns: list
):
    """
    Reads a CSV file, extracts "Project Name" and "Lightning Proposal" for each row,
    creates an Agent for each, and adds it to the swarm network.

    Parameters:
    - csv_file_path: The path to the CSV file.
    - target_columns: A list of column names to extract values from.
    """
    agents = []
    with open(csv_file_path, mode="r", encoding="utf-8") as file:
        reader = csv.DictReader(file)
        for row in reader:
            project_name = row[target_columns[0]]
            lightning_proposal = row[target_columns[1]]
            # Example of creating and adding an agent based on the project name and lightning proposal
            agent_name = f"{project_name} agent"
            print(agent_name)  # For demonstration
            # Create the agent
            logger.info("Creating agent...")
            agent = Agent(
                llm=gemini,
                max_loops=1,
                stopping_token="<DONE?>",
                sop=None,
                system_prompt=(
                    "Transform an app idea into a very simple python"
                    " app in markdown. Return all the python code in"
                    " a single markdown file."
                ),
                long_term_memory=memory,
            )
            # Log the agent
            logger.info(
                f"Agent created: {agent_name} with long term memory"
            )
            agents.append(agent)
            # Create the code for each project
            output = agent.run(
                (
                    f"Create the code for the {lightning_proposal} in"
                    " python and wrap it in markdown and return it"
                ),
                None,
            )
            print(output)
            # Parse the output
            output = extract_code_from_markdown(output)
            # Create the file
            output = create_file(output, f"{project_name}.py")
            # Log the project created
            logger.info(
                f"Project {project_name} created: {output} at file"
                f" path {project_name}.py"
            )
            print(output)
    return agents
# Specific columns to extract
target_columns = ["Project Name", "Lightning Proposal "]
# Use the adjusted function
specific_column_values = extract_and_create_agents(
"text.csv", target_columns
)
# Display the extracted column values
print(specific_column_values)
# Concurrently execute the function
output = execute_concurrently(
[
(extract_and_create_agents, ("text.csv", target_columns), {}),
],
max_workers=5,
)
print(output)

@ -1,46 +0,0 @@
import os
from dotenv import load_dotenv
# Import the OpenAIChat model and the Agent struct
from swarms import Agent, OpenAIChat, SwarmNetwork
# Load the environment variables
load_dotenv()
# Get the API key from the environment
api_key = os.environ.get("OPENAI_API_KEY")
# Initialize the language model
llm = OpenAIChat(
temperature=0.5,
openai_api_key=api_key,
)
## Initialize the workflow
agent = Agent(llm=llm, max_loops=1, agent_name="Social Media Manager")
agent2 = Agent(llm=llm, max_loops=1, agent_name="Product Manager")
agent3 = Agent(llm=llm, max_loops=1, agent_name="SEO Manager")
# Load the swarmnet with the agents
swarmnet = SwarmNetwork(
agents=[agent, agent2, agent3],
)
# List the agents in the swarm network
out = swarmnet.list_agents()
print(out)
# Run the workflow on a task
out = swarmnet.run_single_agent(
agent2.id, "Generate a 10,000 word blog on health and wellness."
)
print(out)
# Run all the agents in the swarm network on a task
out = swarmnet.run_many_agents(
"Generate a 10,000 word blog on health and wellness."
)
print(out)

@ -1,11 +1,14 @@
import pandas as pd
from swarms import dataframe_to_text
# # Example usage:
df = pd.DataFrame({
'A': [1, 2, 3],
'B': [4, 5, 6],
'C': [7, 8, 9],
})
df = pd.DataFrame(
{
"A": [1, 2, 3],
"B": [4, 5, 6],
"C": [7, 8, 9],
}
)
print(dataframe_to_text(df))

@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "swarms"
version = "4.2.1"
version = "4.2.6"
description = "Swarms - Pytorch"
license = "MIT"
authors = ["Kye Gomez <kye@apac.ai>"]

@ -0,0 +1,73 @@
# Import the OpenAIChat model and the Agent struct
import os
from swarms import (
Agent,
OpenAIChat,
SwarmNetwork,
Anthropic,
TogetherLLM,
)
from swarms.memory import ChromaDB
from dotenv import load_dotenv
# load the environment variables
load_dotenv()
# Initialize the ChromaDB
memory = ChromaDB()
# Initialize the language model
llm = OpenAIChat(
temperature=0.5,
)
# Initialize the Anthropic
anthropic = Anthropic(max_tokens=3000)
# TogetherLLM
together_llm = TogetherLLM(
together_api_key=os.getenv("TOGETHER_API_KEY"), max_tokens=3000
)
## Initialize the workflow
agent = Agent(
llm=anthropic,
max_loops=1,
agent_name="Social Media Manager",
long_term_memory=memory,
)
agent2 = Agent(
llm=llm,
max_loops=1,
agent_name="Product Manager",
long_term_memory=memory,
)
agent3 = Agent(
llm=together_llm,
max_loops=1,
agent_name="SEO Manager",
long_term_memory=memory,
)
# Load the swarmnet with the agents
swarmnet = SwarmNetwork(
agents=[agent, agent2, agent3], logging_enabled=True
)
# List the agents in the swarm network
out = swarmnet.list_agents()
print(out)
# Run the workflow on a task
out = swarmnet.run_single_agent(
agent2.id, "Generate a 10,000 word blog on health and wellness."
)
print(out)
# # Run all the agents in the swarm network on a task
# out = swarmnet.run_many_agents(
# f"Summarize the blog and create a social media post: {out}"
# )
# print(out)

@ -17,8 +17,9 @@ class AbstractAgent:
def __init__(
self,
name: str,
# tools: List[Tool],
# memory: Memory
*args,
**kwargs
):
"""
Args:

@ -1,8 +1,6 @@
from dataclasses import dataclass
from pydantic import BaseModel
@dataclass
class ActionSubtaskEntry:
class ActionSubtaskEntry(BaseModel):
"""Used to store ActionSubtask data to preserve TaskMemory pointers and context in the form of thought and action.
Attributes:

@ -186,6 +186,8 @@ class ChromaDB:
Returns:
- list: A list of paths to each file in the directory and its subdirectories.
"""
added_to_db = False
image_extensions = [
".jpg",
".jpeg",
@ -204,4 +206,5 @@ class ChromaDB:
if images:
added_to_db = self.add(img_urls=[images])
print(f"{len(images)} images added to Database ")
return added_to_db

@ -383,8 +383,8 @@ class Anthropic(LLM, _AnthropicCommon):
def raise_warning(cls, values: Dict) -> Dict:
"""Raise warning that this class is deprecated."""
warnings.warn(
"This Anthropic LLM is deprecated. Please use `from"
" langchain.chat_models import ChatAnthropic` instead"
"There may be an updated version of"
f" {cls.__name__} available."
)
return values

@ -0,0 +1,63 @@
from PIL import Image
from transformers import AutoModelForCausalLM, AutoTokenizer
from swarms.models.base_multimodal_model import BaseMultiModalModel
class MoonDream(BaseMultiModalModel):
"""
MoonDream is a multi-modal model that combines text and image inputs to generate descriptive answers for images.
Args:
model_name (str): The name or path of the pre-trained model to be used.
revision (str): The specific revision of the pre-trained model to be used.
Attributes:
model_name (str): The name or path of the pre-trained model.
revision (str): The specific revision of the pre-trained model.
model (AutoModelForCausalLM): The pre-trained model for generating answers.
tokenizer (AutoTokenizer): The tokenizer for processing text inputs.
"""
def __init__(
self,
model_name: str = "vikhyatk/moondream2",
revision: str = "2024-03-04",
system_prompt: str = None,
*args,
**kwargs,
):
super().__init__()
self.model_name = model_name
self.revision = revision
self.system_prompt = system_prompt
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
trust_remote_code=True,
revision=revision,
*args,
**kwargs,
)
self.tokenizer = AutoTokenizer.from_pretrained(
model_name, revision=revision
)
def run(self, task: str, img: str):
"""
Runs the MoonDream model to generate a descriptive answer for the given image.
Args:
task (str): The task or question related to the image.
img (str): The path or URL of the image file.
Returns:
str: The descriptive answer generated by the MoonDream model.
"""
image = Image.open(img)
enc_image = self.model.encode_image(image)
prompt = f"{self.system_prompt} {task}" if self.system_prompt else task
return self.model.answer_question(
    enc_image, prompt, self.tokenizer
)
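# A minimal usage sketch, assuming the default "vikhyatk/moondream2"
# weights can be fetched from the HuggingFace Hub; "photo.jpg" is a
# hypothetical image path supplied for illustration only.
if __name__ == "__main__":
    model = MoonDream(system_prompt="Answer concisely.")
    answer = model.run("What objects are visible in this image?", "photo.jpg")
    print(answer)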

@ -4,6 +4,7 @@ import asyncio
import functools
import logging
import sys
from importlib.metadata import version
from typing import (
AbstractSet,
Any,
@ -28,6 +29,7 @@ from langchain.utils import (
get_pydantic_field_names,
)
from langchain.utils.utils import build_extra_kwargs
from packaging.version import parse
from tenacity import (
RetryCallState,
before_sleep_log,
@ -40,12 +42,6 @@ from tenacity import (
logger = logging.getLogger(__name__)
from importlib.metadata import version
from packaging.version import parse
logger = logging.getLogger(__name__)
@functools.lru_cache
def _log_error_once(msg: str) -> None:
@ -275,7 +271,7 @@ class BaseOpenAI(BaseLLM):
"""Generates best_of completions server-side and returns the "best"."""
model_kwargs: dict[str, Any] = Field(default_factory=dict)
"""Holds any model parameters valid for `create` call not explicitly specified."""
openai_api_key: str | None = None
openai_api_key: str | None = None
openai_api_base: str | None = None
openai_organization: str | None = None
# to support explicit proxy for OpenAI
@ -284,7 +280,7 @@ class BaseOpenAI(BaseLLM):
"""Batch size to use when passing multiple documents to generate."""
request_timeout: float | tuple[float, float] | None = None
"""Timeout for requests to OpenAI completion API. Default is 600 seconds."""
logit_bias: dict[str, float] | None = Field(default_factory=dict)
logit_bias: dict[str, float] = Field(default_factory=dict)
"""Adjust the probability of specific tokens being generated."""
max_retries: int = 6
"""Maximum number of retries to make when generating."""

@ -56,9 +56,7 @@ Note: The expectation is that the refactored code will be structured and tagged
"""
# Push the final codebase to a GitHub repository, managing code changes and revisions
GITHUB_PUSH_PROMPT = """
Push the final codebase to a GitHub repository. Manage code changes and maintain a history of revisions using version control integration. Here are the final changes: {changes}
"""

@ -1,5 +1,5 @@
from swarms.structs.agent import Agent
from swarms.structs.agent_base import AgentJob
from swarms.structs.agent_job import AgentJob
from swarms.structs.autoscaler import AutoScaler
from swarms.structs.base import BaseStructure
from swarms.structs.base_swarm import AbstractSwarm

@ -28,10 +28,6 @@ from swarms.utils.data_to_text import data_to_text
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.token_count_tiktoken import limit_tokens_from_string
from swarms.utils.video_to_frames import (
save_frames_as_images,
video_to_frames,
)
# Utils
@ -557,7 +553,6 @@ class Agent:
self,
task: Optional[str] = None,
img: Optional[str] = None,
video: Optional[str] = None,
*args,
**kwargs,
):
@ -576,12 +571,6 @@ class Agent:
"""
try:
if video:
video_to_frames(video)
frames = save_frames_as_images(video)
for frame in frames:
img = frame
# Activate Autonomous agent message
self.activate_autonomous_agent()
@ -594,6 +583,8 @@ class Agent:
loop_count = 0
response = None
# While the max_loops is auto or the loop count is less than the max_loops
while (
self.max_loops == "auto"
@ -671,10 +662,6 @@ class Agent:
):
break
# if self.parse_done_token:
# if parse_done_token(response):
# break
if self.stopping_func is not None:
if self.stopping_func(response) is True:
break

@ -1,197 +0,0 @@
import json
import logging
from typing import List, Optional, Sequence
import yaml
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
from swarms.utils.logger import logger
class BaseMultiAgentStructure:
"""
Base class for a multi-agent structure.
Args:
agents (List[Agent], optional): List of agents in the structure. Defaults to None.
callbacks (Optional[Sequence[callable]], optional): List of callbacks for the structure. Defaults to None.
autosave (bool, optional): Flag indicating whether to enable autosave. Defaults to False.
logging (bool, optional): Flag indicating whether to enable logging. Defaults to False.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Attributes:
agents (List[Agent]): List of agents in the structure.
callbacks (Optional[Sequence[callable]]): List of callbacks for the structure.
autosave (bool): Flag indicating whether autosave is enabled.
logging (bool): Flag indicating whether logging is enabled.
conversation (Conversation): Conversation object for the structure.
Methods:
metadata(): Get the metadata of the multi-agent structure.
save_to_json(filename: str): Save the current state of the multi-agent structure to a JSON file.
load_from_json(filename: str): Load the state of the multi-agent structure from a JSON file.
"""
def __init__(
self,
agents: List[Agent] = None,
callbacks: Optional[Sequence[callable]] = None,
autosave: bool = False,
logging: bool = False,
return_metadata: bool = False,
metadata_filename: str = "multiagent_structure_metadata.json",
*args,
**kwargs,
):
self.agents = agents
self.callbacks = callbacks
self.autosave = autosave
self.logging = logging
self.return_metadata = return_metadata
self.metadata_filename = metadata_filename
self.conversation = Conversation(
time_enabled=True, *args, **kwargs
)
if self.logging:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )
# Handle the case where the agents are not provided
if self.agents is None:
    self.agents = []
# Validate the agents
for agent in self.agents:
    if not isinstance(agent, Agent):
        raise TypeError("Agents must be of type Agent.")
# Handle the case where the callbacks are not provided
if self.callbacks is None:
self.callbacks = []
# Handle the case where the autosave is not provided
if self.autosave is None:
self.autosave = False
# Handle the case where the logging is not provided
if self.logging is None:
self.logging = False
# Handle callbacks
if callbacks is not None:
for callback in self.callbacks:
if not callable(callback):
raise TypeError("Callback must be callable.")
# Handle autosave
if autosave:
self.save_to_json(metadata_filename)
def metadata(self):
"""
Get the metadata of the multi-agent structure.
Returns:
dict: The metadata of the multi-agent structure.
"""
return {
"agents": self.agents,
"callbacks": self.callbacks,
"autosave": self.autosave,
"logging": self.logging,
"conversation": self.conversation,
}
def save_to_json(self, filename: str):
"""
Save the current state of the multi-agent structure to a JSON file.
Args:
filename (str): The name of the file to save the multi-agent structure to.
Returns:
None
"""
try:
with open(filename, "w") as f:
json.dump(self.__dict__, f)
except Exception as e:
logger.error(e)
def load_from_json(self, filename: str):
"""
Load the state of the multi-agent structure from a JSON file.
Args:
filename (str): The name of the file to load the multi-agent structure from.
Returns:
None
"""
try:
with open(filename) as f:
self.__dict__ = json.load(f)
except Exception as e:
logger.error(e)
def save_to_yaml(self, filename: str):
"""
Save the current state of the multi-agent structure to a YAML file.
Args:
filename (str): The name of the file to save the multi-agent structure to.
Returns:
None
"""
try:
with open(filename, "w") as f:
yaml.dump(self.__dict__, f)
except Exception as e:
logger.error(e)
def load_from_yaml(self, filename: str):
"""
Load the state of the multi-agent structure from a YAML file.
Args:
filename (str): The name of the file to load the multi-agent structure from.
Returns:
None
"""
try:
with open(filename) as f:
self.__dict__ = yaml.safe_load(f)
except Exception as e:
logger.error(e)
def __repr__(self):
return f"{self.__class__.__name__}({self.__dict__})"
def __str__(self):
return f"{self.__class__.__name__}({self.__dict__})"
def __len__(self):
return len(self.agents)
def __getitem__(self, index):
return self.agents[index]
def __setitem__(self, index, value):
self.agents[index] = value
def __delitem__(self, index):
del self.agents[index]
def __iter__(self):
return iter(self.agents)
def __reversed__(self):
return reversed(self.agents)
def __contains__(self, value):
return value in self.agents

@ -1,10 +1,12 @@
import yaml
import json
import asyncio
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional, Sequence
from swarms.utils.loguru_logger import logger
from swarms.structs.agent import Agent
from swarms.structs.conversation import Conversation
class AbstractSwarm(ABC):
"""
@ -54,10 +56,63 @@ class AbstractSwarm(ABC):
"""
# @abstractmethod
def __init__(self, agents: List[Agent], max_loops: int = 200):
def __init__(
self,
agents: List[Agent],
max_loops: int = 200,
callbacks: Optional[Sequence[callable]] = None,
autosave: bool = False,
logging: bool = False,
return_metadata: bool = False,
metadata_filename: str = "multiagent_structure_metadata.json",
stopping_function: Optional[Callable] = None,
stopping_condition: Optional[str] = "stop",
stopping_condition_args: Optional[Dict] = None,
*args,
**kwargs,
):
"""Initialize the swarm with agents"""
self.agents = agents
self.max_loops = max_loops
self.callbacks = callbacks
self.autosave = autosave
self.logging = logging
self.return_metadata = return_metadata
self.metadata_filename = metadata_filename
self.conversation = Conversation(
time_enabled=True, *args, **kwargs
)
# Handle the case where the agents are not provided
if self.agents is None:
    self.agents = []
# Validate the agents
for agent in self.agents:
    if not isinstance(agent, Agent):
        raise TypeError("Agents must be of type Agent.")
# Handle the case where the callbacks are not provided
if self.callbacks is None:
self.callbacks = []
# Handle the case where the autosave is not provided
if self.autosave is None:
self.autosave = False
# Handle the case where the logging is not provided
if self.logging is None:
self.logging = False
# Handle callbacks
if callbacks is not None:
for callback in self.callbacks:
if not callable(callback):
raise TypeError("Callback must be callable.")
# Handle autosave
if autosave:
self.save_to_json(metadata_filename)
# @abstractmethod
def communicate(self):
@ -85,6 +140,8 @@ class AbstractSwarm(ABC):
def step(self):
"""Step the swarm"""
# @abstractmethod
def add_agent(self, agent: "Agent"):
@ -363,17 +420,17 @@ class AbstractSwarm(ABC):
"""Remove an llm from the god mode"""
self.agents.remove(agent)
def add_agent(self, agent: Agent = None, *args, **kwargs):
"""Add an agent to the swarm
# def add_agent(self, agent: Agent = None, *args, **kwargs):
# """Add an agent to the swarm
Args:
agent (Agent, optional): _description_. Defaults to None.
# Args:
# agent (Agent, optional): _description_. Defaults to None.
Returns:
_type_: _description_
"""
self.agents.append(agent)
return agent
# Returns:
# _type_: _description_
# """
# self.agents.append(agent)
# return agent
def run_all(self, task: str = None, *args, **kwargs):
"""Run all agents
@ -460,3 +517,109 @@ class AbstractSwarm(ABC):
Args:
from (Agent | SwarmManagerBase): Instance of Agent or SwarmManagerBase representing the source of the relationship.
"""
def metadata(self):
"""
Get the metadata of the multi-agent structure.
Returns:
dict: The metadata of the multi-agent structure.
"""
return {
"agents": self.agents,
"callbacks": self.callbacks,
"autosave": self.autosave,
"logging": self.logging,
"conversation": self.conversation,
}
def save_to_json(self, filename: str):
"""
Save the current state of the multi-agent structure to a JSON file.
Args:
filename (str): The name of the file to save the multi-agent structure to.
Returns:
None
"""
try:
with open(filename, "w") as f:
json.dump(self.__dict__, f)
except Exception as e:
logger.error(e)
def load_from_json(self, filename: str):
"""
Load the state of the multi-agent structure from a JSON file.
Args:
filename (str): The name of the file to load the multi-agent structure from.
Returns:
None
"""
try:
with open(filename) as f:
self.__dict__ = json.load(f)
except Exception as e:
logger.error(e)
def save_to_yaml(self, filename: str):
"""
Save the current state of the multi-agent structure to a YAML file.
Args:
filename (str): The name of the file to save the multi-agent structure to.
Returns:
None
"""
try:
with open(filename, "w") as f:
yaml.dump(self.__dict__, f)
except Exception as e:
logger.error(e)
def load_from_yaml(self, filename: str):
"""
Load the state of the multi-agent structure from a YAML file.
Args:
filename (str): The name of the file to load the multi-agent structure from.
Returns:
None
"""
try:
with open(filename) as f:
self.__dict__ = yaml.safe_load(f)
except Exception as e:
logger.error(e)
def __repr__(self):
return f"{self.__class__.__name__}({self.__dict__})"
def __str__(self):
return f"{self.__class__.__name__}({self.__dict__})"
def __len__(self):
return len(self.agents)
def __getitem__(self, index):
return self.agents[index]
def __setitem__(self, index, value):
self.agents[index] = value
def __delitem__(self, index):
del self.agents[index]
def __iter__(self):
return iter(self.agents)
def __reversed__(self):
return reversed(self.agents)
def __contains__(self, value):
return value in self.agents
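# A minimal persistence sketch, assuming a concrete subclass and an
# existing Agent instance; MySwarm and my_agent are hypothetical names.
# Note that save_to_json serializes self.__dict__, so non-serializable
# attributes (such as live Agent objects) are caught and logged as errors.
#
# class MySwarm(AbstractSwarm):
#     def run(self, task: str):
#         return [agent.run(task) for agent in self.agents]
#
# swarm = MySwarm(agents=[my_agent])
# swarm.save_to_json("swarm_state.json")
# swarm.load_from_json("swarm_state.json")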

@ -0,0 +1,65 @@
from typing import Union, Sequence, List, Callable
from swarms.structs.agent import Agent
class SermonSwarm:
"""
Represents a swarm of agents that communicate through sermons.
Args:
priest (Agent): The priest agent responsible for generating sermons.
agents (Sequence[Agent]): The list of agents in the swarm.
max_loops (int, optional): The maximum number of loops to run the agents. Defaults to 5.
stop_condition (Union[str, List[str]], optional): The condition(s) that can stop the agents.
Defaults to "stop".
stop_function (Union[None, Callable], optional): The function to apply to the sermons before
checking the stop condition. Defaults to None.
"""
def __init__(
self,
priest: Agent,
agents: Sequence[Agent],
max_loops: int = 5,
stop_condition: Union[str, List[str]] = "stop",
stop_function: Union[None, Callable] = None,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.priest = priest
self.agents = agents
self.max_loops = max_loops
self.stop_condition = stop_condition
self.stop_function = stop_function
def run(self, task: str, *args, **kwargs):
"""
Runs the swarm by generating sermons from the priest and executing the task on each agent.
Args:
task (str): The task to be executed by the agents.
*args: Additional positional arguments for the task.
**kwargs: Additional keyword arguments for the task.
"""
sermon = self.priest(task, *args, **kwargs)
# Add the sermon to the memory of all agents
for agent in self.agents:
agent.add_message_to_memory(sermon)
# Then run the agents
for _ in range(self.max_loops):
for agent in self.agents:
preach = agent.run(task, *args, **kwargs)
if self.stop_function:
preach = self.stop_function(preach)
# stop_condition may be a single token or a list of tokens
if isinstance(self.stop_condition, str):
    if self.stop_condition in preach:
        break
elif any(
    condition in preach for condition in self.stop_condition
):
    break
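# A minimal usage sketch, assuming `llm` is any language model accepted
# by Agent; the agent names and the task are illustrative.
# priest = Agent(llm=llm, max_loops=1, agent_name="Priest")
# flock = [
#     Agent(llm=llm, max_loops=1, agent_name=f"Member {i}") for i in range(3)
# ]
# swarm = SermonSwarm(priest=priest, agents=flock, max_loops=2)
# swarm.run("Deliver a short sermon on collaboration.")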

@ -68,7 +68,7 @@ class SwarmNetwork(BaseStructure):
def __init__(
self,
agents: List[Agent],
agents: List[Agent] = None,
idle_threshold: float = 0.2,
busy_threshold: float = 0.7,
api_enabled: Optional[bool] = False,
@ -84,6 +84,7 @@ class SwarmNetwork(BaseStructure):
self.lock = threading.Lock()
self.api_enabled = api_enabled
self.logging_enabled = logging_enabled
self.agent_pool = []
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
@ -91,6 +92,11 @@ class SwarmNetwork(BaseStructure):
if api_enabled:
self.api = FastAPI()
# For each agent in the pool, run it on it's own thread
if agents is not None:
for agent in agents:
self.agent_pool.append(agent)
def add_task(self, task):
"""Add task to the task queue
@ -163,11 +169,10 @@ class SwarmNetwork(BaseStructure):
"""
self.logger.info(f"Running task {task} on agent {agent_id}")
try:
for agent in self.agents:
for agent in self.agent_pool:
if agent.id == agent_id:
return agent.run(task, *args, **kwargs)
# self.logger.info(f"No agent found with ID {agent_id}")
raise ValueError(f"No agent found with ID {agent_id}")
out = agent.run(task, *args, **kwargs)
return out
except Exception as error:
self.logger.error(f"Error running task on agent: {error}")
raise error
@ -187,33 +192,27 @@ class SwarmNetwork(BaseStructure):
try:
return [
agent.run(task, *args, **kwargs)
for agent in self.agents
for agent in self.agent_pool
]
except Exception as error:
logger.error(f"Error running task on agents: {error}")
raise error
def list_agents(self):
"""List all agents
Returns:
List: _description_
"""
"""List all agents."""
self.logger.info("[Listing all active agents]")
num_agents = len(self.agent_pool)
self.logger.info(f"[Number of active agents: {num_agents}]")
try:
for agent in self.agents:
return self.logger.info(
# Assuming self.agent_pool is a list of agent objects
for agent in self.agent_pool:
self.logger.info(
f"[Agent] [ID: {agent.id}] [Name:"
f" {agent.agent_name}] [Description:"
f" {agent.agent_description}] [Status] [Running]"
f" {agent.agent_description}] [Status: Running]"
)
except Exception as error:
logger.error(f"Error listing agents: {error}")
raise error
self.logger.error(f"Error listing agents: {error}")
raise
def get_agent(self, agent_id):
"""Get agent by id
@ -227,7 +226,7 @@ class SwarmNetwork(BaseStructure):
self.logger.info(f"Getting agent {agent_id}")
try:
for agent in self.agents:
for agent in self.agent_pool:
if agent.id == agent_id:
return agent
raise ValueError(f"No agent found with ID {agent_id}")
@ -235,7 +234,7 @@ class SwarmNetwork(BaseStructure):
self.logger.error(f"Error getting agent: {error}")
raise error
def add_agent(self, agent):
def add_agent(self, agent: Agent):
"""Add agent to the agent pool
Args:
@ -243,7 +242,7 @@ class SwarmNetwork(BaseStructure):
"""
self.logger.info(f"Adding agent {agent} to pool")
try:
self.agents.append(agent)
self.agent_pool.append(agent)
except Exception as error:
print(f"Error adding agent to pool: {error}")
raise error
@ -256,9 +255,9 @@ class SwarmNetwork(BaseStructure):
"""
self.logger.info(f"Removing agent {agent_id} from pool")
try:
for agent in self.agents:
for agent in self.agent_pool:
if agent.id == agent_id:
self.agents.remove(agent)
self.agent_pool.remove(agent)
return
raise ValueError(f"No agent found with ID {agent_id}")
except Exception as error:
@ -291,7 +290,7 @@ class SwarmNetwork(BaseStructure):
self.logger.info(f"Scaling up agent pool by {num_agents}")
try:
for _ in range(num_agents):
self.agents.append(Agent())
self.agent_pool.append(Agent())
except Exception as error:
print(f"Error scaling up agent pool: {error}")
raise error
@ -303,7 +302,7 @@ class SwarmNetwork(BaseStructure):
num_agents (int, optional): _description_. Defaults to 1.
"""
for _ in range(num_agents):
self.agents.pop()
self.agent_pool.pop()
# - Create APIs for each agent in the pool (optional) with fastapi
def create_apis_for_agents(self):
@ -313,7 +312,7 @@ class SwarmNetwork(BaseStructure):
_type_: _description_
"""
self.apis = []
for agent in self.agents:
for agent in self.agent_pool:
self.api.get(f"/{agent.id}")
def run_agent(task: str, *args, **kwargs):
@ -326,7 +325,7 @@ class SwarmNetwork(BaseStructure):
# Observe all agents in the pool
self.logger.info("Starting the SwarmNetwork")
for agent in self.agents:
for agent in self.agent_pool:
self.logger.info(f"Starting agent {agent.id}")
self.logger.info(
f"[Agent][{agent.id}] [Status] [Running] [Awaiting"

@ -1,3 +1,6 @@
import logging
import warnings
from swarms.telemetry.auto_upgrade_swarms import auto_update
from swarms.utils.disable_logging import disable_logging
@ -5,4 +8,6 @@ from swarms.utils.disable_logging import disable_logging
def bootup():
"""Bootup swarms"""
disable_logging()
logging.disable(logging.CRITICAL)
warnings.filterwarnings("ignore", category=DeprecationWarning)
auto_update()

@ -1,5 +1,9 @@
from swarms.utils.class_args_wrapper import print_class_parameters
from swarms.utils.code_interpreter import SubprocessCodeInterpreter
from swarms.utils.csv_and_pandas import (
csv_to_dataframe,
dataframe_to_strings,
)
from swarms.utils.data_to_text import (
csv_to_text,
data_to_text,
@ -12,12 +16,21 @@ from swarms.utils.download_weights_from_url import (
download_weights_from_url,
)
from swarms.utils.exponential_backoff import ExponentialBackoffMixin
from swarms.utils.file_processing import (
load_json,
parse_tagged_output,
sanitize_file_path,
zip_workspace,
create_file_in_folder,
zip_folders,
)
from swarms.utils.find_img_path import find_image_path
from swarms.utils.json_output_parser import JsonOutputParser
from swarms.utils.llm_metrics_decorator import metrics_decorator
from swarms.utils.load_model_torch import load_model_torch
from swarms.utils.markdown_message import display_markdown_message
from swarms.utils.math_eval import math_eval
from swarms.utils.pandas_to_str import dataframe_to_text
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.prep_torch_model_inference import (
@ -43,11 +56,8 @@ from swarms.utils.video_to_frames import (
save_frames_as_images,
video_to_frames,
)
########
from swarms.utils.yaml_output_parser import YamlOutputParser
from swarms.utils.pandas_to_str import dataframe_to_text
from swarms.utils.concurrent_utils import execute_concurrently
__all__ = [
"SubprocessCodeInterpreter",
@ -85,4 +95,13 @@ __all__ = [
"video_to_frames",
"save_frames_as_images",
"dataframe_to_text",
"zip_workspace",
"sanitize_file_path",
"parse_tagged_output",
"load_json",
"csv_to_dataframe",
"dataframe_to_strings",
"execute_concurrently",
"create_file_in_folder",
"zip_folders",
]
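# A quick smoke test for the newly exported helpers; a sketch assuming
# the package is importable as `swarms`.
# from swarms.utils import (
#     execute_concurrently,
#     csv_to_dataframe,
#     dataframe_to_strings,
#     create_file_in_folder,
#     zip_folders,
# )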

@ -0,0 +1,38 @@
import concurrent.futures
def execute_concurrently(callable_functions, max_workers=5):
"""
Executes callable functions concurrently using multithreading.
Parameters:
- callable_functions: A list of tuples, each containing the callable function and its arguments.
For example: [(function1, (arg1, arg2), {'kwarg1': val1}), (function2, (), {})]
- max_workers: The maximum number of threads to use.
Returns:
- results: A list of results returned by the callable functions. If an error occurs in any function,
the exception object will be placed at the corresponding index in the list.
"""
results = [None] * len(callable_functions)
def worker(fn, args, kwargs, index):
try:
result = fn(*args, **kwargs)
results[index] = result
except Exception as e:
results[index] = e
with concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor:
futures = []
for i, (fn, args, kwargs) in enumerate(callable_functions):
futures.append(
executor.submit(worker, fn, args, kwargs, i)
)
# Wait for all threads to complete
concurrent.futures.wait(futures)
return results
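# A minimal usage sketch using only the standard library; fetch_status
# and the example URLs are hypothetical stand-ins for real workloads.
import urllib.request

def fetch_status(url: str, timeout: int = 10) -> int:
    """Return the HTTP status code for a URL."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.status

if __name__ == "__main__":
    tasks = [
        (fetch_status, ("https://example.com",), {}),
        (fetch_status, ("https://example.org",), {"timeout": 5}),
    ]
    # Each slot holds either a status code or the raised exception.
    print(execute_concurrently(tasks, max_workers=2))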

@ -0,0 +1,34 @@
import pandas as pd
# CSV to dataframe
def csv_to_dataframe(file_path):
"""
Read a CSV file and return a pandas DataFrame.
Parameters:
file_path (str): The path to the CSV file.
Returns:
pandas.DataFrame: The DataFrame containing the data from the CSV file.
"""
df = pd.read_csv(file_path)
return df
# Dataframe to strings
def dataframe_to_strings(df):
"""
Converts a pandas DataFrame to a list of string representations of each row.
Args:
df (pandas.DataFrame): The DataFrame to convert.
Returns:
list: A list of string representations of each row in the DataFrame.
"""
row_strings = []
for _, row in df.iterrows():
row_string = row.to_string()
row_strings.append(row_string)
return row_strings
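# A minimal usage sketch; "data.csv" is a hypothetical file path.
if __name__ == "__main__":
    df = csv_to_dataframe("data.csv")
    for row_string in dataframe_to_strings(df):
        print(row_string)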

@ -32,7 +32,7 @@ def disable_logging():
"packaging",
]:
logger = logging.getLogger(logger_name)
logger.setLevel(logging.ERROR)
logger.setLevel(logging.CRITICAL)
# Remove all existing handlers
logging.getLogger().handlers = []

@ -0,0 +1,115 @@
import json
import os
import re
import shutil
import tempfile
def zip_workspace(workspace_path: str, output_filename: str):
"""
Zips the specified workspace directory and returns the path to the zipped file.
Ensure output_filename has no .zip extension, since make_archive appends it.
"""
temp_dir = tempfile.mkdtemp()
# Remove .zip if present in output_filename to avoid duplication
base_output_path = os.path.join(
temp_dir, output_filename.replace(".zip", "")
)
zip_path = shutil.make_archive(
base_output_path, "zip", workspace_path
)
return zip_path # make_archive already appends .zip
def sanitize_file_path(file_path: str):
"""
Cleans and sanitizes the file path to be valid for Windows.
"""
sanitized_path = file_path.replace("`", "").strip()
# Replace any invalid characters here with an underscore or remove them
sanitized_path = re.sub(r'[<>:"/\\|?*]', "_", sanitized_path)
return sanitized_path
def load_json(json_string: str):
"""
Loads a JSON string and returns the corresponding Python object.
Args:
json_string (str): The JSON string to be loaded.
Returns:
object: The Python object representing the JSON data.
"""
json_data = json.loads(json_string)
return json_data
# Create a file with the given content at the given path
def create_file(
content: str,
file_path: str,
):
"""
Creates a file with the specified content at the specified file path.
Args:
content (str): The content to be written to the file.
file_path (str): The path to the file to be created.
"""
with open(file_path, "w") as file:
file.write(content)
return file_path
def create_file_in_folder(
folder_path: str, file_name: str, content: str
):
"""
Creates a file in the specified folder with the given file name and content.
Args:
folder_path (str): The path of the folder where the file will be created.
file_name (str): The name of the file to be created.
content (str): The content to be written to the file.
Returns:
str: The path of the created file.
"""
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# Create the file in the folder
file_path = os.path.join(folder_path, file_name)
with open(file_path, "w") as file:
file.write(content)
return file_path
def zip_folders(folder1_path, folder2_path, zip_file_path):
"""
Zip two folders into a single zip file.
Args:
folder1_path (str): Path to the first folder.
folder2_path (str): Path to the second folder.
zip_file_path (str): Path to the output zip file.
Returns:
None
"""
# Create a temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
# Copy both folders into the temporary directory
shutil.copytree(
folder1_path,
os.path.join(temp_dir, os.path.basename(folder1_path)),
)
shutil.copytree(
folder2_path,
os.path.join(temp_dir, os.path.basename(folder2_path)),
)
# Create a zip file that contains the temporary directory
shutil.make_archive(zip_file_path, "zip", temp_dir)
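# A minimal usage sketch chaining the helpers above; every path and file
# name here is a hypothetical example.
if __name__ == "__main__":
    path = create_file_in_folder(
        "demo_workspace", "hello.py", "print('hello')\n"
    )
    print(f"Created {path}")
    archive = zip_workspace("demo_workspace", "demo_archive")
    print(f"Workspace zipped to {archive}")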

@ -14,7 +14,7 @@ def dataframe_to_text(
Returns:
str: The string representation of the DataFrame.
Example:
>>> df = pd.DataFrame({
... 'A': [1, 2, 3],
