From 46d766a2ba697bd76dcb68ed16090cc9c4427a9c Mon Sep 17 00:00:00 2001 From: charlieyl Date: Sat, 28 Dec 2024 13:09:45 +0800 Subject: [PATCH 01/28] [logs]add deploy param --- .../device_model_deployment.py | 23 +++++++++++++++++++ .../master_protocol_manager.py | 5 ++++ 2 files changed, 28 insertions(+) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py index baee7a297..647882c84 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py @@ -65,6 +65,10 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, request_json = dict() logging.info("[Worker] Model deployment is starting...") + logging.info("=" * 80) + logging.info("[Device Model Deployment] Received start deployment request: {}".format(request_json)) + logging.info("=" * 80) + # Real gpu per replica (container-level) num_gpus = gpu_per_replica gpu_ids, gpu_attach_cmd = None, "" @@ -213,6 +217,25 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, detach=True, command=customized_image_entry_cmd, ) + + logging.info("=" * 80) + logging.info("[Device Model Deployment] Creating container with following parameters:") + logging.info("=" * 80) + logging.info("Image: {}".format(inference_image_name)) + logging.info("Container name: {}".format(default_server_container_name)) + logging.info("Volumes:") + for vol in volumes: + logging.info(" - {}".format(vol)) + logging.info("Ports: [{}]".format(port_inside_container)) + logging.info("Environment variables:") + for key, value in environment.items(): + logging.info(" {} = {}".format(key, value)) + logging.info("Host config:") + for key, value in host_config_dict.items(): + logging.info(" {} = {}".format(key, value)) + logging.info("Command: {}".format(customized_image_entry_cmd)) + logging.info("=" * 80) + client.api.start(container=new_container.get("Id")) except Exception as e: logging.error(f"Failed to create the container with exception {e}, traceback : {traceback.format_exc()}") diff --git a/python/fedml/computing/scheduler/model_scheduler/master_protocol_manager.py b/python/fedml/computing/scheduler/model_scheduler/master_protocol_manager.py index 9e0d51b58..e06c5162e 100755 --- a/python/fedml/computing/scheduler/model_scheduler/master_protocol_manager.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_protocol_manager.py @@ -155,6 +155,11 @@ def callback_start_deployment(self, topic, payload): # Get deployment params request_json = json.loads(payload) + + logging.info("=" * 80) + logging.info("[Master Protocol Manager] Received start deployment request: {}".format(request_json)) + logging.info("=" * 80) + run_id = request_json["end_point_id"] end_point_name = request_json["end_point_name"] token = request_json["token"] From a912c5738a90992fd06fd2394e068dcde9139aa0 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Tue, 11 Feb 2025 12:53:28 +0800 Subject: [PATCH 02/28] [bugfix] Handle deployment failure by deleting deployed replicas and releasing GPU --- .../computing/scheduler/model_scheduler/master_job_runner.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py index 00b08acfb..bc943307f 100755 --- 
a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py @@ -11,6 +11,7 @@ import fedml from fedml.core.mlops import MLOpsRuntimeLog, MLOpsConfigs from fedml.core.mlops.mlops_runtime_log import MLOpsFormatter +from .device_model_msg_object import FedMLModelMsgObject from .device_client_constants import ClientConstants from .device_model_cache import FedMLModelCache from .device_server_constants import ServerConstants @@ -278,6 +279,9 @@ def process_deployment_result_message(self, topic=None, payload=None): end_point_id, end_point_name, payload_json["model_name"], "", ServerConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, message_center=self.message_center) + # when report failed to the MLOps, need to delete the replica has successfully deployed and release the gpu + model_msg_object = FedMLModelMsgObject(topic, payload) + self.send_deployment_delete_request_to_edges(payload, model_msg_object, message_center=self.message_center) return # Failure handler, send the rollback message to the worker devices only if it has not been rollback From c6bfe20e84915c8d692961a1c9ad6e26f3fdbb0c Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 12 Feb 2025 11:11:47 +0800 Subject: [PATCH 03/28] [refactor] Disable request timeout middleware in device model inference --- .../model_scheduler/device_model_inference.py | 80 +++++++++---------- 1 file changed, 40 insertions(+), 40 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py index 9adc17538..6c37f1032 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py @@ -46,46 +46,46 @@ class Settings: redis_password=Settings.redis_password) -@api.middleware("http") -async def auth_middleware(request: Request, call_next): - if "/inference" in request.url.path or "/api/v1/predict" in request.url.path: - try: - # Attempt to parse the JSON body. - request_json = await request.json() - except json.JSONDecodeError: - return JSONResponse( - {"error": True, "message": "Invalid JSON."}, - status_code=status.HTTP_400_BAD_REQUEST) - - # Get endpoint's total pending requests. - end_point_id = request_json.get("end_point_id", None) - pending_requests_num = FEDML_MODEL_CACHE.get_pending_requests_counter(end_point_id) - if pending_requests_num: - # Fetch metrics of the past k=3 requests. - pask_k_metrics = FEDML_MODEL_CACHE.get_endpoint_metrics( - end_point_id=end_point_id, - k_recent=3) - - # Get the request timeout from the endpoint settings. - request_timeout_s = FEDML_MODEL_CACHE.get_endpoint_settings(end_point_id) \ - .get(ServerConstants.INFERENCE_REQUEST_TIMEOUT_KEY, ServerConstants.INFERENCE_REQUEST_TIMEOUT_DEFAULT) - - # Only proceed if the past k metrics collection is not empty. - if pask_k_metrics: - # Measure the average latency in seconds(!), hence the 0.001 multiplier. - past_k_latencies_sec = \ - [float(j_obj["current_latency"]) * 0.001 for j_obj in pask_k_metrics] - mean_latency = sum(past_k_latencies_sec) / len(past_k_latencies_sec) - - # If timeout threshold is exceeded then cancel and return time out error. 
- should_block = (mean_latency * pending_requests_num) > request_timeout_s - if should_block: - return JSONResponse( - {"error": True, "message": "Request timed out."}, - status_code=status.HTTP_504_GATEWAY_TIMEOUT) - - response = await call_next(request) - return response +# @api.middleware("http") +# async def auth_middleware(request: Request, call_next): +# if "/inference" in request.url.path or "/api/v1/predict" in request.url.path: +# try: +# # Attempt to parse the JSON body. +# request_json = await request.json() +# except json.JSONDecodeError: +# return JSONResponse( +# {"error": True, "message": "Invalid JSON."}, +# status_code=status.HTTP_400_BAD_REQUEST) + +# # Get endpoint's total pending requests. +# end_point_id = request_json.get("end_point_id", None) +# pending_requests_num = FEDML_MODEL_CACHE.get_pending_requests_counter(end_point_id) +# if pending_requests_num: +# # Fetch metrics of the past k=3 requests. +# pask_k_metrics = FEDML_MODEL_CACHE.get_endpoint_metrics( +# end_point_id=end_point_id, +# k_recent=3) + +# # Get the request timeout from the endpoint settings. +# request_timeout_s = FEDML_MODEL_CACHE.get_endpoint_settings(end_point_id) \ +# .get(ServerConstants.INFERENCE_REQUEST_TIMEOUT_KEY, ServerConstants.INFERENCE_REQUEST_TIMEOUT_DEFAULT) + +# # Only proceed if the past k metrics collection is not empty. +# if pask_k_metrics: +# # Measure the average latency in seconds(!), hence the 0.001 multiplier. +# past_k_latencies_sec = \ +# [float(j_obj["current_latency"]) * 0.001 for j_obj in pask_k_metrics] +# mean_latency = sum(past_k_latencies_sec) / len(past_k_latencies_sec) + +# # If timeout threshold is exceeded then cancel and return time out error. +# should_block = (mean_latency * pending_requests_num) > request_timeout_s +# if should_block: +# return JSONResponse( +# {"error": True, "message": "Request timed out."}, +# status_code=status.HTTP_504_GATEWAY_TIMEOUT) + +# response = await call_next(request) +# return response @api.on_event("startup") From 7134c15c38bb0f06bfa3ddc9d7491dbb3996dcb3 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 12 Feb 2025 12:25:22 +0800 Subject: [PATCH 04/28] add logs --- .../device_http_inference_protocol.py | 12 ++++++++++++ .../model_scheduler/device_model_inference.py | 11 +++++++++++ 2 files changed, 23 insertions(+) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py index 28d50d5a5..f71caa110 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py @@ -1,4 +1,6 @@ import logging +import time +import uuid import httpx import traceback @@ -99,12 +101,22 @@ async def stream_generator(inference_url, input_json, method="POST"): async def redirect_non_stream_req_to_worker(inference_type, inference_url, model_api_headers, model_inference_json, timeout=None, method="POST"): response_ok = True + request_id = str(uuid.uuid4())[:8] # Generate a short UUID as the request ID + start_time = time.time() + logging.info(f"[Request-{request_id}] Starting HTTP request to {inference_url}") + try: async with httpx.AsyncClient() as client: response = await client.request( method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout ) + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"[Request-{request_id}] Completed HTTP request. 
Time taken: {elapsed_time:.3f} seconds") except Exception as e: + end_time = time.time() + elapsed_time = end_time - start_time + logging.error(f"[Request-{request_id}] Failed HTTP request after {elapsed_time:.3f} seconds. Error: {str(e)}") response_ok = False model_inference_result = {"error": e} return response_ok, model_inference_result diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py index 6c37f1032..35d47f8db 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py @@ -4,6 +4,7 @@ import time import traceback import os +import uuid from typing import Any, Mapping, MutableMapping, Union from urllib.parse import urlparse @@ -198,6 +199,7 @@ async def _predict( # Always increase the pending requests counter on a new incoming request. FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, increase=True) inference_response = {} + request_uuid = str(uuid.uuid4()) # Generate unique request ID try: in_end_point_id = end_point_id @@ -260,6 +262,10 @@ async def _predict( output_list = input_json.get("outputs", []) # main execution of redirecting the inference request to the idle device + inference_start_time = time.time() + start_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(inference_start_time)) + logging.info(f"[Request {request_uuid}] Starting send_inference_request at {start_time_str}") + inference_response = await send_inference_request( idle_device, end_point_id, @@ -269,6 +275,11 @@ async def _predict( inference_type=in_return_type, connectivity_type=connectivity_type, path=path, request_method=request_method) + + inference_end_time = time.time() + end_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(inference_end_time)) + inference_duration = inference_end_time - inference_start_time + logging.info(f"[Request {request_uuid}] Completed send_inference_request at {end_time_str}, duration: {inference_duration:.3f} seconds") # Calculate model metrics try: From f7552c49aa6da10a3cf3148a7982745b9e3f6090 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 12 Feb 2025 13:57:39 +0800 Subject: [PATCH 05/28] [logs] Always enable log file --- python/fedml/core/mlops/mlops_runtime_log.py | 25 ++++++++++---------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/python/fedml/core/mlops/mlops_runtime_log.py b/python/fedml/core/mlops/mlops_runtime_log.py index 0fc5db3d2..258f41344 100644 --- a/python/fedml/core/mlops/mlops_runtime_log.py +++ b/python/fedml/core/mlops/mlops_runtime_log.py @@ -139,10 +139,11 @@ def __init__(self, args): self.stdout_handle = None self.logger = None self.args = args - if hasattr(args, "using_mlops"): - self.should_write_log_file = args.using_mlops - else: - self.should_write_log_file = False + # if hasattr(args, "using_mlops"): + # self.should_write_log_file = args.using_mlops + # else: + # self.should_write_log_file = False + self.should_write_log_file = True if not hasattr(args, "log_file_dir"): setattr(args, "log_file_dir", "./logs") self.log_file_dir = args.log_file_dir @@ -175,14 +176,14 @@ def init_logs(self, log_level=None): self.logger.setLevel(log_level) self.logger.handlers.clear() self.logger.addHandler(self.stdout_handle) - if hasattr(self, "should_write_log_file") and self.should_write_log_file: - run_id, edge_id = self.args.run_id, MLOpsLoggingUtils.get_edge_id_from_args(self.args) - 
log_config_file = os.path.join(self.log_file_dir, MLOpsLoggingUtils.LOG_CONFIG_FILE) - file_handle = MLOpsFileHandler(filepath=log_file_path, log_config_file=log_config_file, run_id=run_id, - edge_id=edge_id) - file_handle.setFormatter(self.format_str) - file_handle.setLevel(logging.INFO) - self.logger.addHandler(file_handle) + # if hasattr(self, "should_write_log_file") and self.should_write_log_file: + run_id, edge_id = self.args.run_id, MLOpsLoggingUtils.get_edge_id_from_args(self.args) + log_config_file = os.path.join(self.log_file_dir, MLOpsLoggingUtils.LOG_CONFIG_FILE) + file_handle = MLOpsFileHandler(filepath=log_file_path, log_config_file=log_config_file, run_id=run_id, + edge_id=edge_id) + file_handle.setFormatter(self.format_str) + file_handle.setLevel(logging.INFO) + self.logger.addHandler(file_handle) logging.root = self.logger # Rewrite sys.stdout to redirect stdout (i.e print()) to Logger sys.stdout.write = self.logger.info From d3b447a1e0abfa728e413335e8a5decec8c255ba Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 12 Feb 2025 14:16:10 +0800 Subject: [PATCH 06/28] [refactor] Optimize HTTP inference client and log file handling --- .../device_http_inference_protocol.py | 25 +++++++++++++------ python/fedml/core/mlops/mlops_runtime_log.py | 25 +++++++++---------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py index f71caa110..ef98b085b 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py @@ -14,6 +14,15 @@ class FedMLHttpInference: + _http_client = None # Class variable for shared HTTP client + + @classmethod + async def get_http_client(cls): + if cls._http_client is None: + limits = httpx.Limits(max_keepalive_connections=100, max_connections=100) + cls._http_client = httpx.AsyncClient(limits=limits) + return cls._http_client + def __init__(self): pass @@ -101,18 +110,18 @@ async def redirect_non_stream_req_to_worker(inference_type, inference_url, model_api_headers, model_inference_json, timeout=None, method="POST"): response_ok = True - request_id = str(uuid.uuid4())[:8] # Generate a short UUID as the request ID + request_id = str(uuid.uuid4())[:8] start_time = time.time() logging.info(f"[Request-{request_id}] Starting HTTP request to {inference_url}") try: - async with httpx.AsyncClient() as client: - response = await client.request( - method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout - ) - end_time = time.time() - elapsed_time = end_time - start_time - logging.info(f"[Request-{request_id}] Completed HTTP request. Time taken: {elapsed_time:.3f} seconds") + client = await FedMLHttpInference.get_http_client() + response = await client.request( + method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout + ) + end_time = time.time() + elapsed_time = end_time - start_time + logging.info(f"[Request-{request_id}] Completed HTTP request. 
Time taken: {elapsed_time:.3f} seconds") except Exception as e: end_time = time.time() elapsed_time = end_time - start_time diff --git a/python/fedml/core/mlops/mlops_runtime_log.py b/python/fedml/core/mlops/mlops_runtime_log.py index 258f41344..0fc5db3d2 100644 --- a/python/fedml/core/mlops/mlops_runtime_log.py +++ b/python/fedml/core/mlops/mlops_runtime_log.py @@ -139,11 +139,10 @@ def __init__(self, args): self.stdout_handle = None self.logger = None self.args = args - # if hasattr(args, "using_mlops"): - # self.should_write_log_file = args.using_mlops - # else: - # self.should_write_log_file = False - self.should_write_log_file = True + if hasattr(args, "using_mlops"): + self.should_write_log_file = args.using_mlops + else: + self.should_write_log_file = False if not hasattr(args, "log_file_dir"): setattr(args, "log_file_dir", "./logs") self.log_file_dir = args.log_file_dir @@ -176,14 +175,14 @@ def init_logs(self, log_level=None): self.logger.setLevel(log_level) self.logger.handlers.clear() self.logger.addHandler(self.stdout_handle) - # if hasattr(self, "should_write_log_file") and self.should_write_log_file: - run_id, edge_id = self.args.run_id, MLOpsLoggingUtils.get_edge_id_from_args(self.args) - log_config_file = os.path.join(self.log_file_dir, MLOpsLoggingUtils.LOG_CONFIG_FILE) - file_handle = MLOpsFileHandler(filepath=log_file_path, log_config_file=log_config_file, run_id=run_id, - edge_id=edge_id) - file_handle.setFormatter(self.format_str) - file_handle.setLevel(logging.INFO) - self.logger.addHandler(file_handle) + if hasattr(self, "should_write_log_file") and self.should_write_log_file: + run_id, edge_id = self.args.run_id, MLOpsLoggingUtils.get_edge_id_from_args(self.args) + log_config_file = os.path.join(self.log_file_dir, MLOpsLoggingUtils.LOG_CONFIG_FILE) + file_handle = MLOpsFileHandler(filepath=log_file_path, log_config_file=log_config_file, run_id=run_id, + edge_id=edge_id) + file_handle.setFormatter(self.format_str) + file_handle.setLevel(logging.INFO) + self.logger.addHandler(file_handle) logging.root = self.logger # Rewrite sys.stdout to redirect stdout (i.e print()) to Logger sys.stdout.write = self.logger.info From 591011120ed6dda2935dd6915e69566bcc59503a Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 12 Feb 2025 14:45:47 +0800 Subject: [PATCH 07/28] [perf] Optimize Uvicorn server configuration for improved inference gateway performance --- .../device_http_inference_protocol.py | 2 +- .../model_scheduler/master_job_runner.py | 23 +++++++++++-------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py index ef98b085b..7f248ee28 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py @@ -19,7 +19,7 @@ class FedMLHttpInference: @classmethod async def get_http_client(cls): if cls._http_client is None: - limits = httpx.Limits(max_keepalive_connections=100, max_connections=100) + limits = httpx.Limits(max_keepalive_connections=50, max_connections=1000) cls._http_client = httpx.AsyncClient(limits=limits) return cls._http_client diff --git a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py index bc943307f..c61e1d437 100755 --- 
a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py @@ -452,15 +452,20 @@ def start_device_inference_gateway(): if inference_gateway_pids is None or len(inference_gateway_pids) <= 0: cur_dir = os.path.dirname(__file__) fedml_base_dir = os.path.dirname(os.path.dirname(os.path.dirname(cur_dir))) - inference_gateway_process = ServerConstants.exec_console_with_script(f"{python_program} " - f"-m uvicorn {inference_gw_cmd} " - f"--host 0.0.0.0 " - f"--port {str(inference_port)} " - f"--reload --reload-delay 3 " - f"--reload-dir {fedml_base_dir} " - f"--log-level info", - should_capture_stdout=False, - should_capture_stderr=False) + inference_gateway_process = ServerConstants.exec_console_with_script( + f"{python_program} -m uvicorn {inference_gw_cmd} " + f"--host 0.0.0.0 " + f"--port {str(inference_port)} " + f"--workers 10 " + f"--loop uvloop " + f"--http httptools " + f"--limit-concurrency 1000 " + f"--backlog 2048 " + f"--timeout-keep-alive 75 " + f"--log-level warning ", + should_capture_stdout=False, + should_capture_stderr=False + ) return inference_gateway_process else: return inference_gateway_pids[0] From a7280376a69f631a63909844f0dd7157e4cd3c59 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 12 Feb 2025 16:18:10 +0800 Subject: [PATCH 08/28] [perf] Disable uvloop and httptools in model inference gateway --- .../scheduler/model_scheduler/master_job_runner.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py index c61e1d437..4fb585756 100755 --- a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py @@ -446,19 +446,20 @@ def start_device_inference_gateway(): python_program = get_python_program() inference_port = ServerConstants.get_inference_master_gateway_port() if not ServerConstants.is_running_on_k8s(): - logging.info(f"start the model inference gateway...") inference_gw_cmd = "fedml.computing.scheduler.model_scheduler.device_model_inference:api" inference_gateway_pids = RunProcessUtils.get_pid_from_cmd_line(inference_gw_cmd) if inference_gateway_pids is None or len(inference_gateway_pids) <= 0: cur_dir = os.path.dirname(__file__) fedml_base_dir = os.path.dirname(os.path.dirname(os.path.dirname(cur_dir))) + workers = 10 + logging.info(f"start the model inference gateway workers[{workers}] no uvloop/httptools...") inference_gateway_process = ServerConstants.exec_console_with_script( f"{python_program} -m uvicorn {inference_gw_cmd} " f"--host 0.0.0.0 " f"--port {str(inference_port)} " - f"--workers 10 " - f"--loop uvloop " - f"--http httptools " + f"--workers {workers} " + # f"--loop uvloop " + # f"--http httptools " f"--limit-concurrency 1000 " f"--backlog 2048 " f"--timeout-keep-alive 75 " From 3e1ae10b0785f2cb5bd9f054f37064ad53badb0d Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 12 Feb 2025 16:35:40 +0800 Subject: [PATCH 09/28] [perf] Reduce model inference gateway workers from 10 to 2 --- .../computing/scheduler/model_scheduler/master_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py index 4fb585756..bfc3b5d35 100755 --- 
a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py @@ -451,7 +451,7 @@ def start_device_inference_gateway(): if inference_gateway_pids is None or len(inference_gateway_pids) <= 0: cur_dir = os.path.dirname(__file__) fedml_base_dir = os.path.dirname(os.path.dirname(os.path.dirname(cur_dir))) - workers = 10 + workers = 2 logging.info(f"start the model inference gateway workers[{workers}] no uvloop/httptools...") inference_gateway_process = ServerConstants.exec_console_with_script( f"{python_program} -m uvicorn {inference_gw_cmd} " From d6d67e82e80b6c6ed20f96a6e6eff392df37f504 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 12 Feb 2025 16:55:20 +0800 Subject: [PATCH 10/28] [perf] Optimize HTTP inference client and Uvicorn server configuration --- .../device_http_inference_protocol.py | 25 ++++++++++++------- .../model_scheduler/master_job_runner.py | 6 ++--- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py index 7f248ee28..4f4c19aae 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py @@ -19,7 +19,11 @@ class FedMLHttpInference: @classmethod async def get_http_client(cls): if cls._http_client is None: - limits = httpx.Limits(max_keepalive_connections=50, max_connections=1000) + limits = httpx.Limits( + max_keepalive_connections=100, + max_connections=1000, + keepalive_expiry=60 + ) cls._http_client = httpx.AsyncClient(limits=limits) return cls._http_client @@ -39,8 +43,9 @@ async def is_inference_ready(inference_url, path="ready", timeout=None): # TODO (Raphael): Support more methods and return codes rules. 
try: - async with httpx.AsyncClient() as client: - ready_response = await client.get(url=ready_url, timeout=timeout) + # async with httpx.AsyncClient() as client: + client = await FedMLHttpInference.get_http_client() + ready_response = await client.get(url=ready_url, timeout=timeout) if isinstance(ready_response, (Response, StreamingResponse)): error_code = ready_response.status_code @@ -99,12 +104,13 @@ async def run_http_inference_with_curl_request( async def stream_generator(inference_url, input_json, method="POST"): - async with httpx.AsyncClient() as client: - async with client.stream(method, inference_url, json=input_json, - timeout=ClientConstants.WORKER_STREAM_API_TIMEOUT) as response: - async for chunk in response.aiter_lines(): - # we consumed a newline, need to put it back - yield f"{chunk}\n" + # async with httpx.AsyncClient() as client: + client = await FedMLHttpInference.get_http_client() + async with client.stream(method, inference_url, json=input_json, + timeout=ClientConstants.WORKER_STREAM_API_TIMEOUT) as response: + async for chunk in response.aiter_lines(): + # we consumed a newline, need to put it back + yield f"{chunk}\n" async def redirect_non_stream_req_to_worker(inference_type, inference_url, model_api_headers, model_inference_json, @@ -115,6 +121,7 @@ async def redirect_non_stream_req_to_worker(inference_type, inference_url, model logging.info(f"[Request-{request_id}] Starting HTTP request to {inference_url}") try: + # async with httpx.AsyncClient() as client: client = await FedMLHttpInference.get_http_client() response = await client.request( method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout diff --git a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py index bfc3b5d35..aa4abeef8 100755 --- a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py @@ -451,7 +451,7 @@ def start_device_inference_gateway(): if inference_gateway_pids is None or len(inference_gateway_pids) <= 0: cur_dir = os.path.dirname(__file__) fedml_base_dir = os.path.dirname(os.path.dirname(os.path.dirname(cur_dir))) - workers = 2 + workers = 4 logging.info(f"start the model inference gateway workers[{workers}] no uvloop/httptools...") inference_gateway_process = ServerConstants.exec_console_with_script( f"{python_program} -m uvicorn {inference_gw_cmd} " @@ -460,9 +460,9 @@ def start_device_inference_gateway(): f"--workers {workers} " # f"--loop uvloop " # f"--http httptools " - f"--limit-concurrency 1000 " + f"--limit-concurrency 1024 " f"--backlog 2048 " - f"--timeout-keep-alive 75 " + f"--timeout-keep-alive 60 " f"--log-level warning ", should_capture_stdout=False, should_capture_stderr=False From 3bc06666a51b911fc0f3412f3f141a9047976bc1 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Thu, 13 Feb 2025 12:36:06 +0800 Subject: [PATCH 11/28] [perf] Remove verbose logging in model inference request handling --- .../device_http_inference_protocol.py | 18 +++++++++--------- .../model_scheduler/device_model_inference.py | 19 +++++++++---------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py index 4f4c19aae..00f18a78e 100755 --- 
a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py @@ -116,9 +116,9 @@ async def stream_generator(inference_url, input_json, method="POST"): async def redirect_non_stream_req_to_worker(inference_type, inference_url, model_api_headers, model_inference_json, timeout=None, method="POST"): response_ok = True - request_id = str(uuid.uuid4())[:8] - start_time = time.time() - logging.info(f"[Request-{request_id}] Starting HTTP request to {inference_url}") + # request_id = str(uuid.uuid4())[:8] + # start_time = time.time() + # logging.info(f"[Request-{request_id}] Starting HTTP request to {inference_url}") try: # async with httpx.AsyncClient() as client: @@ -126,13 +126,13 @@ async def redirect_non_stream_req_to_worker(inference_type, inference_url, model response = await client.request( method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout ) - end_time = time.time() - elapsed_time = end_time - start_time - logging.info(f"[Request-{request_id}] Completed HTTP request. Time taken: {elapsed_time:.3f} seconds") + # end_time = time.time() + # elapsed_time = end_time - start_time + # logging.info(f"[Request-{request_id}] Completed HTTP request. Time taken: {elapsed_time:.3f} seconds") except Exception as e: - end_time = time.time() - elapsed_time = end_time - start_time - logging.error(f"[Request-{request_id}] Failed HTTP request after {elapsed_time:.3f} seconds. Error: {str(e)}") + # end_time = time.time() + # elapsed_time = end_time - start_time + # logging.error(f"[Request-{request_id}] Failed HTTP request after {elapsed_time:.3f} seconds. Error: {str(e)}") response_ok = False model_inference_result = {"error": e} return response_ok, model_inference_result diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py index 35d47f8db..feabbf321 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py @@ -199,8 +199,6 @@ async def _predict( # Always increase the pending requests counter on a new incoming request. 
FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, increase=True) inference_response = {} - request_uuid = str(uuid.uuid4()) # Generate unique request ID - try: in_end_point_id = end_point_id in_end_point_name = input_json.get("end_point_name", None) @@ -261,11 +259,12 @@ async def _predict( input_list["stream"] = input_list.get("stream", stream_flag) output_list = input_json.get("outputs", []) - # main execution of redirecting the inference request to the idle device - inference_start_time = time.time() - start_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(inference_start_time)) - logging.info(f"[Request {request_uuid}] Starting send_inference_request at {start_time_str}") + # request_uuid = str(uuid.uuid4()) # Generate unique request ID + # inference_start_time = time.time() + # start_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(inference_start_time)) + # logging.info(f"[Request {request_uuid}] Starting send_inference_request at {start_time_str}") + # main execution of redirecting the inference request to the idle device inference_response = await send_inference_request( idle_device, end_point_id, @@ -276,10 +275,10 @@ async def _predict( connectivity_type=connectivity_type, path=path, request_method=request_method) - inference_end_time = time.time() - end_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(inference_end_time)) - inference_duration = inference_end_time - inference_start_time - logging.info(f"[Request {request_uuid}] Completed send_inference_request at {end_time_str}, duration: {inference_duration:.3f} seconds") + # inference_end_time = time.time() + # end_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(inference_end_time)) + # inference_duration = inference_end_time - inference_start_time + # logging.info(f"[Request {request_uuid}] Completed send_inference_request at {end_time_str}, duration: {inference_duration:.3f} seconds") # Calculate model metrics try: From ad22de4e398cffe7ef22fec428d73e0a77f59f9b Mon Sep 17 00:00:00 2001 From: charlieyl Date: Thu, 13 Feb 2025 14:56:07 +0800 Subject: [PATCH 12/28] [bugfix-combination] Add model configuration details for deployment failure handling --- .../device_model_msg_object.py | 39 ++++++++++--------- .../model_scheduler/master_job_runner.py | 12 +++++- .../model_scheduler/worker_job_runner.py | 25 ++++++------ .../worker_protocol_manager.py | 2 +- python/setup.py | 2 +- 5 files changed, 46 insertions(+), 34 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py b/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py index 6ec05f64e..5d2ac8319 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py @@ -43,31 +43,34 @@ def __init__(self, topic, payload): request_json = json.loads(payload) self.msg_topic = topic self.request_json = request_json - self.run_id = request_json["end_point_id"] - self.end_point_name = request_json["end_point_name"] - self.token = request_json["token"] - self.user_id = request_json["user_id"] - self.user_name = request_json["user_name"] - self.device_ids = request_json["device_ids"] - self.device_objs = request_json["device_objs"] + self.run_id = request_json.get("end_point_id") + self.end_point_name = request_json.get("end_point_name", "") + self.token = request_json.get("token", "") + self.user_id = request_json.get("user_id") + self.user_name = 
request_json.get("user_name", "") + self.device_ids = request_json.get("device_ids", []) + self.device_objs = request_json.get("device_objs", []) - self.model_config = request_json["model_config"] - self.model_name = self.model_config["model_name"] - self.model_id = self.model_config["model_id"] - self.model_version = self.model_config["model_version"] - self.model_storage_url = self.model_config["model_storage_url"] - self.scale_min = self.model_config.get("instance_scale_min", 0) - self.scale_max = self.model_config.get("instance_scale_max", 0) - self.inference_engine = self.model_config.get("inference_engine", 0) - self.inference_end_point_id = self.run_id + # check if model_config is in request_json and is not None + self.scale_min = 1 + self.max_unavailable_rate = 0.1 + if "model_config" in request_json and request_json["model_config"] is not None: + self.model_config = request_json["model_config"] + self.model_name = self.model_config["model_name"] + self.model_id = self.model_config["model_id"] + self.model_version = self.model_config["model_version"] + self.model_storage_url = self.model_config["model_storage_url"] + self.scale_min = self.model_config.get("instance_scale_min", 0) + self.scale_max = self.model_config.get("instance_scale_max", 0) + self.inference_engine = self.model_config.get("inference_engine", 0) + self.max_unavailable_rate = self.model_config.get("max_unavailable_rate", 0.1) + self.inference_end_point_id = self.run_id self.request_json["run_id"] = self.run_id self.gpu_topology = self.get_devices_avail_gpus() self.gpu_per_replica = self.get_gpu_per_replica() - self.max_unavailable_rate = self.model_config.get("max_unavailable_rate", 0.1) - def get_devices_avail_gpus(self): """ { diff --git a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py index aa4abeef8..d6829719a 100755 --- a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py @@ -279,9 +279,17 @@ def process_deployment_result_message(self, topic=None, payload=None): end_point_id, end_point_name, payload_json["model_name"], "", ServerConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, message_center=self.message_center) + # when report failed to the MLOps, need to delete the replica has successfully deployed and release the gpu - model_msg_object = FedMLModelMsgObject(topic, payload) - self.send_deployment_delete_request_to_edges(payload, model_msg_object, message_center=self.message_center) + model_config = dict() + model_config["model_name"] = payload_json["model_name"] + model_config["model_id"] = payload_json["model_id"] + model_config["model_version"] = payload_json["model_version"] + # add model_config to the payload for the delete request + payload_json["model_config"] = model_config + payload_for_del_deploy = json.dumps(payload_json) + model_msg_object = FedMLModelMsgObject(topic, payload_for_del_deploy) + self.send_deployment_delete_request_to_edges(payload_for_del_deploy, model_msg_object, message_center=self.message_center) return # Failure handler, send the rollback message to the worker devices only if it has not been rollback diff --git a/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py index c73630fb6..42c53b549 100755 --- a/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py +++ 
b/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py @@ -261,8 +261,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center, # Send failed result back to master _ = self.send_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, - model_id, model_name, inference_output_url, inference_model_version, inference_port, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, + model_id, model_name, inference_output_url, model_version, inference_port, inference_engine, model_metadata, model_config) self.status_reporter.run_id = self.run_id @@ -272,7 +272,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, # Send failed successful result back to master logging.info("Finished deployment, continue to send results to master...") result_payload = self.send_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port_external, inference_engine, model_metadata, model_config, replica_no=rank + 1, connectivity=connectivity @@ -283,7 +283,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, logging.info("inference_port_external {} != inference_port {}".format( inference_port_external, inference_port)) result_payload = self.construct_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port, inference_engine, model_metadata, model_config, replica_no=rank + 1, connectivity=connectivity @@ -317,7 +317,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, # Report the deletion msg to master result_payload = self.send_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DELETED, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DELETED, model_id, model_name, inference_output_url, model_version, inference_port_external, inference_engine, model_metadata, model_config, replica_no=rank_to_delete + 1) @@ -395,8 +395,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center, run_id, self.edge_id, replica_occupied_gpu_ids) self.send_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, - model_id, model_name, inference_output_url, inference_model_version, inference_port, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, + model_id, model_name, inference_output_url, model_version, inference_port, inference_engine, model_metadata, model_config) self.status_reporter.run_id = self.run_id @@ -407,7 +407,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, else: logging.info("Finished deployment, continue to send results to master...") result_payload = self.send_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port_external, inference_engine, model_metadata, 
model_config, replica_no=rank + 1, connectivity=connectivity @@ -417,7 +417,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, logging.info("inference_port_external {} != inference_port {}".format( inference_port_external, inference_port)) result_payload = self.construct_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port, inference_engine, model_metadata, model_config, replica_no=rank + 1, connectivity=connectivity @@ -441,12 +441,13 @@ def run_impl(self, run_extend_queue_list, sender_message_center, logging.error(f"Unsupported op {op} with op num {op_num}") return False - def construct_deployment_results(self, end_point_name, device_id, model_status, + def construct_deployment_results(self, end_point_name, device_id, device_ids, model_status, model_id, model_name, model_inference_url, model_version, inference_port, inference_engine, model_metadata, model_config, replica_no=1, connectivity=ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT): deployment_results_payload = {"end_point_id": self.run_id, "end_point_name": end_point_name, + "device_ids": device_ids, "model_id": model_id, "model_name": model_name, "model_url": model_inference_url, "model_version": model_version, "port": inference_port, @@ -460,7 +461,7 @@ def construct_deployment_results(self, end_point_name, device_id, model_status, } return deployment_results_payload - def send_deployment_results(self, end_point_name, device_id, model_status, + def send_deployment_results(self, end_point_name, device_id, device_ids, model_status, model_id, model_name, model_inference_url, model_version, inference_port, inference_engine, model_metadata, model_config, replica_no=1, @@ -469,7 +470,7 @@ def send_deployment_results(self, end_point_name, device_id, model_status, self.run_id, device_id) deployment_results_payload = self.construct_deployment_results( - end_point_name, device_id, model_status, + end_point_name, device_id, device_ids, model_status, model_id, model_name, model_inference_url, model_version, inference_port, inference_engine, model_metadata, model_config, replica_no=replica_no, connectivity=connectivity) diff --git a/python/fedml/computing/scheduler/model_scheduler/worker_protocol_manager.py b/python/fedml/computing/scheduler/model_scheduler/worker_protocol_manager.py index b1d0bebc4..7f7b041b7 100755 --- a/python/fedml/computing/scheduler/model_scheduler/worker_protocol_manager.py +++ b/python/fedml/computing/scheduler/model_scheduler/worker_protocol_manager.py @@ -170,7 +170,7 @@ def callback_start_deployment(self, topic, payload): ClientConstants.save_run_process(run_id, process.pid) def callback_delete_deployment(self, topic, payload): - logging.info("[Worker] callback_delete_deployment") + logging.info("[Worker] callback_delete_deployment, topic: {}, payload: {}".format(topic, payload)) # Parse payload as the model message object. 
model_msg_object = FedMLModelMsgObject(topic, payload) diff --git a/python/setup.py b/python/setup.py index 032bdb4ee..d09bed3c3 100644 --- a/python/setup.py +++ b/python/setup.py @@ -126,7 +126,7 @@ def finalize_options(self): setup( name="fedml", - version="0.9.2", + version="0.9.6-dev", author="FedML Team", author_email="ch@fedml.ai", description="A research and production integrated edge-cloud library for " From 2528f4f082a69333ee66582e69492cf0ed0ba335 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Thu, 13 Feb 2025 17:02:30 +0800 Subject: [PATCH 13/28] [bugfix] Update default model configuration parameters for safer deployment --- .../scheduler/model_scheduler/device_model_msg_object.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py b/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py index 5d2ac8319..6a21c880b 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py @@ -59,10 +59,10 @@ def __init__(self, topic, payload): self.model_name = self.model_config["model_name"] self.model_id = self.model_config["model_id"] self.model_version = self.model_config["model_version"] - self.model_storage_url = self.model_config["model_storage_url"] - self.scale_min = self.model_config.get("instance_scale_min", 0) - self.scale_max = self.model_config.get("instance_scale_max", 0) - self.inference_engine = self.model_config.get("inference_engine", 0) + self.model_storage_url = self.model_config.get("model_storage_url", "") + self.scale_min = self.model_config.get("instance_scale_min", 1) + self.scale_max = self.model_config.get("instance_scale_max", 1) + self.inference_engine = self.model_config.get("inference_engine") self.max_unavailable_rate = self.model_config.get("max_unavailable_rate", 0.1) self.inference_end_point_id = self.run_id From 60be0f71ad3cec4bdcc690b9cd091ce8db19ab13 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Fri, 14 Feb 2025 17:47:22 +0800 Subject: [PATCH 14/28] [feature] Add endpoint_name parameter to model deployment method --- .../computing/scheduler/model_scheduler/device_model_cards.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py b/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py index 8697d0a62..1600b58bd 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py @@ -59,7 +59,8 @@ def serve_model_on_premise(self, model_name, endpoint_name, master_device_ids, if use_remote: if not self.deploy_model(model_name, device_type, target_devices, "", user_api_key, - additional_params_dict, use_local_deployment, endpoint_id=endpoint_id): + additional_params_dict, use_local_deployment, endpoint_name=endpoint_name, + endpoint_id=endpoint_id): print("Failed to deploy model") return False return True From 7853c9064d8c8a45ceee04a50d8d1b57c7d7f690 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Mon, 17 Feb 2025 16:58:41 +0800 Subject: [PATCH 15/28] [bugfix] Restore full GPU card selection parameters in NvidiaGPUtil --- .../computing/scheduler/comm_utils/gpu_utils/nvidia_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/fedml/computing/scheduler/comm_utils/gpu_utils/nvidia_utils.py 
b/python/fedml/computing/scheduler/comm_utils/gpu_utils/nvidia_utils.py index a6717de8c..34d0c3be1 100644 --- a/python/fedml/computing/scheduler/comm_utils/gpu_utils/nvidia_utils.py +++ b/python/fedml/computing/scheduler/comm_utils/gpu_utils/nvidia_utils.py @@ -25,8 +25,7 @@ def get_gpu_cards() -> List[GPUCard]: @staticmethod def get_available_gpu_card_ids(order: str, limit: int, max_load: float, max_memory: float) -> List[int]: - # return GPUtil.getAvailable(order=order, limit=limit, maxLoad=max_load, maxMemory=max_memory) - return GPUtil.getAvailable(order='random', limit=limit) + return GPUtil.getAvailable(order=order, limit=limit, maxLoad=max_load, maxMemory=max_memory) @staticmethod def get_docker_gpu_device_mapping(gpu_ids: List[int], num_gpus: int = 0) -> Optional[Dict]: From 068ed5141d7fff84b3271a40a0fb4672f3bb86ba Mon Sep 17 00:00:00 2001 From: charlieyl Date: Mon, 17 Feb 2025 18:26:42 +0800 Subject: [PATCH 16/28] [perf] Reduce job metrics reporting sleep interval to 15 seconds and disable MQTT debug logging --- python/fedml/core/mlops/mlops_job_perfs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fedml/core/mlops/mlops_job_perfs.py b/python/fedml/core/mlops/mlops_job_perfs.py index fe3d92155..fe205323e 100644 --- a/python/fedml/core/mlops/mlops_job_perfs.py +++ b/python/fedml/core/mlops/mlops_job_perfs.py @@ -175,7 +175,7 @@ def report_job_stats_entry(self, sys_event): logging.debug("exception when reporting job pref: {}.".format(traceback.format_exc())) pass - time.sleep(10) + time.sleep(15) logging.info("Job metrics process is about to exit.") mqtt_mgr.loop_stop() From 8ac783d7e1f552769f2c7b7c5e24cd67d9c23df9 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Tue, 18 Feb 2025 10:40:21 +0800 Subject: [PATCH 17/28] [chore] Bump version to 0.9.6-dev202502181030 --- python/fedml/__init__.py | 2 +- python/setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/fedml/__init__.py b/python/fedml/__init__.py index 677d06b4e..2ddeac361 100644 --- a/python/fedml/__init__.py +++ b/python/fedml/__init__.py @@ -37,7 +37,7 @@ _global_training_type = None _global_comm_backend = None -__version__ = "0.9.2" +__version__ = "0.9.6-dev202502181030" # This is the deployment environment used for different roles (RD/PM/BD/Public Developers). 
Potential VALUE: local, dev, test, release diff --git a/python/setup.py b/python/setup.py index d09bed3c3..31d248323 100644 --- a/python/setup.py +++ b/python/setup.py @@ -126,7 +126,7 @@ def finalize_options(self): setup( name="fedml", - version="0.9.6-dev", + version="0.9.6-dev202502181030", author="FedML Team", author_email="ch@fedml.ai", description="A research and production integrated edge-cloud library for " From b11558b3cd09150d4ae60422afa4f2b1b2732eb9 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Tue, 18 Feb 2025 11:12:43 +0800 Subject: [PATCH 18/28] [feature] Enhance container log retrieval for exited containers --- .../device_model_deployment.py | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py index 647882c84..7fab64d9b 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py @@ -342,10 +342,15 @@ def log_deployment_output(end_point_id, model_id, cmd_container_name, cmd_type, if container_obj is not None: out_logs, err_logs = None, None try: - out_logs = container_obj.logs(stdout=True, stderr=False, stream=False, follow=False, - since=last_log_time) - err_logs = container_obj.logs(stdout=False, stderr=True, stream=False, follow=False, - since=last_log_time) + if container_obj.status == "exited": + # If the container has exited, we need to get the whole logs from the container + out_logs = container_obj.logs(stdout=True, stderr=False, stream=False, follow=False) + err_logs = container_obj.logs(stdout=False, stderr=True, stream=False, follow=False) + else: + out_logs = container_obj.logs(stdout=True, stderr=False, stream=False, follow=False, + since=last_log_time) + err_logs = container_obj.logs(stdout=False, stderr=True, stream=False, follow=False, + since=last_log_time) except Exception as e: logging.error(f"Failed to get the logs from the container with exception {e}") pass @@ -355,16 +360,29 @@ def log_deployment_output(end_point_id, model_id, cmd_container_name, cmd_type, if err_logs is not None: err_logs = sys_utils.decode_our_err_result(err_logs) if len(err_logs) > 0: - logging.error(f"{format(err_logs)}") + logging.error(f"[-- Container Error Logs Start --]\n{format(err_logs)}\n[-- Container Error Logs End --]") if out_logs is not None: out_logs = sys_utils.decode_our_err_result(out_logs) if len(out_logs) > 0: - logging.info(f"{format(out_logs)}") + logging.info(f"[-- Container Stdout Logs Start --]\n{format(out_logs)}\n[-- Container Stdout Logs End --]") if container_obj.status == "exited": logging.info("Container {} has exited, automatically remove it".format(cmd_container_name)) + # try to get the logs from the filesystem + if out_logs is None or err_logs is None: + try: + logs_path = f"/var/lib/docker/containers/{container_obj.id}/{container_obj.id}-json.log" + if os.path.exists(logs_path): + with open(logs_path, 'r') as f: + raw_logs = f.readlines() + out_logs = '\n'.join([line for line in raw_logs if '"stream":"stdout"' in line]) + err_logs = '\n'.join([line for line in raw_logs if '"stream":"stderr"' in line]) + logging.error(f"read Container Error Logs from log file: {err_logs}") + except Exception as e: + logging.warning(f"Failed to read logs from filesystem: {str(e)}") + # Save the failed log into ~/.fedml/fedml-model-client/fedml/logs/failed_logs/ # 
$run_id/$container_name.log try: From f376824cca0c3a31ab02f011834b2e3856078ac0 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 19 Feb 2025 11:19:43 +0800 Subject: [PATCH 19/28] [bugfix] Improve lock handling in MLOps logging utilities --- python/fedml/core/mlops/mlops_runtime_log.py | 8 +++- python/fedml/core/mlops/mlops_utils.py | 45 +++++++++++++++++--- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/python/fedml/core/mlops/mlops_runtime_log.py b/python/fedml/core/mlops/mlops_runtime_log.py index 0fc5db3d2..6b8f5fb9d 100644 --- a/python/fedml/core/mlops/mlops_runtime_log.py +++ b/python/fedml/core/mlops/mlops_runtime_log.py @@ -60,8 +60,11 @@ def update_config_and_rotate(self, source, dest): MLOpsLoggingUtils.release_lock() def __initialize_config(self): + lock_acquired = False try: - MLOpsLoggingUtils.acquire_lock() + lock_acquired = MLOpsLoggingUtils.acquire_lock(block=True) + if not lock_acquired: + raise RuntimeError("Failed to acquire lock") config_data = MLOpsLoggingUtils.load_log_config(run_id=self.run_id, device_id=self.edge_id, log_config_file=self.log_config_file) if not config_data: @@ -72,7 +75,8 @@ def __initialize_config(self): except Exception as e: raise ValueError("Error initializing log config: {}".format(e)) finally: - MLOpsLoggingUtils.release_lock() + if lock_acquired: + MLOpsLoggingUtils.release_lock() class MLOpsFormatter(logging.Formatter): diff --git a/python/fedml/core/mlops/mlops_utils.py b/python/fedml/core/mlops/mlops_utils.py index 8bde9e429..f18a88e21 100644 --- a/python/fedml/core/mlops/mlops_utils.py +++ b/python/fedml/core/mlops/mlops_utils.py @@ -1,4 +1,5 @@ import json +import logging import multiprocessing import os import time @@ -75,13 +76,37 @@ class MLOpsLoggingUtils: @staticmethod def acquire_lock(block=True): - return MLOpsLoggingUtils._lock.acquire(block) + logging.info("acquire_lock start, block: {}".format(block)) + lock_acquired = MLOpsLoggingUtils._lock.acquire(block) + logging.info("acquire_lock end, lock_acquired: {}".format(lock_acquired)) + return lock_acquired @staticmethod def release_lock(): - # Purposefully acquire lock with non-blocking call to make it idempotent - MLOpsLoggingUtils._lock.acquire(block=False) - MLOpsLoggingUtils._lock.release() + # # Purposefully acquire lock with non-blocking call to make it idempotent + # MLOpsLoggingUtils._lock.acquire(block=False) + # MLOpsLoggingUtils._lock.release() + + # modify by charlie + # release_lock method may have incorrect implementation: + # -> The acquire(block=False) in release_lock may incorrectly acquire and release the lock, especially in a multi-threaded environment. + # -> If the current thread already holds the lock, acquire(block=False) will fail (return False) in multiprocessing.Lock, + # because the lock is not re-entrant, and cross-thread use may lead to undefined behavior. + # -> Therefore, the acquire call in release_lock may fail, causing subsequent release() to throw an exception, + # or incorrectly release the lock held by other threads. 
+ + # modify by charlie + # acquire the lock and release it in old lock implementation + # perhaps cause the lock is released in the wrong place + # so we need to release the lock directly + try: + logging.info("release_lock start") + MLOpsLoggingUtils._lock.release() + logging.info("release_lock end") + except ValueError as e: + # The lock is not acquired, ignore it + logging.warning("release_lock error: {}".format(e)) + pass @staticmethod def build_log_file_path_with_run_params( @@ -185,9 +210,15 @@ def save_log_config(run_id, device_id, log_config_file, config_data): @staticmethod def load_yaml_config(log_config_file): """Helper function to load a yaml config file""" - if MLOpsLoggingUtils._lock.acquire(block=False): - MLOpsLoggingUtils._lock.release() - raise ValueError("Able to acquire lock. This means lock was not acquired by the caller") + + # modify by charlie + # the lock is acquired in the caller, so the check is not necessary, + # it should be removed to avoid potential exceptions and logical conflicts. + + # if MLOpsLoggingUtils._lock.acquire(block=False): + # MLOpsLoggingUtils._lock.release() + # raise ValueError("Able to acquire lock. This means lock was not acquired by the caller") + if not os.path.exists(log_config_file): MLOpsLoggingUtils.generate_yaml_doc({}, log_config_file) with open(log_config_file, "r") as stream: From cd976a58bd29d94c44030d6890b501e781452745 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 19 Feb 2025 12:37:49 +0800 Subject: [PATCH 20/28] [refactor] Simplify lock handling in MLOps logging utilities --- python/fedml/core/mlops/mlops_utils.py | 32 ++++++++++++++------------ 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/python/fedml/core/mlops/mlops_utils.py b/python/fedml/core/mlops/mlops_utils.py index f18a88e21..f56e10b52 100644 --- a/python/fedml/core/mlops/mlops_utils.py +++ b/python/fedml/core/mlops/mlops_utils.py @@ -76,16 +76,18 @@ class MLOpsLoggingUtils: @staticmethod def acquire_lock(block=True): - logging.info("acquire_lock start, block: {}".format(block)) - lock_acquired = MLOpsLoggingUtils._lock.acquire(block) - logging.info("acquire_lock end, lock_acquired: {}".format(lock_acquired)) - return lock_acquired + return MLOpsLoggingUtils._lock.acquire(block) + + # logging.info("acquire_lock start, block: {}".format(block)) + # lock_acquired = MLOpsLoggingUtils._lock.acquire(block) + # logging.info("acquire_lock end, lock_acquired: {}".format(lock_acquired)) + # return lock_acquired @staticmethod def release_lock(): - # # Purposefully acquire lock with non-blocking call to make it idempotent - # MLOpsLoggingUtils._lock.acquire(block=False) - # MLOpsLoggingUtils._lock.release() + # Purposefully acquire lock with non-blocking call to make it idempotent + MLOpsLoggingUtils._lock.acquire(block=False) + MLOpsLoggingUtils._lock.release() # modify by charlie # release_lock method may have incorrect implementation: @@ -99,14 +101,14 @@ def release_lock(): # acquire the lock and release it in old lock implementation # perhaps cause the lock is released in the wrong place # so we need to release the lock directly - try: - logging.info("release_lock start") - MLOpsLoggingUtils._lock.release() - logging.info("release_lock end") - except ValueError as e: - # The lock is not acquired, ignore it - logging.warning("release_lock error: {}".format(e)) - pass + # try: + # logging.info("release_lock start") + # MLOpsLoggingUtils._lock.release() + # logging.info("release_lock end") + # except ValueError as e: + # # The lock is not acquired, 
ignore it + # logging.warning("release_lock error: {}".format(e)) + # pass @staticmethod def build_log_file_path_with_run_params( From add749a2cb7f257b3cf20e5e9abc70d569039345 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 19 Feb 2025 15:48:29 +0800 Subject: [PATCH 21/28] [feature] Add robust database operation error handling decorator --- .../model_scheduler/device_model_db.py | 69 ++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_db.py b/python/fedml/computing/scheduler/model_scheduler/device_model_db.py index 606d8c010..5be1b55ae 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_db.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_db.py @@ -11,6 +11,8 @@ from fedml.core.common.singleton import Singleton from sqlalchemy.sql import text from typing import List, Dict +import functools +from sqlalchemy import exc Base = declarative_base() @@ -25,7 +27,57 @@ def __init__(self): self.db_engine = None if not hasattr(self, "db_base_dir"): self.db_base_dir = None - + + @staticmethod + def db_operation(func): + """decorator: handle the database operation exceptions""" + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + try: + # open the database connection + self.open_job_db() + # execute the function + return func(self, *args, **kwargs) + except ( + # session state error + exc.InvalidRequestError, # including "prepared state" error + exc.StatementError, # SQL statement execution error + # connection error + exc.DBAPIError, # base class of database API error + exc.OperationalError, # database operation error (e.g. connection failure) + exc.DisconnectionError, # connection disconnected + # transaction error + exc.InvalidatePoolError, # connection pool invalid + exc.TimeoutError, # connection timeout + exc.ResourceClosedError, # resource (e.g. 
cursor) closed + # concurrent error + exc.PendingRollbackError, # pending rollback transaction + exc.IntegrityError # integrity constraint violation + ) as e: + logging.error(f"Database error in {func.__name__}, rebuilding session: {e}") + # rollback any unfinished transactions + if self.db_connection: + try: + self.db_connection.rollback() + except: + pass + try: + self.db_connection.close() + except: + pass + # set the db connection to None, then open again in open_job_db method + self.db_connection = None + # retry open the database connection + self.open_job_db() + # retry execute the function + return func(self, *args, **kwargs) + except Exception as e: + # other unexpected errors, record logs and raise + logging.error(f"Unexpected error in {func.__name__}: {e}") + self.db_connection = None + raise + return wrapper + @staticmethod def get_instance(): return FedMLModelDatabase() @@ -134,6 +186,7 @@ def get_deployment_status_with_device_id(self, end_point_id, end_point_name, mod return None + @db_operation def delete_deployment_status(self, end_point_id, end_point_name, model_name, model_version=None): self.open_job_db() if model_version is None: @@ -149,6 +202,7 @@ def delete_deployment_status(self, end_point_id, end_point_name, model_name, mod FedMLDeploymentResultInfoModel.model_version == f'{model_version}')).delete() self.db_connection.commit() + @db_operation def delete_deployment_result(self, end_point_id, end_point_name, model_name, model_version=None): self.open_job_db() if model_version is None: @@ -164,6 +218,7 @@ def delete_deployment_result(self, end_point_id, end_point_name, model_name, mod FedMLDeploymentResultInfoModel.model_version == f'{model_version}')).delete() self.db_connection.commit() + @db_operation def delete_deployment_result_with_device_id(self, end_point_id, end_point_name, model_name, device_id): self.open_job_db() self.db_connection.query(FedMLDeploymentResultInfoModel).filter( @@ -173,6 +228,7 @@ def delete_deployment_result_with_device_id(self, end_point_id, end_point_name, FedMLDeploymentResultInfoModel.device_id == f'{device_id}')).delete() self.db_connection.commit() + @db_operation def delete_deployment_result_with_device_id_and_rank(self, end_point_id, end_point_name, model_name, device_id, replica_rank): replica_no = replica_rank + 1 @@ -185,6 +241,7 @@ def delete_deployment_result_with_device_id_and_rank(self, end_point_id, end_poi FedMLDeploymentResultInfoModel.replica_no == f'{replica_no}')).delete() self.db_connection.commit() + @db_operation def delete_deployment_run_info(self, end_point_id): # db / table -> model-deployment.db / "deployment_run_info" self.open_job_db() @@ -343,6 +400,7 @@ def drop_table(self): except Exception as e: pass + @db_operation def get_deployment_results_info(self, end_point_id, end_point_name, model_name, model_version): self.open_job_db() if model_version is None: @@ -358,11 +416,13 @@ def get_deployment_results_info(self, end_point_id, end_point_name, model_name, FedMLDeploymentResultInfoModel.model_version == f'{model_version}')).all() return result_info + @db_operation def _get_all_deployment_results_info(self): self.open_job_db() result_info = self.db_connection.query(FedMLDeploymentResultInfoModel).all() return result_info + @db_operation def set_deployment_results_info(self, end_point_id, end_point_name, model_name, model_version, device_id, deployment_result=None, deployment_status=None, replica_no=None): @@ -402,12 +462,14 @@ def set_deployment_results_info(self, end_point_id, end_point_name, 
self.db_connection.commit() + @db_operation def get_deployment_run_info(self, end_point_id): self.open_job_db() run_info = self.db_connection.query(FedMLDeploymentRunInfoModel). \ filter_by(end_point_id=f'{end_point_id}').first() return run_info + @db_operation def set_deployment_run_info(self, end_point_id, end_point_name, end_point_status=None, device_info=None, activated=None, token=None): @@ -435,6 +497,7 @@ def set_deployment_run_info(self, end_point_id, end_point_name, self.db_connection.commit() + @db_operation def get_deployment_auth_info(self, end_point_id, end_point_name, model_name): self.open_job_db() run_info = self.db_connection.query(FedMLDeploymentAuthInfoModel). \ @@ -443,6 +506,7 @@ def get_deployment_auth_info(self, end_point_id, end_point_name, model_name): FedMLDeploymentAuthInfoModel.model_name == f'{model_name}')).first() return run_info + @db_operation def set_deployment_auth_info(self, end_point_id, end_point_name, model_name, token): self.open_job_db() auth_info = self.db_connection.query(FedMLDeploymentAuthInfoModel). \ @@ -462,6 +526,7 @@ def set_deployment_auth_info(self, end_point_id, end_point_name, model_name, tok self.db_connection.commit() + @db_operation def get_latest_end_point_metrics(self, end_point_id, end_point_name, model_name, model_version): self.open_job_db() endpoint_metric = self.db_connection.query(FedMLEndPointMetricsModel). \ @@ -473,6 +538,7 @@ def get_latest_end_point_metrics(self, end_point_id, end_point_name, model_name, return endpoint_metric[-1] return None + @db_operation def get_end_point_metrics_by_index(self, end_point_id, end_point_name, model_name, model_version, index): self.open_job_db() endpoint_metric = self.db_connection.query(FedMLEndPointMetricsModel). \ @@ -483,6 +549,7 @@ def get_end_point_metrics_by_index(self, end_point_id, end_point_name, model_nam offset(index).limit(1).first() return endpoint_metric + @db_operation def set_end_point_metrics(self, end_point_id, end_point_name, model_name, model_version, total_latency=None, avg_latency=None, current_latency=None, From 46b8ce7459374dafcfc3095e86bf91eacffb5771 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 19 Feb 2025 16:09:22 +0800 Subject: [PATCH 22/28] [upd] Bump version to 0.9.6-dev202502191600 --- python/fedml/__init__.py | 2 +- python/setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/fedml/__init__.py b/python/fedml/__init__.py index 2ddeac361..6ceaecc36 100644 --- a/python/fedml/__init__.py +++ b/python/fedml/__init__.py @@ -37,7 +37,7 @@ _global_training_type = None _global_comm_backend = None -__version__ = "0.9.6-dev202502181030" +__version__ = "0.9.6-dev202502191600" # This is the deployment environment used for different roles (RD/PM/BD/Public Developers). 
Potential VALUE: local, dev, test, release diff --git a/python/setup.py b/python/setup.py index 31d248323..7cd9a2591 100644 --- a/python/setup.py +++ b/python/setup.py @@ -126,7 +126,7 @@ def finalize_options(self): setup( name="fedml", - version="0.9.6-dev202502181030", + version="0.9.6-dev202502191600", author="FedML Team", author_email="ch@fedml.ai", description="A research and production integrated edge-cloud library for " From 38a930a073bad0d72ae948718193544abb3bb66f Mon Sep 17 00:00:00 2001 From: charlieyl Date: Wed, 19 Feb 2025 19:16:21 +0800 Subject: [PATCH 23/28] [debug] Add logging for endpoint replica information in job monitor --- .../fedml/computing/scheduler/comm_utils/job_monitor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/fedml/computing/scheduler/comm_utils/job_monitor.py b/python/fedml/computing/scheduler/comm_utils/job_monitor.py index b8237d93b..966d28e43 100644 --- a/python/fedml/computing/scheduler/comm_utils/job_monitor.py +++ b/python/fedml/computing/scheduler/comm_utils/job_monitor.py @@ -207,6 +207,7 @@ def monitor_replicas_number(): res_to_mlops = {} # endpoint_id -> num_replica for endpoint_detail in res_frm_db: + logging.info(f"endpoint_detail: {endpoint_detail}") endpoint_replicas_details = {} if isinstance(endpoint_detail, str): endpoint_replicas_details = json.loads(endpoint_detail) @@ -218,11 +219,13 @@ def monitor_replicas_number(): endpoint_replica_details = {} if isinstance(endpoint_replicas_details["result"], str): endpoint_replica_details = json.loads(endpoint_replicas_details["result"]) - + + logging.info(f"endpoint_replica_details: {endpoint_replica_details}") res_to_mlops[endpoint_replica_details["end_point_id"]] = res_to_mlops.get( endpoint_replica_details["end_point_id"], 0) + 1 for endpoint_id, num_replica in res_to_mlops.items(): + logging.info(f"endpoint_id: {endpoint_id}, num_replica: {num_replica}") num_replica_url_path = "fedmlModelServer/api/v1/endpoint/replica-info" mlops_prefix = fedml._get_backend_service() url = f"{mlops_prefix}/{num_replica_url_path}" @@ -240,13 +243,15 @@ def monitor_replicas_number(): "replicaNumber": int(num_replica), "timestamp": int(time.time() * 1000) } - + logging.info(f"req_header: {req_header}") + logging.info(f"req_body: {req_body}") try: response = requests.post( url, headers=req_header, json=req_body ) + logging.info(f"endpoint_id: {endpoint_id}, response: {response}") if response.status_code != 200: logging.error(f"Failed to send the replica number request to MLOps platform.") else: From 69c0d844c554bd57e26590d646fc97dc956490fd Mon Sep 17 00:00:00 2001 From: charlieyl Date: Thu, 20 Feb 2025 18:14:41 +0800 Subject: [PATCH 24/28] [debug] Re-enable logging for model monitoring metrics --- .../computing/scheduler/model_scheduler/device_model_monitor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py b/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py index 472cab84a..5c4901120 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py @@ -141,7 +141,7 @@ def send_monitoring_metrics(self, index): "total_request_num": int(total_request_num), "timestamp": timestamp, "edgeId": device_id} - # logging.info("send monitor metrics {}".format(json.dumps(deployment_monitoring_payload))) + logging.info("send monitor metrics 
{}".format(json.dumps(deployment_monitoring_payload))) self.monitor_mqtt_mgr.send_message_json(deployment_monitoring_topic, json.dumps(deployment_monitoring_payload)) self.monitor_mqtt_mgr.send_message_json(deployment_monitoring_topic_prefix, From dcfe268f3d111465044a011f461d9c303383015f Mon Sep 17 00:00:00 2001 From: charlieyl Date: Thu, 20 Feb 2025 19:59:13 +0800 Subject: [PATCH 25/28] [refactor] Improve lock handling and error management in MLOps logging utilities --- python/fedml/core/mlops/mlops_runtime_log.py | 59 +++++++------- .../core/mlops/mlops_runtime_log_daemon.py | 80 +++++++++++++------ python/fedml/core/mlops/mlops_utils.py | 8 +- 3 files changed, 92 insertions(+), 55 deletions(-) diff --git a/python/fedml/core/mlops/mlops_runtime_log.py b/python/fedml/core/mlops/mlops_runtime_log.py index 6b8f5fb9d..4d2d68f50 100644 --- a/python/fedml/core/mlops/mlops_runtime_log.py +++ b/python/fedml/core/mlops/mlops_runtime_log.py @@ -33,38 +33,39 @@ def __init__(self, run_id, edge_id, log_config_file, filepath): def update_config_and_rotate(self, source, dest): # source = current log file name # dest = log file name (dated) - MLOpsLoggingUtils.acquire_lock() - - # Check if the source and destination files exist. If it does, return - if os.path.exists(source): - # Copy the contents of the source file to the destination file - shutil.copy(source, dest) - # Clear everything in the source file - with open(source, 'w') as src_file: - src_file.truncate(0) - src_file.close() - - config_data = MLOpsLoggingUtils.load_log_config(self.run_id, self.edge_id, - self.log_config_file) - - # Update file name of current log file - config_data[self.rotate_count].file_path = dest - self.rotate_count += 1 - - # Store the rotate count, and corresponding log file name in the config file - rotated_log_file = LogFile(file_path=source) - config_data[self.rotate_count] = rotated_log_file - MLOpsLoggingUtils.save_log_config(run_id=self.run_id, device_id=self.edge_id, - log_config_file=self.log_config_file, - config_data=config_data) - MLOpsLoggingUtils.release_lock() + lock_acquired = MLOpsLoggingUtils.acquire_lock() + try: + # Check if the source and destination files exist. 
If it does, return + if os.path.exists(source): + # Copy the contents of the source file to the destination file + shutil.copy(source, dest) + # Clear everything in the source file + with open(source, 'w') as src_file: + src_file.truncate(0) + src_file.close() + + config_data = MLOpsLoggingUtils.load_log_config(self.run_id, self.edge_id, + self.log_config_file) + + # Update file name of current log file + config_data[self.rotate_count].file_path = dest + self.rotate_count += 1 + + # Store the rotate count, and corresponding log file name in the config file + rotated_log_file = LogFile(file_path=source) + config_data[self.rotate_count] = rotated_log_file + MLOpsLoggingUtils.save_log_config(run_id=self.run_id, device_id=self.edge_id, + log_config_file=self.log_config_file, + config_data=config_data) + except Exception as e: + raise ValueError("Error updating log config: {}".format(e)) + finally: + if lock_acquired: + MLOpsLoggingUtils.release_lock() def __initialize_config(self): - lock_acquired = False + lock_acquired = MLOpsLoggingUtils.acquire_lock(block=True) try: - lock_acquired = MLOpsLoggingUtils.acquire_lock(block=True) - if not lock_acquired: - raise RuntimeError("Failed to acquire lock") config_data = MLOpsLoggingUtils.load_log_config(run_id=self.run_id, device_id=self.edge_id, log_config_file=self.log_config_file) if not config_data: diff --git a/python/fedml/core/mlops/mlops_runtime_log_daemon.py b/python/fedml/core/mlops/mlops_runtime_log_daemon.py index ff06dc91b..921851965 100644 --- a/python/fedml/core/mlops/mlops_runtime_log_daemon.py +++ b/python/fedml/core/mlops/mlops_runtime_log_daemon.py @@ -120,15 +120,19 @@ def log_upload(self, run_id, device_id): line_start_req = line_end_req # Update the uploaded file index - MLOpsLoggingUtils.acquire_lock() - config_data = MLOpsLoggingUtils.load_log_config(run_id, device_id, + lock_acquired = MLOpsLoggingUtils.acquire_lock() + try: + config_data = MLOpsLoggingUtils.load_log_config(run_id, device_id, self.log_config_file) - - config_data[self.file_rotate_count].uploaded_file_index = uploaded_file_index - MLOpsLoggingUtils.save_log_config(run_id=run_id, device_id=device_id, - log_config_file=self.log_config_file, - config_data=config_data) - MLOpsLoggingUtils.release_lock() + config_data[self.file_rotate_count].uploaded_file_index = uploaded_file_index + MLOpsLoggingUtils.save_log_config(run_id=run_id, device_id=device_id, + log_config_file=self.log_config_file, + config_data=config_data) + except Exception as e: + raise ValueError("Error updating log config: {}".format(e)) + finally: + if lock_acquired: + MLOpsLoggingUtils.release_lock() @staticmethod def __format_log_lines(log_lines: list, line_start_req: int, line_end_req: int): @@ -296,37 +300,65 @@ def log_process(self, process_event): print("Log Process exits normally.") def fetch_file_path_and_index(self) -> (str, int): + lock_acquired = False try: upload_file_index = None - MLOpsLoggingUtils.acquire_lock() - config_data = MLOpsLoggingUtils.load_log_config(run_id=self.run_id, device_id=self.device_id, - log_config_file=self.log_config_file) - MLOpsLoggingUtils.release_lock() + # Acquire lock for initial config read + lock_acquired = MLOpsLoggingUtils.acquire_lock() + try: + config_data = MLOpsLoggingUtils.load_log_config( + run_id=self.run_id, + device_id=self.device_id, + log_config_file=self.log_config_file + ) + finally: + if lock_acquired: + MLOpsLoggingUtils.release_lock() + lock_acquired = False + if config_data is not None: config_len = len(config_data) 
upload_file_config = config_data.get(self.file_rotate_count, None) if upload_file_config is not None: - file_path, uploaded_file_index = upload_file_config.file_path, upload_file_config.uploaded_file_index + file_path = upload_file_config.file_path + uploaded_file_index = upload_file_config.uploaded_file_index shutil.copyfile(file_path, self.log_file_path) - if MLOpsRuntimeLogProcessor.is_file_rotated(self.log_file_path, uploaded_file_index, config_len, - self.file_rotate_count): - MLOpsLoggingUtils.acquire_lock() - config_data = MLOpsLoggingUtils.load_log_config(run_id=self.run_id, device_id=self.device_id, - log_config_file=self.log_config_file) - config_data[self.file_rotate_count].upload_complete = True - MLOpsLoggingUtils.save_log_config(run_id=self.run_id, device_id=self.device_id, - log_config_file=self.log_config_file, config_data=config_data) - MLOpsLoggingUtils.release_lock() + + if MLOpsRuntimeLogProcessor.is_file_rotated( + self.log_file_path, uploaded_file_index, config_len, self.file_rotate_count + ): + # Acquire new lock for config update + lock_acquired = MLOpsLoggingUtils.acquire_lock() + try: + config_data = MLOpsLoggingUtils.load_log_config( + run_id=self.run_id, + device_id=self.device_id, + log_config_file=self.log_config_file + ) + config_data[self.file_rotate_count].upload_complete = True + MLOpsLoggingUtils.save_log_config( + run_id=self.run_id, + device_id=self.device_id, + log_config_file=self.log_config_file, + config_data=config_data + ) + finally: + if lock_acquired: + MLOpsLoggingUtils.release_lock() + lock_acquired = False + self.file_rotate_count += 1 - # Re-fetch file path and index if file is rotated + # Recursive call without holding any locks return self.fetch_file_path_and_index() return uploaded_file_index return upload_file_index + except Exception as e: raise ValueError(f"Failed to open log file. 
Exception: {e}") finally: - MLOpsLoggingUtils.release_lock() + if lock_acquired: + MLOpsLoggingUtils.release_lock() @staticmethod def is_file_rotated(file_path, uploaded_file_index, config_len, rotate_count): diff --git a/python/fedml/core/mlops/mlops_utils.py b/python/fedml/core/mlops/mlops_utils.py index f56e10b52..5fab10dd0 100644 --- a/python/fedml/core/mlops/mlops_utils.py +++ b/python/fedml/core/mlops/mlops_utils.py @@ -203,10 +203,14 @@ def save_log_config(run_id, device_id, log_config_file, config_data): log_config_key = "log_config_{}_{}".format(run_id, device_id) log_config = MLOpsLoggingUtils.load_yaml_config(log_config_file) log_config[log_config_key] = MLOpsLoggingUtils.__convert_to_dict(config_data) + # Use with statement to ensure file is properly closed with open(log_config_file, "w") as stream: - yaml.dump(log_config, stream) + # use safe_dump to avoid the problem of the lock + yaml.safe_dump(log_config, stream) except Exception as e: - MLOpsLoggingUtils.release_lock() + # modify by charlie + # Don't release lock here - let caller handle it + # MLOpsLoggingUtils.release_lock() raise ValueError("Error saving log config: {}".format(e)) @staticmethod From a26e1b3b412582c2f95f4d8624675cd55b61b3b9 Mon Sep 17 00:00:00 2001 From: charlieyl Date: Thu, 20 Feb 2025 20:00:30 +0800 Subject: [PATCH 26/28] [upd] Bump version to 0.9.6-dev202502202000 --- python/fedml/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/fedml/__init__.py b/python/fedml/__init__.py index 6ceaecc36..108ea8ae0 100644 --- a/python/fedml/__init__.py +++ b/python/fedml/__init__.py @@ -37,7 +37,7 @@ _global_training_type = None _global_comm_backend = None -__version__ = "0.9.6-dev202502191600" +__version__ = "0.9.6-dev202502202000" # This is the deployment environment used for different roles (RD/PM/BD/Public Developers). 
Potential VALUE: local, dev, test, release From cdb32e982193d3c14172354dcb046cb99205c37d Mon Sep 17 00:00:00 2001 From: charlieyl Date: Thu, 20 Feb 2025 20:01:51 +0800 Subject: [PATCH 27/28] [upd] Bump version to 0.9.6-dev202502202000-2 --- python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/setup.py b/python/setup.py index 7cd9a2591..59be71c5e 100644 --- a/python/setup.py +++ b/python/setup.py @@ -126,7 +126,7 @@ def finalize_options(self): setup( name="fedml", - version="0.9.6-dev202502191600", + version="0.9.6-dev202502202000", author="FedML Team", author_email="ch@fedml.ai", description="A research and production integrated edge-cloud library for " From 4dd13dffb9bd189433eefb78ffb18c9a62c56eef Mon Sep 17 00:00:00 2001 From: charlieyl Date: Thu, 20 Feb 2025 20:09:19 +0800 Subject: [PATCH 28/28] [debug] Remove verbose logging in job monitor and model metrics --- python/fedml/computing/scheduler/comm_utils/job_monitor.py | 2 -- .../computing/scheduler/model_scheduler/device_model_monitor.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/python/fedml/computing/scheduler/comm_utils/job_monitor.py b/python/fedml/computing/scheduler/comm_utils/job_monitor.py index 966d28e43..a1d0cc25e 100644 --- a/python/fedml/computing/scheduler/comm_utils/job_monitor.py +++ b/python/fedml/computing/scheduler/comm_utils/job_monitor.py @@ -225,7 +225,6 @@ def monitor_replicas_number(): endpoint_replica_details["end_point_id"], 0) + 1 for endpoint_id, num_replica in res_to_mlops.items(): - logging.info(f"endpoint_id: {endpoint_id}, num_replica: {num_replica}") num_replica_url_path = "fedmlModelServer/api/v1/endpoint/replica-info" mlops_prefix = fedml._get_backend_service() url = f"{mlops_prefix}/{num_replica_url_path}" @@ -243,7 +242,6 @@ def monitor_replicas_number(): "replicaNumber": int(num_replica), "timestamp": int(time.time() * 1000) } - logging.info(f"req_header: {req_header}") logging.info(f"req_body: {req_body}") try: response = requests.post( diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py b/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py index 5c4901120..472cab84a 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py @@ -141,7 +141,7 @@ def send_monitoring_metrics(self, index): "total_request_num": int(total_request_num), "timestamp": timestamp, "edgeId": device_id} - logging.info("send monitor metrics {}".format(json.dumps(deployment_monitoring_payload))) + # logging.info("send monitor metrics {}".format(json.dumps(deployment_monitoring_payload))) self.monitor_mqtt_mgr.send_message_json(deployment_monitoring_topic, json.dumps(deployment_monitoring_payload)) self.monitor_mqtt_mgr.send_message_json(deployment_monitoring_topic_prefix,
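
The db_operation decorator added to device_model_db.py in patch 21 follows a common recovery pattern: catch SQLAlchemy session/connection errors, roll back and discard the broken session, then retry the wrapped call exactly once against a fresh connection. A minimal standalone sketch of that pattern follows; the name retry_on_session_error, the trimmed exception tuple, and the assumption that the object exposes open_job_db() and db_connection mirror the patch but are illustrative, not the exact FedML API.

import functools
import logging

from sqlalchemy import exc


def retry_on_session_error(func):
    # Retry a DB method once after rebuilding the session (sketch only).
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            self.open_job_db()  # ensure a session exists before the call
            return func(self, *args, **kwargs)
        except (exc.InvalidRequestError, exc.StatementError, exc.DBAPIError) as e:
            logging.error("DB error in %s, rebuilding session: %s", func.__name__, e)
            try:
                if self.db_connection is not None:
                    self.db_connection.rollback()
                    self.db_connection.close()
            except Exception:
                pass  # the session is already unusable; just drop it
            self.db_connection = None
            self.open_job_db()  # recreate the session, then retry once
            return func(self, *args, **kwargs)
    return wrapper

Retrying exactly once keeps the failure mode bounded: if the rebuilt session fails again, the second exception propagates to the caller instead of looping.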
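
Patch 18 also falls back to reading /var/lib/docker/containers/<id>/<id>-json.log when the Docker API returns no logs for an exited container. With the default json-file logging driver, each line of that file is a JSON object with "log", "stream", and "time" fields, so decoding the lines recovers the message text directly rather than substring-matching on '"stream":"stdout"'. A hedged sketch, assuming the default log driver and read permission on the Docker data directory (the helper name and docker_root parameter are illustrative):

import json
import os


def read_container_logs_from_file(container_id, docker_root="/var/lib/docker"):
    # Split a json-file driver log into (stdout, stderr) strings. Sketch only:
    # other logging drivers (journald, syslog, ...) do not produce this file.
    log_path = os.path.join(docker_root, "containers", container_id,
                            f"{container_id}-json.log")
    if not os.path.exists(log_path):
        return "", ""
    stdout_lines, stderr_lines = [], []
    with open(log_path, "r") as f:
        for raw in f:
            try:
                entry = json.loads(raw)
            except json.JSONDecodeError:
                continue  # skip a partially written trailing line
            if entry.get("stream") == "stderr":
                stderr_lines.append(entry.get("log", ""))
            else:
                stdout_lines.append(entry.get("log", ""))
    return "".join(stdout_lines), "".join(stderr_lines)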