Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
GuyPerets106 authored and leondavi committed Jun 24, 2024
1 parent 0be393d commit 1cee08b
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 10 deletions.
12 changes: 3 additions & 9 deletions src_erl/NerlnetApp/src/Bridge/onnWorkers/workerGeneric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ wait(cast, {predictRes, PredNerlTensor, PredNerlTensorType, TimeNif, BatchID , S

wait(cast, {end_stream , StreamName}, State = #workerGeneric_state{myName = MyName, distributedBehaviorFunc = DistributedBehaviorFunc}) ->
%logger:notice("Waiting, next state - idle"),
io:format("@~p got end_stream from ~p while being at state wait~n",[MyName, StreamName]),
Func = fun() -> io:format("HEY~n"), stream_handler(end_stream, wait, StreamName, DistributedBehaviorFunc) end,
Func = fun() -> stream_handler(end_stream, wait, StreamName, DistributedBehaviorFunc) end,
{next_state, wait, State#workerGeneric_state{postBatchFunc = Func}};

wait(cast, {post_train_update, Data}, State = #workerGeneric_state{myName = MyName, distributedBehaviorFunc = DistributedBehaviorFunc, postBatchFunc = PostBatchFunc}) ->
Expand Down Expand Up @@ -275,7 +274,6 @@ train(cast, {start_stream , StreamName}, State = #workerGeneric_state{myName = _
{next_state, train, State};

train(cast, {end_stream , StreamName}, State = #workerGeneric_state{myName = MyName , distributedBehaviorFunc = DistributedBehaviorFunc}) ->
io:format("@~p got end_stream from ~p while being at state train~n",[MyName, StreamName]),
stream_handler(end_stream, train, StreamName, DistributedBehaviorFunc),
{next_state, train, State};

Expand Down Expand Up @@ -331,15 +329,11 @@ update_client_avilable_worker(MyName) ->
stream_handler(StreamPhase , ModelPhase , StreamName , DistributedBehaviorFunc) ->
GenWorkerEts = get(generic_worker_ets),
MyName = ets:lookup_element(GenWorkerEts, worker_name, ?ETS_KEYVAL_VAL_IDX),
case ModelPhase of
wait -> io:format("@~p got ~p from ~p~n",[MyName, StreamPhase, StreamName]);
_ -> ok
end,
ActiveStreams = ets:lookup_element(GenWorkerEts, active_streams, ?ETS_KEYVAL_VAL_IDX),
NewActiveStreams =
case StreamPhase of
start_stream -> io:format("@~p Adding ~p ,NewActiveStreams: ~p~n",[MyName, StreamName, ActiveStreams ++ [StreamName]]), ActiveStreams ++ [StreamName];
end_stream -> io:format("@~p Removing ~p ,NewActiveStreams: ~p~n",[MyName, StreamName, ActiveStreams -- [StreamName]]), ActiveStreams -- [StreamName]
start_stream -> ActiveStreams ++ [StreamName];
end_stream -> ActiveStreams -- [StreamName]
end,
ets:update_element(GenWorkerEts, active_streams, {?ETS_KEYVAL_VAL_IDX, NewActiveStreams}),
DistributedBehaviorFunc(StreamPhase, {GenWorkerEts, [StreamName , ModelPhase]}),
Expand Down
1 change: 0 additions & 1 deletion src_erl/NerlnetApp/src/Client/clientStatem.erl
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ training(cast, In = {start_stream , Data}, State = #client_statem_state{etsRef =

training(cast, In = {end_stream , Data}, State = #client_statem_state{etsRef = EtsRef}) ->
{SourceName, ClientName, WorkerName} = binary_to_term(Data),
io:format("Client ~p got end_stream from source ~p destined for ~p~n",[ClientName, SourceName, WorkerName]),
ClientStatsEts = get(client_stats_ets),
stats:increment_messages_received(ClientStatsEts),
stats:increment_bytes_received(ClientStatsEts , nerl_tools:calculate_size(In)),
Expand Down

0 comments on commit 1cee08b

Please sign in to comment.