Skip to content

Commit

Permalink
Merge pull request #373 from leondavi/source_rr
Browse files Browse the repository at this point in the history
[SOURCE] Critical Bug - RR and Random
  • Loading branch information
leondavi authored Jul 18, 2024
2 parents d663453 + 7f63d6d commit 26a6df6
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 103 deletions.
106 changes: 106 additions & 0 deletions src_erl/NerlnetApp/src/Source/sourceSendingPolicies.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
-module(sourceSendingPolicies).
-author("David Leon").

-define(MICRO_TO_MILLI_FACTOR, 0.001).

-include("../nerl_tools.hrl").
% exports
-export([send_method_casting/5, send_method_round_robin/5, send_method_random/5]).

%% Sends batch of samples to a client
% A batch is always {NerlTensor, Type}
sendBatch(MyName,{NerlTensor, Type}, BatchID,ClientName,WorkerName,RouterHost,RouterPort)->
ToSend = {MyName , ClientName, WorkerName, BatchID, {NerlTensor, Type}},
nerl_tools:http_router_request(RouterHost, RouterPort, [ClientName], atom_to_list(batch), ToSend).

prepare_and_send(_TransmitterEts, _TimeInterval_ms, _Batch, _BatchIdx, []) -> ok;
prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair | ClientWorkerPairsTail]) ->
Tic = erlang:timestamp(), % frequency relates to each send
{RouterHost,RouterPort} = ets:lookup_element(TransmitterEts, my_router, ?DATA_IDX),
case Batch of
{NerlTensor, Type} ->
{ClientName,WorkerName} = ClientWorkerPair,% TODO Gal is about to perform refactor here with casting support
MyName = ets:lookup_element(TransmitterEts, my_name, ?DATA_IDX),
% sending batch
sendBatch(MyName,{NerlTensor, Type}, BatchIdx, ClientName,WorkerName,RouterHost,RouterPort),
% timing handling
Toc_millisec = timer:now_diff(Tic, erlang:timestamp()) * ?MICRO_TO_MILLI_FACTOR,
SleepDuration = erlang:max(0, round(TimeInterval_ms - Toc_millisec)),
timer:sleep(SleepDuration);
<<>> ->
ets:update_counter(TransmitterEts, batches_skipped, 1);
_ISSUE ->
ets:update_counter(TransmitterEts, batches_issue, 1)
end,
prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, ClientWorkerPairsTail).


generate_batch_indexes(NumOfBatches, EpochIdx) ->
[ EpochIdx * NumOfBatches + BatchIdx || BatchIdx <- lists:seq(0, NumOfBatches-1)].


send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) ->
send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
send_method_casting(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
% Sends the same batch to all
BatchFunc = fun({BatchIdx, Batch}) ->
prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, ClientWorkerPairs)
end, % end of BatchFunc
TotalNumOfBatches = length(BatchesListToSend),
BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx),
BatchesWithIndexes = lists:zip(BatchesIndexes, BatchesListToSend),
lists:foreach(BatchFunc, BatchesWithIndexes),
% update batches sent
SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX),
ets:update_counter(TransmitterEts, batches_sent, length(ClientWorkerPairs) * length(BatchesListToSend) - SkippedBatches),
send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend , EpochIdx + 1).


send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) ->
send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
send_method_round_robin(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
% Sends a batch per each
ClientWorkerPairsIndexes = lists:seq(0, length(ClientWorkerPairs)-1),
ClientWorkerPairsWithIndexes = lists:zip(ClientWorkerPairsIndexes, ClientWorkerPairs), % Tuple {Idx, Triplet}
ClientWorkerPairsMap = maps:from_list(ClientWorkerPairsWithIndexes),

BatchFunc = fun({{BatchIdx, WorkerIdx}, Batch}) ->
ClientWorkerPair = maps:get(WorkerIdx, ClientWorkerPairsMap),
prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair])
end, % end of BatchFunc
TotalNumOfBatches = length(BatchesListToSend),
BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx),
ClientWorkerPairsLength = length(ClientWorkerPairs),
WorkerIdxBatchIdxTuples = [ {X , X rem ClientWorkerPairsLength} || X <- BatchesIndexes],
BatchesWithIndexes = lists:zip(WorkerIdxBatchIdxTuples, BatchesListToSend),
lists:foreach(BatchFunc, BatchesWithIndexes),
% update batches sent
SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX),
ets:update_counter(TransmitterEts, batches_sent, length(BatchesListToSend) - SkippedBatches),
send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx + 1).


send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) ->
send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
send_method_random(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
% Sends a batch per each
ClientWorkerPairsIndexes = lists:seq(1, length(ClientWorkerPairs)),
ClientWorkerPairsWithIndexes = lists:zip(ClientWorkerPairsIndexes, ClientWorkerPairs), % Tuple {Idx, Triplet}
ClientWorkerPairsMap = maps:from_list(ClientWorkerPairsWithIndexes),
BatchFunc = fun({{BatchIdx, WorkerIdx}, Batch}) ->
ClientWorkerPair = maps:get(WorkerIdx, ClientWorkerPairsMap),
prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair])
end, % end of BatchFunc
TotalNumOfBatches = length(BatchesListToSend),
BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx),
ClientWorkerPairsLength = length(ClientWorkerPairs),
WorkerIdxBatchIdxTuples = [ {X , rand:uniform(ClientWorkerPairsLength)} || X <- BatchesIndexes],
BatchesWithIndexes = lists:zip(WorkerIdxBatchIdxTuples, BatchesListToSend), % Tuple {{BatchIdx, WorkerIdx}, Batch}
lists:foreach(BatchFunc, BatchesWithIndexes),
% update batches sent
SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX),
ets:update_counter(TransmitterEts, batches_sent, length(BatchesListToSend) - SkippedBatches),
send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx + 1).
112 changes: 9 additions & 103 deletions src_erl/NerlnetApp/src/Source/sourceStatem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
%% utils


%% imports
-import(sourceSendingPolicies, [send_method_casting/5, send_method_round_robin/5, send_method_random/5]).

%% defintions
-define(SENDING_FREQUENCY_OVERHEAD_FIX_FACTOR_PERC, 0.75).
-define(MICRO_TO_MILLI_FACTOR, 0.001).

-define(PHASE_TRAINING_ATOM, training).
-define(PHASE_PREDICTION_ATOM, prediction).
Expand Down Expand Up @@ -292,107 +294,11 @@ spawnTransmitter(SourceEtsRef, WorkersListOfNames, BatchesListToSend)->
?PHASE_PREDICTION_ATOM -> 1; % In prediction phase, we send only a single epoch always!
_ -> ?LOG_ERROR("Source ~p has an unknown phase: ~p",[MyName, Phase])
end,
?LOG_NOTICE("Rounds of all data (Source Epochs): ~p", [Epochs]),
?LOG_NOTICE("Source ~p # of epochs ~p", [MyName, Epochs]),
SourcePid = self(),
TimeIntervalWithOverheadFactor = TimeInterval_ms * ?SENDING_FREQUENCY_OVERHEAD_FIX_FACTOR_PERC,
spawn_link(?MODULE,transmitter,[TimeIntervalWithOverheadFactor,SourceEtsRef, SourcePid ,Epochs, ClientWorkerPairs, BatchesListToSend, Method]).

%% Sends batch of samples to a client
% A batch is always {NerlTensor, Type}
sendBatch(MyName,{NerlTensor, Type}, BatchID,ClientName,WorkerName,RouterHost,RouterPort)->
ToSend = {MyName , ClientName, WorkerName, BatchID, {NerlTensor, Type}},
nerl_tools:http_router_request(RouterHost, RouterPort, [ClientName], atom_to_list(batch), ToSend).

prepare_and_send(_TransmitterEts, _TimeInterval_ms, _Batch, _BatchIdx, []) -> ok;
prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair | ClientWorkerPairsTail]) ->
Tic = erlang:timestamp(), % frequency relates to each send
{RouterHost,RouterPort} = ets:lookup_element(TransmitterEts, my_router, ?DATA_IDX),
case Batch of
{NerlTensor, Type} ->
{ClientName,WorkerName} = ClientWorkerPair,% TODO Gal is about to perform refactor here with casting support
MyName = ets:lookup_element(TransmitterEts, my_name, ?DATA_IDX),
% sending batch
sendBatch(MyName,{NerlTensor, Type}, BatchIdx, ClientName,WorkerName,RouterHost,RouterPort),
% timing handling
Toc_millisec = timer:now_diff(Tic, erlang:timestamp()) * ?MICRO_TO_MILLI_FACTOR,
SleepDuration = erlang:max(0, round(TimeInterval_ms - Toc_millisec)),
timer:sleep(SleepDuration);
<<>> ->
ets:update_counter(TransmitterEts, batches_skipped, 1);
_ISSUE ->
ets:update_counter(TransmitterEts, batches_issue, 1)
end,
prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, ClientWorkerPairsTail).


generate_batch_indexes(NumOfBatches, EpochIdx) ->
[ EpochIdx * NumOfBatches + BatchIdx || BatchIdx <- lists:seq(0, NumOfBatches-1)].


send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) ->
send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
send_method_casting(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
% Sends the same batch to all
BatchFunc = fun({BatchIdx, Batch}) ->
prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, ClientWorkerPairs)
end, % end of BatchFunc
TotalNumOfBatches = length(BatchesListToSend),
BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx),
BatchesWithIndexes = lists:zip(BatchesIndexes, BatchesListToSend),
lists:foreach(BatchFunc, BatchesWithIndexes),
% update batches sent
SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX),
ets:update_counter(TransmitterEts, batches_sent, length(ClientWorkerPairs) * length(BatchesListToSend) - SkippedBatches),
send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend , EpochIdx + 1).


send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) ->
send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
send_method_round_robin(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
% Sends a batch per each
ClientWorkerPairsIndexes = lists:seq(0, length(ClientWorkerPairs)-1),
ClientWorkerPairsWithIndexes = lists:zip(ClientWorkerPairsIndexes, ClientWorkerPairs), % Tuple {Idx, Triplet}
ClientWorkerPairsMap = maps:from_list(ClientWorkerPairsWithIndexes),

BatchFunc = fun({BatchIdx, Batch}) ->
ClientWorkerPairIdx = BatchIdx rem length(ClientWorkerPairs),
ClientWorkerPair = maps:get(ClientWorkerPairIdx, ClientWorkerPairsMap),
prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair])
end, % end of BatchFunc
TotalNumOfBatches = length(BatchesListToSend),
BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx),
BatchesWithIndexes = lists:zip(BatchesIndexes, BatchesListToSend),
lists:foreach(BatchFunc, BatchesWithIndexes),
% update batches sent
SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX),
ets:update_counter(TransmitterEts, batches_sent, length(BatchesListToSend) - SkippedBatches),
send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx + 1).


send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend) ->
send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, 0).
send_method_random(_TransmitterEts, Epochs, _TimeInterval_ms, _ClientWorkerPairs, _BatchesListToSend, EpochIdx) when EpochIdx == Epochs -> ok;
send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx) ->
% Sends a batch per each
ClientWorkerPairsIndexes = lists:seq(1, length(ClientWorkerPairs)),
ClientWorkerPairsWithIndexes = lists:zip(ClientWorkerPairsIndexes, ClientWorkerPairs), % Tuple {Idx, Triplet}
ClientWorkerPairsMap = maps:from_list(ClientWorkerPairsWithIndexes),
BatchFunc = fun({BatchIdx, Batch}) ->
ClientWorkerPairIdx = rand:uniform(length(ClientWorkerPairs)),
ClientWorkerPair = maps:get(ClientWorkerPairIdx, ClientWorkerPairsMap),
prepare_and_send(TransmitterEts, TimeInterval_ms, Batch, BatchIdx, [ClientWorkerPair])
end, % end of BatchFunc
TotalNumOfBatches = length(BatchesListToSend),
BatchesIndexes = generate_batch_indexes(TotalNumOfBatches, EpochIdx),
BatchesWithIndexes = lists:zip(BatchesIndexes, BatchesListToSend),
lists:foreach(BatchFunc, BatchesWithIndexes),
% update batches sent
SkippedBatches = ets:lookup_element(TransmitterEts, batches_skipped, ?DATA_IDX),
ets:update_counter(TransmitterEts, batches_sent, length(BatchesListToSend) - SkippedBatches),
send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend, EpochIdx + 1).

transmitter(TimeInterval_ms, SourceEtsRef, SourcePid, Epochs ,ClientWorkerPairs, BatchesListToSend, Method) ->
MyName = ets:lookup_element(SourceEtsRef, my_name, ?DATA_IDX),
TransmitterEts = ets:new(transmitter_ets, [set]), % allow transmitter process to edit
Expand All @@ -411,11 +317,11 @@ transmitter(TimeInterval_ms, SourceEtsRef, SourcePid, Epochs ,ClientWorkerPairs,
end,
lists:foreach(FuncStart, ClientWorkerPairs),
TransmissionStart = erlang:timestamp(),
case Method of
?SOURCE_POLICY_CASTING_ATOM -> send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend);
?SOURCE_POLICY_ROUNDROBIN_ATOM -> send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend);
?SOURCE_POLICY_RANDOM_ATOM -> send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend);
_Default -> send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend)
case integer_to_list(Method) of % Method is given as an integer
?SOURCE_POLICY_CASTING_IDX -> sourceSendingPolicies:send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend);
?SOURCE_POLICY_ROUNDROBIN_IDX -> sourceSendingPolicies:send_method_round_robin(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend);
?SOURCE_POLICY_RANDOM_IDX -> sourceSendingPolicies:send_method_random(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend);
_Default -> sourceSendingPolicies:send_method_casting(TransmitterEts, Epochs, TimeInterval_ms, ClientWorkerPairs, BatchesListToSend)
end,
TransmissionTimeTook_sec = timer:now_diff(erlang:timestamp(), TransmissionStart) / 1000000,
% Message to workers : "end_stream"
Expand Down

0 comments on commit 26a6df6

Please sign in to comment.