diff --git a/python/fedml/__init__.py b/python/fedml/__init__.py index 677d06b4e5..108ea8ae0b 100644 --- a/python/fedml/__init__.py +++ b/python/fedml/__init__.py @@ -37,7 +37,7 @@ _global_training_type = None _global_comm_backend = None -__version__ = "0.9.2" +__version__ = "0.9.6-dev202502202000" # This is the deployment environment used for different roles (RD/PM/BD/Public Developers). Potential VALUE: local, dev, test, release diff --git a/python/fedml/computing/scheduler/comm_utils/gpu_utils/nvidia_utils.py b/python/fedml/computing/scheduler/comm_utils/gpu_utils/nvidia_utils.py index a6717de8cb..34d0c3be1c 100644 --- a/python/fedml/computing/scheduler/comm_utils/gpu_utils/nvidia_utils.py +++ b/python/fedml/computing/scheduler/comm_utils/gpu_utils/nvidia_utils.py @@ -25,8 +25,7 @@ def get_gpu_cards() -> List[GPUCard]: @staticmethod def get_available_gpu_card_ids(order: str, limit: int, max_load: float, max_memory: float) -> List[int]: - # return GPUtil.getAvailable(order=order, limit=limit, maxLoad=max_load, maxMemory=max_memory) - return GPUtil.getAvailable(order='random', limit=limit) + return GPUtil.getAvailable(order=order, limit=limit, maxLoad=max_load, maxMemory=max_memory) @staticmethod def get_docker_gpu_device_mapping(gpu_ids: List[int], num_gpus: int = 0) -> Optional[Dict]: diff --git a/python/fedml/computing/scheduler/comm_utils/job_monitor.py b/python/fedml/computing/scheduler/comm_utils/job_monitor.py index b8237d93ba..a1d0cc25ec 100644 --- a/python/fedml/computing/scheduler/comm_utils/job_monitor.py +++ b/python/fedml/computing/scheduler/comm_utils/job_monitor.py @@ -207,6 +207,7 @@ def monitor_replicas_number(): res_to_mlops = {} # endpoint_id -> num_replica for endpoint_detail in res_frm_db: + logging.info(f"endpoint_detail: {endpoint_detail}") endpoint_replicas_details = {} if isinstance(endpoint_detail, str): endpoint_replicas_details = json.loads(endpoint_detail) @@ -218,7 +219,8 @@ def monitor_replicas_number(): endpoint_replica_details = {} if isinstance(endpoint_replicas_details["result"], str): endpoint_replica_details = json.loads(endpoint_replicas_details["result"]) - + + logging.info(f"endpoint_replica_details: {endpoint_replica_details}") res_to_mlops[endpoint_replica_details["end_point_id"]] = res_to_mlops.get( endpoint_replica_details["end_point_id"], 0) + 1 @@ -240,13 +242,14 @@ def monitor_replicas_number(): "replicaNumber": int(num_replica), "timestamp": int(time.time() * 1000) } - + logging.info(f"req_body: {req_body}") try: response = requests.post( url, headers=req_header, json=req_body ) + logging.info(f"endpoint_id: {endpoint_id}, response: {response}") if response.status_code != 200: logging.error(f"Failed to send the replica number request to MLOps platform.") else: diff --git a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py index 28d50d5a50..00f18a78e4 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_http_inference_protocol.py @@ -1,4 +1,6 @@ import logging +import time +import uuid import httpx import traceback @@ -12,6 +14,19 @@ class FedMLHttpInference: + _http_client = None # Class variable for shared HTTP client + + @classmethod + async def get_http_client(cls): + if cls._http_client is None: + limits = httpx.Limits( + max_keepalive_connections=100, + max_connections=1000, + keepalive_expiry=60 + ) + 
cls._http_client = httpx.AsyncClient(limits=limits) + return cls._http_client + def __init__(self): pass @@ -28,8 +43,9 @@ async def is_inference_ready(inference_url, path="ready", timeout=None): # TODO (Raphael): Support more methods and return codes rules. try: - async with httpx.AsyncClient() as client: - ready_response = await client.get(url=ready_url, timeout=timeout) + # async with httpx.AsyncClient() as client: + client = await FedMLHttpInference.get_http_client() + ready_response = await client.get(url=ready_url, timeout=timeout) if isinstance(ready_response, (Response, StreamingResponse)): error_code = ready_response.status_code @@ -88,23 +104,35 @@ async def run_http_inference_with_curl_request( async def stream_generator(inference_url, input_json, method="POST"): - async with httpx.AsyncClient() as client: - async with client.stream(method, inference_url, json=input_json, - timeout=ClientConstants.WORKER_STREAM_API_TIMEOUT) as response: - async for chunk in response.aiter_lines(): - # we consumed a newline, need to put it back - yield f"{chunk}\n" + # async with httpx.AsyncClient() as client: + client = await FedMLHttpInference.get_http_client() + async with client.stream(method, inference_url, json=input_json, + timeout=ClientConstants.WORKER_STREAM_API_TIMEOUT) as response: + async for chunk in response.aiter_lines(): + # we consumed a newline, need to put it back + yield f"{chunk}\n" async def redirect_non_stream_req_to_worker(inference_type, inference_url, model_api_headers, model_inference_json, timeout=None, method="POST"): response_ok = True + # request_id = str(uuid.uuid4())[:8] + # start_time = time.time() + # logging.info(f"[Request-{request_id}] Starting HTTP request to {inference_url}") + try: - async with httpx.AsyncClient() as client: - response = await client.request( - method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout - ) + # async with httpx.AsyncClient() as client: + client = await FedMLHttpInference.get_http_client() + response = await client.request( + method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout + ) + # end_time = time.time() + # elapsed_time = end_time - start_time + # logging.info(f"[Request-{request_id}] Completed HTTP request. Time taken: {elapsed_time:.3f} seconds") except Exception as e: + # end_time = time.time() + # elapsed_time = end_time - start_time + # logging.error(f"[Request-{request_id}] Failed HTTP request after {elapsed_time:.3f} seconds. 
Error: {str(e)}") response_ok = False model_inference_result = {"error": e} return response_ok, model_inference_result diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py b/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py index 8697d0a62c..1600b58bd2 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_cards.py @@ -59,7 +59,8 @@ def serve_model_on_premise(self, model_name, endpoint_name, master_device_ids, if use_remote: if not self.deploy_model(model_name, device_type, target_devices, "", user_api_key, - additional_params_dict, use_local_deployment, endpoint_id=endpoint_id): + additional_params_dict, use_local_deployment, endpoint_name=endpoint_name, + endpoint_id=endpoint_id): print("Failed to deploy model") return False return True diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_db.py b/python/fedml/computing/scheduler/model_scheduler/device_model_db.py index 606d8c010b..5be1b55ae5 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_db.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_db.py @@ -11,6 +11,8 @@ from fedml.core.common.singleton import Singleton from sqlalchemy.sql import text from typing import List, Dict +import functools +from sqlalchemy import exc Base = declarative_base() @@ -25,7 +27,57 @@ def __init__(self): self.db_engine = None if not hasattr(self, "db_base_dir"): self.db_base_dir = None - + + @staticmethod + def db_operation(func): + """decorator: handle the database operation exceptions""" + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + try: + # open the database connection + self.open_job_db() + # execute the function + return func(self, *args, **kwargs) + except ( + # session state error + exc.InvalidRequestError, # including "prepared state" error + exc.StatementError, # SQL statement execution error + # connection error + exc.DBAPIError, # base class of database API error + exc.OperationalError, # database operation error (e.g. connection failure) + exc.DisconnectionError, # connection disconnected + # transaction error + exc.InvalidatePoolError, # connection pool invalid + exc.TimeoutError, # connection timeout + exc.ResourceClosedError, # resource (e.g. 
cursor) closed + # concurrent error + exc.PendingRollbackError, # pending rollback transaction + exc.IntegrityError # integrity constraint violation + ) as e: + logging.error(f"Database error in {func.__name__}, rebuilding session: {e}") + # rollback any unfinished transactions + if self.db_connection: + try: + self.db_connection.rollback() + except: + pass + try: + self.db_connection.close() + except: + pass + # set the db connection to None, then open again in open_job_db method + self.db_connection = None + # retry open the database connection + self.open_job_db() + # retry execute the function + return func(self, *args, **kwargs) + except Exception as e: + # other unexpected errors, record logs and raise + logging.error(f"Unexpected error in {func.__name__}: {e}") + self.db_connection = None + raise + return wrapper + @staticmethod def get_instance(): return FedMLModelDatabase() @@ -134,6 +186,7 @@ def get_deployment_status_with_device_id(self, end_point_id, end_point_name, mod return None + @db_operation def delete_deployment_status(self, end_point_id, end_point_name, model_name, model_version=None): self.open_job_db() if model_version is None: @@ -149,6 +202,7 @@ def delete_deployment_status(self, end_point_id, end_point_name, model_name, mod FedMLDeploymentResultInfoModel.model_version == f'{model_version}')).delete() self.db_connection.commit() + @db_operation def delete_deployment_result(self, end_point_id, end_point_name, model_name, model_version=None): self.open_job_db() if model_version is None: @@ -164,6 +218,7 @@ def delete_deployment_result(self, end_point_id, end_point_name, model_name, mod FedMLDeploymentResultInfoModel.model_version == f'{model_version}')).delete() self.db_connection.commit() + @db_operation def delete_deployment_result_with_device_id(self, end_point_id, end_point_name, model_name, device_id): self.open_job_db() self.db_connection.query(FedMLDeploymentResultInfoModel).filter( @@ -173,6 +228,7 @@ def delete_deployment_result_with_device_id(self, end_point_id, end_point_name, FedMLDeploymentResultInfoModel.device_id == f'{device_id}')).delete() self.db_connection.commit() + @db_operation def delete_deployment_result_with_device_id_and_rank(self, end_point_id, end_point_name, model_name, device_id, replica_rank): replica_no = replica_rank + 1 @@ -185,6 +241,7 @@ def delete_deployment_result_with_device_id_and_rank(self, end_point_id, end_poi FedMLDeploymentResultInfoModel.replica_no == f'{replica_no}')).delete() self.db_connection.commit() + @db_operation def delete_deployment_run_info(self, end_point_id): # db / table -> model-deployment.db / "deployment_run_info" self.open_job_db() @@ -343,6 +400,7 @@ def drop_table(self): except Exception as e: pass + @db_operation def get_deployment_results_info(self, end_point_id, end_point_name, model_name, model_version): self.open_job_db() if model_version is None: @@ -358,11 +416,13 @@ def get_deployment_results_info(self, end_point_id, end_point_name, model_name, FedMLDeploymentResultInfoModel.model_version == f'{model_version}')).all() return result_info + @db_operation def _get_all_deployment_results_info(self): self.open_job_db() result_info = self.db_connection.query(FedMLDeploymentResultInfoModel).all() return result_info + @db_operation def set_deployment_results_info(self, end_point_id, end_point_name, model_name, model_version, device_id, deployment_result=None, deployment_status=None, replica_no=None): @@ -402,12 +462,14 @@ def set_deployment_results_info(self, end_point_id, end_point_name, 
self.db_connection.commit() + @db_operation def get_deployment_run_info(self, end_point_id): self.open_job_db() run_info = self.db_connection.query(FedMLDeploymentRunInfoModel). \ filter_by(end_point_id=f'{end_point_id}').first() return run_info + @db_operation def set_deployment_run_info(self, end_point_id, end_point_name, end_point_status=None, device_info=None, activated=None, token=None): @@ -435,6 +497,7 @@ def set_deployment_run_info(self, end_point_id, end_point_name, self.db_connection.commit() + @db_operation def get_deployment_auth_info(self, end_point_id, end_point_name, model_name): self.open_job_db() run_info = self.db_connection.query(FedMLDeploymentAuthInfoModel). \ @@ -443,6 +506,7 @@ def get_deployment_auth_info(self, end_point_id, end_point_name, model_name): FedMLDeploymentAuthInfoModel.model_name == f'{model_name}')).first() return run_info + @db_operation def set_deployment_auth_info(self, end_point_id, end_point_name, model_name, token): self.open_job_db() auth_info = self.db_connection.query(FedMLDeploymentAuthInfoModel). \ @@ -462,6 +526,7 @@ def set_deployment_auth_info(self, end_point_id, end_point_name, model_name, tok self.db_connection.commit() + @db_operation def get_latest_end_point_metrics(self, end_point_id, end_point_name, model_name, model_version): self.open_job_db() endpoint_metric = self.db_connection.query(FedMLEndPointMetricsModel). \ @@ -473,6 +538,7 @@ def get_latest_end_point_metrics(self, end_point_id, end_point_name, model_name, return endpoint_metric[-1] return None + @db_operation def get_end_point_metrics_by_index(self, end_point_id, end_point_name, model_name, model_version, index): self.open_job_db() endpoint_metric = self.db_connection.query(FedMLEndPointMetricsModel). \ @@ -483,6 +549,7 @@ def get_end_point_metrics_by_index(self, end_point_id, end_point_name, model_nam offset(index).limit(1).first() return endpoint_metric + @db_operation def set_end_point_metrics(self, end_point_id, end_point_name, model_name, model_version, total_latency=None, avg_latency=None, current_latency=None, diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py index baee7a2973..7fab64d9b9 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_deployment.py @@ -65,6 +65,10 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, request_json = dict() logging.info("[Worker] Model deployment is starting...") + logging.info("=" * 80) + logging.info("[Device Model Deployment] Received start deployment request: {}".format(request_json)) + logging.info("=" * 80) + # Real gpu per replica (container-level) num_gpus = gpu_per_replica gpu_ids, gpu_attach_cmd = None, "" @@ -213,6 +217,25 @@ def start_deployment(end_point_id, end_point_name, model_id, model_version, detach=True, command=customized_image_entry_cmd, ) + + logging.info("=" * 80) + logging.info("[Device Model Deployment] Creating container with following parameters:") + logging.info("=" * 80) + logging.info("Image: {}".format(inference_image_name)) + logging.info("Container name: {}".format(default_server_container_name)) + logging.info("Volumes:") + for vol in volumes: + logging.info(" - {}".format(vol)) + logging.info("Ports: [{}]".format(port_inside_container)) + logging.info("Environment variables:") + for key, value in environment.items(): + logging.info(" {} = 
{}".format(key, value)) + logging.info("Host config:") + for key, value in host_config_dict.items(): + logging.info(" {} = {}".format(key, value)) + logging.info("Command: {}".format(customized_image_entry_cmd)) + logging.info("=" * 80) + client.api.start(container=new_container.get("Id")) except Exception as e: logging.error(f"Failed to create the container with exception {e}, traceback : {traceback.format_exc()}") @@ -319,10 +342,15 @@ def log_deployment_output(end_point_id, model_id, cmd_container_name, cmd_type, if container_obj is not None: out_logs, err_logs = None, None try: - out_logs = container_obj.logs(stdout=True, stderr=False, stream=False, follow=False, - since=last_log_time) - err_logs = container_obj.logs(stdout=False, stderr=True, stream=False, follow=False, - since=last_log_time) + if container_obj.status == "exited": + # If the container has exited, we need to get the whole logs from the container + out_logs = container_obj.logs(stdout=True, stderr=False, stream=False, follow=False) + err_logs = container_obj.logs(stdout=False, stderr=True, stream=False, follow=False) + else: + out_logs = container_obj.logs(stdout=True, stderr=False, stream=False, follow=False, + since=last_log_time) + err_logs = container_obj.logs(stdout=False, stderr=True, stream=False, follow=False, + since=last_log_time) except Exception as e: logging.error(f"Failed to get the logs from the container with exception {e}") pass @@ -332,16 +360,29 @@ def log_deployment_output(end_point_id, model_id, cmd_container_name, cmd_type, if err_logs is not None: err_logs = sys_utils.decode_our_err_result(err_logs) if len(err_logs) > 0: - logging.error(f"{format(err_logs)}") + logging.error(f"[-- Container Error Logs Start --]\n{format(err_logs)}\n[-- Container Error Logs End --]") if out_logs is not None: out_logs = sys_utils.decode_our_err_result(out_logs) if len(out_logs) > 0: - logging.info(f"{format(out_logs)}") + logging.info(f"[-- Container Stdout Logs Start --]\n{format(out_logs)}\n[-- Container Stdout Logs End --]") if container_obj.status == "exited": logging.info("Container {} has exited, automatically remove it".format(cmd_container_name)) + # try to get the logs from the filesystem + if out_logs is None or err_logs is None: + try: + logs_path = f"/var/lib/docker/containers/{container_obj.id}/{container_obj.id}-json.log" + if os.path.exists(logs_path): + with open(logs_path, 'r') as f: + raw_logs = f.readlines() + out_logs = '\n'.join([line for line in raw_logs if '"stream":"stdout"' in line]) + err_logs = '\n'.join([line for line in raw_logs if '"stream":"stderr"' in line]) + logging.error(f"read Container Error Logs from log file: {err_logs}") + except Exception as e: + logging.warning(f"Failed to read logs from filesystem: {str(e)}") + # Save the failed log into ~/.fedml/fedml-model-client/fedml/logs/failed_logs/ # $run_id/$container_name.log try: diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py index 9adc17538d..feabbf321b 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_inference.py @@ -4,6 +4,7 @@ import time import traceback import os +import uuid from typing import Any, Mapping, MutableMapping, Union from urllib.parse import urlparse @@ -46,46 +47,46 @@ class Settings: redis_password=Settings.redis_password) -@api.middleware("http") -async def auth_middleware(request: 
Request, call_next): - if "/inference" in request.url.path or "/api/v1/predict" in request.url.path: - try: - # Attempt to parse the JSON body. - request_json = await request.json() - except json.JSONDecodeError: - return JSONResponse( - {"error": True, "message": "Invalid JSON."}, - status_code=status.HTTP_400_BAD_REQUEST) - - # Get endpoint's total pending requests. - end_point_id = request_json.get("end_point_id", None) - pending_requests_num = FEDML_MODEL_CACHE.get_pending_requests_counter(end_point_id) - if pending_requests_num: - # Fetch metrics of the past k=3 requests. - pask_k_metrics = FEDML_MODEL_CACHE.get_endpoint_metrics( - end_point_id=end_point_id, - k_recent=3) - - # Get the request timeout from the endpoint settings. - request_timeout_s = FEDML_MODEL_CACHE.get_endpoint_settings(end_point_id) \ - .get(ServerConstants.INFERENCE_REQUEST_TIMEOUT_KEY, ServerConstants.INFERENCE_REQUEST_TIMEOUT_DEFAULT) - - # Only proceed if the past k metrics collection is not empty. - if pask_k_metrics: - # Measure the average latency in seconds(!), hence the 0.001 multiplier. - past_k_latencies_sec = \ - [float(j_obj["current_latency"]) * 0.001 for j_obj in pask_k_metrics] - mean_latency = sum(past_k_latencies_sec) / len(past_k_latencies_sec) - - # If timeout threshold is exceeded then cancel and return time out error. - should_block = (mean_latency * pending_requests_num) > request_timeout_s - if should_block: - return JSONResponse( - {"error": True, "message": "Request timed out."}, - status_code=status.HTTP_504_GATEWAY_TIMEOUT) - - response = await call_next(request) - return response +# @api.middleware("http") +# async def auth_middleware(request: Request, call_next): +# if "/inference" in request.url.path or "/api/v1/predict" in request.url.path: +# try: +# # Attempt to parse the JSON body. +# request_json = await request.json() +# except json.JSONDecodeError: +# return JSONResponse( +# {"error": True, "message": "Invalid JSON."}, +# status_code=status.HTTP_400_BAD_REQUEST) + +# # Get endpoint's total pending requests. +# end_point_id = request_json.get("end_point_id", None) +# pending_requests_num = FEDML_MODEL_CACHE.get_pending_requests_counter(end_point_id) +# if pending_requests_num: +# # Fetch metrics of the past k=3 requests. +# pask_k_metrics = FEDML_MODEL_CACHE.get_endpoint_metrics( +# end_point_id=end_point_id, +# k_recent=3) + +# # Get the request timeout from the endpoint settings. +# request_timeout_s = FEDML_MODEL_CACHE.get_endpoint_settings(end_point_id) \ +# .get(ServerConstants.INFERENCE_REQUEST_TIMEOUT_KEY, ServerConstants.INFERENCE_REQUEST_TIMEOUT_DEFAULT) + +# # Only proceed if the past k metrics collection is not empty. +# if pask_k_metrics: +# # Measure the average latency in seconds(!), hence the 0.001 multiplier. +# past_k_latencies_sec = \ +# [float(j_obj["current_latency"]) * 0.001 for j_obj in pask_k_metrics] +# mean_latency = sum(past_k_latencies_sec) / len(past_k_latencies_sec) + +# # If timeout threshold is exceeded then cancel and return time out error. +# should_block = (mean_latency * pending_requests_num) > request_timeout_s +# if should_block: +# return JSONResponse( +# {"error": True, "message": "Request timed out."}, +# status_code=status.HTTP_504_GATEWAY_TIMEOUT) + +# response = await call_next(request) +# return response @api.on_event("startup") @@ -198,7 +199,6 @@ async def _predict( # Always increase the pending requests counter on a new incoming request. 
FEDML_MODEL_CACHE.update_pending_requests_counter(end_point_id, increase=True) inference_response = {} - try: in_end_point_id = end_point_id in_end_point_name = input_json.get("end_point_name", None) @@ -259,6 +259,11 @@ async def _predict( input_list["stream"] = input_list.get("stream", stream_flag) output_list = input_json.get("outputs", []) + # request_uuid = str(uuid.uuid4()) # Generate unique request ID + # inference_start_time = time.time() + # start_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(inference_start_time)) + # logging.info(f"[Request {request_uuid}] Starting send_inference_request at {start_time_str}") + # main execution of redirecting the inference request to the idle device inference_response = await send_inference_request( idle_device, @@ -269,6 +274,11 @@ async def _predict( inference_type=in_return_type, connectivity_type=connectivity_type, path=path, request_method=request_method) + + # inference_end_time = time.time() + # end_time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(inference_end_time)) + # inference_duration = inference_end_time - inference_start_time + # logging.info(f"[Request {request_uuid}] Completed send_inference_request at {end_time_str}, duration: {inference_duration:.3f} seconds") # Calculate model metrics try: diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py b/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py index 6ec05f64ed..6a21c880b0 100755 --- a/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py +++ b/python/fedml/computing/scheduler/model_scheduler/device_model_msg_object.py @@ -43,31 +43,34 @@ def __init__(self, topic, payload): request_json = json.loads(payload) self.msg_topic = topic self.request_json = request_json - self.run_id = request_json["end_point_id"] - self.end_point_name = request_json["end_point_name"] - self.token = request_json["token"] - self.user_id = request_json["user_id"] - self.user_name = request_json["user_name"] - self.device_ids = request_json["device_ids"] - self.device_objs = request_json["device_objs"] + self.run_id = request_json.get("end_point_id") + self.end_point_name = request_json.get("end_point_name", "") + self.token = request_json.get("token", "") + self.user_id = request_json.get("user_id") + self.user_name = request_json.get("user_name", "") + self.device_ids = request_json.get("device_ids", []) + self.device_objs = request_json.get("device_objs", []) - self.model_config = request_json["model_config"] - self.model_name = self.model_config["model_name"] - self.model_id = self.model_config["model_id"] - self.model_version = self.model_config["model_version"] - self.model_storage_url = self.model_config["model_storage_url"] - self.scale_min = self.model_config.get("instance_scale_min", 0) - self.scale_max = self.model_config.get("instance_scale_max", 0) - self.inference_engine = self.model_config.get("inference_engine", 0) - self.inference_end_point_id = self.run_id + # check if model_config is in request_json and is not None + self.scale_min = 1 + self.max_unavailable_rate = 0.1 + if "model_config" in request_json and request_json["model_config"] is not None: + self.model_config = request_json["model_config"] + self.model_name = self.model_config["model_name"] + self.model_id = self.model_config["model_id"] + self.model_version = self.model_config["model_version"] + self.model_storage_url = self.model_config.get("model_storage_url", "") + self.scale_min = 
self.model_config.get("instance_scale_min", 1) + self.scale_max = self.model_config.get("instance_scale_max", 1) + self.inference_engine = self.model_config.get("inference_engine") + self.max_unavailable_rate = self.model_config.get("max_unavailable_rate", 0.1) + self.inference_end_point_id = self.run_id self.request_json["run_id"] = self.run_id self.gpu_topology = self.get_devices_avail_gpus() self.gpu_per_replica = self.get_gpu_per_replica() - self.max_unavailable_rate = self.model_config.get("max_unavailable_rate", 0.1) - def get_devices_avail_gpus(self): """ { diff --git a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py index 00b08acfb8..d6829719ad 100755 --- a/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_job_runner.py @@ -11,6 +11,7 @@ import fedml from fedml.core.mlops import MLOpsRuntimeLog, MLOpsConfigs from fedml.core.mlops.mlops_runtime_log import MLOpsFormatter +from .device_model_msg_object import FedMLModelMsgObject from .device_client_constants import ClientConstants from .device_model_cache import FedMLModelCache from .device_server_constants import ServerConstants @@ -278,6 +279,17 @@ def process_deployment_result_message(self, topic=None, payload=None): end_point_id, end_point_name, payload_json["model_name"], "", ServerConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, message_center=self.message_center) + + # when report failed to the MLOps, need to delete the replica has successfully deployed and release the gpu + model_config = dict() + model_config["model_name"] = payload_json["model_name"] + model_config["model_id"] = payload_json["model_id"] + model_config["model_version"] = payload_json["model_version"] + # add model_config to the payload for the delete request + payload_json["model_config"] = model_config + payload_for_del_deploy = json.dumps(payload_json) + model_msg_object = FedMLModelMsgObject(topic, payload_for_del_deploy) + self.send_deployment_delete_request_to_edges(payload_for_del_deploy, model_msg_object, message_center=self.message_center) return # Failure handler, send the rollback message to the worker devices only if it has not been rollback @@ -442,21 +454,27 @@ def start_device_inference_gateway(): python_program = get_python_program() inference_port = ServerConstants.get_inference_master_gateway_port() if not ServerConstants.is_running_on_k8s(): - logging.info(f"start the model inference gateway...") inference_gw_cmd = "fedml.computing.scheduler.model_scheduler.device_model_inference:api" inference_gateway_pids = RunProcessUtils.get_pid_from_cmd_line(inference_gw_cmd) if inference_gateway_pids is None or len(inference_gateway_pids) <= 0: cur_dir = os.path.dirname(__file__) fedml_base_dir = os.path.dirname(os.path.dirname(os.path.dirname(cur_dir))) - inference_gateway_process = ServerConstants.exec_console_with_script(f"{python_program} " - f"-m uvicorn {inference_gw_cmd} " - f"--host 0.0.0.0 " - f"--port {str(inference_port)} " - f"--reload --reload-delay 3 " - f"--reload-dir {fedml_base_dir} " - f"--log-level info", - should_capture_stdout=False, - should_capture_stderr=False) + workers = 4 + logging.info(f"start the model inference gateway workers[{workers}] no uvloop/httptools...") + inference_gateway_process = ServerConstants.exec_console_with_script( + f"{python_program} -m uvicorn {inference_gw_cmd} " + f"--host 0.0.0.0 " + f"--port {str(inference_port)} " + 
f"--workers {workers} " + # f"--loop uvloop " + # f"--http httptools " + f"--limit-concurrency 1024 " + f"--backlog 2048 " + f"--timeout-keep-alive 60 " + f"--log-level warning ", + should_capture_stdout=False, + should_capture_stderr=False + ) return inference_gateway_process else: return inference_gateway_pids[0] diff --git a/python/fedml/computing/scheduler/model_scheduler/master_protocol_manager.py b/python/fedml/computing/scheduler/model_scheduler/master_protocol_manager.py index 9e0d51b588..e06c5162eb 100755 --- a/python/fedml/computing/scheduler/model_scheduler/master_protocol_manager.py +++ b/python/fedml/computing/scheduler/model_scheduler/master_protocol_manager.py @@ -155,6 +155,11 @@ def callback_start_deployment(self, topic, payload): # Get deployment params request_json = json.loads(payload) + + logging.info("=" * 80) + logging.info("[Master Protocol Manager] Received start deployment request: {}".format(request_json)) + logging.info("=" * 80) + run_id = request_json["end_point_id"] end_point_name = request_json["end_point_name"] token = request_json["token"] diff --git a/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py b/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py index c73630fb65..42c53b549a 100755 --- a/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py +++ b/python/fedml/computing/scheduler/model_scheduler/worker_job_runner.py @@ -261,8 +261,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center, # Send failed result back to master _ = self.send_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, - model_id, model_name, inference_output_url, inference_model_version, inference_port, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, + model_id, model_name, inference_output_url, model_version, inference_port, inference_engine, model_metadata, model_config) self.status_reporter.run_id = self.run_id @@ -272,7 +272,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, # Send failed successful result back to master logging.info("Finished deployment, continue to send results to master...") result_payload = self.send_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port_external, inference_engine, model_metadata, model_config, replica_no=rank + 1, connectivity=connectivity @@ -283,7 +283,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, logging.info("inference_port_external {} != inference_port {}".format( inference_port_external, inference_port)) result_payload = self.construct_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port, inference_engine, model_metadata, model_config, replica_no=rank + 1, connectivity=connectivity @@ -317,7 +317,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, # Report the deletion msg to master result_payload = self.send_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DELETED, + end_point_name, self.edge_id, 
device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DELETED, model_id, model_name, inference_output_url, model_version, inference_port_external, inference_engine, model_metadata, model_config, replica_no=rank_to_delete + 1) @@ -395,8 +395,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center, run_id, self.edge_id, replica_occupied_gpu_ids) self.send_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, - model_id, model_name, inference_output_url, inference_model_version, inference_port, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED, + model_id, model_name, inference_output_url, model_version, inference_port, inference_engine, model_metadata, model_config) self.status_reporter.run_id = self.run_id @@ -407,7 +407,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, else: logging.info("Finished deployment, continue to send results to master...") result_payload = self.send_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port_external, inference_engine, model_metadata, model_config, replica_no=rank + 1, connectivity=connectivity @@ -417,7 +417,7 @@ def run_impl(self, run_extend_queue_list, sender_message_center, logging.info("inference_port_external {} != inference_port {}".format( inference_port_external, inference_port)) result_payload = self.construct_deployment_results( - end_point_name, self.edge_id, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, + end_point_name, self.edge_id, device_ids, ClientConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED, model_id, model_name, inference_output_url, model_version, inference_port, inference_engine, model_metadata, model_config, replica_no=rank + 1, connectivity=connectivity @@ -441,12 +441,13 @@ def run_impl(self, run_extend_queue_list, sender_message_center, logging.error(f"Unsupported op {op} with op num {op_num}") return False - def construct_deployment_results(self, end_point_name, device_id, model_status, + def construct_deployment_results(self, end_point_name, device_id, device_ids, model_status, model_id, model_name, model_inference_url, model_version, inference_port, inference_engine, model_metadata, model_config, replica_no=1, connectivity=ClientConstants.WORKER_CONNECTIVITY_TYPE_DEFAULT): deployment_results_payload = {"end_point_id": self.run_id, "end_point_name": end_point_name, + "device_ids": device_ids, "model_id": model_id, "model_name": model_name, "model_url": model_inference_url, "model_version": model_version, "port": inference_port, @@ -460,7 +461,7 @@ def construct_deployment_results(self, end_point_name, device_id, model_status, } return deployment_results_payload - def send_deployment_results(self, end_point_name, device_id, model_status, + def send_deployment_results(self, end_point_name, device_id, device_ids, model_status, model_id, model_name, model_inference_url, model_version, inference_port, inference_engine, model_metadata, model_config, replica_no=1, @@ -469,7 +470,7 @@ def send_deployment_results(self, end_point_name, device_id, model_status, self.run_id, device_id) deployment_results_payload = self.construct_deployment_results( - end_point_name, device_id, model_status, + end_point_name, device_id, device_ids, model_status, model_id, 
model_name, model_inference_url, model_version, inference_port, inference_engine, model_metadata, model_config, replica_no=replica_no, connectivity=connectivity) diff --git a/python/fedml/computing/scheduler/model_scheduler/worker_protocol_manager.py b/python/fedml/computing/scheduler/model_scheduler/worker_protocol_manager.py index b1d0bebc47..7f7b041b77 100755 --- a/python/fedml/computing/scheduler/model_scheduler/worker_protocol_manager.py +++ b/python/fedml/computing/scheduler/model_scheduler/worker_protocol_manager.py @@ -170,7 +170,7 @@ def callback_start_deployment(self, topic, payload): ClientConstants.save_run_process(run_id, process.pid) def callback_delete_deployment(self, topic, payload): - logging.info("[Worker] callback_delete_deployment") + logging.info("[Worker] callback_delete_deployment, topic: {}, payload: {}".format(topic, payload)) # Parse payload as the model message object. model_msg_object = FedMLModelMsgObject(topic, payload) diff --git a/python/fedml/core/mlops/mlops_job_perfs.py b/python/fedml/core/mlops/mlops_job_perfs.py index fe3d921558..fe205323ec 100644 --- a/python/fedml/core/mlops/mlops_job_perfs.py +++ b/python/fedml/core/mlops/mlops_job_perfs.py @@ -175,7 +175,7 @@ def report_job_stats_entry(self, sys_event): logging.debug("exception when reporting job pref: {}.".format(traceback.format_exc())) pass - time.sleep(10) + time.sleep(15) logging.info("Job metrics process is about to exit.") mqtt_mgr.loop_stop() diff --git a/python/fedml/core/mlops/mlops_runtime_log.py b/python/fedml/core/mlops/mlops_runtime_log.py index 0fc5db3d23..4d2d68f50b 100644 --- a/python/fedml/core/mlops/mlops_runtime_log.py +++ b/python/fedml/core/mlops/mlops_runtime_log.py @@ -33,35 +33,39 @@ def __init__(self, run_id, edge_id, log_config_file, filepath): def update_config_and_rotate(self, source, dest): # source = current log file name # dest = log file name (dated) - MLOpsLoggingUtils.acquire_lock() - - # Check if the source and destination files exist. If it does, return - if os.path.exists(source): - # Copy the contents of the source file to the destination file - shutil.copy(source, dest) - # Clear everything in the source file - with open(source, 'w') as src_file: - src_file.truncate(0) - src_file.close() - - config_data = MLOpsLoggingUtils.load_log_config(self.run_id, self.edge_id, - self.log_config_file) - - # Update file name of current log file - config_data[self.rotate_count].file_path = dest - self.rotate_count += 1 - - # Store the rotate count, and corresponding log file name in the config file - rotated_log_file = LogFile(file_path=source) - config_data[self.rotate_count] = rotated_log_file - MLOpsLoggingUtils.save_log_config(run_id=self.run_id, device_id=self.edge_id, - log_config_file=self.log_config_file, - config_data=config_data) - MLOpsLoggingUtils.release_lock() + lock_acquired = MLOpsLoggingUtils.acquire_lock() + try: + # Check if the source and destination files exist. 
If it does, return + if os.path.exists(source): + # Copy the contents of the source file to the destination file + shutil.copy(source, dest) + # Clear everything in the source file + with open(source, 'w') as src_file: + src_file.truncate(0) + src_file.close() + + config_data = MLOpsLoggingUtils.load_log_config(self.run_id, self.edge_id, + self.log_config_file) + + # Update file name of current log file + config_data[self.rotate_count].file_path = dest + self.rotate_count += 1 + + # Store the rotate count, and corresponding log file name in the config file + rotated_log_file = LogFile(file_path=source) + config_data[self.rotate_count] = rotated_log_file + MLOpsLoggingUtils.save_log_config(run_id=self.run_id, device_id=self.edge_id, + log_config_file=self.log_config_file, + config_data=config_data) + except Exception as e: + raise ValueError("Error updating log config: {}".format(e)) + finally: + if lock_acquired: + MLOpsLoggingUtils.release_lock() def __initialize_config(self): + lock_acquired = MLOpsLoggingUtils.acquire_lock(block=True) try: - MLOpsLoggingUtils.acquire_lock() config_data = MLOpsLoggingUtils.load_log_config(run_id=self.run_id, device_id=self.edge_id, log_config_file=self.log_config_file) if not config_data: @@ -72,7 +76,8 @@ def __initialize_config(self): except Exception as e: raise ValueError("Error initializing log config: {}".format(e)) finally: - MLOpsLoggingUtils.release_lock() + if lock_acquired: + MLOpsLoggingUtils.release_lock() class MLOpsFormatter(logging.Formatter): diff --git a/python/fedml/core/mlops/mlops_runtime_log_daemon.py b/python/fedml/core/mlops/mlops_runtime_log_daemon.py index ff06dc91b3..9218519659 100644 --- a/python/fedml/core/mlops/mlops_runtime_log_daemon.py +++ b/python/fedml/core/mlops/mlops_runtime_log_daemon.py @@ -120,15 +120,19 @@ def log_upload(self, run_id, device_id): line_start_req = line_end_req # Update the uploaded file index - MLOpsLoggingUtils.acquire_lock() - config_data = MLOpsLoggingUtils.load_log_config(run_id, device_id, + lock_acquired = MLOpsLoggingUtils.acquire_lock() + try: + config_data = MLOpsLoggingUtils.load_log_config(run_id, device_id, self.log_config_file) - - config_data[self.file_rotate_count].uploaded_file_index = uploaded_file_index - MLOpsLoggingUtils.save_log_config(run_id=run_id, device_id=device_id, - log_config_file=self.log_config_file, - config_data=config_data) - MLOpsLoggingUtils.release_lock() + config_data[self.file_rotate_count].uploaded_file_index = uploaded_file_index + MLOpsLoggingUtils.save_log_config(run_id=run_id, device_id=device_id, + log_config_file=self.log_config_file, + config_data=config_data) + except Exception as e: + raise ValueError("Error updating log config: {}".format(e)) + finally: + if lock_acquired: + MLOpsLoggingUtils.release_lock() @staticmethod def __format_log_lines(log_lines: list, line_start_req: int, line_end_req: int): @@ -296,37 +300,65 @@ def log_process(self, process_event): print("Log Process exits normally.") def fetch_file_path_and_index(self) -> (str, int): + lock_acquired = False try: upload_file_index = None - MLOpsLoggingUtils.acquire_lock() - config_data = MLOpsLoggingUtils.load_log_config(run_id=self.run_id, device_id=self.device_id, - log_config_file=self.log_config_file) - MLOpsLoggingUtils.release_lock() + # Acquire lock for initial config read + lock_acquired = MLOpsLoggingUtils.acquire_lock() + try: + config_data = MLOpsLoggingUtils.load_log_config( + run_id=self.run_id, + device_id=self.device_id, + log_config_file=self.log_config_file + ) + finally: 
+ if lock_acquired: + MLOpsLoggingUtils.release_lock() + lock_acquired = False + if config_data is not None: config_len = len(config_data) upload_file_config = config_data.get(self.file_rotate_count, None) if upload_file_config is not None: - file_path, uploaded_file_index = upload_file_config.file_path, upload_file_config.uploaded_file_index + file_path = upload_file_config.file_path + uploaded_file_index = upload_file_config.uploaded_file_index shutil.copyfile(file_path, self.log_file_path) - if MLOpsRuntimeLogProcessor.is_file_rotated(self.log_file_path, uploaded_file_index, config_len, - self.file_rotate_count): - MLOpsLoggingUtils.acquire_lock() - config_data = MLOpsLoggingUtils.load_log_config(run_id=self.run_id, device_id=self.device_id, - log_config_file=self.log_config_file) - config_data[self.file_rotate_count].upload_complete = True - MLOpsLoggingUtils.save_log_config(run_id=self.run_id, device_id=self.device_id, - log_config_file=self.log_config_file, config_data=config_data) - MLOpsLoggingUtils.release_lock() + + if MLOpsRuntimeLogProcessor.is_file_rotated( + self.log_file_path, uploaded_file_index, config_len, self.file_rotate_count + ): + # Acquire new lock for config update + lock_acquired = MLOpsLoggingUtils.acquire_lock() + try: + config_data = MLOpsLoggingUtils.load_log_config( + run_id=self.run_id, + device_id=self.device_id, + log_config_file=self.log_config_file + ) + config_data[self.file_rotate_count].upload_complete = True + MLOpsLoggingUtils.save_log_config( + run_id=self.run_id, + device_id=self.device_id, + log_config_file=self.log_config_file, + config_data=config_data + ) + finally: + if lock_acquired: + MLOpsLoggingUtils.release_lock() + lock_acquired = False + self.file_rotate_count += 1 - # Re-fetch file path and index if file is rotated + # Recursive call without holding any locks return self.fetch_file_path_and_index() return uploaded_file_index return upload_file_index + except Exception as e: raise ValueError(f"Failed to open log file. Exception: {e}") finally: - MLOpsLoggingUtils.release_lock() + if lock_acquired: + MLOpsLoggingUtils.release_lock() @staticmethod def is_file_rotated(file_path, uploaded_file_index, config_len, rotate_count): diff --git a/python/fedml/core/mlops/mlops_utils.py b/python/fedml/core/mlops/mlops_utils.py index 8bde9e4299..5fab10dd09 100644 --- a/python/fedml/core/mlops/mlops_utils.py +++ b/python/fedml/core/mlops/mlops_utils.py @@ -1,4 +1,5 @@ import json +import logging import multiprocessing import os import time @@ -76,6 +77,11 @@ class MLOpsLoggingUtils: @staticmethod def acquire_lock(block=True): return MLOpsLoggingUtils._lock.acquire(block) + + # logging.info("acquire_lock start, block: {}".format(block)) + # lock_acquired = MLOpsLoggingUtils._lock.acquire(block) + # logging.info("acquire_lock end, lock_acquired: {}".format(lock_acquired)) + # return lock_acquired @staticmethod def release_lock(): @@ -83,6 +89,27 @@ def release_lock(): MLOpsLoggingUtils._lock.acquire(block=False) MLOpsLoggingUtils._lock.release() + # modify by charlie + # release_lock method may have incorrect implementation: + # -> The acquire(block=False) in release_lock may incorrectly acquire and release the lock, especially in a multi-threaded environment. + # -> If the current thread already holds the lock, acquire(block=False) will fail (return False) in multiprocessing.Lock, + # because the lock is not re-entrant, and cross-thread use may lead to undefined behavior. 
+ # -> Therefore, the acquire call in release_lock may fail, causing subsequent release() to throw an exception, + # or incorrectly release the lock held by other threads. + + # modify by charlie + # acquire the lock and release it in old lock implementation + # perhaps cause the lock is released in the wrong place + # so we need to release the lock directly + # try: + # logging.info("release_lock start") + # MLOpsLoggingUtils._lock.release() + # logging.info("release_lock end") + # except ValueError as e: + # # The lock is not acquired, ignore it + # logging.warning("release_lock error: {}".format(e)) + # pass + @staticmethod def build_log_file_path_with_run_params( run_id, edge_id, log_file_dir, is_server=False, log_file_prefix=None @@ -176,18 +203,28 @@ def save_log_config(run_id, device_id, log_config_file, config_data): log_config_key = "log_config_{}_{}".format(run_id, device_id) log_config = MLOpsLoggingUtils.load_yaml_config(log_config_file) log_config[log_config_key] = MLOpsLoggingUtils.__convert_to_dict(config_data) + # Use with statement to ensure file is properly closed with open(log_config_file, "w") as stream: - yaml.dump(log_config, stream) + # use safe_dump to avoid the problem of the lock + yaml.safe_dump(log_config, stream) except Exception as e: - MLOpsLoggingUtils.release_lock() + # modify by charlie + # Don't release lock here - let caller handle it + # MLOpsLoggingUtils.release_lock() raise ValueError("Error saving log config: {}".format(e)) @staticmethod def load_yaml_config(log_config_file): """Helper function to load a yaml config file""" - if MLOpsLoggingUtils._lock.acquire(block=False): - MLOpsLoggingUtils._lock.release() - raise ValueError("Able to acquire lock. This means lock was not acquired by the caller") + + # modify by charlie + # the lock is acquired in the caller, so the check is not necessary, + # it should be removed to avoid potential exceptions and logical conflicts. + + # if MLOpsLoggingUtils._lock.acquire(block=False): + # MLOpsLoggingUtils._lock.release() + # raise ValueError("Able to acquire lock. This means lock was not acquired by the caller") + if not os.path.exists(log_config_file): MLOpsLoggingUtils.generate_yaml_doc({}, log_config_file) with open(log_config_file, "r") as stream: diff --git a/python/setup.py b/python/setup.py index 032bdb4eed..59be71c5e0 100644 --- a/python/setup.py +++ b/python/setup.py @@ -126,7 +126,7 @@ def finalize_options(self): setup( name="fedml", - version="0.9.2", + version="0.9.6-dev202502202000", author="FedML Team", author_email="ch@fedml.ai", description="A research and production integrated edge-cloud library for "
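The device_http_inference_protocol.py hunks above replace the per-request `async with httpx.AsyncClient() as client:` blocks with a single client shared across requests, so keep-alive connections are reused instead of being re-established on every inference call. Below is a minimal, self-contained sketch of that pattern, not the FedML source itself: the class name, the `/ready` URL, and the shutdown helper are illustrative, and the limit values simply mirror the ones in the diff.

```python
import asyncio
from typing import Optional

import httpx


class SharedHttpClient:
    """Lazily create one httpx.AsyncClient and reuse it for every request."""
    _client: Optional[httpx.AsyncClient] = None

    @classmethod
    async def get(cls) -> httpx.AsyncClient:
        if cls._client is None:
            limits = httpx.Limits(
                max_keepalive_connections=100,  # values mirror the diff above
                max_connections=1000,
                keepalive_expiry=60,
            )
            cls._client = httpx.AsyncClient(limits=limits)
        return cls._client

    @classmethod
    async def close(cls) -> None:
        # Close the pooled connections on shutdown (e.g. from a FastAPI shutdown hook).
        if cls._client is not None:
            await cls._client.aclose()
            cls._client = None


async def is_ready(ready_url: str) -> bool:
    # Reuses pooled connections instead of opening a new client per call.
    client = await SharedHttpClient.get()
    response = await client.get(ready_url, timeout=5)
    return response.status_code == 200


async def main() -> None:
    try:
        print(await is_ready("http://127.0.0.1:2203/ready"))  # hypothetical endpoint
    except httpx.HTTPError as exc:
        print(f"probe failed: {exc}")
    finally:
        await SharedHttpClient.close()


if __name__ == "__main__":
    asyncio.run(main())
```

One consequence worth noting: because each uvicorn worker is a separate process, every worker ends up with its own pooled client; the pool is shared across requests within a worker, not across workers.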
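The `db_operation` decorator added to device_model_db.py wraps each query method so that a stale or broken SQLAlchemy session is rolled back, discarded, and rebuilt before a single retry. Here is a condensed sketch of that retry shape, assuming a class that keeps its session in `self.db_connection` and can recreate it via `open_job_db()`; the exception subset and the helper names are illustrative, not the FedML implementation.

```python
import functools
import logging

from sqlalchemy import exc


def db_operation(func):
    """Open the session, run the query, and retry once after rebuilding a broken session."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            self.open_job_db()                      # make sure a session exists
            return func(self, *args, **kwargs)
        except (exc.InvalidRequestError, exc.StatementError, exc.DBAPIError,
                exc.TimeoutError, exc.PendingRollbackError) as e:
            logging.error("Database error in %s, rebuilding session: %s", func.__name__, e)
            if self.db_connection is not None:
                try:
                    self.db_connection.rollback()   # drop any half-finished transaction
                    self.db_connection.close()
                except Exception:
                    pass
            self.db_connection = None
            self.open_job_db()                      # recreate the session
            return func(self, *args, **kwargs)      # single retry; further errors propagate
    return wrapper
```

Applied as `@db_operation` on each query method, this keeps one transient "session in prepared state" style failure from reaching the caller, while anything non-recoverable still raises.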
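master_job_runner.py now launches the inference gateway with several uvicorn worker processes and explicit connection limits instead of `--reload`. The sketch below rebuilds that command as a list suitable for `subprocess.Popen`; the flag values are copied from the diff, while the port and the standalone helper are assumptions for illustration (FedML resolves the real port via `ServerConstants.get_inference_master_gateway_port()`).

```python
import sys
from typing import List


def gateway_command(app: str, port: int, workers: int = 4) -> List[str]:
    # Multi-worker gateway launch: no --reload, explicit concurrency, backlog,
    # and keep-alive limits (values taken from the diff above).
    return [
        sys.executable, "-m", "uvicorn", app,
        "--host", "0.0.0.0",
        "--port", str(port),
        "--workers", str(workers),
        "--limit-concurrency", "1024",
        "--backlog", "2048",
        "--timeout-keep-alive", "60",
        "--log-level", "warning",
    ]


if __name__ == "__main__":
    # Print the command; pass the list to subprocess.Popen to actually start the gateway.
    print(" ".join(gateway_command(
        "fedml.computing.scheduler.model_scheduler.device_model_inference:api",
        port=2203,  # hypothetical port for the sketch
    )))
```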
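The logging changes in mlops_runtime_log.py and mlops_runtime_log_daemon.py all move lock handling into the same shape: acquire, remember whether the acquire succeeded, and release only in `finally` and only if held, so an exception while the log config is being rewritten can no longer leave the multiprocessing lock locked forever. A small self-contained sketch of that shape is below; the module-level lock and the JSON file stand in for `MLOpsLoggingUtils._lock` and the YAML log config.

```python
import json
import multiprocessing
import os

_lock = multiprocessing.Lock()  # stand-in for MLOpsLoggingUtils._lock


def acquire_lock(block: bool = True) -> bool:
    return _lock.acquire(block)


def release_lock() -> None:
    _lock.release()


def update_log_config(path: str, key: str, value: str) -> None:
    # Release only in `finally`, and only if this call actually acquired the lock,
    # so a failure mid-update cannot leave the lock held by a dead code path.
    lock_acquired = acquire_lock()
    try:
        config = {}
        if os.path.exists(path):
            with open(path, "r") as fh:
                config = json.load(fh)
        config[key] = value
        with open(path, "w") as fh:
            json.dump(config, fh)
    finally:
        if lock_acquired:
            release_lock()


if __name__ == "__main__":
    update_log_config("/tmp/log_config_demo.json", "rotate_count", "1")  # hypothetical file
```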