From f3118b42eed6511aedbbf7e9336ee0d513dde2aa Mon Sep 17 00:00:00 2001 From: Jules Date: Wed, 5 Feb 2025 09:57:02 -0800 Subject: [PATCH 1/2] Properly updated attribute controller scala --- app/controllers/AttributeController.scala | 351 +++++++++++++++------- 1 file changed, 246 insertions(+), 105 deletions(-) diff --git a/app/controllers/AttributeController.scala b/app/controllers/AttributeController.scala index bc7854bfd2..ad8e4b1a02 100644 --- a/app/controllers/AttributeController.scala +++ b/app/controllers/AttributeController.scala @@ -18,6 +18,10 @@ import models.label.{LabelTable, LabelTypeTable} import models.region.RegionTable import play.api.Play.current import play.api.{Logger, Play} +import anorm._ +import anorm.SqlParser._ +import play.api.db.DB + /** * Holds the HTTP requests associated with accessibility attributes and the label clustering used to create them. @@ -108,124 +112,261 @@ class AttributeController @Inject() (implicit val env: Environment[User, Session * Takes in results of single-user clustering, and adds the data to the relevant tables. * * @param key A key used for authentication. - * @param userId The user_id address of the user who's labels were clustered. + * @param userId The user_id address of the user whose labels were clustered. */ - def postSingleUserClusteringResults(key: String, userId: String) = UserAwareAction.async(BodyParsers.parse.json(maxLength = 1024 * 1024 * 100)) { implicit request => - // The maxLength argument above allows a 100MB max load size for the POST request. - if (authenticate(key)) { - // Validation https://www.playframework.com/documentation /2.3.x/ScalaJson - val submission = request.body.validate[AttributeFormats.ClusteringSubmission] - submission.fold( - errors => { - Logger.warn("Failed to parse JSON POST request for single-user clustering results.") - Logger.info(Json.prettyPrint(request.body)) - Future.successful(BadRequest(Json.obj("status" -> "Error", "message" -> JsError.toFlatJson(errors)))) - }, - submission => { - // Extract the thresholds, clusters, and labels, and put them into separate variables. - val thresholds: Map[String, Float] = submission.thresholds.map(t => (t.labelType, t.threshold)).toMap - val clusters: List[AttributeFormats.ClusterSubmission] = submission.clusters - val labels: List[AttributeFormats.ClusteredLabelSubmission] = submission.labels - - // Group the labels by the cluster they were put into. - val groupedLabels: Map[Int, List[AttributeFormats.ClusteredLabelSubmission]] = labels.groupBy(_.clusterNum) - val timestamp: Timestamp = new Timestamp(Instant.now.toEpochMilli) - - // Add corresponding entry to the user_clustering_session table - val userSessionId: Int = UserClusteringSessionTable.save(UserClusteringSession(0, userId, timestamp)) - // Add the clusters to user_attribute table, and the associated user_attribute_labels after each cluster. - for (cluster <- clusters) yield { - val attributeId: Int = - UserAttributeTable.save( - UserAttribute(0, - userSessionId, - thresholds(cluster.labelType), - LabelTypeTable.labelTypeToId(cluster.labelType).get, - RegionTable.selectRegionIdOfClosestNeighborhood(cluster.lng, cluster.lat), - cluster.lat, - cluster.lng, - cluster.severity, - cluster.temporary - ) - ) - // Add all the labels associated with that user_attribute to the user_attribute_label table. - groupedLabels get cluster.clusterNum match { - case Some(group) => - for (label <- group) yield { - UserAttributeLabelTable.save(UserAttributeLabel(0, attributeId, label.labelId)) + def postSingleUserClusteringResults(key: String, userId: String) = + UserAwareAction.async(BodyParsers.parse.json(maxLength = 1024 * 1024 * 100)) { implicit request => + // The maxLength argument above allows a 100MB max load size for the POST request. + if (authenticate(key)) { + // Validation https://www.playframework.com/documentation/2.3.x/ScalaJson + val submission = request.body.validate[AttributeFormats.ClusteringSubmission] + submission.fold( + errors => { + Logger.warn("Failed to parse JSON POST request for single-user clustering results.") + Logger.info(Json.prettyPrint(request.body)) + Future.successful(BadRequest(Json.obj("status" -> "Error", "message" -> JsError.toFlatJson(errors)))) + }, + submission => { + // Extract the thresholds, clusters, and labels, and put them into seperate variables. + val thresholds: Map[String, Float] = submission.thresholds.map(t => (t.labelType, t.threshold)).toMap + val clusters: List[AttributeFormats.ClusterSubmission] = submission.clusters + val labels: List[AttributeFormats.ClusteredLabelSubmission] = submission.labels + + // Group the labels by the cluster they were put into + val groupedLabels: Map[Int, List[AttributeFormats.ClusteredLabelSubmission]] = labels.groupBy(_.clusterNum) + val timestamp = new Timestamp(Instant.now.toEpochMilli) + + // This is the new user_clustering_session entry + val userSessionId: Int = UserClusteringSessionTable.save(UserClusteringSession(0, userId, timestamp)) + + // Gotta insert all the user_attributes in one pass to speed things up + DB.withConnection { implicit conn => + val rowPlaceholder = "(?,?,?,?,?,?,?,?)" + val allPlaceholders = clusters.map(_ => rowPlaceholder).mkString(", ") + + val insertSql = + s""" + INSERT INTO user_attribute ( + user_clustering_session_id, + clustering_threshold, + label_type_id, + region_id, + lat, + lng, + severity, + temporary + ) + VALUES $allPlaceholders + RETURNING user_attribute_id + """ + + val stmt = conn.prepareStatement(insertSql) + var paramIndex = 1 + + // Fill placeholders with cluster data + clusters.foreach { c => + stmt.setInt(paramIndex, userSessionId) + paramIndex += 1 + stmt.setFloat(paramIndex, thresholds(c.labelType)) + paramIndex += 1 + val labelTypeId = LabelTypeTable.labelTypeToId(c.labelType).get + stmt.setInt(paramIndex, labelTypeId) + paramIndex += 1 + val regionId = RegionTable.selectRegionIdOfClosestNeighborhood(c.lng, c.lat) + stmt.setInt(paramIndex, regionId) + paramIndex += 1 + stmt.setDouble(paramIndex, c.lat.toDouble) + paramIndex += 1 + stmt.setDouble(paramIndex, c.lng.toDouble) + paramIndex += 1 + c.severity match { + case Some(sv) => stmt.setInt(paramIndex, sv) + case None => stmt.setNull(paramIndex, java.sql.Types.INTEGER) } - case None => - Logger.warn("Cluster sent with no accompanying labels. Seems wrong!") + paramIndex += 1 + stmt.setBoolean(paramIndex, c.temporary) + paramIndex += 1 + } + + val rs = stmt.executeQuery() + val newUserAttrIds = collection.mutable.ArrayBuffer[Int]() + while (rs.next()) { + newUserAttrIds.append(rs.getInt("user_attribute_id")) + } + rs.close() + stmt.close() + + // Pair up each cluster with its labels + val labelTuples = clusters.zip(newUserAttrIds).flatMap { case (c, attrId) => + groupedLabels.getOrElse(c.clusterNum, Nil).map { lbl => + (attrId, lbl.labelId) + } + } + + // Insert all user_attribute_label rows at once + if (labelTuples.nonEmpty) { + val rowPlaceholder2 = "(?,?)" + val all2 = labelTuples.map(_ => rowPlaceholder2).mkString(", ") + val insertLabelSql = + s""" + INSERT INTO user_attribute_label ( + user_attribute_id, + label_id + ) + VALUES $all2 + """ + val stmt2 = conn.prepareStatement(insertLabelSql) + var i2 = 1 + + labelTuples.foreach { case (attrId, lblId) => + stmt2.setInt(i2, attrId) + i2 += 1 + stmt2.setInt(i2, lblId) + i2 += 1 + } + + stmt2.executeUpdate() + stmt2.close() + } } + + Future.successful(Ok(Json.obj("session" -> userSessionId))) } - Future.successful(Ok(Json.obj("session" -> userSessionId))) - } - ) - } else { - Future.successful(Ok(Json.obj("error_msg" -> "Could not authenticate."))) + ) + } else { + Future.successful(Ok(Json.obj("error_msg" -> "Could not authenticate."))) + } } - } /** * Takes in results of multi-user clustering, and adds the data to the relevant tables. * * @param key A key used for authentication. - * @param regionId The region who's labels were clustered. + * @param regionId The region whose labels were clustered. */ - def postMultiUserClusteringResults(key: String, regionId: Int) = UserAwareAction.async(BodyParsers.parse.json(maxLength = 1024 * 1024 * 100)) {implicit request => - // The maxLength argument above allows a 100MB max load size for the POST request. - if (authenticate(key)) { - // Validation https://www.playframework.com/documentation /2.3.x/ScalaJson - val submission = request.body.validate[AttributeFormats.ClusteringSubmission] - submission.fold( - errors => { - Logger.error("Failed to parse JSON POST request for multi-user clustering results.") - Logger.info(Json.prettyPrint(request.body)) - Future.successful(BadRequest(Json.obj("status" -> "Error", "message" -> JsError.toFlatJson(errors)))) - }, - submission => { - // Extract the thresholds, clusters, and labels, and put them into separate variables. - val thresholds: Map[String, Float] = submission.thresholds.map(t => (t.labelType, t.threshold)).toMap - val clusters: List[AttributeFormats.ClusterSubmission] = submission.clusters - val labels: List[AttributeFormats.ClusteredLabelSubmission] = submission.labels - - // Group the labels by the cluster they were put into. - val groupedLabels: Map[Int, List[AttributeFormats.ClusteredLabelSubmission]] = labels.groupBy(_.clusterNum) - val timestamp: Timestamp = new Timestamp(Instant.now.toEpochMilli) - - // Add corresponding entry to the global_clustering_session table - val globalSessionId: Int = GlobalClusteringSessionTable.save(GlobalClusteringSession(0, regionId, timestamp)) - - // Add the clusters to global_attribute table, and the associated user_attributes after each cluster. - for (cluster <- clusters) yield { - val attributeId: Int = - GlobalAttributeTable.save( - GlobalAttribute(0, - globalSessionId, - thresholds(cluster.labelType), - LabelTypeTable.labelTypeToId(cluster.labelType).get, - LabelTable.getStreetEdgeIdClosestToLatLng(cluster.lat, cluster.lng).get, - RegionTable.selectRegionIdOfClosestNeighborhood(cluster.lng, cluster.lat), - cluster.lat, - cluster.lng, - cluster.severity, - cluster.temporary) - ) - // Add all the associated labels to the global_attribute_user_attribute table. - groupedLabels get cluster.clusterNum match { - case Some(group) => - for (label <- group) yield { - GlobalAttributeUserAttributeTable.save(GlobalAttributeUserAttribute(0, attributeId, label.labelId)) + def postMultiUserClusteringResults(key: String, regionId: Int) = + UserAwareAction.async(BodyParsers.parse.json(maxLength = 1024 * 1024 * 100)) { implicit request => + // We keep the 100MB note: The maxLength argument above allows a 100MB max load size for the POST request. + if (authenticate(key)) { + val submission = request.body.validate[AttributeFormats.ClusteringSubmission] + submission.fold( + errors => { + Logger.error("Failed to parse JSON POST request for multi-user clustering results.") + Logger.info(Json.prettyPrint(request.body)) + Future.successful(BadRequest(Json.obj("status" -> "Error", "message" -> JsError.toFlatJson(errors)))) + }, + submission => { + val thresholds: Map[String, Float] = submission.thresholds.map(t => (t.labelType, t.threshold)).toMap + val clusters: List[AttributeFormats.ClusterSubmission] = submission.clusters + val labels: List[AttributeFormats.ClusteredLabelSubmission] = submission.labels + + val groupedLabels: Map[Int, List[AttributeFormats.ClusteredLabelSubmission]] = labels.groupBy(_.clusterNum) + val timestamp = new Timestamp(Instant.now.toEpochMilli) + + // This new approach also creates an entry in global_clustering_session + val globalSessionId: Int = + GlobalClusteringSessionTable.save(GlobalClusteringSession(0, regionId, timestamp)) + + // Do all the inserts for global_attribute in a single statement + DB.withConnection { implicit conn => + val rowPlaceholder = "(?,?,?,?,?,?,?,?,?)" + val allPlaceholders = clusters.map(_ => rowPlaceholder).mkString(", ") + + val insertSql = + s""" + INSERT INTO global_attribute ( + global_clustering_session_id, + clustering_threshold, + label_type_id, + street_edge_id, + region_id, + lat, + lng, + severity, + temporary + ) + VALUES $allPlaceholders + RETURNING global_attribute_id + """ + + val stmt = conn.prepareStatement(insertSql) + var idx = 1 + + clusters.foreach { c => + stmt.setInt(idx, globalSessionId) + idx += 1 + stmt.setFloat(idx, thresholds(c.labelType)) + idx += 1 + val labelTypeId = LabelTypeTable.labelTypeToId(c.labelType).get + stmt.setInt(idx, labelTypeId) + idx += 1 + val streetEdgeId = LabelTable.getStreetEdgeIdClosestToLatLng(c.lat, c.lng).get + stmt.setInt(idx, streetEdgeId) + idx += 1 + val regionVal = RegionTable.selectRegionIdOfClosestNeighborhood(c.lng, c.lat) + stmt.setInt(idx, regionVal) + idx += 1 + stmt.setDouble(idx, c.lat.toDouble) + idx += 1 + stmt.setDouble(idx, c.lng.toDouble) + idx += 1 + c.severity match { + case Some(sv) => stmt.setInt(idx, sv) + case None => stmt.setNull(idx, java.sql.Types.INTEGER) + } + idx += 1 + stmt.setBoolean(idx, c.temporary) + idx += 1 + } + + val rs = stmt.executeQuery() + val newGlobalAttrIds = collection.mutable.ArrayBuffer[Int]() + while (rs.next()) { + newGlobalAttrIds.append(rs.getInt("global_attribute_id")) + } + rs.close() + stmt.close() + + // Link global_attribute IDs to user_attributes + val linkTuples = clusters.zip(newGlobalAttrIds).flatMap { case (c, globAttrId) => + groupedLabels.getOrElse(c.clusterNum, Nil).map { lbl => + (globAttrId, lbl.labelId) } - case None => - Logger.warn("Cluster sent with no accompanying labels. Seems wrong!") + } + + // Insert all global_attribute_user_attribute rows in bulk + if (linkTuples.nonEmpty) { + val rowPlaceholder2 = "(?,?)" + val all2 = linkTuples.map(_ => rowPlaceholder2).mkString(", ") + val insertLinkSql = + s""" + INSERT INTO global_attribute_user_attribute ( + global_attribute_id, + user_attribute_id + ) + VALUES $all2 + """ + val stmt2 = conn.prepareStatement(insertLinkSql) + var i2 = 1 + + linkTuples.foreach { case (gAttrId, userAttrId) => + stmt2.setInt(i2, gAttrId) + i2 += 1 + stmt2.setInt(i2, userAttrId) + i2 += 1 + } + + stmt2.executeUpdate() + stmt2.close() + } } + + Future.successful(Ok(Json.obj("session" -> globalSessionId))) } - Future.successful(Ok(Json.obj("session" -> globalSessionId))) - } - ) - } else { - Future.successful(Ok(Json.obj("error_msg" -> "Could not authenticate."))) + ) + } else { + Future.successful(Ok(Json.obj("error_msg" -> "Could not authenticate."))) + } } - } + } From 4e0cb7f9975e975dea99a294003c0ef101375f2b Mon Sep 17 00:00:00 2001 From: Jules Date: Wed, 5 Feb 2025 09:59:00 -0800 Subject: [PATCH 2/2] Fixed import order --- app/controllers/AttributeController.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/controllers/AttributeController.scala b/app/controllers/AttributeController.scala index ad8e4b1a02..a6c9791afc 100644 --- a/app/controllers/AttributeController.scala +++ b/app/controllers/AttributeController.scala @@ -17,10 +17,11 @@ import models.attribute._ import models.label.{LabelTable, LabelTypeTable} import models.region.RegionTable import play.api.Play.current +import play.api.db.DB import play.api.{Logger, Play} import anorm._ import anorm.SqlParser._ -import play.api.db.DB + /**