Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e8233561c6 | |||
| 03ec666998 | |||
| b05895f14f |
@@ -26,6 +26,8 @@ In its initial version, there are a few different APIs:
|
|||||||
* `gm_ctflow_reg`, used to register and find `gm_ctflow_worker` processes.
|
* `gm_ctflow_reg`, used to register and find `gm_ctflow_worker` processes.
|
||||||
Complies with the `via` addressing scheme for OTP behaviors.
|
Complies with the `via` addressing scheme for OTP behaviors.
|
||||||
* `gm_ctflow`, providing a shared dictionary, logging and status support.
|
* `gm_ctflow`, providing a shared dictionary, logging and status support.
|
||||||
|
It also exports `spawn/1` and `spawn_opt/2`, which work as expected, but
|
||||||
|
also register with Common Test to enable use of `ct:log/2`.
|
||||||
|
|
||||||
The application `gm_ctflow` is intended to be started in
|
The application `gm_ctflow` is intended to be started in
|
||||||
`init_per_suite/1` and stopped in `end_per_suite/1`, or in
|
`init_per_suite/1` and stopped in `end_per_suite/1`, or in
|
||||||
|
|||||||
+22
-5
@@ -6,10 +6,13 @@
|
|||||||
, erase/1
|
, erase/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([note_flow/1,
|
-export([ note_flow/1
|
||||||
ct_log/2, %% ct_log(Fmt, Args)
|
, ct_log/2 %% ct_log(Fmt, Args)
|
||||||
ct_log/3, %% ct_log(Flow, Fmt, Args)
|
, ct_log/3 %% ct_log(Flow, Fmt, Args)
|
||||||
timestamp/0]).
|
, timestamp/0 ]).
|
||||||
|
|
||||||
|
-export([ spawn/1
|
||||||
|
, spawn_opt/2 ]).
|
||||||
|
|
||||||
-export([ status/0
|
-export([ status/0
|
||||||
, summarize_flow/1
|
, summarize_flow/1
|
||||||
@@ -106,6 +109,7 @@ summary_({fun_state, S}) -> fw("= Fun State: ~s~n", [maybe_state_str(S)]).
|
|||||||
|
|
||||||
maybe_state_str({ok, S}) -> fw("~p", [S]);
|
maybe_state_str({ok, S}) -> fw("~p", [S]);
|
||||||
maybe_state_str({error, E}) -> fw("ERROR: ~p", [E]);
|
maybe_state_str({error, E}) -> fw("ERROR: ~p", [E]);
|
||||||
|
maybe_state_str(undefined) -> "- none -";
|
||||||
maybe_state_str(error) -> "- none -".
|
maybe_state_str(error) -> "- none -".
|
||||||
|
|
||||||
fw(Fmt, Args) -> io_lib:fwrite(Fmt, Args).
|
fw(Fmt, Args) -> io_lib:fwrite(Fmt, Args).
|
||||||
@@ -114,7 +118,8 @@ try_call(F, Flow) ->
|
|||||||
try F(Flow)
|
try F(Flow)
|
||||||
catch
|
catch
|
||||||
exit:E:ST ->
|
exit:E:ST ->
|
||||||
ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]);
|
ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]),
|
||||||
|
undefined;
|
||||||
error:E:ST ->
|
error:E:ST ->
|
||||||
ct:log("Call to ~p(~p) failed: error:~p:~p", [F, Flow, E, ST]),
|
ct:log("Call to ~p(~p) failed: error:~p:~p", [F, Flow, E, ST]),
|
||||||
undefined
|
undefined
|
||||||
@@ -129,3 +134,15 @@ timestamp() ->
|
|||||||
lists:flatten(io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B "
|
lists:flatten(io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B "
|
||||||
"~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B",
|
"~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B",
|
||||||
[Year,Month,Day,Hour,Min,Sec,MilliSec])).
|
[Year,Month,Day,Hour,Min,Sec,MilliSec])).
|
||||||
|
|
||||||
|
spawn(F) ->
|
||||||
|
proc_lib:spawn(fun() ->
|
||||||
|
ct_util:mark_process(),
|
||||||
|
F()
|
||||||
|
end).
|
||||||
|
|
||||||
|
spawn_opt(F, Opts) ->
|
||||||
|
proc_lib:spawn_opt(fun() ->
|
||||||
|
ct_util:mark_process(),
|
||||||
|
F()
|
||||||
|
end, Opts).
|
||||||
|
|||||||
@@ -152,7 +152,6 @@ handle_call(reset, _From, _St) ->
|
|||||||
handle_call(flows, _From, St) ->
|
handle_call(flows, _From, St) ->
|
||||||
{reply, {ok, maps:keys(St)}, St}.
|
{reply, {ok, maps:keys(St)}, St}.
|
||||||
|
|
||||||
|
|
||||||
handle_cast(_Mst, St) ->
|
handle_cast(_Mst, St) ->
|
||||||
{noreply, St}.
|
{noreply, St}.
|
||||||
|
|
||||||
|
|||||||
+56
-18
@@ -40,6 +40,8 @@
|
|||||||
-record(error, { time = gm_ctflow:timestamp() :: string()
|
-record(error, { time = gm_ctflow:timestamp() :: string()
|
||||||
, error :: any() }).
|
, error :: any() }).
|
||||||
|
|
||||||
|
-type error_ret() :: {error, any()} | #error{}.
|
||||||
|
|
||||||
via(Pid) when is_pid(Pid) ->
|
via(Pid) when is_pid(Pid) ->
|
||||||
Pid;
|
Pid;
|
||||||
via(Flow) ->
|
via(Flow) ->
|
||||||
@@ -76,29 +78,43 @@ cast(Flow, Msg) ->
|
|||||||
get_state(Flow) ->
|
get_state(Flow) ->
|
||||||
call_(Flow, get_state, infinity).
|
call_(Flow, get_state, infinity).
|
||||||
|
|
||||||
|
-spec maybe_get_state(flow()) -> {'ok', state() | error_ret()} | error.
|
||||||
maybe_get_state(Flow) ->
|
maybe_get_state(Flow) ->
|
||||||
case gm_ctflow_reg:whereis_name(Flow) of
|
case gm_ctflow_reg:whereis_name(Flow) of
|
||||||
undefined ->
|
undefined ->
|
||||||
error;
|
error;
|
||||||
Pid ->
|
Pid ->
|
||||||
call_(Pid, maybe_get_state, infinity)
|
try do_call(Flow, maybe_get_state, 100)
|
||||||
|
catch
|
||||||
|
exit:{timeout,_} ->
|
||||||
|
get_st(Pid)
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec get_state(flow(), timeout()) -> state() | no_return().
|
-spec get_state(flow(), timeout()) -> state() | no_return().
|
||||||
get_state(Flow, Timeout) ->
|
get_state(Flow, Timeout) ->
|
||||||
call_(Flow, get_state, Timeout).
|
case do_call(Flow, get_state, Timeout) of
|
||||||
|
{ok, St} -> St;
|
||||||
|
Other ->
|
||||||
|
error(Other)
|
||||||
|
end.
|
||||||
|
|
||||||
call_(Flow, Req, Timeout) ->
|
call_(Flow, Req, Timeout) ->
|
||||||
try gen_server:call(via(Flow), Req, Timeout) of
|
try do_call(Flow, Req, Timeout) of
|
||||||
|
#error{} = E ->
|
||||||
|
error(E);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
error(Reason);
|
error(Reason);
|
||||||
Other ->
|
{ok, Other} ->
|
||||||
Other
|
Other
|
||||||
catch
|
catch
|
||||||
exit:timeout ->
|
exit:{timeout, _} ->
|
||||||
error(timeout)
|
error(timeout)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_call(Flow, Req, Timeout) ->
|
||||||
|
gen_server:call(via(Flow), Req, Timeout).
|
||||||
|
|
||||||
start_link(Flow, InitF) when is_function(InitF, 0) ->
|
start_link(Flow, InitF) when is_function(InitF, 0) ->
|
||||||
gen_server:start_link(via(Flow), ?MODULE, [Flow, InitF], []).
|
gen_server:start_link(via(Flow), ?MODULE, [Flow, InitF], []).
|
||||||
|
|
||||||
@@ -108,7 +124,7 @@ init([Flow, InitF]) ->
|
|||||||
?LOG(Flow, "Initializing flow worker", []),
|
?LOG(Flow, "Initializing flow worker", []),
|
||||||
case InitF() of
|
case InitF() of
|
||||||
{ok, HandlerFun, St} when is_function(HandlerFun, 3) ->
|
{ok, HandlerFun, St} when is_function(HandlerFun, 3) ->
|
||||||
{ok, #st{flow = Flow, f = HandlerFun, f_st = St}};
|
{ok, new_st(#st{flow = Flow, f = HandlerFun, f_st = St})};
|
||||||
Other ->
|
Other ->
|
||||||
{error, Other}
|
{error, Other}
|
||||||
end.
|
end.
|
||||||
@@ -116,30 +132,30 @@ init([Flow, InitF]) ->
|
|||||||
handle_call(Req, _From, #error{} = E) ->
|
handle_call(Req, _From, #error{} = E) ->
|
||||||
case Req of
|
case Req of
|
||||||
get_state -> {reply, E, E};
|
get_state -> {reply, E, E};
|
||||||
maybe_get_state -> {reply, {ok, {error, E}}, E};
|
maybe_get_state -> {reply, {ok, E}, E};
|
||||||
_ ->
|
_ ->
|
||||||
{reply, {error, {already_crashed, E}}, E}
|
{reply, {error, {already_crashed, E}}, E}
|
||||||
end;
|
end;
|
||||||
handle_call(get_state, _From, #st{f_st = FSt} = St) ->
|
handle_call(get_state, _From, #st{f_st = FSt} = St) ->
|
||||||
{reply, FSt, St};
|
{reply, {ok, FSt}, St};
|
||||||
handle_call(maybe_get_state, _From, #st{f_st = FSt} = St) ->
|
handle_call(maybe_get_state, _From, #st{f_st = FSt} = St) ->
|
||||||
{reply, {ok, FSt}, St};
|
{reply, {ok, FSt}, St};
|
||||||
handle_call({call, Req}, From, #st{flow = Flow, f = F, f_st = FSt} = St) ->
|
handle_call({call, Req}, From, #st{flow = Flow, f = F, f_st = FSt} = St) ->
|
||||||
try F({call, From}, Req, FSt) of
|
try F({call, From}, Req, FSt) of
|
||||||
{reply, Res, FSt1} ->
|
{reply, Res, FSt1} ->
|
||||||
?LOG(Flow, "CALL: ~p -> ~p", [Req, Res]),
|
?LOG(Flow, "CALL: ~p -> ~p", [Req, Res]),
|
||||||
{reply, Res, St#st{f_st = FSt1}};
|
{reply, {ok, Res}, new_st(St#st{f_st = FSt1}, St)};
|
||||||
{noreply, FSt1} ->
|
{noreply, FSt1} ->
|
||||||
{noreply, {F, FSt1}};
|
{noreply, new_st({F, FSt1}, St)};
|
||||||
Other ->
|
Other ->
|
||||||
E = #error{error = Other},
|
E = #error{error = Other},
|
||||||
?LOG(Flow, "ERROR CALL: ~p -> ~p", [Req, E]),
|
?LOG(Flow, "ERROR CALL: ~p -> ~p", [Req, E]),
|
||||||
{reply, E, E}
|
{reply, E, new_st(E, St)}
|
||||||
catch
|
catch
|
||||||
error:E:ST ->
|
error:E:ST ->
|
||||||
Error = #error{error = {E, ST}},
|
Error = #error{error = {E, ST}},
|
||||||
?LOG(Flow, "CAUGHT CALL: ~p -> error:~p:~p", [Req, E, ST]),
|
?LOG(Flow, "CAUGHT CALL: ~p -> error:~p:~p", [Req, E, ST]),
|
||||||
{reply, Error, Error}
|
{reply, Error, new_st(Error, St)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_cast(_Msg, #error{} = E) ->
|
handle_cast(_Msg, #error{} = E) ->
|
||||||
@@ -148,15 +164,15 @@ handle_cast(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) ->
|
|||||||
try F(cast, Msg, FSt) of
|
try F(cast, Msg, FSt) of
|
||||||
{Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
|
{Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
|
||||||
?LOG(Flow, "CAST ~p (~p) -> ~p", [Msg, FSt, Res]),
|
?LOG(Flow, "CAST ~p (~p) -> ~p", [Msg, FSt, Res]),
|
||||||
{noreply, St#st{f_st = FSt1}};
|
{noreply, new_st(St#st{f_st = FSt1}, St)};
|
||||||
Other ->
|
Other ->
|
||||||
?LOG(Flow, "ERROR CAST: ~p -> ~p", [Msg, Other]),
|
?LOG(Flow, "ERROR CAST: ~p -> ~p", [Msg, Other]),
|
||||||
{noreply, #error{error = Other}}
|
{noreply, new_st(#error{error = Other}, St)}
|
||||||
|
|
||||||
catch
|
catch
|
||||||
error:E:ST ->
|
error:E:ST ->
|
||||||
?LOG(Flow, "CAUGHT CAST ~p (~p) : error:~p:~p", [Msg, FSt, E, ST]),
|
?LOG(Flow, "CAUGHT CAST ~p (~p) : error:~p:~p", [Msg, FSt, E, ST]),
|
||||||
{noreply, #error{error = {E, ST}}}
|
{noreply, new_st(#error{error = {E, ST}}, St)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_info(_, #error{} = E) ->
|
handle_info(_, #error{} = E) ->
|
||||||
@@ -165,14 +181,14 @@ handle_info(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) ->
|
|||||||
try F(info, Msg, FSt) of
|
try F(info, Msg, FSt) of
|
||||||
{Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
|
{Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
|
||||||
?LOG(Flow, "INFO ~p (~p) -> ~p", [Msg, FSt, Res]),
|
?LOG(Flow, "INFO ~p (~p) -> ~p", [Msg, FSt, Res]),
|
||||||
{noreply, St#st{f_st = FSt1}};
|
{noreply, new_st(St#st{f_st = FSt1}, St)};
|
||||||
Other ->
|
Other ->
|
||||||
?LOG(Flow, "ERROR MSG: ~p -> ~p", [Msg, Other]),
|
?LOG(Flow, "ERROR MSG: ~p -> ~p", [Msg, Other]),
|
||||||
{noreply, #error{error = Other}}
|
{noreply, new_st(#error{error = Other}, St)}
|
||||||
catch
|
catch
|
||||||
error:E:ST ->
|
error:E:ST ->
|
||||||
?LOG(Flow, "CAUGHT MSG: ~p, error:~p:~p", [Msg, E, ST]),
|
?LOG(Flow, "CAUGHT MSG: ~p, error:~p:~p", [Msg, E, ST]),
|
||||||
{noreply, #error{error = {E, ST}}}
|
{noreply, new_st(#error{error = {E, ST}}, St)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
terminate(_, _) ->
|
terminate(_, _) ->
|
||||||
@@ -180,3 +196,25 @@ terminate(_, _) ->
|
|||||||
|
|
||||||
code_change(_FromVsn, S, _Extra) ->
|
code_change(_FromVsn, S, _Extra) ->
|
||||||
{ok, S}.
|
{ok, S}.
|
||||||
|
|
||||||
|
get_st(Pid) when is_pid(Pid) ->
|
||||||
|
case process_info(Pid, dictionary) of
|
||||||
|
{dictionary, Dict} ->
|
||||||
|
case lists:keyfind({?MODULE, state}, 1, Dict) of
|
||||||
|
{_, St} ->
|
||||||
|
{ok, St};
|
||||||
|
false ->
|
||||||
|
error
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
error
|
||||||
|
end.
|
||||||
|
|
||||||
|
new_st(S) ->
|
||||||
|
put({?MODULE, state}, S),
|
||||||
|
S.
|
||||||
|
|
||||||
|
new_st(S, S) ->
|
||||||
|
S;
|
||||||
|
new_st(NewS, _) ->
|
||||||
|
new_st(NewS).
|
||||||
|
|||||||
Reference in New Issue
Block a user