master
Artem-Darius Weber 5 months ago
commit 32a8496c3f

@ -0,0 +1,6 @@
MONGODB_URL=mongodb://admin:admin123@localhost:27017/face_recognition?authSource=admin
MONGODB_DB=face_recognition
CHROMA_HOST=localhost
CHROMA_PORT=8000
UPLOAD_DIR=/app/data/uploads
GPU_ENABLED=true

2
.gitignore vendored

@ -0,0 +1,2 @@
data/
models/

@ -0,0 +1,29 @@
FROM nvidia/cuda:11.8.0-cudnn8-runtime-ubuntu22.04
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1
RUN apt-get update && apt-get install -y \
python3.10 \
python3-pip \
python3-opencv \
libglib2.0-0 \
libsm6 \
libxext6 \
libxrender-dev \
libgomp1 \
wget \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
COPY . .
RUN mkdir -p /app/data/uploads /app/data/chroma /app/models
EXPOSE 7860
CMD ["python3", "main.py"]

@ -0,0 +1,71 @@
version: '3.8'
services:
mongodb:
image: mongo:7.0
container_name: face_recognition_mongo
restart: always
ports:
- "27017:27017"
volumes:
- mongo_data:/data/db
environment:
MONGO_INITDB_ROOT_USERNAME: admin
MONGO_INITDB_ROOT_PASSWORD: admin123
MONGO_INITDB_DATABASE: face_recognition
networks:
- face_recognition_net
chromadb:
image: chromadb/chroma:latest
container_name: face_recognition_chroma
restart: always
ports:
- "8000:8000"
volumes:
- chroma_data:/chroma/chroma
environment:
- IS_PERSISTENT=TRUE
- PERSIST_DIRECTORY=/chroma/chroma
- ANONYMIZED_TELEMETRY=FALSE
networks:
- face_recognition_net
face_recognition:
build:
context: .
dockerfile: Dockerfile
container_name: face_recognition_app
restart: always
ports:
- "7860:7860"
volumes:
- ./data:/app/data
- ./models:/app/models
environment:
- MONGODB_URL=mongodb://admin:admin123@mongodb:27017/face_recognition?authSource=admin
- MONGODB_DB=face_recognition
- CHROMA_HOST=chromadb
- CHROMA_PORT=8000
- UPLOAD_DIR=/app/data/uploads
- NVIDIA_VISIBLE_DEVICES=all
depends_on:
- mongodb
- chromadb
networks:
- face_recognition_net
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
volumes:
mongo_data:
chroma_data:
networks:
face_recognition_net:
driver: bridge

@ -0,0 +1,55 @@
from pathlib import Path
import gradio as gr
import numpy as np
from typing import Optional, Tuple, List
import cv2
from datetime import datetime
import asyncio
from concurrent.futures import ThreadPoolExecutor
from src.domain.entities import Face, VerificationResult, IdentificationResult
from src.domain.repositories import FaceRepository
from src.infrastructure.database.mongodb import MongoDBFaceRepository
from src.infrastructure.storage.chromadb import ChromaDBVectorStore
from src.infrastructure.storage.filesystem import FileSystemStorage
from src.application.use_cases import (
RegisterFaceUseCase,
VerifyFaceUseCase,
IdentifyFaceUseCase
)
from src.application.services import FaceRecognitionService
from src.infrastructure.ml.detectors import RetinaFaceDetector
from src.infrastructure.ml.recognizers import SFaceRecognizer
from src.presentation.gradio_app import create_gradio_interface
from src.infrastructure.config import Settings
settings = Settings()
async def initialize_app():
face_repository = MongoDBFaceRepository(settings.MONGODB_URL, settings.MONGODB_DB)
vector_store = ChromaDBVectorStore(settings.CHROMA_HOST, settings.CHROMA_PORT)
file_storage = FileSystemStorage(settings.UPLOAD_DIR)
detector = RetinaFaceDetector()
recognizer = SFaceRecognizer()
face_service = FaceRecognitionService(
face_repository=face_repository,
vector_store=vector_store,
verification_threshold=settings.VERIFICATION_THRESHOLD,
identification_threshold=settings.IDENTIFICATION_THRESHOLD
)
register_use_case = RegisterFaceUseCase(face_service)
verify_use_case = VerifyFaceUseCase(face_service)
identify_use_case = IdentifyFaceUseCase(face_service)
return create_gradio_interface(
register_use_case=register_use_case,
verify_use_case=verify_use_case,
identify_use_case=identify_use_case
)
if __name__ == "__main__":
app = asyncio.run(initialize_app())
app.launch(server_name="0.0.0.0", server_port=7860)

@ -0,0 +1,14 @@
gradio==4.16.0
numpy==1.24.3
opencv-python==4.8.1.78
Pillow==10.2.0
motor==3.3.2
pymongo==4.6.1
chromadb==0.4.22
insightface==0.7.3
onnxruntime-gpu==1.16.3
pydantic>=2.0.0,<3.0.0
pydantic-settings>=2.0.0
python-multipart==0.0.6
uvicorn==0.27.0
fastapi==0.109.0

@ -0,0 +1 @@
__version__ = "1.0.0"

@ -0,0 +1,7 @@
from .services import FaceRecognitionService
from .use_cases import RegisterFaceUseCase, VerifyFaceUseCase, IdentifyFaceUseCase
__all__ = [
"FaceRecognitionService",
"RegisterFaceUseCase", "VerifyFaceUseCase", "IdentifyFaceUseCase"
]

@ -0,0 +1,440 @@
from typing import List, Optional, Tuple
import numpy as np
import uuid
from datetime import datetime
import time
from src.domain.entities import Face, User
from src.domain.repositories import FaceRepository, VectorStore
from src.infrastructure.ml.detectors import RetinaFaceDetector
from src.infrastructure.ml.recognizers import SFaceRecognizer
from src.infrastructure.storage.chromadb import ChromaDBVectorStore
from src.infrastructure.storage.filesystem import FileSystemStorage
from src.infrastructure.logging import service_logger, data_logger
class FaceRecognitionService:
def __init__(
self,
face_repository: FaceRepository,
vector_store: Optional[VectorStore] = None,
verification_threshold: float = 0.6,
identification_threshold: float = 0.5
):
service_logger.info(
"Initializing Face Recognition Service",
verification_threshold=verification_threshold,
identification_threshold=identification_threshold
)
try:
self.face_repository = face_repository
self.vector_store = vector_store or ChromaDBVectorStore()
self.detector = RetinaFaceDetector()
self.recognizer = SFaceRecognizer()
self.file_storage = FileSystemStorage("/app/data/uploads")
self.verification_threshold = verification_threshold
self.identification_threshold = identification_threshold
service_logger.info(
"Face Recognition Service initialized successfully",
components_loaded=["detector", "recognizer", "vector_store", "file_storage", "face_repository"]
)
except Exception as e:
service_logger.error("Failed to initialize Face Recognition Service", error=e)
raise
async def register_face(self, user_id: str, image: np.ndarray) -> Optional[str]:
start_time = time.time()
service_logger.info(
"Starting face registration",
user_id=user_id,
image_shape=image.shape
)
try:
# Step 1: Detect faces
detected_faces = self.detector.detect_faces(image)
if not detected_faces:
service_logger.warning(
"No faces detected in image",
user_id=user_id,
processing_time_seconds=time.time() - start_time
)
return None
service_logger.debug(
f"Detected {len(detected_faces)} faces, selecting best one",
user_id=user_id,
faces_count=len(detected_faces)
)
# Step 2: Select best face
best_face = max(detected_faces, key=lambda x: x.get('det_score', 0))
# Step 3: Check quality
quality_score = self.detector.calculate_quality_score(best_face)
if quality_score < 0.5:
service_logger.warning(
"Face quality too low for registration",
user_id=user_id,
quality_score=quality_score,
quality_threshold=0.5,
processing_time_seconds=time.time() - start_time
)
return None
service_logger.debug(
"Face quality check passed",
user_id=user_id,
quality_score=quality_score
)
# Step 4: Align face
if best_face.get('landmarks') is not None:
aligned_face = self.detector.align_face(image, np.array(best_face['landmarks']))
service_logger.debug("Face aligned using landmarks", user_id=user_id)
else:
aligned_face = self.detector._crop_face(image, best_face['bbox'])
service_logger.debug("Face cropped using bbox", user_id=user_id)
# Step 5: Extract embedding
embedding = self.recognizer.extract_embedding(aligned_face)
# Step 6: Generate unique ID and save image
face_id = str(uuid.uuid4())
image_path = await self.file_storage.save_image(aligned_face, user_id)
service_logger.debug(
"Face image saved",
user_id=user_id,
face_id=face_id,
image_path=image_path
)
# Step 7: Create Face entity
face = Face(
id=face_id,
user_id=user_id,
image_path=image_path,
embedding=embedding,
created_at=datetime.utcnow(),
quality_score=quality_score,
bbox=best_face['bbox'],
landmarks=best_face.get('landmarks')
)
# Step 8: Save to repository
await self.face_repository.save(face)
service_logger.debug("Face saved to database", user_id=user_id, face_id=face_id)
# Step 9: Add to vector store
metadata = {
'user_id': user_id,
'face_id': face_id,
'quality_score': quality_score
}
await self.vector_store.add(face_id, embedding, metadata)
service_logger.debug("Face embedding added to vector store", face_id=face_id)
# Log successful registration
processing_time = time.time() - start_time
data_logger.log_face_registration(user_id, face_id, quality_score, len(embedding))
service_logger.info(
"Face registration completed successfully",
user_id=user_id,
face_id=face_id,
quality_score=quality_score,
embedding_dimensions=len(embedding),
processing_time_seconds=processing_time
)
return face_id
except Exception as e:
processing_time = time.time() - start_time
service_logger.error(
"Face registration failed",
user_id=user_id,
error=e,
processing_time_seconds=processing_time
)
raise
async def verify_face(self, user_id: str, image: np.ndarray) -> Tuple[bool, float]:
start_time = time.time()
service_logger.info(
"Starting face verification",
user_id=user_id,
image_shape=image.shape,
verification_threshold=self.verification_threshold
)
try:
# Step 1: Detect faces
detected_faces = self.detector.detect_faces(image)
if not detected_faces:
processing_time = time.time() - start_time
service_logger.warning(
"No faces detected for verification",
user_id=user_id,
processing_time_seconds=processing_time
)
data_logger.log_face_verification(user_id, False, 0.0, self.verification_threshold, processing_time)
return False, 0.0
service_logger.debug(
f"Detected {len(detected_faces)} faces for verification",
user_id=user_id,
faces_count=len(detected_faces)
)
# Step 2: Select best face
best_face = max(detected_faces, key=lambda x: x.get('det_score', 0))
service_logger.debug(
"Selected best face for verification",
user_id=user_id,
detection_score=best_face.get('det_score', 0)
)
# Step 3: Align face
if best_face.get('landmarks') is not None:
aligned_face = self.detector.align_face(image, np.array(best_face['landmarks']))
service_logger.debug("Face aligned using landmarks for verification", user_id=user_id)
else:
aligned_face = self.detector._crop_face(image, best_face['bbox'])
service_logger.debug("Face cropped using bbox for verification", user_id=user_id)
# Step 4: Extract embedding
query_embedding = self.recognizer.extract_embedding(aligned_face)
service_logger.debug(
"Query embedding extracted for verification",
user_id=user_id,
embedding_shape=query_embedding.shape
)
# Step 5: Get user's registered faces
user_faces = await self.face_repository.get_by_user_id(user_id)
if not user_faces:
processing_time = time.time() - start_time
service_logger.warning(
"No registered faces found for user",
user_id=user_id,
processing_time_seconds=processing_time
)
data_logger.log_face_verification(user_id, False, 0.0, self.verification_threshold, processing_time)
return False, 0.0
service_logger.debug(
f"Found {len(user_faces)} registered faces for user",
user_id=user_id,
registered_faces_count=len(user_faces)
)
# Step 6: Calculate similarities with all user faces
max_similarity = 0.0
similarities = []
for i, face in enumerate(user_faces):
similarity = self.recognizer.calculate_similarity(query_embedding, face.embedding)
similarities.append(similarity)
max_similarity = max(max_similarity, similarity)
service_logger.debug(
f"Similarity calculated with face {i+1}",
user_id=user_id,
face_id=face.id,
similarity=similarity,
quality_score=face.quality_score
)
# Step 7: Make verification decision
is_verified = max_similarity >= self.verification_threshold
processing_time = time.time() - start_time
# Log verification result
data_logger.log_face_verification(user_id, is_verified, max_similarity, self.verification_threshold, processing_time)
service_logger.info(
"Face verification completed",
user_id=user_id,
verification_result=is_verified,
max_similarity=max_similarity,
verification_threshold=self.verification_threshold,
registered_faces_compared=len(user_faces),
all_similarities=similarities,
processing_time_seconds=processing_time
)
return is_verified, max_similarity
except Exception as e:
processing_time = time.time() - start_time
service_logger.error(
"Face verification failed",
user_id=user_id,
error=e,
processing_time_seconds=processing_time
)
raise
async def identify_face(self, image: np.ndarray) -> Tuple[Optional[str], float, List[Tuple[str, float]]]:
start_time = time.time()
service_logger.info(
"Starting face identification",
image_shape=image.shape,
identification_threshold=self.identification_threshold
)
try:
# Step 1: Detect faces
detected_faces = self.detector.detect_faces(image)
if not detected_faces:
processing_time = time.time() - start_time
service_logger.warning(
"No faces detected for identification",
processing_time_seconds=processing_time
)
data_logger.log_face_identification(None, 0.0, 0, processing_time)
return None, 0.0, []
service_logger.debug(
f"Detected {len(detected_faces)} faces for identification",
faces_count=len(detected_faces)
)
# Step 2: Select best face
best_face = max(detected_faces, key=lambda x: x.get('det_score', 0))
service_logger.debug(
"Selected best face for identification",
detection_score=best_face.get('det_score', 0)
)
# Step 3: Align face
if best_face.get('landmarks') is not None:
aligned_face = self.detector.align_face(image, np.array(best_face['landmarks']))
service_logger.debug("Face aligned using landmarks for identification")
else:
aligned_face = self.detector._crop_face(image, best_face['bbox'])
service_logger.debug("Face cropped using bbox for identification")
# Step 4: Extract embedding
query_embedding = self.recognizer.extract_embedding(aligned_face)
service_logger.debug(
"Query embedding extracted for identification",
embedding_shape=query_embedding.shape
)
# Step 5: Search similar faces in vector store
similar_faces = await self.vector_store.search_similar(query_embedding, top_k=10)
service_logger.debug(
f"Vector search completed, found {len(similar_faces)} similar faces",
similar_faces_count=len(similar_faces),
search_top_k=10
)
# Log detailed similarity results
for i, (face_id, similarity, metadata) in enumerate(similar_faces):
service_logger.debug(
f"Similar face {i+1} found",
face_id=face_id,
similarity=similarity,
user_id=metadata.get('user_id', 'unknown'),
quality_score=metadata.get('quality_score', 'unknown')
)
# Step 6: Filter candidates by threshold and group by user
candidates = []
user_scores = {}
valid_matches = 0
for face_id, similarity, metadata in similar_faces:
if similarity >= self.identification_threshold:
valid_matches += 1
user_id = metadata.get('user_id')
if user_id:
if user_id not in user_scores:
user_scores[user_id] = []
user_scores[user_id].append(similarity)
service_logger.debug(
"Valid match found",
face_id=face_id,
user_id=user_id,
similarity=similarity,
threshold=self.identification_threshold
)
service_logger.debug(
f"Found {valid_matches} matches above threshold",
valid_matches=valid_matches,
identification_threshold=self.identification_threshold,
unique_users=len(user_scores)
)
# Step 7: Calculate best score per user
for user_id, scores in user_scores.items():
max_score = max(scores)
avg_score = sum(scores) / len(scores)
candidates.append((user_id, max_score))
service_logger.debug(
"User candidate scores calculated",
user_id=user_id,
max_score=max_score,
avg_score=avg_score,
face_matches=len(scores),
all_scores=scores
)
# Step 8: Sort candidates by confidence
candidates.sort(key=lambda x: x[1], reverse=True)
# Step 9: Determine final result
best_user_id = candidates[0][0] if candidates else None
best_confidence = candidates[0][1] if candidates else 0.0
processing_time = time.time() - start_time
# Log identification result
data_logger.log_face_identification(best_user_id, best_confidence, len(candidates), processing_time)
service_logger.info(
"Face identification completed",
identified_user_id=best_user_id,
confidence=best_confidence,
identification_threshold=self.identification_threshold,
candidates_count=len(candidates),
all_candidates=candidates[:5], # Log top 5 candidates
vector_matches=len(similar_faces),
valid_matches=valid_matches,
processing_time_seconds=processing_time
)
return best_user_id, best_confidence, candidates
except Exception as e:
processing_time = time.time() - start_time
service_logger.error(
"Face identification failed",
error=e,
processing_time_seconds=processing_time
)
raise
async def update_user_mean_embedding(self, user_id: str) -> Optional[np.ndarray]:
user_faces = await self.face_repository.get_by_user_id(user_id)
if not user_faces:
return None
embeddings = [face.embedding for face in user_faces]
mean_embedding = np.mean(embeddings, axis=0)
mean_embedding = mean_embedding / np.linalg.norm(mean_embedding)
return mean_embedding

@ -0,0 +1,122 @@
from typing import Optional, List, Tuple
import numpy as np
from datetime import datetime
from src.domain.entities import User, VerificationResult, IdentificationResult
from src.application.services import FaceRecognitionService
from src.infrastructure.database.mongodb import MongoDBUserRepository
class RegisterFaceUseCase:
def __init__(self, face_service: FaceRecognitionService):
self.face_service = face_service
from src.infrastructure.config import Settings
settings = Settings()
self.user_repository = MongoDBUserRepository(
settings.MONGODB_URL,
settings.MONGODB_DB
)
async def execute(self, user_name: str, images: List[np.ndarray]) -> Tuple[bool, str]:
user = await self.user_repository.get_user_by_name(user_name)
if not user:
user = User(
id="",
name=user_name,
created_at=datetime.utcnow(),
face_ids=[]
)
user_id = await self.user_repository.save(user)
user.id = user_id
else:
user_id = user.id
face_ids = []
for image in images:
face_id = await self.face_service.register_face(user_id, image)
if face_id:
face_ids.append(face_id)
if not face_ids:
return False, "No valid faces detected in the provided images"
user.face_ids.extend(face_ids)
mean_embedding = await self.face_service.update_user_mean_embedding(user_id)
user.mean_embedding = mean_embedding
await self.user_repository.update_user(user)
return True, f"Successfully registered {len(face_ids)} face(s) for user {user_name}"
class VerifyFaceUseCase:
def __init__(self, face_service: FaceRecognitionService):
self.face_service = face_service
from src.infrastructure.config import Settings
settings = Settings()
self.user_repository = MongoDBUserRepository(
settings.MONGODB_URL,
settings.MONGODB_DB
)
async def execute(self, user_name: str, image: np.ndarray) -> VerificationResult:
import time
start_time = time.time()
user = await self.user_repository.get_user_by_name(user_name)
if not user:
return VerificationResult(
is_verified=False,
confidence=0.0,
user_id="",
face_id="",
threshold=self.face_service.verification_threshold,
processing_time=time.time() - start_time
)
is_verified, confidence = await self.face_service.verify_face(user.id, image)
return VerificationResult(
is_verified=is_verified,
confidence=confidence,
user_id=user.id,
face_id="",
threshold=self.face_service.verification_threshold,
processing_time=time.time() - start_time
)
class IdentifyFaceUseCase:
def __init__(self, face_service: FaceRecognitionService):
self.face_service = face_service
from src.infrastructure.config import Settings
settings = Settings()
self.user_repository = MongoDBUserRepository(
settings.MONGODB_URL,
settings.MONGODB_DB
)
async def execute(self, image: np.ndarray) -> IdentificationResult:
import time
start_time = time.time()
user_id, confidence, candidates = await self.face_service.identify_face(image)
candidate_names = []
for candidate_user_id, candidate_confidence in candidates:
user = await self.user_repository.get_by_id(candidate_user_id)
if user:
candidate_names.append((user.name, candidate_confidence))
identified_user_name = None
if user_id:
user = await self.user_repository.get_by_id(user_id)
if user:
identified_user_name = user.name
return IdentificationResult(
is_identified=user_id is not None,
user_id=identified_user_name,
confidence=confidence,
candidates=candidate_names,
threshold=self.face_service.identification_threshold,
processing_time=time.time() - start_time
)

@ -0,0 +1,7 @@
from .entities import Face, User, VerificationResult, IdentificationResult, ProcessedFace, OperationType
from .repositories import FaceRepository, UserRepository, VectorStore
__all__ = [
"Face", "User", "VerificationResult", "IdentificationResult", "ProcessedFace", "OperationType",
"FaceRepository", "UserRepository", "VectorStore"
]

@ -0,0 +1,54 @@
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple
import numpy as np
from enum import Enum
from pydantic import BaseModel
class OperationType(str, Enum):
REGISTER = "register"
VERIFICATION = "verify"
IDENTIFICATION = "identify"
@dataclass
class Face:
id: str
user_id: str
image_path: str
embedding: np.ndarray
created_at: datetime
quality_score: float
bbox: List[float]
landmarks: Optional[List[List[float]]] = None
@dataclass
class User:
id: str
name: str
created_at: datetime
face_ids: List[str]
mean_embedding: Optional[np.ndarray] = None
@dataclass
class VerificationResult:
is_verified: bool
confidence: float
user_id: str
face_id: str
threshold: float
processing_time: float
@dataclass
class IdentificationResult:
is_identified: bool
user_id: Optional[str]
confidence: float
candidates: List[Tuple[str, float]]
threshold: float
processing_time: float
class ProcessedFace(BaseModel):
bbox: List[float] # [x1, y1, x2, y2]
landmarks: List[List[float]] # [[x1, y1], [x2, y2], ...]
quality_score: float
embedding: List[float]

@ -0,0 +1,48 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple, Dict
import numpy as np
from .entities import Face, User
class FaceRepository(ABC):
@abstractmethod
async def save(self, face: Face) -> str:
pass
@abstractmethod
async def get_by_id(self, face_id: str) -> Optional[Face]:
pass
@abstractmethod
async def get_by_user_id(self, user_id: str) -> List[Face]:
pass
@abstractmethod
async def delete(self, face_id: str) -> bool:
pass
class UserRepository(ABC):
@abstractmethod
async def save(self, user: User) -> str:
pass
@abstractmethod
async def get_by_id(self, user_id: str) -> Optional[User]:
pass
@abstractmethod
async def delete(self, user_id: str) -> bool:
pass
class VectorStore(ABC):
@abstractmethod
async def add(self, id: str, embedding: np.ndarray, metadata: Dict = None) -> bool:
pass
@abstractmethod
async def search_similar(self, embedding: np.ndarray, top_k: int = 10) -> List[Tuple[str, float, dict]]:
pass
@abstractmethod
async def delete(self, id: str) -> bool:
pass

@ -0,0 +1,3 @@
from .config import Settings
__all__ = ["Settings"]

@ -0,0 +1,22 @@
from pydantic_settings import BaseSettings
from pathlib import Path
class Settings(BaseSettings):
MONGODB_URL: str = "mongodb://mongo:27017"
MONGODB_DB: str = "face_recognition"
CHROMA_HOST: str = "chromadb"
CHROMA_PORT: int = 8000
UPLOAD_DIR: str = "/app/data/uploads"
QUALITY_THRESHOLD: float = 0.7
VERIFICATION_THRESHOLD: float = 0.8
IDENTIFICATION_THRESHOLD: float = 0.75
MAX_FACES_PER_USER: int = 10
GPU_ENABLED: bool = True
class Config:
env_file = ".env"
env_file_encoding = "utf-8"

@ -0,0 +1,3 @@
from .mongodb import MongoDBFaceRepository, MongoDBUserRepository
__all__ = ["MongoDBFaceRepository", "MongoDBUserRepository"]

@ -0,0 +1,299 @@
from typing import List, Optional
import numpy as np
from motor.motor_asyncio import AsyncIOMotorClient
from bson import ObjectId
from datetime import datetime
import time
from src.domain.entities import Face, User
from src.domain.repositories import FaceRepository, UserRepository
from src.infrastructure.logging import repository_logger, data_logger
class MongoDBFaceRepository(FaceRepository):
def __init__(self, connection_string: str, database_name: str):
repository_logger.info(
"Initializing MongoDB Face Repository",
connection_string=connection_string.replace(connection_string.split('@')[-1].split('/')[0], '***') if '@' in connection_string else connection_string,
database_name=database_name
)
try:
self.client = AsyncIOMotorClient(connection_string)
self.db = self.client[database_name]
self.faces_collection = self.db.faces
self.users_collection = self.db.users
repository_logger.info("MongoDB Face Repository initialized successfully")
except Exception as e:
repository_logger.error("Failed to initialize MongoDB Face Repository", error=e)
raise
async def save(self, face: Face) -> str:
start_time = time.time()
repository_logger.debug(
"Saving face to database",
face_id=face.id,
user_id=face.user_id,
quality_score=face.quality_score
)
try:
face_dict = {
"user_id": face.user_id,
"image_path": face.image_path,
"embedding": face.embedding.tolist(),
"created_at": face.created_at,
"quality_score": face.quality_score,
"bbox": face.bbox,
"landmarks": face.landmarks
}
result = await self.faces_collection.insert_one(face_dict)
face_id = str(result.inserted_id)
processing_time = time.time() - start_time
data_logger.log_database_operation(
"insert", "faces", face_id, True,
processing_time_seconds=processing_time,
user_id=face.user_id
)
repository_logger.info(
"Face saved successfully",
face_id=face_id,
user_id=face.user_id,
embedding_dimensions=len(face.embedding),
quality_score=face.quality_score,
processing_time_seconds=processing_time
)
return face_id
except Exception as e:
processing_time = time.time() - start_time
data_logger.log_database_operation(
"insert", "faces", face.id or "unknown", False,
processing_time_seconds=processing_time,
error_message=str(e)
)
repository_logger.error(
"Failed to save face",
face_id=face.id,
user_id=face.user_id,
error=e,
processing_time_seconds=processing_time
)
raise
async def get_by_id(self, face_id: str) -> Optional[Face]:
start_time = time.time()
repository_logger.debug("Retrieving face by ID", face_id=face_id)
try:
face_dict = await self.faces_collection.find_one({"_id": ObjectId(face_id)})
processing_time = time.time() - start_time
if face_dict:
face = self._dict_to_face(face_dict)
data_logger.log_database_operation(
"select", "faces", face_id, True,
processing_time_seconds=processing_time,
user_id=face.user_id
)
repository_logger.info(
"Face retrieved successfully",
face_id=face_id,
user_id=face.user_id,
processing_time_seconds=processing_time
)
return face
else:
data_logger.log_database_operation(
"select", "faces", face_id, False,
processing_time_seconds=processing_time,
error_message="Face not found"
)
repository_logger.warning(
"Face not found",
face_id=face_id,
processing_time_seconds=processing_time
)
return None
except Exception as e:
processing_time = time.time() - start_time
data_logger.log_database_operation(
"select", "faces", face_id, False,
processing_time_seconds=processing_time,
error_message=str(e)
)
repository_logger.error(
"Failed to retrieve face",
face_id=face_id,
error=e,
processing_time_seconds=processing_time
)
raise
async def get_by_user_id(self, user_id: str) -> List[Face]:
start_time = time.time()
repository_logger.debug("Retrieving faces by user ID", user_id=user_id)
try:
cursor = self.faces_collection.find({"user_id": user_id})
faces = []
async for face_dict in cursor:
faces.append(self._dict_to_face(face_dict))
processing_time = time.time() - start_time
data_logger.log_database_operation(
"select_multiple", "faces", user_id, True,
processing_time_seconds=processing_time,
records_found=len(faces)
)
repository_logger.info(
"User faces retrieved successfully",
user_id=user_id,
faces_count=len(faces),
processing_time_seconds=processing_time
)
return faces
except Exception as e:
processing_time = time.time() - start_time
data_logger.log_database_operation(
"select_multiple", "faces", user_id, False,
processing_time_seconds=processing_time,
error_message=str(e)
)
repository_logger.error(
"Failed to retrieve user faces",
user_id=user_id,
error=e,
processing_time_seconds=processing_time
)
raise
async def delete(self, face_id: str) -> bool:
start_time = time.time()
repository_logger.debug("Deleting face from database", face_id=face_id)
try:
result = await self.faces_collection.delete_one({"_id": ObjectId(face_id)})
success = result.deleted_count > 0
processing_time = time.time() - start_time
data_logger.log_database_operation(
"delete", "faces", face_id, success,
processing_time_seconds=processing_time,
deleted_count=result.deleted_count
)
if success:
repository_logger.info(
"Face deleted successfully",
face_id=face_id,
processing_time_seconds=processing_time
)
else:
repository_logger.warning(
"Face not found for deletion",
face_id=face_id,
processing_time_seconds=processing_time
)
return success
except Exception as e:
processing_time = time.time() - start_time
data_logger.log_database_operation(
"delete", "faces", face_id, False,
processing_time_seconds=processing_time,
error_message=str(e)
)
repository_logger.error(
"Failed to delete face",
face_id=face_id,
error=e,
processing_time_seconds=processing_time
)
raise
def _dict_to_face(self, face_dict: dict) -> Face:
return Face(
id=str(face_dict["_id"]),
user_id=face_dict["user_id"],
image_path=face_dict["image_path"],
embedding=np.array(face_dict["embedding"]),
created_at=face_dict["created_at"],
quality_score=face_dict["quality_score"],
bbox=face_dict["bbox"],
landmarks=face_dict.get("landmarks")
)
class MongoDBUserRepository(UserRepository):
def __init__(self, connection_string: str, database_name: str):
self.client = AsyncIOMotorClient(connection_string)
self.db = self.client[database_name]
self.users_collection = self.db.users
async def save(self, user: User) -> str:
user_dict = {
"name": user.name,
"created_at": user.created_at,
"face_ids": user.face_ids,
"mean_embedding": user.mean_embedding.tolist() if user.mean_embedding is not None else None
}
result = await self.users_collection.insert_one(user_dict)
return str(result.inserted_id)
async def get_by_id(self, user_id: str) -> Optional[User]:
user_dict = await self.users_collection.find_one({"_id": ObjectId(user_id)})
if user_dict:
return self._dict_to_user(user_dict)
return None
async def get_user_by_name(self, name: str) -> Optional[User]:
user_dict = await self.users_collection.find_one({"name": name})
if user_dict:
return self._dict_to_user(user_dict)
return None
async def update_user(self, user: User) -> bool:
user_dict = {
"name": user.name,
"face_ids": user.face_ids,
"mean_embedding": user.mean_embedding.tolist() if user.mean_embedding is not None else None
}
result = await self.users_collection.update_one(
{"_id": ObjectId(user.id)},
{"$set": user_dict}
)
return result.modified_count > 0
async def delete(self, user_id: str) -> bool:
result = await self.users_collection.delete_one({"_id": ObjectId(user_id)})
return result.deleted_count > 0
async def get_all_users(self) -> List[User]:
cursor = self.users_collection.find()
users = []
async for user_dict in cursor:
users.append(self._dict_to_user(user_dict))
return users
def _dict_to_user(self, user_dict: dict) -> User:
return User(
id=str(user_dict["_id"]),
name=user_dict["name"],
created_at=user_dict["created_at"],
face_ids=user_dict["face_ids"],
mean_embedding=np.array(user_dict["mean_embedding"]) if user_dict.get("mean_embedding") else None
)

@ -0,0 +1,29 @@
from .logger import (
app_logger,
detector_logger,
recognizer_logger,
service_logger,
repository_logger,
vector_store_logger,
use_case_logger,
api_logger,
data_logger,
ComponentLogger,
DataLifecycleLogger,
FaceRecognitionLogger
)
__all__ = [
"app_logger",
"detector_logger",
"recognizer_logger",
"service_logger",
"repository_logger",
"vector_store_logger",
"use_case_logger",
"api_logger",
"data_logger",
"ComponentLogger",
"DataLifecycleLogger",
"FaceRecognitionLogger"
]

@ -0,0 +1,212 @@
import logging
import json
import sys
from datetime import datetime
from typing import Any, Dict, Optional
from pathlib import Path
import traceback
class FaceRecognitionLogger:
"""Centralized logging system for face recognition service"""
def __init__(self, name: str = "face_recognition", log_level: str = "DEBUG"):
self.logger = logging.getLogger(name)
self.logger.setLevel(getattr(logging, log_level.upper()))
# Remove existing handlers to avoid duplicates
for handler in self.logger.handlers[:]:
self.logger.removeHandler(handler)
# Create formatters
detailed_formatter = logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s'
)
json_formatter = JsonFormatter()
# Console handler with detailed format
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(detailed_formatter)
console_handler.setLevel(logging.DEBUG)
# File handler with JSON format for structured logging
log_dir = Path("/app/logs")
log_dir.mkdir(exist_ok=True)
file_handler = logging.FileHandler(log_dir / "face_recognition.log")
file_handler.setFormatter(json_formatter)
file_handler.setLevel(logging.DEBUG)
# Error file handler
error_handler = logging.FileHandler(log_dir / "errors.log")
error_handler.setFormatter(json_formatter)
error_handler.setLevel(logging.ERROR)
self.logger.addHandler(console_handler)
self.logger.addHandler(file_handler)
self.logger.addHandler(error_handler)
def _log_with_context(self, level: str, message: str, **kwargs):
"""Log with additional context information"""
context = {
"timestamp": datetime.utcnow().isoformat(),
"message": message,
**kwargs
}
getattr(self.logger, level.lower())(json.dumps(context, default=str))
def debug(self, message: str, **kwargs):
self._log_with_context("DEBUG", message, **kwargs)
def info(self, message: str, **kwargs):
self._log_with_context("INFO", message, **kwargs)
def warning(self, message: str, **kwargs):
self._log_with_context("WARNING", message, **kwargs)
def error(self, message: str, error: Exception = None, **kwargs):
context = kwargs.copy()
if error:
context.update({
"error_type": error.__class__.__name__,
"error_message": str(error),
"traceback": traceback.format_exc()
})
self._log_with_context("ERROR", message, **context)
def critical(self, message: str, error: Exception = None, **kwargs):
context = kwargs.copy()
if error:
context.update({
"error_type": error.__class__.__name__,
"error_message": str(error),
"traceback": traceback.format_exc()
})
self._log_with_context("CRITICAL", message, **context)
class JsonFormatter(logging.Formatter):
"""JSON formatter for structured logging"""
def format(self, record):
log_entry = {
"timestamp": datetime.fromtimestamp(record.created).isoformat(),
"level": record.levelname,
"logger": record.name,
"module": record.module,
"function": record.funcName,
"line": record.lineno,
"message": record.getMessage()
}
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry, default=str)
# Create global logger instance
app_logger = FaceRecognitionLogger()
# Specialized loggers for different components
class ComponentLogger:
def __init__(self, component_name: str):
self.component = component_name
self.logger = app_logger
def _add_component_context(self, **kwargs):
return {"component": self.component, **kwargs}
def debug(self, message: str, **kwargs):
self.logger.debug(message, **self._add_component_context(**kwargs))
def info(self, message: str, **kwargs):
self.logger.info(message, **self._add_component_context(**kwargs))
def warning(self, message: str, **kwargs):
self.logger.warning(message, **self._add_component_context(**kwargs))
def error(self, message: str, error: Exception = None, **kwargs):
self.logger.error(message, error=error, **self._add_component_context(**kwargs))
def critical(self, message: str, error: Exception = None, **kwargs):
self.logger.critical(message, error=error, **self._add_component_context(**kwargs))
# Component-specific loggers
detector_logger = ComponentLogger("face_detector")
recognizer_logger = ComponentLogger("face_recognizer")
service_logger = ComponentLogger("face_service")
repository_logger = ComponentLogger("repository")
vector_store_logger = ComponentLogger("vector_store")
use_case_logger = ComponentLogger("use_case")
api_logger = ComponentLogger("api")
class DataLifecycleLogger:
"""Specialized logger for tracking data lifecycle"""
def __init__(self):
self.logger = ComponentLogger("data_lifecycle")
def log_face_detection(self, image_info: Dict, faces_detected: int, processing_time: float):
self.logger.info(
"Face detection completed",
operation="face_detection",
faces_detected=faces_detected,
processing_time_seconds=processing_time,
**image_info
)
def log_face_registration(self, user_id: str, face_id: str, quality_score: float, embedding_size: int):
self.logger.info(
"Face registered successfully",
operation="face_registration",
user_id=user_id,
face_id=face_id,
quality_score=quality_score,
embedding_size=embedding_size
)
def log_face_verification(self, user_id: str, result: bool, confidence: float, threshold: float, processing_time: float):
self.logger.info(
"Face verification completed",
operation="face_verification",
user_id=user_id,
verification_result=result,
confidence=confidence,
threshold=threshold,
processing_time_seconds=processing_time
)
def log_face_identification(self, result_user_id: Optional[str], confidence: float,
candidates_count: int, processing_time: float):
self.logger.info(
"Face identification completed",
operation="face_identification",
identified_user_id=result_user_id,
confidence=confidence,
candidates_count=candidates_count,
processing_time_seconds=processing_time
)
def log_database_operation(self, operation: str, table: str, record_id: str, success: bool, **kwargs):
self.logger.info(
f"Database operation: {operation}",
operation="database",
db_operation=operation,
table=table,
record_id=record_id,
success=success,
**kwargs
)
def log_vector_operation(self, operation: str, vector_id: str, success: bool, **kwargs):
self.logger.info(
f"Vector store operation: {operation}",
operation="vector_store",
vector_operation=operation,
vector_id=vector_id,
success=success,
**kwargs
)
# Global data lifecycle logger
data_logger = DataLifecycleLogger()

@ -0,0 +1,4 @@
from .detectors import RetinaFaceDetector
from .recognizers import SFaceRecognizer
__all__ = ["RetinaFaceDetector", "SFaceRecognizer"]

@ -0,0 +1,144 @@
import cv2
import numpy as np
from typing import List, Tuple, Optional
from insightface.app import FaceAnalysis
from insightface.data import get_image as ins_get_image
import time
from src.infrastructure.logging import detector_logger, data_logger
class RetinaFaceDetector:
def __init__(self):
detector_logger.info("Initializing RetinaFace detector")
try:
self.app = FaceAnalysis(providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
self.app.prepare(ctx_id=0, det_size=(640, 640))
detector_logger.info("RetinaFace detector initialized successfully",
providers=self.app.models.keys() if hasattr(self.app, 'models') else "unknown")
except Exception as e:
detector_logger.error("Failed to initialize RetinaFace detector", error=e)
raise
def detect_faces(self, image: np.ndarray) -> List[dict]:
start_time = time.time()
image_info = {
"image_shape": image.shape,
"image_dtype": str(image.dtype),
"image_size_bytes": image.nbytes
}
detector_logger.debug("Starting face detection", **image_info)
try:
faces = self.app.get(image)
processing_time = time.time() - start_time
detected_faces = []
for i, face in enumerate(faces):
face_dict = {
'bbox': face.bbox.tolist(),
'landmarks': face.kps.tolist() if face.kps is not None else None,
'det_score': float(face.det_score),
'embedding': face.normed_embedding if hasattr(face, 'normed_embedding') else None
}
detected_faces.append(face_dict)
detector_logger.debug(
f"Face {i+1} detected",
face_index=i,
bbox=face_dict['bbox'],
detection_score=face_dict['det_score'],
has_landmarks=face_dict['landmarks'] is not None,
has_embedding=face_dict['embedding'] is not None
)
# Log data lifecycle
data_logger.log_face_detection(image_info, len(detected_faces), processing_time)
detector_logger.info(
"Face detection completed",
faces_detected=len(detected_faces),
processing_time_seconds=processing_time,
**image_info
)
return detected_faces
except Exception as e:
processing_time = time.time() - start_time
detector_logger.error(
"Face detection failed",
error=e,
processing_time_seconds=processing_time,
**image_info
)
raise
def align_face(self, image: np.ndarray, landmarks: np.ndarray, image_size: int = 112) -> np.ndarray:
if landmarks is None or len(landmarks) != 5:
return self._crop_face(image, landmarks)
src_pts = np.array([
[30.2946, 51.6963],
[65.5318, 51.5014],
[48.0252, 71.7366],
[33.5493, 92.3655],
[62.7299, 92.2041]
], dtype=np.float32)
if image_size == 112:
src_pts[:, 0] += 8.0
dst_pts = landmarks.astype(np.float32)
tform = cv2.estimateAffinePartial2D(dst_pts, src_pts)[0]
aligned_face = cv2.warpAffine(
image, tform, (image_size, image_size),
flags=cv2.INTER_LINEAR
)
return aligned_face
def _crop_face(self, image: np.ndarray, bbox: List[float]) -> np.ndarray:
x1, y1, x2, y2 = [int(coord) for coord in bbox[:4]]
width = x2 - x1
height = y2 - y1
size = max(width, height)
center_x = (x1 + x2) // 2
center_y = (y1 + y2) // 2
x1 = max(0, center_x - size // 2)
y1 = max(0, center_y - size // 2)
x2 = min(image.shape[1], x1 + size)
y2 = min(image.shape[0], y1 + size)
face_crop = image[y1:y2, x1:x2]
face_crop = cv2.resize(face_crop, (112, 112))
return face_crop
def calculate_quality_score(self, face_dict: dict) -> float:
detector_logger.debug("Calculating quality score", face_data_keys=list(face_dict.keys()))
det_score = face_dict.get('det_score', 0.0)
bbox = face_dict.get('bbox', [])
if len(bbox) >= 4:
face_size = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
size_score = min(face_size / 10000, 1.0)
else:
size_score = 0.0
quality_score = det_score * 0.7 + size_score * 0.3
detector_logger.debug(
"Quality score calculated",
detection_score=det_score,
size_score=size_score,
final_quality_score=quality_score,
face_size=face_size if len(bbox) >= 4 else None
)
return quality_score

@ -0,0 +1,116 @@
import cv2
import numpy as np
import onnxruntime as ort
from typing import List, Tuple
import urllib.request
from pathlib import Path
import time
from src.infrastructure.logging import recognizer_logger, data_logger
class SFaceRecognizer:
def __init__(self):
recognizer_logger.info("Initializing SFace recognizer")
try:
self.model_path = self._download_model()
self.session = ort.InferenceSession(
self.model_path,
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
self.input_name = self.session.get_inputs()[0].name
self.output_name = self.session.get_outputs()[0].name
recognizer_logger.info(
"SFace recognizer initialized successfully",
model_path=self.model_path,
input_name=self.input_name,
output_name=self.output_name,
providers=self.session.get_providers()
)
except Exception as e:
recognizer_logger.error("Failed to initialize SFace recognizer", error=e)
raise
def _download_model(self) -> str:
model_dir = Path("models")
model_dir.mkdir(exist_ok=True)
model_path = model_dir / "face_recognition_sface_2021dec_int8.onnx"
if not model_path.exists():
url = "https://github.com/opencv/opencv_zoo/raw/main/models/face_recognition_sface/face_recognition_sface_2021dec_int8.onnx"
urllib.request.urlretrieve(url, model_path)
return str(model_path)
def extract_embedding(self, aligned_face: np.ndarray) -> np.ndarray:
start_time = time.time()
original_shape = aligned_face.shape
recognizer_logger.debug(
"Starting embedding extraction",
input_shape=original_shape,
input_dtype=str(aligned_face.dtype)
)
try:
if aligned_face.shape != (112, 112, 3):
aligned_face = cv2.resize(aligned_face, (112, 112))
recognizer_logger.debug(f"Resized face from {original_shape} to {aligned_face.shape}")
input_blob = cv2.dnn.blobFromImage(
aligned_face, 1.0 / 255, (112, 112), (0, 0, 0), swapRB=True
)
embedding = self.session.run(
[self.output_name],
{self.input_name: input_blob}
)[0]
embedding = embedding.flatten()
embedding_norm = np.linalg.norm(embedding)
embedding = embedding / embedding_norm
processing_time = time.time() - start_time
recognizer_logger.debug(
"Embedding extraction completed",
embedding_shape=embedding.shape,
embedding_norm=float(embedding_norm),
final_norm=float(np.linalg.norm(embedding)),
processing_time_seconds=processing_time
)
return embedding
except Exception as e:
processing_time = time.time() - start_time
recognizer_logger.error(
"Embedding extraction failed",
error=e,
processing_time_seconds=processing_time,
input_shape=original_shape
)
raise
def calculate_similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
cosine_similarity = np.dot(embedding1, embedding2)
return float(cosine_similarity)
def calculate_distance(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
return float(np.linalg.norm(embedding1 - embedding2))
def find_matches(
self,
query_embedding: np.ndarray,
database_embeddings: List[Tuple[str, np.ndarray]],
threshold: float = 0.7
) -> List[Tuple[str, float]]:
matches = []
for face_id, db_embedding in database_embeddings:
similarity = self.calculate_similarity(query_embedding, db_embedding)
if similarity >= threshold:
matches.append((face_id, similarity))
matches.sort(key=lambda x: x[1], reverse=True)
return matches

@ -0,0 +1,4 @@
from .chromadb import ChromaDBVectorStore
from .filesystem import FileSystemStorage
__all__ = ["ChromaDBVectorStore", "FileSystemStorage"]

@ -0,0 +1,279 @@
from typing import List, Optional, Tuple, Dict
import numpy as np
import chromadb
from chromadb.config import Settings as ChromaSettings
import uuid
import time
from src.domain.repositories import VectorStore
from src.infrastructure.logging import vector_store_logger, data_logger
class ChromaDBVectorStore(VectorStore):
def __init__(self, host: str = "localhost", port: int = 8000):
import requests
vector_store_logger.info(
"Initializing ChromaDB vector store",
host=host,
port=port
)
# Wait for ChromaDB to be ready
vector_store_logger.info(f"Waiting for ChromaDB at {host}:{port}...")
max_retries = 30
for i in range(max_retries):
try:
response = requests.get(f"http://{host}:{port}/api/v1/heartbeat")
if response.status_code == 200:
vector_store_logger.info("ChromaDB is ready!")
break
except Exception as e:
vector_store_logger.debug(f"ChromaDB heartbeat check failed (attempt {i+1})", error=e)
pass
if i == max_retries - 1:
vector_store_logger.warning("ChromaDB not responding, attempting to connect anyway...")
time.sleep(1)
# Initialize ChromaDB client - try different connection methods
connection_successful = False
# Method 1: Try direct connection without settings
try:
vector_store_logger.debug("Trying direct ChromaDB connection...")
self.client = chromadb.HttpClient(host=host, port=port)
self.collection = self.client.get_or_create_collection(name="face_embeddings")
vector_store_logger.info("Direct ChromaDB connection successful!")
connection_successful = True
except Exception as e:
vector_store_logger.debug("Direct connection failed", error=e)
# Method 2: Try with basic settings
if not connection_successful:
try:
vector_store_logger.debug("Trying ChromaDB connection with settings...")
settings = chromadb.config.Settings(allow_reset=True)
self.client = chromadb.HttpClient(host=host, port=port, settings=settings)
self.collection = self.client.get_or_create_collection(name="face_embeddings")
vector_store_logger.info("ChromaDB connection with settings successful!")
connection_successful = True
except Exception as e:
vector_store_logger.debug("Connection with settings failed", error=e)
# Method 3: Try using PersistentClient as fallback
if not connection_successful:
try:
vector_store_logger.debug("Trying persistent client fallback...")
import tempfile
self.client = chromadb.PersistentClient(path=tempfile.mkdtemp())
self.collection = self.client.get_or_create_collection(name="face_embeddings")
vector_store_logger.warning("Using persistent client fallback!")
connection_successful = True
except Exception as e:
vector_store_logger.error("Persistent client fallback failed", error=e)
if not connection_successful:
vector_store_logger.critical("Failed to connect to ChromaDB with all methods")
raise Exception("Failed to connect to ChromaDB with all methods")
# Log successful initialization
collection_info = {
"name": self.collection.name if hasattr(self.collection, 'name') else "face_embeddings",
"client_type": type(self.client).__name__
}
vector_store_logger.info("ChromaDB vector store initialized successfully", **collection_info)
async def add(self, id: str, embedding: np.ndarray, metadata: Dict = None) -> bool:
start_time = time.time()
vector_store_logger.debug(
"Adding embedding to vector store",
vector_id=id,
embedding_shape=embedding.shape,
metadata_keys=list(metadata.keys()) if metadata else []
)
try:
self.collection.add(
embeddings=[embedding.tolist()],
metadatas=[metadata or {}],
ids=[id]
)
processing_time = time.time() - start_time
data_logger.log_vector_operation("add", id, True,
processing_time_seconds=processing_time,
embedding_dimensions=len(embedding))
vector_store_logger.info(
"Embedding added successfully",
vector_id=id,
embedding_dimensions=len(embedding),
metadata_size=len(metadata) if metadata else 0,
processing_time_seconds=processing_time
)
return True
except Exception as e:
processing_time = time.time() - start_time
data_logger.log_vector_operation("add", id, False,
processing_time_seconds=processing_time,
error_message=str(e))
vector_store_logger.error(
"Failed to add embedding",
vector_id=id,
error=e,
processing_time_seconds=processing_time
)
return False
async def search_similar(self, embedding: np.ndarray, top_k: int = 10) -> List[Tuple[str, float, dict]]:
start_time = time.time()
vector_store_logger.debug(
"Searching for similar embeddings",
embedding_shape=embedding.shape,
top_k=top_k
)
try:
results = self.collection.query(
query_embeddings=[embedding.tolist()],
n_results=top_k
)
similar_faces = []
if results['ids'][0]:
for i in range(len(results['ids'][0])):
face_id = results['ids'][0][i]
distance = results['distances'][0][i]
metadata = results['metadatas'][0][i]
similarity = 1 - distance
similar_faces.append((face_id, similarity, metadata))
vector_store_logger.debug(
f"Similar face found (rank {i+1})",
face_id=face_id,
distance=distance,
similarity=similarity,
user_id=metadata.get('user_id', 'unknown')
)
processing_time = time.time() - start_time
data_logger.log_vector_operation("search", f"query_top_{top_k}", True,
processing_time_seconds=processing_time,
results_found=len(similar_faces),
search_top_k=top_k)
vector_store_logger.info(
"Similarity search completed",
query_embedding_shape=embedding.shape,
top_k_requested=top_k,
results_found=len(similar_faces),
processing_time_seconds=processing_time,
best_similarity=similar_faces[0][1] if similar_faces else 0.0
)
return similar_faces
except Exception as e:
processing_time = time.time() - start_time
data_logger.log_vector_operation("search", f"query_top_{top_k}", False,
processing_time_seconds=processing_time,
error_message=str(e))
vector_store_logger.error(
"Similarity search failed",
embedding_shape=embedding.shape,
top_k=top_k,
error=e,
processing_time_seconds=processing_time
)
raise
async def get_embedding(self, embedding_id: str) -> Optional[Tuple[np.ndarray, dict]]:
start_time = time.time()
vector_store_logger.debug("Retrieving embedding by ID", embedding_id=embedding_id)
try:
results = self.collection.get(
ids=[embedding_id],
include=['embeddings', 'metadatas']
)
processing_time = time.time() - start_time
if results['ids']:
embedding = np.array(results['embeddings'][0])
metadata = results['metadatas'][0]
data_logger.log_vector_operation("get", embedding_id, True,
processing_time_seconds=processing_time,
embedding_dimensions=len(embedding))
vector_store_logger.info(
"Embedding retrieved successfully",
embedding_id=embedding_id,
embedding_shape=embedding.shape,
metadata_keys=list(metadata.keys()) if metadata else [],
processing_time_seconds=processing_time
)
return embedding, metadata
else:
data_logger.log_vector_operation("get", embedding_id, False,
processing_time_seconds=processing_time,
error_message="Embedding not found")
vector_store_logger.warning(
"Embedding not found",
embedding_id=embedding_id,
processing_time_seconds=processing_time
)
return None
except Exception as e:
processing_time = time.time() - start_time
data_logger.log_vector_operation("get", embedding_id, False,
processing_time_seconds=processing_time,
error_message=str(e))
vector_store_logger.error(
"Failed to retrieve embedding",
embedding_id=embedding_id,
error=e,
processing_time_seconds=processing_time
)
raise
async def delete(self, id: str) -> bool:
start_time = time.time()
vector_store_logger.debug("Deleting embedding", embedding_id=id)
try:
self.collection.delete(ids=[id])
processing_time = time.time() - start_time
data_logger.log_vector_operation("delete", id, True,
processing_time_seconds=processing_time)
vector_store_logger.info(
"Embedding deleted successfully",
embedding_id=id,
processing_time_seconds=processing_time
)
return True
except Exception as e:
processing_time = time.time() - start_time
data_logger.log_vector_operation("delete", id, False,
processing_time_seconds=processing_time,
error_message=str(e))
vector_store_logger.error(
"Failed to delete embedding",
embedding_id=id,
error=e,
processing_time_seconds=processing_time
)
return False
async def delete_embedding(self, embedding_id: str) -> bool:
return await self.delete(embedding_id)

@ -0,0 +1,42 @@
from pathlib import Path
import shutil
import uuid
from datetime import datetime
import cv2
import numpy as np
class FileSystemStorage:
def __init__(self, base_path: str):
self.base_path = Path(base_path)
self.base_path.mkdir(parents=True, exist_ok=True)
async def save_image(self, image: np.ndarray, user_id: str) -> str:
user_dir = self.base_path / user_id
user_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_{uuid.uuid4().hex[:8]}.jpg"
file_path = user_dir / filename
cv2.imwrite(str(file_path), cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
return str(file_path)
async def load_image(self, file_path: str) -> np.ndarray:
image = cv2.imread(file_path)
return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
async def delete_image(self, file_path: str) -> bool:
try:
Path(file_path).unlink()
return True
except Exception:
return False
async def delete_user_directory(self, user_id: str) -> bool:
try:
user_dir = self.base_path / user_id
if user_dir.exists():
shutil.rmtree(user_dir)
return True
except Exception:
return False

@ -0,0 +1,3 @@
from .gradio_app import create_gradio_interface
__all__ = ["create_gradio_interface"]

@ -0,0 +1,353 @@
import gradio as gr
import numpy as np
from typing import List, Optional
import asyncio
from PIL import Image
import cv2
from src.application.use_cases import (
RegisterFaceUseCase,
VerifyFaceUseCase,
IdentifyFaceUseCase
)
from src.domain.entities import OperationType
def create_gradio_interface(
register_use_case: RegisterFaceUseCase,
verify_use_case: VerifyFaceUseCase,
identify_use_case: IdentifyFaceUseCase
):
async def process_registration(user_name: str, images):
if not user_name:
return "Error: Please provide a user name"
if not images:
return "Error: Please upload at least one image"
np_images = []
for img_file in images:
if hasattr(img_file, 'name'):
img = Image.open(img_file.name)
np_images.append(np.array(img))
else:
np_images.append(np.array(img_file))
success, message = await register_use_case.execute(user_name, np_images)
return message
async def process_recognition(operation_type: str, user_name: Optional[str], image: Image.Image):
if not image:
return "Error: Please upload an image"
np_image = np.array(image)
if operation_type == OperationType.VERIFICATION.value:
if not user_name:
return "Error: Please provide a user name for verification"
result = await verify_use_case.execute(user_name, np_image)
if result.is_verified:
return f"✅ Verified as {user_name}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s"
else:
return f"❌ Not verified as {user_name}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s"
else:
result = await identify_use_case.execute(np_image)
if result.is_identified:
candidates_str = "\n".join([f" - {name}: {conf:.2%}" for name, conf in result.candidates[:5]])
return f"✅ Identified as: {result.user_id}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s\n\nTop candidates:\n{candidates_str}"
else:
candidates_str = "\n".join([f" - {name}: {conf:.2%}" for name, conf in result.candidates[:5]])
return f"❌ Person not identified\nBest match confidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s\n\nTop candidates:\n{candidates_str}"
def sync_process_registration(user_name, images):
return asyncio.run(process_registration(user_name, images))
def sync_process_recognition(operation_type, user_name, image):
return asyncio.run(process_recognition(operation_type, user_name, image))
# Webcam streaming functions
def process_webcam_frame(frame, operation_type, user_name):
if frame is None:
return frame
# Convert frame to PIL Image for processing
image = Image.fromarray(frame)
try:
if operation_type == OperationType.VERIFICATION.value and user_name:
result = asyncio.run(verify_use_case.execute(user_name, np.array(image)))
# Draw result on frame
cv2.putText(frame, f"Verification: {'' if result.is_verified else ''}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1,
(0, 255, 0) if result.is_verified else (0, 0, 255), 2)
cv2.putText(frame, f"Confidence: {result.confidence:.2%}",
(10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
elif operation_type == OperationType.IDENTIFICATION.value:
result = asyncio.run(identify_use_case.execute(np.array(image)))
# Draw result on frame
if result.is_identified:
cv2.putText(frame, f"Identified: {result.user_id}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
cv2.putText(frame, f"Confidence: {result.confidence:.2%}",
(10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
else:
cv2.putText(frame, "No match found",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
except Exception as e:
cv2.putText(frame, f"Error: {str(e)[:50]}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
return frame
# Webcam registration capture
captured_images = []
def capture_image_for_registration(frame):
if frame is not None:
captured_images.append(frame.copy())
return f"Captured {len(captured_images)} images"
return "No frame to capture"
def register_from_captures(user_name):
if not user_name:
return "Error: Please provide a user name"
if len(captured_images) == 0:
return "Error: No images captured. Please use webcam to capture images first."
try:
# Convert captured frames to numpy arrays
np_images = [np.array(img) for img in captured_images]
success, message = asyncio.run(register_use_case.execute(user_name, np_images))
# Clear captured images after registration
captured_images.clear()
return message
except Exception as e:
return f"Error during registration: {str(e)}"
def clear_captured_images():
captured_images.clear()
return "Cleared all captured images"
# CSS for better layout
css = """
.webcam-container {
max-width: 640px !important;
margin: 0 auto;
}
.capture-info {
text-align: center;
font-weight: bold;
color: #007bff;
}
"""
with gr.Blocks(title="Face Recognition System", css=css) as interface:
gr.Markdown("# Face Recognition System")
gr.Markdown("Advanced face recognition using RetinaFace detector and SFace recognizer")
with gr.Tabs():
# File Upload Registration Tab
with gr.TabItem("📁 File Registration"):
gr.Markdown("## Register New User - Upload Files")
gr.Markdown("Upload multiple images for better recognition accuracy")
with gr.Row():
reg_user_name = gr.Textbox(
label="User Name",
placeholder="Enter user name..."
)
reg_images = gr.File(
label="Upload Face Images",
file_count="multiple",
file_types=["image"]
)
reg_button = gr.Button("Register User", variant="primary")
reg_output = gr.Textbox(label="Registration Result", lines=3)
reg_button.click(
fn=sync_process_registration,
inputs=[reg_user_name, reg_images],
outputs=reg_output
)
# Webcam Registration Tab
with gr.TabItem("📷 Webcam Registration"):
gr.Markdown("## Register New User - Using Webcam")
gr.Markdown("Capture multiple images from webcam for better recognition accuracy")
with gr.Row():
with gr.Column():
webcam_reg_user_name = gr.Textbox(
label="User Name",
placeholder="Enter user name..."
)
webcam_reg_image = gr.Image(
label="Webcam Feed",
sources=["webcam"],
type="numpy",
elem_classes=["webcam-container"]
)
with gr.Row():
capture_btn = gr.Button("📸 Capture Image", variant="secondary")
clear_btn = gr.Button("🗑️ Clear Captures", variant="secondary")
register_webcam_btn = gr.Button("✅ Register User", variant="primary")
with gr.Column():
capture_status = gr.Textbox(
label="Capture Status",
value="Ready to capture images",
elem_classes=["capture-info"]
)
webcam_reg_output = gr.Textbox(label="Registration Result", lines=8)
capture_btn.click(
fn=capture_image_for_registration,
inputs=[webcam_reg_image],
outputs=[capture_status]
)
clear_btn.click(
fn=clear_captured_images,
outputs=[capture_status]
)
register_webcam_btn.click(
fn=register_from_captures,
inputs=[webcam_reg_user_name],
outputs=[webcam_reg_output]
)
# File Upload Recognition Tab
with gr.TabItem("📁 File Recognition"):
gr.Markdown("## Face Recognition - Upload Image")
gr.Markdown("Choose between verification (1:1) or identification (1:N)")
with gr.Row():
operation_type = gr.Radio(
choices=[OperationType.VERIFICATION.value, OperationType.IDENTIFICATION.value],
value=OperationType.VERIFICATION.value,
label="Operation Type"
)
rec_user_name = gr.Textbox(
label="User Name (for verification)",
placeholder="Enter user name for verification...",
visible=True
)
rec_image = gr.Image(
label="Upload Face Image",
type="pil"
)
rec_button = gr.Button("Process", variant="primary")
rec_output = gr.Textbox(label="Recognition Result", lines=8)
def update_user_name_visibility(operation):
return gr.update(visible=operation == OperationType.VERIFICATION.value)
operation_type.change(
fn=update_user_name_visibility,
inputs=operation_type,
outputs=rec_user_name
)
rec_button.click(
fn=sync_process_recognition,
inputs=[operation_type, rec_user_name, rec_image],
outputs=rec_output
)
# Webcam Recognition Tab
with gr.TabItem("📷 Live Recognition"):
gr.Markdown("## Live Face Recognition - Webcam Stream")
gr.Markdown("Real-time face verification or identification using webcam")
with gr.Row():
with gr.Column():
stream_operation_type = gr.Radio(
choices=[OperationType.VERIFICATION.value, OperationType.IDENTIFICATION.value],
value=OperationType.IDENTIFICATION.value,
label="Operation Type"
)
stream_user_name = gr.Textbox(
label="User Name (for verification)",
placeholder="Enter user name for verification...",
visible=False
)
webcam_stream = gr.Image(
label="Live Webcam Feed",
sources=["webcam"],
type="numpy",
streaming=True,
elem_classes=["webcam-container"]
)
with gr.Column():
gr.Markdown("### Instructions:")
gr.Markdown("- For **Identification**: The system will try to identify any face in the camera")
gr.Markdown("- For **Verification**: Enter a username to verify if the face matches that user")
gr.Markdown("- Results are displayed directly on the video feed")
def update_stream_user_name_visibility(operation):
return gr.update(visible=operation == OperationType.VERIFICATION.value)
stream_operation_type.change(
fn=update_stream_user_name_visibility,
inputs=stream_operation_type,
outputs=stream_user_name
)
# Note: Streaming removed due to Gradio version compatibility
# Use the capture button approach instead
stream_button = gr.Button("🔄 Process Current Frame", variant="primary")
stream_output = gr.Textbox(label="Recognition Result", lines=4)
def process_current_frame(image, operation_type, user_name):
if image is None:
return "No image captured from webcam"
try:
pil_image = Image.fromarray(image)
np_image = np.array(pil_image)
if operation_type == OperationType.VERIFICATION.value and user_name:
result = asyncio.run(verify_use_case.execute(user_name, np_image))
if result.is_verified:
return f"✅ Verified as {user_name}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s"
else:
return f"❌ Not verified as {user_name}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s"
elif operation_type == OperationType.IDENTIFICATION.value:
result = asyncio.run(identify_use_case.execute(np_image))
if result.is_identified:
candidates_str = "\n".join([f" - {name}: {conf:.2%}" for name, conf in result.candidates[:5]])
return f"✅ Identified as: {result.user_id}\nConfidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s\n\nTop candidates:\n{candidates_str}"
else:
candidates_str = "\n".join([f" - {name}: {conf:.2%}" for name, conf in result.candidates[:5]])
return f"❌ Person not identified\nBest match confidence: {result.confidence:.2%}\nThreshold: {result.threshold:.2%}\nProcessing time: {result.processing_time:.3f}s\n\nTop candidates:\n{candidates_str}"
else:
return "Please select operation type and enter username for verification"
except Exception as e:
return f"Error processing image: {str(e)}"
stream_button.click(
fn=process_current_frame,
inputs=[webcam_stream, stream_operation_type, stream_user_name],
outputs=[stream_output]
)
return interface
Loading…
Cancel
Save