From 32a8496c3f01bf3c107e0835517599d0013cee5f Mon Sep 17 00:00:00 2001 From: Artem-Darius Weber Date: Mon, 21 Jul 2025 17:40:48 +0300 Subject: [PATCH] init --- .env | 6 + .gitignore | 2 + Dockerfile | 29 ++ docker-compose.yml | 71 ++++ main.py | 55 +++ requirements.txt | 14 + src/__init__.py | 1 + src/application/__init__.py | 7 + src/application/services.py | 440 +++++++++++++++++++++++ src/application/use_cases.py | 122 +++++++ src/domain/__init__.py | 7 + src/domain/entities.py | 54 +++ src/domain/repositories.py | 48 +++ src/infrastructure/__init__.py | 3 + src/infrastructure/config.py | 22 ++ src/infrastructure/database/__init__.py | 3 + src/infrastructure/database/mongodb.py | 299 +++++++++++++++ src/infrastructure/logging/__init__.py | 29 ++ src/infrastructure/logging/logger.py | 212 +++++++++++ src/infrastructure/ml/__init__.py | 4 + src/infrastructure/ml/detectors.py | 144 ++++++++ src/infrastructure/ml/recognizers.py | 116 ++++++ src/infrastructure/storage/__init__.py | 4 + src/infrastructure/storage/chromadb.py | 279 ++++++++++++++ src/infrastructure/storage/filesystem.py | 42 +++ src/presentation/__init__.py | 3 + src/presentation/gradio_app.py | 353 ++++++++++++++++++ 27 files changed, 2369 insertions(+) create mode 100644 .env create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 docker-compose.yml create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 src/__init__.py create mode 100644 src/application/__init__.py create mode 100644 src/application/services.py create mode 100644 src/application/use_cases.py create mode 100644 src/domain/__init__.py create mode 100644 src/domain/entities.py create mode 100644 src/domain/repositories.py create mode 100644 src/infrastructure/__init__.py create mode 100644 src/infrastructure/config.py create mode 100644 src/infrastructure/database/__init__.py create mode 100644 src/infrastructure/database/mongodb.py create mode 100644 src/infrastructure/logging/__init__.py create mode 100644 src/infrastructure/logging/logger.py create mode 100644 src/infrastructure/ml/__init__.py create mode 100644 src/infrastructure/ml/detectors.py create mode 100644 src/infrastructure/ml/recognizers.py create mode 100644 src/infrastructure/storage/__init__.py create mode 100644 src/infrastructure/storage/chromadb.py create mode 100644 src/infrastructure/storage/filesystem.py create mode 100644 src/presentation/__init__.py create mode 100644 src/presentation/gradio_app.py diff --git a/.env b/.env new file mode 100644 index 0000000..e799697 --- /dev/null +++ b/.env @@ -0,0 +1,6 @@ +MONGODB_URL=mongodb://admin:admin123@localhost:27017/face_recognition?authSource=admin +MONGODB_DB=face_recognition +CHROMA_HOST=localhost +CHROMA_PORT=8000 +UPLOAD_DIR=/app/data/uploads +GPU_ENABLED=true \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a5b713f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +data/ +models/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..3663b4c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,29 @@ +FROM nvidia/cuda:11.8.0-cudnn8-runtime-ubuntu22.04 + +ENV DEBIAN_FRONTEND=noninteractive +ENV PYTHONUNBUFFERED=1 + +RUN apt-get update && apt-get install -y \ + python3.10 \ + python3-pip \ + python3-opencv \ + libglib2.0-0 \ + libsm6 \ + libxext6 \ + libxrender-dev \ + libgomp1 \ + wget \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . +RUN pip3 install --no-cache-dir -r requirements.txt + +COPY . . 
+ +RUN mkdir -p /app/data/uploads /app/data/chroma /app/models + +EXPOSE 7860 + +CMD ["python3", "main.py"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..3b86ca7 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,71 @@ +version: '3.8' + +services: + mongodb: + image: mongo:7.0 + container_name: face_recognition_mongo + restart: always + ports: + - "27017:27017" + volumes: + - mongo_data:/data/db + environment: + MONGO_INITDB_ROOT_USERNAME: admin + MONGO_INITDB_ROOT_PASSWORD: admin123 + MONGO_INITDB_DATABASE: face_recognition + networks: + - face_recognition_net + + chromadb: + image: chromadb/chroma:latest + container_name: face_recognition_chroma + restart: always + ports: + - "8000:8000" + volumes: + - chroma_data:/chroma/chroma + environment: + - IS_PERSISTENT=TRUE + - PERSIST_DIRECTORY=/chroma/chroma + - ANONYMIZED_TELEMETRY=FALSE + networks: + - face_recognition_net + + face_recognition: + build: + context: . + dockerfile: Dockerfile + container_name: face_recognition_app + restart: always + ports: + - "7860:7860" + volumes: + - ./data:/app/data + - ./models:/app/models + environment: + - MONGODB_URL=mongodb://admin:admin123@mongodb:27017/face_recognition?authSource=admin + - MONGODB_DB=face_recognition + - CHROMA_HOST=chromadb + - CHROMA_PORT=8000 + - UPLOAD_DIR=/app/data/uploads + - NVIDIA_VISIBLE_DEVICES=all + depends_on: + - mongodb + - chromadb + networks: + - face_recognition_net + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [gpu] + +volumes: + mongo_data: + chroma_data: + +networks: + face_recognition_net: + driver: bridge \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..8a9f5ab --- /dev/null +++ b/main.py @@ -0,0 +1,55 @@ +from pathlib import Path +import gradio as gr +import numpy as np +from typing import Optional, Tuple, List +import cv2 +from datetime import datetime +import asyncio +from concurrent.futures import ThreadPoolExecutor + +from src.domain.entities import Face, VerificationResult, IdentificationResult +from src.domain.repositories import FaceRepository +from src.infrastructure.database.mongodb import MongoDBFaceRepository +from src.infrastructure.storage.chromadb import ChromaDBVectorStore +from src.infrastructure.storage.filesystem import FileSystemStorage +from src.application.use_cases import ( + RegisterFaceUseCase, + VerifyFaceUseCase, + IdentifyFaceUseCase +) +from src.application.services import FaceRecognitionService +from src.infrastructure.ml.detectors import RetinaFaceDetector +from src.infrastructure.ml.recognizers import SFaceRecognizer +from src.presentation.gradio_app import create_gradio_interface +from src.infrastructure.config import Settings + +settings = Settings() + +async def initialize_app(): + face_repository = MongoDBFaceRepository(settings.MONGODB_URL, settings.MONGODB_DB) + vector_store = ChromaDBVectorStore(settings.CHROMA_HOST, settings.CHROMA_PORT) + file_storage = FileSystemStorage(settings.UPLOAD_DIR) + + detector = RetinaFaceDetector() + recognizer = SFaceRecognizer() + + face_service = FaceRecognitionService( + face_repository=face_repository, + vector_store=vector_store, + verification_threshold=settings.VERIFICATION_THRESHOLD, + identification_threshold=settings.IDENTIFICATION_THRESHOLD + ) + + register_use_case = RegisterFaceUseCase(face_service) + verify_use_case = VerifyFaceUseCase(face_service) + identify_use_case = IdentifyFaceUseCase(face_service) + + 
return create_gradio_interface( + register_use_case=register_use_case, + verify_use_case=verify_use_case, + identify_use_case=identify_use_case + ) + +if __name__ == "__main__": + app = asyncio.run(initialize_app()) + app.launch(server_name="0.0.0.0", server_port=7860) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..af434bd --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +gradio==4.16.0 +numpy==1.24.3 +opencv-python==4.8.1.78 +Pillow==10.2.0 +motor==3.3.2 +pymongo==4.6.1 +chromadb==0.4.22 +insightface==0.7.3 +onnxruntime-gpu==1.16.3 +pydantic>=2.0.0,<3.0.0 +pydantic-settings>=2.0.0 +python-multipart==0.0.6 +uvicorn==0.27.0 +fastapi==0.109.0 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..03f0a2f --- /dev/null +++ b/src/__init__.py @@ -0,0 +1 @@ +__version__ = "1.0.0" \ No newline at end of file diff --git a/src/application/__init__.py b/src/application/__init__.py new file mode 100644 index 0000000..d5342a5 --- /dev/null +++ b/src/application/__init__.py @@ -0,0 +1,7 @@ +from .services import FaceRecognitionService +from .use_cases import RegisterFaceUseCase, VerifyFaceUseCase, IdentifyFaceUseCase + +__all__ = [ + "FaceRecognitionService", + "RegisterFaceUseCase", "VerifyFaceUseCase", "IdentifyFaceUseCase" +] \ No newline at end of file diff --git a/src/application/services.py b/src/application/services.py new file mode 100644 index 0000000..9ea63c1 --- /dev/null +++ b/src/application/services.py @@ -0,0 +1,440 @@ +from typing import List, Optional, Tuple +import numpy as np +import uuid +from datetime import datetime +import time + +from src.domain.entities import Face, User +from src.domain.repositories import FaceRepository, VectorStore +from src.infrastructure.ml.detectors import RetinaFaceDetector +from src.infrastructure.ml.recognizers import SFaceRecognizer +from src.infrastructure.storage.chromadb import ChromaDBVectorStore +from src.infrastructure.storage.filesystem import FileSystemStorage +from src.infrastructure.logging import service_logger, data_logger + + +class FaceRecognitionService: + def __init__( + self, + face_repository: FaceRepository, + vector_store: Optional[VectorStore] = None, + verification_threshold: float = 0.6, + identification_threshold: float = 0.5 + ): + service_logger.info( + "Initializing Face Recognition Service", + verification_threshold=verification_threshold, + identification_threshold=identification_threshold + ) + + try: + self.face_repository = face_repository + self.vector_store = vector_store or ChromaDBVectorStore() + self.detector = RetinaFaceDetector() + self.recognizer = SFaceRecognizer() + self.file_storage = FileSystemStorage("/app/data/uploads") + + self.verification_threshold = verification_threshold + self.identification_threshold = identification_threshold + + service_logger.info( + "Face Recognition Service initialized successfully", + components_loaded=["detector", "recognizer", "vector_store", "file_storage", "face_repository"] + ) + except Exception as e: + service_logger.error("Failed to initialize Face Recognition Service", error=e) + raise + + async def register_face(self, user_id: str, image: np.ndarray) -> Optional[str]: + start_time = time.time() + service_logger.info( + "Starting face registration", + user_id=user_id, + image_shape=image.shape + ) + + try: + # Step 1: Detect faces + detected_faces = self.detector.detect_faces(image) + + if not detected_faces: + service_logger.warning( + "No 
faces detected in image", + user_id=user_id, + processing_time_seconds=time.time() - start_time + ) + return None + + service_logger.debug( + f"Detected {len(detected_faces)} faces, selecting best one", + user_id=user_id, + faces_count=len(detected_faces) + ) + + # Step 2: Select best face + best_face = max(detected_faces, key=lambda x: x.get('det_score', 0)) + + # Step 3: Check quality + quality_score = self.detector.calculate_quality_score(best_face) + if quality_score < 0.5: + service_logger.warning( + "Face quality too low for registration", + user_id=user_id, + quality_score=quality_score, + quality_threshold=0.5, + processing_time_seconds=time.time() - start_time + ) + return None + + service_logger.debug( + "Face quality check passed", + user_id=user_id, + quality_score=quality_score + ) + + # Step 4: Align face + if best_face.get('landmarks') is not None: + aligned_face = self.detector.align_face(image, np.array(best_face['landmarks'])) + service_logger.debug("Face aligned using landmarks", user_id=user_id) + else: + aligned_face = self.detector._crop_face(image, best_face['bbox']) + service_logger.debug("Face cropped using bbox", user_id=user_id) + + # Step 5: Extract embedding + embedding = self.recognizer.extract_embedding(aligned_face) + + # Step 6: Generate unique ID and save image + face_id = str(uuid.uuid4()) + image_path = await self.file_storage.save_image(aligned_face, user_id) + + service_logger.debug( + "Face image saved", + user_id=user_id, + face_id=face_id, + image_path=image_path + ) + + # Step 7: Create Face entity + face = Face( + id=face_id, + user_id=user_id, + image_path=image_path, + embedding=embedding, + created_at=datetime.utcnow(), + quality_score=quality_score, + bbox=best_face['bbox'], + landmarks=best_face.get('landmarks') + ) + + # Step 8: Save to repository + await self.face_repository.save(face) + service_logger.debug("Face saved to database", user_id=user_id, face_id=face_id) + + # Step 9: Add to vector store + metadata = { + 'user_id': user_id, + 'face_id': face_id, + 'quality_score': quality_score + } + await self.vector_store.add(face_id, embedding, metadata) + service_logger.debug("Face embedding added to vector store", face_id=face_id) + + # Log successful registration + processing_time = time.time() - start_time + data_logger.log_face_registration(user_id, face_id, quality_score, len(embedding)) + + service_logger.info( + "Face registration completed successfully", + user_id=user_id, + face_id=face_id, + quality_score=quality_score, + embedding_dimensions=len(embedding), + processing_time_seconds=processing_time + ) + + return face_id + + except Exception as e: + processing_time = time.time() - start_time + service_logger.error( + "Face registration failed", + user_id=user_id, + error=e, + processing_time_seconds=processing_time + ) + raise + + async def verify_face(self, user_id: str, image: np.ndarray) -> Tuple[bool, float]: + start_time = time.time() + service_logger.info( + "Starting face verification", + user_id=user_id, + image_shape=image.shape, + verification_threshold=self.verification_threshold + ) + + try: + # Step 1: Detect faces + detected_faces = self.detector.detect_faces(image) + + if not detected_faces: + processing_time = time.time() - start_time + service_logger.warning( + "No faces detected for verification", + user_id=user_id, + processing_time_seconds=processing_time + ) + data_logger.log_face_verification(user_id, False, 0.0, self.verification_threshold, processing_time) + return False, 0.0 + + service_logger.debug( + 
f"Detected {len(detected_faces)} faces for verification", + user_id=user_id, + faces_count=len(detected_faces) + ) + + # Step 2: Select best face + best_face = max(detected_faces, key=lambda x: x.get('det_score', 0)) + service_logger.debug( + "Selected best face for verification", + user_id=user_id, + detection_score=best_face.get('det_score', 0) + ) + + # Step 3: Align face + if best_face.get('landmarks') is not None: + aligned_face = self.detector.align_face(image, np.array(best_face['landmarks'])) + service_logger.debug("Face aligned using landmarks for verification", user_id=user_id) + else: + aligned_face = self.detector._crop_face(image, best_face['bbox']) + service_logger.debug("Face cropped using bbox for verification", user_id=user_id) + + # Step 4: Extract embedding + query_embedding = self.recognizer.extract_embedding(aligned_face) + service_logger.debug( + "Query embedding extracted for verification", + user_id=user_id, + embedding_shape=query_embedding.shape + ) + + # Step 5: Get user's registered faces + user_faces = await self.face_repository.get_by_user_id(user_id) + + if not user_faces: + processing_time = time.time() - start_time + service_logger.warning( + "No registered faces found for user", + user_id=user_id, + processing_time_seconds=processing_time + ) + data_logger.log_face_verification(user_id, False, 0.0, self.verification_threshold, processing_time) + return False, 0.0 + + service_logger.debug( + f"Found {len(user_faces)} registered faces for user", + user_id=user_id, + registered_faces_count=len(user_faces) + ) + + # Step 6: Calculate similarities with all user faces + max_similarity = 0.0 + similarities = [] + + for i, face in enumerate(user_faces): + similarity = self.recognizer.calculate_similarity(query_embedding, face.embedding) + similarities.append(similarity) + max_similarity = max(max_similarity, similarity) + + service_logger.debug( + f"Similarity calculated with face {i+1}", + user_id=user_id, + face_id=face.id, + similarity=similarity, + quality_score=face.quality_score + ) + + # Step 7: Make verification decision + is_verified = max_similarity >= self.verification_threshold + processing_time = time.time() - start_time + + # Log verification result + data_logger.log_face_verification(user_id, is_verified, max_similarity, self.verification_threshold, processing_time) + + service_logger.info( + "Face verification completed", + user_id=user_id, + verification_result=is_verified, + max_similarity=max_similarity, + verification_threshold=self.verification_threshold, + registered_faces_compared=len(user_faces), + all_similarities=similarities, + processing_time_seconds=processing_time + ) + + return is_verified, max_similarity + + except Exception as e: + processing_time = time.time() - start_time + service_logger.error( + "Face verification failed", + user_id=user_id, + error=e, + processing_time_seconds=processing_time + ) + raise + + async def identify_face(self, image: np.ndarray) -> Tuple[Optional[str], float, List[Tuple[str, float]]]: + start_time = time.time() + service_logger.info( + "Starting face identification", + image_shape=image.shape, + identification_threshold=self.identification_threshold + ) + + try: + # Step 1: Detect faces + detected_faces = self.detector.detect_faces(image) + + if not detected_faces: + processing_time = time.time() - start_time + service_logger.warning( + "No faces detected for identification", + processing_time_seconds=processing_time + ) + data_logger.log_face_identification(None, 0.0, 0, processing_time) + return 
None, 0.0, [] + + service_logger.debug( + f"Detected {len(detected_faces)} faces for identification", + faces_count=len(detected_faces) + ) + + # Step 2: Select best face + best_face = max(detected_faces, key=lambda x: x.get('det_score', 0)) + service_logger.debug( + "Selected best face for identification", + detection_score=best_face.get('det_score', 0) + ) + + # Step 3: Align face + if best_face.get('landmarks') is not None: + aligned_face = self.detector.align_face(image, np.array(best_face['landmarks'])) + service_logger.debug("Face aligned using landmarks for identification") + else: + aligned_face = self.detector._crop_face(image, best_face['bbox']) + service_logger.debug("Face cropped using bbox for identification") + + # Step 4: Extract embedding + query_embedding = self.recognizer.extract_embedding(aligned_face) + service_logger.debug( + "Query embedding extracted for identification", + embedding_shape=query_embedding.shape + ) + + # Step 5: Search similar faces in vector store + similar_faces = await self.vector_store.search_similar(query_embedding, top_k=10) + service_logger.debug( + f"Vector search completed, found {len(similar_faces)} similar faces", + similar_faces_count=len(similar_faces), + search_top_k=10 + ) + + # Log detailed similarity results + for i, (face_id, similarity, metadata) in enumerate(similar_faces): + service_logger.debug( + f"Similar face {i+1} found", + face_id=face_id, + similarity=similarity, + user_id=metadata.get('user_id', 'unknown'), + quality_score=metadata.get('quality_score', 'unknown') + ) + + # Step 6: Filter candidates by threshold and group by user + candidates = [] + user_scores = {} + valid_matches = 0 + + for face_id, similarity, metadata in similar_faces: + if similarity >= self.identification_threshold: + valid_matches += 1 + user_id = metadata.get('user_id') + if user_id: + if user_id not in user_scores: + user_scores[user_id] = [] + user_scores[user_id].append(similarity) + + service_logger.debug( + "Valid match found", + face_id=face_id, + user_id=user_id, + similarity=similarity, + threshold=self.identification_threshold + ) + + service_logger.debug( + f"Found {valid_matches} matches above threshold", + valid_matches=valid_matches, + identification_threshold=self.identification_threshold, + unique_users=len(user_scores) + ) + + # Step 7: Calculate best score per user + for user_id, scores in user_scores.items(): + max_score = max(scores) + avg_score = sum(scores) / len(scores) + candidates.append((user_id, max_score)) + + service_logger.debug( + "User candidate scores calculated", + user_id=user_id, + max_score=max_score, + avg_score=avg_score, + face_matches=len(scores), + all_scores=scores + ) + + # Step 8: Sort candidates by confidence + candidates.sort(key=lambda x: x[1], reverse=True) + + # Step 9: Determine final result + best_user_id = candidates[0][0] if candidates else None + best_confidence = candidates[0][1] if candidates else 0.0 + processing_time = time.time() - start_time + + # Log identification result + data_logger.log_face_identification(best_user_id, best_confidence, len(candidates), processing_time) + + service_logger.info( + "Face identification completed", + identified_user_id=best_user_id, + confidence=best_confidence, + identification_threshold=self.identification_threshold, + candidates_count=len(candidates), + all_candidates=candidates[:5], # Log top 5 candidates + vector_matches=len(similar_faces), + valid_matches=valid_matches, + processing_time_seconds=processing_time + ) + + return best_user_id, 
best_confidence, candidates + + except Exception as e: + processing_time = time.time() - start_time + service_logger.error( + "Face identification failed", + error=e, + processing_time_seconds=processing_time + ) + raise + + async def update_user_mean_embedding(self, user_id: str) -> Optional[np.ndarray]: + user_faces = await self.face_repository.get_by_user_id(user_id) + + if not user_faces: + return None + + embeddings = [face.embedding for face in user_faces] + mean_embedding = np.mean(embeddings, axis=0) + mean_embedding = mean_embedding / np.linalg.norm(mean_embedding) + + return mean_embedding \ No newline at end of file diff --git a/src/application/use_cases.py b/src/application/use_cases.py new file mode 100644 index 0000000..8146b23 --- /dev/null +++ b/src/application/use_cases.py @@ -0,0 +1,122 @@ +from typing import Optional, List, Tuple +import numpy as np +from datetime import datetime + +from src.domain.entities import User, VerificationResult, IdentificationResult +from src.application.services import FaceRecognitionService +from src.infrastructure.database.mongodb import MongoDBUserRepository + +class RegisterFaceUseCase: + def __init__(self, face_service: FaceRecognitionService): + self.face_service = face_service + from src.infrastructure.config import Settings + settings = Settings() + self.user_repository = MongoDBUserRepository( + settings.MONGODB_URL, + settings.MONGODB_DB + ) + + async def execute(self, user_name: str, images: List[np.ndarray]) -> Tuple[bool, str]: + user = await self.user_repository.get_user_by_name(user_name) + + if not user: + user = User( + id="", + name=user_name, + created_at=datetime.utcnow(), + face_ids=[] + ) + user_id = await self.user_repository.save(user) + user.id = user_id + else: + user_id = user.id + + face_ids = [] + for image in images: + face_id = await self.face_service.register_face(user_id, image) + if face_id: + face_ids.append(face_id) + + if not face_ids: + return False, "No valid faces detected in the provided images" + + user.face_ids.extend(face_ids) + mean_embedding = await self.face_service.update_user_mean_embedding(user_id) + user.mean_embedding = mean_embedding + + await self.user_repository.update_user(user) + + return True, f"Successfully registered {len(face_ids)} face(s) for user {user_name}" + +class VerifyFaceUseCase: + def __init__(self, face_service: FaceRecognitionService): + self.face_service = face_service + from src.infrastructure.config import Settings + settings = Settings() + self.user_repository = MongoDBUserRepository( + settings.MONGODB_URL, + settings.MONGODB_DB + ) + + async def execute(self, user_name: str, image: np.ndarray) -> VerificationResult: + import time + start_time = time.time() + + user = await self.user_repository.get_user_by_name(user_name) + if not user: + return VerificationResult( + is_verified=False, + confidence=0.0, + user_id="", + face_id="", + threshold=self.face_service.verification_threshold, + processing_time=time.time() - start_time + ) + + is_verified, confidence = await self.face_service.verify_face(user.id, image) + + return VerificationResult( + is_verified=is_verified, + confidence=confidence, + user_id=user.id, + face_id="", + threshold=self.face_service.verification_threshold, + processing_time=time.time() - start_time + ) + +class IdentifyFaceUseCase: + def __init__(self, face_service: FaceRecognitionService): + self.face_service = face_service + from src.infrastructure.config import Settings + settings = Settings() + self.user_repository = MongoDBUserRepository( + 
settings.MONGODB_URL, + settings.MONGODB_DB + ) + + async def execute(self, image: np.ndarray) -> IdentificationResult: + import time + start_time = time.time() + + user_id, confidence, candidates = await self.face_service.identify_face(image) + + candidate_names = [] + for candidate_user_id, candidate_confidence in candidates: + user = await self.user_repository.get_by_id(candidate_user_id) + if user: + candidate_names.append((user.name, candidate_confidence)) + + identified_user_name = None + if user_id: + user = await self.user_repository.get_by_id(user_id) + if user: + identified_user_name = user.name + + return IdentificationResult( + is_identified=user_id is not None, + user_id=identified_user_name, + confidence=confidence, + candidates=candidate_names, + threshold=self.face_service.identification_threshold, + processing_time=time.time() - start_time + ) \ No newline at end of file diff --git a/src/domain/__init__.py b/src/domain/__init__.py new file mode 100644 index 0000000..bb0b3be --- /dev/null +++ b/src/domain/__init__.py @@ -0,0 +1,7 @@ +from .entities import Face, User, VerificationResult, IdentificationResult, ProcessedFace, OperationType +from .repositories import FaceRepository, UserRepository, VectorStore + +__all__ = [ + "Face", "User", "VerificationResult", "IdentificationResult", "ProcessedFace", "OperationType", + "FaceRepository", "UserRepository", "VectorStore" +] \ No newline at end of file diff --git a/src/domain/entities.py b/src/domain/entities.py new file mode 100644 index 0000000..92e42f6 --- /dev/null +++ b/src/domain/entities.py @@ -0,0 +1,54 @@ +from dataclasses import dataclass +from datetime import datetime +from typing import List, Optional, Tuple +import numpy as np +from enum import Enum +from pydantic import BaseModel + +class OperationType(str, Enum): + REGISTER = "register" + VERIFICATION = "verify" + IDENTIFICATION = "identify" + +@dataclass +class Face: + id: str + user_id: str + image_path: str + embedding: np.ndarray + created_at: datetime + quality_score: float + bbox: List[float] + landmarks: Optional[List[List[float]]] = None + +@dataclass +class User: + id: str + name: str + created_at: datetime + face_ids: List[str] + mean_embedding: Optional[np.ndarray] = None + +@dataclass +class VerificationResult: + is_verified: bool + confidence: float + user_id: str + face_id: str + threshold: float + processing_time: float + +@dataclass +class IdentificationResult: + is_identified: bool + user_id: Optional[str] + confidence: float + candidates: List[Tuple[str, float]] + threshold: float + processing_time: float + +class ProcessedFace(BaseModel): + bbox: List[float] # [x1, y1, x2, y2] + landmarks: List[List[float]] # [[x1, y1], [x2, y2], ...] 
+ quality_score: float + embedding: List[float] \ No newline at end of file diff --git a/src/domain/repositories.py b/src/domain/repositories.py new file mode 100644 index 0000000..723f5f3 --- /dev/null +++ b/src/domain/repositories.py @@ -0,0 +1,48 @@ +from abc import ABC, abstractmethod +from typing import List, Optional, Tuple, Dict +import numpy as np + +from .entities import Face, User + +class FaceRepository(ABC): + @abstractmethod + async def save(self, face: Face) -> str: + pass + + @abstractmethod + async def get_by_id(self, face_id: str) -> Optional[Face]: + pass + + @abstractmethod + async def get_by_user_id(self, user_id: str) -> List[Face]: + pass + + @abstractmethod + async def delete(self, face_id: str) -> bool: + pass + +class UserRepository(ABC): + @abstractmethod + async def save(self, user: User) -> str: + pass + + @abstractmethod + async def get_by_id(self, user_id: str) -> Optional[User]: + pass + + @abstractmethod + async def delete(self, user_id: str) -> bool: + pass + +class VectorStore(ABC): + @abstractmethod + async def add(self, id: str, embedding: np.ndarray, metadata: Dict = None) -> bool: + pass + + @abstractmethod + async def search_similar(self, embedding: np.ndarray, top_k: int = 10) -> List[Tuple[str, float, dict]]: + pass + + @abstractmethod + async def delete(self, id: str) -> bool: + pass \ No newline at end of file diff --git a/src/infrastructure/__init__.py b/src/infrastructure/__init__.py new file mode 100644 index 0000000..484dc79 --- /dev/null +++ b/src/infrastructure/__init__.py @@ -0,0 +1,3 @@ +from .config import Settings + +__all__ = ["Settings"] \ No newline at end of file diff --git a/src/infrastructure/config.py b/src/infrastructure/config.py new file mode 100644 index 0000000..51e7be4 --- /dev/null +++ b/src/infrastructure/config.py @@ -0,0 +1,22 @@ +from pydantic_settings import BaseSettings +from pathlib import Path + +class Settings(BaseSettings): + MONGODB_URL: str = "mongodb://mongo:27017" + MONGODB_DB: str = "face_recognition" + + CHROMA_HOST: str = "chromadb" + CHROMA_PORT: int = 8000 + UPLOAD_DIR: str = "/app/data/uploads" + + QUALITY_THRESHOLD: float = 0.7 + VERIFICATION_THRESHOLD: float = 0.8 + IDENTIFICATION_THRESHOLD: float = 0.75 + + MAX_FACES_PER_USER: int = 10 + + GPU_ENABLED: bool = True + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" \ No newline at end of file diff --git a/src/infrastructure/database/__init__.py b/src/infrastructure/database/__init__.py new file mode 100644 index 0000000..356f0ca --- /dev/null +++ b/src/infrastructure/database/__init__.py @@ -0,0 +1,3 @@ +from .mongodb import MongoDBFaceRepository, MongoDBUserRepository + +__all__ = ["MongoDBFaceRepository", "MongoDBUserRepository"] \ No newline at end of file diff --git a/src/infrastructure/database/mongodb.py b/src/infrastructure/database/mongodb.py new file mode 100644 index 0000000..38cdbb5 --- /dev/null +++ b/src/infrastructure/database/mongodb.py @@ -0,0 +1,299 @@ +from typing import List, Optional +import numpy as np +from motor.motor_asyncio import AsyncIOMotorClient +from bson import ObjectId +from datetime import datetime +import time + +from src.domain.entities import Face, User +from src.domain.repositories import FaceRepository, UserRepository +from src.infrastructure.logging import repository_logger, data_logger + +class MongoDBFaceRepository(FaceRepository): + def __init__(self, connection_string: str, database_name: str): + repository_logger.info( + "Initializing MongoDB Face Repository", + 
connection_string=(connection_string.split('//')[0] + '//***:***@' + connection_string.split('@')[-1]) if '@' in connection_string else connection_string, + database_name=database_name + ) + + try: + self.client = AsyncIOMotorClient(connection_string) + self.db = self.client[database_name] + self.faces_collection = self.db.faces + self.users_collection = self.db.users + + repository_logger.info("MongoDB Face Repository initialized successfully") + except Exception as e: + repository_logger.error("Failed to initialize MongoDB Face Repository", error=e) + raise + + async def save(self, face: Face) -> str: + start_time = time.time() + repository_logger.debug( + "Saving face to database", + face_id=face.id, + user_id=face.user_id, + quality_score=face.quality_score + ) + + try: + face_dict = { + "user_id": face.user_id, + "image_path": face.image_path, + "embedding": face.embedding.tolist(), + "created_at": face.created_at, + "quality_score": face.quality_score, + "bbox": face.bbox, + "landmarks": face.landmarks + } + result = await self.faces_collection.insert_one(face_dict) + face_id = str(result.inserted_id) + + processing_time = time.time() - start_time + data_logger.log_database_operation( + "insert", "faces", face_id, True, + processing_time_seconds=processing_time, + user_id=face.user_id + ) + + repository_logger.info( + "Face saved successfully", + face_id=face_id, + user_id=face.user_id, + embedding_dimensions=len(face.embedding), + quality_score=face.quality_score, + processing_time_seconds=processing_time + ) + + return face_id + + except Exception as e: + processing_time = time.time() - start_time + data_logger.log_database_operation( + "insert", "faces", face.id or "unknown", False, + processing_time_seconds=processing_time, + error_message=str(e) + ) + + repository_logger.error( + "Failed to save face", + face_id=face.id, + user_id=face.user_id, + error=e, + processing_time_seconds=processing_time + ) + raise + + async def get_by_id(self, face_id: str) -> Optional[Face]: + start_time = time.time() + repository_logger.debug("Retrieving face by ID", face_id=face_id) + + try: + face_dict = await self.faces_collection.find_one({"_id": ObjectId(face_id)}) + processing_time = time.time() - start_time + + if face_dict: + face = self._dict_to_face(face_dict) + data_logger.log_database_operation( + "select", "faces", face_id, True, + processing_time_seconds=processing_time, + user_id=face.user_id + ) + + repository_logger.info( + "Face retrieved successfully", + face_id=face_id, + user_id=face.user_id, + processing_time_seconds=processing_time + ) + return face + else: + data_logger.log_database_operation( + "select", "faces", face_id, False, + processing_time_seconds=processing_time, + error_message="Face not found" + ) + + repository_logger.warning( + "Face not found", + face_id=face_id, + processing_time_seconds=processing_time + ) + return None + + except Exception as e: + processing_time = time.time() - start_time + data_logger.log_database_operation( + "select", "faces", face_id, False, + processing_time_seconds=processing_time, + error_message=str(e) + ) + + repository_logger.error( + "Failed to retrieve face", + face_id=face_id, + error=e, + processing_time_seconds=processing_time + ) + raise + + async def get_by_user_id(self, user_id: str) -> List[Face]: + start_time = time.time() + repository_logger.debug("Retrieving faces by user ID", user_id=user_id) + + try: + cursor = self.faces_collection.find({"user_id": user_id}) + faces = [] + async for face_dict in cursor: +
faces.append(self._dict_to_face(face_dict)) + + processing_time = time.time() - start_time + data_logger.log_database_operation( + "select_multiple", "faces", user_id, True, + processing_time_seconds=processing_time, + records_found=len(faces) + ) + + repository_logger.info( + "User faces retrieved successfully", + user_id=user_id, + faces_count=len(faces), + processing_time_seconds=processing_time + ) + + return faces + + except Exception as e: + processing_time = time.time() - start_time + data_logger.log_database_operation( + "select_multiple", "faces", user_id, False, + processing_time_seconds=processing_time, + error_message=str(e) + ) + + repository_logger.error( + "Failed to retrieve user faces", + user_id=user_id, + error=e, + processing_time_seconds=processing_time + ) + raise + + async def delete(self, face_id: str) -> bool: + start_time = time.time() + repository_logger.debug("Deleting face from database", face_id=face_id) + + try: + result = await self.faces_collection.delete_one({"_id": ObjectId(face_id)}) + success = result.deleted_count > 0 + processing_time = time.time() - start_time + + data_logger.log_database_operation( + "delete", "faces", face_id, success, + processing_time_seconds=processing_time, + deleted_count=result.deleted_count + ) + + if success: + repository_logger.info( + "Face deleted successfully", + face_id=face_id, + processing_time_seconds=processing_time + ) + else: + repository_logger.warning( + "Face not found for deletion", + face_id=face_id, + processing_time_seconds=processing_time + ) + + return success + + except Exception as e: + processing_time = time.time() - start_time + data_logger.log_database_operation( + "delete", "faces", face_id, False, + processing_time_seconds=processing_time, + error_message=str(e) + ) + + repository_logger.error( + "Failed to delete face", + face_id=face_id, + error=e, + processing_time_seconds=processing_time + ) + raise + + def _dict_to_face(self, face_dict: dict) -> Face: + return Face( + id=str(face_dict["_id"]), + user_id=face_dict["user_id"], + image_path=face_dict["image_path"], + embedding=np.array(face_dict["embedding"]), + created_at=face_dict["created_at"], + quality_score=face_dict["quality_score"], + bbox=face_dict["bbox"], + landmarks=face_dict.get("landmarks") + ) + +class MongoDBUserRepository(UserRepository): + def __init__(self, connection_string: str, database_name: str): + self.client = AsyncIOMotorClient(connection_string) + self.db = self.client[database_name] + self.users_collection = self.db.users + + async def save(self, user: User) -> str: + user_dict = { + "name": user.name, + "created_at": user.created_at, + "face_ids": user.face_ids, + "mean_embedding": user.mean_embedding.tolist() if user.mean_embedding is not None else None + } + result = await self.users_collection.insert_one(user_dict) + return str(result.inserted_id) + + async def get_by_id(self, user_id: str) -> Optional[User]: + user_dict = await self.users_collection.find_one({"_id": ObjectId(user_id)}) + if user_dict: + return self._dict_to_user(user_dict) + return None + + async def get_user_by_name(self, name: str) -> Optional[User]: + user_dict = await self.users_collection.find_one({"name": name}) + if user_dict: + return self._dict_to_user(user_dict) + return None + + async def update_user(self, user: User) -> bool: + user_dict = { + "name": user.name, + "face_ids": user.face_ids, + "mean_embedding": user.mean_embedding.tolist() if user.mean_embedding is not None else None + } + result = await 
self.users_collection.update_one( + {"_id": ObjectId(user.id)}, + {"$set": user_dict} + ) + return result.modified_count > 0 + + async def delete(self, user_id: str) -> bool: + result = await self.users_collection.delete_one({"_id": ObjectId(user_id)}) + return result.deleted_count > 0 + + async def get_all_users(self) -> List[User]: + cursor = self.users_collection.find() + users = [] + async for user_dict in cursor: + users.append(self._dict_to_user(user_dict)) + return users + + def _dict_to_user(self, user_dict: dict) -> User: + return User( + id=str(user_dict["_id"]), + name=user_dict["name"], + created_at=user_dict["created_at"], + face_ids=user_dict["face_ids"], + mean_embedding=np.array(user_dict["mean_embedding"]) if user_dict.get("mean_embedding") else None + ) \ No newline at end of file diff --git a/src/infrastructure/logging/__init__.py b/src/infrastructure/logging/__init__.py new file mode 100644 index 0000000..8cdb506 --- /dev/null +++ b/src/infrastructure/logging/__init__.py @@ -0,0 +1,29 @@ +from .logger import ( + app_logger, + detector_logger, + recognizer_logger, + service_logger, + repository_logger, + vector_store_logger, + use_case_logger, + api_logger, + data_logger, + ComponentLogger, + DataLifecycleLogger, + FaceRecognitionLogger +) + +__all__ = [ + "app_logger", + "detector_logger", + "recognizer_logger", + "service_logger", + "repository_logger", + "vector_store_logger", + "use_case_logger", + "api_logger", + "data_logger", + "ComponentLogger", + "DataLifecycleLogger", + "FaceRecognitionLogger" +] \ No newline at end of file diff --git a/src/infrastructure/logging/logger.py b/src/infrastructure/logging/logger.py new file mode 100644 index 0000000..d99bd5e --- /dev/null +++ b/src/infrastructure/logging/logger.py @@ -0,0 +1,212 @@ +import logging +import json +import sys +from datetime import datetime +from typing import Any, Dict, Optional +from pathlib import Path +import traceback + +class FaceRecognitionLogger: + """Centralized logging system for face recognition service""" + + def __init__(self, name: str = "face_recognition", log_level: str = "DEBUG"): + self.logger = logging.getLogger(name) + self.logger.setLevel(getattr(logging, log_level.upper())) + + # Remove existing handlers to avoid duplicates + for handler in self.logger.handlers[:]: + self.logger.removeHandler(handler) + + # Create formatters + detailed_formatter = logging.Formatter( + '%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s' + ) + + json_formatter = JsonFormatter() + + # Console handler with detailed format + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(detailed_formatter) + console_handler.setLevel(logging.DEBUG) + + # File handler with JSON format for structured logging + log_dir = Path("/app/logs") + log_dir.mkdir(exist_ok=True) + + file_handler = logging.FileHandler(log_dir / "face_recognition.log") + file_handler.setFormatter(json_formatter) + file_handler.setLevel(logging.DEBUG) + + # Error file handler + error_handler = logging.FileHandler(log_dir / "errors.log") + error_handler.setFormatter(json_formatter) + error_handler.setLevel(logging.ERROR) + + self.logger.addHandler(console_handler) + self.logger.addHandler(file_handler) + self.logger.addHandler(error_handler) + + def _log_with_context(self, level: str, message: str, **kwargs): + """Log with additional context information""" + context = { + "timestamp": datetime.utcnow().isoformat(), + "message": message, + **kwargs + } + + getattr(self.logger, 
level.lower())(json.dumps(context, default=str)) + + def debug(self, message: str, **kwargs): + self._log_with_context("DEBUG", message, **kwargs) + + def info(self, message: str, **kwargs): + self._log_with_context("INFO", message, **kwargs) + + def warning(self, message: str, **kwargs): + self._log_with_context("WARNING", message, **kwargs) + + def error(self, message: str, error: Exception = None, **kwargs): + context = kwargs.copy() + if error: + context.update({ + "error_type": error.__class__.__name__, + "error_message": str(error), + "traceback": traceback.format_exc() + }) + self._log_with_context("ERROR", message, **context) + + def critical(self, message: str, error: Exception = None, **kwargs): + context = kwargs.copy() + if error: + context.update({ + "error_type": error.__class__.__name__, + "error_message": str(error), + "traceback": traceback.format_exc() + }) + self._log_with_context("CRITICAL", message, **context) + +class JsonFormatter(logging.Formatter): + """JSON formatter for structured logging""" + + def format(self, record): + log_entry = { + "timestamp": datetime.fromtimestamp(record.created).isoformat(), + "level": record.levelname, + "logger": record.name, + "module": record.module, + "function": record.funcName, + "line": record.lineno, + "message": record.getMessage() + } + + if record.exc_info: + log_entry["exception"] = self.formatException(record.exc_info) + + return json.dumps(log_entry, default=str) + +# Create global logger instance +app_logger = FaceRecognitionLogger() + +# Specialized loggers for different components +class ComponentLogger: + def __init__(self, component_name: str): + self.component = component_name + self.logger = app_logger + + def _add_component_context(self, **kwargs): + return {"component": self.component, **kwargs} + + def debug(self, message: str, **kwargs): + self.logger.debug(message, **self._add_component_context(**kwargs)) + + def info(self, message: str, **kwargs): + self.logger.info(message, **self._add_component_context(**kwargs)) + + def warning(self, message: str, **kwargs): + self.logger.warning(message, **self._add_component_context(**kwargs)) + + def error(self, message: str, error: Exception = None, **kwargs): + self.logger.error(message, error=error, **self._add_component_context(**kwargs)) + + def critical(self, message: str, error: Exception = None, **kwargs): + self.logger.critical(message, error=error, **self._add_component_context(**kwargs)) + +# Component-specific loggers +detector_logger = ComponentLogger("face_detector") +recognizer_logger = ComponentLogger("face_recognizer") +service_logger = ComponentLogger("face_service") +repository_logger = ComponentLogger("repository") +vector_store_logger = ComponentLogger("vector_store") +use_case_logger = ComponentLogger("use_case") +api_logger = ComponentLogger("api") + +class DataLifecycleLogger: + """Specialized logger for tracking data lifecycle""" + + def __init__(self): + self.logger = ComponentLogger("data_lifecycle") + + def log_face_detection(self, image_info: Dict, faces_detected: int, processing_time: float): + self.logger.info( + "Face detection completed", + operation="face_detection", + faces_detected=faces_detected, + processing_time_seconds=processing_time, + **image_info + ) + + def log_face_registration(self, user_id: str, face_id: str, quality_score: float, embedding_size: int): + self.logger.info( + "Face registered successfully", + operation="face_registration", + user_id=user_id, + face_id=face_id, + quality_score=quality_score, + 
embedding_size=embedding_size + ) + + def log_face_verification(self, user_id: str, result: bool, confidence: float, threshold: float, processing_time: float): + self.logger.info( + "Face verification completed", + operation="face_verification", + user_id=user_id, + verification_result=result, + confidence=confidence, + threshold=threshold, + processing_time_seconds=processing_time + ) + + def log_face_identification(self, result_user_id: Optional[str], confidence: float, + candidates_count: int, processing_time: float): + self.logger.info( + "Face identification completed", + operation="face_identification", + identified_user_id=result_user_id, + confidence=confidence, + candidates_count=candidates_count, + processing_time_seconds=processing_time + ) + + def log_database_operation(self, operation: str, table: str, record_id: str, success: bool, **kwargs): + self.logger.info( + f"Database operation: {operation}", + operation="database", + db_operation=operation, + table=table, + record_id=record_id, + success=success, + **kwargs + ) + + def log_vector_operation(self, operation: str, vector_id: str, success: bool, **kwargs): + self.logger.info( + f"Vector store operation: {operation}", + operation="vector_store", + vector_operation=operation, + vector_id=vector_id, + success=success, + **kwargs + ) + +# Global data lifecycle logger +data_logger = DataLifecycleLogger() \ No newline at end of file diff --git a/src/infrastructure/ml/__init__.py b/src/infrastructure/ml/__init__.py new file mode 100644 index 0000000..fe115fd --- /dev/null +++ b/src/infrastructure/ml/__init__.py @@ -0,0 +1,4 @@ +from .detectors import RetinaFaceDetector +from .recognizers import SFaceRecognizer + +__all__ = ["RetinaFaceDetector", "SFaceRecognizer"] \ No newline at end of file diff --git a/src/infrastructure/ml/detectors.py b/src/infrastructure/ml/detectors.py new file mode 100644 index 0000000..ae27ebf --- /dev/null +++ b/src/infrastructure/ml/detectors.py @@ -0,0 +1,144 @@ +import cv2 +import numpy as np +from typing import List, Tuple, Optional +from insightface.app import FaceAnalysis +from insightface.data import get_image as ins_get_image +import time + +from src.infrastructure.logging import detector_logger, data_logger + +class RetinaFaceDetector: + def __init__(self): + detector_logger.info("Initializing RetinaFace detector") + try: + self.app = FaceAnalysis(providers=['CUDAExecutionProvider', 'CPUExecutionProvider']) + self.app.prepare(ctx_id=0, det_size=(640, 640)) + detector_logger.info("RetinaFace detector initialized successfully", + providers=self.app.models.keys() if hasattr(self.app, 'models') else "unknown") + except Exception as e: + detector_logger.error("Failed to initialize RetinaFace detector", error=e) + raise + + def detect_faces(self, image: np.ndarray) -> List[dict]: + start_time = time.time() + image_info = { + "image_shape": image.shape, + "image_dtype": str(image.dtype), + "image_size_bytes": image.nbytes + } + + detector_logger.debug("Starting face detection", **image_info) + + try: + faces = self.app.get(image) + processing_time = time.time() - start_time + + detected_faces = [] + for i, face in enumerate(faces): + face_dict = { + 'bbox': face.bbox.tolist(), + 'landmarks': face.kps.tolist() if face.kps is not None else None, + 'det_score': float(face.det_score), + 'embedding': face.normed_embedding if hasattr(face, 'normed_embedding') else None + } + detected_faces.append(face_dict) + + detector_logger.debug( + f"Face {i+1} detected", + face_index=i, + bbox=face_dict['bbox'], + 
detection_score=face_dict['det_score'], + has_landmarks=face_dict['landmarks'] is not None, + has_embedding=face_dict['embedding'] is not None + ) + + # Log data lifecycle + data_logger.log_face_detection(image_info, len(detected_faces), processing_time) + + detector_logger.info( + "Face detection completed", + faces_detected=len(detected_faces), + processing_time_seconds=processing_time, + **image_info + ) + + return detected_faces + + except Exception as e: + processing_time = time.time() - start_time + detector_logger.error( + "Face detection failed", + error=e, + processing_time_seconds=processing_time, + **image_info + ) + raise + + def align_face(self, image: np.ndarray, landmarks: np.ndarray, image_size: int = 112) -> np.ndarray: + if landmarks is None or len(landmarks) != 5: + raise ValueError("align_face requires exactly 5 facial landmarks; use _crop_face with a bbox instead") + + src_pts = np.array([ + [30.2946, 51.6963], + [65.5318, 51.5014], + [48.0252, 71.7366], + [33.5493, 92.3655], + [62.7299, 92.2041] + ], dtype=np.float32) + + if image_size == 112: + src_pts[:, 0] += 8.0 + + dst_pts = landmarks.astype(np.float32) + tform = cv2.estimateAffinePartial2D(dst_pts, src_pts)[0] + + aligned_face = cv2.warpAffine( + image, tform, (image_size, image_size), + flags=cv2.INTER_LINEAR + ) + + return aligned_face + + def _crop_face(self, image: np.ndarray, bbox: List[float]) -> np.ndarray: + x1, y1, x2, y2 = [int(coord) for coord in bbox[:4]] + + width = x2 - x1 + height = y2 - y1 + size = max(width, height) + + center_x = (x1 + x2) // 2 + center_y = (y1 + y2) // 2 + + x1 = max(0, center_x - size // 2) + y1 = max(0, center_y - size // 2) + x2 = min(image.shape[1], x1 + size) + y2 = min(image.shape[0], y1 + size) + + face_crop = image[y1:y2, x1:x2] + face_crop = cv2.resize(face_crop, (112, 112)) + + return face_crop + + def calculate_quality_score(self, face_dict: dict) -> float: + detector_logger.debug("Calculating quality score", face_data_keys=list(face_dict.keys())) + + det_score = face_dict.get('det_score', 0.0) + bbox = face_dict.get('bbox', []) + + if len(bbox) >= 4: + face_size = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1]) + size_score = min(face_size / 10000, 1.0) + else: + size_score = 0.0 + + quality_score = det_score * 0.7 + size_score * 0.3 + + detector_logger.debug( + "Quality score calculated", + detection_score=det_score, + size_score=size_score, + final_quality_score=quality_score, + face_size=face_size if len(bbox) >= 4 else None + ) + + return quality_score \ No newline at end of file diff --git a/src/infrastructure/ml/recognizers.py b/src/infrastructure/ml/recognizers.py new file mode 100644 index 0000000..b54db3f --- /dev/null +++ b/src/infrastructure/ml/recognizers.py @@ -0,0 +1,116 @@ +import cv2 +import numpy as np +import onnxruntime as ort +from typing import List, Tuple +import urllib.request +from pathlib import Path +import time + +from src.infrastructure.logging import recognizer_logger, data_logger + +class SFaceRecognizer: + def __init__(self): + recognizer_logger.info("Initializing SFace recognizer") + try: + self.model_path = self._download_model() + self.session = ort.InferenceSession( + self.model_path, + providers=['CUDAExecutionProvider', 'CPUExecutionProvider'] + ) + self.input_name = self.session.get_inputs()[0].name + self.output_name = self.session.get_outputs()[0].name + + recognizer_logger.info( + "SFace recognizer initialized successfully", + model_path=self.model_path, + input_name=self.input_name, + output_name=self.output_name, + providers=self.session.get_providers() + ) + except Exception as e: +
recognizer_logger.error("Failed to initialize SFace recognizer", error=e) + raise + + def _download_model(self) -> str: + model_dir = Path("models") + model_dir.mkdir(exist_ok=True) + model_path = model_dir / "face_recognition_sface_2021dec_int8.onnx" + + if not model_path.exists(): + url = "https://github.com/opencv/opencv_zoo/raw/main/models/face_recognition_sface/face_recognition_sface_2021dec_int8.onnx" + urllib.request.urlretrieve(url, model_path) + + return str(model_path) + + def extract_embedding(self, aligned_face: np.ndarray) -> np.ndarray: + start_time = time.time() + original_shape = aligned_face.shape + + recognizer_logger.debug( + "Starting embedding extraction", + input_shape=original_shape, + input_dtype=str(aligned_face.dtype) + ) + + try: + if aligned_face.shape != (112, 112, 3): + aligned_face = cv2.resize(aligned_face, (112, 112)) + recognizer_logger.debug(f"Resized face from {original_shape} to {aligned_face.shape}") + + input_blob = cv2.dnn.blobFromImage( + aligned_face, 1.0 / 255, (112, 112), (0, 0, 0), swapRB=True + ) + + embedding = self.session.run( + [self.output_name], + {self.input_name: input_blob} + )[0] + + embedding = embedding.flatten() + embedding_norm = np.linalg.norm(embedding) + embedding = embedding / embedding_norm + + processing_time = time.time() - start_time + + recognizer_logger.debug( + "Embedding extraction completed", + embedding_shape=embedding.shape, + embedding_norm=float(embedding_norm), + final_norm=float(np.linalg.norm(embedding)), + processing_time_seconds=processing_time + ) + + return embedding + + except Exception as e: + processing_time = time.time() - start_time + recognizer_logger.error( + "Embedding extraction failed", + error=e, + processing_time_seconds=processing_time, + input_shape=original_shape + ) + raise + + def calculate_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float: + cosine_similarity = np.dot(embedding1, embedding2) + return float(cosine_similarity) + + def calculate_distance(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float: + return float(np.linalg.norm(embedding1 - embedding2)) + + def find_matches( + self, + query_embedding: np.ndarray, + database_embeddings: List[Tuple[str, np.ndarray]], + threshold: float = 0.7 + ) -> List[Tuple[str, float]]: + matches = [] + + for face_id, db_embedding in database_embeddings: + similarity = self.calculate_similarity(query_embedding, db_embedding) + if similarity >= threshold: + matches.append((face_id, similarity)) + + matches.sort(key=lambda x: x[1], reverse=True) + return matches \ No newline at end of file diff --git a/src/infrastructure/storage/__init__.py b/src/infrastructure/storage/__init__.py new file mode 100644 index 0000000..4258a1b --- /dev/null +++ b/src/infrastructure/storage/__init__.py @@ -0,0 +1,4 @@ +from .chromadb import ChromaDBVectorStore +from .filesystem import FileSystemStorage + +__all__ = ["ChromaDBVectorStore", "FileSystemStorage"] \ No newline at end of file diff --git a/src/infrastructure/storage/chromadb.py b/src/infrastructure/storage/chromadb.py new file mode 100644 index 0000000..746903f --- /dev/null +++ b/src/infrastructure/storage/chromadb.py @@ -0,0 +1,279 @@ +from typing import List, Optional, Tuple, Dict +import numpy as np +import chromadb +from chromadb.config import Settings as ChromaSettings +import uuid +import time + +from src.domain.repositories import VectorStore +from src.infrastructure.logging import vector_store_logger, data_logger + +class ChromaDBVectorStore(VectorStore): + def 
__init__(self, host: str = "localhost", port: int = 8000): + import requests + + vector_store_logger.info( + "Initializing ChromaDB vector store", + host=host, + port=port + ) + + # Wait for ChromaDB to be ready + vector_store_logger.info(f"Waiting for ChromaDB at {host}:{port}...") + max_retries = 30 + for i in range(max_retries): + try: + response = requests.get(f"http://{host}:{port}/api/v1/heartbeat", timeout=2) + if response.status_code == 200: + vector_store_logger.info("ChromaDB is ready!") + break + except Exception as e: + vector_store_logger.debug(f"ChromaDB heartbeat check failed (attempt {i+1})", error=e) + pass + if i == max_retries - 1: + vector_store_logger.warning("ChromaDB not responding, attempting to connect anyway...") + time.sleep(1) + + # Initialize ChromaDB client - try different connection methods; create the collection with cosine space so search's "1 - distance" is a true cosine similarity + connection_successful = False + + # Method 1: Try direct connection without settings + try: + vector_store_logger.debug("Trying direct ChromaDB connection...") + self.client = chromadb.HttpClient(host=host, port=port) + self.collection = self.client.get_or_create_collection(name="face_embeddings", metadata={"hnsw:space": "cosine"}) + vector_store_logger.info("Direct ChromaDB connection successful!") + connection_successful = True + except Exception as e: + vector_store_logger.debug("Direct connection failed", error=e) + + # Method 2: Try with basic settings + if not connection_successful: + try: + vector_store_logger.debug("Trying ChromaDB connection with settings...") + settings = chromadb.config.Settings(allow_reset=True) + self.client = chromadb.HttpClient(host=host, port=port, settings=settings) + self.collection = self.client.get_or_create_collection(name="face_embeddings", metadata={"hnsw:space": "cosine"}) + vector_store_logger.info("ChromaDB connection with settings successful!") + connection_successful = True + except Exception as e: + vector_store_logger.debug("Connection with settings failed", error=e) + + # Method 3: Try using PersistentClient as fallback + if not connection_successful: + try: + vector_store_logger.debug("Trying persistent client fallback...") + import tempfile + self.client = chromadb.PersistentClient(path=tempfile.mkdtemp()) + self.collection = self.client.get_or_create_collection(name="face_embeddings", metadata={"hnsw:space": "cosine"}) + vector_store_logger.warning("Using persistent client fallback!") + connection_successful = True + except Exception as e: + vector_store_logger.error("Persistent client fallback failed", error=e) + + if not connection_successful: + vector_store_logger.critical("Failed to connect to ChromaDB with all methods") + raise RuntimeError("Failed to connect to ChromaDB with all methods") + + # Log successful initialization + collection_info = { + "name": self.collection.name if hasattr(self.collection, 'name') else "face_embeddings", + "client_type": type(self.client).__name__ + } + vector_store_logger.info("ChromaDB vector store initialized successfully", **collection_info) + + async def add(self, id: str, embedding: np.ndarray, metadata: Dict = None) -> bool: + start_time = time.time() + vector_store_logger.debug( + "Adding embedding to vector store", + vector_id=id, + embedding_shape=embedding.shape, + metadata_keys=list(metadata.keys()) if metadata else [] + ) + + try: + self.collection.add( + embeddings=[embedding.tolist()], + metadatas=[metadata or {}], + ids=[id] + ) + + processing_time = time.time() - start_time + data_logger.log_vector_operation("add", id, True, + processing_time_seconds=processing_time, + embedding_dimensions=len(embedding)) + + vector_store_logger.info( + "Embedding added successfully", + vector_id=id,
+ embedding_dimensions=len(embedding), + metadata_size=len(metadata) if metadata else 0, + processing_time_seconds=processing_time + ) + return True + + except Exception as e: + processing_time = time.time() - start_time + data_logger.log_vector_operation("add", id, False, + processing_time_seconds=processing_time, + error_message=str(e)) + + vector_store_logger.error( + "Failed to add embedding", + vector_id=id, + error=e, + processing_time_seconds=processing_time + ) + return False + + async def search_similar(self, embedding: np.ndarray, top_k: int = 10) -> List[Tuple[str, float, dict]]: + start_time = time.time() + vector_store_logger.debug( + "Searching for similar embeddings", + embedding_shape=embedding.shape, + top_k=top_k + ) + + try: + results = self.collection.query( + query_embeddings=[embedding.tolist()], + n_results=top_k + ) + + similar_faces = [] + if results['ids'][0]: + for i in range(len(results['ids'][0])): + face_id = results['ids'][0][i] + distance = results['distances'][0][i] + metadata = results['metadatas'][0][i] + similarity = 1 - distance + similar_faces.append((face_id, similarity, metadata)) + + vector_store_logger.debug( + f"Similar face found (rank {i+1})", + face_id=face_id, + distance=distance, + similarity=similarity, + user_id=metadata.get('user_id', 'unknown') + ) + + processing_time = time.time() - start_time + data_logger.log_vector_operation("search", f"query_top_{top_k}", True, + processing_time_seconds=processing_time, + results_found=len(similar_faces), + search_top_k=top_k) + + vector_store_logger.info( + "Similarity search completed", + query_embedding_shape=embedding.shape, + top_k_requested=top_k, + results_found=len(similar_faces), + processing_time_seconds=processing_time, + best_similarity=similar_faces[0][1] if similar_faces else 0.0 + ) + + return similar_faces + + except Exception as e: + processing_time = time.time() - start_time + data_logger.log_vector_operation("search", f"query_top_{top_k}", False, + processing_time_seconds=processing_time, + error_message=str(e)) + + vector_store_logger.error( + "Similarity search failed", + embedding_shape=embedding.shape, + top_k=top_k, + error=e, + processing_time_seconds=processing_time + ) + raise + + async def get_embedding(self, embedding_id: str) -> Optional[Tuple[np.ndarray, dict]]: + start_time = time.time() + vector_store_logger.debug("Retrieving embedding by ID", embedding_id=embedding_id) + + try: + results = self.collection.get( + ids=[embedding_id], + include=['embeddings', 'metadatas'] + ) + + processing_time = time.time() - start_time + + if results['ids']: + embedding = np.array(results['embeddings'][0]) + metadata = results['metadatas'][0] + + data_logger.log_vector_operation("get", embedding_id, True, + processing_time_seconds=processing_time, + embedding_dimensions=len(embedding)) + + vector_store_logger.info( + "Embedding retrieved successfully", + embedding_id=embedding_id, + embedding_shape=embedding.shape, + metadata_keys=list(metadata.keys()) if metadata else [], + processing_time_seconds=processing_time + ) + return embedding, metadata + else: + data_logger.log_vector_operation("get", embedding_id, False, + processing_time_seconds=processing_time, + error_message="Embedding not found") + + vector_store_logger.warning( + "Embedding not found", + embedding_id=embedding_id, + processing_time_seconds=processing_time + ) + return None + + except Exception as e: + processing_time = time.time() - start_time + data_logger.log_vector_operation("get", embedding_id, False, + 
processing_time_seconds=processing_time,
+                                           error_message=str(e))
+
+            vector_store_logger.error(
+                "Failed to retrieve embedding",
+                embedding_id=embedding_id,
+                error=e,
+                processing_time_seconds=processing_time
+            )
+            raise
+
+    async def delete(self, id: str) -> bool:
+        start_time = time.time()
+        vector_store_logger.debug("Deleting embedding", embedding_id=id)
+
+        try:
+            self.collection.delete(ids=[id])
+
+            processing_time = time.time() - start_time
+            data_logger.log_vector_operation("delete", id, True,
+                                             processing_time_seconds=processing_time)
+
+            vector_store_logger.info(
+                "Embedding deleted successfully",
+                embedding_id=id,
+                processing_time_seconds=processing_time
+            )
+            return True
+
+        except Exception as e:
+            processing_time = time.time() - start_time
+            data_logger.log_vector_operation("delete", id, False,
+                                             processing_time_seconds=processing_time,
+                                             error_message=str(e))
+
+            vector_store_logger.error(
+                "Failed to delete embedding",
+                embedding_id=id,
+                error=e,
+                processing_time_seconds=processing_time
+            )
+            return False
+
+    async def delete_embedding(self, embedding_id: str) -> bool:
+        # Alias kept for interface compatibility with the repository layer
+        return await self.delete(embedding_id)
\ No newline at end of file
diff --git a/src/infrastructure/storage/filesystem.py b/src/infrastructure/storage/filesystem.py
new file mode 100644
index 0000000..982be57
--- /dev/null
+++ b/src/infrastructure/storage/filesystem.py
@@ -0,0 +1,42 @@
+from pathlib import Path
+import shutil
+import uuid
+from datetime import datetime
+import cv2
+import numpy as np
+
+class FileSystemStorage:
+    def __init__(self, base_path: str):
+        self.base_path = Path(base_path)
+        self.base_path.mkdir(parents=True, exist_ok=True)
+
+    async def save_image(self, image: np.ndarray, user_id: str) -> str:
+        user_dir = self.base_path / user_id
+        user_dir.mkdir(parents=True, exist_ok=True)
+
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{timestamp}_{uuid.uuid4().hex[:8]}.jpg"
+        file_path = user_dir / filename
+
+        # Images are handled as RGB throughout the app; OpenCV writes BGR
+        cv2.imwrite(str(file_path), cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
+        return str(file_path)
+
+    async def load_image(self, file_path: str) -> np.ndarray:
+        image = cv2.imread(file_path)
+        if image is None:
+            # cv2.imread returns None rather than raising on missing/corrupt files
+            raise FileNotFoundError(f"Image not found or unreadable: {file_path}")
+        return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
+
+    async def delete_image(self, file_path: str) -> bool:
+        try:
+            Path(file_path).unlink()
+            return True
+        except Exception:
+            return False
+
+    async def delete_user_directory(self, user_id: str) -> bool:
+        try:
+            user_dir = self.base_path / user_id
+            if user_dir.exists():
+                shutil.rmtree(user_dir)
+            # A missing directory counts as already deleted (idempotent)
+            return True
+        except Exception:
+            return False
\ No newline at end of file
diff --git a/src/presentation/__init__.py b/src/presentation/__init__.py
new file mode 100644
index 0000000..7525d34
--- /dev/null
+++ b/src/presentation/__init__.py
@@ -0,0 +1,3 @@
+from .gradio_app import create_gradio_interface
+
+__all__ = ["create_gradio_interface"]
\ No newline at end of file
diff --git a/src/presentation/gradio_app.py b/src/presentation/gradio_app.py
new file mode 100644
index 0000000..f177b76
--- /dev/null
+++ b/src/presentation/gradio_app.py
@@ -0,0 +1,353 @@
+import gradio as gr
+import numpy as np
+from typing import List, Optional
+import asyncio
+from PIL import Image
+import cv2
+
+from src.application.use_cases import (
+    RegisterFaceUseCase,
+    VerifyFaceUseCase,
+    IdentifyFaceUseCase
+)
+from src.domain.entities import OperationType
+
+def create_gradio_interface(
+    register_use_case: RegisterFaceUseCase,
+    verify_use_case: VerifyFaceUseCase,
+    identify_use_case: IdentifyFaceUseCase
+):
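+    # The use cases are async coroutines, while Gradio callbacks are plain
+    # functions; the sync_* wrappers below bridge the two with asyncio.run()
+    # (this assumes no event loop is already running in the serving thread).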
+    async def process_registration(user_name, images):
+        if not user_name:
+            return "Error: Please provide a user name"
+
+        if not images:
+            return "Error: Please upload at least one image"
+
+        np_images = []
+        for img_file in images:
+            # gr.File yields tempfile wrappers exposing .name; other inputs
+            # may already be image objects
+            if hasattr(img_file, 'name'):
+                img = Image.open(img_file.name)
+                np_images.append(np.array(img))
+            else:
+                np_images.append(np.array(img_file))
+
+        success, message = await register_use_case.execute(user_name, np_images)
+
+        return message
+
+    async def process_recognition(operation_type: str, user_name: Optional[str], image: Image.Image):
+        if image is None:
+            return "Error: Please upload an image"
+
+        np_image = np.array(image)
+
+        if operation_type == OperationType.VERIFICATION.value:
+            if not user_name:
+                return "Error: Please provide a user name for verification"
+
+            result = await verify_use_case.execute(user_name, np_image)
+
+            if result.is_verified:
+                return f"✅ Verified as {user_name}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s"
+            else:
+                return f"❌ Not verified as {user_name}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s"
+
+        else:
+            result = await identify_use_case.execute(np_image)
+
+            if result.is_identified:
+                candidates_str = "\n".join([f"  - {name}: {conf:.2%}" for name, conf in result.candidates[:5]])
+                return f"✅ Identified as: {result.user_id}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s\n\nTop candidates:\n{candidates_str}"
+            else:
+                candidates_str = "\n".join([f"  - {name}: {conf:.2%}" for name, conf in result.candidates[:5]])
+                return f"❌ Person not identified\nBest match confidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s\n\nTop candidates:\n{candidates_str}"
+
+    def sync_process_registration(user_name, images):
+        return asyncio.run(process_registration(user_name, images))
+
+    def sync_process_recognition(operation_type, user_name, image):
+        return asyncio.run(process_recognition(operation_type, user_name, image))
+
+    # Webcam streaming functions. Kept for a future streaming mode; not wired
+    # to any event below because continuous streaming is disabled for Gradio
+    # version compatibility (see the Live Recognition tab).
+    def process_webcam_frame(frame, operation_type, user_name):
+        if frame is None:
+            return frame
+
+        try:
+            # The webcam component already delivers an RGB numpy array, so the
+            # frame can be passed to the use cases directly
+            if operation_type == OperationType.VERIFICATION.value and user_name:
+                result = asyncio.run(verify_use_case.execute(user_name, frame))
+
+                # Draw the result on the frame; cv2.putText renders only basic
+                # ASCII Hershey fonts, so avoid Unicode glyphs here
+                cv2.putText(frame, f"Verification: {'PASS' if result.is_verified else 'FAIL'}",
+                           (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1,
+                           (0, 255, 0) if result.is_verified else (0, 0, 255), 2)
+                cv2.putText(frame, f"Confidence: {result.confidence:.2%}",
+                           (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
+
+            elif operation_type == OperationType.IDENTIFICATION.value:
+                result = asyncio.run(identify_use_case.execute(frame))
+
+                # Draw result on frame
+                if result.is_identified:
+                    cv2.putText(frame, f"Identified: {result.user_id}",
+                               (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
+                    cv2.putText(frame, f"Confidence: {result.confidence:.2%}",
+                               (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
+                else:
+                    cv2.putText(frame, "No match found",
+                               (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
+
+        except Exception as e:
+            cv2.putText(frame, f"Error: {str(e)[:50]}",
+                       (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
+
+        return frame
+
+    # Webcam registration capture
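+    # NOTE: this capture buffer is closure state shared by every browser
+    # session; fine for a single-user demo, but concurrent users would need
+    # per-session state (e.g. gr.State) instead.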
+    captured_images = []
+
+    def capture_image_for_registration(frame):
+        if frame is not None:
+            captured_images.append(frame.copy())
+            return f"Captured {len(captured_images)} images"
+        return "No frame to capture"
+
+    def register_from_captures(user_name):
+        if not user_name:
+            return "Error: Please provide a user name"
+
+        if not captured_images:
+            return "Error: No images captured. Please use webcam to capture images first."
+
+        try:
+            # Frames were already stored as numpy arrays at capture time
+            np_images = list(captured_images)
+            success, message = asyncio.run(register_use_case.execute(user_name, np_images))
+
+            # Clear captured images after registration
+            captured_images.clear()
+            return message
+        except Exception as e:
+            return f"Error during registration: {str(e)}"
+
+    def clear_captured_images():
+        captured_images.clear()
+        return "Cleared all captured images"
+
+    # CSS for better layout
+    css = """
+    .webcam-container {
+        max-width: 640px !important;
+        margin: 0 auto;
+    }
+    .capture-info {
+        text-align: center;
+        font-weight: bold;
+        color: #007bff;
+    }
+    """
+
+    with gr.Blocks(title="Face Recognition System", css=css) as interface:
+        gr.Markdown("# Face Recognition System")
+        gr.Markdown("Advanced face recognition using RetinaFace detector and SFace recognizer")
+
+        with gr.Tabs():
+            # File Upload Registration Tab
+            with gr.TabItem("📁 File Registration"):
+                gr.Markdown("## Register New User - Upload Files")
+                gr.Markdown("Upload multiple images for better recognition accuracy")
+
+                with gr.Row():
+                    reg_user_name = gr.Textbox(
+                        label="User Name",
+                        placeholder="Enter user name..."
+                    )
+                    reg_images = gr.File(
+                        label="Upload Face Images",
+                        file_count="multiple",
+                        file_types=["image"]
+                    )
+
+                reg_button = gr.Button("Register User", variant="primary")
+                reg_output = gr.Textbox(label="Registration Result", lines=3)
+
+                reg_button.click(
+                    fn=sync_process_registration,
+                    inputs=[reg_user_name, reg_images],
+                    outputs=reg_output
+                )
+
+            # Webcam Registration Tab
+            with gr.TabItem("📷 Webcam Registration"):
+                gr.Markdown("## Register New User - Using Webcam")
+                gr.Markdown("Capture multiple images from webcam for better recognition accuracy")
+
+                with gr.Row():
+                    with gr.Column():
+                        webcam_reg_user_name = gr.Textbox(
+                            label="User Name",
+                            placeholder="Enter user name..."
+                        )
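+                        # Browser webcam capture requires a secure context
+                        # (HTTPS or localhost); over plain HTTP the camera
+                        # will not be exposed to the page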
+                        webcam_reg_image = gr.Image(
+                            label="Webcam Feed",
+                            sources=["webcam"],
+                            type="numpy",
+                            elem_classes=["webcam-container"]
+                        )
+
+                        with gr.Row():
+                            capture_btn = gr.Button("📸 Capture Image", variant="secondary")
+                            clear_btn = gr.Button("🗑️ Clear Captures", variant="secondary")
+                            register_webcam_btn = gr.Button("✅ Register User", variant="primary")
+
+                    with gr.Column():
+                        capture_status = gr.Textbox(
+                            label="Capture Status",
+                            value="Ready to capture images",
+                            elem_classes=["capture-info"]
+                        )
+                        webcam_reg_output = gr.Textbox(label="Registration Result", lines=8)
+
+                capture_btn.click(
+                    fn=capture_image_for_registration,
+                    inputs=[webcam_reg_image],
+                    outputs=[capture_status]
+                )
+
+                clear_btn.click(
+                    fn=clear_captured_images,
+                    outputs=[capture_status]
+                )
+
+                register_webcam_btn.click(
+                    fn=register_from_captures,
+                    inputs=[webcam_reg_user_name],
+                    outputs=[webcam_reg_output]
+                )
+
+            # File Upload Recognition Tab
+            with gr.TabItem("📁 File Recognition"):
+                gr.Markdown("## Face Recognition - Upload Image")
+                gr.Markdown("Choose between verification (1:1) or identification (1:N)")
+
+                with gr.Row():
+                    operation_type = gr.Radio(
+                        choices=[OperationType.VERIFICATION.value, OperationType.IDENTIFICATION.value],
+                        value=OperationType.VERIFICATION.value,
+                        label="Operation Type"
+                    )
+                    rec_user_name = gr.Textbox(
+                        label="User Name (for verification)",
+                        placeholder="Enter user name for verification...",
+                        visible=True
+                    )
+
+                rec_image = gr.Image(
+                    label="Upload Face Image",
+                    type="pil"
+                )
+
+                rec_button = gr.Button("Process", variant="primary")
+                rec_output = gr.Textbox(label="Recognition Result", lines=8)
+
+                def update_user_name_visibility(operation):
+                    return gr.update(visible=operation == OperationType.VERIFICATION.value)
+
+                operation_type.change(
+                    fn=update_user_name_visibility,
+                    inputs=operation_type,
+                    outputs=rec_user_name
+                )
+
+                rec_button.click(
+                    fn=sync_process_recognition,
+                    inputs=[operation_type, rec_user_name, rec_image],
+                    outputs=rec_output
+                )
+
+            # Webcam Recognition Tab
+            with gr.TabItem("📷 Live Recognition"):
+                gr.Markdown("## Live Face Recognition - Webcam")
+                gr.Markdown("Face verification or identification on webcam frames")
+
+                with gr.Row():
+                    with gr.Column():
+                        stream_operation_type = gr.Radio(
+                            choices=[OperationType.VERIFICATION.value, OperationType.IDENTIFICATION.value],
+                            value=OperationType.IDENTIFICATION.value,
+                            label="Operation Type"
+                        )
+                        stream_user_name = gr.Textbox(
+                            label="User Name (for verification)",
+                            placeholder="Enter user name for verification...",
+                            visible=False
+                        )
+
+                        webcam_stream = gr.Image(
+                            label="Live Webcam Feed",
+                            sources=["webcam"],
+                            type="numpy",
+                            elem_classes=["webcam-container"]
+                        )
+
+                    with gr.Column():
+                        gr.Markdown("### Instructions:")
+                        gr.Markdown("- For **Identification**: The system will try to identify any face in the camera")
+                        gr.Markdown("- For **Verification**: Enter a username to verify if the face matches that user")
+                        gr.Markdown("- Click **Process Current Frame** to run recognition on the current frame; results appear in the textbox below")
+
+                def update_stream_user_name_visibility(operation):
+                    return gr.update(visible=operation == OperationType.VERIFICATION.value)
+
+                stream_operation_type.change(
+                    fn=update_stream_user_name_visibility,
+                    inputs=stream_operation_type,
+                    outputs=stream_user_name
+                )
+
+                # Continuous streaming (and the streaming=True flag) is disabled
+                # due to Gradio version compatibility; single frames are
+                # processed on demand via the button below instead
+                stream_button = gr.Button("🔄 Process Current Frame", variant="primary")
+                stream_output = gr.Textbox(label="Recognition Result", lines=4)
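+
+                # This handler mirrors process_recognition() but accepts the
+                # raw RGB numpy frame that gr.Image(type="numpy") delivers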
+                def process_current_frame(image, operation_type, user_name):
+                    if image is None:
+                        return "No image captured from webcam"
+
+                    try:
+                        # No PIL round-trip needed: the frame is already a numpy array
+                        np_image = image
+
+                        if operation_type == OperationType.VERIFICATION.value and user_name:
+                            result = asyncio.run(verify_use_case.execute(user_name, np_image))
+                            if result.is_verified:
+                                return f"✅ Verified as {user_name}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s"
+                            else:
+                                return f"❌ Not verified as {user_name}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s"
+
+                        elif operation_type == OperationType.IDENTIFICATION.value:
+                            result = asyncio.run(identify_use_case.execute(np_image))
+                            if result.is_identified:
+                                candidates_str = "\n".join([f"  - {name}: {conf:.2%}" for name, conf in result.candidates[:5]])
+                                return f"✅ Identified as: {result.user_id}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s\n\nTop candidates:\n{candidates_str}"
+                            else:
+                                candidates_str = "\n".join([f"  - {name}: {conf:.2%}" for name, conf in result.candidates[:5]])
+                                return f"❌ Person not identified\nBest match confidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s\n\nTop candidates:\n{candidates_str}"
+                        else:
+                            return "Please select an operation type (and a user name for verification)"
+
+                    except Exception as e:
+                        return f"Error processing image: {str(e)}"
+
+                stream_button.click(
+                    fn=process_current_frame,
+                    inputs=[webcam_stream, stream_operation_type, stream_user_name],
+                    outputs=[stream_output]
+                )
+
+    return interface
\ No newline at end of file