From 7f63d6d1fa4bfbaafd37b6b96ea81fc71a8d8af2 Mon Sep 17 00:00:00 2001
From: leondavi
Date: Thu, 18 Jul 2024 02:21:24 +0300
Subject: [PATCH] [SOURCE] Critical bug - Round Robin (RR) and Random sending
 policies were never selected - Fixed - RR and Random sending optimized

---
 .../src/Source/sourceSendingPolicies.erl      | 106 +++++++++++++++++
 .../NerlnetApp/src/Source/sourceStatem.erl    | 112 ++----------------
 2 files changed, 115 insertions(+), 103 deletions(-)
 create mode 100644 src_erl/NerlnetApp/src/Source/sourceSendingPolicies.erl

diff --git a/src_erl/NerlnetApp/src/Source/sourceSendingPolicies.erl b/src_erl/NerlnetApp/src/Source/sourceSendingPolicies.erl
new file mode 100644
index 000000000..2e2212da5
--- /dev/null
+++ b/src_erl/NerlnetApp/src/Source/sourceSendingPolicies.erl
@@ -0,0 +1,106 @@
+-module(sourceSendingPolicies).
+-author("David Leon").
+
+-define(MICRO_TO_MILLI_FACTOR, 0.001).
+
+-include("../nerl_tools.hrl").
+% exports
+-export([send_method_casting/5, send_method_round_robin/5, send_method_random/5]).
+
+%% Sends a batch of samples to a client
+% A batch is always {NerlTensor, Type}
+sendBatch(MyName, {NerlTensor, Type}, BatchID, ClientName, WorkerName, RouterHost, RouterPort) ->
+    ToSend = {MyName, ClientName, WorkerName, BatchID, {NerlTensor, Type}},
+    nerl_tools:http_router_request(RouterHost, RouterPort, [ClientName], atom_to_list(batch), ToSend).
+
+prepare_and_send(_TransmitterEts, _TimeInterval_ms, _Batch, _BatchIdx, []) -> ok;
+prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair | ClientWorkerPairsTail]) ->
+    Tic = erlang:timestamp(), % reference time of this send - used to maintain the sending frequency
+    {RouterHost, RouterPort} = ets:lookup_element(TransmitterEts, my_router, ?DATA_IDX),
+    case Batch of
+        {NerlTensor, Type} ->
+            {ClientName, WorkerName} = ClientWorkerPair, % TODO Gal is about to perform refactor here with casting support
+            MyName = ets:lookup_element(TransmitterEts, my_name, ?DATA_IDX),
+            % sending batch
+            sendBatch(MyName, {NerlTensor, Type}, BatchIdx, ClientName, WorkerName, RouterHost, RouterPort),
+            % timing handling - sleep only for the remainder of the interval
+            % (timer:now_diff(T2, T1) returns T2 - T1 in microseconds, so the later timestamp goes first)
+            Toc_millisec = timer:now_diff(erlang:timestamp(), Tic) * ?MICRO_TO_MILLI_FACTOR,
+            SleepDuration = erlang:max(0, round(TimeInterval_ms - Toc_millisec)),
+            timer:sleep(SleepDuration);
+        <<>> ->
+            ets:update_counter(TransmitterEts, batches_skipped, 1);
+        _ISSUE ->
+            ets:update_counter(TransmitterEts, batches_issue, 1)
+    end,
+    prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, ClientWorkerPairsTail).
+
+
+% Batch indexes are globally unique across epochs: EpochIdx * NumOfBatches + BatchIdx
+generate_batch_indexes(NumOfBatches, EpochIdx) ->
+    [EpochIdx * NumOfBatches + BatchIdx || BatchIdx <- lists:seq(0, NumOfBatches - 1)].
+
+
+send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) ->
+    send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
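+
+% Worked example (illustrative comment; the arithmetic follows from
+% generate_batch_indexes/2 above): with 4 batches per epoch,
+%   generate_batch_indexes(4, 0) -> [0,1,2,3]
+%   generate_batch_indexes(4, 2) -> [8,9,10,11]
+% so a receiver can distinguish the same batch across different epochs.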
+send_method_casting(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
+send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
+    % Casting: sends every batch to all client-worker pairs
+    BatchFunc = fun({BatchIdx, Batch}) ->
+        prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, ClientWorkerPairs)
+    end, % end of BatchFunc
+    TotalNumOfBatches = length(BatchesListToSend),
+    BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx),
+    BatchesWithIndexes = lists:zip(BatchesIndexes, BatchesListToSend),
+    lists:foreach(BatchFunc, BatchesWithIndexes),
+    % update batches sent
+    SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX),
+    ets:update_counter(TransmitterEts, batches_sent, length(ClientWorkerPairs) * length(BatchesListToSend) - SkippedBatches),
+    send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx + 1).
+
+
+send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) ->
+    send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
+send_method_round_robin(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
+send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
+    % Round Robin: each batch is sent to a single client-worker pair, cycling over the pairs in order
+    ClientWorkerPairsIndexes = lists:seq(0, length(ClientWorkerPairs) - 1),
+    ClientWorkerPairsWithIndexes = lists:zip(ClientWorkerPairsIndexes, ClientWorkerPairs), % tuples of {Idx, {ClientName, WorkerName}}
+    ClientWorkerPairsMap = maps:from_list(ClientWorkerPairsWithIndexes),
+
+    BatchFunc = fun({{BatchIdx, WorkerIdx}, Batch}) ->
+        ClientWorkerPair = maps:get(WorkerIdx, ClientWorkerPairsMap),
+        prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair])
+    end, % end of BatchFunc
+    TotalNumOfBatches = length(BatchesListToSend),
+    BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx),
+    ClientWorkerPairsLength = length(ClientWorkerPairs),
+    BatchIdxWorkerIdxTuples = [{X, X rem ClientWorkerPairsLength} || X <- BatchesIndexes],
+    BatchesWithIndexes = lists:zip(BatchIdxWorkerIdxTuples, BatchesListToSend),
+    lists:foreach(BatchFunc, BatchesWithIndexes),
+    % update batches sent
+    SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX),
+    ets:update_counter(TransmitterEts, batches_sent, length(BatchesListToSend) - SkippedBatches),
+    send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx + 1).
+
+
+send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) ->
+    send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
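+
+% Worked example (illustrative comment): with 3 client-worker pairs (map keys 0..2)
+% and batch indexes [0,1,2,3,4,5], the round robin `rem` mapping above selects pair
+% indexes [0,1,2,0,1,2], spreading consecutive batches evenly over the pairs.
+% The random policy below instead draws the pair index per batch with
+% rand:uniform/1, which returns an integer in 1..N - hence its map keys are 1-based.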
+send_method_random(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
+send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
+    % Random: each batch is sent to a single, uniformly drawn client-worker pair
+    ClientWorkerPairsIndexes = lists:seq(1, length(ClientWorkerPairs)), % 1-based keys to match rand:uniform/1
+    ClientWorkerPairsWithIndexes = lists:zip(ClientWorkerPairsIndexes, ClientWorkerPairs), % tuples of {Idx, {ClientName, WorkerName}}
+    ClientWorkerPairsMap = maps:from_list(ClientWorkerPairsWithIndexes),
+    BatchFunc = fun({{BatchIdx, WorkerIdx}, Batch}) ->
+        ClientWorkerPair = maps:get(WorkerIdx, ClientWorkerPairsMap),
+        prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair])
+    end, % end of BatchFunc
+    TotalNumOfBatches = length(BatchesListToSend),
+    BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx),
+    ClientWorkerPairsLength = length(ClientWorkerPairs),
+    BatchIdxWorkerIdxTuples = [{X, rand:uniform(ClientWorkerPairsLength)} || X <- BatchesIndexes],
+    BatchesWithIndexes = lists:zip(BatchIdxWorkerIdxTuples, BatchesListToSend), % tuples of {{BatchIdx, WorkerIdx}, Batch}
+    lists:foreach(BatchFunc, BatchesWithIndexes),
+    % update batches sent
+    SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX),
+    ets:update_counter(TransmitterEts, batches_sent, length(BatchesListToSend) - SkippedBatches),
+    send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx + 1).
\ No newline at end of file
diff --git a/src_erl/NerlnetApp/src/Source/sourceStatem.erl b/src_erl/NerlnetApp/src/Source/sourceStatem.erl
index ea7085477..298eb13fe 100644
--- a/src_erl/NerlnetApp/src/Source/sourceStatem.erl
+++ b/src_erl/NerlnetApp/src/Source/sourceStatem.erl
@@ -21,9 +21,11 @@
 %% utils
 
+%% imports
+-import(sourceSendingPolicies, [send_method_casting/5, send_method_round_robin/5, send_method_random/5]).
+
 %% defintions
 -define(SENDING_FREQUENCY_OVERHEAD_FIX_FACTOR_PERC, 0.75).
--define(MICRO_TO_MILLI_FACTOR, 0.001).
 -define(PHASE_TRAINING_ATOM, training).
 -define(PHASE_PREDICTION_ATOM, prediction).
@@ -292,107 +294,11 @@ spawnTransmitter(SourceEtsRef, WorkersListOfNames, BatchesListToSend)->
     ?PHASE_PREDICTION_ATOM -> 1; % In prediction phase, we send only a single epoch always!
     _ -> ?LOG_ERROR("Source ~p has an unknown phase: ~p",[MyName, Phase])
 end,
-?LOG_NOTICE("Rounds of all data (Source Epochs): ~p", [Epochs]),
+?LOG_NOTICE("Source ~p number of epochs: ~p", [MyName, Epochs]),
 SourcePid = self(),
 TimeIntervalWithOverheadFactor = TimeInterval_ms * ?SENDING_FREQUENCY_OVERHEAD_FIX_FACTOR_PERC,
 spawn_link(?MODULE,transmitter,[TimeIntervalWithOverheadFactor,SourceEtsRef, SourcePid ,Epochs, ClientWorkerPairs, BatchesListToSend, Method]).
 
-%% Sends batch of samples to a client
-% A batch is always {NerlTensor, Type}
-sendBatch(MyName,{NerlTensor, Type}, BatchID,ClientName,WorkerName,RouterHost,RouterPort)->
- ToSend = {MyName , ClientName, WorkerName, BatchID, {NerlTensor, Type}},
- nerl_tools:http_router_request(RouterHost, RouterPort, [ClientName], atom_to_list(batch), ToSend).
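+
+% NOTE: the sending helpers that used to live here (sendBatch, prepare_and_send,
+% generate_batch_indexes and the send_method_* policies) were moved to
+% sourceSendingPolicies.erl - see that module for the implementations.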
- -prepare_and_send(_TransmitterEts, _TimeInterval_ms, _Batch, _BatchIdx, []) -> ok; -prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair | ClientWorkerPairsTail]) -> - Tic = erlang:timestamp(), % frequency relates to each send - {RouterHost,RouterPort} = ets:lookup_element(TransmitterEts, my_router, ?DATA_IDX), - case Batch of - {NerlTensor, Type} -> - {ClientName,WorkerName} = ClientWorkerPair,% TODO Gal is about to perform refactor here with casting support - MyName = ets:lookup_element(TransmitterEts, my_name, ?DATA_IDX), - % sending batch - sendBatch(MyName,{NerlTensor, Type}, BatchIdx, ClientName,WorkerName,RouterHost,RouterPort), - % timing handling - Toc_millisec = timer:now_diff(Tic, erlang:timestamp()) * ?MICRO_TO_MILLI_FACTOR, - SleepDuration = erlang:max(0, round(TimeInterval_ms - Toc_millisec)), - timer:sleep(SleepDuration); - <<>> -> - ets:update_counter(TransmitterEts, batches_skipped, 1); - _ISSUE -> - ets:update_counter(TransmitterEts, batches_issue, 1) - end, - prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, ClientWorkerPairsTail). - - -generate_batch_indexes(NumOfBatches, EpochIdx) -> - [ EpochIdx * NumOfBatches + BatchIdx || BatchIdx <- lists:seq(0, NumOfBatches-1)]. - - -send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) -> - send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0). -send_method_casting(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok; -send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) -> - % Sends the same batch to all - BatchFunc = fun({BatchIdx, Batch}) -> - prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, ClientWorkerPairs) - end, % end of BatchFunc - TotalNumOfBatches = length(BatchesListToSend), - BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx), - BatchesWithIndexes = lists:zip(BatchesIndexes, BatchesListToSend), - lists:foreach(BatchFunc, BatchesWithIndexes), - % update batches sent - SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX), - ets:update_counter(TransmitterEts, batches_sent, length(ClientWorkerPairs) * length(BatchesListToSend) - SkippedBatches), - send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend , EpochIdx + 1). - - -send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) -> - send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0). 
-send_method_round_robin(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok; -send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) -> - % Sends a batch per each - ClientWorkerPairsIndexes = lists:seq(0, length(ClientWorkerPairs)-1), - ClientWorkerPairsWithIndexes = lists:zip(ClientWorkerPairsIndexes, ClientWorkerPairs), % Tuple {Idx, Triplet} - ClientWorkerPairsMap = maps:from_list(ClientWorkerPairsWithIndexes), - - BatchFunc = fun({BatchIdx, Batch}) -> - ClientWorkerPairIdx = BatchIdx rem length(ClientWorkerPairs), - ClientWorkerPair = maps:get(ClientWorkerPairIdx, ClientWorkerPairsMap), - prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair]) - end, % end of BatchFunc - TotalNumOfBatches = length(BatchesListToSend), - BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx), - BatchesWithIndexes = lists:zip(BatchesIndexes, BatchesListToSend), - lists:foreach(BatchFunc, BatchesWithIndexes), - % update batches sent - SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX), - ets:update_counter(TransmitterEts, batches_sent, length(BatchesListToSend) - SkippedBatches), - send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx + 1). - - -send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) -> - send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0). -send_method_random(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok; -send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) -> - % Sends a batch per each - ClientWorkerPairsIndexes = lists:seq(1, length(ClientWorkerPairs)), - ClientWorkerPairsWithIndexes = lists:zip(ClientWorkerPairsIndexes, ClientWorkerPairs), % Tuple {Idx, Triplet} - ClientWorkerPairsMap = maps:from_list(ClientWorkerPairsWithIndexes), - BatchFunc = fun({BatchIdx, Batch}) -> - ClientWorkerPairIdx = rand:uniform(length(ClientWorkerPairs)), - ClientWorkerPair = maps:get(ClientWorkerPairIdx, ClientWorkerPairsMap), - prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair]) - end, % end of BatchFunc - TotalNumOfBatches = length(BatchesListToSend), - BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx), - BatchesWithIndexes = lists:zip(BatchesIndexes, BatchesListToSend), - lists:foreach(BatchFunc, BatchesWithIndexes), - % update batches sent - SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX), - ets:update_counter(TransmitterEts, batches_sent, length(BatchesListToSend) - SkippedBatches), - send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx + 1). 
- transmitter(TimeInterval_ms, SourceEtsRef, SourcePid, Epochs ,ClientWorkerPairs, BatchesListToSend, Method) -> MyName = ets:lookup_element(SourceEtsRef, my_name, ?DATA_IDX), TransmitterEts = ets:new(transmitter_ets, [set]), % allow transmitter process to edit @@ -411,11 +317,11 @@ transmitter(TimeInterval_ms, SourceEtsRef, SourcePid, Epochs ,ClientWorkerPairs, end, lists:foreach(FuncStart, ClientWorkerPairs), TransmissionStart = erlang:timestamp(), - case Method of - ?SOURCE_POLICY_CASTING_ATOM -> send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend); - ?SOURCE_POLICY_ROUNDROBIN_ATOM -> send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend); - ?SOURCE_POLICY_RANDOM_ATOM -> send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend); - _Default -> send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) + case integer_to_list(Method) of % Method is given as an integer + ?SOURCE_POLICY_CASTING_IDX -> sourceSendingPolicies:send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend); + ?SOURCE_POLICY_ROUNDROBIN_IDX -> sourceSendingPolicies:send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend); + ?SOURCE_POLICY_RANDOM_IDX -> sourceSendingPolicies:send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend); + _Default -> sourceSendingPolicies:send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) end, TransmissionTimeTook_sec = timer:now_diff(erlang:timestamp(), TransmissionStart) / 1000000, % Message to workers : "end_stream"
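
Note on the root cause (explanatory sketch, not part of the patch): the old
dispatch matched Method against atom macros (?SOURCE_POLICY_CASTING_ATOM and
friends) while Method arrives as an integer, so no policy branch ever matched
and the _Default casting branch always ran - which is why RR and Random were
never selected. The fix compares integer_to_list(Method) against the
?SOURCE_POLICY_*_IDX macros instead. A minimal standalone sketch of the fixed
dispatch, assuming the *_IDX macros are the string forms of the policy indexes
(hypothetical values "0"/"1"/"2" - check nerl_tools.hrl for the real ones):

    -module(policy_dispatch_sketch).
    -export([select_policy/1]).

    % Hypothetical macro values, for illustration only.
    -define(SOURCE_POLICY_CASTING_IDX, "0").
    -define(SOURCE_POLICY_ROUNDROBIN_IDX, "1").
    -define(SOURCE_POLICY_RANDOM_IDX, "2").

    select_policy(Method) when is_integer(Method) ->
        case integer_to_list(Method) of % e.g. 1 -> "1"
            ?SOURCE_POLICY_CASTING_IDX    -> casting;
            ?SOURCE_POLICY_ROUNDROBIN_IDX -> round_robin;
            ?SOURCE_POLICY_RANDOM_IDX     -> random;
            _Default                      -> casting % fallback, as in the patch
        end.

    % select_policy(1) -> round_robin. The old code effectively asked
    % "case 1 of casting_atom -> ..." which can never match an integer,
    % so every policy fell through to the default casting branch.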