Add task execution methods to Agent class with persian language support for status messages

pull/818/head
MohammadAminDHM 2 months ago
parent 769fd87488
commit cceb503e00

@ -0,0 +1,43 @@
from swarms import Agent, Swarm
from swarms.utils.language_config import Language
def main():
# ایجاد عامل‌ها با زبان فارسی
agent1 = Agent(
name="تحلیل‌گر",
task="تحلیل داده‌های فروش",
language=Language.PERSIAN
)
agent2 = Agent(
name="گزارش‌گر",
task="تولید گزارش تحلیلی",
language=Language.PERSIAN
)
# ایجاد گروه با زبان فارسی
swarm = Swarm(
agents=[agent1, agent2],
language=Language.PERSIAN
)
# اجرای وظیفه
task = """
لطفاً دادههای فروش سه ماه گذشته را تحلیل کنید و یک گزارش تحلیلی تهیه کنید.
در گزارش موارد زیر را بررسی کنید:
1. روند فروش ماهانه
2. محصولات پرفروش
3. پیشنهادات برای بهبود فروش
"""
try:
results = swarm.run(task)
print("\nنتایج:")
for agent_name, result in results.items():
print(f"\n{agent_name}:")
print(result)
except Exception as e:
print(f"خطا در اجرای وظیفه: {str(e)}")
if __name__ == "__main__":
main()

@ -63,6 +63,7 @@ from swarms.utils.history_output_formatter import (
from swarms.utils.litellm_tokenizer import count_tokens
from swarms.utils.litellm_wrapper import LiteLLM
from swarms.utils.pdf_to_text import pdf_to_text
from ..utils.language_config import language_config, Language
# Utils
@ -2717,3 +2718,20 @@ class Agent:
role="Output Cleaner",
content=response,
)
def run(self, task: str) -> str:
"""Run the agent with the given task."""
try:
print(language_config.get_translation("status_messages", "task_started"))
result = self._execute_task(task)
print(language_config.get_translation("status_messages", "task_completed"))
return result
except Exception as e:
error_msg = language_config.get_translation("error_messages", "task_failed")
print(f"{error_msg}: {str(e)}")
raise
def _execute_task(self, task: str) -> str:
"""Execute the given task."""
print(language_config.get_translation("status_messages", "task_in_progress"))
// ... existing code ...

@ -0,0 +1,36 @@
from ..utils.language_config import language_config, Language
class Swarm:
def __init__(
self,
agents: List[Agent],
language: Language = Language.ENGLISH,
**kwargs
):
self.agents = agents
self.language = language
language_config.set_language(language)
def run(self, task: str) -> Dict[str, Any]:
"""Run the swarm with the given task."""
try:
print(language_config.get_translation("status_messages", "task_started"))
results = {}
for agent in self.agents:
print(f"{agent.name}: {language_config.get_translation('status_messages', 'task_in_progress')}")
results[agent.name] = agent.run(task)
print(language_config.get_translation("status_messages", "task_completed"))
return results
except Exception as e:
error_msg = language_config.get_translation("error_messages", "task_failed")
print(f"{error_msg}: {str(e)}")
raise
def add_agent(self, agent: Agent) -> None:
"""Add a new agent to the swarm."""
if agent.name in [a.name for a in self.agents]:
error_msg = language_config.get_translation("error_messages", "agent_not_found")
raise ValueError(f"{error_msg}: {agent.name}")
self.agents.append(agent)

@ -0,0 +1,63 @@
from typing import Dict, Optional
from enum import Enum
class Language(Enum):
ENGLISH = "en"
PERSIAN = "fa"
class LanguageConfig:
def __init__(self, default_language: Language = Language.ENGLISH):
self.default_language = default_language
self.current_language = default_language
self.translations: Dict[str, Dict[Language, str]] = {
"error_messages": {
Language.ENGLISH: {
"agent_not_found": "Agent not found",
"task_failed": "Task failed to execute",
"invalid_input": "Invalid input provided",
},
Language.PERSIAN: {
"agent_not_found": "عامل یافت نشد",
"task_failed": "اجرای وظیفه با شکست مواجه شد",
"invalid_input": "ورودی نامعتبر است",
}
},
"status_messages": {
Language.ENGLISH: {
"task_started": "Task started",
"task_completed": "Task completed",
"task_in_progress": "Task in progress",
},
Language.PERSIAN: {
"task_started": "وظیفه شروع شد",
"task_completed": "وظیفه تکمیل شد",
"task_in_progress": "وظیفه در حال انجام است",
}
}
}
def set_language(self, language: Language) -> None:
"""Set the current language for the system."""
self.current_language = language
def get_translation(self, category: str, key: str, language: Optional[Language] = None) -> str:
"""Get a translation for a specific message."""
lang = language or self.current_language
try:
return self.translations[category][lang][key]
except KeyError:
# Fallback to English if translation not found
return self.translations[category][Language.ENGLISH][key]
def add_translation(self, category: str, key: str, translations: Dict[Language, str]) -> None:
"""Add new translations to the system."""
if category not in self.translations:
self.translations[category] = {}
for lang, text in translations.items():
if lang not in self.translations[category]:
self.translations[category][lang] = {}
self.translations[category][lang][key] = text
# Global language configuration instance
language_config = LanguageConfig()
Loading…
Cancel
Save