feat: add advanced technical use case examples for real-world scenarios

- Implemented `market_system.py` for financial market analysis
- Added `enterprise_doc_processor.py` for document classification and processing
- Created `diagnostic_system.py` for healthcare diagnostics using agent architecture
pull/819/head
Pavan Kumar 2 days ago committed by ascender1729
parent 7febc47960
commit b8dda6f92b

@ -0,0 +1,71 @@
from swarms.structs.agent import Agent
from swarms.prompts.finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT
# Technical Analysis Specialist
technical_analyst = Agent(
agent_name="Technical-Analysis-Expert",
agent_description="Advanced technical analysis specialist focusing on complex market patterns",
system_prompt="""You are an expert Technical Analyst specializing in:
1. Advanced Pattern Recognition (Elliot Wave, Wyckoff Method)
2. Multi-timeframe Analysis
3. Volume Profile Analysis
4. Market Structure Analysis
5. Intermarket Analysis""",
max_loops=3,
model_name="gpt-4"
)
# Fundamental Analysis Expert
fundamental_analyst = Agent(
agent_name="Fundamental-Analysis-Expert",
agent_description="Deep fundamental analysis specialist",
system_prompt="""You are a Fundamental Analysis expert focusing on:
1. Advanced Financial Statement Analysis
2. Economic Indicator Impact Assessment
3. Industry Competitive Analysis
4. Global Macro Trends
5. Corporate Governance Evaluation""",
max_loops=3,
model_name="gpt-4"
)
# Risk Management Specialist
risk_analyst = Agent(
agent_name="Risk-Management-Expert",
agent_description="Complex risk analysis and management specialist",
system_prompt="""You are a Risk Management expert specializing in:
1. Portfolio Risk Assessment
2. Value at Risk (VaR) Analysis
3. Stress Testing Scenarios
4. Correlation Analysis
5. Risk-Adjusted Performance Metrics""",
max_loops=3,
model_name="gpt-4"
)
class MarketAnalysisSystem:
def __init__(self):
self.agents = [technical_analyst, fundamental_analyst, risk_analyst]
def comprehensive_analysis(self, asset_data):
analysis_results = []
for agent in self.agents:
analysis = agent.run(f"Analyze this asset data: {asset_data}")
analysis_results.append({
"analyst": agent.agent_name,
"analysis": analysis
})
# Synthesize results through risk analyst for final recommendation
final_analysis = risk_analyst.run(
f"Synthesize these analyses and provide a final recommendation: {analysis_results}"
)
return {
"detailed_analysis": analysis_results,
"final_recommendation": final_analysis
}
# Usage
analysis_system = MarketAnalysisSystem()

@ -0,0 +1,67 @@
from swarms.structs.agent import Agent
from swarms.utils.pdf_to_text import pdf_to_text
import asyncio
class DocumentProcessingPipeline:
def __init__(self):
self.document_analyzer = Agent(
agent_name="Document-Analyzer",
agent_description="Enterprise document analysis specialist",
system_prompt="""You are an expert document analyzer specializing in:
1. Complex Document Structure Analysis
2. Key Information Extraction
3. Compliance Verification
4. Document Classification
5. Content Validation""",
max_loops=2,
model_name="gpt-4"
)
self.legal_reviewer = Agent(
agent_name="Legal-Reviewer",
agent_description="Legal compliance and risk assessment specialist",
system_prompt="""You are a legal review expert focusing on:
1. Regulatory Compliance Check
2. Legal Risk Assessment
3. Contractual Obligation Analysis
4. Privacy Requirement Verification
5. Legal Term Extraction""",
max_loops=2,
model_name="gpt-4"
)
self.data_extractor = Agent(
agent_name="Data-Extractor",
agent_description="Structured data extraction specialist",
system_prompt="""You are a data extraction expert specializing in:
1. Named Entity Recognition
2. Relationship Extraction
3. Tabular Data Processing
4. Metadata Extraction
5. Data Standardization""",
max_loops=2,
model_name="gpt-4"
)
async def process_document(self, document_path):
# Convert document to text
document_text = pdf_to_text(document_path)
# Parallel processing tasks
tasks = [
self.document_analyzer.arun(f"Analyze this document: {document_text}"),
self.legal_reviewer.arun(f"Review legal aspects: {document_text}"),
self.data_extractor.arun(f"Extract structured data: {document_text}")
]
results = await asyncio.gather(*tasks)
return {
"document_analysis": results[0],
"legal_review": results[1],
"extracted_data": results[2]
}
# Usage
processor = DocumentProcessingPipeline()

@ -0,0 +1,69 @@
from swarms.structs.agent import Agent
from typing import Dict, List
class HealthcareDiagnosticSystem:
def __init__(self):
self.primary_diagnostician = Agent(
agent_name="Primary-Diagnostician",
agent_description="Primary diagnostic analysis specialist",
system_prompt="""You are a primary diagnostician expert in:
1. Initial Symptom Analysis
2. Patient History Evaluation
3. Preliminary Diagnosis Formation
4. Risk Factor Assessment
5. Treatment Priority Determination""",
max_loops=3,
model_name="gpt-4"
)
self.specialist_consultant = Agent(
agent_name="Specialist-Consultant",
agent_description="Specialized medical consultation expert",
system_prompt="""You are a medical specialist focusing on:
1. Complex Case Analysis
2. Specialized Treatment Planning
3. Comorbidity Assessment
4. Treatment Risk Evaluation
5. Advanced Diagnostic Interpretation""",
max_loops=3,
model_name="gpt-4"
)
self.treatment_coordinator = Agent(
agent_name="Treatment-Coordinator",
agent_description="Treatment planning and coordination specialist",
system_prompt="""You are a treatment coordination expert specializing in:
1. Treatment Plan Development
2. Care Coordination
3. Resource Allocation
4. Recovery Timeline Planning
5. Follow-up Protocol Design""",
max_loops=3,
model_name="gpt-4"
)
def process_case(self, patient_data: Dict) -> Dict:
# Initial diagnosis
primary_assessment = self.primary_diagnostician.run(
f"Perform initial diagnosis: {patient_data}"
)
# Specialist consultation
specialist_review = self.specialist_consultant.run(
f"Review case with initial assessment: {primary_assessment}"
)
# Treatment planning
treatment_plan = self.treatment_coordinator.run(
f"Develop treatment plan based on: Primary: {primary_assessment}, Specialist: {specialist_review}"
)
return {
"initial_assessment": primary_assessment,
"specialist_review": specialist_review,
"treatment_plan": treatment_plan
}
# Usage
diagnostic_system = HealthcareDiagnosticSystem()
Loading…
Cancel
Save