diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e9e644a5..9e57f733 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,7 @@ jobs: strategy: max-parallel: 4 matrix: - os: [macos-11] + os: [macos-13] python-version: [ 3.8, 3.9 ] fail-fast: false runs-on: ${{ matrix.os }} @@ -43,7 +43,8 @@ jobs: env: LC_ALL: "en_US.UTF-8" run: | - brew install mysql + brew install mysql@8.0 + brew link mysql@8.0 --force mysql.server start mysql --host=$CP_MYSQL_TEST_HOST --user=$CP_MYSQL_TEST_USER --execute="CREATE DATABASE $CP_MYSQL_TEST_DB;" --skip-password - name: Installation @@ -51,7 +52,7 @@ jobs: | pip install pyinstaller pip install --upgrade pip setuptools wheel - pip install numpy>=1.20.1 + pip install "numpy>=1.20.1,<2" "Cython<3" # git clone https://github.com/CellProfiler/CellProfiler.git ~/cellprofiler pip install -e .[test] # pip install -e ~/cellprofiler diff --git a/cellprofiler_core/analysis/_runner.py b/cellprofiler_core/analysis/_runner.py index efbee900..d099dabd 100644 --- a/cellprofiler_core/analysis/_runner.py +++ b/cellprofiler_core/analysis/_runner.py @@ -102,7 +102,7 @@ def __init__( # should have jobserver() call load_measurements_from_buffer() rather # than interface() doing so. 
Currently, passing measurements in this # way seems like it might be buggy: - # http://code.google.com/p/h5py/issues/detail?id=244 + # https://github.com/h5py/h5py/issues/244 self.received_measurements_queue = queue.Queue(maxsize=10) self.shared_dicts = None diff --git a/cellprofiler_core/utilities/zmq/__init__.py b/cellprofiler_core/utilities/zmq/__init__.py index 7bd69099..01faab0e 100644 --- a/cellprofiler_core/utilities/zmq/__init__.py +++ b/cellprofiler_core/utilities/zmq/__init__.py @@ -17,6 +17,7 @@ LockStatusRequest, Request, ) +from ._event import PollTimeoutException NOTIFY_SOCKET_ADDR = "inproc://BoundaryNotifications" SD_KEY_DICT = "__keydict__" diff --git a/cellprofiler_core/utilities/zmq/_event.py b/cellprofiler_core/utilities/zmq/_event.py new file mode 100644 index 00000000..ca125c73 --- /dev/null +++ b/cellprofiler_core/utilities/zmq/_event.py @@ -0,0 +1,4 @@ +class PollTimeoutException(Exception): + """Exception issued by a timeout from polling""" + + pass diff --git a/cellprofiler_core/worker/_worker.py b/cellprofiler_core/worker/_worker.py index 8c100020..e17f7dff 100644 --- a/cellprofiler_core/worker/_worker.py +++ b/cellprofiler_core/worker/_worker.py @@ -29,12 +29,15 @@ from ..constants.worker import the_zmq_context from ..measurement import Measurements from ..utilities.measurement import load_measurements_from_buffer +from ..utilities.zmq import PollTimeoutException from ..pipeline import CancelledException from ..preferences import get_awt_headless from ..preferences import set_preferences_from_dict from ..utilities.zmq.communicable.reply.upstream_exit import UpstreamExit from ..workspace import Workspace +LOGGER = logging.getLogger(__name__) + class Worker: """An analysis worker processing work at a given address @@ -124,6 +127,7 @@ def run(self): ) t0 = time.time() self.work_socket = the_zmq_context.socket(zmq.REQ) + self.work_socket.set_hwm(2000) self.work_socket.connect(self.work_request_address) # fetch a job the_request = 
Work(self.current_analysis_id) @@ -304,18 +308,21 @@ def do_job(self, job): return if worker_runs_post_group: - last_workspace.interaction_handler = self.interaction_handler - last_workspace.cancel_handler = self.cancel_handler - last_workspace.post_group_display_handler = ( - self.post_group_display_handler - ) - # There might be an exception in this call, but it will be - # handled elsewhere, and there's nothing we can do for it - # here. - current_pipeline.post_group( - last_workspace, current_measurements.get_grouping_keys() - ) - del last_workspace + if last_workspace is not None: + last_workspace.interaction_handler = self.interaction_handler + last_workspace.cancel_handler = self.cancel_handler + last_workspace.post_group_display_handler = ( + self.post_group_display_handler + ) + # There might be an exception in this call, but it will be + # handled elsewhere, and there's nothing we can do for it + # here. + current_pipeline.post_group( + last_workspace, current_measurements.get_grouping_keys() + ) + del last_workspace + else: + LOGGER.error("No workspace from last image set, cannot run post group") # send measurements back to server req = MeasurementsReport( @@ -323,7 +330,18 @@ def do_job(self, job): buf=current_measurements.file_contents(), image_set_numbers=image_set_numbers, ) - rep = self.send(req) + + while True: + try: + rep = self.send(req, timeout=4000) + break + except PollTimeoutException: + LOGGER.info(f"Worker sending MeasurementsReport halted, retrying for job {str(job.image_set_numbers)}") + self.work_socket.close(linger=0) + self.work_socket = the_zmq_context.socket(zmq.REQ) + self.work_socket.set_hwm(2000) + self.work_socket.connect(self.work_request_address) + continue except CancelledException: # Main thread received shutdown signal @@ -389,7 +407,7 @@ def omero_login_handler(self): rep = self.send(req) use_omero_credentials(rep.credentials) - def send(self, req, work_socket=None): + def send(self, req, work_socket=None, timeout=None):
"""Send a request and receive a reply req - request to send @@ -410,7 +428,10 @@ def send(self, req, work_socket=None): req.send_only(work_socket) response = None while response is None: - for socket, state in poller.poll(): + poll_res = poller.poll(timeout) + if not poll_res: + raise PollTimeoutException(f"No reply within {timeout} ms") + for socket, state in poll_res: if socket == self.notify_socket and state == zmq.POLLIN: notify_msg = self.notify_socket.recv() if notify_msg == NOTIFY_STOP: