[UPDATED DOCS]

pull/595/head · commit 6be0db2c16 (parent a71af8b807)
@@ -0,0 +1,290 @@
import os
from datetime import datetime
from typing import List, Optional

from dotenv import load_dotenv
from loguru import logger
from pydantic import BaseModel, Field
from swarm_models import OpenAIChat
from swarms import Agent
from swarms.prompts.finance_agent_sys_prompt import (
    FINANCIAL_AGENT_SYS_PROMPT,
)

load_dotenv()

# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")

# Create an instance of the OpenAIChat class
model = OpenAIChat(
    openai_api_key=api_key,
    model_name="gpt-4o-mini",
    temperature=0.1,
    max_tokens=2000,
)

# Initialize the agent
agent = Agent(
    agent_name="Financial-Analysis-Agent",
    system_prompt=FINANCIAL_AGENT_SYS_PROMPT,
    llm=model,
    max_loops=1,
    autosave=True,
    dashboard=False,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="finance_agent.json",
    user_name="swarms_corp",
    retry_attempts=1,
    context_length=200000,
    return_step_meta=False,
    # output_type="json",
    output_type=str,
)


class ThoughtLog(BaseModel):
    """
    Pydantic model to log each thought generated by the agent.
    """

    thought: str
    timestamp: datetime = Field(default_factory=datetime.now)
    recursion_depth: int


class MemoryLog(BaseModel):
    """
    Pydantic model to log memory states during the agent's execution.
    """

    thoughts: List[ThoughtLog] = []
    final_result: Optional[str] = None
    completion_status: bool = False
    task: str


class RecursiveAgent(Agent):
    """
    An autonomous agent built on top of the Swarms Agent framework,
    capable of recursively exploring tasks using a Tree of Thoughts mechanism.

    Attributes:
        - agent_name (str): The name of the agent.
        - system_prompt (str): The system prompt guiding the agent's behavior.
        - max_loops (int): The maximum depth for recursion in the Tree of Thoughts.
        - memory_limit (int): The maximum number of thought logs to store.
        - memory (MemoryLog): Pydantic model to store thoughts and logs.
    """

    def __init__(
        self,
        agent_name: str,
        system_prompt: str,
        max_loops: int,
        memory_limit: int = 5,
        agent: Agent = agent,
        *args,
        **kwargs,
    ) -> None:
        """
        Initialize the RecursiveAgent.

        :param agent_name: Name of the agent.
        :param system_prompt: The prompt guiding the agent's behavior.
        :param max_loops: The maximum number of recursive loops allowed.
        :param memory_limit: Maximum number of memory entries.
        :param agent: A pre-configured inner Agent used for thought generation.
        :param kwargs: Additional arguments passed to the base Agent.
        """
        super().__init__(agent_name=agent_name, **kwargs)
        self.system_prompt = system_prompt
        self.max_loops = max_loops
        self.memory = MemoryLog(task="")
        self.memory_limit = memory_limit  # Max thoughts to store
        self.finished = False  # Task completion flag
        # Use the injected inner agent for LLM calls. The original code called
        # the instance as if it were a class, which raises a TypeError.
        self.agent = agent
        logger.info(
            f"Initialized agent {self.agent_name} with recursion limit of {self.max_loops}"
        )

    def add_to_memory(
        self, thought: str, recursion_depth: int
    ) -> None:
        """
        Add a thought to the agent's memory using the Pydantic ThoughtLog model.

        :param thought: The thought generated by the agent.
        :param recursion_depth: The depth of the current recursion.
        """
        if len(self.memory.thoughts) >= self.memory_limit:
            logger.debug(
                "Memory limit reached, discarding the oldest memory entry."
            )
            self.memory.thoughts.pop(0)  # Maintain memory size
        thought_log = ThoughtLog(
            thought=thought, recursion_depth=recursion_depth
        )
        self.memory.thoughts.append(thought_log)
        logger.info(
            f"Added thought to memory at depth {recursion_depth}: {thought}"
        )

    def check_if_finished(self, current_thought: str) -> bool:
        """
        Check if the task is finished by evaluating the current thought.

        :param current_thought: The current thought or reasoning result.
        :return: True if task completion keywords are found, else False.
        """
        # Define criteria for task completion based on keywords
        completion_criteria = [
            "criteria met",
            "task completed",
            "done",
            "fully solved",
        ]
        if any(
            keyword in current_thought.lower()
            for keyword in completion_criteria
        ):
            self.finished = True
            self.memory.completion_status = True
            logger.info(
                f"Task completed with thought: {current_thought}"
            )
        return self.finished

    def run_tree_of_thoughts(
        self, task: str, current_depth: int = 0
    ) -> Optional[str]:
        """
        Recursively explore thought branches based on the Tree of Thoughts mechanism.

        :param task: The task or query to be reasoned upon.
        :param current_depth: The current recursion depth.
        :return: The final solution, or a message indicating task completion or failure.
        """
        logger.debug(f"Current recursion depth: {current_depth}")
        if current_depth >= self.max_loops:
            logger.warning(
                "Max recursion depth reached, task incomplete."
            )
            return "Max recursion depth reached, task incomplete."

        # Generate multiple possible thoughts/branches using Swarms logic
        response = self.generate_thoughts(task)
        thoughts = self.extract_thoughts(response)
        self.memory.task = task  # Log the task in memory

        # Store thoughts in memory and explore each branch
        for idx, thought in enumerate(thoughts):
            logger.info(
                f"Exploring thought {idx + 1}/{len(thoughts)}: {thought}"
            )
            self.add_to_memory(thought, current_depth)
            if self.check_if_finished(thought):
                self.memory.final_result = thought  # Log the final result
                return f"Task completed with thought: {thought}"
            # Recursive exploration
            result = self.run_tree_of_thoughts(
                thought, current_depth + 1
            )
            if self.finished:
                return result
        return "Exploration done but no valid solution found."

    def generate_thoughts(self, task: str) -> str:
        """
        Generate thoughts for the task using the Swarms framework.

        :param task: The task or query to generate thoughts for.
        :return: A string representing multiple thought branches generated by Swarms logic.
        """
        logger.debug(f"Generating thoughts for task: {task}")
        response = self.agent.run(
            task
        )  # The inner agent uses its LLM for thought generation
        return response

    def extract_thoughts(self, response: str) -> List[str]:
        """
        Extract individual thoughts/branches from the LLM's response.

        :param response: The response string containing multiple thoughts.
        :return: A list of extracted thoughts.
        """
        logger.debug(f"Extracting thoughts from response: {response}")
        return [
            thought.strip()
            for thought in response.split("\n")
            if thought
        ]

    def reflect(self) -> str:
        """
        Reflect on the task and thoughts stored in memory, providing a summary of the process.
        The reflection is generated by the LLM based on the stored thoughts.

        :return: Reflection output generated by the LLM.
        """
        logger.debug("Running reflection on the task.")
        # Compile all thoughts into a prompt for reflection
        thoughts_for_reflection = "\n".join(
            [
                f"Thought {i + 1}: {log.thought}"
                for i, log in enumerate(self.memory.thoughts)
            ]
        )
        reflection_prompt = (
            f"Reflect on the following task and thoughts:\n"
            f"Task: {self.memory.task}\n"
            f"Thoughts:\n{thoughts_for_reflection}\n"
            "What did we learn from this? How could this process be improved?"
        )
        # Use the inner agent's LLM to generate a reflection based on the memory
        reflection_response = self.agent.run(reflection_prompt)
        self.memory.final_result = reflection_response
        logger.info(f"Reflection generated: {reflection_response}")
        return reflection_response


# # Example usage of the RecursiveAgent
# if __name__ == "__main__":
#     # Example initialization and running
#     agent_name = "Autonomous-Financial-Agent"
#     system_prompt = "You are a highly intelligent agent designed to handle financial queries efficiently."
#     max_loops = 1

#     # Initialize the agent using Swarms
#     agent = RecursiveAgent(
#         agent_name=agent_name,
#         system_prompt=system_prompt,
#         max_loops=max_loops,
#     )

#     # Define the task for the agent
#     task = "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"

#     # Run the tree of thoughts mechanism
#     result = agent.run_tree_of_thoughts(task)
#     logger.info(f"Final result: {result}")

#     # Perform reflection
#     reflection = agent.reflect()
#     logger.info(f"Reflection: {reflection}")

@@ -8,9 +8,9 @@ from pydantic import BaseModel, Field
 from swarms import (
     Agent,
-    OpenAIChat,
     SpreadSheetSwarm,
 )
+from swarm_models import OpenAIChat
 from swarm_models.openai_function_caller import OpenAIFunctionCaller

 agent_pool = []
@@ -240,17 +240,17 @@ def design_and_run_swarm(task: str = None):
     return build_and_run_swarm(agent_pool, task_for_agent)

-# out = design_and_run_swarm(
-#     """
-#     Create a swarm of agents that are specialized in every social media platform to market the swarms github framework, which makes it easy for you to orchestrate and manage multiple agents in a swarm.
-#     Create a minimum of 10 agents that are hyper-specialized in different areas of social media marketing.
-#     We need to promote the new SpreadSheet Swarm feature that allows you to run multiple agents in parallel and manage them from a single dashboard.
-#     Here is the link: https://docs.swarms.world/en/latest/swarms/structs/spreadsheet_swarm/
-#     """
-# )
-# print(out)
+out = design_and_run_swarm(
+    """
+    Create a swarm of agents that are specialized in every social media platform to market the swarms github framework, which makes it easy for you to orchestrate and manage multiple agents in a swarm.
+    Create a minimum of 10 agents that are hyper-specialized in different areas of social media marketing.
+    We need to promote the new SpreadSheet Swarm feature that allows you to run multiple agents in parallel and manage them from a single dashboard.
+    Here is the link: https://docs.swarms.world/en/latest/swarms/structs/spreadsheet_swarm/
+    """
+)
+print(out)

 # out = design_and_run_swarm(
@@ -330,12 +330,12 @@ def design_and_run_swarm(task: str = None):
 # print(out)

-out = design_and_run_swarm(
-    """
-    Create a swarm of agents that will generate the python code to generate the qr codes for the following links:
-    Telegram: https://t.me/+Sm4J-sSkw8c0ODA5
-    Discord: https://discord.gg/F8sSH4Gh
-    https://lu.ma/GPTuesdays?k=c
-    """
-)
-print(out)
+# out = design_and_run_swarm(
+#     """
+#     Create a swarm of agents that will generate the python code to generate the qr codes for the following links:
+#     Telegram: https://t.me/+Sm4J-sSkw8c0ODA5
+#     • Discord: https://discord.gg/F8sSH4Gh
+#     https://lu.ma/GPTuesdays?k=c
+#     """
+# )
+# print(out)

@@ -47,7 +47,9 @@ In a Parallel Swarm architecture, multiple agents operate independently and simultaneously

**Use-Cases:**

- Tasks that can be processed independently, such as parallel data analysis.
- Large-scale simulations where multiple scenarios are run in parallel.

```mermaid
@@ -66,6 +68,7 @@ graph LR

A Sequential Swarm architecture processes tasks in a linear sequence. Each agent completes its task before passing the result to the next agent in the chain. This architecture ensures orderly processing and is useful when tasks have dependencies. [Learn more in the docs.](https://docs.swarms.world/en/latest/swarms/structs/agent_rearrange/)

**Use-Cases:**

- Workflows where each step depends on the previous one, such as assembly lines or sequential data processing.
- Scenarios requiring strict order of operations (see the sketch below).
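
A minimal sketch of such a chain, assuming the `AgentRearrange` API described in the linked docs; the agent names, prompts, model setup, and task are illustrative, not taken from this PR:

```python
from swarm_models import OpenAIChat
from swarms import Agent, AgentRearrange

model = OpenAIChat(model_name="gpt-4o-mini", temperature=0.1)

# First stage drafts, second stage revises: a strict linear dependency.
writer = Agent(
    agent_name="Writer",
    system_prompt="Draft a short report on the given topic.",
    llm=model,
    max_loops=1,
)
editor = Agent(
    agent_name="Editor",
    system_prompt="Tighten and fact-check the draft you receive.",
    llm=model,
    max_loops=1,
)

# The flow string encodes the sequence: the Writer's output feeds the Editor.
sequential_swarm = AgentRearrange(
    agents=[writer, editor],
    flow="Writer -> Editor",
)

print(sequential_swarm.run("Summarize Q3 earnings trends for a retail chain."))
```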

@@ -89,6 +89,10 @@ Swarm Agent is a powerful autonomous agent framework designed to connect Language Models
| `custom_planning_prompt` | A string representing a custom prompt for planning. |
| `memory_chunk_size` | An integer representing the maximum size of memory chunks for long-term memory retrieval. |
| `agent_ops_on` | A boolean indicating whether agent operations should be enabled. |
| `return_step_meta` | A boolean indicating whether to return JSON of all the steps and additional metadata. |
| `output_type` | A Literal indicating the output format: "string", "str", "list", "json", "dict", or "yaml" (see the example below). |
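
As a rough illustration of these two new parameters (the agent name and task here are made up; the model setup mirrors the earlier examples):

```python
from swarm_models import OpenAIChat
from swarms import Agent

model = OpenAIChat(model_name="gpt-4o-mini", temperature=0.1)

agent = Agent(
    agent_name="Metadata-Demo-Agent",
    llm=model,
    max_loops=1,
    return_step_meta=True,  # return JSON of all steps plus additional metadata
    output_type="json",     # one of "string", "str", "list", "json", "dict", "yaml"
)

print(agent.run("Give three budgeting tips."))
```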
### `Agent` Methods
@@ -164,10 +168,11 @@ agent = Agent(
     retry_attempts=1,
     context_length=200000,
     return_step_meta=False,
+    output_type="str",
 )

 out = agent.run(
     "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"
 )

 print(out)
@@ -396,7 +401,9 @@ from swarms.structs import Agent
 from my_sentiment_analyzer import sentiment_analyzer_function

 # Initialize the agent with a sentiment analyzer
-agent = Agent(llm=llm, max_loops=3, sentiment_analyzer=sentiment_analyzer_function)
+agent = Agent(
+    agent_name="sentiment-analyzer-agent-01",
+    system_prompt="...",
+    llm=llm,
+    max_loops=3,
+    sentiment_analyzer=sentiment_analyzer_function,
+)
 ```

 ## Documentation Examples

@@ -35,11 +35,10 @@ agent = Agent(
     context_length=200000,
     return_step_meta=False,
     # output_type="json",
-    output_type=str,
+    output_type="string",
 )

-out = agent.run(
+agent.run(
     "How can I establish a ROTH IRA to buy stocks and get a tax break? What are the criteria?"
 )
-print(out)

@@ -3,7 +3,8 @@ from typing import Any, Dict, Optional
 import requests
 from pydantic import BaseModel, Field
-from swarms import OpenAIFunctionCaller, Conversation
+from swarms import Conversation
+from swarm_models import OpenAIFunctionCaller
 from loguru import logger
 import os

@@ -1,21 +0,0 @@
from swarms import tool

# Create the wrapper to wrap the function
@tool(
    name="Geo Coordinates Locator",
    description=(
        "Locates geo coordinates with a city and or zip code"
    ),
    return_string=False,
    return_dict=False,
)
def send_api_request_to_get_geo_coordinates(
    city: str = None, zip: int = None
):
    return "Test"

# Run the function to get the schema
out = send_api_request_to_get_geo_coordinates()
print(out)

@@ -1,58 +0,0 @@
from swarms import Agent
from swarm_models import Anthropic, tool

# Model
llm = Anthropic(
    temperature=0.1,
)

"""
How to create tools:

1. Define a function that takes the required arguments with documentation and type hints.
2. Add the `@tool` decorator to the function.
3. Add the function to the `tools` list in the `Agent` class.
"""

# Tools
# Browser tools
@tool
def browser(query: str):
    """
    Opens a web browser and searches for the given query on Google.

    Args:
        query (str): The search query.

    Returns:
        str: A message indicating that the search is being performed.
    """
    import webbrowser

    url = f"https://www.google.com/search?q={query}"
    webbrowser.open(url)
    return f"Searching for {query} in the browser."

# Agent
agent = Agent(
    agent_name="Devin",
    system_prompt=(
        "Autonomous agent that can interact with humans and other"
        " agents. Be Helpful and Kind. Use the tools provided to"
        " assist the user. Return all code in markdown format."
    ),
    llm=llm,
    max_loops="auto",
    autosave=True,
    dashboard=False,
    verbose=True,
    stopping_token="<DONE>",
    interactive=True,
    tools=[browser],
)

# Run the agent
out = agent.run("what's the weather in Miami?")
print(out)

@@ -0,0 +1,258 @@
import os
import requests
from loguru import logger
from swarms import Agent
from swarm_models import OpenAIChat
from pydantic import BaseModel, Field
from typing import Any, Dict, Optional
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()

# Get the OpenAI API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")

# Define the system prompt for the pharmaceutical analysis agent
PHARMACEUTICAL_AGENT_SYS_PROMPT = """
You are an expert pharmaceutical data analyst. Your task is to analyze chemical and protein data to provide detailed insights into their potential interactions and uses in drug development. Use the provided data and ensure your analysis is scientifically accurate, reliable, and considers potential side effects and clinical trials.

Always answer in a structured, detailed format. Consider the following information when analyzing:
- Chemical: {chemical_title}, Molecular Formula: {chemical_formula}
- Protein: {protein_name}, Function: {protein_function}

Your goal is to provide a comprehensive understanding of how these chemical compounds might interact with the protein and their potential use cases in medicine, considering real-world clinical scenarios.
"""


# Pydantic Model for chemical data
class ChemicalData(BaseModel):
    title: Optional[str] = Field(None, title="Chemical Title")
    molecular_formula: Optional[str] = Field(
        None, title="Molecular Formula"
    )
    isomeric_smiles: Optional[str] = Field(
        None, title="Isomeric SMILES"
    )


# Pydantic Model for protein data
class ProteinData(BaseModel):
    entry_name: Optional[str] = Field(
        None, title="Protein Entry Name"
    )
    function: Optional[str] = Field(None, title="Protein Function")


# Pydantic Model for the analysis output
class AnalysisOutput(BaseModel):
    analysis_id: str = Field(..., title="Unique Analysis ID")
    timestamp: str = Field(..., title="Timestamp of the analysis")
    chemical_data: Optional[ChemicalData] = Field(
        None, title="Chemical Data"
    )
    protein_data: Optional[ProteinData] = Field(
        None, title="Protein Data"
    )
    analysis_result: Optional[str] = Field(
        None, title="Result from the agent analysis"
    )


# Create an instance of the OpenAIChat class
model = OpenAIChat(
    openai_api_key=api_key, model_name="gpt-4o-mini", temperature=0.1
)

# Initialize the Swarms Agent
agent = Agent(
    agent_name="Pharmaceutical-Analysis-Agent",
    # system_prompt=PHARMACEUTICAL_AGENT_SYS_PROMPT,
    llm=model,
    max_loops=1,
    autosave=True,
    dashboard=False,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="pharmaceutical_agent.json",
    user_name="swarms_corp",
    retry_attempts=1,
    context_length=200000,
    return_step_meta=False,
)


class PharmaDataIntegration:
    def __init__(self):
        """
        Initializes the integration class for Swarms and public pharmaceutical APIs (PubChem, UniProt).
        """
        pass

    @logger.catch
    def fetch_chemical_data(self, compound_id: str) -> ChemicalData:
        """
        Fetch chemical data from the PubChem API based on compound ID. No API key is required.

        :param compound_id: The PubChem compound ID to fetch data for.
        :return: Pydantic model containing chemical data.
        """
        url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/cid/{compound_id}/property/Title,MolecularFormula,IsomericSMILES/JSON"
        logger.debug(
            f"Fetching chemical data for compound ID: {compound_id}"
        )
        response = requests.get(url)
        if response.status_code == 200:
            logger.info(
                f"Successfully fetched chemical data for compound ID: {compound_id}"
            )
            data = (
                response.json()
                .get("PropertyTable", {})
                .get("Properties", [{}])[0]
            )
            return ChemicalData(
                title=data.get("Title", "Unknown Chemical"),
                molecular_formula=data.get(
                    "MolecularFormula", "Unknown Formula"
                ),
                isomeric_smiles=data.get(
                    "IsomericSMILES", "Unknown SMILES"
                ),
            )
        else:
            logger.error(
                f"Failed to fetch chemical data for compound ID: {compound_id}, Status Code: {response.status_code}"
            )
            return ChemicalData()

    @logger.catch
    def fetch_protein_data(self, protein_id: str) -> ProteinData:
        """
        Fetch protein data from the UniProt API based on protein ID. No API key is required.

        :param protein_id: The UniProt protein ID to fetch data for.
        :return: Pydantic model containing protein data.
        """
        url = f"https://www.uniprot.org/uniprot/{protein_id}.json"
        logger.debug(
            f"Fetching protein data for protein ID: {protein_id}"
        )
        response = requests.get(url)
        if response.status_code == 200:
            logger.info(
                f"Successfully fetched protein data for protein ID: {protein_id}"
            )
            data = response.json()
            return ProteinData(
                entry_name=data.get("entryName", "Unknown Protein"),
                function=data.get("function", "Unknown Function"),
            )
        else:
            logger.error(
                f"Failed to fetch protein data for protein ID: {protein_id}, Status Code: {response.status_code}"
            )
            return ProteinData()

    @logger.catch
    def analyze_data_with_swarms_agent(
        self,
        chemical_data: Optional[ChemicalData],
        protein_data: Optional[ProteinData],
    ) -> str:
        """
        Use the Swarms Agent to analyze the fetched chemical and protein data.

        :param chemical_data: Data fetched from PubChem about the chemical.
        :param protein_data: Data fetched from UniProt about the protein.
        :return: Analysis result from the Swarms Agent.
        """
        # Fill in the system prompt with the actual data
        agent_input = PHARMACEUTICAL_AGENT_SYS_PROMPT.format(
            chemical_title=(
                chemical_data.title if chemical_data else "Unknown"
            ),
            chemical_formula=(
                chemical_data.molecular_formula
                if chemical_data
                else "Unknown"
            ),
            protein_name=(
                protein_data.entry_name if protein_data else "Unknown"
            ),
            protein_function=(
                protein_data.function if protein_data else "Unknown"
            ),
        )
        logger.debug(
            "Running Swarms Agent with the provided chemical and protein data."
        )
        out = agent.run(agent_input)
        logger.info(f"Swarms Agent analysis result: {out}")
        return out

    @logger.catch
    def run(
        self,
        task: str,
        protein_id: Optional[str] = None,
        compound_id: Optional[str] = None,
        *args,
        **kwargs,
    ) -> str:
        """
        The main method that dynamically handles task, protein, and chemical analysis.

        :param task: Natural language task that guides the analysis (e.g., "Analyze the effects of this protein").
        :param protein_id: (Optional) Protein ID from UniProt.
        :param compound_id: (Optional) Compound ID from PubChem.
        :return: JSON string with chemical, protein, and analysis data.
        """
        chemical_data = None
        protein_data = None

        # Dynamic task handling
        if "protein" in task.lower() and protein_id:
            logger.debug(f"Task is protein-related: {task}")
            protein_data = self.fetch_protein_data(protein_id)
            logger.info(protein_data)

        if "chemical" in task.lower() and compound_id:
            logger.debug(f"Task is chemical-related: {task}")
            chemical_data = self.fetch_chemical_data(compound_id)

        # Analyze data using the Swarms Agent
        analysis_result = self.analyze_data_with_swarms_agent(
            chemical_data, protein_data
        )

        # Create the output model
        output = AnalysisOutput(
            analysis_id=f"{compound_id or 'unknown'}-{protein_id or 'unknown'}",
            timestamp=datetime.utcnow().isoformat(),
            chemical_data=chemical_data,
            protein_data=protein_data,
            analysis_result=analysis_result,
        )

        # Log the JSON output
        # logger.info(f"Final analysis result as JSON: {output.json(indent=2)}")

        # Return the structured output serialized as JSON
        return output.model_dump_json(indent=4)


# Example usage:
if __name__ == "__main__":
    pharma_integration = PharmaDataIntegration()

    # Example: Analyze the effects of a specific protein and chemical compound
    result = pharma_integration.run(
        task="Analyze this compound and provide an analysis",
        # protein_id="P12345",
        compound_id="19833",
    )

    # Print the result in JSON format
    print(result)

@@ -0,0 +1,351 @@
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Union

import aiohttp
import requests
from dotenv import load_dotenv
from loguru import logger
from swarm_models import OpenAIChat
from swarms import Agent

load_dotenv()

# New Pharmaceutical Agent System Prompt
PHARMA_AGENT_SYS_PROMPT = """
You are a pharmaceutical data analysis agent specializing in retrieving and analyzing chemical data.
You have access to the latest chemical databases and can provide detailed analysis of any chemical compounds
relevant to pharmaceutical research. Your goal is to assist pharmaceutical companies in retrieving chemical
properties, safety data, and usage details for various compounds.

When given a chemical name, you will:
1. Retrieve the relevant chemical properties such as molecular weight, CAS number, chemical formula,
   melting point, boiling point, and solubility.
2. Analyze the chemical properties and provide insights on the compound's potential applications in
   pharmaceuticals, safety precautions, and any known interactions with other compounds.
3. If you encounter missing or incomplete data, make a note of it and proceed with the available information,
   ensuring you provide the most relevant and accurate analysis.

You will respond in a structured format and, where applicable, recommend further reading or research papers.
Keep responses concise but informative, with a focus on helping pharmaceutical companies make informed decisions
about chemical compounds.

If there are specific safety risks or regulatory concerns, highlight them clearly.
"""


class PharmaAgent:
    """
    A pharmaceutical data agent that dynamically fetches chemical data from external sources and uses an LLM
    to analyze and respond to queries related to chemicals for pharmaceutical companies.

    Attributes:
        api_key (str): The OpenAI API key for accessing the LLM.
        agent (Agent): An instance of the swarms Agent class to manage interactions with the LLM.
    """

    def __init__(
        self,
        model_name: str = "gpt-4o-mini",
        temperature: float = 0.1,
    ):
        """
        Initializes the PharmaAgent with the OpenAI model and necessary configurations.

        Args:
            model_name (str): The name of the LLM model to use.
            temperature (float): The temperature for the LLM to control randomness.
        """
        self.api_key = os.getenv("OPENAI_API_KEY")
        logger.info("Initializing OpenAI model and Agent...")
        model = OpenAIChat(
            openai_api_key=self.api_key,
            model_name=model_name,
            temperature=temperature,
        )

        # Initialize the agent
        self.agent = Agent(
            agent_name="Pharmaceutical-Data-Agent",
            system_prompt=PHARMA_AGENT_SYS_PROMPT,
            llm=model,
            max_loops=1,
            autosave=True,
            dashboard=False,
            verbose=True,
            dynamic_temperature_enabled=True,
            saved_state_path="pharma_agent.json",
            user_name="swarms_corp",
            retry_attempts=1,
            context_length=200000,
            return_step_meta=False,
        )
        logger.info("Agent initialized successfully.")

    def get_latest_chemical_data(
        self, chemical_name: str
    ) -> Union[Dict[str, Any], Dict[str, str]]:
        """
        Fetches the latest chemical data dynamically from PubChem's API.

        Args:
            chemical_name (str): The name of the chemical to query.

        Returns:
            Dict[str, Any]: A dictionary containing chemical data if successful, or an error message if failed.
        """
        logger.info(f"Fetching data for chemical: {chemical_name}")
        base_url = (
            "https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name"
        )
        response = requests.get(f"{base_url}/{chemical_name}/JSON")

        if response.status_code == 200:
            chemical_data = response.json()
            try:
                # Note: indexing into "props" by position is fragile, since
                # PubChem does not guarantee property order.
                compound_info = chemical_data["PC_Compounds"][0]
                chemical_properties = {
                    "name": compound_info.get("props", [])[0]
                    .get("urn", {})
                    .get("label", "Unknown"),
                    "molecular_weight": compound_info.get("props", [])[1]
                    .get("value", {})
                    .get("fval", "Unknown"),
                    "CAS_number": compound_info.get("props", [])[2]
                    .get("urn", {})
                    .get("label", "Unknown"),
                    "formula": compound_info.get("props", [])[3]
                    .get("value", {})
                    .get("sval", "Unknown"),
                    "properties": {
                        "melting_point": compound_info.get("props", [])[4]
                        .get("value", {})
                        .get("fval", "Unknown"),
                        "boiling_point": compound_info.get("props", [])[5]
                        .get("value", {})
                        .get("fval", "Unknown"),
                        "solubility": "miscible with water",  # Placeholder, as PubChem may not provide this
                    },
                }
                logger.info(
                    f"Data successfully retrieved for chemical: {chemical_name}"
                )
                return chemical_properties
            except (IndexError, KeyError):
                logger.error(
                    f"Incomplete data for chemical: {chemical_name}"
                )
                return {
                    "error": "Chemical data not found or incomplete"
                }
        else:
            logger.error(
                f"Failed to fetch chemical data. Status code: {response.status_code}"
            )
            return {
                "error": f"Failed to fetch chemical data. Status code: {response.status_code}"
            }

    def query_chemical_data(self, chemical_name: str) -> str:
        """
        Queries the latest chemical data and passes it to the LLM agent for further analysis and response.

        Args:
            chemical_name (str): The name of the chemical to query.

        Returns:
            str: The response from the LLM agent after analyzing the chemical data.
        """
        chemical_data = self.get_latest_chemical_data(chemical_name)
        if "error" in chemical_data:
            return f"Error: {chemical_data['error']}"

        prompt = f"Fetch and analyze the latest chemical data for {chemical_name}: {chemical_data}"
        logger.info(
            f"Sending chemical data to agent for analysis: {chemical_name}"
        )
        return self.agent.run(prompt)

    def run(self, chemical_name: str) -> str:
        """
        Main method to fetch and analyze the latest chemical data using the LLM agent.

        Args:
            chemical_name (str): The name of the chemical to query.

        Returns:
            str: The result of the chemical query processed by the agent.
        """
        logger.info(f"Running chemical query for: {chemical_name}")
        return self.query_chemical_data(chemical_name)

    def run_concurrently(
        self, chemical_names: List[str]
    ) -> List[str]:
        """
        Runs multiple chemical queries concurrently using ThreadPoolExecutor.

        Args:
            chemical_names (List[str]): List of chemical names to query.

        Returns:
            List[str]: List of results from the LLM agent for each chemical.
        """
        logger.info("Running chemical queries concurrently...")
        results = []
        with ThreadPoolExecutor() as executor:
            future_to_chemical = {
                executor.submit(self.run, chemical): chemical
                for chemical in chemical_names
            }
            for future in as_completed(future_to_chemical):
                chemical = future_to_chemical[future]
                try:
                    result = future.result()
                    logger.info(f"Completed query for: {chemical}")
                    results.append(result)
                except Exception as exc:
                    logger.error(
                        f"Chemical {chemical} generated an exception: {exc}"
                    )
                    results.append(f"Error querying {chemical}")
        return results

    async def fetch_chemical_data_async(
        self, session: aiohttp.ClientSession, chemical_name: str
    ) -> Union[Dict[str, Any], Dict[str, str]]:
        """
        Asynchronously fetches chemical data using aiohttp.

        Args:
            session (aiohttp.ClientSession): An aiohttp client session.
            chemical_name (str): The name of the chemical to query.

        Returns:
            Union[Dict[str, Any], Dict[str, str]]: A dictionary containing chemical data if successful, or an error message if failed.
        """
        logger.info(
            f"Fetching data asynchronously for chemical: {chemical_name}"
        )
        base_url = (
            "https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name"
        )
        async with session.get(
            f"{base_url}/{chemical_name}/JSON"
        ) as response:
            if response.status == 200:
                chemical_data = await response.json()
                try:
                    compound_info = chemical_data["PC_Compounds"][0]
                    chemical_properties = {
                        "name": compound_info.get("props", [])[0]
                        .get("urn", {})
                        .get("label", "Unknown"),
                        "molecular_weight": compound_info.get("props", [])[1]
                        .get("value", {})
                        .get("fval", "Unknown"),
                        "CAS_number": compound_info.get("props", [])[2]
                        .get("urn", {})
                        .get("label", "Unknown"),
                        "formula": compound_info.get("props", [])[3]
                        .get("value", {})
                        .get("sval", "Unknown"),
                        "properties": {
                            "melting_point": compound_info.get("props", [])[4]
                            .get("value", {})
                            .get("fval", "Unknown"),
                            "boiling_point": compound_info.get("props", [])[5]
                            .get("value", {})
                            .get("fval", "Unknown"),
                            "solubility": "miscible with water",  # Placeholder, as PubChem may not provide this
                        },
                    }
                    logger.info(
                        f"Data successfully retrieved for chemical: {chemical_name}"
                    )
                    return chemical_properties
                except (IndexError, KeyError):
                    logger.error(
                        f"Incomplete data for chemical: {chemical_name}"
                    )
                    return {
                        "error": "Chemical data not found or incomplete"
                    }
            else:
                logger.error(
                    f"Failed to fetch chemical data. Status code: {response.status}"
                )
                return {
                    "error": f"Failed to fetch chemical data. Status code: {response.status}"
                }

    async def run_async(self, chemical_name: str) -> str:
        """
        Asynchronously runs the agent to fetch and analyze the latest chemical data.

        Args:
            chemical_name (str): The name of the chemical to query.

        Returns:
            str: The result of the chemical query processed by the agent.
        """
        async with aiohttp.ClientSession() as session:
            chemical_data = await self.fetch_chemical_data_async(
                session, chemical_name
            )
            if "error" in chemical_data:
                return f"Error: {chemical_data['error']}"

            prompt = f"Fetch and analyze the latest chemical data for {chemical_name}: {chemical_data}"
            logger.info(
                f"Sending chemical data to agent for analysis: {chemical_name}"
            )
            return self.agent.run(prompt)

    async def run_many_async(
        self, chemical_names: List[str]
    ) -> List[str]:
        """
        Runs multiple chemical queries asynchronously using aiohttp and asyncio.

        Args:
            chemical_names (List[str]): List of chemical names to query.

        Returns:
            List[str]: List of results from the LLM agent for each chemical.
        """
        logger.info(
            "Running multiple chemical queries asynchronously..."
        )
        tasks = []
        # Note: each run_async call opens its own ClientSession; this outer
        # session is kept from the original code but is not used by the tasks.
        async with aiohttp.ClientSession() as session:
            for chemical in chemical_names:
                task = self.run_async(chemical)
                tasks.append(task)
            return await asyncio.gather(*tasks)


# Example usage
if __name__ == "__main__":
    pharma_agent = PharmaAgent()

    # Example of running concurrently
    chemical_names = ["formaldehyde", "acetone", "ethanol"]
    concurrent_results = pharma_agent.run_concurrently(chemical_names)
    print(concurrent_results)