diff --git a/opensensor/collection_apis.py b/opensensor/collection_apis.py
index 1c4c85e..5e56042 100644
--- a/opensensor/collection_apis.py
+++ b/opensensor/collection_apis.py
@@ -11,6 +11,7 @@ from opensensor.collections import (
     CO2,
     PH,
+    VPD,
     DeviceMetadata,
     Environment,
     Humidity,
@@ -140,20 +141,6 @@ def get_collection_name(response_model: Type[T]):
     return response_model.__name__
 
 
-def _get_old_project_projection(response_model: Type[T]):
-    project_projection = {
-        "_id": False,
-    }
-    for field_name, _ in response_model.__fields__.items():
-        if field_name == "timestamp":
-            project_projection["timestamp"] = "$timestamp"
-        if field_name == "unit":
-            project_projection["unit"] = "$metadata.unit"
-        else:
-            project_projection[field_name] = f"${field_name}"
-    return project_projection
-
-
 def _get_project_projection(response_model: Type[T]):
     old_name = get_collection_name(response_model)
     new_collection_name = new_collections[old_name]
@@ -170,37 +157,121 @@ def _get_project_projection(response_model: Type[T]):
     return project_projection
 
 
-def get_uniform_sample_pipeline(
-    response_model: Type[T],
-    device_ids: List[str],  # Update the type of the device_id parameter to List[str]
+def get_initial_match_clause(
+    device_ids: List[str],
     device_name: str,
     start_date: datetime,
     end_date: datetime,
     resolution: int,
-    old_collection: bool,
 ):
-    sampling_interval = timedelta(minutes=resolution)
     if start_date is None:
         start_date = datetime.utcnow() - timedelta(days=100)
     if end_date is None:
         end_date = datetime.utcnow()
+    # Build the $match clause shared by the sampling pipelines
     match_clause = {
         "timestamp": {"$gte": start_date, "$lte": end_date},
         "metadata.device_id": {
-            "$in": device_ids
-        },  # Use $in operator for matching any device_id in the list
+            "$in": device_ids  # Use $in operator for matching any device_id in the list
+        },
         "metadata.name": device_name,
     }
+    return match_clause
+
+
+def get_vpd_pipeline(
+    device_ids: List[str],
+    device_name: str,
+    start_date: datetime,
+    end_date: datetime,
+    resolution: int,
+):
+    sampling_interval = timedelta(minutes=resolution)
+    match_clause = get_initial_match_clause(
+        device_ids, device_name, start_date, end_date, resolution
+    )
+
+    # Ensure both temperature and humidity exist, since VPD is derived from them
+    match_clause["temp"] = {"$exists": True}
+    match_clause["rh"] = {"$exists": True}
+
+    # The MongoDB aggregation pipeline for the VPD calculation
+    pipeline = [
+        {"$match": match_clause},
+        # Bucket documents into `resolution`-minute windows; $subtract on two
+        # dates yields milliseconds, hence total_seconds() * 1000
+        {
+            "$addFields": {
+                "group": {
+                    "$floor": {
+                        "$divide": [
+                            {"$subtract": ["$timestamp", start_date]},
+                            sampling_interval.total_seconds() * 1000,
+                        ]
+                    }
+                }
+            }
+        },
+        # Average temperature and humidity within each window
+        {
+            "$group": {
+                "_id": "$group",
+                "temp": {"$avg": "$temp"},
+                "rh": {"$avg": "$rh"},
+                "timestamp": {"$first": "$timestamp"},
+            }
+        },
+        # Saturation vapor pressure via the Tetens equation (temp in °C, result in kPa)
+        {
+            "$addFields": {
+                "satvp": {
+                    "$multiply": [
+                        0.61078,
+                        {
+                            "$exp": {
+                                "$multiply": [
+                                    {"$divide": [17.27, {"$add": ["$temp", 237.3]}]},
+                                    "$temp",
+                                ]
+                            }
+                        },
+                    ]
+                }
+            }
+        },
+        # VPD = SVP * (1 - RH / 100)
+        {
+            "$addFields": {
+                "vpd": {"$multiply": ["$satvp", {"$subtract": [1, {"$divide": ["$rh", 100]}]}]}
+            }
+        },
+        {
+            "$project": {
+                "_id": False,
+                "timestamp": 1,
+                "vpd": 1,
+                "unit": "kPa",  # the Tetens constants above yield kPa
+            }
+        },
+        {"$sort": {"timestamp": 1}},
+    ]
+    return pipeline
+
+
+def get_uniform_sample_pipeline(
+    response_model: Type[T],
+    device_ids: List[str],  # All device_ids to match in the query
+    device_name: str,
+    start_date: datetime,
+    end_date: datetime,
+    resolution: int,
+):
+    sampling_interval = timedelta(minutes=resolution)
+
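+    # Reuse the shared $match clause so both pipelines filter identically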
+    match_clause = get_initial_match_clause(
+        device_ids, device_name, start_date, end_date, resolution
+    )
 
     # Determine the $project
-    if old_collection:
-        project_projection = _get_old_project_projection(response_model)
-    else:
-        old_name = get_collection_name(response_model)
-        new_collection_name = new_collections[old_name]
-        project_projection = _get_project_projection(response_model)
-        match_clause[new_collection_name] = {"$exists": True}
+    old_name = get_collection_name(response_model)
+    new_collection_name = new_collections[old_name]
+    project_projection = _get_project_projection(response_model)
+    match_clause[new_collection_name] = {"$exists": True}
 
     # Query a uniform sample of documents within the timestamp range
     pipeline = [
@@ -234,6 +305,7 @@ def get_uniform_sample_pipeline(
     "co2": CO2,
     "readings": Moisture,
     "pH": PH,
+    "VPD": VPD,
 }
 
 model_class_attributes = {v: k for k, v in model_classes.items()}
@@ -248,38 +320,57 @@ def sample_and_paginate_collection(
     page: int,
     size: int,
     unit: str,
-    old_collection: bool,
 ):
     api_keys, _ = get_api_keys_by_device_id(device_id)
     device_ids, target_device_name = reduce_api_keys_to_device_ids(api_keys, device_id)
     offset = (page - 1) * size
-    pipeline = get_uniform_sample_pipeline(
-        response_model,
-        device_ids,
-        target_device_name,
-        start_date,
-        end_date,
-        resolution,
-        old_collection,
-    )
+
+    # Determine the right pipeline to use based on the response model
+    if response_model is VPD:
+        pipeline = get_vpd_pipeline(
+            device_ids,
+            target_device_name,
+            start_date,
+            end_date,
+            resolution,
+        )
+    else:
+        pipeline = get_uniform_sample_pipeline(
+            response_model,
+            device_ids,
+            target_device_name,
+            start_date,
+            end_date,
+            resolution,
+        )
+
     pipeline.extend([{"$skip": offset}, {"$limit": size}])
     db = get_open_sensor_db()
     collection = db[collection_name]
     raw_data = list(collection.aggregate(pipeline))
+
     # Add UTC offset to timestamp field
     for item in raw_data:
         item["timestamp"] = item["timestamp"].replace(tzinfo=timezone.utc).isoformat()
-    data_field = model_class_attributes[response_model]
-    model_class = model_classes[data_field]
-    data = [model_class(**item) for item in raw_data]
-    if data_field == "temp" and unit:
-        for item in data:
-            convert_temperature(item, unit)
+
+    if response_model is VPD:
+        # The VPD pipeline already computed every field of the response model,
+        # so its documents map directly onto VPD instances.
+        data = [VPD(**item) for item in raw_data]
+    else:
+        data_field = model_class_attributes[response_model]
+        model_class = model_classes[data_field]
+        data = [model_class(**item) for item in raw_data]
+        if data_field == "temp" and unit:
+            for item in data:
+                convert_temperature(item, unit)
+
     # Re-run for total page count
     pipeline.append({"$count": "total"})
     data_count = list(collection.aggregate(pipeline))
     total_count = data_count[0]["total"] if data else 0
+
     return Page(items=data, total=total_count, page=page, size=size)
@@ -308,9 +399,6 @@ async def historical_data_route(
             collection_name = user.collection_name
         else:
             collection_name = "FreeTier"
-        migration_finished = migration_complete(collection_name)
-        if not migration_finished:
-            collection_name = get_collection_name(entity)
 
         return sample_and_paginate_collection(
             entity,
@@ -322,7 +410,6 @@ async def historical_data_route(
             page=page,
             size=size,
             unit=unit,
-            old_collection=not migration_finished,
         )
 
     return historical_data_route
@@ -355,6 +442,12 @@ async def historical_data_route(
     response_model=Page[PH],
     methods=["GET"],
 )
+router.add_api_route(
+    "/VPD/{device_id}",
+    create_historical_data_route(VPD),
+    response_model=Page[VPD],
+    methods=["GET"],
+)
 
 
 @router.post("/environment/")
diff --git a/opensensor/collections.py b/opensensor/collections.py
index a2d16ec..0fad60d 100644
--- a/opensensor/collections.py
+++ b/opensensor/collections.py
@@ -66,3 +66,13 @@ class Environment(BaseModel):
     co2: CO2 | None = None
     moisture: Moisture | None = None
     pH: PH | None = None
+
+
+class VPD(BaseModel):
+    """
+    VPD (Vapor Pressure Deficit) is a projection computed from temperature
+    and humidity readings rather than a collection of stored data points.
+    """
+
+    timestamp: datetime
+    vpd: float
+    unit: str
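Reviewer note: the sketch below reproduces in plain Python the math that get_vpd_pipeline pushes into MongoDB (Tetens saturation vapor pressure, then VPD = SVP * (1 - RH/100)). It is handy for sanity-checking the $addFields stages against known values. The function name compute_vpd and the sample reading are illustrative only, not part of the patch.

import math


def compute_vpd(temp_c: float, rh_percent: float) -> float:
    """Vapor Pressure Deficit in kPa, from temperature (°C) and relative humidity (%)."""
    # Tetens saturation vapor pressure in kPa: 0.61078 * exp(17.27 * T / (T + 237.3))
    satvp = 0.61078 * math.exp((17.27 / (temp_c + 237.3)) * temp_c)
    # VPD is the gap between saturation and actual vapor pressure
    return satvp * (1 - rh_percent / 100)


# Example: 25 °C at 60% RH -> roughly 1.27 kPa
print(round(compute_vpd(25.0, 60.0), 2))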