Update main.py

668^2
Kye Gomez 3 weeks ago committed by GitHub
parent 5ed5af20a7
commit 886ca11370
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -42,6 +42,7 @@ class AgentStatus(str, Enum):
# Security configurations
API_KEY_LENGTH = 32 # Length of generated API keys
class APIKey(BaseModel):
key: str
name: str
@ -49,9 +50,11 @@ class APIKey(BaseModel):
last_used: datetime
is_active: bool = True
class APIKeyCreate(BaseModel):
name: str # A friendly name for the API key
class User(BaseModel):
id: UUID
username: str
@ -60,7 +63,6 @@ class User(BaseModel):
api_keys: Dict[str, APIKey] = {} # key -> APIKey object
class AgentConfig(BaseModel):
"""Configuration model for creating a new agent."""
@ -120,7 +122,6 @@ class AgentConfig(BaseModel):
)
class AgentUpdate(BaseModel):
"""Model for updating agent configuration."""
@ -191,7 +192,9 @@ class AgentStore:
self.agent_metadata: Dict[UUID, Dict[str, Any]] = {}
self.users: Dict[UUID, User] = {} # user_id -> User
self.api_keys: Dict[str, UUID] = {} # api_key -> user_id
self.user_agents: Dict[UUID, List[UUID]] = {} # user_id -> [agent_ids]
self.user_agents: Dict[UUID, List[UUID]] = (
{}
) # user_id -> [agent_ids]
self.executor = ThreadPoolExecutor(max_workers=4)
self._ensure_directories()
@ -205,7 +208,7 @@ class AgentStore:
if user_id not in self.users:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
detail="User not found",
)
# Generate a secure random API key
@ -216,7 +219,7 @@ class AgentStore:
key=api_key,
name=key_name,
created_at=datetime.utcnow(),
last_used=datetime.utcnow()
last_used=datetime.utcnow(),
)
# Store the API key
@ -225,7 +228,9 @@ class AgentStore:
return key_object
async def verify_agent_access(self, agent_id: UUID, user_id: UUID) -> bool:
async def verify_agent_access(
self, agent_id: UUID, user_id: UUID
) -> bool:
"""Verify if a user has access to an agent."""
if agent_id not in self.agents:
return False
@ -248,7 +253,9 @@ class AgentStore:
key_object.last_used = datetime.utcnow()
return user_id
async def create_agent(self, config: AgentConfig, user_id: UUID) -> UUID:
async def create_agent(
self, config: AgentConfig, user_id: UUID
) -> UUID:
"""Create a new agent with the given configuration."""
try:
@ -536,24 +543,29 @@ class AgentStore:
finally:
metadata["status"] = AgentStatus.IDLE
class StoreManager:
_instance = None
@classmethod
def get_instance(cls) -> 'AgentStore':
def get_instance(cls) -> "AgentStore":
if cls._instance is None:
cls._instance = AgentStore()
return cls._instance
# Modify the dependency function
def get_store() -> AgentStore:
"""Dependency to get the AgentStore instance."""
return StoreManager.get_instance()
# Security utility function using the new dependency
async def get_current_user(
api_key: str = Header(..., description="API key for authentication"),
store: AgentStore = Depends(get_store)
api_key: str = Header(
..., description="API key for authentication"
),
store: AgentStore = Depends(get_store),
) -> User:
"""Validate API key and return current user."""
user_id = store.validate_api_key(api_key)
@ -604,44 +616,60 @@ class SwarmsAPI:
body = await request.json()
username = body.get("username")
if not username or len(username) < 3:
raise HTTPException(status_code=400, detail="Invalid username")
raise HTTPException(
status_code=400, detail="Invalid username"
)
user_id = uuid4()
user = User(id=user_id, username=username)
self.store.users[user_id] = user
initial_key = self.store.create_api_key(user_id, "Initial Key")
return {"user_id": user_id, "api_key": initial_key.key}
initial_key = self.store.create_api_key(
user_id, "Initial Key"
)
return {
"user_id": user_id,
"api_key": initial_key.key,
}
except Exception as e:
logger.error(f"Error creating user: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
@self.app.post("/v1/users/{user_id}/api-keys", response_model=APIKey)
@self.app.post(
"/v1/users/{user_id}/api-keys", response_model=APIKey
)
async def create_api_key(
user_id: UUID,
key_create: APIKeyCreate,
current_user: User = Depends(get_current_user)
current_user: User = Depends(get_current_user),
):
"""Create a new API key for a user."""
if current_user.id != user_id and not current_user.is_admin:
if (
current_user.id != user_id
and not current_user.is_admin
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to create API keys for this user"
detail="Not authorized to create API keys for this user",
)
return self.store.create_api_key(user_id, key_create.name)
@self.app.get("/v1/users/{user_id}/api-keys", response_model=List[APIKey])
@self.app.get(
"/v1/users/{user_id}/api-keys",
response_model=List[APIKey],
)
async def list_api_keys(
user_id: UUID,
current_user: User = Depends(get_current_user)
current_user: User = Depends(get_current_user),
):
"""List all API keys for a user."""
if current_user.id != user_id and not current_user.is_admin:
if (
current_user.id != user_id
and not current_user.is_admin
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to view API keys for this user"
detail="Not authorized to view API keys for this user",
)
return list(self.store.users[user_id].api_keys.values())
@ -650,47 +678,60 @@ class SwarmsAPI:
async def revoke_api_key(
user_id: UUID,
key: str,
current_user: User = Depends(get_current_user)
current_user: User = Depends(get_current_user),
):
"""Revoke an API key."""
if current_user.id != user_id and not current_user.is_admin:
if (
current_user.id != user_id
and not current_user.is_admin
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to revoke API keys for this user"
detail="Not authorized to revoke API keys for this user",
)
if key in self.store.users[user_id].api_keys:
self.store.users[user_id].api_keys[key].is_active = False
self.store.users[user_id].api_keys[
key
].is_active = False
del self.store.api_keys[key]
return {"status": "API key revoked"}
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="API key not found"
detail="API key not found",
)
@self.app.get("/v1/users/me/agents", response_model=List[AgentSummary])
@self.app.get(
"/v1/users/me/agents", response_model=List[AgentSummary]
)
async def list_user_agents(
current_user: User = Depends(get_current_user),
tags: Optional[List[str]] = Query(None),
status: Optional[AgentStatus] = None,
):
"""List all agents owned by the current user."""
user_agents = self.store.user_agents.get(current_user.id, [])
user_agents = self.store.user_agents.get(
current_user.id, []
)
return [
agent for agent in await self.store.list_agents(tags, status)
agent
for agent in await self.store.list_agents(
tags, status
)
if agent.agent_id in user_agents
]
# Modify existing routes to use API key authentication
@self.app.post("/v1/agent", response_model=Dict[str, UUID])
async def create_agent(
config: AgentConfig,
current_user: User = Depends(get_current_user)
current_user: User = Depends(get_current_user),
):
"""Create a new agent with the specified configuration."""
agent_id = await self.store.create_agent(config, current_user.id)
agent_id = await self.store.create_agent(
config, current_user.id
)
return {"agent_id": agent_id}
@self.app.get("/v1/agents", response_model=List[AgentSummary])
@ -804,6 +845,7 @@ class SwarmsAPI:
if k > cutoff
}
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
logger.info("Creating FastAPI application")
@ -812,9 +854,10 @@ def create_app() -> FastAPI:
logger.info("FastAPI application created successfully")
return app
app = create_app()
if __name__ == '__main__':
if __name__ == "__main__":
try:
logger.info("Starting API server...")
print("Starting API server on http://0.0.0.0:8000")
@ -823,7 +866,7 @@ if __name__ == '__main__':
app, # Pass the app instance directly
host="0.0.0.0",
port=8000,
log_level="info"
log_level="info",
)
except Exception as e:
logger.error(f"Failed to start API: {str(e)}")

Loading…
Cancel
Save