diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..8637cefc --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +.env +__pycache__ +.venv \ No newline at end of file diff --git a/.grit/.gitignore b/.grit/.gitignore new file mode 100644 index 00000000..799e2c72 --- /dev/null +++ b/.grit/.gitignore @@ -0,0 +1,2 @@ +.gritmodules +*.log diff --git a/DOCS/DOCUMENTATION.md b/DOCS/DOCUMENTATION.md new file mode 100644 index 00000000..8fd0e577 --- /dev/null +++ b/DOCS/DOCUMENTATION.md @@ -0,0 +1,75 @@ +# Swarms Documentation + +## Overview +The Swarm module includes the implementation of two classes, `WorkerNode` and `BossNode`, which respectively represent a worker agent and a boss agent. A worker agent is responsible for completing given tasks, while a boss agent is responsible for creating and managing tasks for the worker agent(s). + +## Key Classes + +### WorkerNode +```python +class WorkerNode: +``` + +The WorkerNode class represents an autonomous worker agent that can perform a range of tasks. + +__Methods__: + +- `create_agent(ai_name: str, ai_role: str, human_in_the_loop: bool, search_kwargs: dict) -> None`: + + This method creates a new autonomous agent that can complete tasks. The agent utilizes several tools such as search engines, a file writer/reader, and a multi-modal visual tool. + The agent's configuration is customizable through the method parameters. + + ```python + # Example usage + worker_node = WorkerNode(llm, tools, vectorstore) + worker_node.create_agent('test_agent', 'test_role', False, {}) + ``` + +- `run_agent(prompt: str) -> None`: + + This method runs the agent on a given task, defined by the `prompt` parameter. + + ```python + # Example usage + worker_node = WorkerNode(llm, tools, vectorstore) + worker_node.create_agent('test_agent', 'test_role', False, {}) + worker_node.run_agent('Calculate the square root of 144.') + ``` + +### BossNode +```python +class BossNode: +``` + +The BossNode class represents a manager agent that can create tasks and control the execution of these tasks. + +__Methods__: + +- `create_task(objective: str) -> dict`: + + This method creates a new task based on the provided `objective`. The created task is a dictionary with the objective as its value. + + ```python + # Example usage + boss_node = BossNode(llm, vectorstore, task_execution_chain, False, 3) + task = boss_node.create_task('Find the square root of 144.') + ``` + +- `execute_task(task: dict) -> None`: + + This method triggers the execution of a given task. + + ```python + # Example usage + boss_node = BossNode(llm, vectorstore, task_execution_chain, False, 3) + task = boss_node.create_task('Find the square root of 144.') + boss_node.execute_task(task) + ``` + +### Note + +Before creating the WorkerNode and BossNode, make sure to initialize the lower level model (llm), tools, and vectorstore which are used as parameters in the constructors of the two classes. + +In addition, the WorkerNode class uses the MultiModalVisualAgentTool which is a custom tool that enables the worker agent to run multi-modal visual tasks. Ensure that this tool is correctly initialized before running the WorkerNode. + +This documentation provides an overview of the main functionalities of the Swarm module. For additional details and advanced functionalities, please review the source code and the accompanying comments. diff --git a/DOCS/IDEAS.MD b/DOCS/IDEAS.MD new file mode 100644 index 00000000..e75f59d5 --- /dev/null +++ b/DOCS/IDEAS.MD @@ -0,0 +1,214 @@ +## Swarming Architectures + +Here are three examples of swarming architectures that could be applied in this context. + +1. **Hierarchical Swarms**: In this architecture, a 'lead' agent coordinates the efforts of other agents, distributing tasks based on each agent's unique strengths. The lead agent might be equipped with additional functionality or decision-making capabilities to effectively manage the swarm. + +2. **Collaborative Swarms**: Here, each agent in the swarm works in parallel, potentially on different aspects of a task. They then collectively determine the best output, often through a voting or consensus mechanism. + +3. **Competitive Swarms**: In this setup, multiple agents work on the same task independently. The output from the agent which produces the highest confidence or quality result is then selected. This can often lead to more robust outputs, as the competition drives each agent to perform at its best. + +4. **Multi-Agent Debate**: Here, multiple agents debate a topic. The output from the agent which produces the highest confidence or quality result is then selected. This can lead to more robust outputs, as the competition drives each agent to perform it's best. + + +# Ideas + +A swarm, particularly in the context of distributed computing, refers to a large number of coordinated agents or nodes that work together to solve a problem. The specific requirements of a swarm might vary depending on the task at hand, but some of the general requirements include: + +1. **Distributed Nature**: The swarm should consist of multiple individual units or nodes, each capable of functioning independently. + +2. **Coordination**: The nodes in the swarm need to coordinate with each other to ensure they're working together effectively. This might involve communication between nodes, or it could be achieved through a central orchestrator. + +3. **Scalability**: A well-designed swarm system should be able to scale up or down as needed, adding or removing nodes based on the task load. + +4. **Resilience**: If a node in the swarm fails, it shouldn't bring down the entire system. Instead, other nodes should be able to pick up the slack. + +5. **Load Balancing**: Tasks should be distributed evenly across the nodes in the swarm to avoid overloading any single node. + +6. **Interoperability**: Each node should be able to interact with others, regardless of differences in underlying hardware or software. + +Integrating these requirements with Large Language Models (LLMs) can be done as follows: + +1. **Distributed Nature**: Each LLM agent can be considered as a node in the swarm. These agents can be distributed across multiple servers or even geographically dispersed data centers. + +2. **Coordination**: An orchestrator can manage the LLM agents, assigning tasks, coordinating responses, and ensuring effective collaboration between agents. + +3. **Scalability**: As the demand for processing power increases or decreases, the number of LLM agents can be adjusted accordingly. + +4. **Resilience**: If an LLM agent goes offline or fails, the orchestrator can assign its tasks to other agents, ensuring the swarm continues functioning smoothly. + +5. **Load Balancing**: The orchestrator can also handle load balancing, ensuring tasks are evenly distributed amongst the LLM agents. + +6. **Interoperability**: By standardizing the input and output formats of the LLM agents, they can effectively communicate and collaborate, regardless of the specific model or configuration of each agent. + +In terms of architecture, the swarm might look something like this: + +``` + (Orchestrator) + / \ + Tools + Vector DB -- (LLM Agent)---(Communication Layer) (Communication Layer)---(LLM Agent)-- Tools + Vector DB + / | | \ +(Task Assignment) (Task Completion) (Task Assignment) (Task Completion) +``` + +Each LLM agent communicates with the orchestrator through a dedicated communication layer. The orchestrator assigns tasks to each LLM agent, which the agents then complete and return. This setup allows for a high degree of flexibility, scalability, and robustness. + + +## Communication Layer + +Communication layers play a critical role in distributed systems, enabling interaction between different nodes (agents) and the orchestrator. Here are three potential communication layers for a distributed system, including their strengths and weaknesses: + +1. **Message Queuing Systems (like RabbitMQ, Kafka)**: + + - Strengths: They are highly scalable, reliable, and designed for high-throughput systems. They also ensure delivery of messages and can persist them if necessary. Furthermore, they support various messaging patterns like publish/subscribe, which can be highly beneficial in a distributed system. They also have robust community support. + + - Weaknesses: They can add complexity to the system, including maintenance of the message broker. Moreover, they require careful configuration to perform optimally, and handling failures can sometimes be challenging. + +2. **RESTful APIs**: + + - Strengths: REST is widely adopted, and most programming languages have libraries to easily create RESTful APIs. They leverage standard HTTP(S) protocols and methods and are straightforward to use. Also, they can be stateless, meaning each request contains all the necessary information, enabling scalability. + + - Weaknesses: For real-time applications, REST may not be the best fit due to its synchronous nature. Additionally, handling a large number of API requests can put a strain on the system, causing slowdowns or timeouts. + +3. **gRPC (Google Remote Procedure Call)**: + + - Strengths: gRPC uses Protocol Buffers as its interface definition language, leading to smaller payloads and faster serialization/deserialization compared to JSON (commonly used in RESTful APIs). It supports bidirectional streaming and can use HTTP/2 features, making it excellent for real-time applications. + + - Weaknesses: gRPC is more complex to set up compared to REST. Protocol Buffers' binary format can be more challenging to debug than JSON. It's also not as widely adopted as REST, so tooling and support might be limited in some environments. + +In the context of swarm LLMs, one could consider an **Omni-Vector Embedding Database** for communication. This database could store and manage the high-dimensional vectors produced by each LLM agent. + +- Strengths: This approach would allow for similarity-based lookup and matching of LLM-generated vectors, which can be particularly useful for tasks that involve finding similar outputs or recognizing patterns. + +- Weaknesses: An Omni-Vector Embedding Database might add complexity to the system in terms of setup and maintenance. It might also require significant computational resources, depending on the volume of data being handled and the complexity of the vectors. The handling and transmission of high-dimensional vectors could also pose challenges in terms of network load. + + + + +# Technical Analysis Document: Particle Swarm of AI Agents using Ocean Database + +## Overview + +The goal is to create a particle swarm of AI agents using the OpenAI API for the agents and the Ocean database as the communication space, where the embeddings act as particles. The swarm will work collectively to perform tasks and optimize their behavior based on the interaction with the Ocean database. + +## Algorithmic Overview + +1. Initialize the AI agents and the Ocean database. +2. Assign tasks to the AI agents. +3. AI agents use the OpenAI API to perform tasks and generate embeddings. +4. AI agents store their embeddings in the Ocean database. +5. AI agents query the Ocean database for relevant embeddings. +6. AI agents update their positions based on the retrieved embeddings. +7. Evaluate the performance of the swarm and update the agents' behavior accordingly. +8. Repeat steps 3-7 until a stopping criterion is met. + +## Python Implementation Logic + +1. **Initialize the AI agents and the Ocean database.** + +```python +import openai +import oceandb +from oceandb.utils.embedding_functions import ImageBindEmbeddingFunction + +# Initialize Ocean database +client = oceandb.Client() +text_embedding_function = ImageBindEmbeddingFunction(modality="text") +collection = client.create_collection("all-my-documents", embedding_function=text_embedding_function) + +# Initialize AI agents +agents = initialize_agents(...) +``` + +2. **Assign tasks to the AI agents.** + +```python +tasks = assign_tasks_to_agents(agents, ...) +``` + +3. **AI agents use the OpenAI API to perform tasks and generate embeddings.** + +```python +def agent_perform_task(agent, task): + # Perform the task using the OpenAI API + result = perform_task_with_openai_api(agent, task) + # Generate the embedding + embedding = generate_embedding(result) + return embedding + +embeddings = [agent_perform_task(agent, task) for agent, task in zip(agents, tasks)] +``` + +4. **AI agents store their embeddings in the Ocean database.** + +```python +def store_embeddings_in_database(embeddings, collection): + for i, embedding in enumerate(embeddings): + document_id = f"agent_{i}" + collection.add(documents=[embedding], ids=[document_id]) + +store_embeddings_in_database(embeddings, collection) +``` + +5. **AI agents query the Ocean database for relevant embeddings.** + +```python +def query_database_for_embeddings(agent, collection, n_results=1): + query_result = collection.query(query_texts=[agent], n_results=n_results) + return query_result + +queried_embeddings = [query_database_for_embeddings(agent, collection) for agent in agents] +``` + +6. **AI agents update their positions based on the retrieved embeddings.** + +```python +def update_agent_positions(agents, queried_embeddings): + for agent, embedding in zip(agents, queried_embeddings): + agent.update_position(embedding) + +update_agent_positions(agents, queried_embeddings) +``` + +7. **Evaluate the performance of the swarm and update the agents' behavior accordingly.** + +```python +def evaluate_swarm_performance(agents, ...): + # Evaluate the performance of the swarm + performance = compute_performance_metric(agents, ...) + return performance + +def update_agent_behavior(agents, performance): + # Update agents' behavior based on swarm performance + for agent in agents: + agent.adjust_behavior(performance) + +performance = evaluate_swarm_performance(agents, ...) +update_agent_behavior(agents, performance) +``` + +8. **Repeat steps 3-7 until a stopping criterion is met.** + +```python +while not stopping_criterion_met(): + # Perform tasks and generate embeddings + embeddings = [agent_perform_task(agent, task) for agent, task in zip(agents, tasks)] + + # Store embeddings in the Ocean database + store_embeddings_in_database(embeddings, collection) + + # Query the Ocean database for relevant embeddings + queried_embeddings = [query_database_for_embeddings(agent, collection) for agent in agents] + + # Update AI agent positions based on the retrieved embeddings + update_agent_positions(agents, queried_embeddings) + + # Evaluate the performance of the swarm and update the agents' behavior accordingly + performance = evaluate_swarm_performance(agents, ...) + update_agent_behavior(agents, performance) +``` + +This code demonstrates the complete loop to repeat steps 3-7 until a stopping criterion is met. You will need to define the `stopping_criterion_met()` function, which could be based on a predefined number of iterations, a target performance level, or any other condition that indicates that the swarm has reached a desired state. + + + diff --git a/DOCS/MANIFESTO.md b/DOCS/MANIFESTO.md new file mode 100644 index 00000000..6a74572b --- /dev/null +++ b/DOCS/MANIFESTO.md @@ -0,0 +1,13 @@ +Today, we stand at the verge of a revolution in artificial intelligence and machine learning. Individual models have accomplished incredible feats, achieving unprecedented levels of understanding and generating incredibly human-like text. But this is just the beginning. + +In the future, we should expect more. These models, which we've seen perform so admirably in isolation, should be able to work together, as a team, a swarm. However, this kind of collaborative intelligence doesn't exist today. That's because the technology to seamlessly integrate these models and foster true inter-model collaboration has been missing, until now. + +In attempting to create this swarm, we face numerous challenges, such as developing the necessary infrastructure, ensuring seamless integration between the agents, and overcoming the practical limitations of our current computing capabilities. These are daunting tasks, and many have shied away from them because of the sheer complexity of the problem. But, if we can overcome these challenges, the rewards will be unimaginable, all digital activities will be automated. + +We envision a future where swarms of Language Learning Model (LLM) agents revolutionize fields like customer support, content creation, and research. Imagine an AI system that could work cohesively, understand complex problems, and deliver multi-faceted solutions. We estimate this could lead to a 100-fold improvement in AI effectiveness, and up to a trillion-dollar impact on the global economy. + +The secret to achieving this lies in our open-source approach and the power of the collective. By embracing open-source, we are enabling hundreds of thousands of minds worldwide to contribute to this vision, each bringing unique insights and solutions. Our bug bounty program and automated testing environments will act as catalysts, motivating and rewarding contributors while ensuring the robustness and reliability of our technology. + +At Agora, we believe in the transformative potential of this technology, and we are committed to making it a reality. Our world-class team of researchers, engineers, and AI enthusiasts are singularly focused on this mission. With a proven track record of success, and the tenacity to tackle the most complex problems, we are best positioned to lead this charge. + +We invite you to join us on this exciting journey. Let's come together to create swarms, advance humanity, and redefine what is possible with artificial intelligence. Our future is in our hands. Let's shape it together. \ No newline at end of file diff --git a/DOCS/MISSION.md b/DOCS/MISSION.md new file mode 100644 index 00000000..c287a0b5 --- /dev/null +++ b/DOCS/MISSION.md @@ -0,0 +1,149 @@ +# Bounty Program + +Our bounty program is an exciting opportunity for contributors to help us build the future of Swarms. By participating, you can earn rewards while contributing to a project that aims to revolutionize digital activity. + +Here's how it works: + +1. **Check out our Roadmap**: We've shared our roadmap detailing our short and long-term goals. These are the areas where we're seeking contributions. + +2. **Pick a Task**: Choose a task from the roadmap that aligns with your skills and interests. If you're unsure, you can reach out to our team for guidance. + +3. **Get to Work**: Once you've chosen a task, start working on it. Remember, quality is key. We're looking for contributions that truly make a difference. + +4. **Submit your Contribution**: Once your work is complete, submit it for review. We'll evaluate your contribution based on its quality, relevance, and the value it brings to Swarms. + +5. **Earn Rewards**: If your contribution is approved, you'll earn a bounty. The amount of the bounty depends on the complexity of the task, the quality of your work, and the value it brings to Swarms. + +## The Three Phases of Our Bounty Program + +### Phase 1: Building the Foundation +In the first phase, our focus is on building the basic infrastructure of Swarms. This includes developing key components like the Swarms class, integrating essential tools, and establishing task completion and evaluation logic. We'll also start developing our testing and evaluation framework during this phase. If you're interested in foundational work and have a knack for building robust, scalable systems, this phase is for you. + +### Phase 2: Enhancing the System +In the second phase, we'll focus on enhancing Swarms by integrating more advanced features, improving the system's efficiency, and refining our testing and evaluation framework. This phase involves more complex tasks, so if you enjoy tackling challenging problems and contributing to the development of innovative features, this is the phase for you. + +### Phase 3: Towards Super-Intelligence +The third phase of our bounty program is the most exciting - this is where we aim to achieve super-intelligence. In this phase, we'll be working on improving the swarm's capabilities, expanding its skills, and fine-tuning the system based on real-world testing and feedback. If you're excited about the future of AI and want to contribute to a project that could potentially transform the digital world, this is the phase for you. + +Remember, our roadmap is a guide, and we encourage you to bring your own ideas and creativity to the table. We believe that every contribution, no matter how small, can make a difference. So join us on this exciting journey and help us create the future of Swarms. + +**To participate in our bounty program, visit the [Swarms Bounty Program Page](https://swarms.ai/bounty).** Let's build the future together! + + + + + +## Bounties for Roadmap Items + +To accelerate the development of Swarms and to encourage more contributors to join our journey towards automating every digital activity in existence, we are announcing a Bounty Program for specific roadmap items. Each bounty will be rewarded based on the complexity and importance of the task. Below are the items available for bounty: + +1. **Multi-Agent Debate Integration**: $2000 +2. **Meta Prompting Integration**: $1500 +3. **Swarms Class**: $1500 +4. **Integration of Additional Tools**: $1000 +5. **Task Completion and Evaluation Logic**: $2000 +6. **Ocean Integration**: $2500 +7. **Improved Communication**: $2000 +8. **Testing and Evaluation**: $1500 +9. **Worker Swarm Class**: $2000 +10. **Documentation**: $500 + +For each bounty task, there will be a strict evaluation process to ensure the quality of the contribution. This process includes a thorough review of the code and extensive testing to ensure it meets our standards. + +# 3-Phase Testing Framework + +To ensure the quality and efficiency of the Swarm, we will introduce a 3-phase testing framework which will also serve as our evaluation criteria for each of the bounty tasks. + +## Phase 1: Unit Testing +In this phase, individual modules will be tested to ensure that they work correctly in isolation. Unit tests will be designed for all functions and methods, with an emphasis on edge cases. + +## Phase 2: Integration Testing +After passing unit tests, we will test the integration of different modules to ensure they work correctly together. This phase will also test the interoperability of the Swarm with external systems and libraries. + +## Phase 3: Benchmarking & Stress Testing +In the final phase, we will perform benchmarking and stress tests. We'll push the limits of the Swarm under extreme conditions to ensure it performs well in real-world scenarios. This phase will measure the performance, speed, and scalability of the Swarm under high load conditions. + +By following this 3-phase testing framework, we aim to develop a reliable, high-performing, and scalable Swarm that can automate all digital activities. + +# Reverse Engineering to Reach Phase 3 + +To reach the Phase 3 level, we need to reverse engineer the tasks we need to complete. Here's an example of what this might look like: + +1. **Set Clear Expectations**: Define what success looks like for each task. Be clear about the outputs and outcomes we expect. This will guide our testing and development efforts. + +2. **Develop Testing Scenarios**: Create a comprehensive list of testing scenarios that cover both common and edge cases. This will help us ensure that our Swarm can handle a wide range of situations. + +3. **Write Test Cases**: For each scenario, write detailed test cases that outline the exact steps to be followed, the inputs to be used, and the expected outputs. + +4. **Execute the Tests**: Run the test cases on our Swarm, making note of any issues or bugs that arise. + +5. **Iterate and Improve**: Based on the results of our tests, iterate and improve our Swarm. This may involve fixing bugs, optimizing code, or redesigning parts of our system. + +6. **Repeat**: Repeat this process until our Swarm meets our expectations and passes all test cases. + +By following these steps, we will systematically build, test, and improve our Swarm until it reaches the Phase 3 level. This methodical approach will help us ensure that we create a reliable, high-performing, and scalable Swarm that can truly automate all digital activities. + +Let's shape the future of digital automation together! + + +-------------------- +# Super-Intelligence Roadmap + +Creating a Super-Intelligent Swarm involves three main phases, where each phase has multiple sub-stages, each of which will require rigorous testing and evaluation to ensure progress towards super-intelligence. + +## Phase 1: Narrow Intelligence + +In this phase, the goal is to achieve high performance in specific tasks. These tasks will be predefined and the swarm will be trained and tested on these tasks. + +1. **Single Task Mastery**: Focus on mastering one task at a time. This can range from simple tasks like image recognition to complex tasks like natural language processing. + +2. **Task Switching**: Train the swarm to switch between different tasks effectively. This includes being able to stop one task and start another one without any loss in performance. + +3. **Multi-tasking**: The swarm should be capable of performing multiple tasks simultaneously without any degradation in performance. + +## Phase 2: General Intelligence + +In this phase, the swarm will be trained to handle a variety of tasks that were not part of the original training set. + +1. **Transfer Learning**: The swarm should be able to transfer knowledge learned in one context to another context. This means being able to apply knowledge learned in one task to a different but related task. + +2. **Adaptive Learning**: The swarm should be capable of adapting its learning strategies based on the task at hand. This includes being able to adjust its learning rate, exploration vs exploitation balance, etc. + +3. **Self-Learning**: The swarm should be able to learn new tasks on its own without any external guidance. This includes being able to understand the task requirements, find relevant information, learn the task, and evaluate its performance. + +## Phase 3: Super Intelligence + +In this phase, the swarm will surpass human-level performance in most economically valuable work. This involves the swarm being able to solve complex real-world problems, make accurate predictions, and generate innovative solutions. + +1. **Complex Problem Solving**: The swarm should be able to solve complex real-world problems. This includes being able to understand the problem, identify relevant information, generate solutions, evaluate the solutions, and implement the best solution. + +2. **Predictive Abilities**: The swarm should be able to make accurate predictions about future events based on past data. This includes being able to understand the data, identify relevant patterns, make accurate predictions, and evaluate the accuracy of its predictions. + +3. **Innovation**: The swarm should be able to generate innovative solutions to problems. This includes being able to think creatively, generate novel ideas, evaluate the ideas, and implement the best idea. + +4. **Self-improvement**: The swarm should be capable of improving its own capabilities. This includes being able to identify areas of weakness, find ways to improve, and implement the improvements. + +5. **Understanding**: The swarm should be able to understand complex concepts, make inferences, and draw conclusions. This includes being able to understand natural language, reason logically, and make sound judgments. + +Each of these stages will require extensive testing and evaluation to ensure progress towards super-intelligence. + +# Reverse-Engineering Super-Intelligence + +To reach the Phase 3 level of super-intelligence, we need to reverse engineer the tasks that need to be completed. Here's an outline of what this might look like: + +1. **Setting Success Metrics**: For each stage, define clear success metrics. These metrics should be quantitative and measurable, and they should align with the objectives of the stage. + +2. **Identifying Prerequisites**: Determine what needs to be in place before each stage can begin. This could include certain capabilities, resources, or technologies. + +3. **Developing Training Programs**: For each stage, develop a comprehensive training program. This should include a variety of tasks that will challenge the swarm and push it to + + develop the necessary capabilities. + +4. **Creating Testing Protocols**: Develop rigorous testing protocols for each stage. These protocols should test all aspects of the swarm's performance and they should be designed to push the swarm to its limits. + +5. **Iterating and Improving**: Based on the results of the tests, iterate and improve the swarm. This could involve adjusting the training program, modifying the swarm's architecture, or tweaking its learning algorithms. + +6. **Moving to the Next Stage**: Once the swarm has met the success metrics for a stage, it can move on to the next stage. This process continues until the swarm has reached the level of super-intelligence. + +This process will require a significant amount of time, resources, and effort. However, by following this structured approach, we can systematically guide the swarm towards super-intelligence. + diff --git a/DOCS/MONETIZATION.md b/DOCS/MONETIZATION.md new file mode 100644 index 00000000..d126a63e --- /dev/null +++ b/DOCS/MONETIZATION.md @@ -0,0 +1,91 @@ +Jeff Bezos, the founder of Amazon.com, is known for his customer-centric approach and long-term strategic thinking. Leveraging his methodology, here are five ways you could monetize the Swarms framework: + +1. **Platform as a Service (PaaS):** Create a cloud-based platform that allows users to build, run, and manage applications without the complexity of maintaining the infrastructure. You could charge users a subscription fee for access to the platform and provide different pricing tiers based on usage levels. This could be an attractive solution for businesses that do not have the capacity to build or maintain their own swarm intelligence solutions. + +2. **Professional Services:** Offer consultancy and implementation services to businesses looking to utilize the Swarm technology. This could include assisting with integration into existing systems, offering custom development services, or helping customers to build specific solutions using the framework. + +3. **Education and Training:** Create a certification program for developers or companies looking to become proficient with the Swarms framework. This could be sold as standalone courses, or bundled with other services. + +4. **Managed Services:** Some companies may prefer to outsource the management of their Swarm-based systems. A managed services solution could take care of all the technical aspects, from hosting the solution to ensuring it runs smoothly, allowing the customer to focus on their core business. + +5. **Data Analysis and Insights:** Swarm intelligence can generate valuable data and insights. By anonymizing and aggregating this data, you could provide industry reports, trend analysis, and other valuable insights to businesses. + +As for the type of platform, Swarms can be offered as a cloud-based solution given its scalability and flexibility. This would also allow you to apply a SaaS/PaaS type monetization model, which provides recurring revenue. + +Potential customers could range from small to large enterprises in various sectors such as logistics, eCommerce, finance, and technology, who are interested in leveraging artificial intelligence and machine learning for complex problem solving, optimization, and decision-making. + +**Product Brief Monetization Strategy:** + +Product Name: Swarms.AI Platform + +Product Description: A cloud-based AI and ML platform harnessing the power of swarm intelligence. + +1. **Platform as a Service (PaaS):** Offer tiered subscription plans (Basic, Premium, Enterprise) to accommodate different usage levels and business sizes. + +2. **Professional Services:** Offer consultancy and custom development services to tailor the Swarms solution to the specific needs of the business. + +3. **Education and Training:** Launch an online Swarms.AI Academy with courses and certifications for developers and businesses. + +4. **Managed Services:** Provide a premium, fully-managed service offering that includes hosting, maintenance, and 24/7 support. + +5. **Data Analysis and Insights:** Offer industry reports and customized insights generated from aggregated and anonymized Swarm data. + +Potential Customers: Enterprises in sectors such as logistics, eCommerce, finance, and technology. This can be sold globally, provided there's an internet connection. + +Marketing Channels: Online marketing (SEO, Content Marketing, Social Media), Partnerships with tech companies, Direct Sales to Enterprises. + +This strategy is designed to provide multiple revenue streams, while ensuring the Swarms.AI platform is accessible and useful to a range of potential customers. + +1. **AI Solution as a Service:** By offering the Swarms framework as a service, businesses can access and utilize the power of multiple LLM agents without the need to maintain the infrastructure themselves. Subscription can be tiered based on usage and additional features. + +2. **Integration and Custom Development:** Offer integration services to businesses wanting to incorporate the Swarms framework into their existing systems. Also, you could provide custom development for businesses with specific needs not met by the standard framework. + +3. **Training and Certification:** Develop an educational platform offering courses, webinars, and certifications on using the Swarms framework. This can serve both developers seeking to broaden their skills and businesses aiming to train their in-house teams. + +4. **Managed Swarms Solutions:** For businesses that prefer to outsource their AI needs, provide a complete solution which includes the development, maintenance, and continuous improvement of swarms-based applications. + +5. **Data Analytics Services:** Leveraging the aggregated insights from the AI swarms, you could offer data analytics services. Businesses can use these insights to make informed decisions and predictions. + +**Type of Platform:** + +Cloud-based platform or Software as a Service (SaaS) will be a suitable model. It offers accessibility, scalability, and ease of updates. + +**Target Customers:** + +The technology can be beneficial for businesses across sectors like eCommerce, technology, logistics, finance, healthcare, and education, among others. + +**Product Brief Monetization Strategy:** + +Product Name: Swarms.AI + +1. **AI Solution as a Service:** Offer different tiered subscriptions (Standard, Premium, and Enterprise) each with varying levels of usage and features. + +2. **Integration and Custom Development:** Offer custom development and integration services, priced based on the scope and complexity of the project. + +3. **Training and Certification:** Launch the Swarms.AI Academy with courses and certifications, available for a fee. + +4. **Managed Swarms Solutions:** Offer fully managed solutions tailored to business needs, priced based on scope and service level agreements. + +5. **Data Analytics Services:** Provide insightful reports and data analyses, which can be purchased on a one-off basis or through a subscription. + +By offering a variety of services and payment models, Swarms.AI will be able to cater to a diverse range of business needs, from small start-ups to large enterprises. Marketing channels would include digital marketing, partnerships with technology companies, presence in tech events, and direct sales to targeted industries. + + + +# Roadmap + +* Create a landing page for swarms apac.ai/product/swarms + +* Create Hosted Swarms API for anybody to just use without need for mega gpu infra, charge usage based pricing. Prerequisites for success => Swarms has to be extremely reliable + we need world class documentation and many daily users => how do we get many daily users? We provide a seamless and fluid experience, how do we create a seamless and fluid experience? We write good code that is modular, provides feedback to the user in times of distress, and ultimately accomplishes the user's tasks. + +* Hosted consumer and enterprise subscription as a service on The Domain, where users can interact with 1000s of APIs and ingest 1000s of different data streams. + +* Hosted dedicated capacity deals with mega enterprises on automating many operations with Swarms for monthly subscription 300,000+$ + +* Partnerships with enterprises, massive contracts with performance based fee + +* Have discord bot and or slack bot with users personal data, charge subscription + browser extension + +* each user gets a dedicated ocean instance of all their data so the swarm can query it as needed. + +* \ No newline at end of file diff --git a/PULL_REQUEST_TEMPLATE.yml b/PULL_REQUEST_TEMPLATE.yml new file mode 100644 index 00000000..1148e304 --- /dev/null +++ b/PULL_REQUEST_TEMPLATE.yml @@ -0,0 +1,26 @@ + \ No newline at end of file diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/api/container.py b/api/container.py new file mode 100644 index 00000000..6c591a01 --- /dev/null +++ b/api/container.py @@ -0,0 +1,61 @@ + +import os +import re +from pathlib import Path +from typing import Dict, List + +from fastapi.templating import Jinja2Templates + +from swarms.agents.workers.agents import AgentManager +from swarms.utils.utils import BaseHandler, FileHandler, FileType, StaticUploader, CsvToDataframe + +from swarms.tools.main import BaseToolSet, ExitConversation, RequestsGet, CodeEditor, Terminal + +from env import settings + + +BASE_DIR = Path(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +os.chdir(BASE_DIR / os.getenv["PLAYGROUND_DIR"]) + + +toolsets: List[BaseToolSet] = [ + Terminal(), + CodeEditor(), + RequestsGet(), + ExitConversation(), +] +handlers: Dict[FileType, BaseHandler] = {FileType.DATAFRAME: CsvToDataframe()} + +if os.getenv["USE_GPU"]: + import torch + + from swarms.tools.main import ImageCaptioning + from swarms.tools.main import ( + ImageEditing, + InstructPix2Pix, + Text2Image, + VisualQuestionAnswering, + ) + + if torch.cuda.is_available(): + toolsets.extend( + [ + Text2Image("cuda"), + ImageEditing("cuda"), + InstructPix2Pix("cuda"), + VisualQuestionAnswering("cuda"), + ] + ) + handlers[FileType.IMAGE] = ImageCaptioning("cuda") + +agent_manager = AgentManager.create(toolsets=toolsets) + +file_handler = FileHandler(handlers=handlers, path=BASE_DIR) + +templates = Jinja2Templates(directory=BASE_DIR / "api" / "templates") + +uploader = StaticUploader.from_settings( + settings, path=BASE_DIR / "static", endpoint="static" +) + +reload_dirs = [BASE_DIR / "swarms", BASE_DIR / "api"] \ No newline at end of file diff --git a/api/main.py b/api/main.py new file mode 100644 index 00000000..1d2a44ad --- /dev/null +++ b/api/main.py @@ -0,0 +1,130 @@ +import os +import re +from multiprocessing import Process +from tempfile import NamedTemporaryFile + +from typing import List, TypedDict +import uvicorn +from fastapi import FastAPI, Request, UploadFile +from fastapi.responses import HTMLResponse + +from fastapi.staticfiles import StaticFiles +from pydantic import BaseModel + +from api.container import agent_manager, file_handler, reload_dirs, templates, uploader +from api.worker import get_task_result, start_worker, task_execute + + +app = FastAPI() + +app.mount("/static", StaticFiles(directory=uploader.path), name="static") + + +class ExecuteRequest(BaseModel): + session: str + prompt: str + files: List[str] + + +class ExecuteResponse(TypedDict): + answer: str + files: List[str] + + +@app.get("/", response_class=HTMLResponse) +async def index(request: Request): + return templates.TemplateResponse("index.html", {"request": request}) + + +@app.get("/dashboard", response_class=HTMLResponse) +async def dashboard(request: Request): + return templates.TemplateResponse("dashboard.html", {"request": request}) + + +@app.post("/upload") +async def create_upload_file(files: List[UploadFile]): + urls = [] + for file in files: + extension = "." + file.filename.split(".")[-1] + with NamedTemporaryFile(suffix=extension) as tmp_file: + tmp_file.write(file.file.read()) + tmp_file.flush() + urls.append(uploader.upload(tmp_file.name)) + return {"urls": urls} + + +@app.post("/api/execute") +async def execute(request: ExecuteRequest) -> ExecuteResponse: + query = request.prompt + files = request.files + session = request.session + + executor = agent_manager.create_executor(session) + + promptedQuery = "\n".join([file_handler.handle(file) for file in files]) + promptedQuery += query + + try: + res = executor({"input": promptedQuery}) + except Exception as e: + return {"answer": str(e), "files": []} + + files = re.findall(r"\[file://\S*\]", res["output"]) + files = [file[1:-1].split("file://")[1] for file in files] + + return { + "answer": res["output"], + "files": [uploader.upload(file) for file in files], + } + + +@app.post("/api/execute/async") +async def execute_async(request: ExecuteRequest): + query = request.prompt + files = request.files + session = request.session + + promptedQuery = "\n".join([file_handler.handle(file) for file in files]) + promptedQuery += query + + execution = task_execute.delay(session, promptedQuery) + return {"id": execution.id} + + +@app.get("/api/execute/async/{execution_id}") +async def execute_async(execution_id: str): + execution = get_task_result(execution_id) + + result = {} + if execution.status == "SUCCESS" and execution.result: + output = execution.result.get("output", "") + files = re.findall(r"\[file://\S*\]", output) + files = [file[1:-1].split("file://")[1] for file in files] + result = { + "answer": output, + "files": [uploader.upload(file) for file in files], + } + + return { + "status": execution.status, + "info": execution.info, + "result": result, + } + + +def serve(): + p = Process(target=start_worker, args=[]) + p.start() + uvicorn.run("api.main:app", host="0.0.0.0", port=os.getenv["EVAL_PORT"]) + + +def dev(): + p = Process(target=start_worker, args=[]) + p.start() + uvicorn.run( + "api.main:app", + host="0.0.0.0", + port=os.getenv["EVAL_PORT"], + reload=True, + reload_dirs=reload_dirs, + ) \ No newline at end of file diff --git a/api/worker.py b/api/worker.py new file mode 100644 index 00000000..798d8ab8 --- /dev/null +++ b/api/worker.py @@ -0,0 +1,46 @@ +import os +from celery import Celery +from celery.result import AsyncResult + +from api.container import agent_manager +# from env import settings + +celery_broker = os.environ["CELERY_BROKER_URL"] + + +celery_app = Celery(__name__) +celery_app.conf.broker_url = celery_broker +celery_app.conf.result_backend = celery_broker +celery_app.conf.update( + task_track_started=True, + task_serializer="json", + accept_content=["json"], # Ignore other content + result_serializer="json", + enable_utc=True, +) + + +@celery_app.task(name="task_execute", bind=True) +def task_execute(self, session: str, prompt: str): + executor = agent_manager.create_executor(session, self) + response = executor({"input": prompt}) + result = {"output": response["output"]} + + previous = AsyncResult(self.request.id) + if previous and previous.info: + result.update(previous.info) + + return result + + +def get_task_result(task_id): + return AsyncResult(task_id) + + +def start_worker(): + celery_app.worker_main( + [ + "worker", + "--loglevel=INFO", + ] + ) \ No newline at end of file diff --git a/example.py b/example.py index 74e79812..1888c663 100644 --- a/example.py +++ b/example.py @@ -1,9 +1,13 @@ from swarms.models import OpenAIChat from swarms.structs import Flow -from langchain.schema.messages import ChatMessage +from langchain.schema.messages import ChatMessage, BaseMessage +import os +from dotenv import load_dotenv -message = ChatMessage(role="user", content='Translate the following English text to French: Hello World"') -api_key = "" +load_dotenv() + +message: BaseMessage = [ ChatMessage(role="user", content='Translate the following English text to French: Hello World"') ] +api_key = os.environ.get("OPENAI_API_KEY") # Initialize the language model, this model can be swapped out with Anthropic, ETC, Huggingface Models like Mistral, ETC llm = OpenAIChat( diff --git a/swarms/models/openai_models.py b/swarms/models/openai_models.py index b227cc07..f420173a 100644 --- a/swarms/models/openai_models.py +++ b/swarms/models/openai_models.py @@ -1,78 +1,91 @@ from __future__ import annotations -"""OpenAI chat wrapper.""" import logging import os import sys +import warnings from typing import ( - TYPE_CHECKING, + AbstractSet, Any, AsyncIterator, Callable, + Collection, Dict, Iterator, List, + Literal, Mapping, Optional, - Sequence, + Set, Tuple, - Type, Union, ) -from langchain.adapters.openai import convert_dict_to_message, convert_message_to_dict from langchain.callbacks.manager import ( AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun, ) -from langchain.chat_models.base import ( - BaseChatModel, - _agenerate_from_stream, - _generate_from_stream, -) -from langchain.llms.base import create_base_retry_decorator -from langchain.pydantic_v1 import BaseModel, Field, root_validator -from langchain.schema import ChatGeneration, ChatResult -from langchain.schema.language_model import LanguageModelInput -from langchain.schema.messages import ( - AIMessageChunk, - BaseMessage, - BaseMessageChunk, - ChatMessageChunk, - FunctionMessageChunk, - HumanMessageChunk, - SystemMessageChunk, - ToolMessageChunk, -) -from langchain.schema.output import ChatGenerationChunk -from langchain.schema.runnable import Runnable -from langchain.utils import ( - get_from_dict_or_env, - get_pydantic_field_names, -) +from langchain.llms.base import BaseLLM, create_base_retry_decorator +from langchain.pydantic_v1 import Field, root_validator +from langchain.schema import Generation, LLMResult +from langchain.schema.output import GenerationChunk +from langchain.utils import get_from_dict_or_env, get_pydantic_field_names from langchain.utils.openai import is_openai_v1 +from langchain.utils.utils import build_extra_kwargs -if TYPE_CHECKING: - import tiktoken +logger = logging.getLogger(__name__) -logger = logging.getLogger(__name__) +def update_token_usage( + keys: Set[str], response: Dict[str, Any], token_usage: Dict[str, Any] +) -> None: + """Update token usage.""" + _keys_to_use = keys.intersection(response["usage"]) + for _key in _keys_to_use: + if _key not in token_usage: + token_usage[_key] = response["usage"][_key] + else: + token_usage[_key] += response["usage"][_key] + + +def _stream_response_to_generation_chunk( + stream_response: Dict[str, Any], +) -> GenerationChunk: + """Convert a stream response to a generation chunk.""" + if not stream_response["choices"]: + return GenerationChunk(text="") + return GenerationChunk( + text=stream_response["choices"][0]["text"], + generation_info=dict( + finish_reason=stream_response["choices"][0].get("finish_reason", None), + logprobs=stream_response["choices"][0].get("logprobs", None), + ), + ) -def _import_tiktoken() -> Any: - try: - import tiktoken - except ImportError: - raise ValueError( - "Could not import tiktoken python package. " - "This is needed in order to calculate get_token_ids. " - "Please install it with `pip install tiktoken`." - ) - return tiktoken +def _update_response(response: Dict[str, Any], stream_response: Dict[str, Any]) -> None: + """Update response from the stream response.""" + response["choices"][0]["text"] += stream_response["choices"][0]["text"] + response["choices"][0]["finish_reason"] = stream_response["choices"][0].get( + "finish_reason", None + ) + response["choices"][0]["logprobs"] = stream_response["choices"][0]["logprobs"] + + +def _streaming_response_template() -> Dict[str, Any]: + return { + "choices": [ + { + "text": "", + "finish_reason": None, + "logprobs": None, + } + ] + } def _create_retry_decorator( - llm: OpenAIChat, + llm: Union[BaseOpenAI, OpenAIChat], run_manager: Optional[ Union[AsyncCallbackManagerForLLMRun, CallbackManagerForLLMRun] ] = None, @@ -91,8 +104,26 @@ def _create_retry_decorator( ) +def completion_with_retry( + llm: Union[BaseOpenAI, OpenAIChat], + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, +) -> Any: + """Use tenacity to retry the completion call.""" + if is_openai_v1(): + return llm.client.create(**kwargs) + + retry_decorator = _create_retry_decorator(llm, run_manager=run_manager) + + @retry_decorator + def _completion_with_retry(**kwargs: Any) -> Any: + return llm.client.create(**kwargs) + + return _completion_with_retry(**kwargs) + + async def acompletion_with_retry( - llm: OpenAIChat, + llm: Union[BaseOpenAI, OpenAIChat], run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, **kwargs: Any, ) -> Any: @@ -110,134 +141,8 @@ async def acompletion_with_retry( return await _completion_with_retry(**kwargs) -def _convert_delta_to_message_chunk( - _dict: Mapping[str, Any], default_class: Type[BaseMessageChunk] -) -> BaseMessageChunk: - role = _dict.get("role") - content = _dict.get("content") or "" - additional_kwargs: Dict = {} - if _dict.get("function_call"): - function_call = dict(_dict["function_call"]) - if "name" in function_call and function_call["name"] is None: - function_call["name"] = "" - additional_kwargs["function_call"] = function_call - if _dict.get("tool_calls"): - additional_kwargs["tool_calls"] = _dict["tool_calls"] - - if role == "user" or default_class == HumanMessageChunk: - return HumanMessageChunk(content=content) - elif role == "assistant" or default_class == AIMessageChunk: - return AIMessageChunk(content=content, additional_kwargs=additional_kwargs) - elif role == "system" or default_class == SystemMessageChunk: - return SystemMessageChunk(content=content) - elif role == "function" or default_class == FunctionMessageChunk: - return FunctionMessageChunk(content=content, name=_dict["name"]) - elif role == "tool" or default_class == ToolMessageChunk: - return ToolMessageChunk(content=content, tool_call_id=_dict["tool_call_id"]) - elif role or default_class == ChatMessageChunk: - return ChatMessageChunk(content=content, role=role) - else: - return default_class(content=content) - -class OpenAI(BaseChatModel): - """OpenAI large language models. - - To use, you should have the ``openai`` python package installed, and the - environment variable ``OPENAI_API_KEY`` set with your API key. - - Any parameters that are valid to be passed to the openai.create call can be passed - in, even if not explicitly saved on this class.., - - Example: - .. code-block:: python - - from swarms.models import OpenAI - openai = OpenAI(model_name="text-davinci-003") - openai("What is the report on the 2022 oympian games?") - """ - - @property - def _invocation_params(self) -> Dict[str, Any]: - return {**{"model": self.model_name}, **super()._invocation_params} - - -class AzureOpenAI(BaseChatModel): - """Azure-specific OpenAI large language models. - - To use, you should have the ``openai`` python package installed, and the - environment variable ``OPENAI_API_KEY`` set with your API key. - - Any parameters that are valid to be passed to the openai.create call can be passed - in, even if not explicitly saved on this class. - - Example: - .. code-block:: python - - from swarms.models import AzureOpenAI - openai = AzureOpenAI(model_name="text-davinci-003") - """ - - deployment_name: str = "" - """Deployment name to use.""" - openai_api_type: str = "" - openai_api_version: str = "" - - @root_validator() - def validate_azure_settings(cls, values: Dict) -> Dict: - values["openai_api_version"] = get_from_dict_or_env( - values, - "openai_api_version", - "OPENAI_API_VERSION", - ) - values["openai_api_type"] = get_from_dict_or_env( - values, "openai_api_type", "OPENAI_API_TYPE", "azure" - ) - return values - - @property - def _identifying_params(self) -> Mapping[str, Any]: - return { - **{"deployment_name": self.deployment_name}, - **super()._identifying_params, - } - - @property - def _invocation_params(self) -> Dict[str, Any]: - openai_params = { - "engine": self.deployment_name, - "api_type": self.openai_api_type, - "api_version": self.openai_api_version, - } - return {**openai_params, **super()._invocation_params} - - @property - def _llm_type(self) -> str: - """Return type of llm.""" - return "azure" - - @property - def lc_attributes(self) -> Dict[str, Any]: - return { - "openai_api_type": self.openai_api_type, - "openai_api_version": self.openai_api_version, - } - - -class OpenAIChat(BaseChatModel): - """`OpenAI` Chat large language models API. - - To use, you should have the ``openai`` python package installed, and the - environment variable ``OPENAI_API_KEY`` set with your API key. - - Any parameters that are valid to be passed to the openai.create call can be passed - in, even if not explicitly saved on this class. - - Example: - .. code-block:: python - - from langchain.chat_models import ChatOpenAI - openai = ChatOpenAI(model_name="gpt-3.5-turbo") - """ +class BaseOpenAI(BaseLLM): + """Base OpenAI large language model class.""" @property def lc_secrets(self) -> Dict[str, str]: @@ -246,13 +151,12 @@ class OpenAIChat(BaseChatModel): @property def lc_attributes(self) -> Dict[str, Any]: attributes: Dict[str, Any] = {} + if self.openai_api_base: + attributes["openai_api_base"] = self.openai_api_base if self.openai_organization: attributes["openai_organization"] = self.openai_organization - if self.openai_api_base: - attributes["openai_api_base"] = self.openai_api_base - if self.openai_proxy: attributes["openai_proxy"] = self.openai_proxy @@ -260,15 +164,28 @@ class OpenAIChat(BaseChatModel): @classmethod def is_lc_serializable(cls) -> bool: - """Return whether this model can be serialized by Langchain.""" return True - client: Any = None #: :meta private: - async_client: Any = None #: :meta private: - model_name: str = Field(default="gpt-3.5-turbo", alias="model") + client: Any = Field(default=None, exclude=True) #: :meta private: + async_client: Any = Field(default=None, exclude=True) #: :meta private: + model_name: str = Field(default="text-davinci-003", alias="model") """Model name to use.""" temperature: float = 0.7 """What sampling temperature to use.""" + max_tokens: int = 256 + """The maximum number of tokens to generate in the completion. + -1 returns as many tokens as possible given the prompt and + the models maximal context size.""" + top_p: float = 1 + """Total probability mass of tokens to consider at each step.""" + frequency_penalty: float = 0 + """Penalizes repeated tokens according to frequency.""" + presence_penalty: float = 0 + """Penalizes repeated tokens.""" + n: int = 1 + """How many completions to generate for each prompt.""" + best_of: int = 1 + """Generates best_of completions server-side and returns the "best".""" model_kwargs: Dict[str, Any] = Field(default_factory=dict) """Holds any model parameters valid for `create` call not explicitly specified.""" # When updating this to use a SecretStr @@ -283,19 +200,23 @@ class OpenAIChat(BaseChatModel): """Automatically inferred from env var `OPENAI_ORG_ID` if not provided.""" # to support explicit proxy for OpenAI openai_proxy: Optional[str] = None + batch_size: int = 20 + """Batch size to use when passing multiple documents to generate.""" request_timeout: Union[float, Tuple[float, float], Any, None] = Field( default=None, alias="timeout" ) """Timeout for requests to OpenAI completion API. Can be float, httpx.Timeout or None.""" + logit_bias: Optional[Dict[str, float]] = Field(default_factory=dict) + """Adjust the probability of specific tokens being generated.""" max_retries: int = 2 """Maximum number of retries to make when generating.""" streaming: bool = False """Whether to stream the results or not.""" - n: int = 1 - """Number of chat completions to generate for each prompt.""" - max_tokens: Optional[int] = None - """Maximum number of tokens to generate.""" + allowed_special: Union[Literal["all"], AbstractSet[str]] = set() + """Set of special tokens that are allowed。""" + disallowed_special: Union[Literal["all"], Collection[str]] = "all" + """Set of special tokens that are not allowed。""" tiktoken_model_name: Optional[str] = None """The model name to pass to tiktoken when using this class. Tiktoken is used to count the number of tokens in documents to constrain @@ -313,6 +234,20 @@ class OpenAIChat(BaseChatModel): http_client: Union[Any, None] = None """Optional httpx.Client.""" + def __new__(cls, **data: Any) -> Union[OpenAIChat, BaseOpenAI]: # type: ignore + """Initialize the OpenAI object.""" + model_name = data.get("model_name", "") + if ( + model_name.startswith("gpt-3.5-turbo") or model_name.startswith("gpt-4") + ) and "-instruct" not in model_name: + warnings.warn( + "You are trying to use a chat model. This way of initializing it is " + "no longer supported. Instead, please use: " + "`from langchain.chat_models import ChatOpenAI`" + ) + return OpenAIChat(**data) + return super().__new__(cls) + class Config: """Configuration for this pydantic object.""" @@ -323,25 +258,9 @@ class OpenAIChat(BaseChatModel): """Build extra kwargs from additional params that were passed in.""" all_required_field_names = get_pydantic_field_names(cls) extra = values.get("model_kwargs", {}) - for field_name in list(values): - if field_name in extra: - raise ValueError(f"Found {field_name} supplied twice.") - if field_name not in all_required_field_names: - logger.warning( - f"""WARNING! {field_name} is not default parameter. - {field_name} was transferred to model_kwargs. - Please confirm that {field_name} is what you intended.""" - ) - extra[field_name] = values.pop(field_name) - - invalid_model_kwargs = all_required_field_names.intersection(extra.keys()) - if invalid_model_kwargs: - raise ValueError( - f"Parameters {invalid_model_kwargs} should be specified explicitly. " - f"Instead they were passed in as part of `model_kwargs` parameter." - ) - - values["model_kwargs"] = extra + values["model_kwargs"] = build_extra_kwargs( + extra, values, all_required_field_names + ) return values @root_validator() @@ -349,18 +268,14 @@ class OpenAIChat(BaseChatModel): """Validate that api key and python package exists in environment.""" if values["n"] < 1: raise ValueError("n must be at least 1.") - if values["n"] > 1 and values["streaming"]: - raise ValueError("n must be 1 when streaming.") + if values["streaming"] and values["n"] > 1: + raise ValueError("Cannot stream results when n > 1.") + if values["streaming"] and values["best_of"] > 1: + raise ValueError("Cannot stream results when best_of > 1.") values["openai_api_key"] = get_from_dict_or_env( values, "openai_api_key", "OPENAI_API_KEY" ) - # Check OPENAI_ORGANIZATION for backwards compatibility. - values["openai_organization"] = ( - values["openai_organization"] - or os.getenv("OPENAI_ORG_ID") - or os.getenv("OPENAI_ORGANIZATION") - ) values["openai_api_base"] = values["openai_api_base"] or os.getenv( "OPENAI_API_BASE" ) @@ -370,9 +285,13 @@ class OpenAIChat(BaseChatModel): "OPENAI_PROXY", default="", ) + values["openai_organization"] = ( + values["openai_organization"] + or os.getenv("OPENAI_ORG_ID") + or os.getenv("OPENAI_ORGANIZATION") + ) try: import openai - except ImportError: raise ImportError( "Could not import openai python package. " @@ -390,213 +309,280 @@ class OpenAIChat(BaseChatModel): "default_query": values["default_query"], "http_client": values["http_client"], } - values["client"] = openai.OpenAI(**client_params).chat.completions - values["async_client"] = openai.AsyncOpenAI( - **client_params - ).chat.completions + if not values.get("client"): + values["client"] = openai.OpenAI(**client_params).completions + if not values.get("async_client"): + values["async_client"] = openai.AsyncOpenAI(**client_params).completions + elif not values.get("client"): + values["client"] = openai.Completion else: - values["client"] = openai.ChatCompletion + pass + return values @property def _default_params(self) -> Dict[str, Any]: """Get the default parameters for calling OpenAI API.""" - params = { - "model": self.model_name, - "stream": self.streaming, - "n": self.n, + normal_params: Dict[str, Any] = { "temperature": self.temperature, - **self.model_kwargs, + "top_p": self.top_p, + "frequency_penalty": self.frequency_penalty, + "presence_penalty": self.presence_penalty, + "n": self.n, + "logit_bias": self.logit_bias, } + if self.max_tokens is not None: - params["max_tokens"] = self.max_tokens + normal_params["max_tokens"] = self.max_tokens if self.request_timeout is not None and not is_openai_v1(): - params["request_timeout"] = self.request_timeout - return params + normal_params["request_timeout"] = self.request_timeout - def completion_with_retry( - self, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any - ) -> Any: - """Use tenacity to retry the completion call.""" - if is_openai_v1(): - return self.client.create(**kwargs) - - retry_decorator = _create_retry_decorator(self, run_manager=run_manager) - - @retry_decorator - def _completion_with_retry(**kwargs: Any) -> Any: - return self.client.create(**kwargs) - - return _completion_with_retry(**kwargs) - - def _combine_llm_outputs(self, llm_outputs: List[Optional[dict]]) -> dict: - overall_token_usage: dict = {} - system_fingerprint = None - for output in llm_outputs: - if output is None: - # Happens in streaming - continue - token_usage = output["token_usage"] - for k, v in token_usage.items(): - if k in overall_token_usage: - overall_token_usage[k] += v - else: - overall_token_usage[k] = v - if system_fingerprint is None: - system_fingerprint = output.get("system_fingerprint") - combined = {"token_usage": overall_token_usage, "model_name": self.model_name} - if system_fingerprint: - combined["system_fingerprint"] = system_fingerprint - return combined + # Azure gpt-35-turbo doesn't support best_of + # don't specify best_of if it is 1 + if self.best_of > 1: + normal_params["best_of"] = self.best_of + + return {**normal_params, **self.model_kwargs} def _stream( self, - messages: List[BaseMessage], + prompt: str, stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, - ) -> Iterator[ChatGenerationChunk]: - message_dicts, params = self._create_message_dicts(messages, stop) - params = {**params, **kwargs, "stream": True} + ) -> Iterator[GenerationChunk]: + params = {**self._invocation_params, **kwargs, "stream": True} + self.get_sub_prompts(params, [prompt], stop) # this mutates params + for stream_resp in completion_with_retry( + self, prompt=prompt, run_manager=run_manager, **params + ): + if not isinstance(stream_resp, dict): + stream_resp = stream_resp.dict() + chunk = _stream_response_to_generation_chunk(stream_resp) + yield chunk + if run_manager: + run_manager.on_llm_new_token( + chunk.text, + chunk=chunk, + verbose=self.verbose, + logprobs=chunk.generation_info["logprobs"] + if chunk.generation_info + else None, + ) - default_chunk_class = AIMessageChunk - for chunk in self.completion_with_retry( - messages=message_dicts, run_manager=run_manager, **params + async def _astream( + self, + prompt: str, + stop: Optional[List[str]] = None, + run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> AsyncIterator[GenerationChunk]: + params = {**self._invocation_params, **kwargs, "stream": True} + self.get_sub_prompts(params, [prompt], stop) # this mutate params + async for stream_resp in await acompletion_with_retry( + self, prompt=prompt, run_manager=run_manager, **params ): - if not isinstance(chunk, dict): - chunk = chunk.dict() - if len(chunk["choices"]) == 0: - continue - choice = chunk["choices"][0] - chunk = _convert_delta_to_message_chunk( - choice["delta"], default_chunk_class - ) - finish_reason = choice.get("finish_reason") - generation_info = ( - dict(finish_reason=finish_reason) if finish_reason is not None else None - ) - default_chunk_class = chunk.__class__ - chunk = ChatGenerationChunk(message=chunk, generation_info=generation_info) + if not isinstance(stream_resp, dict): + stream_resp = stream_resp.dict() + chunk = _stream_response_to_generation_chunk(stream_resp) yield chunk if run_manager: - run_manager.on_llm_new_token(chunk.text, chunk=chunk) + await run_manager.on_llm_new_token( + chunk.text, + chunk=chunk, + verbose=self.verbose, + logprobs=chunk.generation_info["logprobs"] + if chunk.generation_info + else None, + ) def _generate( self, - messages: List[BaseMessage], + prompts: List[str], stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, - stream: Optional[bool] = None, **kwargs: Any, - ) -> ChatResult: - should_stream = stream if stream is not None else self.streaming - if should_stream: - stream_iter = self._stream( - messages, stop=stop, run_manager=run_manager, **kwargs - ) - return _generate_from_stream(stream_iter) - message_dicts, params = self._create_message_dicts(messages, stop) - params = {**params, **kwargs} - response = self.completion_with_retry( - messages=message_dicts, run_manager=run_manager, **params - ) - return self._create_chat_result(response) + ) -> LLMResult: + """Call out to OpenAI's endpoint with k unique prompts. - def _create_message_dicts( - self, messages: List[BaseMessage], stop: Optional[List[str]] - ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: - params = self._client_params - if stop is not None: - if "stop" in params: - raise ValueError("`stop` found in both the input and default params.") - params["stop"] = stop - message_dicts = [convert_message_to_dict(m) for m in messages] - return message_dicts, params + Args: + prompts: The prompts to pass into the model. + stop: Optional list of stop words to use when generating. - def _create_chat_result(self, response: Union[dict, BaseModel]) -> ChatResult: - generations = [] - if not isinstance(response, dict): - response = response.dict() - for res in response["choices"]: - message = convert_dict_to_message(res["message"]) - gen = ChatGeneration( - message=message, - generation_info=dict(finish_reason=res.get("finish_reason")), - ) - generations.append(gen) - token_usage = response.get("usage", {}) - llm_output = { - "token_usage": token_usage, - "model_name": self.model_name, - "system_fingerprint": response.get("system_fingerprint", ""), - } - return ChatResult(generations=generations, llm_output=llm_output) + Returns: + The full LLM output. - async def _astream( - self, - messages: List[BaseMessage], - stop: Optional[List[str]] = None, - run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, - **kwargs: Any, - ) -> AsyncIterator[ChatGenerationChunk]: - message_dicts, params = self._create_message_dicts(messages, stop) - params = {**params, **kwargs, "stream": True} + Example: + .. code-block:: python - default_chunk_class = AIMessageChunk - async for chunk in await acompletion_with_retry( - self, messages=message_dicts, run_manager=run_manager, **params - ): - if not isinstance(chunk, dict): - chunk = chunk.dict() - if len(chunk["choices"]) == 0: - continue - choice = chunk["choices"][0] - chunk = _convert_delta_to_message_chunk( - choice["delta"], default_chunk_class - ) - finish_reason = choice.get("finish_reason") - generation_info = ( - dict(finish_reason=finish_reason) if finish_reason is not None else None - ) - default_chunk_class = chunk.__class__ - chunk = ChatGenerationChunk(message=chunk, generation_info=generation_info) - yield chunk - if run_manager: - await run_manager.on_llm_new_token(token=chunk.text, chunk=chunk) + response = openai.generate(["Tell me a joke."]) + """ + # TODO: write a unit test for this + params = self._invocation_params + params = {**params, **kwargs} + sub_prompts = self.get_sub_prompts(params, prompts, stop) + choices = [] + token_usage: Dict[str, int] = {} + # Get the token usage from the response. + # Includes prompt, completion, and total tokens used. + _keys = {"completion_tokens", "prompt_tokens", "total_tokens"} + system_fingerprint: Optional[str] = None + for _prompts in sub_prompts: + if self.streaming: + if len(_prompts) > 1: + raise ValueError("Cannot stream results with multiple prompts.") + + generation: Optional[GenerationChunk] = None + for chunk in self._stream(_prompts[0], stop, run_manager, **kwargs): + if generation is None: + generation = chunk + else: + generation += chunk + assert generation is not None + choices.append( + { + "text": generation.text, + "finish_reason": generation.generation_info.get("finish_reason") + if generation.generation_info + else None, + "logprobs": generation.generation_info.get("logprobs") + if generation.generation_info + else None, + } + ) + else: + response = completion_with_retry( + self, prompt=_prompts, run_manager=run_manager, **params + ) + if not isinstance(response, dict): + # V1 client returns the response in an PyDantic object instead of + # dict. For the transition period, we deep convert it to dict. + response = response.dict() + + choices.extend(response["choices"]) + update_token_usage(_keys, response, token_usage) + if not system_fingerprint: + system_fingerprint = response.get("system_fingerprint") + return self.create_llm_result( + choices, + prompts, + token_usage, + system_fingerprint=system_fingerprint, + ) async def _agenerate( self, - messages: List[BaseMessage], + prompts: List[str], stop: Optional[List[str]] = None, run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, - stream: Optional[bool] = None, **kwargs: Any, - ) -> ChatResult: - should_stream = stream if stream is not None else self.streaming - if should_stream: - stream_iter = self._astream( - messages, stop=stop, run_manager=run_manager, **kwargs - ) - return await _agenerate_from_stream(stream_iter) - - message_dicts, params = self._create_message_dicts(messages, stop) + ) -> LLMResult: + """Call out to OpenAI's endpoint async with k unique prompts.""" + params = self._invocation_params params = {**params, **kwargs} - response = await acompletion_with_retry( - self, messages=message_dicts, run_manager=run_manager, **params + sub_prompts = self.get_sub_prompts(params, prompts, stop) + choices = [] + token_usage: Dict[str, int] = {} + # Get the token usage from the response. + # Includes prompt, completion, and total tokens used. + _keys = {"completion_tokens", "prompt_tokens", "total_tokens"} + system_fingerprint: Optional[str] = None + for _prompts in sub_prompts: + if self.streaming: + if len(_prompts) > 1: + raise ValueError("Cannot stream results with multiple prompts.") + + generation: Optional[GenerationChunk] = None + async for chunk in self._astream( + _prompts[0], stop, run_manager, **kwargs + ): + if generation is None: + generation = chunk + else: + generation += chunk + assert generation is not None + choices.append( + { + "text": generation.text, + "finish_reason": generation.generation_info.get("finish_reason") + if generation.generation_info + else None, + "logprobs": generation.generation_info.get("logprobs") + if generation.generation_info + else None, + } + ) + else: + response = await acompletion_with_retry( + self, prompt=_prompts, run_manager=run_manager, **params + ) + if not isinstance(response, dict): + response = response.dict() + choices.extend(response["choices"]) + update_token_usage(_keys, response, token_usage) + return self.create_llm_result( + choices, + prompts, + token_usage, + system_fingerprint=system_fingerprint, ) - return self._create_chat_result(response) - @property - def _identifying_params(self) -> Dict[str, Any]: - """Get the identifying parameters.""" - return {**{"model_name": self.model_name}, **self._default_params} + def get_sub_prompts( + self, + params: Dict[str, Any], + prompts: List[str], + stop: Optional[List[str]] = None, + ) -> List[List[str]]: + """Get the sub prompts for llm call.""" + if stop is not None: + if "stop" in params: + raise ValueError("`stop` found in both the input and default params.") + params["stop"] = stop + if params["max_tokens"] == -1: + if len(prompts) != 1: + raise ValueError( + "max_tokens set to -1 not supported for multiple inputs." + ) + params["max_tokens"] = self.max_tokens_for_prompt(prompts[0]) + sub_prompts = [ + prompts[i : i + self.batch_size] + for i in range(0, len(prompts), self.batch_size) + ] + return sub_prompts + + def create_llm_result( + self, + choices: Any, + prompts: List[str], + token_usage: Dict[str, int], + *, + system_fingerprint: Optional[str] = None, + ) -> LLMResult: + """Create the LLMResult from the choices and prompts.""" + generations = [] + for i, _ in enumerate(prompts): + sub_choices = choices[i * self.n : (i + 1) * self.n] + generations.append( + [ + Generation( + text=choice["text"], + generation_info=dict( + finish_reason=choice.get("finish_reason"), + logprobs=choice.get("logprobs"), + ), + ) + for choice in sub_choices + ] + ) + llm_output = {"token_usage": token_usage, "model_name": self.model_name} + if system_fingerprint: + llm_output["system_fingerprint"] = system_fingerprint + return LLMResult(generations=generations, llm_output=llm_output) @property - def _client_params(self) -> Dict[str, Any]: - """Get the parameters used for the openai client.""" - openai_creds: Dict[str, Any] = { - "model": self.model_name, - } + def _invocation_params(self) -> Dict[str, Any]: + """Get the parameters used to invoke the model.""" + openai_creds: Dict[str, Any] = {} if not is_openai_v1(): openai_creds.update( { @@ -609,129 +595,613 @@ class OpenAIChat(BaseChatModel): import openai openai.proxy = {"http": self.openai_proxy, "https": self.openai_proxy} # type: ignore[assignment] # noqa: E501 - return {**self._default_params, **openai_creds} + return {**openai_creds, **self._default_params} - def _get_invocation_params( - self, stop: Optional[List[str]] = None, **kwargs: Any - ) -> Dict[str, Any]: - """Get the parameters used to invoke the model.""" - return { - "model": self.model_name, - **super()._get_invocation_params(stop=stop), - **self._default_params, - **kwargs, - } + @property + def _identifying_params(self) -> Mapping[str, Any]: + """Get the identifying parameters.""" + return {**{"model_name": self.model_name}, **self._default_params} @property def _llm_type(self) -> str: - """Return type of chat model.""" - return "openai-chat" + """Return type of llm.""" + return "openai" - def _get_encoding_model(self) -> Tuple[str, tiktoken.Encoding]: - tiktoken_ = _import_tiktoken() - if self.tiktoken_model_name is not None: - model = self.tiktoken_model_name - else: - model = self.model_name - if model == "gpt-3.5-turbo": - # gpt-3.5-turbo may change over time. - # Returning num tokens assuming gpt-3.5-turbo-0301. - model = "gpt-3.5-turbo-0301" - elif model == "gpt-4": - # gpt-4 may change over time. - # Returning num tokens assuming gpt-4-0314. - model = "gpt-4-0314" - # Returns the number of tokens used by a list of messages. + def get_token_ids(self, text: str) -> List[int]: + """Get the token IDs using the tiktoken package.""" + # tiktoken NOT supported for Python < 3.8 + if sys.version_info[1] < 8: + return super().get_num_tokens(text) + try: + import tiktoken + except ImportError: + raise ImportError( + "Could not import tiktoken python package. " + "This is needed in order to calculate get_num_tokens. " + "Please install it with `pip install tiktoken`." + ) + + model_name = self.tiktoken_model_name or self.model_name try: - encoding = tiktoken_.encoding_for_model(model) + enc = tiktoken.encoding_for_model(model_name) except KeyError: logger.warning("Warning: model not found. Using cl100k_base encoding.") model = "cl100k_base" - encoding = tiktoken_.get_encoding(model) - return model, encoding + enc = tiktoken.get_encoding(model) - def get_token_ids(self, text: str) -> List[int]: - """Get the tokens present in the text with tiktoken package.""" - # tiktoken NOT supported for Python 3.7 or below - if sys.version_info[1] <= 7: - return super().get_token_ids(text) - _, encoding_model = self._get_encoding_model() - return encoding_model.encode(text) - - def get_num_tokens_from_messages(self, messages: List[BaseMessage]) -> int: - """Calculate num tokens for gpt-3.5-turbo and gpt-4 with tiktoken package. - - Official documentation: https://github.com/openai/openai-cookbook/blob/ - main/examples/How_to_format_inputs_to_ChatGPT_models.ipynb""" - if sys.version_info[1] <= 7: - return super().get_num_tokens_from_messages(messages) - model, encoding = self._get_encoding_model() - if model.startswith("gpt-3.5-turbo-0301"): - # every message follows {role/name}\n{content}\n - tokens_per_message = 4 - # if there's a name, the role is omitted - tokens_per_name = -1 - elif model.startswith("gpt-3.5-turbo") or model.startswith("gpt-4"): - tokens_per_message = 3 - tokens_per_name = 1 + return enc.encode( + text, + allowed_special=self.allowed_special, + disallowed_special=self.disallowed_special, + ) + + @staticmethod + def modelname_to_contextsize(modelname: str) -> int: + """Calculate the maximum number of tokens possible to generate for a model. + + Args: + modelname: The modelname we want to know the context size for. + + Returns: + The maximum context size + + Example: + .. code-block:: python + + max_tokens = openai.modelname_to_contextsize("text-davinci-003") + """ + model_token_mapping = { + "gpt-4": 8192, + "gpt-4-0314": 8192, + "gpt-4-0613": 8192, + "gpt-4-32k": 32768, + "gpt-4-32k-0314": 32768, + "gpt-4-32k-0613": 32768, + "gpt-3.5-turbo": 4096, + "gpt-3.5-turbo-0301": 4096, + "gpt-3.5-turbo-0613": 4096, + "gpt-3.5-turbo-16k": 16385, + "gpt-3.5-turbo-16k-0613": 16385, + "gpt-3.5-turbo-instruct": 4096, + "text-ada-001": 2049, + "ada": 2049, + "text-babbage-001": 2040, + "babbage": 2049, + "text-curie-001": 2049, + "curie": 2049, + "davinci": 2049, + "text-davinci-003": 4097, + "text-davinci-002": 4097, + "code-davinci-002": 8001, + "code-davinci-001": 8001, + "code-cushman-002": 2048, + "code-cushman-001": 2048, + } + + # handling finetuned models + if "ft-" in modelname: + modelname = modelname.split(":")[0] + + context_size = model_token_mapping.get(modelname, None) + + if context_size is None: + raise ValueError( + f"Unknown model: {modelname}. Please provide a valid OpenAI model name." + "Known models are: " + ", ".join(model_token_mapping.keys()) + ) + + return context_size + + @property + def max_context_size(self) -> int: + """Get max context size for this model.""" + return self.modelname_to_contextsize(self.model_name) + + def max_tokens_for_prompt(self, prompt: str) -> int: + """Calculate the maximum number of tokens possible to generate for a prompt. + + Args: + prompt: The prompt to pass into the model. + + Returns: + The maximum number of tokens to generate for a prompt. + + Example: + .. code-block:: python + + max_tokens = openai.max_token_for_prompt("Tell me a joke.") + """ + num_tokens = self.get_num_tokens(prompt) + return self.max_context_size - num_tokens + + +class OpenAI(BaseOpenAI): + """OpenAI large language models. + + To use, you should have the ``openai`` python package installed, and the + environment variable ``OPENAI_API_KEY`` set with your API key. + + Any parameters that are valid to be passed to the openai.create call can be passed + in, even if not explicitly saved on this class. + + Example: + .. code-block:: python + + from langchain.llms import OpenAI + openai = OpenAI(model_name="text-davinci-003") + """ + + @property + def _invocation_params(self) -> Dict[str, Any]: + return {**{"model": self.model_name}, **super()._invocation_params} + + +class AzureOpenAI(BaseOpenAI): + """Azure-specific OpenAI large language models. + + To use, you should have the ``openai`` python package installed, and the + environment variable ``OPENAI_API_KEY`` set with your API key. + + Any parameters that are valid to be passed to the openai.create call can be passed + in, even if not explicitly saved on this class. + + Example: + .. code-block:: python + + from langchain.llms import AzureOpenAI + openai = AzureOpenAI(model_name="text-davinci-003") + """ + + azure_endpoint: Union[str, None] = None + """Your Azure endpoint, including the resource. + + Automatically inferred from env var `AZURE_OPENAI_ENDPOINT` if not provided. + + Example: `https://example-resource.azure.openai.com/` + """ + deployment_name: Union[str, None] = Field(default=None, alias="azure_deployment") + """A model deployment. + + If given sets the base client URL to include `/deployments/{azure_deployment}`. + Note: this means you won't be able to use non-deployment endpoints. + """ + openai_api_version: str = Field(default="", alias="api_version") + """Automatically inferred from env var `OPENAI_API_VERSION` if not provided.""" + openai_api_key: Union[str, None] = Field(default=None, alias="api_key") + """Automatically inferred from env var `AZURE_OPENAI_API_KEY` if not provided.""" + azure_ad_token: Union[str, None] = None + """Your Azure Active Directory token. + + Automatically inferred from env var `AZURE_OPENAI_AD_TOKEN` if not provided. + + For more: + https://www.microsoft.com/en-us/security/business/identity-access/microsoft-entra-id. + """ # noqa: E501 + azure_ad_token_provider: Union[str, None] = None + """A function that returns an Azure Active Directory token. + + Will be invoked on every request. + """ + openai_api_type: str = "" + """Legacy, for openai<1.0.0 support.""" + validate_base_url: bool = True + """For backwards compatibility. If legacy val openai_api_base is passed in, try to + infer if it is a base_url or azure_endpoint and update accordingly. + """ + + @root_validator() + def validate_environment(cls, values: Dict) -> Dict: + """Validate that api key and python package exists in environment.""" + if values["n"] < 1: + raise ValueError("n must be at least 1.") + if values["streaming"] and values["n"] > 1: + raise ValueError("Cannot stream results when n > 1.") + if values["streaming"] and values["best_of"] > 1: + raise ValueError("Cannot stream results when best_of > 1.") + + # Check OPENAI_KEY for backwards compatibility. + # TODO: Remove OPENAI_API_KEY support to avoid possible conflict when using + # other forms of azure credentials. + values["openai_api_key"] = ( + values["openai_api_key"] + or os.getenv("AZURE_OPENAI_API_KEY") + or os.getenv("OPENAI_API_KEY") + ) + + values["azure_endpoint"] = values["azure_endpoint"] or os.getenv( + "AZURE_OPENAI_ENDPOINT" + ) + values["azure_ad_token"] = values["azure_ad_token"] or os.getenv( + "AZURE_OPENAI_AD_TOKEN" + ) + values["openai_api_base"] = values["openai_api_base"] or os.getenv( + "OPENAI_API_BASE" + ) + values["openai_proxy"] = get_from_dict_or_env( + values, + "openai_proxy", + "OPENAI_PROXY", + default="", + ) + values["openai_organization"] = ( + values["openai_organization"] + or os.getenv("OPENAI_ORG_ID") + or os.getenv("OPENAI_ORGANIZATION") + ) + values["openai_api_version"] = values["openai_api_version"] or os.getenv( + "OPENAI_API_VERSION" + ) + values["openai_api_type"] = get_from_dict_or_env( + values, "openai_api_type", "OPENAI_API_TYPE", default="azure" + ) + try: + import openai + except ImportError: + raise ImportError( + "Could not import openai python package. " + "Please install it with `pip install openai`." + ) + if is_openai_v1(): + # For backwards compatibility. Before openai v1, no distinction was made + # between azure_endpoint and base_url (openai_api_base). + openai_api_base = values["openai_api_base"] + if openai_api_base and values["validate_base_url"]: + if "/openai" not in openai_api_base: + values["openai_api_base"] = ( + values["openai_api_base"].rstrip("/") + "/openai" + ) + warnings.warn( + "As of openai>=1.0.0, Azure endpoints should be specified via " + f"the `azure_endpoint` param not `openai_api_base` " + f"(or alias `base_url`). Updating `openai_api_base` from " + f"{openai_api_base} to {values['openai_api_base']}." + ) + if values["deployment_name"]: + warnings.warn( + "As of openai>=1.0.0, if `deployment_name` (or alias " + "`azure_deployment`) is specified then " + "`openai_api_base` (or alias `base_url`) should not be. " + "Instead use `deployment_name` (or alias `azure_deployment`) " + "and `azure_endpoint`." + ) + if values["deployment_name"] not in values["openai_api_base"]: + warnings.warn( + "As of openai>=1.0.0, if `openai_api_base` " + "(or alias `base_url`) is specified it is expected to be " + "of the form " + "https://example-resource.azure.openai.com/openai/deployments/example-deployment. " # noqa: E501 + f"Updating {openai_api_base} to " + f"{values['openai_api_base']}." + ) + values["openai_api_base"] += ( + "/deployments/" + values["deployment_name"] + ) + values["deployment_name"] = None + client_params = { + "api_version": values["openai_api_version"], + "azure_endpoint": values["azure_endpoint"], + "azure_deployment": values["deployment_name"], + "api_key": values["openai_api_key"], + "azure_ad_token": values["azure_ad_token"], + "azure_ad_token_provider": values["azure_ad_token_provider"], + "organization": values["openai_organization"], + "base_url": values["openai_api_base"], + "timeout": values["request_timeout"], + "max_retries": values["max_retries"], + "default_headers": values["default_headers"], + "default_query": values["default_query"], + "http_client": values["http_client"], + } + values["client"] = openai.AzureOpenAI(**client_params).completions + values["async_client"] = openai.AsyncAzureOpenAI( + **client_params + ).completions + + else: + values["client"] = openai.Completion + + return values + + @property + def _identifying_params(self) -> Mapping[str, Any]: + return { + **{"deployment_name": self.deployment_name}, + **super()._identifying_params, + } + + @property + def _invocation_params(self) -> Dict[str, Any]: + if is_openai_v1(): + openai_params = {"model": self.deployment_name} else: - raise NotImplementedError( - f"get_num_tokens_from_messages() is not presently implemented " - f"for model {model}." - "See https://github.com/openai/openai-python/blob/main/chatml.md for " - "information on how messages are converted to tokens." + openai_params = { + "engine": self.deployment_name, + "api_type": self.openai_api_type, + "api_version": self.openai_api_version, + } + return {**openai_params, **super()._invocation_params} + + @property + def _llm_type(self) -> str: + """Return type of llm.""" + return "azure" + + @property + def lc_attributes(self) -> Dict[str, Any]: + return { + "openai_api_type": self.openai_api_type, + "openai_api_version": self.openai_api_version, + } + + +class OpenAIChat(BaseLLM): + """OpenAI Chat large language models. + + To use, you should have the ``openai`` python package installed, and the + environment variable ``OPENAI_API_KEY`` set with your API key. + + Any parameters that are valid to be passed to the openai.create call can be passed + in, even if not explicitly saved on this class. + + Example: + .. code-block:: python + + from langchain.llms import OpenAIChat + openaichat = OpenAIChat(model_name="gpt-3.5-turbo") + """ + + client: Any = Field(default=None, exclude=True) #: :meta private: + async_client: Any = Field(default=None, exclude=True) #: :meta private: + model_name: str = "gpt-3.5-turbo" + """Model name to use.""" + model_kwargs: Dict[str, Any] = Field(default_factory=dict) + """Holds any model parameters valid for `create` call not explicitly specified.""" + # When updating this to use a SecretStr + # Check for classes that derive from this class (as some of them + # may assume openai_api_key is a str) + openai_api_key: Optional[str] = Field(default=None, alias="api_key") + """Automatically inferred from env var `OPENAI_API_KEY` if not provided.""" + openai_api_base: Optional[str] = Field(default=None, alias="base_url") + """Base URL path for API requests, leave blank if not using a proxy or service + emulator.""" + # to support explicit proxy for OpenAI + openai_proxy: Optional[str] = None + max_retries: int = 6 + """Maximum number of retries to make when generating.""" + prefix_messages: List = Field(default_factory=list) + """Series of messages for Chat input.""" + streaming: bool = False + """Whether to stream the results or not.""" + allowed_special: Union[Literal["all"], AbstractSet[str]] = set() + """Set of special tokens that are allowed。""" + disallowed_special: Union[Literal["all"], Collection[str]] = "all" + """Set of special tokens that are not allowed。""" + + @root_validator(pre=True) + def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]: + """Build extra kwargs from additional params that were passed in.""" + all_required_field_names = {field.alias for field in cls.__fields__.values()} + + extra = values.get("model_kwargs", {}) + for field_name in list(values): + if field_name not in all_required_field_names: + if field_name in extra: + raise ValueError(f"Found {field_name} supplied twice.") + extra[field_name] = values.pop(field_name) + values["model_kwargs"] = extra + return values + + @root_validator() + def validate_environment(cls, values: Dict) -> Dict: + """Validate that api key and python package exists in environment.""" + openai_api_key = get_from_dict_or_env( + values, "openai_api_key", "OPENAI_API_KEY" + ) + openai_api_base = get_from_dict_or_env( + values, + "openai_api_base", + "OPENAI_API_BASE", + default="", + ) + openai_proxy = get_from_dict_or_env( + values, + "openai_proxy", + "OPENAI_PROXY", + default="", + ) + openai_organization = get_from_dict_or_env( + values, "openai_organization", "OPENAI_ORGANIZATION", default="" + ) + try: + import openai + + openai.api_key = openai_api_key + if openai_api_base: + openai.api_base = openai_api_base + if openai_organization: + openai.organization = openai_organization + if openai_proxy: + openai.proxy = {"http": openai_proxy, "https": openai_proxy} # type: ignore[assignment] # noqa: E501 + except ImportError: + raise ImportError( + "Could not import openai python package. " + "Please install it with `pip install openai`." + ) + try: + values["client"] = openai.ChatCompletion + except AttributeError: + raise ValueError( + "`openai` has no `ChatCompletion` attribute, this is likely " + "due to an old version of the openai package. Try upgrading it " + "with `pip install --upgrade openai`." + ) + warnings.warn( + "You are trying to use a chat model. This way of initializing it is " + "no longer supported. Instead, please use: " + "`from langchain.chat_models import ChatOpenAI`" + ) + return values + + @property + def _default_params(self) -> Dict[str, Any]: + """Get the default parameters for calling OpenAI API.""" + return self.model_kwargs + + def _get_chat_params( + self, prompts: List[str], stop: Optional[List[str]] = None + ) -> Tuple: + if len(prompts) > 1: + raise ValueError( + f"OpenAIChat currently only supports single prompt, got {prompts}" ) - num_tokens = 0 - messages_dict = [convert_message_to_dict(m) for m in messages] - for message in messages_dict: - num_tokens += tokens_per_message - for key, value in message.items(): - # Cast str(value) in case the message value is not a string - # This occurs with function messages - num_tokens += len(encoding.encode(str(value))) - if key == "name": - num_tokens += tokens_per_name - # every reply is primed with assistant - num_tokens += 3 - return num_tokens - - def bind_functions( + messages = self.prefix_messages + [{"role": "user", "content": prompts[0]}] + params: Dict[str, Any] = {**{"model": self.model_name}, **self._default_params} + if stop is not None: + if "stop" in params: + raise ValueError("`stop` found in both the input and default params.") + params["stop"] = stop + if params.get("max_tokens") == -1: + # for ChatGPT api, omitting max_tokens is equivalent to having no limit + del params["max_tokens"] + return messages, params + + def _stream( self, - functions: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable]], - function_call: Optional[str] = None, + prompt: str, + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any, - ) -> Runnable[LanguageModelInput, BaseMessage]: - """Bind functions (and other objects) to this chat model. + ) -> Iterator[GenerationChunk]: + messages, params = self._get_chat_params([prompt], stop) + params = {**params, **kwargs, "stream": True} + for stream_resp in completion_with_retry( + self, messages=messages, run_manager=run_manager, **params + ): + if not isinstance(stream_resp, dict): + stream_resp = stream_resp.dict() + token = stream_resp["choices"][0]["delta"].get("content", "") + chunk = GenerationChunk(text=token) + yield chunk + if run_manager: + run_manager.on_llm_new_token(token, chunk=chunk) - Args: - functions: A list of function definitions to bind to this chat model. - Can be a dictionary, pydantic model, or callable. Pydantic - models and callables will be automatically converted to - their schema dictionary representation. - function_call: Which function to require the model to call. - Must be the name of the single provided function or - "auto" to automatically determine which function to call - (if any). - kwargs: Any additional parameters to pass to the - :class:`~langchain.runnable.Runnable` constructor. - """ - from langchain.chains.openai_functions.base import convert_to_openai_function + async def _astream( + self, + prompt: str, + stop: Optional[List[str]] = None, + run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> AsyncIterator[GenerationChunk]: + messages, params = self._get_chat_params([prompt], stop) + params = {**params, **kwargs, "stream": True} + async for stream_resp in await acompletion_with_retry( + self, messages=messages, run_manager=run_manager, **params + ): + if not isinstance(stream_resp, dict): + stream_resp = stream_resp.dict() + token = stream_resp["choices"][0]["delta"].get("content", "") + chunk = GenerationChunk(text=token) + yield chunk + if run_manager: + await run_manager.on_llm_new_token(token, chunk=chunk) - formatted_functions = [convert_to_openai_function(fn) for fn in functions] - if function_call is not None: - if len(formatted_functions) != 1: - raise ValueError( - "When specifying `function_call`, you must provide exactly one " - "function." - ) - if formatted_functions[0]["name"] != function_call: - raise ValueError( - f"Function call {function_call} was specified, but the only " - f"provided function was {formatted_functions[0]['name']}." - ) - function_call_ = {"name": function_call} - kwargs = {**kwargs, "function_call": function_call_} - return super().bind( - functions=formatted_functions, - **kwargs, + def _generate( + self, + prompts: List[str], + stop: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> LLMResult: + if self.streaming: + generation: Optional[GenerationChunk] = None + for chunk in self._stream(prompts[0], stop, run_manager, **kwargs): + if generation is None: + generation = chunk + else: + generation += chunk + assert generation is not None + return LLMResult(generations=[[generation]]) + + messages, params = self._get_chat_params(prompts, stop) + params = {**params, **kwargs} + full_response = completion_with_retry( + self, messages=messages, run_manager=run_manager, **params + ) + if not isinstance(full_response, dict): + full_response = full_response.dict() + llm_output = { + "token_usage": full_response["usage"], + "model_name": self.model_name, + } + return LLMResult( + generations=[ + [Generation(text=full_response["choices"][0]["message"]["content"])] + ], + llm_output=llm_output, + ) + + async def _agenerate( + self, + prompts: List[str], + stop: Optional[List[str]] = None, + run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, + **kwargs: Any, + ) -> LLMResult: + if self.streaming: + generation: Optional[GenerationChunk] = None + async for chunk in self._astream(prompts[0], stop, run_manager, **kwargs): + if generation is None: + generation = chunk + else: + generation += chunk + assert generation is not None + return LLMResult(generations=[[generation]]) + + messages, params = self._get_chat_params(prompts, stop) + params = {**params, **kwargs} + full_response = await acompletion_with_retry( + self, messages=messages, run_manager=run_manager, **params + ) + if not isinstance(full_response, dict): + full_response = full_response.dict() + llm_output = { + "token_usage": full_response["usage"], + "model_name": self.model_name, + } + return LLMResult( + generations=[ + [Generation(text=full_response["choices"][0]["message"]["content"])] + ], + llm_output=llm_output, + ) + + @property + def _identifying_params(self) -> Mapping[str, Any]: + """Get the identifying parameters.""" + return {**{"model_name": self.model_name}, **self._default_params} + + @property + def _llm_type(self) -> str: + """Return type of llm.""" + return "openai-chat" + + def get_token_ids(self, text: str) -> List[int]: + """Get the token IDs using the tiktoken package.""" + # tiktoken NOT supported for Python < 3.8 + if sys.version_info[1] < 8: + return super().get_token_ids(text) + try: + import tiktoken + except ImportError: + raise ImportError( + "Could not import tiktoken python package. " + "This is needed in order to calculate get_num_tokens. " + "Please install it with `pip install tiktoken`." + ) + + enc = tiktoken.encoding_for_model(self.model_name) + return enc.encode( + text, + allowed_special=self.allowed_special, + disallowed_special=self.disallowed_special, )