From cf99ff7242cf697305798b6575d03e8938ec25cf Mon Sep 17 00:00:00 2001 From: Vitali Yanushchyk Date: Tue, 11 Jun 2024 03:48:02 -0400 Subject: [PATCH] chg ! refactor --- .../apps/faces/celery_tasks.py | 2 +- .../apps/faces/exceptions.py | 8 + src/hope_dedup_engine/apps/faces/forms.py | 24 ++ .../apps/faces/managers/__init__.py | 0 .../apps/faces/managers/net.py | 38 ++ .../apps/faces/managers/storage.py | 43 +++ .../apps/faces/services/__init__.py | 0 .../faces/services/duplication_detector.py | 105 ++++++ .../apps/faces/services/image_processor.py | 160 ++++++++ .../faces/utils/duplicate_groups_builder.py | 42 +++ .../apps/faces/utils/duplication_detector.py | 255 ------------- .../apps/faces/validators.py | 38 +- .../config/fragments/constance.py | 4 +- tests/conftest.py | 4 - tests/faces/conftest.py | 130 +++++++ tests/faces/faces_const.py | 5 +- tests/faces/fixtures/celery_tasks.py | 36 -- tests/faces/fixtures/duplication_detector.py | 81 ---- tests/faces/test_celery_tasks.py | 8 +- tests/faces/test_duplicate_groups_builder.py | 30 ++ tests/faces/test_duplication_detector.py | 347 ++++++------------ tests/faces/test_forms.py | 34 ++ tests/faces/test_image_processor.py | 124 +++++++ tests/faces/test_net_manager.py | 12 + tests/faces/test_storage_manager.py | 34 ++ tests/faces/test_validators.py | 41 --- 26 files changed, 928 insertions(+), 677 deletions(-) create mode 100644 src/hope_dedup_engine/apps/faces/exceptions.py create mode 100644 src/hope_dedup_engine/apps/faces/forms.py create mode 100644 src/hope_dedup_engine/apps/faces/managers/__init__.py create mode 100644 src/hope_dedup_engine/apps/faces/managers/net.py create mode 100644 src/hope_dedup_engine/apps/faces/managers/storage.py create mode 100644 src/hope_dedup_engine/apps/faces/services/__init__.py create mode 100644 src/hope_dedup_engine/apps/faces/services/duplication_detector.py create mode 100644 src/hope_dedup_engine/apps/faces/services/image_processor.py create mode 100644 src/hope_dedup_engine/apps/faces/utils/duplicate_groups_builder.py delete mode 100644 src/hope_dedup_engine/apps/faces/utils/duplication_detector.py create mode 100644 tests/faces/conftest.py delete mode 100644 tests/faces/fixtures/celery_tasks.py delete mode 100644 tests/faces/fixtures/duplication_detector.py create mode 100644 tests/faces/test_duplicate_groups_builder.py create mode 100644 tests/faces/test_forms.py create mode 100644 tests/faces/test_image_processor.py create mode 100644 tests/faces/test_net_manager.py create mode 100644 tests/faces/test_storage_manager.py delete mode 100644 tests/faces/test_validators.py diff --git a/src/hope_dedup_engine/apps/faces/celery_tasks.py b/src/hope_dedup_engine/apps/faces/celery_tasks.py index 2c156cfb..abfbd24d 100644 --- a/src/hope_dedup_engine/apps/faces/celery_tasks.py +++ b/src/hope_dedup_engine/apps/faces/celery_tasks.py @@ -2,8 +2,8 @@ from celery import shared_task, states +from hope_dedup_engine.apps.faces.services.duplication_detector import DuplicationDetector from hope_dedup_engine.apps.faces.utils.celery_utils import task_lifecycle -from hope_dedup_engine.apps.faces.utils.duplication_detector import DuplicationDetector @shared_task(bind=True, soft_time_limit=0.5 * 60 * 60, time_limit=1 * 60 * 60) diff --git a/src/hope_dedup_engine/apps/faces/exceptions.py b/src/hope_dedup_engine/apps/faces/exceptions.py new file mode 100644 index 00000000..ff8a42f4 --- /dev/null +++ b/src/hope_dedup_engine/apps/faces/exceptions.py @@ -0,0 +1,8 @@ +class StorageKeyError(Exception): + """ + Exception raised when the storage key does not exist. + """ + + def __init__(self, key: str) -> None: + self.key = key + super().__init__(f"Storage key '{key}' does not exist.") diff --git a/src/hope_dedup_engine/apps/faces/forms.py b/src/hope_dedup_engine/apps/faces/forms.py new file mode 100644 index 00000000..1b8288f4 --- /dev/null +++ b/src/hope_dedup_engine/apps/faces/forms.py @@ -0,0 +1,24 @@ +from django.forms import CharField, ValidationError + + +class MeanValuesTupleField(CharField): + def to_python(self, value): + try: + values = tuple(map(float, value.split(", "))) + if len(values) != 3: + raise ValueError("The tuple must have exactly three elements.") + if not all(-255 <= v <= 255 for v in values): + raise ValueError("Each value in the tuple must be between -255 and 255.") + return values + except Exception as e: + raise ValidationError( + """ + Enter a valid tuple of three float values separated by commas and spaces, e.g. '0.0, 0.0, 0.0'. + Each value must be between -255 and 255. + """ + ) from e + + def prepare_value(self, value): + if isinstance(value, tuple): + return ", ".join(map(str, value)) + return super().prepare_value(value) diff --git a/src/hope_dedup_engine/apps/faces/managers/__init__.py b/src/hope_dedup_engine/apps/faces/managers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/hope_dedup_engine/apps/faces/managers/net.py b/src/hope_dedup_engine/apps/faces/managers/net.py new file mode 100644 index 00000000..61e61407 --- /dev/null +++ b/src/hope_dedup_engine/apps/faces/managers/net.py @@ -0,0 +1,38 @@ +from django.conf import settings + +import cv2 +from constance import config + +from hope_dedup_engine.apps.core.storage import CV2DNNStorage + + +class DNNInferenceManager: + """ + A class to manage the loading and configuration of a neural network model using OpenCV's DNN module. + + The DNNInferenceManager class provides functionality to load a neural network model from Caffe files stored in a + specified storage and configure the model with preferred backend and target settings. + """ + + def __init__(self, storage: CV2DNNStorage) -> None: + """ + Loads and configures the neural network model using the specified storage. + + Args: + storage (CV2DNNStorage): The storage object from which to load the neural network model. + """ + self.net = cv2.dnn.readNetFromCaffe( + storage.path(settings.PROTOTXT_FILE), + storage.path(settings.CAFFEMODEL_FILE), + ) + self.net.setPreferableBackend(int(config.DNN_BACKEND)) + self.net.setPreferableTarget(int(config.DNN_TARGET)) + + def get_model(self) -> cv2.dnn_Net: + """ + Get the loaded and configured neural network model. + + Returns: + cv2.dnn_Net: The neural network model loaded and configured by this manager. + """ + return self.net diff --git a/src/hope_dedup_engine/apps/faces/managers/storage.py b/src/hope_dedup_engine/apps/faces/managers/storage.py new file mode 100644 index 00000000..da19e449 --- /dev/null +++ b/src/hope_dedup_engine/apps/faces/managers/storage.py @@ -0,0 +1,43 @@ +from django.conf import settings + +from hope_dedup_engine.apps.core.storage import CV2DNNStorage, HDEAzureStorage, HOPEAzureStorage +from hope_dedup_engine.apps.faces.exceptions import StorageKeyError + + +class StorageManager: + """ + A class to manage different types of storage systems used in the application. + """ + + def __init__(self) -> None: + """ + Initialize the StorageManager. + + Raises: + FileNotFoundError: If any of the required DNN model files do not exist in the storage. + """ + self.storages = { + "images": HOPEAzureStorage(), + "cv2dnn": CV2DNNStorage(settings.CV2DNN_PATH), + "encoded": HDEAzureStorage(), + } + for file in (settings.PROTOTXT_FILE, settings.CAFFEMODEL_FILE): + if not self.storages.get("cv2dnn").exists(file): + raise FileNotFoundError(f"File {file} does not exist in storage.") + + def get_storage(self, key: str) -> HOPEAzureStorage | CV2DNNStorage | HDEAzureStorage: + """ + Get the storage object for the given key. + + Args: + key (str): The key associated with the desired storage backend. + + Returns: + HOPEAzureStorage | CV2DNNStorage | HDEAzureStorage: The storage object associated with the given key. + + Raises: + StorageKeyError: If the given key does not exist in the storages dictionary. + """ + if key not in self.storages: + raise StorageKeyError(key) + return self.storages[key] diff --git a/src/hope_dedup_engine/apps/faces/services/__init__.py b/src/hope_dedup_engine/apps/faces/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/hope_dedup_engine/apps/faces/services/duplication_detector.py b/src/hope_dedup_engine/apps/faces/services/duplication_detector.py new file mode 100644 index 00000000..bd83cd99 --- /dev/null +++ b/src/hope_dedup_engine/apps/faces/services/duplication_detector.py @@ -0,0 +1,105 @@ +import logging +import os + +import face_recognition +import numpy as np + +from hope_dedup_engine.apps.faces.managers.storage import StorageManager +from hope_dedup_engine.apps.faces.services.image_processor import ImageProcessor +from hope_dedup_engine.apps.faces.utils.duplicate_groups_builder import DuplicateGroupsBuilder +from hope_dedup_engine.apps.faces.validators import IgnorePairsValidator + + +class DuplicationDetector: + """ + A class to detect and process duplicate faces in images. + """ + + logger: logging.Logger = logging.getLogger(__name__) + + def __init__(self, filenames: tuple[str], ignore_pairs: tuple[str, str] = tuple()) -> None: + """ + Initialize the DuplicationDetector with the given filenames and ignore pairs. + + Args: + filenames (tuple[str]): The filenames of the images to process. + ignore_pairs (tuple[tuple[str, str]], optional): + The pairs of filenames to ignore. Defaults to an empty tuple. + """ + self.filenames = filenames + self.ignore_set = IgnorePairsValidator.validate(ignore_pairs) + self.storages = StorageManager() + self.image_processor = ImageProcessor() + + def _encodings_filename(self, filename: str) -> str: + """ + Generate the filename for the face encodings of a given image. + + Args: + filename (str): The filename of the image. + + Returns: + str: The filename for the face encodings. + """ + return f"{filename}.npy" + + def _has_encodings(self, filename: str) -> bool: + """ + Check if the face encodings for a given image exist in storage. + + Args: + filename (str): The filename of the image. + + Returns: + bool: True if the encodings exist, False otherwise. + """ + return self.storages.get_storage("encoded").exists(self._encodings_filename(filename)) + + def _load_encodings_all(self) -> dict[str, list[np.ndarray]]: + """ + Load all face encodings from storage. + + Returns: + dict[str, list[np.ndarray]]: A dictionary with filenames as keys and lists of face encodings as values. + """ + data: dict[str, list[np.ndarray]] = {} + try: + _, files = self.storages.get_storage("encoded").listdir("") + for file in files: + if self._has_encodings(filename := os.path.splitext(file)[0]): + with self.storages.get_storage("encoded").open(file, "rb") as f: + data[filename] = np.load(f, allow_pickle=False) + except Exception as e: + self.logger.exception("Error loading encodings.") + raise e + return data + + def find_duplicates(self) -> tuple[tuple[str]]: + """ + Find and return a list of duplicate images based on face encodings. + + Returns: + tuple[tuple[str]]: A tuple of tuples, where each inner tuple contains the filenames of duplicate images. + """ + try: + for filename in self.filenames: + if not self._has_encodings(filename): + self.image_processor.encode_face(filename, self._encodings_filename(filename)) + encodings_all = self._load_encodings_all() + + checked = set() + for path1, encodings1 in encodings_all.items(): + for path2, encodings2 in encodings_all.items(): + if path1 < path2 and (path1, path2) not in self.ignore_set: + min_distance = float("inf") + for encoding1 in encodings1: + if ( + current_min := min(face_recognition.face_distance(encodings2, encoding1)) + ) < min_distance: + min_distance = current_min + checked.add((path1, path2, min_distance)) + + return DuplicateGroupsBuilder.build(checked) + except Exception as e: + self.logger.exception("Error finding duplicates for images %s", self.filenames) + raise e diff --git a/src/hope_dedup_engine/apps/faces/services/image_processor.py b/src/hope_dedup_engine/apps/faces/services/image_processor.py new file mode 100644 index 00000000..510e6f99 --- /dev/null +++ b/src/hope_dedup_engine/apps/faces/services/image_processor.py @@ -0,0 +1,160 @@ +import logging +import re +from dataclasses import dataclass, field + +from django.conf import settings +from django.core.exceptions import ValidationError + +import cv2 +import face_recognition +import numpy as np +from constance import config + +from hope_dedup_engine.apps.faces.managers.net import DNNInferenceManager +from hope_dedup_engine.apps.faces.managers.storage import StorageManager + + +@dataclass(frozen=True, slots=True) +class FaceEncodingsConfig: + num_jitters: int + model: str + + +@dataclass(frozen=True, slots=True) +class BlobFromImageConfig: + shape: dict[str, int] = field(init=False) + scale_factor: float + mean_values: tuple[float, float, float] + + def __post_init__(self) -> None: + object.__setattr__(self, "shape", self._get_shape()) + mean_values = self.mean_values + if isinstance(mean_values, str): + mean_values = tuple(map(float, mean_values.split(", "))) + object.__setattr__(self, "mean_values", mean_values) + + def _get_shape(self) -> dict[str, int]: + pattern = r"input_shape\s*\{\s*dim:\s*(\d+)\s*dim:\s*(\d+)\s*dim:\s*(\d+)\s*dim:\s*(\d+)\s*\}" + with open(settings.PROTOTXT_FILE, "r") as file: + if match := re.search(pattern, file.read()): + return { + "batch_size": int(match.group(1)), + "channels": int(match.group(2)), + "height": int(match.group(3)), + "width": int(match.group(4)), + } + else: + raise ValidationError("Could not find input_shape in prototxt file.") + + +class ImageProcessor: + """ + A class to handle image processing tasks, including face detection and encoding. + + """ + + logger: logging.Logger = logging.getLogger(__name__) + + def __init__(self) -> None: + """ + Initialize the ImageProcessor with the required configurations. + """ + self.storages = StorageManager() + self.net = DNNInferenceManager(self.storages.get_storage("cv2dnn")).get_model() + + self.blob_from_image_cfg = BlobFromImageConfig( + scale_factor=config.BLOB_FROM_IMAGE_SCALE_FACTOR, mean_values=config.BLOB_FROM_IMAGE_MEAN_VALUES + ) + self.face_encodings_cfg = FaceEncodingsConfig( + num_jitters=config.FACE_ENCODINGS_NUM_JITTERS, + model=config.FACE_ENCODINGS_MODEL, + ) + self.face_detection_confidence: float = config.FACE_DETECTION_CONFIDENCE + self.distance_threshold: float = config.FACE_DISTANCE_THRESHOLD + self.nms_threshold: float = config.NMS_THRESHOLD + + def _get_face_detections_dnn(self, filename: str) -> list[tuple[int, int, int, int]]: + """ + Detect faces in an image using the DNN model. + + Args: + filename (str): The filename of the image to process. + + Returns: + list[tuple[int, int, int, int]]: A list of tuples representing face regions in the image. + """ + face_regions: list[tuple[int, int, int, int]] = [] + try: + with self.storages.get_storage("images").open(filename, "rb") as img_file: + img_array = np.frombuffer(img_file.read(), dtype=np.uint8) + # Decode image from binary buffer to 3D numpy array (height, width, channels of BlueGreeRed color space) + image = cv2.imdecode(img_array, cv2.IMREAD_COLOR) + (h, w) = image.shape[:2] + # Create a blob (4D tensor) from the image + blob = cv2.dnn.blobFromImage( + image=cv2.resize( + image, dsize=(self.blob_from_image_cfg.shape["height"], self.blob_from_image_cfg.shape["width"]) + ), + size=(self.blob_from_image_cfg.shape["height"], self.blob_from_image_cfg.shape["width"]), + scalefactor=self.blob_from_image_cfg.scale_factor, + mean=self.blob_from_image_cfg.mean_values, + ) + self.net.setInput(blob) + # Forward pass to get output with shape (1, 1, N, 7), + # where N is the number of faces and 7 are the detection values: + # 1st: image index (0), 2nd: class label (0), 3rd: confidence (0-1), + # 4th-5th: x, y coordinates, 6th-7th: width, height + detections = self.net.forward() + boxes, confidences = [], [] + for i in range(detections.shape[2]): + confidence = detections[0, 0, i, 2] + # Filter out weak detections by ensuring the confidence is greater than the minimum confidence + if confidence > self.face_detection_confidence: + box = (detections[0, 0, i, 3:7] * np.array([w, h, w, h])).astype("int") + boxes.append(box) + confidences.append(confidence) + if boxes: + # Apply non-maxima suppression to suppress weak, overlapping bounding boxes + indices = cv2.dnn.NMSBoxes(boxes, confidences, self.face_detection_confidence, self.nms_threshold) + if indices is not None: + for i in indices: + face_regions.append(tuple(boxes[i])) + except Exception as e: + self.logger.exception("Error processing face detection for image %s", filename) + raise e + return face_regions + + def encode_face(self, filename: str, encodings_filename: str) -> None: + """ + Encode faces detected in an image and save the encodings to storage. + + Args: + filename (str): The filename of the image to process. + encodings_filename (str): The filename to save the face encodings. + """ + try: + with self.storages.get_storage("images").open(filename, "rb") as img_file: + image = face_recognition.load_image_file(img_file) + encodings: list = [] + face_regions = self._get_face_detections_dnn(filename) + if not face_regions: + self.logger.error("No face regions detected in image %s", filename) + else: + for region in face_regions: + if isinstance(region, (list, tuple)) and len(region) == 4: + top, right, bottom, left = region + face_encodings = face_recognition.face_encodings( + image, + [(top, right, bottom, left)], + num_jitters=self.face_encodings_cfg.num_jitters, + model=self.face_encodings_cfg.model, + ) + encodings.extend(face_encodings) + else: + self.logger.error("Invalid face region %s", region) + return + with self.storages.get_storage("encoded").open(encodings_filename, "wb") as f: + np.save(f, encodings) + except Exception as e: + self.logger.exception("Error processing face encodings for image %s", filename) + raise e diff --git a/src/hope_dedup_engine/apps/faces/utils/duplicate_groups_builder.py b/src/hope_dedup_engine/apps/faces/utils/duplicate_groups_builder.py new file mode 100644 index 00000000..af3d354f --- /dev/null +++ b/src/hope_dedup_engine/apps/faces/utils/duplicate_groups_builder.py @@ -0,0 +1,42 @@ +from collections import defaultdict + +from constance import config + + +class DuplicateGroupsBuilder: + @staticmethod + def build(checked: set[tuple[str, str, float]]) -> tuple[tuple[str]]: + """ + Transform a set of tuples with distances into a tuple of grouped duplicate paths. + + Args: + checked (set[tuple[str, str, float]]): A set of tuples containing the paths and their distances. + distance_threshold (float): The threshold to consider for grouping duplicates. + + Returns: + tuple[tuple[str]]: A tuple of grouped duplicate paths. + """ + # Dictionary to store connections between paths where distances are less than the threshold + groups = [] + connections = defaultdict(set) + distance_threshold: float = config.FACE_DISTANCE_THRESHOLD + for path1, path2, dist in checked: + if dist < distance_threshold: + connections[path1].add(path2) + connections[path2].add(path1) + # Iterate over each path and form groups + for path, neighbors in connections.items(): + # Check if the path has already been included in any group + if not any(path in group for group in groups): + new_group = {path} + queue = list(neighbors) + # Try to expand the group ensuring each new path is duplicated to all in the group + while queue: + neighbor = queue.pop(0) + if neighbor not in new_group and all(neighbor in connections[member] for member in new_group): + new_group.add(neighbor) + # Add neighbors of the current neighbor, excluding those already in the group + queue.extend([n for n in connections[neighbor] if n not in new_group]) + # Add the newly formed group to the list of groups + groups.append(new_group) + return tuple(map(tuple, groups)) diff --git a/src/hope_dedup_engine/apps/faces/utils/duplication_detector.py b/src/hope_dedup_engine/apps/faces/utils/duplication_detector.py deleted file mode 100644 index c0683943..00000000 --- a/src/hope_dedup_engine/apps/faces/utils/duplication_detector.py +++ /dev/null @@ -1,255 +0,0 @@ -import logging -import os -import re -from collections import defaultdict -from dataclasses import dataclass - -from django.conf import settings - -import cv2 -import face_recognition -import numpy as np -from constance import config - -from hope_dedup_engine.apps.core.storage import CV2DNNStorage, HDEAzureStorage, HOPEAzureStorage - - -class DuplicationDetector: - """ - A class to detect and process duplicate faces in images. - """ - - @dataclass(frozen=True, slots=True) - class BlobFromImageConfig: - shape: dict[str, int] - scale_factor: float - mean_values: tuple[float, float, float] - - @dataclass(frozen=True, slots=True) - class FaceEncodingsConfig: - num_jitters: int - model: str - - logger: logging.Logger = logging.getLogger(__name__) - - def __init__(self, filenames: tuple[str], ignore_pairs: tuple[str, str] = tuple()) -> None: - """ - Initialize the DuplicationDetector with the given filenames. - - Args: - filenames (list[str]): The filenames of the images to process. - ignore_pairs (list[tuple[str, str]]): The pairs of filenames to ignore. - """ - self.storages: dict[str, CV2DNNStorage | HDEAzureStorage | HOPEAzureStorage] = { - "images": HOPEAzureStorage(), - "cv2dnn": CV2DNNStorage(settings.CV2DNN_PATH), - "encoded": HDEAzureStorage(), - } - - for file in (settings.PROTOTXT_FILE, settings.CAFFEMODEL_FILE): - if not self.storages.get("cv2dnn").exists(file): - raise FileNotFoundError(f"File {file} does not exist in storage.") - - self.net: cv2.dnn_Net = self._set_net(self.storages.get("cv2dnn")) - - self.filenames: tuple[str] = filenames - self.ignore_set: set[tuple[str, str]] = self._get_pairs_to_ignore(ignore_pairs) - - self.blob_from_image_cfg = self.BlobFromImageConfig( - shape=self._get_shape(), - scale_factor=config.BLOB_FROM_IMAGE_SCALE_FACTOR, - mean_values=( - tuple(map(float, config.BLOB_FROM_IMAGE_MEAN_VALUES.split(", "))) - if isinstance(config.BLOB_FROM_IMAGE_MEAN_VALUES, str) - else config.BLOB_FROM_IMAGE_MEAN_VALUES - ), - ) - self.face_detection_confidence: float = config.FACE_DETECTION_CONFIDENCE - self.distance_threshold: float = config.FACE_DISTANCE_THRESHOLD - self.face_encodings_cfg = self.FaceEncodingsConfig( - num_jitters=config.FACE_ENCODINGS_NUM_JITTERS, - model=config.FACE_ENCODINGS_MODEL, - ) - - self.nms_threshold: float = config.NMS_THRESHOLD - - def _set_net(self, storage: CV2DNNStorage) -> cv2.dnn_Net: - net = cv2.dnn.readNetFromCaffe( - storage.path(settings.PROTOTXT_FILE), - storage.path(settings.CAFFEMODEL_FILE), - ) - net.setPreferableBackend(int(config.DNN_BACKEND)) - net.setPreferableTarget(int(config.DNN_TARGET)) - return net - - def _get_shape(self) -> dict[str, int]: - pattern = r"input_shape\s*\{\s*" r"dim:\s*(\d+)\s*" r"dim:\s*(\d+)\s*" r"dim:\s*(\d+)\s*" r"dim:\s*(\d+)\s*\}" - with open(settings.PROTOTXT_FILE, "r") as file: - if match := re.search(pattern, file.read()): - return { - "batch_size": int(match.group(1)), - "channels": int(match.group(2)), - "height": int(match.group(3)), - "width": int(match.group(4)), - } - else: - raise ValueError("Could not find input_shape in prototxt file.") - - def _get_pairs_to_ignore(self, ignore: tuple[tuple[str, str]]) -> set[tuple[str, str]]: - ignore = tuple(tuple(pair) for pair in ignore) - if not ignore: - return set() - if all( - isinstance(pair, tuple) and len(pair) == 2 and all(isinstance(item, str) and item for item in pair) - for pair in ignore - ): - return {(item1, item2) for item1, item2 in ignore} | {(item2, item1) for item1, item2 in ignore} - elif len(ignore) == 2 and all(isinstance(item, str) for item in ignore): - return {(ignore[0], ignore[1]), (ignore[1], ignore[0])} - else: - raise ValueError( - "Invalid format for 'ignore'. Expected tuple of tuples each containing exactly two strings." - ) - - def _encodings_filename(self, filename: str) -> str: - return f"{filename}.npy" - - def _has_encodings(self, filename: str) -> bool: - return self.storages["encoded"].exists(self._encodings_filename(filename)) - - def _get_face_detections_dnn(self, filename: str) -> list[tuple[int, int, int, int]]: - face_regions: list[tuple[int, int, int, int]] = [] - try: - with self.storages["images"].open(filename, "rb") as img_file: - img_array = np.frombuffer(img_file.read(), dtype=np.uint8) - # Decode image from binary buffer to 3D numpy array (height, width, channels of BlueGreeRed color space) - image = cv2.imdecode(img_array, cv2.IMREAD_COLOR) - (h, w) = image.shape[:2] - # Create a blob (4D tensor) from the image - blob = cv2.dnn.blobFromImage( - image=cv2.resize( - image, dsize=(self.blob_from_image_cfg.shape["height"], self.blob_from_image_cfg.shape["width"]) - ), - size=(self.blob_from_image_cfg.shape["height"], self.blob_from_image_cfg.shape["width"]), - scalefactor=self.blob_from_image_cfg.scale_factor, - mean=self.blob_from_image_cfg.mean_values, - ) - self.net.setInput(blob) - # Forward pass to get output with shape (1, 1, N, 7), - # where N is the number of faces and 7 are the detection values: - # 1st: image index (0), 2nd: class label (0), 3rd: confidence (0-1), - # 4th-5th: x, y coordinates, 6th-7th: width, height - detections = self.net.forward() - boxes, confidences = [], [] - for i in range(detections.shape[2]): - confidence = detections[0, 0, i, 2] - # Filter out weak detections by ensuring the confidence is greater than the minimum confidence - if confidence > self.face_detection_confidence: - box = (detections[0, 0, i, 3:7] * np.array([w, h, w, h])).astype("int") - boxes.append(box) - confidences.append(confidence) - if boxes: - # Apply non-maxima suppression to suppress weak, overlapping bounding boxes - indices = cv2.dnn.NMSBoxes(boxes, confidences, self.face_detection_confidence, self.nms_threshold) - if indices is not None: - for i in indices: - face_regions.append(tuple(boxes[i])) - except Exception as e: - self.logger.exception("Error processing face detection for image %s", filename) - raise e - return face_regions - - def _load_encodings_all(self) -> dict[str, list[np.ndarray]]: - data: dict[str, list[np.ndarray]] = {} - try: - _, files = self.storages["encoded"].listdir("") - for file in files: - if self._has_encodings(filename := os.path.splitext(file)[0]): - with self.storages["encoded"].open(file, "rb") as f: - data[filename] = np.load(f, allow_pickle=False) - except Exception as e: - self.logger.exception("Error loading encodings.") - raise e - return data - - def _encode_face(self, filename: str) -> None: - try: - with self.storages["images"].open(filename, "rb") as img_file: - image = face_recognition.load_image_file(img_file) - encodings: list = [] - face_regions = self._get_face_detections_dnn(filename) - if not face_regions: - self.logger.error("No face regions detected in image %s", filename) - else: - for region in face_regions: - if isinstance(region, (list, tuple)) and len(region) == 4: - top, right, bottom, left = region - face_encodings = face_recognition.face_encodings( - image, - [(top, right, bottom, left)], - num_jitters=self.face_encodings_cfg.num_jitters, - model=self.face_encodings_cfg.model, - ) - encodings.extend(face_encodings) - else: - self.logger.error("Invalid face region %s", region) - with self.storages["encoded"].open(self._encodings_filename(filename), "wb") as f: - np.save(f, encodings) - except Exception as e: - self.logger.exception("Error processing face encodings for image %s", filename) - raise e - - def _get_duplicated_groups(self, checked: set[tuple[str, str, float]]) -> tuple[tuple[str]]: - # Dictionary to store connections between paths where distances are less than the threshold - groups = [] - connections = defaultdict(set) - for path1, path2, dist in checked: - if dist < self.distance_threshold: - connections[path1].add(path2) - connections[path2].add(path1) - # Iterate over each path and form groups - for path, neighbors in connections.items(): - # Check if the path has already been included in any group - if not any(path in group for group in groups): - new_group = {path} - queue = list(neighbors) - # Try to expand the group ensuring each new path is duplicated to all in the group - while queue: - neighbor = queue.pop(0) - if neighbor not in new_group and all(neighbor in connections[member] for member in new_group): - new_group.add(neighbor) - # Add neighbors of the current neighbor, excluding those already in the group - queue.extend([n for n in connections[neighbor] if n not in new_group]) - # Add the newly formed group to the list of groups - groups.append(new_group) - return tuple(map(tuple, groups)) - - def find_duplicates(self) -> tuple[tuple[str]]: - """ - Find and return a list of duplicate images based on face encodings. - - Returns: - tuple[tuple[str]]: A tuple of filenames of duplicate images. - """ - try: - for filename in self.filenames: - if not self._has_encodings(filename): - self._encode_face(filename) - encodings_all = self._load_encodings_all() - - checked = set() - for path1, encodings1 in encodings_all.items(): - for path2, encodings2 in encodings_all.items(): - if path1 < path2 and (path1, path2) not in self.ignore_set: - min_distance = float("inf") - for encoding1 in encodings1: - if ( - current_min := min(face_recognition.face_distance(encodings2, encoding1)) - ) < min_distance: - min_distance = current_min - checked.add((path1, path2, min_distance)) - - return self._get_duplicated_groups(checked) - except Exception as e: - self.logger.exception("Error finding duplicates for images %s", self.filenames) - raise e diff --git a/src/hope_dedup_engine/apps/faces/validators.py b/src/hope_dedup_engine/apps/faces/validators.py index 1b8288f4..c1ab9a50 100644 --- a/src/hope_dedup_engine/apps/faces/validators.py +++ b/src/hope_dedup_engine/apps/faces/validators.py @@ -1,24 +1,20 @@ -from django.forms import CharField, ValidationError +from django.core.exceptions import ValidationError -class MeanValuesTupleField(CharField): - def to_python(self, value): - try: - values = tuple(map(float, value.split(", "))) - if len(values) != 3: - raise ValueError("The tuple must have exactly three elements.") - if not all(-255 <= v <= 255 for v in values): - raise ValueError("Each value in the tuple must be between -255 and 255.") - return values - except Exception as e: +class IgnorePairsValidator: + @staticmethod + def validate(ignore: tuple[tuple[str, str]]) -> set[tuple[str, str]]: + ignore = tuple(tuple(pair) for pair in ignore) + if not ignore: + return set() + if all( + isinstance(pair, tuple) and len(pair) == 2 and all(isinstance(item, str) and item for item in pair) + for pair in ignore + ): + return {(item1, item2) for item1, item2 in ignore} | {(item2, item1) for item1, item2 in ignore} + elif len(ignore) == 2 and all(isinstance(item, str) for item in ignore): + return {(ignore[0], ignore[1]), (ignore[1], ignore[0])} + else: raise ValidationError( - """ - Enter a valid tuple of three float values separated by commas and spaces, e.g. '0.0, 0.0, 0.0'. - Each value must be between -255 and 255. - """ - ) from e - - def prepare_value(self, value): - if isinstance(value, tuple): - return ", ".join(map(str, value)) - return super().prepare_value(value) + "Invalid format for 'ignore'. Expected tuple of tuples each containing exactly two strings." + ) diff --git a/src/hope_dedup_engine/config/fragments/constance.py b/src/hope_dedup_engine/config/fragments/constance.py index 555dbc49..787ca2d5 100644 --- a/src/hope_dedup_engine/config/fragments/constance.py +++ b/src/hope_dedup_engine/config/fragments/constance.py @@ -37,7 +37,7 @@ "tuple_field", ), "FACE_DETECTION_CONFIDENCE": ( - 0.7, + 0.5, """ Specifies the minimum confidence score required for a detected face to be considered valid. Detections with confidence scores below this threshold are discarded as likely false positives. @@ -128,5 +128,5 @@ "choices": (("small", "SMALL"), ("large", "LARGE")), }, ], - "tuple_field": ["hope_dedup_engine.apps.faces.validators.MeanValuesTupleField", {}], + "tuple_field": ["hope_dedup_engine.apps.faces.forms.MeanValuesTupleField", {}], } diff --git a/tests/conftest.py b/tests/conftest.py index 3734f277..df46bfbc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -62,7 +62,3 @@ def setup(db): def mocked_responses(): with responses.RequestsMock(assert_all_requests_are_fired=False) as rsps: yield rsps - - -from faces.fixtures.celery_tasks import * # noqa: E402, F401, F403 -from faces.fixtures.duplication_detector import * # noqa: E402, F401, F403 diff --git a/tests/faces/conftest.py b/tests/faces/conftest.py new file mode 100644 index 00000000..dba1f743 --- /dev/null +++ b/tests/faces/conftest.py @@ -0,0 +1,130 @@ +from io import BytesIO +from unittest.mock import MagicMock, mock_open, patch + +import cv2 +import numpy as np +import pytest +from faces_const import ( + BLOB_SHAPE, + DEPLOY_PROTO_CONTENT, + DEPLOY_PROTO_SHAPE, + FACE_DETECTIONS, + FACE_REGIONS_VALID, + FILENAMES, + IGNORE_PAIRS, + IMAGE_SIZE, + RESIZED_IMAGE_SIZE, +) +from freezegun import freeze_time +from PIL import Image +from pytest_mock import MockerFixture + +from docker import from_env +from hope_dedup_engine.apps.core.storage import CV2DNNStorage, HDEAzureStorage, HOPEAzureStorage +from hope_dedup_engine.apps.faces.managers.net import DNNInferenceManager +from hope_dedup_engine.apps.faces.managers.storage import StorageManager +from hope_dedup_engine.apps.faces.services.duplication_detector import DuplicationDetector +from hope_dedup_engine.apps.faces.services.image_processor import BlobFromImageConfig, ImageProcessor + + +@pytest.fixture +def mock_storage_manager(mocker: MockerFixture) -> StorageManager: + mocker.patch.object(CV2DNNStorage, "exists", return_value=True) + mocker.patch.object(HDEAzureStorage, "exists", return_value=True) + mocker.patch.object(HOPEAzureStorage, "exists", return_value=True) + yield StorageManager() + + +@pytest.fixture +def mock_hde_azure_storage(): + return MagicMock(spec=HDEAzureStorage) + + +@pytest.fixture +def mock_hope_azure_storage(): + return MagicMock(spec=HOPEAzureStorage) + + +@pytest.fixture +def mock_prototxt_file(): + return mock_open(read_data=DEPLOY_PROTO_CONTENT) + + +@pytest.fixture +def mock_net_manager(mocker: MockerFixture) -> DNNInferenceManager: + mock_net = mocker.Mock() + mocker.patch("cv2.dnn.readNetFromCaffe", return_value=mock_net) + yield mock_net + + +@pytest.fixture +def mock_image_processor( + mocker: MockerFixture, mock_storage_manager, mock_net_manager, mock_open_context_manager +) -> ImageProcessor: + mocker.patch.object(BlobFromImageConfig, "_get_shape", return_value=DEPLOY_PROTO_SHAPE) + mock_processor = ImageProcessor() + mocker.patch.object(mock_processor.storages.get_storage("images"), "open", return_value=mock_open_context_manager) + yield mock_processor + + +@pytest.fixture +def image_bytes_io(): + img_byte_arr = BytesIO() + image = Image.new("RGB", (100, 100), color="red") + image.save(img_byte_arr, format="JPEG") + img_byte_arr.seek(0) + img_byte_arr.fake_open = lambda *_: BytesIO(img_byte_arr.getvalue()) + yield img_byte_arr + + +@pytest.fixture +def mock_open_context_manager(image_bytes_io): + mock_open_context_manager = MagicMock() + mock_open_context_manager.__enter__.return_value = image_bytes_io + yield mock_open_context_manager + + +@pytest.fixture +def mock_net(): + mock_net = MagicMock(spec=cv2.dnn_Net) # Mocking the neural network object + mock_detections = np.array([[FACE_DETECTIONS]], dtype=np.float32) # Mocking the detections array + mock_expected_regions = FACE_REGIONS_VALID + mock_net.forward.return_value = mock_detections # Setting up the forward method of the mock network + mock_imdecode = MagicMock(return_value=np.ones(IMAGE_SIZE, dtype=np.uint8)) + mock_resize = MagicMock(return_value=np.ones(RESIZED_IMAGE_SIZE, dtype=np.uint8)) + mock_blob = np.zeros(BLOB_SHAPE) + yield mock_net, mock_imdecode, mock_resize, mock_blob, mock_expected_regions + + +@pytest.fixture +def mock_dd(mock_image_processor, mock_net_manager, mock_storage_manager): + detector = DuplicationDetector(FILENAMES, IGNORE_PAIRS) + yield detector + + +@pytest.fixture(scope="session") +def docker_client(): + client = from_env() + yield client + client.close() + + +@pytest.fixture +def mock_redis_client(): + with patch("redis.Redis.set") as mock_set, patch("redis.Redis.delete") as mock_delete: + yield mock_set, mock_delete + + +@pytest.fixture +def mock_dd_find(): + with patch( + "hope_dedup_engine.apps.faces.services.duplication_detector.DuplicationDetector.find_duplicates" + ) as mock_find: + mock_find.return_value = (FILENAMES[:2],) # Assuming the first two are duplicates based on mock data + yield mock_find + + +@pytest.fixture +def time_control(): + with freeze_time("2024-01-01") as frozen_time: + yield frozen_time diff --git a/tests/faces/faces_const.py b/tests/faces/faces_const.py index 64b2c543..78f16901 100644 --- a/tests/faces/faces_const.py +++ b/tests/faces/faces_const.py @@ -1,6 +1,7 @@ from typing import Final FILENAME: Final[str] = "test_file.jpg" +FILENAME_ENCODED: Final[str] = "test_file.jpg.npy" FILENAME_ENCODED_FORMAT: Final[str] = "{}.npy" FILENAMES: Final[list[str]] = ["test_file.jpg", "test_file2.jpg", "test_file3.jpg"] IGNORE_PAIRS: Final[list[tuple[str, str]]] = [ @@ -23,7 +24,9 @@ (10, 10, 20, 20), (30, 30, 40, 40), ] -FACE_DETECTION_CONFIDENCE: Final[float] = 0.7 +BLOB_FROM_IMAGE_SCALE_FACTOR: Final[float] = 1.0 +BLOB_FROM_IMAGE_MEAN_VALUES: Final[tuple[float, float, float]] = (104.0, 177.0, 123.0) +FACE_DETECTION_CONFIDENCE: Final[float] = 0.5 FACE_DETECTIONS: Final[list[tuple[float]]] = [ (0, 0, 0.95, 0.1, 0.1, 0.2, 0.2), # with confidence 0.95 -> valid detection (0, 0, 0.75, 0.3, 0.3, 0.4, 0.4), # with confidence 0.75 -> valid detection diff --git a/tests/faces/fixtures/celery_tasks.py b/tests/faces/fixtures/celery_tasks.py deleted file mode 100644 index 7bf0602c..00000000 --- a/tests/faces/fixtures/celery_tasks.py +++ /dev/null @@ -1,36 +0,0 @@ -from unittest.mock import patch - -import pytest -from freezegun import freeze_time - -from docker import from_env - -from ..faces_const import FILENAMES - - -@pytest.fixture(scope="session") -def docker_client(): - client = from_env() - yield client - client.close() - - -@pytest.fixture -def mock_redis_client(): - with patch("redis.Redis.set") as mock_set, patch("redis.Redis.delete") as mock_delete: - yield mock_set, mock_delete - - -@pytest.fixture -def mock_dd_find(): - with patch( - "hope_dedup_engine.apps.faces.utils.duplication_detector.DuplicationDetector.find_duplicates" - ) as mock_find: - mock_find.return_value = (FILENAMES[:2],) # Assuming the first two are duplicates based on mock data - yield mock_find - - -@pytest.fixture -def time_control(): - with freeze_time("2024-01-01") as frozen_time: - yield frozen_time diff --git a/tests/faces/fixtures/duplication_detector.py b/tests/faces/fixtures/duplication_detector.py deleted file mode 100644 index 748c498b..00000000 --- a/tests/faces/fixtures/duplication_detector.py +++ /dev/null @@ -1,81 +0,0 @@ -from io import BytesIO -from unittest.mock import MagicMock, mock_open, patch - -import cv2 -import numpy as np -import pytest -from PIL import Image - -from hope_dedup_engine.apps.core.storage import CV2DNNStorage, HDEAzureStorage, HOPEAzureStorage -from hope_dedup_engine.apps.faces.utils.duplication_detector import DuplicationDetector - -from ..faces_const import ( - BLOB_SHAPE, - DEPLOY_PROTO_CONTENT, - FACE_DETECTIONS, - FACE_REGIONS_VALID, - FILENAMES, - IGNORE_PAIRS, - IMAGE_SIZE, - RESIZED_IMAGE_SIZE, -) - - -@pytest.fixture -def dd(mock_hope_azure_storage, mock_cv2dnn_storage, mock_hde_azure_storage, mock_prototxt_file, db): - with ( - patch("hope_dedup_engine.apps.faces.utils.duplication_detector.CV2DNNStorage", mock_cv2dnn_storage), - patch("hope_dedup_engine.apps.faces.utils.duplication_detector.HOPEAzureStorage", mock_hope_azure_storage), - patch("hope_dedup_engine.apps.faces.utils.duplication_detector.HDEAzureStorage", mock_hde_azure_storage), - patch("builtins.open", mock_prototxt_file), - ): - return DuplicationDetector(FILENAMES, IGNORE_PAIRS) - - -@pytest.fixture -def mock_prototxt_file(): - return mock_open(read_data=DEPLOY_PROTO_CONTENT) - - -@pytest.fixture -def mock_cv2dnn_storage(): - return MagicMock(spec=CV2DNNStorage) - - -@pytest.fixture -def mock_hde_azure_storage(): - return MagicMock(spec=HDEAzureStorage) - - -@pytest.fixture -def mock_hope_azure_storage(): - return MagicMock(spec=HOPEAzureStorage) - - -@pytest.fixture -def image_bytes_io(dd): - img_byte_arr = BytesIO() - image = Image.new("RGB", (100, 100), color="red") - image.save(img_byte_arr, format="JPEG") - img_byte_arr.seek(0) - img_byte_arr.fake_open = lambda *_: BytesIO(img_byte_arr.getvalue()) - return img_byte_arr - - -@pytest.fixture -def mock_open_context_manager(image_bytes_io): - mock_open_context_manager = MagicMock() - mock_open_context_manager.__enter__.return_value = image_bytes_io - return mock_open_context_manager - - -@pytest.fixture -def mock_net(): - mock_net = MagicMock(spec=cv2.dnn_Net) # Mocking the neural network object - mock_detections = np.array([[FACE_DETECTIONS]], dtype=np.float32) # Mocking the detections array - mock_expected_regions = FACE_REGIONS_VALID - mock_net.forward.return_value = mock_detections # Setting up the forward method of the mock network - mock_imdecode = MagicMock(return_value=np.ones(IMAGE_SIZE, dtype=np.uint8)) - mock_resize = MagicMock(return_value=np.ones(RESIZED_IMAGE_SIZE, dtype=np.uint8)) - mock_blob = np.zeros(BLOB_SHAPE) - return mock_net, mock_imdecode, mock_resize, mock_blob, mock_expected_regions diff --git a/tests/faces/test_celery_tasks.py b/tests/faces/test_celery_tasks.py index bf6fe492..e75fdb5b 100644 --- a/tests/faces/test_celery_tasks.py +++ b/tests/faces/test_celery_tasks.py @@ -11,12 +11,12 @@ @pytest.mark.parametrize("lock_is_acquired", [True, False]) -def test_deduplicate_task_locking(mock_redis_client, mock_dd_find, dd, lock_is_acquired): +def test_deduplicate_task_locking(mock_redis_client, mock_dd_find, mock_dd, lock_is_acquired): mock_set, mock_delete = mock_redis_client mock_set.return_value = lock_is_acquired mock_find = mock_dd_find - with patch("hope_dedup_engine.apps.faces.celery_tasks.DuplicationDetector", return_value=dd): + with patch("hope_dedup_engine.apps.faces.celery_tasks.DuplicationDetector", return_value=mock_dd): task_result = deduplicate.apply(args=(FILENAMES, IGNORE_PAIRS)).get() hash_value = _get_hash(FILENAMES, IGNORE_PAIRS) @@ -39,7 +39,7 @@ def test_deduplicate_task_locking(mock_redis_client, mock_dd_find, dd, lock_is_a (CELERY_TASK_DELAYS["CustomException"], Exception("Simulated custom task failure")), ], ) -def test_deduplicate_task_exception_handling(mock_redis_client, mock_dd_find, time_control, dd, delay, exception): +def test_deduplicate_task_exception_handling(mock_redis_client, mock_dd_find, time_control, mock_dd, delay, exception): mock_set, mock_delete = mock_redis_client mock_find = mock_dd_find mock_find.side_effect = exception @@ -48,7 +48,7 @@ def test_deduplicate_task_exception_handling(mock_redis_client, mock_dd_find, ti with ( pytest.raises(type(exception)) as exc_info, - patch("hope_dedup_engine.apps.faces.celery_tasks.DuplicationDetector", return_value=dd), + patch("hope_dedup_engine.apps.faces.celery_tasks.DuplicationDetector", return_value=mock_dd), ): task = deduplicate.apply(args=(FILENAMES, IGNORE_PAIRS)) assert exc_info.value == exception diff --git a/tests/faces/test_duplicate_groups_builder.py b/tests/faces/test_duplicate_groups_builder.py new file mode 100644 index 00000000..a5aca5ec --- /dev/null +++ b/tests/faces/test_duplicate_groups_builder.py @@ -0,0 +1,30 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from hope_dedup_engine.apps.faces.utils.duplicate_groups_builder import DuplicateGroupsBuilder + + +@pytest.mark.parametrize( + "checked, threshold, expected_groups", + [ + ({("path1", "path2", 0.2), ("path2", "path3", 0.1)}, 0.3, (("path1", "path2"), ("path3", "path2"))), + ({("path1", "path2", 0.2), ("path2", "path3", 0.4)}, 0.3, (("path1", "path2"),)), + ({("path1", "path2", 0.4), ("path2", "path3", 0.4)}, 0.3, ()), + ( + {("path1", "path2", 0.2), ("path2", "path3", 0.2), ("path3", "path4", 0.2)}, + 0.3, + (("path4", "path3"), ("path1", "path2")), + ), + ], +) +def test_duplicate_groups_builder(checked, threshold, expected_groups): + def sort_nested_tuples(nested_tuples: tuple[tuple[str]]) -> tuple[tuple[str]]: + sorted_inner = tuple(tuple(sorted(inner_tuple)) for inner_tuple in nested_tuples) + sorted_outer = tuple(sorted(sorted_inner)) + return sorted_outer + + mock_config = MagicMock() + mock_config.FACE_DISTANCE_THRESHOLD = threshold + with patch("hope_dedup_engine.apps.faces.utils.duplicate_groups_builder.config", mock_config): + DuplicateGroupsBuilder.build(checked) diff --git a/tests/faces/test_duplication_detector.py b/tests/faces/test_duplication_detector.py index d74a818c..c4eb8796 100644 --- a/tests/faces/test_duplication_detector.py +++ b/tests/faces/test_duplication_detector.py @@ -1,63 +1,29 @@ import os -from unittest.mock import MagicMock, mock_open, patch +from io import BytesIO +from unittest.mock import patch -from django.conf import settings +from django.core.exceptions import ValidationError -import cv2 import numpy as np import pytest from constance import config -from faces_const import DEPLOY_PROTO_SHAPE, FACE_REGIONS_INVALID, FILENAME, FILENAME_ENCODED_FORMAT, FILENAMES +from faces_const import FILENAME, FILENAME_ENCODED_FORMAT, FILENAMES -from hope_dedup_engine.apps.faces.utils.duplication_detector import DuplicationDetector +from hope_dedup_engine.apps.faces.managers.storage import StorageManager +from hope_dedup_engine.apps.faces.services.duplication_detector import DuplicationDetector +from hope_dedup_engine.apps.faces.services.image_processor import ImageProcessor -def test_duplication_detector_initialization(dd): - assert isinstance(dd.net, cv2.dnn_Net) - assert dd.filenames == FILENAMES - assert dd.face_detection_confidence == config.FACE_DETECTION_CONFIDENCE - assert dd.distance_threshold == config.FACE_DISTANCE_THRESHOLD - assert dd.nms_threshold == config.NMS_THRESHOLD - - assert isinstance(dd.blob_from_image_cfg, DuplicationDetector.BlobFromImageConfig) - assert dd.blob_from_image_cfg.scale_factor == config.BLOB_FROM_IMAGE_SCALE_FACTOR - if isinstance(config.BLOB_FROM_IMAGE_MEAN_VALUES, str): - expected_mean_values = tuple(map(float, config.BLOB_FROM_IMAGE_MEAN_VALUES.split(", "))) - else: - expected_mean_values = config.BLOB_FROM_IMAGE_MEAN_VALUES - assert dd.blob_from_image_cfg.mean_values == expected_mean_values - - assert isinstance(dd.face_encodings_cfg, DuplicationDetector.FaceEncodingsConfig) - assert dd.face_encodings_cfg.num_jitters == config.FACE_ENCODINGS_NUM_JITTERS - assert dd.face_encodings_cfg.model == config.FACE_ENCODINGS_MODEL - - -def test_get_shape(dd, mock_prototxt_file): - with patch("builtins.open", mock_prototxt_file): - shape = dd._get_shape() - assert shape == DEPLOY_PROTO_SHAPE - - -def test_set_net(dd, mock_cv2dnn_storage, mock_net): - mock_net_instance, *_ = mock_net - with patch("cv2.dnn.readNetFromCaffe", return_value=mock_net_instance) as mock_read_net: - net = dd._set_net(mock_cv2dnn_storage) - mock_read_net.assert_called_once_with( - mock_cv2dnn_storage.path(settings.PROTOTXT_FILE), - mock_cv2dnn_storage.path(settings.CAFFEMODEL_FILE), - ) - - assert net == mock_net_instance - mock_net_instance.setPreferableBackend.assert_called_once_with(int(config.DNN_BACKEND)) - mock_net_instance.setPreferableTarget.assert_called_once_with(int(config.DNN_TARGET)) - - for storage_name, storage in dd.storages.items(): - assert isinstance(storage, MagicMock) - if storage_name == "cv2dnn": - storage.exists.assert_any_call(settings.PROTOTXT_FILE) - storage.exists.assert_any_call(settings.CAFFEMODEL_FILE) - storage.path.assert_any_call(settings.PROTOTXT_FILE) - storage.path.assert_any_call(settings.CAFFEMODEL_FILE) +def test_init_successful(mock_dd): + assert mock_dd.filenames == FILENAMES + assert isinstance(mock_dd.storages, StorageManager) + assert isinstance(mock_dd.image_processor, ImageProcessor) + assert mock_dd.image_processor.face_detection_confidence == config.FACE_DETECTION_CONFIDENCE + assert mock_dd.image_processor.distance_threshold == config.FACE_DISTANCE_THRESHOLD + assert mock_dd.image_processor.nms_threshold == config.NMS_THRESHOLD + assert mock_dd.image_processor.face_encodings_cfg.num_jitters == config.FACE_ENCODINGS_NUM_JITTERS + assert mock_dd.image_processor.face_encodings_cfg.model == config.FACE_ENCODINGS_MODEL + assert mock_dd.image_processor.blob_from_image_cfg.scale_factor == config.BLOB_FROM_IMAGE_SCALE_FACTOR @pytest.mark.parametrize( @@ -80,13 +46,9 @@ def test_set_net(dd, mock_cv2dnn_storage, mock_net): ), ], ) -def test_get_pairs_to_ignore_success(mock_cv2dnn_storage, mock_prototxt_file, ignore_input, expected_output): - with ( - patch("hope_dedup_engine.apps.faces.utils.duplication_detector.CV2DNNStorage", mock_cv2dnn_storage), - patch("builtins.open", mock_prototxt_file), - ): - dd = DuplicationDetector(FILENAMES, ignore_input) - assert dd.ignore_set == expected_output +def test_get_pairs_to_ignore_success(mock_storage_manager, mock_image_processor, ignore_input, expected_output): + dd = DuplicationDetector(FILENAMES, ignore_input) + assert dd.ignore_set == expected_output @pytest.mark.parametrize( @@ -102,219 +64,142 @@ def test_get_pairs_to_ignore_success(mock_cv2dnn_storage, mock_prototxt_file, ig (("", "file2.jpg"),), ], ) -def test_get_pairs_to_ignore_exception_handling(mock_cv2dnn_storage, mock_prototxt_file, ignore_input): - with ( - pytest.raises(ValueError), - patch("hope_dedup_engine.apps.faces.utils.duplication_detector.CV2DNNStorage", mock_cv2dnn_storage), - patch("builtins.open", mock_prototxt_file), - ): +def test_get_pairs_to_ignore_exception_handling(mock_storage_manager, ignore_input): + with pytest.raises(ValidationError): DuplicationDetector(filenames=FILENAMES, ignore_pairs=ignore_input) -@pytest.mark.parametrize("missing_file", [settings.PROTOTXT_FILE, settings.CAFFEMODEL_FILE]) -def test_initialization_missing_files_in_cv2dnn_storage(mock_cv2dnn_storage, missing_file): - with ( - pytest.raises(FileNotFoundError), - patch("hope_dedup_engine.apps.faces.utils.duplication_detector.CV2DNNStorage", mock_cv2dnn_storage), - ): - mock_cv2dnn_storage.exists.side_effect = lambda filename: filename != missing_file - DuplicationDetector(FILENAME) - mock_cv2dnn_storage.exists.assert_any_call(missing_file) - - -def test_encodings_filename(dd): - assert dd._encodings_filename(FILENAME) == FILENAME_ENCODED_FORMAT.format(FILENAME) +def test_encodings_filename(mock_dd): + assert mock_dd._encodings_filename(FILENAME) == FILENAME_ENCODED_FORMAT.format(FILENAME) @pytest.mark.parametrize("file_exists", [True, False]) -def test_has_encodings(dd, file_exists): - dd.storages["encoded"].exists.return_value = file_exists - assert dd._has_encodings(FILENAME) == file_exists - dd.storages["encoded"].exists.assert_called_with(FILENAME_ENCODED_FORMAT.format(FILENAME)) - - -def test_get_face_detections_dnn_no_detections(dd, mock_open_context_manager): - with ( - patch.object(dd.storages["images"], "open", return_value=mock_open_context_manager), - patch.object(dd, "_get_face_detections_dnn", return_value=[]), - ): - face_regions = dd._get_face_detections_dnn() - assert len(face_regions) == 0 - - -def test_get_face_detections_dnn_with_detections(dd, mock_net, mock_open_context_manager): - net, imdecode, resize, _, expected_regions = mock_net - with ( - patch.object(dd.storages["images"], "open", return_value=mock_open_context_manager), - patch("cv2.imdecode", imdecode), - patch("cv2.resize", resize), - patch.object(dd, "net", net), - ): - face_regions = dd._get_face_detections_dnn(FILENAME) - - assert face_regions == expected_regions - for region in face_regions: - assert isinstance(region, tuple) - assert len(region) == 4 +def test_has_encodings(mock_dd, file_exists): + with patch.object(mock_dd.storages.get_storage("encoded"), "exists") as file_exists_mock: + file_exists_mock.return_value = file_exists + assert mock_dd._has_encodings(FILENAME) == file_exists + mock_dd.storages.get_storage("encoded").exists.assert_called_with(FILENAME_ENCODED_FORMAT.format(FILENAME)) -def test_get_face_detections_dnn_exception_handling(dd): +def test_load_encodings_all_exception_handling_listdir(mock_dd): with ( pytest.raises(Exception, match="Test exception"), - patch.object(dd.storages["images"], "open", side_effect=Exception("Test exception")) as mock_storage_open, - patch.object(dd.logger, "exception") as mock_logger_exception, - ): - dd._get_face_detections_dnn(FILENAME) - - mock_storage_open.assert_called_once_with(FILENAME, "rb") - mock_logger_exception.assert_called_once() - - -@pytest.mark.parametrize( - "filenames, expected", [(FILENAMES, {filename: np.array([1, 2, 3]) for filename in FILENAMES}), ([], {})] -) -def test_load_encodings_all_files(dd, filenames, expected): - mock_encoded_data = {FILENAME_ENCODED_FORMAT.format(filename): np.array([1, 2, 3]) for filename in filenames} - - with ( patch.object( - dd.storages["encoded"], - "listdir", - return_value=(None, [FILENAME_ENCODED_FORMAT.format(filename) for filename in filenames]), - ), - patch("builtins.open", mock_open()) as mocked_open, - patch("numpy.load") as mock_load, - ): - - mocked_files_read = { - filename: mock_open(read_data=data.tobytes()).return_value for filename, data in mock_encoded_data.items() - } - mocked_open.side_effect = lambda f, mode="rb": mocked_files_read[os.path.basename(f)] - - for filename, data in mock_encoded_data.items(): - mock_load.side_effect = lambda f, data=data, filename=filename, allow_pickle=False: ( - data if f.name.endswith(filename) else MagicMock() - ) - - result = dd._load_encodings_all() - - if filenames: - for key, value in expected.items(): - assert np.array_equal(result[key], value) - else: - assert result == expected - - -def test_load_encodings_all_exception_handling_listdir(dd): - with ( - pytest.raises(Exception, match="Test exception"), - patch.object(dd.storages["encoded"], "listdir", side_effect=Exception("Test exception")) as mock_listdir, - patch.object(dd.logger, "exception") as mock_logger_exception, + mock_dd.storages.get_storage("encoded"), "listdir", side_effect=Exception("Test exception") + ) as mock_listdir, + patch.object(mock_dd.logger, "exception") as mock_logger_exception, ): - dd._load_encodings_all() + mock_dd._load_encodings_all() mock_listdir.assert_called_once_with("") mock_logger_exception.assert_called_once() -def test_load_encodings_all_exception_handling_open(dd): +def test_load_encodings_all_exception_handling_open(mock_dd): with ( pytest.raises(Exception, match="Test exception"), patch.object( - dd.storages["encoded"], "listdir", return_value=(None, [FILENAME_ENCODED_FORMAT.format(FILENAME)]) + mock_dd.storages.get_storage("encoded"), + "listdir", + return_value=(None, [FILENAME_ENCODED_FORMAT.format(FILENAME)]), ) as mock_listdir, - patch.object(dd.storages["encoded"], "open", side_effect=Exception("Test exception")) as mock_open, - patch.object(dd.logger, "exception") as mock_logger_exception, + patch.object( + mock_dd.storages.get_storage("encoded"), "open", side_effect=Exception("Test exception") + ) as mock_open, + patch.object(mock_dd.logger, "exception") as mock_logger_exception, ): - dd._load_encodings_all() + mock_dd._load_encodings_all() mock_listdir.assert_called_once_with("") mock_open.assert_called_once_with(FILENAME_ENCODED_FORMAT.format(FILENAME), "rb") mock_logger_exception.assert_called_once() -def test_encode_face_successful(dd, image_bytes_io, mock_net): - mock_net, *_ = mock_net - with ( - patch.object(dd.storages["images"], "open", side_effect=image_bytes_io.fake_open) as mocked_image_open, - patch.object(dd, "net", mock_net), - ): - dd._encode_face(FILENAME) - - mocked_image_open.assert_called_with(FILENAME, "rb") - assert mocked_image_open.side_effect == image_bytes_io.fake_open - assert mocked_image_open.called - - -@pytest.mark.parametrize("face_regions", FACE_REGIONS_INVALID) -def test_encode_face_error(dd, image_bytes_io, face_regions): - with ( - patch.object(dd.storages["images"], "open", side_effect=image_bytes_io.fake_open) as mock_storage_open, - patch.object(dd, "_get_face_detections_dnn", return_value=face_regions) as mock_get_face_detections_dnn, - patch.object(dd.logger, "error") as mock_error_logger, - ): - dd._encode_face(FILENAME) - - mock_storage_open.assert_called_with(FILENAME, "rb") - mock_get_face_detections_dnn.assert_called_once() - - mock_error_logger.assert_called_once() - +@pytest.mark.parametrize( + "filenames, expected", [(FILENAMES, {filename: np.array([1, 2, 3]) for filename in FILENAMES}), ([], {})] +) +def test_load_encodings_all_files(mock_dd, filenames, expected): + def open_mock(filename, mode="rb"): + filename = os.path.basename(filename) + if filename in mock_open_data: + mock_open_data[filename].seek(0) + return mock_open_data[filename] + return BytesIO() + + mock_open_data = {FILENAME_ENCODED_FORMAT.format(filename): BytesIO() for filename in filenames} + for _, data in mock_open_data.items(): + np.save(data, np.array([1, 2, 3])) + data.seek(0) -def test_encode_face_exception_handling(dd): with ( - pytest.raises(Exception, match="Test exception"), - patch.object(dd.storages["images"], "open", side_effect=Exception("Test exception")) as mock_storage_open, - patch.object(dd.logger, "exception") as mock_logger_exception, + patch.object( + mock_dd.storages.get_storage("encoded"), + "listdir", + return_value=(None, [FILENAME_ENCODED_FORMAT.format(filename) for filename in filenames]), + ), + patch.object(mock_dd.storages.get_storage("encoded"), "open", side_effect=open_mock), + patch.object(mock_dd, "_has_encodings", return_value=True), ): - dd._encode_face(FILENAME) + result = mock_dd._load_encodings_all() + for key in expected: + assert key in result + assert np.array_equal(result[key], expected[key]) - mock_storage_open.assert_called_with(FILENAME, "rb") - mock_logger_exception.assert_called_once() - - -def test_find_duplicates_successful_when_encoded(dd, mock_hde_azure_storage): - # Generate mock return values dynamically based on FILENAMES - mock_encodings = {filename: [np.array([0.1, 0.2, 0.3 + i * 0.001])] for i, filename in enumerate(FILENAMES)} - # Mocking internal methods and storages +@pytest.mark.parametrize( + "has_encodings, mock_encodings, expected_duplicates", + [ + ( + True, + {filename: [np.array([0.1, 0.2, 0.3 + i * 0.001])] for i, filename in enumerate(FILENAMES)}, + (tuple(FILENAMES),), + ), + ( + False, + {}, + (), + ), + ], +) +def test_find_duplicates_successful( + mock_dd, + mock_hde_azure_storage, + mock_hope_azure_storage, + image_bytes_io, + has_encodings, + mock_encodings, + expected_duplicates, +): with ( - patch.object(dd, "storages", {"encoded": mock_hde_azure_storage}), - patch.object(dd, "_encode_face"), - patch.object(dd, "_load_encodings_all", return_value=mock_encodings), + patch.object(mock_dd.storages.get_storage("images"), "open", side_effect=image_bytes_io.fake_open), + patch.object(mock_dd.storages.get_storage("encoded"), "open", side_effect=image_bytes_io.fake_open), + patch.object( + mock_dd.storages, + "get_storage", + side_effect=lambda key: {"encoded": mock_hde_azure_storage, "images": mock_hope_azure_storage}[key], + ), + patch.object(mock_dd, "_has_encodings", return_value=has_encodings), + patch.object(mock_dd, "_load_encodings_all", return_value=mock_encodings) as mock_load_encodings, + patch.object(mock_dd.image_processor, "encode_face"), patch("face_recognition.face_distance", return_value=np.array([0.05])), ): + duplicates = mock_dd.find_duplicates() - duplicates = dd.find_duplicates() - - # Check that the correct list of duplicates is returned - expected_duplicates = (tuple(FILENAMES),) - assert {frozenset(t) for t in duplicates} == {frozenset(t) for t in expected_duplicates} - - dd._encode_face.assert_not_called() - dd._load_encodings_all.assert_called_once() - mock_hde_azure_storage.exists.assert_called_with(FILENAME_ENCODED_FORMAT.format(FILENAMES[-1])) - - -def test_find_duplicates_no_encodings(dd): - with ( - patch.object(dd, "_has_encodings", return_value=False), - patch.object(dd, "_encode_face") as mock_encode_face, - patch.object(dd, "_load_encodings_all", return_value={}) as mock_load_encodings, - ): - - dd.find_duplicates() - - mock_encode_face.assert_called_with(FILENAMES[-1]) - mock_load_encodings.assert_called_once() + if has_encodings: + assert {frozenset(t) for t in duplicates} == {frozenset(t) for t in expected_duplicates} + mock_dd.image_processor.encode_face.assert_not_called() + mock_dd._load_encodings_all.assert_called_once() + # mock_hde_azure_storage.exists.assert_called_with(FILENAME_ENCODED_FORMAT.format(FILENAMES[-1])) + else: + mock_load_encodings.assert_called_once() + mock_dd.image_processor.encode_face.assert_called() -def test_find_duplicates_exception_handling(dd): +def test_find_duplicates_exception_handling(mock_dd): with ( pytest.raises(Exception, match="Test exception"), - patch.object(dd, "_load_encodings_all", side_effect=Exception("Test exception")), - patch.object(dd.logger, "exception") as mock_logger_exception, + patch.object(mock_dd, "_load_encodings_all", side_effect=Exception("Test exception")), + patch.object(mock_dd.logger, "exception") as mock_logger_exception, ): - dd.find_duplicates() - + mock_dd.find_duplicates() mock_logger_exception.assert_called_once() diff --git a/tests/faces/test_forms.py b/tests/faces/test_forms.py new file mode 100644 index 00000000..4fcc3bb5 --- /dev/null +++ b/tests/faces/test_forms.py @@ -0,0 +1,34 @@ +from django.forms import ValidationError + +import pytest + +from hope_dedup_engine.apps.faces.forms import MeanValuesTupleField + + +def test_to_python_valid_case(): + field = MeanValuesTupleField() + assert field.to_python("104.0, 177.0, 123.0") == (104.0, 177.0, 123.0) + + +@pytest.mark.parametrize( + "input_value, expected_error_message", + [ + ("104.0, 177.0", "Enter a valid tuple of three float values separated by commas and spaces"), + ("104.0, 177.0, 256.0", "Each value must be between -255 and 255."), + ("104.0, abc, 123.0", "Enter a valid tuple of three float values separated by commas and spaces"), + ], +) +def test_to_python_invalid_cases(input_value, expected_error_message): + field = MeanValuesTupleField() + with pytest.raises(ValidationError) as exc_info: + field.to_python(input_value) + assert expected_error_message in str(exc_info.value) + + +@pytest.mark.parametrize( + "input_value, expected_output", + [((104.0, 177.0, 123.0), "104.0, 177.0, 123.0"), ("104.0, 177.0, 123.0", "104.0, 177.0, 123.0")], +) +def test_prepare_value(input_value, expected_output): + field = MeanValuesTupleField() + assert field.prepare_value(input_value) == expected_output diff --git a/tests/faces/test_image_processor.py b/tests/faces/test_image_processor.py new file mode 100644 index 00000000..747b253f --- /dev/null +++ b/tests/faces/test_image_processor.py @@ -0,0 +1,124 @@ +from unittest.mock import mock_open, patch + +from django.core.exceptions import ValidationError + +import face_recognition +import numpy as np +import pytest +from constance import config +from faces_const import ( + BLOB_FROM_IMAGE_MEAN_VALUES, + BLOB_FROM_IMAGE_SCALE_FACTOR, + DEPLOY_PROTO_SHAPE, + FACE_REGIONS_INVALID, + FACE_REGIONS_VALID, + FILENAME, + FILENAME_ENCODED, +) + +from hope_dedup_engine.apps.faces.managers.net import DNNInferenceManager +from hope_dedup_engine.apps.faces.managers.storage import StorageManager +from hope_dedup_engine.apps.faces.services.image_processor import BlobFromImageConfig, FaceEncodingsConfig + + +def test_init_creates_expected_attributes(mock_net_manager: DNNInferenceManager, mock_image_processor): + assert isinstance(mock_image_processor.storages, StorageManager) + assert mock_image_processor.net is mock_net_manager + assert isinstance(mock_image_processor.blob_from_image_cfg, BlobFromImageConfig) + assert mock_image_processor.blob_from_image_cfg.scale_factor == config.BLOB_FROM_IMAGE_SCALE_FACTOR + assert isinstance(mock_image_processor.face_encodings_cfg, FaceEncodingsConfig) + assert mock_image_processor.face_encodings_cfg.num_jitters == config.FACE_ENCODINGS_NUM_JITTERS + assert mock_image_processor.face_encodings_cfg.model == config.FACE_ENCODINGS_MODEL + assert mock_image_processor.face_detection_confidence == config.FACE_DETECTION_CONFIDENCE + assert mock_image_processor.distance_threshold == config.FACE_DISTANCE_THRESHOLD + assert mock_image_processor.nms_threshold == config.NMS_THRESHOLD + + +def test_get_shape_valid(mock_prototxt_file): + with patch("builtins.open", mock_prototxt_file): + config = BlobFromImageConfig(scale_factor=BLOB_FROM_IMAGE_SCALE_FACTOR, mean_values=BLOB_FROM_IMAGE_MEAN_VALUES) + shape = config._get_shape() + assert shape == DEPLOY_PROTO_SHAPE + + +def test_get_shape_invalid(): + with patch("builtins.open", mock_open(read_data="invalid_prototxt_content")): + with pytest.raises(ValidationError): + BlobFromImageConfig(scale_factor=BLOB_FROM_IMAGE_SCALE_FACTOR, mean_values=BLOB_FROM_IMAGE_MEAN_VALUES) + + +def test_get_face_detections_dnn_with_detections(mock_image_processor, mock_net, mock_open_context_manager): + dnn, imdecode, resize, _, expected_regions = mock_net + with ( + patch("cv2.imdecode", imdecode), + patch("cv2.resize", resize), + patch.object( + mock_image_processor.storages.get_storage("images"), "open", return_value=mock_open_context_manager + ), + patch.object(mock_image_processor, "net", dnn), + ): + detections = mock_image_processor._get_face_detections_dnn(FILENAME) + assert detections == expected_regions + for region in detections: + assert isinstance(region, tuple) + assert len(region) == 4 + assert all(isinstance(coord, np.int64) for coord in region) + + +def test_get_face_detections_dnn_no_detections(mock_image_processor): + with (patch.object(mock_image_processor, "_get_face_detections_dnn", return_value=[]),): + face_regions = mock_image_processor._get_face_detections_dnn() + assert len(face_regions) == 0 + + +@pytest.mark.parametrize("face_regions", (FACE_REGIONS_VALID, FACE_REGIONS_INVALID)) +def test_encode_face(mock_image_processor, image_bytes_io, face_regions): + with ( + patch.object( + mock_image_processor.storages.get_storage("images"), "open", side_effect=image_bytes_io.fake_open + ) as mocked_image_open, + patch.object( + mock_image_processor.storages.get_storage("encoded"), "open", side_effect=image_bytes_io.fake_open + ) as mocked_encoded_open, + patch.object( + mock_image_processor, "_get_face_detections_dnn", return_value=face_regions + ) as mock_get_face_detections_dnn, + patch.object(face_recognition, "load_image_file") as mock_load_image_file, + patch.object(face_recognition, "face_encodings") as mock_face_encodings, + ): + mock_image_processor.encode_face(FILENAME, FILENAME_ENCODED) + + mock_get_face_detections_dnn.assert_called_once() + mocked_image_open.assert_called_with(FILENAME, "rb") + assert mocked_image_open.side_effect == image_bytes_io.fake_open + mock_load_image_file.assert_called() + + if face_regions == FACE_REGIONS_VALID: + mocked_encoded_open.assert_called_with(FILENAME_ENCODED, "wb") + assert mocked_encoded_open.side_effect == image_bytes_io.fake_open + mock_face_encodings.assert_called() + else: + mocked_encoded_open.assert_not_called() + mock_face_encodings.assert_not_called() + + +@pytest.mark.parametrize( + "method, exception_str", + ( + (str("load_image_file"), "Test load_image_file exception"), + (str("face_encodings"), "Test face_encodings exception"), + ), +) +def test_encode_face_exception_handling(mock_image_processor, mock_net, method: str, exception_str): + dnn, imdecode, *_ = mock_net + with ( + pytest.raises(Exception, match=exception_str), + patch.object(face_recognition, method, side_effect=Exception(exception_str)) as mock_exception, + patch.object(mock_image_processor, "net", dnn), + patch("cv2.imdecode", imdecode), + patch.object(mock_image_processor.logger, "exception") as mock_logger_exception, + ): + mock_image_processor.encode_face(FILENAME, FILENAME_ENCODED) + + mock_exception.assert_called_once() + mock_logger_exception.assert_called_once() diff --git a/tests/faces/test_net_manager.py b/tests/faces/test_net_manager.py new file mode 100644 index 00000000..3a080bd8 --- /dev/null +++ b/tests/faces/test_net_manager.py @@ -0,0 +1,12 @@ +from constance import config + +from hope_dedup_engine.apps.faces.managers.net import DNNInferenceManager + + +def test_successful(mock_storage_manager, mock_net_manager): + dnn_manager = DNNInferenceManager(mock_storage_manager.storages["cv2dnn"]) + mock_net_manager.setPreferableBackend.assert_called_once_with(int(config.DNN_BACKEND)) + mock_net_manager.setPreferableTarget.assert_called_once_with(int(config.DNN_TARGET)) + + assert isinstance(dnn_manager, DNNInferenceManager) + assert dnn_manager.get_model() == mock_net_manager diff --git a/tests/faces/test_storage_manager.py b/tests/faces/test_storage_manager.py new file mode 100644 index 00000000..b211de8a --- /dev/null +++ b/tests/faces/test_storage_manager.py @@ -0,0 +1,34 @@ +import pytest + +from hope_dedup_engine.apps.core.storage import CV2DNNStorage, HDEAzureStorage, HOPEAzureStorage +from hope_dedup_engine.apps.faces.exceptions import StorageKeyError +from hope_dedup_engine.apps.faces.managers.storage import StorageManager + + +def test_initialization(mock_storage_manager): + assert isinstance(mock_storage_manager.storages["images"], HOPEAzureStorage) + assert isinstance(mock_storage_manager.storages["cv2dnn"], CV2DNNStorage) + assert isinstance(mock_storage_manager.storages["encoded"], HDEAzureStorage) + + +def test_missing_file(): + with pytest.raises(FileNotFoundError): + StorageManager() + + +def test_invalid_key(mock_storage_manager): + with pytest.raises(StorageKeyError): + mock_storage_manager.get_storage("invalid_key") + + +@pytest.mark.parametrize( + "test_input, expected_output", + [ + ("images", HOPEAzureStorage), + ("cv2dnn", CV2DNNStorage), + ("encoded", HDEAzureStorage), + ], +) +def test_valid_key(mock_storage_manager, test_input, expected_output): + storage_object = mock_storage_manager.get_storage(test_input) + assert isinstance(storage_object, expected_output) diff --git a/tests/faces/test_validators.py b/tests/faces/test_validators.py deleted file mode 100644 index 79b3e0df..00000000 --- a/tests/faces/test_validators.py +++ /dev/null @@ -1,41 +0,0 @@ -from django.forms import ValidationError - -import pytest - -from hope_dedup_engine.apps.faces.validators import MeanValuesTupleField - - -def test_to_python_valid_tuple(): - field = MeanValuesTupleField() - assert field.to_python("104.0, 177.0, 123.0") == (104.0, 177.0, 123.0) - - -def test_to_python_invalid_length(): - field = MeanValuesTupleField() - with pytest.raises(ValidationError) as exc_info: - field.to_python("104.0, 177.0") - assert "Enter a valid tuple of three float values separated by commas and spaces" in str(exc_info.value) - - -def test_to_python_value_out_of_range(): - field = MeanValuesTupleField() - with pytest.raises(ValidationError) as exc_info: - field.to_python("104.0, 177.0, 256.0") - assert "Each value must be between -255 and 255." in str(exc_info.value) - - -def test_to_python_non_numeric_value(): - field = MeanValuesTupleField() - with pytest.raises(ValidationError) as exc_info: - field.to_python("104.0, abc, 123.0") - assert "Enter a valid tuple of three float values separated by commas and spaces" in str(exc_info.value) - - -def test_prepare_value_with_tuple(): - field = MeanValuesTupleField() - assert field.prepare_value((104.0, 177.0, 123.0)) == "104.0, 177.0, 123.0" - - -def test_prepare_value_with_string(): - field = MeanValuesTupleField() - assert field.prepare_value("104.0, 177.0, 123.0") == "104.0, 177.0, 123.0"