[refactor] Improve backend service URL handling and logging #2246

Open · wants to merge 24 commits into base: dev/v0.7.0

Changes from all commits · 24 commits
46d766a
[logs]add deploy param
charlieyl Dec 28, 2024
a912c57
[bugfix] Handle deployment failure by deleting deployed replicas and …
charlieyl Feb 11, 2025
c6bfe20
[refactor] Disable request timeout middleware in device model inference
charlieyl Feb 12, 2025
7134c15
add logs
charlieyl Feb 12, 2025
f7552c4
[logs] Always enable log file
charlieyl Feb 12, 2025
d3b447a
[refactor] Optimize HTTP inference client and log file handling
charlieyl Feb 12, 2025
5910111
[perf] Optimize Uvicorn server configuration for improved inference g…
charlieyl Feb 12, 2025
a728037
[perf] Disable uvloop and httptools in model inference gateway
charlieyl Feb 12, 2025
3e1ae10
[perf] Reduce model inference gateway workers from 10 to 2
charlieyl Feb 12, 2025
d6d67e8
[perf] Optimize HTTP inference client and Uvicorn server configuration
charlieyl Feb 12, 2025
3bc0666
[perf] Remove verbose logging in model inference request handling
charlieyl Feb 13, 2025
ad22de4
[bugfix-combination] Add model configuration details for deployment f…
charlieyl Feb 13, 2025
2528f4f
[bugfix] Update default model configuration parameters for safer depl…
charlieyl Feb 13, 2025
60be0f7
[feature] Add endpoint_name parameter to model deployment method
charlieyl Feb 14, 2025
7853c90
[bugfix] Restore full GPU card selection parameters in NvidiaGPUtil
charlieyl Feb 17, 2025
068ed51
[perf] Reduce job metrics reporting sleep interval to 15 seconds and …
charlieyl Feb 17, 2025
8ac783d
[chore] Bump version to 0.9.6-dev202502181030
charlieyl Feb 18, 2025
b11558b
[feature] Enhance container log retrieval for exited containers
charlieyl Feb 18, 2025
a6702a4
Merge pull request #2250 from FedML-AI/dev/v0.7.0
charlieyl Feb 18, 2025
f376824
[bugfix] Improve lock handling in MLOps logging utilities
charlieyl Feb 19, 2025
cd976a5
[refactor] Simplify lock handling in MLOps logging utilities
charlieyl Feb 19, 2025
add749a
[feature] Add robust database operation error handling decorator
charlieyl Feb 19, 2025
46b8ce7
[upd] Bump version to 0.9.6-dev202502191600
charlieyl Feb 19, 2025
38a930a
[debug] Add logging for endpoint replica information in job monitor
charlieyl Feb 19, 2025
2 changes: 1 addition & 1 deletion python/fedml/__init__.py
@@ -37,7 +37,7 @@
_global_training_type = None
_global_comm_backend = None

__version__ = "0.9.2"
__version__ = "0.9.6-dev202502191600"


# This is the deployment environment used for different roles (RD/PM/BD/Public Developers). Potential VALUE: local, dev, test, release
@@ -25,8 +25,7 @@ def get_gpu_cards() -> List[GPUCard]:

@staticmethod
def get_available_gpu_card_ids(order: str, limit: int, max_load: float, max_memory: float) -> List[int]:
# return GPUtil.getAvailable(order=order, limit=limit, maxLoad=max_load, maxMemory=max_memory)
return GPUtil.getAvailable(order='random', limit=limit)
return GPUtil.getAvailable(order=order, limit=limit, maxLoad=max_load, maxMemory=max_memory)

@staticmethod
def get_docker_gpu_device_mapping(gpu_ids: List[int], num_gpus: int = 0) -> Optional[Dict]:
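Note: this restores load- and memory-aware GPU selection instead of random ordering. A minimal sketch of the restored semantics, assuming the GPUtil package is installed (the threshold values here are illustrative):

import GPUtil

# Pick up to 2 card ids, skipping cards whose compute load or memory
# utilization exceeds the given fractions.
available_ids = GPUtil.getAvailable(
    order='memory',   # 'first' | 'last' | 'random' | 'load' | 'memory'
    limit=2,
    maxLoad=0.5,      # exclude cards with >50% compute load
    maxMemory=0.5,    # exclude cards with >50% memory in use
)
print(available_ids)  # e.g. [0, 3]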
9 changes: 7 additions & 2 deletions python/fedml/computing/scheduler/comm_utils/job_monitor.py
@@ -207,6 +207,7 @@ def monitor_replicas_number():
res_to_mlops = {} # endpoint_id -> num_replica

for endpoint_detail in res_frm_db:
logging.info(f"endpoint_detail: {endpoint_detail}")
endpoint_replicas_details = {}
if isinstance(endpoint_detail, str):
endpoint_replicas_details = json.loads(endpoint_detail)
@@ -218,11 +219,13 @@
endpoint_replica_details = {}
if isinstance(endpoint_replicas_details["result"], str):
endpoint_replica_details = json.loads(endpoint_replicas_details["result"])


logging.info(f"endpoint_replica_details: {endpoint_replica_details}")
res_to_mlops[endpoint_replica_details["end_point_id"]] = res_to_mlops.get(
endpoint_replica_details["end_point_id"], 0) + 1

for endpoint_id, num_replica in res_to_mlops.items():
logging.info(f"endpoint_id: {endpoint_id}, num_replica: {num_replica}")
num_replica_url_path = "fedmlModelServer/api/v1/endpoint/replica-info"
mlops_prefix = fedml._get_backend_service()
url = f"{mlops_prefix}/{num_replica_url_path}"
@@ -240,13 +243,15 @@
"replicaNumber": int(num_replica),
"timestamp": int(time.time() * 1000)
}

logging.info(f"req_header: {req_header}")
logging.info(f"req_body: {req_body}")
try:
response = requests.post(
url,
headers=req_header,
json=req_body
)
logging.info(f"endpoint_id: {endpoint_id}, response: {response}")
if response.status_code != 200:
logging.error(f"Failed to send the replica number request to MLOps platform.")
else:
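The monitor above counts replicas per endpoint and POSTs the totals to the backend. A standalone sketch of that aggregate-and-report flow (the URL prefix and the "endpointId" body key are hypothetical placeholders; the real request also sends headers built upstream, elided above):

import time
import requests

replica_rows = [{"end_point_id": 101}, {"end_point_id": 101}, {"end_point_id": 7}]

# endpoint_id -> replica count, accumulated the same way as res_to_mlops
counts = {}
for row in replica_rows:
    counts[row["end_point_id"]] = counts.get(row["end_point_id"], 0) + 1

for endpoint_id, num_replica in counts.items():
    response = requests.post(
        "https://example-backend/fedmlModelServer/api/v1/endpoint/replica-info",  # hypothetical prefix
        headers={"Content-Type": "application/json"},
        json={
            "endpointId": int(endpoint_id),      # hypothetical key
            "replicaNumber": int(num_replica),
            "timestamp": int(time.time() * 1000),
        },
        timeout=10,
    )
    if response.status_code != 200:
        print(f"replica-info report failed: {response.status_code}")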
@@ -1,4 +1,6 @@
import logging
import time
import uuid

import httpx
import traceback
@@ -12,6 +14,19 @@


class FedMLHttpInference:
_http_client = None # Class variable for shared HTTP client

@classmethod
async def get_http_client(cls):
if cls._http_client is None:
limits = httpx.Limits(
max_keepalive_connections=100,
max_connections=1000,
keepalive_expiry=60
)
cls._http_client = httpx.AsyncClient(limits=limits)
return cls._http_client

def __init__(self):
pass
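The shared client above replaces the previous per-request httpx.AsyncClient, so keep-alive connections are reused across inference calls instead of being re-established each time. One caveat: the lazy check-then-create is not locked, so two coroutines racing on first use could each build a client. A sketch of the same pattern with an asyncio.Lock guard (class name and URL are illustrative, not part of this PR):

import asyncio
import httpx

class PooledHttp:
    _http_client = None
    _client_lock = asyncio.Lock()  # safe to create at import time on Python 3.10+

    @classmethod
    async def get_http_client(cls) -> httpx.AsyncClient:
        if cls._http_client is None:
            async with cls._client_lock:
                if cls._http_client is None:  # re-check after acquiring the lock
                    limits = httpx.Limits(
                        max_keepalive_connections=100,
                        max_connections=1000,
                        keepalive_expiry=60,
                    )
                    cls._http_client = httpx.AsyncClient(limits=limits)
        return cls._http_client

async def main():
    client = await PooledHttp.get_http_client()
    resp = await client.get("https://example.com/ready", timeout=5)  # placeholder URL
    print(resp.status_code)

asyncio.run(main())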

@@ -28,8 +43,9 @@ async def is_inference_ready(inference_url, path="ready", timeout=None):

# TODO (Raphael): Support more methods and return codes rules.
try:
async with httpx.AsyncClient() as client:
ready_response = await client.get(url=ready_url, timeout=timeout)
# async with httpx.AsyncClient() as client:
client = await FedMLHttpInference.get_http_client()
ready_response = await client.get(url=ready_url, timeout=timeout)

if isinstance(ready_response, (Response, StreamingResponse)):
error_code = ready_response.status_code
@@ -88,23 +104,35 @@ async def run_http_inference_with_curl_request(


async def stream_generator(inference_url, input_json, method="POST"):
async with httpx.AsyncClient() as client:
async with client.stream(method, inference_url, json=input_json,
timeout=ClientConstants.WORKER_STREAM_API_TIMEOUT) as response:
async for chunk in response.aiter_lines():
# we consumed a newline, need to put it back
yield f"{chunk}\n"
# async with httpx.AsyncClient() as client:
client = await FedMLHttpInference.get_http_client()
async with client.stream(method, inference_url, json=input_json,
timeout=ClientConstants.WORKER_STREAM_API_TIMEOUT) as response:
async for chunk in response.aiter_lines():
# we consumed a newline, need to put it back
yield f"{chunk}\n"


async def redirect_non_stream_req_to_worker(inference_type, inference_url, model_api_headers, model_inference_json,
timeout=None, method="POST"):
response_ok = True
# request_id = str(uuid.uuid4())[:8]
# start_time = time.time()
# logging.info(f"[Request-{request_id}] Starting HTTP request to {inference_url}")

try:
async with httpx.AsyncClient() as client:
response = await client.request(
method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout
)
# async with httpx.AsyncClient() as client:
client = await FedMLHttpInference.get_http_client()
response = await client.request(
method=method, url=inference_url, headers=model_api_headers, json=model_inference_json, timeout=timeout
)
# end_time = time.time()
# elapsed_time = end_time - start_time
# logging.info(f"[Request-{request_id}] Completed HTTP request. Time taken: {elapsed_time:.3f} seconds")
except Exception as e:
# end_time = time.time()
# elapsed_time = end_time - start_time
# logging.error(f"[Request-{request_id}] Failed HTTP request after {elapsed_time:.3f} seconds. Error: {str(e)}")
response_ok = False
model_inference_result = {"error": e}
return response_ok, model_inference_result
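Hypothetical callers of the two helpers above, for orientation. The worker URL and payload are placeholders, and the (response_ok, result) shape of the non-stream helper's success path is assumed from the error path shown in the diff (the rest of the function is elided):

import asyncio

async def demo():
    # Non-streaming: one pooled-client request, errors reported in-band.
    ok, result = await redirect_non_stream_req_to_worker(
        inference_type="default",
        inference_url="http://127.0.0.1:2345/predict",  # hypothetical worker URL
        model_api_headers={"Content-Type": "application/json"},
        model_inference_json={"inputs": "hello"},
        timeout=30,
    )
    if not ok:
        print(f"inference failed: {result['error']}")

    # Streaming: iterate the async generator; each yielded chunk ends in "\n".
    async for line in stream_generator(
            "http://127.0.0.1:2345/predict",
            {"inputs": "hello", "stream": True}):
        print(line, end="")

asyncio.run(demo())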
@@ -59,7 +59,8 @@ def serve_model_on_premise(self, model_name, endpoint_name, master_device_ids,

if use_remote:
if not self.deploy_model(model_name, device_type, target_devices, "", user_api_key,
additional_params_dict, use_local_deployment, endpoint_id=endpoint_id):
additional_params_dict, use_local_deployment, endpoint_name=endpoint_name,
endpoint_id=endpoint_id):
print("Failed to deploy model")
return False
return True
@@ -11,6 +11,8 @@
from fedml.core.common.singleton import Singleton
from sqlalchemy.sql import text
from typing import List, Dict
import functools
from sqlalchemy import exc

Base = declarative_base()

@@ -25,7 +27,57 @@ def __init__(self):
self.db_engine = None
if not hasattr(self, "db_base_dir"):
self.db_base_dir = None


@staticmethod
def db_operation(func):
"""decorator: handle the database operation exceptions"""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
try:
# open the database connection
self.open_job_db()
# execute the function
return func(self, *args, **kwargs)
except (
# session-state errors
exc.InvalidRequestError, # includes the "prepared state" error
exc.StatementError, # SQL statement execution error
# connection errors
exc.DBAPIError, # base class of DB-API errors
exc.OperationalError, # operational failure (e.g. lost connection)
exc.DisconnectionError, # connection disconnected
# pool / resource errors
exc.InvalidatePoolError, # connection pool invalidated
exc.TimeoutError, # connection pool timeout
exc.ResourceClosedError, # resource (e.g. cursor) already closed
# transaction / constraint errors
exc.PendingRollbackError, # transaction pending rollback
exc.IntegrityError # integrity constraint violation
) as e:
logging.error(f"Database error in {func.__name__}, rebuilding session: {e}")
# roll back and close any unfinished transaction
if self.db_connection:
try:
self.db_connection.rollback()
except Exception:
pass
try:
self.db_connection.close()
except Exception:
pass
# drop the stale connection; open_job_db() will recreate it
self.db_connection = None
# reopen the database connection
self.open_job_db()
# retry the wrapped function once
return func(self, *args, **kwargs)
except Exception as e:
# any other unexpected error: log and re-raise
logging.error(f"Unexpected error in {func.__name__}: {e}")
self.db_connection = None
raise
return wrapper

@staticmethod
def get_instance():
return FedMLModelDatabase()
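A self-contained demo of the retry semantics the decorator provides, using RuntimeError as a stand-in for the SQLAlchemy exception list (the fake class below is illustrative, not the real FedML model database):

import functools
import logging

def db_operation(func):
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            self.open_job_db()
            return func(self, *args, **kwargs)
        except RuntimeError as e:  # stand-in for the SQLAlchemy errors above
            logging.error(f"Database error in {func.__name__}, rebuilding session: {e}")
            self.db_connection = None
            self.open_job_db()
            return func(self, *args, **kwargs)  # one retry, then errors propagate
    return wrapper

class FakeDb:
    def __init__(self):
        self.db_connection = None
        self.fail_once = True

    def open_job_db(self):
        if self.db_connection is None:
            self.db_connection = object()  # pretend to build a session

    @db_operation
    def query_something(self):
        if self.fail_once:
            self.fail_once = False
            raise RuntimeError("server closed the connection")
        return "ok"

print(FakeDb().query_something())  # first attempt fails, session is rebuilt, retry returns "ok"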
@@ -134,6 +186,7 @@ def get_deployment_status_with_device_id(self, end_point_id, end_point_name, mod

return None

@db_operation
def delete_deployment_status(self, end_point_id, end_point_name, model_name, model_version=None):
self.open_job_db()
if model_version is None:
@@ -149,6 +202,7 @@ def delete_deployment_status(self, end_point_id, end_point_name, model_name, mod
FedMLDeploymentResultInfoModel.model_version == f'{model_version}')).delete()
self.db_connection.commit()

@db_operation
def delete_deployment_result(self, end_point_id, end_point_name, model_name, model_version=None):
self.open_job_db()
if model_version is None:
@@ -164,6 +218,7 @@ def delete_deployment_result(self, end_point_id, end_point_name, model_name, mod
FedMLDeploymentResultInfoModel.model_version == f'{model_version}')).delete()
self.db_connection.commit()

@db_operation
def delete_deployment_result_with_device_id(self, end_point_id, end_point_name, model_name, device_id):
self.open_job_db()
self.db_connection.query(FedMLDeploymentResultInfoModel).filter(
@@ -173,6 +228,7 @@ def delete_deployment_result_with_device_id(self, end_point_id, end_point_name,
FedMLDeploymentResultInfoModel.device_id == f'{device_id}')).delete()
self.db_connection.commit()

@db_operation
def delete_deployment_result_with_device_id_and_rank(self, end_point_id, end_point_name, model_name,
device_id, replica_rank):
replica_no = replica_rank + 1
@@ -185,6 +241,7 @@ def delete_deployment_result_with_device_id_and_rank(self, end_point_id, end_poi
FedMLDeploymentResultInfoModel.replica_no == f'{replica_no}')).delete()
self.db_connection.commit()

@db_operation
def delete_deployment_run_info(self, end_point_id):
# db / table -> model-deployment.db / "deployment_run_info"
self.open_job_db()
@@ -343,6 +400,7 @@ def drop_table(self):
except Exception as e:
pass

@db_operation
def get_deployment_results_info(self, end_point_id, end_point_name, model_name, model_version):
self.open_job_db()
if model_version is None:
@@ -358,11 +416,13 @@ def get_deployment_results_info(self, end_point_id, end_point_name, model_name,
FedMLDeploymentResultInfoModel.model_version == f'{model_version}')).all()
return result_info

@db_operation
def _get_all_deployment_results_info(self):
self.open_job_db()
result_info = self.db_connection.query(FedMLDeploymentResultInfoModel).all()
return result_info

@db_operation
def set_deployment_results_info(self, end_point_id, end_point_name,
model_name, model_version, device_id,
deployment_result=None, deployment_status=None, replica_no=None):
@@ -402,12 +462,14 @@ def set_deployment_results_info(self, end_point_id, end_point_name,

self.db_connection.commit()

@db_operation
def get_deployment_run_info(self, end_point_id):
self.open_job_db()
run_info = self.db_connection.query(FedMLDeploymentRunInfoModel). \
filter_by(end_point_id=f'{end_point_id}').first()
return run_info

@db_operation
def set_deployment_run_info(self, end_point_id, end_point_name,
end_point_status=None, device_info=None,
activated=None, token=None):
@@ -435,6 +497,7 @@ def set_deployment_run_info(self, end_point_id, end_point_name,

self.db_connection.commit()

@db_operation
def get_deployment_auth_info(self, end_point_id, end_point_name, model_name):
self.open_job_db()
run_info = self.db_connection.query(FedMLDeploymentAuthInfoModel). \
@@ -443,6 +506,7 @@ def get_deployment_auth_info(self, end_point_id, end_point_name, model_name):
FedMLDeploymentAuthInfoModel.model_name == f'{model_name}')).first()
return run_info

@db_operation
def set_deployment_auth_info(self, end_point_id, end_point_name, model_name, token):
self.open_job_db()
auth_info = self.db_connection.query(FedMLDeploymentAuthInfoModel). \
@@ -462,6 +526,7 @@ def set_deployment_auth_info(self, end_point_id, end_point_name, model_name, tok

self.db_connection.commit()

@db_operation
def get_latest_end_point_metrics(self, end_point_id, end_point_name, model_name, model_version):
self.open_job_db()
endpoint_metric = self.db_connection.query(FedMLEndPointMetricsModel). \
@@ -473,6 +538,7 @@ def get_latest_end_point_metrics(self, end_point_id, end_point_name, model_name,
return endpoint_metric[-1]
return None

@db_operation
def get_end_point_metrics_by_index(self, end_point_id, end_point_name, model_name, model_version, index):
self.open_job_db()
endpoint_metric = self.db_connection.query(FedMLEndPointMetricsModel). \
@@ -483,6 +549,7 @@ def get_end_point_metrics_by_index(self, end_point_id, end_point_nam
offset(index).limit(1).first()
return endpoint_metric

@db_operation
def set_end_point_metrics(self, end_point_id, end_point_name,
model_name, model_version,
total_latency=None, avg_latency=None, current_latency=None,
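get_end_point_metrics_by_index above pages through metric rows with offset/limit. A runnable sketch of that pattern against a throwaway in-memory SQLite database (model and column names are illustrative, not the real FedML tables):

from sqlalchemy import Column, Float, Integer, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Metric(Base):
    __tablename__ = "metrics"
    id = Column(Integer, primary_key=True)
    latency = Column(Float)

engine = create_engine("sqlite://")  # in-memory DB for the demo
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([Metric(latency=v) for v in (1.0, 2.0, 3.0)])
session.commit()

# Fetch the single row at position `index`, as the method above does.
index = 1
row = session.query(Metric).offset(index).limit(1).first()
print(row.latency)  # 2.0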