Merge pull request #391 from leondavi/stat_update
[stat_update]
leondavi authored Aug 11, 2024
2 parents b71bff9 + 26aa313 commit 9bada32
Showing 4 changed files with 181 additions and 4 deletions.
9 changes: 9 additions & 0 deletions inputJsonsFiles/ConnectionMap/conn_EEG_1d_2c_1s_4r_4w.json
@@ -0,0 +1,9 @@
{
"connectionsMap":
{
"r1":["mainServer", "r2"],
"r2":["r3", "s1"],
"r3":["r4", "c1"],
"r4":["r1", "c2"]
}
}
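
For reference, the new connection map wires the four routers into a ring (r1-r2-r3-r4) with mainServer attached to r1, s1 to r2, c1 to r3, and c2 to r4. The following minimal Python sketch, not part of this commit, loads the map and checks that every listed entity is reachable from mainServer; the file path and helper names are illustrative assumptions.

import json
from collections import deque

# Illustrative path; adjust to wherever the repository is checked out.
CONN_MAP_PATH = "inputJsonsFiles/ConnectionMap/conn_EEG_1d_2c_1s_4r_4w.json"

with open(CONN_MAP_PATH) as f:
    conn_map = json.load(f)["connectionsMap"]

# Build an undirected adjacency list from the router entries.
adjacency = {}
for router, neighbors in conn_map.items():
    for neighbor in neighbors:
        adjacency.setdefault(router, set()).add(neighbor)
        adjacency.setdefault(neighbor, set()).add(router)

# BFS from mainServer: every entity in the map should be reachable.
visited = {"mainServer"}
queue = deque(["mainServer"])
while queue:
    node = queue.popleft()
    for neighbor in adjacency.get(node, ()):
        if neighbor not in visited:
            visited.add(neighbor)
            queue.append(neighbor)

print("unreachable entities:", sorted(set(adjacency) - visited))
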
117 changes: 117 additions & 0 deletions inputJsonsFiles/DistributedConfig/dc_EEG_1d_2c_1s_4r_4w.json
@@ -0,0 +1,117 @@
{
"nerlnetSettings": {
"frequency": "5",
"batchSize": "10"
},
"mainServer": {
"port": "8081",
"args": ""
},
"apiServer": {
"port": "8082",
"args": ""
},
"devices": [
{
"name": "pc1",
"ipv4": "10.0.0.30",
"entities": "c1,c2,r2,r1,r3,r4,s1,apiServer,mainServer"
}
],
"routers": [
{
"name": "r1",
"port": "8086",
"policy": "0"
},
{
"name": "r2",
"port": "8087",
"policy": "0"
},
{
"name": "r3",
"port": "8088",
"policy": "0"
},
{
"name": "r4",
"port": "8089",
"policy": "0"
}
],
"sources": [
{
"name": "s1",
"port": "8085",
"frequency": "200",
"policy": "0",
"epochs": "1",
"type": "0"
}
],
"clients": [
{
"name": "c1",
"port": "8083",
"workers": "w1,w2"
},
{
"name": "c2",
"port": "8084",
"workers": "w3,w4"
}
],
"workers": [
{
"name": "w1",
"model_sha": "d8df752e0a2e8f01de8f66e9cec941cdbc65d144ecf90ab7713e69d65e7e82aa"
},
{
"name": "w2",
"model_sha": "d8df752e0a2e8f01de8f66e9cec941cdbc65d144ecf90ab7713e69d65e7e82aa"
},
{
"name": "w3",
"model_sha": "d8df752e0a2e8f01de8f66e9cec941cdbc65d144ecf90ab7713e69d65e7e82aa"
},
{
"name": "w4",
"model_sha": "d8df752e0a2e8f01de8f66e9cec941cdbc65d144ecf90ab7713e69d65e7e82aa"
}
],
"model_sha": {
"d8df752e0a2e8f01de8f66e9cec941cdbc65d144ecf90ab7713e69d65e7e82aa": {
"modelType": "0",
"_doc_modelType": " nn:0 | approximation:1 | classification:2 | forecasting:3 | image-classification:4 | text-classification:5 | text-generation:6 | auto-association:7 | autoencoder:8 | ae-classifier:9 |",
"modelArgs": "",
"layersSizes": "70x1x1k5x1x1x64p0s1t0,66x1x64k2x1p0s1,65x1x64k5x1x64x64p0s1t0,61x1x64k2x1p0s1,60x1x64k5x1x64x32p0s1t0,1,32,16,9",
"_doc_layersSizes": "List of postive integers [L0, L1, ..., LN]",
"layerTypesList": "2,4,2,4,2,9,3,3,3",
"_doc_LayerTypes": " Default:0 | Scaling:1 | CNN:2 | Perceptron:3 | Pooling:4 | Probabilistic:5 | LSTM:6 | Reccurrent:7 | Unscaling:8 | Flatten:9 | Bounding:10 |",
"layers_functions": "11,2,11,2,11,1,6,6,11",
"_doc_layers_functions_activation": " Threshold:1 | Sign:2 | Logistic:3 | Tanh:4 | Linear:5 | ReLU:6 | eLU:7 | SeLU:8 | Soft-plus:9 | Soft-sign:10 | Hard-sigmoid:11 |",
"_doc_layer_functions_pooling": " none:1 | Max:2 | Avg:3 |",
"_doc_layer_functions_probabilistic": " Binary:1 | Logistic:2 | Competitive:3 | Softmax:4 |",
"_doc_layer_functions_scaler": " none:1 | MinMax:2 | MeanStd:3 | STD:4 | Log:5 |",
"lossMethod": "2",
"_doc_lossMethod": " SSE:1 | MSE:2 | NSE:3 | MinkowskiE:4 | WSE:5 | CEE:6 |",
"lr": "0.00001",
"_doc_lr": "Positve float",
"epochs": "1",
"_doc_epochs": "Positve Integer",
"optimizer": "5",
"_doc_optimizer": " GD:0 | CGD:1 | SGD:2 | QuasiNeuton:3 | LVM:4 | ADAM:5 |",
"optimizerArgs": "",
"_doc_optimizerArgs": "String",
"infraType": "0",
"_doc_infraType": " opennn:0 | wolfengine:1 |",
"distributedSystemType": "0",
"_doc_distributedSystemType": " none:0 | fedClientAvg:1 | fedServerAvg:2 |",
"distributedSystemArgs": "",
"_doc_distributedSystemArgs": "String",
"distributedSystemToken": "none",
"_doc_distributedSystemToken": "Token that associates distributed group of workers and parameter-server"
}
}
}
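
The distributed configuration above places all nine entities on a single device (pc1) and points every worker at the same model SHA, which is defined once under "model_sha". A small sketch, not part of this commit, that cross-checks those references; the path and the checks are illustrative assumptions.

import json

DC_PATH = "inputJsonsFiles/DistributedConfig/dc_EEG_1d_2c_1s_4r_4w.json"  # illustrative path

with open(DC_PATH) as f:
    dc = json.load(f)

workers_by_name = {w["name"]: w for w in dc["workers"]}
model_sha_table = dc["model_sha"]

# Every worker referenced by a client must be declared under "workers".
for client in dc["clients"]:
    for worker_name in client["workers"].split(","):
        assert worker_name in workers_by_name, f"{worker_name} missing from workers"

# Every worker must reference a model that exists under "model_sha".
for name, worker in workers_by_name.items():
    assert worker["model_sha"] in model_sha_table, f"{name} points at unknown model_sha"

print("cross-references between clients, workers and model_sha are consistent")
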
41 changes: 41 additions & 0 deletions inputJsonsFiles/experimentsFlow/exp_EEG_1d_2c_1s_4r_4w.json
@@ -0,0 +1,41 @@
{
"experimentName": "EEG_Valence_Recognition_DEAP",
"experimentType": "classification",
"batchSize": 10,
"csvFilePath": "/home/nerlnet/workspace/1_3_persons_normalize_bins_valence.csv",
"numOfFeatures": "70",
"numOfLabels": "9",
"headersNames": "1,2,3,4,5,6,7,8,9",
"Phases":
[
{
"phaseName": "training_phase",
"phaseType": "training",
"sourcePieces":
[
{
"sourceName": "s1",
"startingSample": "10",
"numOfBatches": "10",
"workers": "w1,w2,w3,w4",
"nerltensorType": "float"
}
]
},
{
"phaseName": "prediction_phase",
"phaseType": "prediction",
"sourcePieces":
[
{
"sourceName": "s1",
"startingSample": "16510",
"numOfBatches": "10",
"workers": "w1,w2,w3,w4",
"nerltensorType": "float"
}
]
}
]
}
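
Both phases stream from s1 with batchSize 10 and numOfBatches 10, i.e. 100 samples each, cast to w1,w2,w3,w4: training starts at sample 10 and prediction at sample 16510. A short sketch, not part of this commit, that derives those sample ranges from the file; the path is an illustrative assumption.

import json

EXP_PATH = "inputJsonsFiles/experimentsFlow/exp_EEG_1d_2c_1s_4r_4w.json"  # illustrative path

with open(EXP_PATH) as f:
    exp = json.load(f)

batch_size = int(exp["batchSize"])
for phase in exp["Phases"]:
    for piece in phase["sourcePieces"]:
        start = int(piece["startingSample"])
        count = int(piece["numOfBatches"]) * batch_size
        # training_phase: samples 10..109, prediction_phase: samples 16510..16609
        print(f'{phase["phaseName"]}: {piece["sourceName"]} streams samples '
              f'{start}..{start + count - 1} to {piece["workers"]}')
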

18 changes: 14 additions & 4 deletions src_py/apiServer/stats.py
@@ -55,7 +55,7 @@ def get_loss_by_source(self , plot : bool = False , saveToFile : bool = False):
"""
pass

def get_loss_ts(self , plot : bool = False , saveToFile : bool = False):
def get_loss_ts(self , plot : bool = False , saveToFile : bool = False, smoothing : bool = False, log_plot : bool = False):
"""
Returns a dictionary of {worker : loss list} for each worker in the experiment.
use plot=True to plot the loss function.
@@ -83,6 +83,11 @@ def get_loss_ts(self , plot : bool = False , saveToFile : bool = False):

df = pd.DataFrame(loss_dict)
self.loss_ts_pd = df

if smoothing:
for column in df.columns:
for i in range(1, len(df)):
df.at[i, column] = (df.at[i, column] + df.at[i-1, column]) / 2

if plot:
sns.set(style="whitegrid")
@@ -103,6 +108,9 @@
if saveToFile:
plt.savefig('training_loss_function.png', bbox_inches='tight')

if log_plot:
plt.yscale('log')

plt.show()
return df

@@ -282,12 +290,13 @@ def recieved_batches_key(phase_name, source_name, worker_name):
workers_model_db_list = self.nerl_model_db.get_workers_model_db_list()
for source_piece_inst in sources_pieces_list:
source_name = source_piece_inst.get_source_name()
source_epoch = int(globe.components.sourceEpochs[source_name])
target_workers_string = source_piece_inst.get_target_workers()
target_workers_names = target_workers_string.split(',')
for worker_db in workers_model_db_list:
worker_name = worker_db.get_worker_name()
if worker_name in target_workers_names: # Check if the worker is in the target workers list of this source
for batch_id in range(source_piece_inst.get_num_of_batches()):
for batch_id in range(source_epoch * source_piece_inst.get_num_of_batches()):
batch_db = worker_db.get_batch(source_name, str(batch_id))
if batch_db: # if batch is received
recieved_batch_key_str = recieved_batches_key(phase_name, source_name, worker_name)
@@ -311,13 +320,14 @@ def missed_batches_key(phase_name, source_name, worker_name):
for source_piece_inst in sources_pieces_list:
source_name = source_piece_inst.get_source_name()
source_policy = globe.components.sources_policy_dict[source_name] # 0 -> casting , 1 -> round robin, 2 -> random
source_epoch = int(globe.components.sourceEpochs[source_name])
target_workers_string = source_piece_inst.get_target_workers()
target_workers_names = target_workers_string.split(',')
if source_policy == '0': # casting policy
for worker_db in workers_model_db_list:
worker_name = worker_db.get_worker_name()
if worker_name in target_workers_names: # Check if the worker is in the target workers list of this source
for batch_id in range(source_piece_inst.get_num_of_batches()):
for batch_id in range(source_epoch * source_piece_inst.get_num_of_batches()):
batch_db = worker_db.get_batch(source_name, str(batch_id))
if not batch_db: # if batch is missing
missed_batch_key_str = missed_batches_key(phase_name, source_name, worker_name)
@@ -326,7 +336,7 @@ def missed_batches_key(phase_name, source_name, worker_name):
missed_batches_dict[missed_batch_key_str].append(batch_id)
elif source_policy == '1': # round robin policy
number_of_workers = len(target_workers_names)
batches_indexes = [i for i in range(source_piece_inst.get_num_of_batches())]
batches_indexes = [i for i in range(source_epoch * source_piece_inst.get_num_of_batches())]
batch_worker_tuple = [(batch_index, target_workers_names[batch_index % number_of_workers]) for batch_index in batches_indexes] # (batch_index, worker_name_that_should_receive_the_batch)
worker_batches_dict = {worker_name: [] for worker_name in target_workers_names} # Create a dictionary to hold batches id for each worker
for batch_index, worker_name in batch_worker_tuple:
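
The stats.py changes add two optional flags to get_loss_ts: smoothing averages each loss sample with the previous, already-smoothed value (an exponential moving average with weight 0.5), and log_plot switches the plot's y-axis to a log scale. The received/missed batch accounting now also multiplies the expected number of batches by the source's epoch count taken from sourceEpochs. Below is a standalone sketch of the smoothing recurrence, not part of this commit; the toy data and the stats instance name are illustrative assumptions.

import pandas as pd

# Toy per-worker loss series; the values are illustrative only.
df = pd.DataFrame({"w1": [1.0, 0.8, 0.9, 0.5], "w2": [1.2, 1.0, 0.7, 0.6]})

# Same recurrence as the smoothing branch added to get_loss_ts: each row is
# averaged with the previous, already-smoothed row.
for column in df.columns:
    for i in range(1, len(df)):
        df.at[i, column] = (df.at[i, column] + df.at[i - 1, column]) / 2
print(df)

# This should match pandas' built-in exponential moving average (assumption):
# smoothed = raw_df.ewm(alpha=0.5, adjust=False).mean()

# Hypothetical usage against a Stats instance created by the apiServer flow:
# loss_df = stats.get_loss_ts(plot=True, smoothing=True, log_plot=True, saveToFile=True)
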
