import time
from typing import List, Tuple, Optional

import cv2
import numpy as np
from insightface.app import FaceAnalysis
from insightface.data import get_image as ins_get_image

from src.infrastructure.logging import detector_logger, data_logger


class RetinaFaceDetector:
    def __init__(self):
        detector_logger.info("Initializing RetinaFace detector")
        try:
            # Import onnxruntime to check available providers
            import onnxruntime as ort

            # Check available providers and configure accordingly
            available_providers = ort.get_available_providers()
            providers = []
            if 'CUDAExecutionProvider' in available_providers:
                providers.append('CUDAExecutionProvider')
                detector_logger.info("CUDA provider available for face detection")
            elif 'CPUExecutionProvider' in available_providers:
                providers.append('CPUExecutionProvider')
                detector_logger.info("Using CPU provider for face detection")
            else:
                providers = available_providers
                detector_logger.warning(f"Using available providers: {available_providers}")

            self.app = FaceAnalysis(providers=providers)
            self.app.prepare(ctx_id=0, det_size=(640, 640))
            detector_logger.info(
                "RetinaFace detector initialized successfully",
                providers=self.app.models.keys() if hasattr(self.app, 'models') else "unknown"
            )
        except Exception as e:
            detector_logger.error("Failed to initialize RetinaFace detector", error=e)
            raise

    def detect_faces(self, image: np.ndarray) -> List[dict]:
        start_time = time.time()
        image_info = {
            "image_shape": image.shape,
            "image_dtype": str(image.dtype),
            "image_size_bytes": image.nbytes
        }
        detector_logger.debug("Starting face detection", **image_info)
        try:
            faces = self.app.get(image)
            processing_time = time.time() - start_time

            detected_faces = []
            for i, face in enumerate(faces):
                face_dict = {
                    'bbox': face.bbox.tolist(),
                    'landmarks': face.kps.tolist() if face.kps is not None else None,
                    'det_score': float(face.det_score),
                    'embedding': face.normed_embedding if hasattr(face, 'normed_embedding') else None
                }
                detected_faces.append(face_dict)
                detector_logger.debug(
                    f"Face {i+1} detected",
                    face_index=i,
                    bbox=face_dict['bbox'],
                    detection_score=face_dict['det_score'],
                    has_landmarks=face_dict['landmarks'] is not None,
                    has_embedding=face_dict['embedding'] is not None
                )

            # Log data lifecycle
            data_logger.log_face_detection(image_info, len(detected_faces), processing_time)
            detector_logger.info(
                "Face detection completed",
                faces_detected=len(detected_faces),
                processing_time_seconds=processing_time,
                **image_info
            )
            return detected_faces
        except Exception as e:
            processing_time = time.time() - start_time
            detector_logger.error(
                "Face detection failed",
                error=e,
                processing_time_seconds=processing_time,
                **image_info
            )
            raise

    def align_face(self, image: np.ndarray, landmarks: Optional[np.ndarray],
                   image_size: int = 112, bbox: Optional[List[float]] = None) -> np.ndarray:
        # Without the 5-point landmarks the similarity transform cannot be estimated,
        # so fall back to a plain bounding-box crop (which needs a bbox, not landmarks).
        if landmarks is None or len(landmarks) != 5:
            if bbox is None:
                raise ValueError("align_face requires 5 landmarks or a bbox for the crop fallback")
            return self._crop_face(image, bbox)

        # Canonical 5-point landmark template (ArcFace-style alignment);
        # shifted by 8 px in x for 112x112 crops.
        src_pts = np.array([
            [30.2946, 51.6963],
            [65.5318, 51.5014],
            [48.0252, 71.7366],
            [33.5493, 92.3655],
            [62.7299, 92.2041]
        ], dtype=np.float32)
        if image_size == 112:
            src_pts[:, 0] += 8.0

        dst_pts = landmarks.astype(np.float32)
        tform = cv2.estimateAffinePartial2D(dst_pts, src_pts)[0]
        aligned_face = cv2.warpAffine(
            image, tform, (image_size, image_size),
            flags=cv2.INTER_LINEAR
        )
        return aligned_face

    def _crop_face(self, image: np.ndarray, bbox: List[float]) -> np.ndarray:
        x1, y1, x2, y2 = [int(coord) for coord in bbox[:4]]
        width = x2 - x1
        height = y2 - y1
        size = max(width, height)
        center_x = (x1 + x2) // 2
        center_y = (y1 + y2) // 2
        x1 = max(0, center_x - size // 2)
        y1 = max(0, center_y - size // 2)
        x2 = min(image.shape[1], x1 + size)
        y2 = min(image.shape[0], y1 + size)
        face_crop = image[y1:y2, x1:x2]
        face_crop = cv2.resize(face_crop, (112, 112))
        return face_crop

    def calculate_quality_score(self, face_dict: dict) -> float:
        detector_logger.debug("Calculating quality score", face_data_keys=list(face_dict.keys()))
        det_score = face_dict.get('det_score', 0.0)
        bbox = face_dict.get('bbox', [])
        if len(bbox) >= 4:
            face_size = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
            size_score = min(face_size / 10000, 1.0)
        else:
            size_score = 0.0
        quality_score = det_score * 0.7 + size_score * 0.3
        detector_logger.debug(
            "Quality score calculated",
            detection_score=det_score,
            size_score=size_score,
            final_quality_score=quality_score,
            face_size=face_size if len(bbox) >= 4 else None
        )
        return quality_score
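

# Illustrative usage sketch (not part of the original module): assumes an image file
# named "sample.jpg" and that insightface model files are available locally.
# The variable names below are hypothetical.
if __name__ == "__main__":
    detector = RetinaFaceDetector()
    img = cv2.imread("sample.jpg")
    if img is None:
        raise FileNotFoundError("sample.jpg not found")

    faces = detector.detect_faces(img)
    for face in faces:
        score = detector.calculate_quality_score(face)
        landmarks = np.array(face['landmarks'], dtype=np.float32) if face['landmarks'] else None
        aligned = detector.align_face(img, landmarks, bbox=face['bbox'])
        print(f"bbox={face['bbox']}, quality={score:.3f}, aligned shape={aligned.shape}")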