diff --git a/setup.py b/setup.py
index e98c780fc..e94e72ce1 100644
--- a/setup.py
+++ b/setup.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
-"""Setup file and install script SciLife python scripts.
-"""
+"""Setup file and install script SciLife python scripts."""
+
from setuptools import find_packages, setup
try:
diff --git a/status/applications.py b/status/applications.py
index 936379606..b4e239314 100644
--- a/status/applications.py
+++ b/status/applications.py
@@ -1,5 +1,5 @@
-"""Handlers related to applications
-"""
+"""Handlers related to applications"""
+
import json
from collections import Counter
@@ -78,7 +78,7 @@ def list_applications_and_samples(self, start=None, end="z"):
# Projects per application
applications = Counter()
view = self.application.projects_db.view("project/date_applications")
- for row in view[[start, ""]:[end, "z"]]:
+ for row in view[[start, ""] : [end, "z"]]:
if row.key[1] is None:
# This corresponds to StatusDB:s notation
# and avoids conflict with 'None'.
@@ -89,7 +89,7 @@ def list_applications_and_samples(self, start=None, end="z"):
# Samples per application
samples = Counter()
view = self.application.projects_db.view("project/date_samples_applications")
- for row in view[[start, ""]:[end, "z"]]:
+ for row in view[[start, ""] : [end, "z"]]:
if row.key[1] is None:
samples["null"] += row.value
else:
diff --git a/status/barcode.py b/status/barcode.py
index 4a60d3b0e..bc5dda32a 100644
--- a/status/barcode.py
+++ b/status/barcode.py
@@ -1,5 +1,4 @@
-"""Handlers related to test for barcode printing
-"""
+"""Handlers related to test for barcode printing"""
import re
import subprocess
@@ -52,9 +51,7 @@ def post(self):
for _ in range(copies): # loops over copies to print
print_barcode(linesToPrint)
else: # file submitted is a text file
- for (
- line
- ) in (
+ for line in (
linesToPrint.splitlines()
): # split into the different lines of the text file
if self.get_argument(
@@ -112,9 +109,7 @@ def post(self):
if re.compile(r"^P\d+$").search(user_project_ID):
projectNo_only_extracted = re.search(
"P(.*)", user_project_ID
- ).group(
- 1
- ) # have only the number of the project ID
+ ).group(1) # have only the number of the project ID
for projects in range(0, int(projectNo)):
new_projectNo = int(projectNo_only_extracted) + projects
new_projectID = "P" + str(new_projectNo)
diff --git a/status/bioinfo_analysis.py b/status/bioinfo_analysis.py
index fba36a678..3663beb52 100644
--- a/status/bioinfo_analysis.py
+++ b/status/bioinfo_analysis.py
@@ -28,7 +28,7 @@ def post(self, project_id):
assert_project_id(project_id)
# Fetching documents one by one generates too many requests to statusdb
# Hopefully fetching all documents at once doesn't require too much memory
- view = v[[project_id, "", "", ""]:[project_id, "Z", "Z", "Z"]]
+ view = v[[project_id, "", "", ""] : [project_id, "Z", "Z", "Z"]]
cached_view = {}
for row in view.rows:
if tuple(row.key) in cached_view:
@@ -117,9 +117,9 @@ def get(self, project_id):
"lanes": {lane_id: bioinfo_qc}
}
elif lane_id not in bioinfo1[sample_id]["flowcells"][flowcell_id]["lanes"]:
- bioinfo1[sample_id]["flowcells"][flowcell_id]["lanes"][
- lane_id
- ] = bioinfo_qc
+ bioinfo1[sample_id]["flowcells"][flowcell_id]["lanes"][lane_id] = (
+ bioinfo_qc
+ )
else:
bioinfo1[sample_id]["flowcells"][flowcell_id]["lanes"][lane_id].update(
bioinfo_qc
@@ -136,9 +136,9 @@ def get(self, project_id):
"samples": {sample_id: bioinfo_qc}
}
elif sample_id not in bioinfo2[flowcell_id]["lanes"][lane_id]["samples"]:
- bioinfo2[flowcell_id]["lanes"][lane_id]["samples"][
- sample_id
- ] = bioinfo_qc
+ bioinfo2[flowcell_id]["lanes"][lane_id]["samples"][sample_id] = (
+ bioinfo_qc
+ )
else:
bioinfo2[flowcell_id]["lanes"][lane_id]["samples"][sample_id].update(
bioinfo_qc
diff --git a/status/clone_project.py b/status/clone_project.py
index 3c479a145..3eb2c0cfc 100644
--- a/status/clone_project.py
+++ b/status/clone_project.py
@@ -1,5 +1,3 @@
-
-
import requests
from genologics import lims
from genologics.config import BASEURI, PASSWORD, USERNAME
@@ -21,58 +19,86 @@ def get(self):
)
)
+
class LIMSProjectCloningHandler(SafeHandler):
"""Gets and posts the project data from LIMS for cloning it
URL: /api/v1/lims_project_data/([^/]*)$
"""
def get(self, projectid):
-
- proj_values = self.get_project_data(projectid, 'get')
+ proj_values = self.get_project_data(projectid, "get")
if not proj_values:
self.set_status(404)
self.write({"error": "Project not found"})
return
self.set_header("Content-type", "application/json")
self.write(proj_values)
-
+
def post(self, projectid):
-
- if not (self.get_current_user().is_proj_coord or self.get_current_user().is_any_admin):
+ if not (
+ self.get_current_user().is_proj_coord
+ or self.get_current_user().is_any_admin
+ ):
self.set_status(401)
return self.write(
"Error: You do not have the permissions for this operation!"
)
-
- new_proj = self.get_project_data(projectid, 'post')
- if 'error' in new_proj:
+
+ new_proj = self.get_project_data(projectid, "post")
+ if "error" in new_proj:
self.set_status(400)
- self.write({"error": new_proj['error']})
+ self.write({"error": new_proj["error"]})
return
-
+
self.set_status(201)
self.write(new_proj)
-
+
def get_project_data(self, projectid, type):
copy_udfs = {
- "Customer project reference", "Project Comment", "Type", "Application", "Reference genome",
- "Library construction method", "Sequencing setup", "Accredited (Data Analysis)",
- "Accredited (Data Processing)", "Accredited (Library Preparation)", "Accredited (Sequencing)", "Delivery type",
- "Agreement cost", "Invoice Reference", "Customer Project Description", "Project category", "Sample type",
- "Sample units ordered", "Library type (ready-made libraries)", "Sequence units ordered (lanes)", "Sequencing platform",
- "Flowcell", "Custom Primer", "Low Diversity", "Best practice bioinformatics", "Funding agency", "Project coordinator",
- "Library prep option", "Flowcell", "Organism", "PhiX spike-in (percent)", "Flowcell option", "Ethics permit number"
- }
+ "Customer project reference",
+ "Project Comment",
+ "Type",
+ "Application",
+ "Reference genome",
+ "Library construction method",
+ "Sequencing setup",
+ "Accredited (Data Analysis)",
+ "Accredited (Data Processing)",
+ "Accredited (Library Preparation)",
+ "Accredited (Sequencing)",
+ "Delivery type",
+ "Agreement cost",
+ "Invoice Reference",
+ "Customer Project Description",
+ "Project category",
+ "Sample type",
+ "Sample units ordered",
+ "Library type (ready-made libraries)",
+ "Sequence units ordered (lanes)",
+ "Sequencing platform",
+ "Flowcell",
+ "Custom Primer",
+ "Low Diversity",
+ "Best practice bioinformatics",
+ "Funding agency",
+ "Project coordinator",
+ "Library prep option",
+ "Flowcell",
+ "Organism",
+ "PhiX spike-in (percent)",
+ "Flowcell option",
+ "Ethics permit number",
+ }
lims_instance = lims.Lims(BASEURI, USERNAME, PASSWORD)
uri = lims_instance.get_uri(f"projects/{projectid}")
existing_project = Project(lims=lims_instance, uri=uri)
proj_values = {}
try:
- proj_values['name'] = existing_project.name
+ proj_values["name"] = existing_project.name
except requests.exceptions.HTTPError:
return {}
-
+
proj_values["researcher_id"] = existing_project.researcher.id
proj_values["Client"] = existing_project.researcher.name
proj_values["Account"] = existing_project.researcher.lab.name
@@ -81,25 +107,26 @@ def get_project_data(self, projectid, type):
for udf in copy_udfs:
if udf in existing_project.udf:
udfs[udf] = existing_project.udf[udf]
- proj_values['udfs'] = udfs
+ proj_values["udfs"] = udfs
- if type=='get':
+ if type == "get":
return proj_values
-
+
else:
- new_name = existing_project.name + '_CLONE'
+ new_name = existing_project.name + "_CLONE"
check_if_new_name_exists = lims_instance.get_projects(name=new_name)
if check_if_new_name_exists:
- return {'error': f'A project with the name {new_name} already exists'}
-
+ return {"error": f"A project with the name {new_name} already exists"}
+
try:
- new_project = Project.create(lims_instance, udfs=proj_values['udfs'], name=new_name,
- researcher=existing_project.researcher)
+ new_project = Project.create(
+ lims_instance,
+ udfs=proj_values["udfs"],
+ name=new_name,
+ researcher=existing_project.researcher,
+ )
except requests.exceptions.HTTPError as e:
- return {'error': e.message}
-
- return {'project_id': new_project.id, 'project_name': new_project.name}
-
-
+ return {"error": e.message}
+ return {"project_id": new_project.id, "project_name": new_project.name}
diff --git a/status/controls.py b/status/controls.py
index 1eef1ba54..0c333f404 100644
--- a/status/controls.py
+++ b/status/controls.py
@@ -1,77 +1,98 @@
"""
- Handler related to Controls page
+Handler related to Controls page
"""
+
from genologics.config import BASEURI
from status.util import SafeHandler
class ControlsHandler(SafeHandler):
-
def get(self):
- t = self.application.loader.load('controls.html')
+ t = self.application.loader.load("controls.html")
# get information from databases
ws_data, ws_name_data = self.worksets_data()
all_control_data = {}
- for control_data_type in ['negative', 'positive']:
- all_control_data[control_data_type] = self.collect_control_info(control_data_type, ws_data, ws_name_data)
-
+ for control_data_type in ["negative", "positive"]:
+ all_control_data[control_data_type] = self.collect_control_info(
+ control_data_type, ws_data, ws_name_data
+ )
+
# define headers for controls.html
headers = [
- ['Project', 'project'],
- ['Sample ID', 'sample_id'],
- ['Sample Name', 'customer_name'],
- ['Sample Status', 'status_manual'],
- ['Workset', 'workset_name'],
- ['Workset Projects', 'workset_projects'],
- ['Library Prep Status', 'prep_status'],
- ['Flowcell(s)', 'sequenced_fc'],
+ ["Project", "project"],
+ ["Sample ID", "sample_id"],
+ ["Sample Name", "customer_name"],
+ ["Sample Status", "status_manual"],
+ ["Workset", "workset_name"],
+ ["Workset Projects", "workset_projects"],
+ ["Library Prep Status", "prep_status"],
+ ["Flowcell(s)", "sequenced_fc"],
]
- #anything in here is used to create the .html page. In essence, anything listed here can be accessed in /controls.html
- self.write(
+ # anything in here is used to create the .html page. In essence, anything listed here can be accessed in /controls.html
+ self.write(
t.generate(
- gs_globals=self.application.gs_globals, user=self.get_current_user(),
- all_control_data = all_control_data, # control_data is a dictionary with the control data, it can be called in the html with the name before the equal sign
+ gs_globals=self.application.gs_globals,
+ user=self.get_current_user(),
+ all_control_data=all_control_data, # control_data is a dictionary with the control data, it can be called in the html with the name before the equal sign
headers=headers,
- ws_data = ws_data,
+ ws_data=ws_data,
lims_uri=BASEURI,
)
)
def find_control_data(self, control_type):
- """Find control data from the couchDB from project/controls view
- """
+ """Find control data from the couchDB from project/controls view"""
from collections import defaultdict
+
result = defaultdict(dict)
controls_view = self.application.projects_db.view("project/controls")
- all_cont_proj = controls_view[[control_type,'']:[control_type,"Z"]]
+ all_cont_proj = controls_view[[control_type, ""] : [control_type, "Z"]]
for cont_proj in all_cont_proj:
for cont_sample in cont_proj.value:
- for workset in cont_proj.value[cont_sample]: # here we create one entry in result for each workset, this will be one line in the controls table
+ for workset in cont_proj.value[
+ cont_sample
+ ]: # here we create one entry in result for each workset, this will be one line in the controls table
if workset != "no_workset":
- workset_sample_id = workset+cont_sample
+ workset_sample_id = workset + cont_sample
result[workset_sample_id]["sample_id"] = cont_sample
- result[workset_sample_id]["customer_name"] = cont_proj.value[cont_sample][workset]["customer_name"]
- if "status_manual" in cont_proj.value[cont_sample][workset]: # status originates from LIMS project overview, is often not set for controls
- result[workset_sample_id]["status_manual"] = cont_proj.value[cont_sample][workset]["status_manual"]
+ result[workset_sample_id]["customer_name"] = cont_proj.value[
+ cont_sample
+ ][workset]["customer_name"]
+ if (
+ "status_manual" in cont_proj.value[cont_sample][workset]
+ ): # status originates from LIMS project overview, is often not set for controls
+ result[workset_sample_id]["status_manual"] = (
+ cont_proj.value[cont_sample][workset]["status_manual"]
+ )
else:
- result[workset_sample_id]["status_manual"] = "* In Progress" # asterisk indicates that the status in LIMS is not set, the sample has a workset and so MUST be at least "In Progress"
+ result[workset_sample_id]["status_manual"] = (
+ "* In Progress" # asterisk indicates that the status in LIMS is not set, the sample has a workset and so MUST be at least "In Progress"
+ )
result[workset_sample_id]["project"] = cont_proj.key[1]
- result[workset_sample_id]["workset_name"] = cont_proj.value[cont_sample][workset]["workset_name"]
+ result[workset_sample_id]["workset_name"] = cont_proj.value[
+ cont_sample
+ ][workset]["workset_name"]
if "workset_id" in cont_proj.value[cont_sample][workset]:
- result[workset_sample_id]["workset_id"] = cont_proj.value[cont_sample][workset]["workset_id"]
+ result[workset_sample_id]["workset_id"] = cont_proj.value[
+ cont_sample
+ ][workset]["workset_id"]
else:
result[workset_sample_id]["workset_id"] = "NA"
if "prep_status" in cont_proj.value[cont_sample][workset]:
- result[workset_sample_id]["prep_status"] = cont_proj.value[cont_sample][workset]["prep_status"]
- else:
- result[workset_sample_id]["prep_status"] = ""
- result[workset_sample_id]["sequenced_fc"] = cont_proj.value[cont_sample][workset]["sequenced_fc"]
+ result[workset_sample_id]["prep_status"] = cont_proj.value[
+ cont_sample
+ ][workset]["prep_status"]
+ else:
+ result[workset_sample_id]["prep_status"] = ""
+ result[workset_sample_id]["sequenced_fc"] = cont_proj.value[
+ cont_sample
+ ][workset]["sequenced_fc"]
return result
-
+
def worksets_data(self):
"""retrieves projects for each workset and return a dictionary:
{"P.roject_00_01": "P12346" , "P.roject_00_02": "P23456", ...}
@@ -83,29 +104,39 @@ def worksets_data(self):
result = {}
result_just_ws_name = {}
- controls_ws_view = self.application.worksets_db.view("worksets/controls_project_list", descending=True)
+ controls_ws_view = self.application.worksets_db.view(
+ "worksets/controls_project_list", descending=True
+ )
for ws in controls_ws_view:
result[", ".join(ws.key)] = ws.value
result_just_ws_name[ws.key[1]] = ws.value
return result, result_just_ws_name
-
+
def collect_control_info(self, control_type, workset_data, workset_name_data):
"""
- - Get control data from couchDB via the function find_control_data
- - Add workset projects to each control in the control_data dictionary
+ - Get control data from couchDB via the function find_control_data
+ - Add workset projects to each control in the control_data dictionary
"""
- control_data = self.find_control_data(control_type+" control")
+ control_data = self.find_control_data(control_type + " control")
for control in control_data:
if control != "no_workset":
if "workset_id" in control_data[control]:
- control_ws_id_name = ", ".join([control_data[control]["workset_id"], control_data[control]["workset_name"]])
+ control_ws_id_name = ", ".join(
+ [
+ control_data[control]["workset_id"],
+ control_data[control]["workset_name"],
+ ]
+ )
if control_ws_id_name in workset_data:
- control_data[control]["workset_projects"] = workset_data[control_ws_id_name]
- elif control in workset_name_data: #if the sample doesn't have a workset_id I only use the workset name to retrieve the projects of the workset
- control_data[control]["workset_projects"] = workset_name_data[control_data[control]["workset_name"]]
+ control_data[control]["workset_projects"] = workset_data[
+ control_ws_id_name
+ ]
+ elif (
+ control in workset_name_data
+ ): # if the sample doesn't have a workset_id I only use the workset name to retrieve the projects of the workset
+ control_data[control]["workset_projects"] = workset_name_data[
+ control_data[control]["workset_name"]
+ ]
else:
control_data[control]["ws_not_found"] = True
return control_data
-
-
-
diff --git a/status/deliveries.py b/status/deliveries.py
index ceeebccae..7c586f86b 100644
--- a/status/deliveries.py
+++ b/status/deliveries.py
@@ -75,7 +75,7 @@ def get(self):
summary_view = self.application.projects_db.view(
"project/summary", descending=True
)
- summary_view = summary_view[["open", "Z"]:["open", ""]]
+ summary_view = summary_view[["open", "Z"] : ["open", ""]]
summary_data = {}
for project in summary_view:
# todo: check if this one works
@@ -221,12 +221,12 @@ def get(self):
lane_status = self.__aggregate_status(lane_statuses)
- runs_bioinfo[flowcell_id]["lanes"][lane_id][
- "lane_status"
- ] = lane_status
- runs_bioinfo[flowcell_id]["lanes"][lane_id][
- "checklist"
- ] = lane_checklists
+ runs_bioinfo[flowcell_id]["lanes"][lane_id]["lane_status"] = (
+ lane_status
+ )
+ runs_bioinfo[flowcell_id]["lanes"][lane_id]["checklist"] = (
+ lane_checklists
+ )
flowcell_statuses.append(lane_status)
# the same logic here -> agregate flowcell statuses
@@ -256,10 +256,7 @@ def get(self):
# project type (needed for filters)
project_type = (
- summary_data[project_id]
- .get("details")
- .get("type")
- or "unknown"
+ summary_data[project_id].get("details").get("type") or "unknown"
)
if project_type not in project_type_list:
diff --git a/status/flowcell.py b/status/flowcell.py
index 091bebdb3..3983c6906 100644
--- a/status/flowcell.py
+++ b/status/flowcell.py
@@ -153,10 +153,16 @@ def get(self, flowcell_id):
for lane in lane_details:
if lane["Project"] == proj and lane["clustersnb"]:
if lane["overthirty"]:
- weighted_sum_q30 += int(lane["clustersnb"].replace(",", "")) * float(lane["overthirty"])
+ weighted_sum_q30 += int(
+ lane["clustersnb"].replace(",", "")
+ ) * float(lane["overthirty"])
else:
- sum_yield_with_zero_q30 += int(lane["clustersnb"].replace(",", ""))
- weighted_mean_q30 = weighted_sum_q30 / (sum_project_lane_yield - sum_yield_with_zero_q30)
+ sum_yield_with_zero_q30 += int(
+ lane["clustersnb"].replace(",", "")
+ )
+ weighted_mean_q30 = weighted_sum_q30 / (
+ sum_project_lane_yield - sum_yield_with_zero_q30
+ )
else:
weighted_mean_q30 = 0
proj_lane_percentage_obtained = (
@@ -226,15 +232,27 @@ def get(self, flowcell_id):
for lane in lane_details:
if lane["SampleName"] == sample and lane["clustersnb"]:
if lane["overthirty"]:
- weighted_sum_q30 += int(lane["clustersnb"].replace(",", "")) * float(lane["overthirty"])
+ weighted_sum_q30 += int(
+ lane["clustersnb"].replace(",", "")
+ ) * float(lane["overthirty"])
else:
- sum_yield_with_zero_q30 += int(lane["clustersnb"].replace(",", ""))
+ sum_yield_with_zero_q30 += int(
+ lane["clustersnb"].replace(",", "")
+ )
if lane["mqs"]:
- weighted_sum_mqs += int(lane["clustersnb"].replace(",", "")) * float(lane["mqs"])
+ weighted_sum_mqs += int(
+ lane["clustersnb"].replace(",", "")
+ ) * float(lane["mqs"])
else:
- sum_yield_with_zero_mqs += int(lane["clustersnb"].replace(",", ""))
- weighted_mean_q30 = weighted_sum_q30 / (sum_sample_lane_yield - sum_yield_with_zero_q30)
- weighted_mqs = weighted_sum_mqs / (sum_sample_lane_yield - sum_yield_with_zero_mqs)
+ sum_yield_with_zero_mqs += int(
+ lane["clustersnb"].replace(",", "")
+ )
+ weighted_mean_q30 = weighted_sum_q30 / (
+ sum_sample_lane_yield - sum_yield_with_zero_q30
+ )
+ weighted_mqs = weighted_sum_mqs / (
+ sum_sample_lane_yield - sum_yield_with_zero_mqs
+ )
else:
weighted_mean_q30 = 0
weighted_mqs = 0
@@ -456,6 +474,7 @@ def get(self, name):
self.write(open(report_path).read())
+
class ElementFlowcellHandler(SafeHandler):
def get(self, name):
t = self.application.loader.load("element_flowcell.html")
@@ -467,18 +486,14 @@ def get(self, name):
)
)
+
def get_project_ids_from_names(project_names: list, projects_db) -> list[dict]:
"""Given a list of project names, perform a lookup to the projects db and return a json-style list of projects"""
projects = []
for project_name in project_names:
rows = projects_db.view("projects/name_to_id")[project_name].rows
if rows:
- projects.append(
- {
- "project_id": rows[0].value,
- "project_name": project_name
- }
- )
+ projects.append({"project_id": rows[0].value, "project_name": project_name})
return projects
@@ -489,19 +504,16 @@ def get_project_names_from_ids(project_ids: list, projects_db) -> list[dict]:
for project_id in project_ids:
rows = projects_db.view("projects/id_to_name")[project_id].rows
if rows:
- projects.append(
- {
- "project_id": project_id,
- "project_name": rows[0].value
- }
- )
+ projects.append({"project_id": project_id, "project_name": rows[0].value})
return projects
class ElementFlowcellDataHandler(SafeHandler):
def get(self, name):
- rows = self.application.element_runs_db.view('info/id', include_docs=True)[name].rows
+ rows = self.application.element_runs_db.view("info/id", include_docs=True)[
+ name
+ ].rows
if rows:
flowcell = rows[0].doc
@@ -509,37 +521,59 @@ def get(self, name):
project_names = []
demultiplexing_done = False
- if flowcell.get('Element', {}).get('Demultiplex_Stats', {}).get('Index_Assignment'):
+ if (
+ flowcell.get("Element", {})
+ .get("Demultiplex_Stats", {})
+ .get("Index_Assignment")
+ ):
demultiplexing_done = True
if demultiplexing_done:
samples_with_duplicates = [
- sample for sample in flowcell.get('Element', {}).get('Demultiplex_Stats', {}).get('Index_Assignment', [])
- ]
- project_names_with_duplicates = [sample.get('Project').replace('__', '.') for sample in samples_with_duplicates if sample.get('Project')]
+ sample
+ for sample in flowcell.get("Element", {})
+ .get("Demultiplex_Stats", {})
+ .get("Index_Assignment", [])
+ ]
+ project_names_with_duplicates = [
+ sample.get("Project").replace("__", ".")
+ for sample in samples_with_duplicates
+ if sample.get("Project")
+ ]
project_names = list(set(project_names_with_duplicates))
- projects = get_project_ids_from_names(project_names, self.application.projects_db)
+ projects = get_project_ids_from_names(
+ project_names, self.application.projects_db
+ )
else:
project_ids = []
- for lane in flowcell.get('instrument_generated_files', {}).get('AvitiRunStats.json', {}).get('LaneStats', {}):
- for sample in lane.get('IndexAssignments', {}).get('IndexSamples', {}):
- sample_name = sample.get('SampleName')
+ for lane in (
+ flowcell.get("instrument_generated_files", {})
+ .get("AvitiRunStats.json", {})
+ .get("LaneStats", {})
+ ):
+ for sample in lane.get("IndexAssignments", {}).get(
+ "IndexSamples", {}
+ ):
+ sample_name = sample.get("SampleName")
# Check that the sample name is on the format PX..X_Y..Y"
if re.match(r"^P\d+_\d+$", sample_name):
# Parse out the PXXXXXX number from the sample name on the format "
- project_id = sample_name.split('_')[0]
+ project_id = sample_name.split("_")[0]
project_ids.append(project_id)
project_ids = list(set(project_ids))
- projects = get_project_names_from_ids(project_ids, self.application.projects_db)
+ projects = get_project_names_from_ids(
+ project_ids, self.application.projects_db
+ )
- flowcell['projects'] = projects
+ flowcell["projects"] = projects
self.write(flowcell)
else:
self.set_status(404)
self.write({"error": f"No element flowcell found for run ID {name}"})
+
class ONTFlowcellHandler(SafeHandler):
"""Serves a page which shows information for a given ONT flowcell."""
diff --git a/status/flowcells.py b/status/flowcells.py
index bfcd750f1..22a4d59e7 100644
--- a/status/flowcells.py
+++ b/status/flowcells.py
@@ -1,5 +1,4 @@
-"""Set of handlers related with Flowcells
-"""
+"""Set of handlers related with Flowcells"""
import datetime
import json
@@ -161,9 +160,7 @@ def list_ont_flowcells(self):
return ont_flowcells, unfetched_runs
def list_element_flowcells(self):
- return self.application.element_runs_db.view(
- "info/summary", descending=True
- )
+ return self.application.element_runs_db.view("info/summary", descending=True)
def get(self):
# Default is to NOT show all flowcells
@@ -508,9 +505,7 @@ def get(self, query):
fc_date_run = fc_long_name.split("_")[0]
if len(fc_date_run) > 6:
fc_date_run = fc_date_run[-6:]
- fc_short_name = (
- fc_date_run + "_" + fc_long_name.split("_")[-1]
- )
+ fc_short_name = fc_date_run + "_" + fc_long_name.split("_")[-1]
for info_row in fc_view[fc_short_name]:
row.value["run_mode"] = info_row.value["run_mode"]
row.value["longer_read_length"] = info_row.value[
diff --git a/status/instruments.py b/status/instruments.py
index ce09e51b3..d7bce3643 100644
--- a/status/instruments.py
+++ b/status/instruments.py
@@ -31,23 +31,45 @@ def recover_logs(handler, search_string=None, inst_type="bravo"):
return valid_rows
elif inst_type == "biomek":
- instruments_list = {row.key: row.value for row in handler.application.instruments_db.view("info/id_to_name").rows}
+ instruments_list = {
+ row.key: row.value
+ for row in handler.application.instruments_db.view("info/id_to_name").rows
+ }
# by default, return all logs from the last week
- date_earlier = (datetime.datetime.now() - relativedelta(weeks=1)).strftime('%Y-%m-%dT%H:%M:%SZ')
- date_later = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
+ date_earlier = (datetime.datetime.now() - relativedelta(weeks=1)).strftime(
+ "%Y-%m-%dT%H:%M:%SZ"
+ )
+ date_later = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
valid_rows = []
if search_string:
ts1 = search_string.split("-")[0]
ts2 = search_string.split("-")[1]
- date_earlier = datetime.datetime.fromtimestamp(int(ts1)).strftime('%Y-%m-%dT%H:%M:%SZ')
- date_later = datetime.datetime.fromtimestamp(int(ts2)).strftime('%Y-%m-%dT%H:%M:%SZ')
+ date_earlier = datetime.datetime.fromtimestamp(int(ts1)).strftime(
+ "%Y-%m-%dT%H:%M:%SZ"
+ )
+ date_later = datetime.datetime.fromtimestamp(int(ts2)).strftime(
+ "%Y-%m-%dT%H:%M:%SZ"
+ )
- for row in handler.application.biomek_errs_db.view("dates/timestamp", startkey=date_earlier, endkey=date_later):
- date = datetime.datetime.strptime(row.key, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=tz.tzutc()).astimezone(tz.tzlocal())
+ for row in handler.application.biomek_errs_db.view(
+ "dates/timestamp", startkey=date_earlier, endkey=date_later
+ ):
+ date = (
+ datetime.datetime.strptime(row.key, "%Y-%m-%dT%H:%M:%S.%fZ")
+ .replace(tzinfo=tz.tzutc())
+ .astimezone(tz.tzlocal())
+ )
inst = f"{instruments_list[row.value['inst_id']]}({row.value['inst_id']})"
- method = row.value.get("method", 'diff')
+ method = row.value.get("method", "diff")
errs = row.value["errors"]
- valid_rows.append({"timestamp": f"{date}", "instrument_name": inst, "method": method, "message": errs})
+ valid_rows.append(
+ {
+ "timestamp": f"{date}",
+ "instrument_name": inst,
+ "method": method,
+ "message": errs,
+ }
+ )
return valid_rows
@@ -71,7 +93,7 @@ class InstrumentLogsHandler(SafeHandler):
Loaded through /instrument_logs/([^/]*)$
"""
- def get(self, search_string=None):
+ def get(self, search_string=None):
t = self.application.loader.load("instrument_logs.html")
self.write(
t.generate(
diff --git a/status/invoicing.py b/status/invoicing.py
index fff149ff9..09044e151 100644
--- a/status/invoicing.py
+++ b/status/invoicing.py
@@ -112,7 +112,9 @@ def post(self):
self.set_status(400)
return self.write("Error: Chosen agreement not found")
- agreement_doc["invoice_spec_generated_for"] = agreement_for_invoice_timestamp
+ agreement_doc["invoice_spec_generated_for"] = (
+ agreement_for_invoice_timestamp
+ )
agreement_doc["invoice_spec_generated_by"] = self.get_current_user().name
generated_at = int(datetime.datetime.now().timestamp() * 1000)
agreement_doc["invoice_spec_generated_at"] = generated_at
@@ -120,9 +122,9 @@ def post(self):
return_msg = "Invoice spec generated"
# # probably add a try-except here in the future
self.application.agreements_db.save(agreement_doc)
-
+
if action_type == "invalidate":
- generated_at = post_data["timestamp"]
+ generated_at = post_data["timestamp"]
return_msg = "Invoice spec invalidated"
proj_doc["invoice_spec_generated"] = generated_at
@@ -247,9 +249,7 @@ def post(self):
)
self.set_header("Content-Type", "application/zip")
- self.set_header(
- "Content-Disposition", f"attachment; filename={fileName}"
- )
+ self.set_header("Content-Disposition", f"attachment; filename={fileName}")
self.write(buff.getvalue())
buff.close()
self.finish()
@@ -323,9 +323,9 @@ def get_invoice_data(
proj_specs["invoice_created"] = datetime.datetime.fromtimestamp(
agreement_doc["invoice_spec_generated_at"] / 1000.0
).strftime("%Y-%m-%d")
- proj_specs[
- "contract_name"
- ] = f'{proj_id}_{agreement_doc["invoice_spec_generated_for"]}'
+ proj_specs["contract_name"] = (
+ f'{proj_id}_{agreement_doc["invoice_spec_generated_for"]}'
+ )
proj_specs["summary"] = markdown.markdown(
invoiced_agreement["agreement_summary"], extensions=["sane_lists"]
)
diff --git a/status/ngisweden_stats.py b/status/ngisweden_stats.py
index de3b57ed6..83dd6f645 100644
--- a/status/ngisweden_stats.py
+++ b/status/ngisweden_stats.py
@@ -13,4 +13,4 @@ def get(self):
t.generate(
gs_globals=self.application.gs_globals, user=self.get_current_user()
)
- )
\ No newline at end of file
+ )
diff --git a/status/ont_plot.py b/status/ont_plot.py
index ffbb2ff87..0e9ecd08f 100644
--- a/status/ont_plot.py
+++ b/status/ont_plot.py
@@ -16,9 +16,14 @@ def get(self, search_string=None):
first_term = last_month.isoformat()[2:10].replace("-", "")
second_term = datetime.datetime.now().isoformat()[2:10].replace("-", "")
else:
- first_term, second_term = search_string.split('-')
+ first_term, second_term = search_string.split("-")
- docs= [x.value for x in self.application.nanopore_runs_db.view("info/all_stats")['20'+first_term:'20'+second_term+'ZZZZ'].rows]
+ docs = [
+ x.value
+ for x in self.application.nanopore_runs_db.view("info/all_stats")[
+ "20" + first_term : "20" + second_term + "ZZZZ"
+ ].rows
+ ]
self.set_header("Content-type", "application/json")
self.write(json.dumps(docs))
@@ -26,7 +31,7 @@ def get(self, search_string=None):
class ONTFlowcellPlotHandler(SafeHandler):
"""Handles the ONT flowcell plot page
-
+
Loaded through /ont_flowcell_plot/([^/]*)$
"""
@@ -36,4 +41,4 @@ def get(self):
t.generate(
gs_globals=self.application.gs_globals, user=self.get_current_user()
)
- )
\ No newline at end of file
+ )
diff --git a/status/pricing.py b/status/pricing.py
index e6381bc7e..b000f2f30 100644
--- a/status/pricing.py
+++ b/status/pricing.py
@@ -816,10 +816,10 @@ def post(self):
template_text["appendices"], extensions=["sane_lists"]
)
for condition in template_text["first_page_text"]["specific_conditions"]:
- template_text["first_page_text"]["specific_conditions"][
- condition
- ] = markdown.markdown(
- template_text["first_page_text"]["specific_conditions"][condition]
+ template_text["first_page_text"]["specific_conditions"][condition] = (
+ markdown.markdown(
+ template_text["first_page_text"]["specific_conditions"][condition]
+ )
)
if "agreement_summary" not in quote_input.keys():
diff --git a/status/production.py b/status/production.py
index be9197655..a1db93c24 100644
--- a/status/production.py
+++ b/status/production.py
@@ -1,5 +1,5 @@
-""" Handlers related to data production.
-"""
+"""Handlers related to data production."""
+
from datetime import datetime
from dateutil import parser
@@ -30,4 +30,3 @@ def get(self):
user=self.get_current_user(),
)
)
-
diff --git a/status/projects.py b/status/projects.py
index edf3fa1e2..930ae77be 100644
--- a/status/projects.py
+++ b/status/projects.py
@@ -1,5 +1,5 @@
-""" Handlers for sequencing project information.
-"""
+"""Handlers for sequencing project information."""
+
import base64
import datetime
import itertools
@@ -190,9 +190,9 @@ def project_summary_data(self, row):
end_date = dateutil.parser.parse(row.value["queued"])
diff = (end_date - dateutil.parser.parse(row.value["open_date"])).days
if "queued" not in row.value and diff > 14:
- row.value[
- "days_in_reception_control"
- ] = f'{diff}'
+ row.value["days_in_reception_control"] = (
+ f'{diff}'
+ )
else:
row.value["days_in_reception_control"] = diff
@@ -336,7 +336,7 @@ def list_projects(self, filter_projects="all"):
statusdb_statuses.add(status)
for status in statusdb_statuses:
- view_calls.append(summary_view[[status, "Z"]:[status, ""]])
+ view_calls.append(summary_view[[status, "Z"] : [status, ""]])
filtered_projects = []
@@ -534,12 +534,12 @@ def search_project_names(self, search_string=""):
search_string in row_key.lower()
or search_string in row_value[0].lower()
or (row_value[1] and search_string in row_value[1].lower())
- or search_string in f'{row_value[0]}, {row_key}'.lower()
+ or search_string in f"{row_value[0]}, {row_key}".lower()
or (row_value[2] and search_string in row_value[2].lower())
):
project = {
"url": "/project/" + row_value[0],
- "name": f"{row_value[0]}, {row_key}"
+ "name": f"{row_value[0]}, {row_key}",
}
projects.append(project)
@@ -859,9 +859,7 @@ def post(self, project, type):
reason="No files to be downloaded!!",
)
self.set_header("Content-Type", "application/zip")
- self.set_header(
- "Content-Disposition", f"attachment; filename={fileName}"
- )
+ self.set_header("Content-Disposition", f"attachment; filename={fileName}")
self.write(f.getvalue())
f.close()
self.finish()
@@ -1124,7 +1122,7 @@ def get(self):
"project/summary_status", descending=True
)
for status in statuses:
- view_calls.append(view[[status, "Z"]:[status, ""]])
+ view_calls.append(view[[status, "Z"] : [status, ""]])
for row in itertools.chain.from_iterable(view_calls):
proj_id_name_lib = (
row.value["project_name"]
diff --git a/status/queues.py b/status/queues.py
index c18e07521..a9eac9285 100644
--- a/status/queues.py
+++ b/status/queues.py
@@ -462,15 +462,15 @@ def get(self):
conc_qpcr = row[0]
pool_groups[method][project]["final_loading_conc"] = final_loading_conc
- pool_groups[method][project]["plates"][container][
- "conc_qpcr"
- ] = conc_qpcr
+ pool_groups[method][project]["plates"][container]["conc_qpcr"] = (
+ conc_qpcr
+ )
pool_groups[method][project]["plates"][container]["pools"][-1][
"is_rerun"
] = is_rerun
- pool_groups[method][project]["plates"][container][
- "conc_pool"
- ] = pool_conc
+ pool_groups[method][project]["plates"][container]["conc_pool"] = (
+ pool_conc
+ )
self.set_header("Content-type", "application/json")
self.write(json.dumps(pool_groups))
@@ -692,87 +692,114 @@ def get(self):
t = self.application.loader.load("smartseq3_progress.html")
self.write(
t.generate(
- gs_globals=self.application.gs_globals,
+ gs_globals=self.application.gs_globals,
user=self.get_current_user(),
)
)
+
class SmartSeq3ProgressPageDataHandler(SafeHandler):
"""Serves a page with SmartSeq3 progress table with all samples in the workflow
URL: /api/v1/smartseq3_progress
"""
def get(self):
-
# Get all samples in the SmartSeq3 workflow
gen_log = logging.getLogger("tornado.general")
- workflow_name = 'Smart-seq3 for NovaSeqXPlus v1.0'
- sample_level_udfs_list = ['Sample Type', 'Sample Links', 'Cell Type', 'Tissue Type', 'Species Name', 'Comment']
- project_level_udfs_list = ['Sequence units ordered (lanes)']
+ workflow_name = "Smart-seq3 for NovaSeqXPlus v1.0"
+ sample_level_udfs_list = [
+ "Sample Type",
+ "Sample Links",
+ "Cell Type",
+ "Tissue Type",
+ "Species Name",
+ "Comment",
+ ]
+ project_level_udfs_list = ["Sequence units ordered (lanes)"]
# Define the step level udfs and the step names they are associated with
- step_level_udfs_definition = {'Plates to Send': ['Sample plate sent date'],
- 'Plates Sent': ['Sample plate received date'],
- 'Lysis, RT and pre-Amp': ['PCR Cycles'],
- 'cDNA QC': ['Optimal Cycle Number']}
+ step_level_udfs_definition = {
+ "Plates to Send": ["Sample plate sent date"],
+ "Plates Sent": ["Sample plate received date"],
+ "Lysis, RT and pre-Amp": ["PCR Cycles"],
+ "cDNA QC": ["Optimal Cycle Number"],
+ }
step_level_udfs_id = {}
- samples_in_step_dict= {}
+ samples_in_step_dict = {}
project_level_udfs = {}
geno_session = geno_utils.get_session()
- #Get all steps in the workflow with step ids and step name
- workflow_steps = geno_queries.get_all_steps_for_workflow(geno_session, workflow_name)
+ # Get all steps in the workflow with step ids and step name
+ workflow_steps = geno_queries.get_all_steps_for_workflow(
+ geno_session, workflow_name
+ )
stepid_to_stepindex = {}
-
+
for stepname, stepid, protocolname, stepindex in workflow_steps:
- samples_in_step_dict[stepindex] = {'stepname': stepname,
- 'protocolname': protocolname,
- 'stepid': stepid,
- 'samples': {}}
+ samples_in_step_dict[stepindex] = {
+ "stepname": stepname,
+ "protocolname": protocolname,
+ "stepid": stepid,
+ "samples": {},
+ }
stepid_to_stepindex[stepid] = stepindex
- #Connect stepid to udfname
- #We need this cos different versions of the workflow will have different stepids for the same stepname
+ # Connect stepid to udfname
+ # We need this because different versions of the workflow will have different stepids for the same stepname
if stepname in step_level_udfs_definition:
step_level_udfs_id[stepid] = step_level_udfs_definition[stepname]
-
+
# Get all the information for each sample in given workflow
- samples = geno_queries.get_all_samples_in_a_workflow(geno_session, workflow_name)
+ samples = geno_queries.get_all_samples_in_a_workflow(
+ geno_session, workflow_name
+ )
for _, sample_name, sampleid, stepid, projectid, _ in samples:
- sample_dict = {'projectid': projectid}
+ sample_dict = {"projectid": projectid}
if projectid not in project_level_udfs:
- query_res = geno_queries.get_udfs_from_project(geno_session, projectid, project_level_udfs_list)
+ query_res = geno_queries.get_udfs_from_project(
+ geno_session, projectid, project_level_udfs_list
+ )
proj_data = {}
for udfname, udfvalue, _, projectname in query_res:
proj_data[udfname] = udfvalue
- #This is redundant
- proj_data['projectname'] = projectname
+ # This is redundant
+ proj_data["projectname"] = projectname
project_level_udfs[projectid] = proj_data
- #Get sample level udfs
- sample_level_udfs = geno_queries.get_udfs_from_sample(geno_session, sampleid, sample_level_udfs_list)
+ # Get sample level udfs
+ sample_level_udfs = geno_queries.get_udfs_from_sample(
+ geno_session, sampleid, sample_level_udfs_list
+ )
for udfname, udfvalue, _ in sample_level_udfs:
sample_dict[udfname] = udfvalue
-
- #Get reagent label
- sample_dict['Reagent Label'] = geno_queries.get_reagentlabel_for_sample(geno_session, sampleid)
- #Get udfs specific to a step and the steps before it
+ # Get reagent label
+ sample_dict["Reagent Label"] = geno_queries.get_reagentlabel_for_sample(
+ geno_session, sampleid
+ )
+
+ # Get udfs specific to a step and the steps before it
for step in stepid_to_stepindex:
# Only check steps before the current step
- if stepid_to_stepindex[step]<= stepid_to_stepindex[stepid]:
- #Check if the step has any udfs associated with it that we are interested in
+ if stepid_to_stepindex[step] <= stepid_to_stepindex[stepid]:
+ # Check if the step has any udfs associated with it that we are interested in
if step in step_level_udfs_id:
- step_level_udfs = geno_queries.get_sample_udfs_from_step(geno_session, sampleid, step, step_level_udfs_id[step])
+ step_level_udfs = geno_queries.get_sample_udfs_from_step(
+ geno_session, sampleid, step, step_level_udfs_id[step]
+ )
for udfname, udfvalue, _ in step_level_udfs:
if udfvalue:
- #If udfname was already set, check if the value is the same
+ # If udfname was already set, check if the value is the same
if udfname in sample_dict:
- #If the value is different, log a warning
+ # If the value is different, log a warning
if sample_dict[udfname] != udfvalue:
- gen_log.warn(f'Sample {sample_name} has different values for udf {udfname} in step {stepname} '
- f'previous value: {sample_dict[udfname]}, new value: {udfvalue}')
+ gen_log.warn(
+ f"Sample {sample_name} has different values for udf {udfname} in step {stepname} "
+ f"previous value: {sample_dict[udfname]}, new value: {udfvalue}"
+ )
else:
sample_dict[udfname] = udfvalue
- samples_in_step_dict[stepid_to_stepindex[stepid]]['samples'][sample_name] = sample_dict
+ samples_in_step_dict[stepid_to_stepindex[stepid]]["samples"][
+ sample_name
+ ] = sample_dict
self.set_header("Content-type", "application/json")
- self.write(json.dumps([samples_in_step_dict, project_level_udfs]))
\ No newline at end of file
+ self.write(json.dumps([samples_in_step_dict, project_level_udfs]))
diff --git a/status/reads_plot.py b/status/reads_plot.py
index c4c3238b5..978cecc00 100644
--- a/status/reads_plot.py
+++ b/status/reads_plot.py
@@ -16,13 +16,18 @@ def get(self, search_string=None):
first_term = last_month.isoformat()[2:10].replace("-", "")
second_term = datetime.datetime.now().isoformat()[2:10].replace("-", "")
else:
- first_term, second_term = search_string.split('-')
+ first_term, second_term = search_string.split("-")
- docs = [x.value for x in self.application.x_flowcells_db.view("plot/reads_yield")[first_term:second_term+'ZZZZ'].rows]
+ docs = [
+ x.value
+ for x in self.application.x_flowcells_db.view("plot/reads_yield")[
+ first_term : second_term + "ZZZZ"
+ ].rows
+ ]
for doc in docs:
- fc_yield = int(doc.get('total_yield')) / 1000000
- doc['total_yield'] = fc_yield
+ fc_yield = int(doc.get("total_yield")) / 1000000
+ doc["total_yield"] = fc_yield
self.set_header("Content-type", "application/json")
self.write(json.dumps(docs))
diff --git a/status/running_notes.py b/status/running_notes.py
index ebdc1f523..142c23dd6 100644
--- a/status/running_notes.py
+++ b/status/running_notes.py
@@ -143,7 +143,9 @@ def make_running_note(
"updated_at_utc": created_time.isoformat(),
}
# Save in running notes db
- gen_log.info(f"Running note to be created with id {newNote['_id']} by {user} at {created_time.isoformat()}")
+ gen_log.info(
+ f"Running note to be created with id {newNote['_id']} by {user} at {created_time.isoformat()}"
+ )
application.running_notes_db.save(newNote)
#### Check and send mail to tagged users (for project running notes as flowcell and workset notes are copied over)
if note_type == "project":
@@ -207,7 +209,7 @@ def make_running_note(
"project",
created_time,
)
- created_note = application.running_notes_db.get(newNote["_id"])
+ created_note = application.running_notes_db.get(newNote["_id"])
return created_note
@staticmethod
@@ -256,7 +258,9 @@ def notify_tagged_user(
if option == "Slack" or option == "Both":
nest_asyncio.apply()
client = slack_sdk.WebClient(token=application.slack_token)
- notification_text = f"{tagger} has {notf_text} in {project_id}, {project_name}!"
+ notification_text = (
+ f"{tagger} has {notf_text} in {project_id}, {project_name}!"
+ )
blocks = [
{
"type": "section",
@@ -312,7 +316,9 @@ def notify_tagged_user(
# default is email
if option == "E-mail" or option == "Both":
msg = MIMEMultipart("alternative")
- msg["Subject"] = f"[GenStat] Running Note:{project_id}, {project_name}"
+ msg["Subject"] = (
+ f"[GenStat] Running Note:{project_id}, {project_name}"
+ )
msg["From"] = "genomics-status"
msg["To"] = view_result[user]
text = f"{email_text} in the project {project_id}, {project_name}! The note is as follows\n\
diff --git a/status/sensorpush.py b/status/sensorpush.py
index 3ad583a30..1ce58962e 100644
--- a/status/sensorpush.py
+++ b/status/sensorpush.py
@@ -1,5 +1,4 @@
-"""Set of handlers related with Sensorpush data
-"""
+"""Set of handlers related with Sensorpush data"""
import datetime
import json
@@ -42,7 +41,7 @@ def get_samples(self, start_days_ago=14):
"intervals_higher": [],
}
for sensor_daily_row in samples_view[
- [sensor_original, start_time_str]:[sensor_original, "9999"]
+ [sensor_original, start_time_str] : [sensor_original, "9999"]
]:
_, timestamp = sensor_daily_row.key
doc = sensor_daily_row.value
diff --git a/status/sequencing.py b/status/sequencing.py
index bbe1f1c9c..cd339e2eb 100644
--- a/status/sequencing.py
+++ b/status/sequencing.py
@@ -1,5 +1,5 @@
-""" Handlers related to data sequencing statistics.
-"""
+"""Handlers related to data sequencing statistics."""
+
import json
from collections import defaultdict
from datetime import datetime
diff --git a/status/statistics.py b/status/statistics.py
index 1a227adf4..bd02be557 100644
--- a/status/statistics.py
+++ b/status/statistics.py
@@ -220,7 +220,9 @@ def __init__(self, *args, **kwargs):
),
}
self.flowcell_aggregates = {"bp_seq_per_week": ("dashboard/week_instr_bp", 2)}
- self.nanopore_flowcell_aggregates = {"bp_seq_per_week": ("dashboard/week_instr_bp", 2)}
+ self.nanopore_flowcell_aggregates = {
+ "bp_seq_per_week": ("dashboard/week_instr_bp", 2)
+ }
self.cleaning = get_clean_application_keys(self)
@@ -242,10 +244,10 @@ def get(self):
)
for fa in self.nanopore_flowcell_aggregates:
nanopore_stats = get_stats_data(
- self.application.nanopore_runs_db,
- self.nanopore_flowcell_aggregates[fa][0],
- self.nanopore_flowcell_aggregates[fa][1],
- self.cleaning,
+ self.application.nanopore_runs_db,
+ self.nanopore_flowcell_aggregates[fa][0],
+ self.nanopore_flowcell_aggregates[fa][1],
+ self.cleaning,
)
# Use |= to merge the resulting dictionary with what's already
diff --git a/status/suggestion_box.py b/status/suggestion_box.py
index 55449ea17..adbd958bb 100644
--- a/status/suggestion_box.py
+++ b/status/suggestion_box.py
@@ -51,25 +51,25 @@ def post(self):
user = self.get_current_user()
description = self.get_argument("description")
suggestion = self.get_argument("suggestion")
- deployment = "" if self.application.gs_globals['prod'] else "[STAGE] "
+ deployment = "" if self.application.gs_globals["prod"] else "[STAGE] "
jira = Jira(
- url=self.application.jira_url,
- username=self.application.jira_user,
- password=self.application.jira_api_token
+ url=self.application.jira_url,
+ username=self.application.jira_user,
+ password=self.application.jira_api_token,
)
summary = TITLE_TEMPLATE.format(deployment=deployment, title=title, area=area)
description = DESCRIPTION_TEMPLATE.format(
- date=date.ctime(),
- area=area,
- system=system,
- importance=importance,
- difficulty=difficulty,
- user=user.name,
- description=description,
- suggestion=suggestion,
- )
+ date=date.ctime(),
+ area=area,
+ system=system,
+ importance=importance,
+ difficulty=difficulty,
+ user=user.name,
+ description=description,
+ suggestion=suggestion,
+ )
new_card = jira.issue_create(
fields={
"project": {"key": self.application.jira_project_key},
@@ -84,17 +84,19 @@ def post(self):
# Save the information of the card in the database
doc = Document(
- date=date.isoformat(),
- card_id= new_card.get('id'),
- name=summary,
- url= f"{self.application.jira_url}/jira/core/projects/{self.application.jira_project_key}/board?selectedIssue={new_card.get('key')}",
- archived=False,
- source= "jira"
- )
-
- response = self.application.cloudant.post_document(db='suggestion_box', document=doc).get_result()
-
- if not response.get('ok'):
+ date=date.isoformat(),
+ card_id=new_card.get("id"),
+ name=summary,
+ url=f"{self.application.jira_url}/jira/core/projects/{self.application.jira_project_key}/board?selectedIssue={new_card.get('key')}",
+ archived=False,
+ source="jira",
+ )
+
+ response = self.application.cloudant.post_document(
+ db="suggestion_box", document=doc
+ ).get_result()
+
+ if not response.get("ok"):
self.set_status(500)
return
diff --git a/status/testing.py b/status/testing.py
index 3e10039fe..dd8bfcb0b 100644
--- a/status/testing.py
+++ b/status/testing.py
@@ -1,5 +1,5 @@
-""" Status Handlers used to test some functionalities while building layouts.
-"""
+"""Status Handlers used to test some functionalities while building layouts."""
+
import json
import random
diff --git a/status/util.py b/status/util.py
index 5316a95cd..8482eaf61 100644
--- a/status/util.py
+++ b/status/util.py
@@ -53,9 +53,7 @@ def is_sample_requirements_admin(self):
@property
def is_any_admin(self):
return (
- self.is_admin
- or self.is_pricing_admin
- or self.is_sample_requirements_admin
+ self.is_admin or self.is_pricing_admin or self.is_sample_requirements_admin
)
@property
diff --git a/status/worksets.py b/status/worksets.py
index 07d1a48f6..5ffe44d2c 100644
--- a/status/worksets.py
+++ b/status/worksets.py
@@ -1,6 +1,5 @@
"""Handlers for worksets"""
-
import datetime
import json
from collections import OrderedDict