Skip to content

Commit

Permalink
Add VPD projection and route
Browse files Browse the repository at this point in the history
  • Loading branch information
matteius committed Oct 29, 2023
1 parent db80da3 commit ab43ae5
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 48 deletions.
189 changes: 141 additions & 48 deletions opensensor/collection_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from opensensor.collections import (
CO2,
PH,
VPD,
DeviceMetadata,
Environment,
Humidity,
Expand Down Expand Up @@ -140,20 +141,6 @@ def get_collection_name(response_model: Type[T]):
return response_model.__name__


def _get_old_project_projection(response_model: Type[T]):
project_projection = {
"_id": False,
}
for field_name, _ in response_model.__fields__.items():
if field_name == "timestamp":
project_projection["timestamp"] = "$timestamp"
if field_name == "unit":
project_projection["unit"] = "$metadata.unit"
else:
project_projection[field_name] = f"${field_name}"
return project_projection


def _get_project_projection(response_model: Type[T]):
old_name = get_collection_name(response_model)
new_collection_name = new_collections[old_name]
Expand All @@ -170,37 +157,121 @@ def _get_project_projection(response_model: Type[T]):
return project_projection


def get_uniform_sample_pipeline(
response_model: Type[T],
device_ids: List[str], # Update the type of the device_id parameter to List[str]
def get_initial_match_clause(
    device_ids: List[str],
    device_name: str,
    start_date: datetime,
    end_date: datetime,
    resolution: int,
):
    """Build the ``$match`` stage shared by the sampling pipelines.

    Restricts documents to the [start_date, end_date] window (defaulting to
    the last 100 days when either bound is None) and to the given device
    name plus any of the device ids.

    :param device_ids: device ids to match (``$in`` semantics).
    :param device_name: exact ``metadata.name`` to match.
    :param start_date: window start; defaults to utcnow() - 100 days when None.
    :param end_date: window end; defaults to utcnow() when None.
    :param resolution: sampling resolution in minutes — currently unused here;
        kept so all pipeline builders share one signature.
    :return: dict suitable as the body of a MongoDB ``$match`` stage.
    """
    # NOTE(review): these defaults are local — callers that pass None still
    # hold None in their own scope; each pipeline builder must re-apply them
    # before embedding the dates in arithmetic stages.
    if start_date is None:
        start_date = datetime.utcnow() - timedelta(days=100)
    if end_date is None:
        end_date = datetime.utcnow()

    match_clause = {
        "timestamp": {"$gte": start_date, "$lte": end_date},
        # Use $in operator for matching any device_id in the list
        "metadata.device_id": {"$in": device_ids},
        "metadata.name": device_name,
    }
    return match_clause


def get_vpd_pipeline(
    device_ids: List[str],
    device_name: str,
    start_date: datetime,
    end_date: datetime,
    resolution: int,
):
    """Build an aggregation pipeline computing vapor pressure deficit (VPD).

    Buckets matching documents into ``resolution``-minute windows, averages
    temperature and relative humidity per bucket, then derives VPD via the
    Tetens saturation-vapor-pressure approximation:
    ``satvp = 0.61078 * exp(17.27 * T / (T + 237.3))`` (kPa),
    ``vpd = satvp * (1 - rh / 100)``.

    :param device_ids: device ids to match.
    :param device_name: exact device name to match.
    :param start_date: window start; defaults to utcnow() - 100 days when None.
    :param end_date: window end; defaults to utcnow() when None.
    :param resolution: bucket width, in minutes.
    :return: list of aggregation stages ending sorted by timestamp ascending.
    """
    # Fix: apply the None-defaults locally. get_initial_match_clause defaults
    # internally, but that does not propagate back to this scope — without
    # this, a None start_date would be embedded in the $subtract stage below
    # and break the bucketing arithmetic.
    if start_date is None:
        start_date = datetime.utcnow() - timedelta(days=100)
    if end_date is None:
        end_date = datetime.utcnow()

    sampling_interval = timedelta(minutes=resolution)
    match_clause = get_initial_match_clause(
        device_ids, device_name, start_date, end_date, resolution
    )

    # We ensure both temperature and humidity exist for the calculation of VPD
    match_clause["temp"] = {"$exists": True}
    match_clause["rh"] = {"$exists": True}

    # The MongoDB aggregation pipeline for VPD calculation
    pipeline = [
        {"$match": match_clause},
        {
            "$addFields": {
                # Bucket index: elapsed ms since start_date, divided by the
                # sampling interval in ms.
                "group": {
                    "$floor": {
                        "$divide": [
                            {"$subtract": ["$timestamp", start_date]},
                            sampling_interval.total_seconds() * 1000,
                        ]
                    }
                }
            }
        },
        {
            "$group": {
                "_id": "$group",
                "temp": {"$avg": "$temp"},
                "rh": {"$avg": "$rh"},
                # NOTE(review): $first without a preceding $sort picks an
                # arbitrary document's timestamp per bucket — confirm whether
                # deterministic bucket timestamps are required.
                "timestamp": {"$first": "$timestamp"},
            }
        },
        {
            # Tetens approximation for saturation vapor pressure (kPa).
            "$addFields": {
                "satvp": {
                    "$multiply": [
                        0.61078,
                        {
                            "$exp": {
                                "$multiply": [
                                    {"$divide": [17.27, {"$add": ["$temp", 237.3]}]},
                                    "$temp",
                                ]
                            }
                        },
                    ]
                }
            }
        },
        {
            # VPD = saturation vapor pressure scaled by the humidity deficit.
            "$addFields": {
                "vpd": {"$multiply": ["$satvp", {"$subtract": [1, {"$divide": ["$rh", 100]}]}]}
            }
        },
        {
            "$project": {
                "_id": False,
                "timestamp": 1,
                "vpd": 1,
                "unit": "kPa",  # assuming kPa as the unit for VPD
            }
        },
        {"$sort": {"timestamp": 1}},
    ]
    return pipeline


def get_uniform_sample_pipeline(
response_model: Type[T],
device_ids: List[str], # Update the type of the device_id parameter to List[str]
device_name: str,
start_date: datetime,
end_date: datetime,
resolution: int,
):
sampling_interval = timedelta(minutes=resolution)
match_clause = get_initial_match_clause(
device_ids, device_name, start_date, end_date, resolution
)

# Determine the $project
if old_collection:
project_projection = _get_old_project_projection(response_model)
else:
old_name = get_collection_name(response_model)
new_collection_name = new_collections[old_name]
project_projection = _get_project_projection(response_model)
match_clause[new_collection_name] = {"$exists": True}
old_name = get_collection_name(response_model)
new_collection_name = new_collections[old_name]
project_projection = _get_project_projection(response_model)
match_clause[new_collection_name] = {"$exists": True}

# Query a uniform sample of documents within the timestamp range
pipeline = [
Expand Down Expand Up @@ -234,6 +305,7 @@ def get_uniform_sample_pipeline(
"co2": CO2,
"readings": Moisture,
"pH": PH,
"VPD": VPD,
}
model_class_attributes = {v: k for k, v in model_classes.items()}

Expand All @@ -248,38 +320,57 @@ def sample_and_paginate_collection(
page: int,
size: int,
unit: str,
old_collection: bool,
):
api_keys, _ = get_api_keys_by_device_id(device_id)
device_ids, target_device_name = reduce_api_keys_to_device_ids(api_keys, device_id)
offset = (page - 1) * size
pipeline = get_uniform_sample_pipeline(
response_model,
device_ids,
target_device_name,
start_date,
end_date,
resolution,
old_collection,
)

# Determine the right pipeline to use based on the response model
if response_model is VPD:
pipeline = get_vpd_pipeline(
device_ids,
target_device_name,
start_date,
end_date,
resolution,
)
else:
pipeline = get_uniform_sample_pipeline(
response_model,
device_ids,
target_device_name,
start_date,
end_date,
resolution,
)

pipeline.extend([{"$skip": offset}, {"$limit": size}])

db = get_open_sensor_db()
collection = db[collection_name]
raw_data = list(collection.aggregate(pipeline))

# Add UTC offset to timestamp field
for item in raw_data:
item["timestamp"] = item["timestamp"].replace(tzinfo=timezone.utc).isoformat()
data_field = model_class_attributes[response_model]
model_class = model_classes[data_field]
data = [model_class(**item) for item in raw_data]
if data_field == "temp" and unit:
for item in data:
convert_temperature(item, unit)

if response_model is VPD:
# If the response model is VPD, you already have VPD-related data from the pipeline.
# So, you can directly use it to create the response model instances.
data = [VPD(**item) for item in raw_data]
else:
data_field = model_class_attributes[response_model]
model_class = model_classes[data_field]
data = [model_class(**item) for item in raw_data]
if data_field == "temp" and unit:
for item in data:
convert_temperature(item, unit)

# Re-run for total page count
pipeline.append({"$count": "total"})
data_count = list(collection.aggregate(pipeline))
total_count = data_count[0]["total"] if data else 0

return Page(items=data, total=total_count, page=page, size=size)


Expand Down Expand Up @@ -308,9 +399,6 @@ async def historical_data_route(
collection_name = user.collection_name
else:
collection_name = "FreeTier"
migration_finished = migration_complete(collection_name)
if not migration_finished:
collection_name = get_collection_name(entity)

return sample_and_paginate_collection(
entity,
Expand All @@ -322,7 +410,6 @@ async def historical_data_route(
page=page,
size=size,
unit=unit,
old_collection=not migration_finished,
)

return historical_data_route
Expand Down Expand Up @@ -355,6 +442,12 @@ async def historical_data_route(
response_model=Page[PH],
methods=["GET"],
)
# Historical VPD endpoint. VPD is not stored; it is computed on the fly by
# the aggregation pipeline from temperature and humidity readings.
router.add_api_route(
    "/VPD/{device_id}",
    create_historical_data_route(VPD),
    response_model=Page[VPD],
    methods=["GET"],
)


@router.post("/environment/")
Expand Down
10 changes: 10 additions & 0 deletions opensensor/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,13 @@ class Environment(BaseModel):
co2: CO2 | None = None
moisture: Moisture | None = None
pH: PH | None = None


class VPD(BaseModel):
    """
    Vapor pressure deficit sample.

    VPD is a computed projection derived from other data points
    (temperature and relative humidity); it is not stored directly.
    """

    # Timestamp of the sample the VPD value was computed from.
    timestamp: datetime
    # Computed vapor pressure deficit value.
    vpd: float
    # Measurement unit of the vpd field (the pipeline emits "kPa").
    unit: str

0 comments on commit ab43ae5

Please sign in to comment.