From 47279d3669772ee87c2932d1fee469929edc9c75 Mon Sep 17 00:00:00 2001
From: GuyPerets106
Date: Thu, 6 Feb 2025 22:09:18 +0000
Subject: [PATCH 1/8] Hot fix: read the dataset CSV once and write source pieces without header rows

---
 src_py/apiServer/nerl_csv_dataset_db.py | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/src_py/apiServer/nerl_csv_dataset_db.py b/src_py/apiServer/nerl_csv_dataset_db.py
index 795a623d9..f7d06c1a5 100644
--- a/src_py/apiServer/nerl_csv_dataset_db.py
+++ b/src_py/apiServer/nerl_csv_dataset_db.py
@@ -71,6 +71,8 @@ def set_pointer_to_sourcePiece_CsvDataSet(self, pointer_to_sourcePiece_CsvDataSe
     def set_pointer_to_sourcePiece_CsvDataSet_labels(self, pointer_to_sourcePiece_CsvDataSet_labels):
         self.pointer_to_sourcePiece_CsvDataSet_labels = pointer_to_sourcePiece_CsvDataSet_labels
 
+
+
 class CsvDataSet():
     def __init__(self, csv_path, output_dir: str, batch_size, num_of_features, num_of_labels, headers_row: list):
         self.csv_path = csv_path
@@ -81,6 +83,7 @@ def __init__(self, csv_path, output_dir: str, batch_size, num_of_features, num_o
         self.num_of_features = num_of_features
         self.num_of_labels = num_of_labels
         self.headers_row = headers_row
+        self.df = pd.read_csv(self.csv_path, header = None)
 
     def get_csv_path(self):
         return self.csv_path
@@ -99,11 +102,11 @@ def set_num_of_labels(self, num_of_labels):
 
     def get_total_num_of_batches(self):
         # ! Not always using the whole csv! (should be calculated by the source pieces offsets)
-        return ceil(pd.read_csv(self.csv_path).shape[0] / self.batch_size)
+        return ceil(self.df.shape[0] / self.batch_size)
 
     def get_total_num_of_samples(self):
         # ! Not always using the whole csv! (should be calculated by the source pieces offsets)
-        return pd.read_csv(self.csv_path).shape[0] + 1 # +1 for sample 0 which is the header row
+        return self.df.shape[0] # df is read with header = None, so every row (including a header row, if any) is already counted
 
     def get_headers_row(self):
         return self.headers_row
@@ -119,25 +122,21 @@ def generate_source_piece_ds(self, source_name : str, batch_size: int, phase : s
     def generate_source_piece_ds_csv_file(self, source_piece_ds_inst: SourcePieceDS, phase : str):
         skip_rows = source_piece_ds_inst.get_starting_offset()
         number_of_samples = source_piece_ds_inst.get_num_of_batches() * source_piece_ds_inst.get_batch_size()
-        df = pd.read_csv(self.csv_path, skiprows = skip_rows, nrows = number_of_samples, header = None)
-        df = df.rename(columns=df.iloc[0]).drop(df.index[0]) # set the first row as the header, to prevent sending of the header row
+        df = self.df[skip_rows:(skip_rows + number_of_samples)] # slicing creates a copy of the data
         df_features = df.iloc[:, :int(self.get_num_of_features())] # from 0 column to num_of_features column (bun not including num_of_features column)
-        #print(f'df_features: {df_features}')
-        #df_labels = df.iloc[:, int(self.get_num_of_features()):] # from num_of_features column to the end of the dataframe
-        #print(f'df_labels: {df_labels}')
         source_piece_file_path = f'{self.output_dir}/{source_piece_ds_inst.get_source_name()}_data.csv'
         if phase == PHASE_TRAINING_STR:
             df_train = df.iloc[:, :int(self.get_num_of_features()) + int(self.get_num_of_labels())] # from 0 column to num_of_features + num_of_labels column (bun not including num_of_features + num_of_labels column)
-            df_train.to_csv(source_piece_file_path, index = False)
+            df_train.to_csv(source_piece_file_path, index = False, header = False)
         elif phase == PHASE_PREDICTION_STR:
-            df_features.to_csv(source_piece_file_path, index = False)
+            df_features.to_csv(source_piece_file_path, index = False, header = False)
         return source_piece_file_path
 
     def genrate_source_piece_ds_csv_file_labels(self, source_piece_ds_inst: SourcePieceDS, phase: str):
         if phase == PHASE_PREDICTION_STR: # only for prediction phase
             skip_rows = source_piece_ds_inst.get_starting_offset()
             number_of_samples = source_piece_ds_inst.get_num_of_batches() * source_piece_ds_inst.get_batch_size()
-            df = pd.read_csv(self.csv_path, skiprows = skip_rows, nrows = number_of_samples, header = None)
+            df = self.df[skip_rows:(skip_rows + number_of_samples)]
             df_labels = df.iloc[:, int(self.get_num_of_features()):int(self.get_num_of_features()) + int(self.get_num_of_labels())] # from num_of_features column to the end of the dataframe
             df_labels.columns = self.headers_row
             source_piece_file_path_labels = f'{self.output_dir}/{source_piece_ds_inst.get_source_name()}_data_labels.csv'

From f1bae2457aedffd9974c95c64d4ed4caed0558cf Mon Sep 17 00:00:00 2001
From: GuyPerets106
Date: Thu, 6 Feb 2025 22:20:31 +0000
Subject: [PATCH 2/8] add print

---
 src_py/apiServer/experiment_flow.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src_py/apiServer/experiment_flow.py b/src_py/apiServer/experiment_flow.py
index 8bdbf6bb8..1e23352f5 100644
--- a/src_py/apiServer/experiment_flow.py
+++ b/src_py/apiServer/experiment_flow.py
@@ -120,6 +120,7 @@ def parse_experiment_flow_json(self, json_path : str, override_csv_path = ""):
                 source_piece_csv_file = self.csv_dataset.generate_source_piece_ds_csv_file(source_piece_inst, phase_type)
                 source_piece_inst.set_pointer_to_sourcePiece_CsvDataSet(source_piece_csv_file)
                 source_pieces_inst_list.append(source_piece_inst)
+            LOG_INFO(f"phase {phase} source pieces parsed and generated.")
 
             self.add_phase(phase_name, phase_type, source_pieces_inst_list, num_of_features)
 

From e6d6a6f320219e6439cfd91a05e03a4b13f0cb6d Mon Sep 17 00:00:00 2001
From: GuyPerets106
Date: Thu, 6 Feb 2025 22:33:06 +0000
Subject: [PATCH 3/8] Fixed print, JSONs

---
 .../ConnectionMap/conn_paper_25bs_5hz.json    |   7 +
 .../DistributedConfig/dc_paper_25bs_5hz.json  | 146 ++++++++++++++++++
 .../DistributedConfig/dc_paper_cifar.json     | 146 ++++++++++++++++++
 .../experimentsFlow/exp_paper_cifar.json      | 140 +++++++++++++++++
 .../exp_paper_mnist_25bs_5hz.json             |  56 +++++++
 src_py/apiServer/experiment_flow.py           |   2 +-
 6 files changed, 496 insertions(+), 1 deletion(-)
 create mode 100644 inputJsonsFiles/ConnectionMap/conn_paper_25bs_5hz.json
 create mode 100644 inputJsonsFiles/DistributedConfig/dc_paper_25bs_5hz.json
 create mode 100644 inputJsonsFiles/DistributedConfig/dc_paper_cifar.json
 create mode 100644 inputJsonsFiles/experimentsFlow/exp_paper_cifar.json
 create mode 100644 inputJsonsFiles/experimentsFlow/exp_paper_mnist_25bs_5hz.json

diff --git a/inputJsonsFiles/ConnectionMap/conn_paper_25bs_5hz.json b/inputJsonsFiles/ConnectionMap/conn_paper_25bs_5hz.json
new file mode 100644
index 000000000..85874e5de
--- /dev/null
+++ b/inputJsonsFiles/ConnectionMap/conn_paper_25bs_5hz.json
@@ -0,0 +1,7 @@
+{
+    "connectionsMap":
+    {
+        "r1":["mainServer", "c1", "s1", "r2", "s3", "s4"],
+        "r2":["s2", "r1"]
+    }
+}
diff --git a/inputJsonsFiles/DistributedConfig/dc_paper_25bs_5hz.json b/inputJsonsFiles/DistributedConfig/dc_paper_25bs_5hz.json
new file mode 100644
index 000000000..740e2c35f
--- /dev/null
+++ b/inputJsonsFiles/DistributedConfig/dc_paper_25bs_5hz.json
@@ -0,0 +1,146 @@
+{
+    "nerlnetSettings": {
+        "frequency": "5",
+        "batchSize": "25"
+    },
+    "mainServer": {
+        "port": "8080",
+        "args": ""
+    },
+    "apiServer": {
+        "port": "8081",
+        "args": ""
+    },
+    "devices": [
+        {
+            "name": "NerlNist-Powerful",
"ipv4": "10.0.0.57", + "entities": "mainServer,apiServer,c1" + }, + { + "name": "C0VM0", + "ipv4": "10.0.0.5", + "entities": "s1" + }, + { + "name": "C0VM1", + "ipv4": "10.0.0.4", + "entities": "s2" + }, + { + "name": "C0VM2", + "ipv4": "10.0.0.9", + "entities": "s3" + }, + { + "name": "C0VM3", + "ipv4": "10.0.0.7", + "entities": "s4" + }, + { + "name": "NerlNist-6", + "ipv4": "10.0.0.44", + "entities": "r1" + }, + { + "name": "NerlNist-8", + "ipv4": "10.0.0.43", + "entities": "r2" + } + ], + "routers": [ + { + "name": "r1", + "port": "8090", + "policy": "0" + }, + { + "name": "r2", + "port": "8090", + "policy": "0" + } + ], + "sources": [ + { + "name": "s1", + "port": "8086", + "frequency": "5", + "policy": "0", + "epochs": "1", + "type": "0" + }, + { + "name": "s2", + "port": "8086", + "frequency": "5", + "policy": "0", + "epochs": "1", + "type": "0" + }, + { + "name": "s3", + "port": "8086", + "frequency": "80", + "policy": "0", + "epochs": "1", + "type": "0" + }, + { + "name": "s4", + "port": "8086", + "frequency": "80", + "policy": "0", + "epochs": "1", + "type": "0" + } + ], + "clients": [ + { + "name": "c1", + "port": "8082", + "workers": "w1" + } + ], + "workers": [ + { + "name": "w1", + "model_sha": "9c5f1261068be7be96487a2cae282aa22e8c1cb482a5bf8d557bc8e1e2b6fef0" + } + ], + "model_sha": { + "9c5f1261068be7be96487a2cae282aa22e8c1cb482a5bf8d557bc8e1e2b6fef0": { + "modelType": "0", + "_doc_modelType": " nn:0 | approximation:1 | classification:2 | forecasting:3 | image_classification:4 | text_classification:5 | text_generation:6 | auto_association:7 | autoencoder:8 | ae_classifier:9 |", + "modelArgs": "", + "_doc_modelArgs": "Extra arguments to model", + "layersSizes": "28x28x1k5x5x1x6p0s1t1,28x28x6k2x2p0s2,14x14x6k4x4x6x12p0s1t0,1,32,10", + "_doc_layersSizes": "List of postive integers [L0, L1, ..., LN]", + "layerTypesList": "2,4,2,9,3,5", + "_doc_LayerTypes": " Default:0 | Scaling:1 | Conv:2 | Perceptron:3 | Pooling:4 | Probabilistic:5 | LSTM:6 | Reccurrent:7 | Unscaling:8 | Flatten:9 | Bounding:10 |", + "layers_functions": "6,2,6,1,6,4", + "_doc_layers_functions_activation": " Threshold:1 | Sign:2 | Logistic:3 | Tanh:4 | Linear:5 | ReLU:6 | eLU:7 | SeLU:8 | Soft-plus:9 | Soft-sign:10 | Hard-sigmoid:11 |", + "_doc_layer_functions_pooling": " none:1 | Max:2 | Avg:3 |", + "_doc_layer_functions_probabilistic": " Binary:1 | Logistic:2 | Competitive:3 | Softmax:4 |", + "_doc_layer_functions_scaler": " none:1 | MinMax:2 | MeanStd:3 | STD:4 | Log:5 |", + "lossMethod": "6", + "lossArgs": "", + "_doc_lossMethod": " SSE:1 | MSE:2 | NSE:3 | MinkowskiE:4 | WSE:5 | CEE:6 |", + "lr": "0.001", + "_doc_lr": "Positve float", + "epochs": "1", + "_doc_epochs": "Positve Integer", + "optimizer": "5", + "_doc_optimizer": " GD:0 | CGD:1 | SGD:2 | QuasiNeuton:3 | LVM:4 | ADAM:5 |", + "optimizerArgs": "none", + "_doc_optimizerArgs": "String", + "infraType": "0", + "_doc_infraType": " opennn:0 | wolfengine:1 |", + "distributedSystemType": "0", + "_doc_distributedSystemType": " none:0 | FedClientAvg:1 | FedServerAvg:2 | FedClientWeightedAvgClassification:3 | FedServerWeightedAvgClassification:4 | FedClientAE:5 | FedServerAE:6 | tiles:7 |", + "distributedSystemArgs": "none", + "_doc_distributedSystemArgs": "String", + "distributedSystemToken": "none", + "_doc_distributedSystemToken": "Token that associates distributed group of workers and parameter-server" + } + } +} \ No newline at end of file diff --git a/inputJsonsFiles/DistributedConfig/dc_paper_cifar.json 
b/inputJsonsFiles/DistributedConfig/dc_paper_cifar.json new file mode 100644 index 000000000..e3d6ba142 --- /dev/null +++ b/inputJsonsFiles/DistributedConfig/dc_paper_cifar.json @@ -0,0 +1,146 @@ +{ + "nerlnetSettings": { + "frequency": "5", + "batchSize": "25" + }, + "mainServer": { + "port": "8080", + "args": "" + }, + "apiServer": { + "port": "8081", + "args": "" + }, + "devices": [ + { + "name": "NerlNist-Powerful", + "ipv4": "10.0.0.57", + "entities": "mainServer,apiServer,c1" + }, + { + "name": "C0VM0", + "ipv4": "10.0.0.5", + "entities": "s1" + }, + { + "name": "C0VM3", + "ipv4": "10.0.0.7", + "entities": "s2" + }, + { + "name": "C0VM2", + "ipv4": "10.0.0.9", + "entities": "s3" + }, + { + "name": "C0VM1", + "ipv4": "10.0.0.4", + "entities": "s4" + }, + { + "name": "NerlNist-6", + "ipv4": "10.0.0.44", + "entities": "r1" + }, + { + "name": "NerlNist-8", + "ipv4": "10.0.0.43", + "entities": "r2" + } + ], + "routers": [ + { + "name": "r1", + "port": "8090", + "policy": "0" + }, + { + "name": "r2", + "port": "8090", + "policy": "0" + } + ], + "sources": [ + { + "name": "s1", + "port": "8086", + "frequency": "5", + "policy": "0", + "epochs": "1", + "type": "0" + }, + { + "name": "s2", + "port": "8086", + "frequency": "5", + "policy": "0", + "epochs": "1", + "type": "0" + }, + { + "name": "s3", + "port": "8086", + "frequency": "80", + "policy": "0", + "epochs": "1", + "type": "0" + }, + { + "name": "s4", + "port": "8086", + "frequency": "80", + "policy": "0", + "epochs": "1", + "type": "0" + } + ], + "clients": [ + { + "name": "c1", + "port": "8082", + "workers": "w1" + } + ], + "workers": [ + { + "name": "w1", + "model_sha": "9c5f1261068be7be96487a2cae282aa22e8c1cb482a5bf8d557bc8e1e2b6fef0" + } + ], + "model_sha": { + "9c5f1261068be7be96487a2cae282aa22e8c1cb482a5bf8d557bc8e1e2b6fef0": { + "modelType": "0", + "_doc_modelType": " nn:0 | approximation:1 | classification:2 | forecasting:3 | image_classification:4 | text_classification:5 | text_generation:6 | auto_association:7 | autoencoder:8 | ae_classifier:9 |", + "modelArgs": "", + "_doc_modelArgs": "Extra arguments to model", + "layersSizes": "32x32x3k3x3x3x32p0s1t1,32x32x32k2x2p0s2,16x16x32k3x3x32x64p0s1t1,16x16x64k2x2p0s2,8x8x64k3x3x64x128p0s1t0,1,512,128,10", + "_doc_layersSizes": "List of postive integers [L0, L1, ..., LN]", + "layerTypesList": "2,4,2,4,2,9,3,3,5", + "_doc_LayerTypes": " Default:0 | Scaling:1 | Conv:2 | Perceptron:3 | Pooling:4 | Probabilistic:5 | LSTM:6 | Reccurrent:7 | Unscaling:8 | Flatten:9 | Bounding:10 |", + "layers_functions": "6,2,6,2,6,1,6,11,4", + "_doc_layers_functions_activation": " Threshold:1 | Sign:2 | Logistic:3 | Tanh:4 | Linear:5 | ReLU:6 | eLU:7 | SeLU:8 | Soft-plus:9 | Soft-sign:10 | Hard-sigmoid:11 |", + "_doc_layer_functions_pooling": " none:1 | Max:2 | Avg:3 |", + "_doc_layer_functions_probabilistic": " Binary:1 | Logistic:2 | Competitive:3 | Softmax:4 |", + "_doc_layer_functions_scaler": " none:1 | MinMax:2 | MeanStd:3 | STD:4 | Log:5 |", + "lossMethod": "6", + "lossArgs": "reg=L2", + "_doc_lossMethod": " SSE:1 | MSE:2 | NSE:3 | MinkowskiE:4 | WSE:5 | CEE:6 |", + "lr": "0.00001", + "_doc_lr": "Positve float", + "epochs": "1", + "_doc_epochs": "Positve Integer", + "optimizer": "5", + "_doc_optimizer": " GD:0 | CGD:1 | SGD:2 | QuasiNeuton:3 | LVM:4 | ADAM:5 |", + "optimizerArgs": "none", + "_doc_optimizerArgs": "String", + "infraType": "0", + "_doc_infraType": " opennn:0 | wolfengine:1 |", + "distributedSystemType": "0", + "_doc_distributedSystemType": " none:0 | FedClientAvg:1 | FedServerAvg:2 | 
FedClientWeightedAvgClassification:3 | FedServerWeightedAvgClassification:4 | FedClientAE:5 | FedServerAE:6 | tiles:7 |", + "distributedSystemArgs": "none", + "_doc_distributedSystemArgs": "String", + "distributedSystemToken": "none", + "_doc_distributedSystemToken": "Token that associates distributed group of workers and parameter-server" + } + } +} \ No newline at end of file diff --git a/inputJsonsFiles/experimentsFlow/exp_paper_cifar.json b/inputJsonsFiles/experimentsFlow/exp_paper_cifar.json new file mode 100644 index 000000000..6e4dd5cf0 --- /dev/null +++ b/inputJsonsFiles/experimentsFlow/exp_paper_cifar.json @@ -0,0 +1,140 @@ +{ + "experimentName": "mnist_rr", + "experimentType": "classification", + "batchSize": 25, + "csvFilePath": "/tmp/nerlnet/data/NerlnetData-master/nerlnet/cifar10/cifar_independent1.csv", + "numOfFeatures": "3072", + "numOfLabels": "10", + "headersNames": "airplane,automobile,bird,cat,deer,dog,frog,horse,ship,truck", + "Phases": + [ + { + "phaseName": "training_phase1", + "phaseType": "training", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "0", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + }, + { + "sourceName": "s2", + "startingSample": "5000", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + } + ] + }, + { + "phaseName": "training_phase2", + "phaseType": "training", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "10000", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + }, + { + "sourceName": "s2", + "startingSample": "15000", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + } + ] + }, + { + "phaseName": "training_phase3", + "phaseType": "training", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "20000", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + }, + { + "sourceName": "s2", + "startingSample": "25000", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + } + ] + }, + { + "phaseName": "training_phase4", + "phaseType": "training", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "30000", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + }, + { + "sourceName": "s2", + "startingSample": "35000", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + } + ] + }, + { + "phaseName": "training_phase5", + "phaseType": "training", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "40000", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + }, + { + "sourceName": "s2", + "startingSample": "45000", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + } + ] + }, + { + "phaseName": "prediction_phase", + "phaseType": "prediction", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "50000", + "numOfBatches": "200", + "workers": "w1", + "nerltensorType": "float" + }, + { + "sourceName": "s2", + "startingSample": "55000", + "numOfBatches": "199", + "workers": "w1", + "nerltensorType": "float" + } + ] + } + ] + } + + \ No newline at end of file diff --git a/inputJsonsFiles/experimentsFlow/exp_paper_mnist_25bs_5hz.json b/inputJsonsFiles/experimentsFlow/exp_paper_mnist_25bs_5hz.json new file mode 100644 index 000000000..2181058aa --- /dev/null +++ b/inputJsonsFiles/experimentsFlow/exp_paper_mnist_25bs_5hz.json @@ -0,0 +1,56 @@ +{ + "experimentName": "mnist_rr", + "experimentType": "classification", + "batchSize": 25, + 
"csvFilePath": "/tmp/nerlnet/data/NerlnetData-master/nerlnet/mnist_norm/mnist_train_255_norm.csv", + "numOfFeatures": "784", + "numOfLabels": "10", + "headersNames": "0,1,2,3,4,5,6,7,8,9", + "Phases": + [ + { + "phaseName": "training_phase", + "phaseType": "training", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "0", + "numOfBatches": "500", + "workers": "w1", + "nerltensorType": "float" + }, + { + "sourceName": "s2", + "startingSample": "12500", + "numOfBatches": "500", + "workers": "w1", + "nerltensorType": "float" + } + ] + }, + { + "phaseName": "prediction_phase", + "phaseType": "prediction", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "25000", + "numOfBatches": "250", + "workers": "w1", + "nerltensorType": "float" + }, + { + "sourceName": "s2", + "startingSample": "31250", + "numOfBatches": "250", + "workers": "w1", + "nerltensorType": "float" + } + ] + } + ] + } + + \ No newline at end of file diff --git a/src_py/apiServer/experiment_flow.py b/src_py/apiServer/experiment_flow.py index 1e23352f5..8dc52a935 100644 --- a/src_py/apiServer/experiment_flow.py +++ b/src_py/apiServer/experiment_flow.py @@ -120,7 +120,7 @@ def parse_experiment_flow_json(self, json_path : str, override_csv_path = ""): source_piece_csv_file = self.csv_dataset.generate_source_piece_ds_csv_file(source_piece_inst, phase_type) source_piece_inst.set_pointer_to_sourcePiece_CsvDataSet(source_piece_csv_file) source_pieces_inst_list.append(source_piece_inst) - LOG_INFO(f"phase {phase} source pieces parsed and generated.") + LOG_INFO(f"phase {phase_name} source pieces parsed and generated.") self.add_phase(phase_name, phase_type, source_pieces_inst_list, num_of_features) From 7f91b894a4f4693a9cf857cdd496bddcd9cc48d5 Mon Sep 17 00:00:00 2001 From: GuyPerets106 Date: Thu, 6 Feb 2025 22:45:56 +0000 Subject: [PATCH 4/8] Removed duplicate source piece gen --- src_py/apiServer/apiServer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src_py/apiServer/apiServer.py b/src_py/apiServer/apiServer.py index 03cf6d1fb..c785d8b8f 100644 --- a/src_py/apiServer/apiServer.py +++ b/src_py/apiServer/apiServer.py @@ -145,7 +145,7 @@ def send_data_to_sources(self, csv_dataset: CsvDataSet, experiment_phase: Experi sources_pieces_list = experiment_phase.get_sources_pieces() source_files_to_send = [] # list of csv's paths to send to sources for source_piece_inst in sources_pieces_list: - source_generated_csv_path = csv_dataset.generate_source_piece_ds_csv_file(source_piece_inst, experiment_phase.get_phase_type()) + source_generated_csv_path = source_piece_inst.get_pointer_to_sourcePiece_CsvDataSet() source_files_to_send.append(source_generated_csv_path) LOG_INFO("Done generating source pieces") From 62058f49fed1bdc6060173f7c69826a8de385033 Mon Sep 17 00:00:00 2001 From: GuyPerets106 Date: Fri, 7 Feb 2025 00:29:04 +0000 Subject: [PATCH 5/8] Fixed CSV issues --- src_py/apiServer/experiment_flow.py | 3 ++- src_py/apiServer/experiment_phase.py | 2 +- src_py/apiServer/nerl_csv_dataset_db.py | 13 +++++++------ 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src_py/apiServer/experiment_flow.py b/src_py/apiServer/experiment_flow.py index 8dc52a935..5d13c675f 100644 --- a/src_py/apiServer/experiment_flow.py +++ b/src_py/apiServer/experiment_flow.py @@ -117,7 +117,7 @@ def parse_experiment_flow_json(self, json_path : str, override_csv_path = ""): nerltensor_type = source_piece[EXPFLOW_PHASE_SOURCE_PIECES_NERLTENSOR_TYPE_FIELD] source_piece_inst = 
self.csv_dataset.generate_source_piece_ds(source_name, self.batch_size, phase_type, starting_sample, num_of_batches, nerltensor_type) source_piece_inst.update_target_workers(workers) - source_piece_csv_file = self.csv_dataset.generate_source_piece_ds_csv_file(source_piece_inst, phase_type) + source_piece_csv_file = self.csv_dataset.generate_source_piece_ds_csv_file(source_piece_inst, phase_type, phase_name) source_piece_inst.set_pointer_to_sourcePiece_CsvDataSet(source_piece_csv_file) source_pieces_inst_list.append(source_piece_inst) LOG_INFO(f"phase {phase_name} source pieces parsed and generated.") @@ -142,6 +142,7 @@ def add_phase(self, name : str, phase_type : str, source_pieces_inst_list : list for source_piece_inst in source_pieces_inst_list: exp_phase_inst.add_source_piece(source_piece_inst) self.exp_phase_list.append(exp_phase_inst) + def print(self): diff --git a/src_py/apiServer/experiment_phase.py b/src_py/apiServer/experiment_phase.py index 83b4d6742..0ed5bc200 100644 --- a/src_py/apiServer/experiment_phase.py +++ b/src_py/apiServer/experiment_phase.py @@ -13,7 +13,7 @@ def __init__(self, experiment_flow_name : str, experiment_flow_type: str, name : assert self.phase_type in [PHASE_TRAINING_STR, PHASE_PREDICTION_STR] self.nerl_comm_db = NerlComDB(network_componenets) self.nerl_model_db = NerlModelDB(self.phase_type) - self.source_pieces_dict = {} # Dict of SourcePieceDS + self.source_pieces_dict = {} # Dict of SourcePieceDS self.num_of_features = num_of_features self.raw_data_buffer = [] self.network_componenets = network_componenets diff --git a/src_py/apiServer/nerl_csv_dataset_db.py b/src_py/apiServer/nerl_csv_dataset_db.py index f7d06c1a5..1e8926d4d 100644 --- a/src_py/apiServer/nerl_csv_dataset_db.py +++ b/src_py/apiServer/nerl_csv_dataset_db.py @@ -80,10 +80,11 @@ def __init__(self, csv_path, output_dir: str, batch_size, num_of_features, num_o assert os.path.exists(self.csv_path), "csv_path does not exist" self.output_dir = output_dir self.batch_size = batch_size - self.num_of_features = num_of_features - self.num_of_labels = num_of_labels + self.num_of_features = int(num_of_features) + self.num_of_labels = int(num_of_labels) self.headers_row = headers_row self.df = pd.read_csv(self.csv_path, header = None) + assert self.df.shape[1] == (self.num_of_features + self.num_of_labels), "Experiment Flow JSON #Features + #Labels mismatched with CSV #Cols" def get_csv_path(self): return self.csv_path @@ -119,16 +120,16 @@ def generate_source_piece_ds(self, source_name : str, batch_size: int, phase : s assert nerltensor_type in NERLTENSOR_TYPE_LIST, "nerltensor_type is not in NERLTENSOR_TYPE_LIST" return SourcePieceDS(self, source_name, batch_size, phase, starting_offset, num_of_batches, nerltensor_type, self.num_of_features, self.num_of_labels) - def generate_source_piece_ds_csv_file(self, source_piece_ds_inst: SourcePieceDS, phase : str): + def generate_source_piece_ds_csv_file(self, source_piece_ds_inst: SourcePieceDS, phase_type : str, phase_name : str): skip_rows = source_piece_ds_inst.get_starting_offset() number_of_samples = source_piece_ds_inst.get_num_of_batches() * source_piece_ds_inst.get_batch_size() df = self.df[skip_rows:(skip_rows + number_of_samples)] # slicing creates a copy of the data df_features = df.iloc[:, :int(self.get_num_of_features())] # from 0 column to num_of_features column (bun not including num_of_features column) - source_piece_file_path = f'{self.output_dir}/{source_piece_ds_inst.get_source_name()}_data.csv' - if phase == PHASE_TRAINING_STR: + 
source_piece_file_path = f'{self.output_dir}/{source_piece_ds_inst.get_source_name()}_data_{phase_name}.csv' + if phase_type == PHASE_TRAINING_STR: df_train = df.iloc[:, :int(self.get_num_of_features()) + int(self.get_num_of_labels())] # from 0 column to num_of_features + num_of_labels column (bun not including num_of_features + num_of_labels column) df_train.to_csv(source_piece_file_path, index = False, header = False) - elif phase == PHASE_PREDICTION_STR: + elif phase_type == PHASE_PREDICTION_STR: df_features.to_csv(source_piece_file_path, index = False, header = False) return source_piece_file_path From 98364e6e3b0457ed85a11174bad64c9b0cbb921f Mon Sep 17 00:00:00 2001 From: GuyPerets106 Date: Sat, 8 Feb 2025 02:48:38 +0000 Subject: [PATCH 6/8] optimization of compression and parsing --- .../NerlnetApp/src/MainServer/initHandler.erl | 3 ++- .../src/MainServer/mainGenserver.erl | 4 +-- src_erl/NerlnetApp/src/Source/parser.erl | 16 ++++++------ src_py/apiServer/apiServer.py | 2 +- src_py/apiServer/transmitter.py | 26 ++++++++++--------- 5 files changed, 27 insertions(+), 24 deletions(-) diff --git a/src_erl/NerlnetApp/src/MainServer/initHandler.erl b/src_erl/NerlnetApp/src/MainServer/initHandler.erl index 23aa3082e..462da725c 100644 --- a/src_erl/NerlnetApp/src/MainServer/initHandler.erl +++ b/src_erl/NerlnetApp/src/MainServer/initHandler.erl @@ -23,7 +23,8 @@ init(Req0, [Main_genServer_Pid]) -> {_,Body,_} = cowboy_req:read_body(Req0, #{length => ?DATA_LEN}), %read up to X MB (default was 8MB) DecodedBody = binary_to_list(zlib:uncompress(Body)), [Index, TotalSources, SourceName, WorkersStr, Phase, NumOfBatches, NerlTensorType, Data] = string:split(DecodedBody, "#", all), - gen_server:cast(Main_genServer_Pid,{initCSV, Index, TotalSources, SourceName, WorkersStr, Phase, NumOfBatches, NerlTensorType, Data}), + DataCompressed = zlib:compress(list_to_binary(Data)), + gen_server:cast(Main_genServer_Pid,{initCSV, Index, TotalSources, SourceName, WorkersStr, Phase, NumOfBatches, NerlTensorType, DataCompressed}), %[Source|WorkersAndInput] = re:split(binary_to_list(Body), "#", [{return, list}]), %{Workers,SourceData} = getWorkerInput(WorkersAndInput,[]), diff --git a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl index 3cf5a5c01..d169449dd 100644 --- a/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl +++ b/src_erl/NerlnetApp/src/MainServer/mainGenserver.erl @@ -82,13 +82,13 @@ init({MyName,ClientsNames,BatchSize,WorkersMap,NerlnetGraph , DeviceName}) -> {ok, #main_genserver_state{myName = MyNameStr , state=idle, total_sources=0, sources_data_ready_ctr = 0}}. 
-handle_cast({initCSV, _Index, TotalSources, SourceName, WorkersList, Phase, NumOfBatches, NerlTensorType, Data}, State = #main_genserver_state{state = idle, sourcesWaitingList = SourcesWaitingList, total_sources = TotalSourcesOld, sources_data_ready_ctr = SourcesDataReadyCtrOld}) ->
+handle_cast({initCSV, _Index, TotalSources, SourceName, WorkersList, Phase, NumOfBatches, NerlTensorType, DataCompressed}, State = #main_genserver_state{state = idle, sourcesWaitingList = SourcesWaitingList, total_sources = TotalSourcesOld, sources_data_ready_ctr = SourcesDataReadyCtrOld}) ->
     {RouterHost,RouterPort} = ets:lookup_element(get(main_server_ets), my_router, ?DATA_IDX),
     ActionStr = atom_to_list(updateCSV),
     {TotalSourcesInt, _Rest} = string:to_integer(TotalSources),
     % MessageBody = WorkersList ++ "#" ++ NumOfBatches ++ "#" ++ NerlTensorType ++ "#" ++ Data,
     WorkersListSeperated = string:split(WorkersList, ",", all),
-    MessageBody = {WorkersListSeperated, Phase, NumOfBatches, NerlTensorType, zlib:compress(list_to_binary(Data))},
+    MessageBody = {WorkersListSeperated, Phase, NumOfBatches, NerlTensorType, DataCompressed},
     nerl_tools:http_router_request(RouterHost,RouterPort, [SourceName], ActionStr, MessageBody), % update the source with its data
     UpdatedSourceWaitingList = SourcesWaitingList++[list_to_atom(SourceName)],
     {SourcesDataReadyCtr, NewTotalSources} =
diff --git a/src_erl/NerlnetApp/src/Source/parser.erl b/src_erl/NerlnetApp/src/Source/parser.erl
index f65d7c74d..d395db1b3 100644
--- a/src_erl/NerlnetApp/src/Source/parser.erl
+++ b/src_erl/NerlnetApp/src/Source/parser.erl
@@ -14,7 +14,7 @@
 -import(nerlNIF,[erl_type_conversion/1]).
 
 -define(CORE_NUM, erlang:system_info(logical_processors_available)).
--define(PARALLELIZATION_FACTOR, 4).
+-define(PARALLELIZATION_FACTOR, 2).
 
 %% API
 -export([parseCSV/4, batchesProcFunc/3]).
@@ -22,17 +22,17 @@
 
 
 parseCSV(SourceName, BatchSize, NerlTensorType, CSVData)->
-    put(nerltensor_type , NerlTensorType),
     ErlType = nerlNIF:erl_type_conversion(list_to_atom(NerlTensorType)),
     put(erl_tensor_type, ErlType),
-    SourceNameStr = atom_to_list(SourceName),
-    nerl_tools:setup_logger(?MODULE),
-    FileName = ?TMP_DIR_RUN ++ SourceNameStr ++ ?TMP_DATA_ADDR,
     try
-        file:write_file(FileName, CSVData),
         parse_file(SourceName, BatchSize, NerlTensorType, ErlType, CSVData) %% change so read data only when sending (currently loading all data)
     catch
-        {error,Er} -> logger:error("couldn't write file ~p, beacuse ~p",[FileName, Er])
+        {error,Er} ->
+            nerl_tools:setup_logger(?MODULE),
+            SourceNameStr = atom_to_list(SourceName),
+            FileName = ?TMP_DIR_RUN ++ SourceNameStr ++ ?TMP_DATA_ADDR,
+            file:write_file(FileName, CSVData),
+            logger:error("couldn't write file ~p, because ~p",[FileName, Er])
     end.
 
@@ -60,7 +60,7 @@ dataStrToNumeric_NumHandler({NumStr, ErlType}) ->
         IsFloat = lists:member($. , NumStr), % Check if there is a decimal point (usually labels don't have)
         if
             IsFloat -> Num = list_to_float(NumStr);
-            true -> Num = float(list_to_integer(NumStr))
+            true -> Num = list_to_float(NumStr+".0")
         end;
     erl_int -> Num = list_to_integer(NumStr);
     _ -> io:format("Error: unknown erl_tensor_type: ~p~n", [ErlType]) , Num = none
diff --git a/src_py/apiServer/apiServer.py b/src_py/apiServer/apiServer.py
index c785d8b8f..6ac6d7387 100644
--- a/src_py/apiServer/apiServer.py
+++ b/src_py/apiServer/apiServer.py
@@ -147,7 +147,7 @@ def send_data_to_sources(self, csv_dataset: CsvDataSet, experiment_phase: Experi
         for source_piece_inst in sources_pieces_list:
             source_generated_csv_path = source_piece_inst.get_pointer_to_sourcePiece_CsvDataSet()
             source_files_to_send.append(source_generated_csv_path)
-        LOG_INFO("Done generating source pieces")
+        LOG_INFO("Done sending data to sources")
 
         events_sync_inst.set_event_wait(EventSync.UPDATE_CSV)
         self.transmitter.update_csv(source_files_to_send, sources_pieces_list)
diff --git a/src_py/apiServer/transmitter.py b/src_py/apiServer/transmitter.py
index 6be98e84a..ffd21293d 100644
--- a/src_py/apiServer/transmitter.py
+++ b/src_py/apiServer/transmitter.py
@@ -77,20 +77,22 @@ def update_csv(self, csv_files: list, source_pieces: list):
             num_of_batches = source_piece.get_num_of_batches()
             nerltensor_type = source_piece.get_nerltensor_type()
             phase_type = source_piece.get_phase()
+            data_str_encoded = None
             with open(csv_file, 'r') as file:
                 csvfile = file.read()
-                data_str = f'{index + 1}#{total_sources}#{source_name}#{target_workers}#{phase_type}#{num_of_batches}#{nerltensor_type}#{csvfile}'
-                data_zip = zlib.compress(data_str.encode())
-                try:
-                    response = requests.post(self.updateCSVAddress, data = data_zip)
-                    if not response.ok: # If Code =/= 200
-                        LOG_ERROR(f"Failed to update {csv_file} to Main Server")
-                except ConnectionRefusedError:
-                    LOG_ERROR(f"Connection Refused Error: failed to connect to {self.updateCSVAddress}")
-                    raise ConnectionRefusedError
-                except ConnectionError:
-                    LOG_ERROR(f"Connection Error: failed to connect to {self.updateCSVAddress}")
-                    raise ConnectionError
+                data_str_encoded = (f'{index + 1}#{total_sources}#{source_name}#{target_workers}#{phase_type}#{num_of_batches}#{nerltensor_type}#{csvfile}').encode()
+            data_zip = zlib.compress(data_str_encoded)
+            data_str_encoded = None
+            try:
+                response = requests.post(self.updateCSVAddress, data = data_zip)
+                if not response.ok: # If Code =/= 200
+                    LOG_ERROR(f"Failed to update {csv_file} to Main Server")
+            except ConnectionRefusedError:
+                LOG_ERROR(f"Connection Refused Error: failed to connect to {self.updateCSVAddress}")
+                raise ConnectionRefusedError
+            except ConnectionError:
+                LOG_ERROR(f"Connection Error: failed to connect to {self.updateCSVAddress}")
+                raise ConnectionError
             LOG_INFO(f'{((index+1)/total_sources)*100:.2f}% Sent')
         LOG_INFO(f'Data Transmission To Sources Is Completed!')
 

From dcfdd6b7615c9615949d0ba93484f0a783e4c9fa Mon Sep 17 00:00:00 2001
From: GuyPerets106
Date: Sat, 8 Feb 2025 03:15:38 +0000
Subject: [PATCH 7/8] change dataStrToNumericParallelLoop from > to >=

---
 src_erl/NerlnetApp/src/Source/parser.erl | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/src_erl/NerlnetApp/src/Source/parser.erl b/src_erl/NerlnetApp/src/Source/parser.erl
index d395db1b3..8d626817a 100644
--- a/src_erl/NerlnetApp/src/Source/parser.erl
+++ b/src_erl/NerlnetApp/src/Source/parser.erl
@@ -70,9 +70,9 @@ dataStrToNumeric_NumHandler({NumStr, ErlType}) ->
 dataStrToNumeric_lineHandler(PIPD, LineOfData, EtsTable, EtsKey, ErlType) ->
     Splitted = string:split(binary_to_list(LineOfData), ",", all),
-    Samples = [{Sample, ErlType} || Sample <- Splitted],
-    FloatDataList = lists:map(fun dataStrToNumeric_NumHandler/1, Samples),
-    ets:insert(EtsTable, {EtsKey, FloatDataList}),
+    SampleStrList = [{Sample, ErlType} || Sample <- Splitted],
+    SampleNumericList = lists:map(fun dataStrToNumeric_NumHandler/1, SampleStrList),
+    ets:insert(EtsTable, {EtsKey, SampleNumericList}),
     PIPD ! done.
 
 dataStrToNumeric_sync(0) -> ok;
@@ -84,7 +84,7 @@ dataStrToNumeric_sync(PF) ->
 
 dataStrToNumericParallelLoop(_PF, _EtsTable, [], _ErlType, _LastKey) -> done;
-dataStrToNumericParallelLoop(PF, EtsTable, ListOfLinesOfData, ErlType, LastKey) when length(ListOfLinesOfData) > PF -> % PF - Parallelization Factor
+dataStrToNumericParallelLoop(PF, EtsTable, ListOfLinesOfData, ErlType, LastKey) when length(ListOfLinesOfData) >= PF -> % PF - Parallelization Factor
     {ListOfLinesOfDataToBeProcessed, ListOfLinesOfDataRest} = lists:split(PF, ListOfLinesOfData),
     IdxList = lists:seq(LastKey,LastKey+PF-1),
     PIPD = self(),
@@ -92,7 +92,6 @@ dataStrToNumericParallelLoop(PF, EtsTable, ListOfLinesOfData, ErlType, LastKey)
     dataStrToNumeric_sync(PF),
     dataStrToNumericParallelLoop(PF, EtsTable, ListOfLinesOfDataRest, ErlType, LastKey+PF);
-
 dataStrToNumericParallelLoop(PF, EtsTable, ListOfLinesOfData, ErlType, LastKey) ->
     PIPD = self(),
     CurrentKey = LastKey + 1,

From c7225d779722913d5dadc7596ee03fc3e167e90d Mon Sep 17 00:00:00 2001
From: GuyPerets106
Date: Sat, 8 Feb 2025 03:43:20 +0000
Subject: [PATCH 8/8] Remove epoch prints; fix integer-to-float conversion in parser

---
 src_erl/NerlnetApp/src/Source/parser.erl                | 2 +-
 src_erl/NerlnetApp/src/Source/sourceSendingPolicies.erl | 3 ---
 2 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/src_erl/NerlnetApp/src/Source/parser.erl b/src_erl/NerlnetApp/src/Source/parser.erl
index 8d626817a..a15885795 100644
--- a/src_erl/NerlnetApp/src/Source/parser.erl
+++ b/src_erl/NerlnetApp/src/Source/parser.erl
@@ -60,7 +60,7 @@ dataStrToNumeric_NumHandler({NumStr, ErlType}) ->
         IsFloat = lists:member($. , NumStr), % Check if there is a decimal point (usually labels don't have)
         if
             IsFloat -> Num = list_to_float(NumStr);
-            true -> Num = list_to_float(NumStr+".0")
+            true -> Num = list_to_float(NumStr++".0")
         end;
     erl_int -> Num = list_to_integer(NumStr);
     _ -> io:format("Error: unknown erl_tensor_type: ~p~n", [ErlType]) , Num = none
diff --git a/src_erl/NerlnetApp/src/Source/sourceSendingPolicies.erl b/src_erl/NerlnetApp/src/Source/sourceSendingPolicies.erl
index 395e67bbb..2e2212da5 100644
--- a/src_erl/NerlnetApp/src/Source/sourceSendingPolicies.erl
+++ b/src_erl/NerlnetApp/src/Source/sourceSendingPolicies.erl
@@ -43,7 +43,6 @@ send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs,
     send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
 send_method_casting(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
 send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
-    io:format("Epoch ~p~n", [EpochIdx]),
     % Sends the same batch to all
     BatchFunc = fun({BatchIdx, Batch}) ->
         prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, ClientWorkerPairs)
@@ -62,7 +61,6 @@ send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPai
     send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
 send_method_round_robin(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
 send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
-    io:format("Epoch ~p~n", [EpochIdx]),
     % Sends a batch per each
     ClientWorkerPairsIndexes = lists:seq(0, length(ClientWorkerPairs)-1),
     ClientWorkerPairsWithIndexes = lists:zip(ClientWorkerPairsIndexes, ClientWorkerPairs), % Tuple {Idx, Triplet}
@@ -88,7 +86,6 @@ send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, B
     send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
 send_method_random(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
 send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
-    io:format("Epoch ~p~n", [EpochIdx + 1]),
     % Sends a batch per each
     ClientWorkerPairsIndexes = lists:seq(1, length(ClientWorkerPairs)),
     ClientWorkerPairsWithIndexes = lists:zip(ClientWorkerPairsIndexes, ClientWorkerPairs), % Tuple {Idx, Triplet}
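
Taken together, patches 1, 5, and 6 converge on a read-once pipeline: CsvDataSet loads the dataset CSV into a single DataFrame at construction, each source piece becomes a contiguous row slice written out with no header row (a header would otherwise be parsed as a sample on the Erlang side), and the payload is compressed exactly once on its way to the sources. The sketch below condenses that flow; it is a minimal illustration under stated assumptions, not the project's API -- PieceSender, the file path, and the endpoint URL are hypothetical stand-ins for CsvDataSet, Transmitter.update_csv, and the main server's init handler.

import zlib

import pandas as pd
import requests


class PieceSender:
    """Minimal sketch of the read-once / slice / compress / send pattern (hypothetical)."""

    def __init__(self, csv_path: str, batch_size: int):
        # Read the CSV a single time, as patch 1 does in CsvDataSet.__init__;
        # header=None keeps pandas from consuming the first data row as column names.
        self.df = pd.read_csv(csv_path, header=None)
        self.batch_size = batch_size

    def piece(self, starting_sample: int, num_of_batches: int) -> pd.DataFrame:
        # A source piece is a contiguous row range of the cached frame,
        # like generate_source_piece_ds_csv_file after patches 1 and 5.
        rows = num_of_batches * self.batch_size
        return self.df[starting_sample:starting_sample + rows]

    def send(self, url: str, piece_df: pd.DataFrame) -> None:
        # Serialize without index or header, then compress once before POSTing,
        # mirroring patch 6's transmitter-side zlib.compress of the whole message.
        payload = piece_df.to_csv(index=False, header=False)
        requests.post(url, data=zlib.compress(payload.encode()))


# Hypothetical usage; the path and endpoint are illustrative only.
sender = PieceSender("/tmp/nerlnet/data/train.csv", batch_size=25)
sender.send("http://127.0.0.1:8080/updateCSV", sender.piece(starting_sample=0, num_of_batches=500))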
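Patch 7's guard change (> to >=) matters because dataStrToNumericParallelLoop only fans out when at least ?PARALLELIZATION_FACTOR lines remain, so a tail of exactly that many lines used to fall through to the slower one-line-at-a-time clause. Patch 8's parser change is a related correctness fix: Erlang's list_to_float/1 rejects "5" (a decimal point is mandatory), so integer tokens get ".0" appended, and since NumStr is a string (a list), the concatenation must use ++ -- NumStr+".0" raises badarith. A rough Python rendering of the chunk-and-join loop follows; it is a sketch with invented names (parse_line, parse_parallel), not the parser's actual structure.

from concurrent.futures import ThreadPoolExecutor

PARALLELIZATION_FACTOR = 2  # mirrors ?PARALLELIZATION_FACTOR in parser.erl


def parse_line(args):
    line, erl_type = args
    # Python's float() accepts "5" and "5.0" alike, so no ".0" workaround is needed here.
    if erl_type == "erl_float":
        return [float(tok) for tok in line.split(",")]
    return [int(tok) for tok in line.split(",")]


def parse_parallel(lines, erl_type="erl_float"):
    parsed = {}  # stands in for the per-line ets:insert keyed by row index
    next_key = 0
    with ThreadPoolExecutor(max_workers=PARALLELIZATION_FACTOR) as pool:
        # '>=' plays the role of patch 7's guard fix: a tail of exactly
        # PARALLELIZATION_FACTOR lines is still dispatched as one parallel chunk.
        while len(lines) - next_key >= PARALLELIZATION_FACTOR:
            chunk = lines[next_key:next_key + PARALLELIZATION_FACTOR]
            for idx, row in enumerate(pool.map(parse_line, [(l, erl_type) for l in chunk]), start=next_key):
                parsed[idx] = row
            next_key += PARALLELIZATION_FACTOR
    # Any leftover shorter than one chunk is handled sequentially,
    # like the final clause of dataStrToNumericParallelLoop.
    for idx, line in enumerate(lines[next_key:], start=next_key):
        parsed[idx] = parse_line((line, erl_type))
    return parsed


print(parse_parallel(["1.5,2.5,0", "3.5,4.5,1", "5.5,6.5,0"]))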