Debugged status inspection, add spawn/1, spawn_opt/2
This commit is contained in:
@@ -26,6 +26,8 @@ In its initial version, there are a few different APIs:
|
||||
* `gm_ctflow_reg`, used to register and find `gm_ctflow_worker` processes.
|
||||
Complies with the `via` addressing scheme for OTP behaviors.
|
||||
* `gm_ctflow`, providing a shared dictionary, logging and status support.
|
||||
It also exports `spawn/1` and `spawn_opt/2`, which work as expected, but
|
||||
also register with Common Test to enable use of `ct:log/2`.
|
||||
|
||||
The application `gm_ctflow` is intended to be started in
|
||||
`init_per_suite/1` and stopped in `end_per_suite/1`, or in
|
||||
|
||||
+22
-5
@@ -6,10 +6,13 @@
|
||||
, erase/1
|
||||
]).
|
||||
|
||||
-export([note_flow/1,
|
||||
ct_log/2, %% ct_log(Fmt, Args)
|
||||
ct_log/3, %% ct_log(Flow, Fmt, Args)
|
||||
timestamp/0]).
|
||||
-export([ note_flow/1
|
||||
, ct_log/2 %% ct_log(Fmt, Args)
|
||||
, ct_log/3 %% ct_log(Flow, Fmt, Args)
|
||||
, timestamp/0 ]).
|
||||
|
||||
-export([ spawn/1
|
||||
, spawn_opt/2 ]).
|
||||
|
||||
-export([ status/0
|
||||
, summarize_flow/1
|
||||
@@ -106,6 +109,7 @@ summary_({fun_state, S}) -> fw("= Fun State: ~s~n", [maybe_state_str(S)]).
|
||||
|
||||
maybe_state_str({ok, S}) -> fw("~p", [S]);
|
||||
maybe_state_str({error, E}) -> fw("ERROR: ~p", [E]);
|
||||
maybe_state_str(undefined) -> "- none -";
|
||||
maybe_state_str(error) -> "- none -".
|
||||
|
||||
fw(Fmt, Args) -> io_lib:fwrite(Fmt, Args).
|
||||
@@ -114,7 +118,8 @@ try_call(F, Flow) ->
|
||||
try F(Flow)
|
||||
catch
|
||||
exit:E:ST ->
|
||||
ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]);
|
||||
ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]),
|
||||
undefined;
|
||||
error:E:ST ->
|
||||
ct:log("Call to ~p(~p) failed: error:~p:~p", [F, Flow, E, ST]),
|
||||
undefined
|
||||
@@ -129,3 +134,15 @@ timestamp() ->
|
||||
lists:flatten(io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B "
|
||||
"~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B",
|
||||
[Year,Month,Day,Hour,Min,Sec,MilliSec])).
|
||||
|
||||
spawn(F) ->
|
||||
proc_lib:spawn(fun() ->
|
||||
ct_util:mark_process(),
|
||||
F()
|
||||
end).
|
||||
|
||||
spawn_opt(F, Opts) ->
|
||||
proc_lib:spawn_opt(fun() ->
|
||||
ct_util:mark_process(),
|
||||
F()
|
||||
end, Opts).
|
||||
|
||||
@@ -152,7 +152,6 @@ handle_call(reset, _From, _St) ->
|
||||
handle_call(flows, _From, St) ->
|
||||
{reply, {ok, maps:keys(St)}, St}.
|
||||
|
||||
|
||||
handle_cast(_Mst, St) ->
|
||||
{noreply, St}.
|
||||
|
||||
|
||||
+57
-19
@@ -40,6 +40,8 @@
|
||||
-record(error, { time = gm_ctflow:timestamp() :: string()
|
||||
, error :: any() }).
|
||||
|
||||
-type error_ret() :: {error, any()} | #error{}.
|
||||
|
||||
via(Pid) when is_pid(Pid) ->
|
||||
Pid;
|
||||
via(Flow) ->
|
||||
@@ -76,29 +78,43 @@ cast(Flow, Msg) ->
|
||||
get_state(Flow) ->
|
||||
call_(Flow, get_state, infinity).
|
||||
|
||||
-spec maybe_get_state(flow()) -> {'ok', state() | error_ret()} | error.
|
||||
maybe_get_state(Flow) ->
|
||||
case gm_ctflow_reg:whereis_name(Flow) of
|
||||
undefined ->
|
||||
error;
|
||||
Pid ->
|
||||
call_(Pid, maybe_get_state, infinity)
|
||||
try do_call(Flow, maybe_get_state, 100)
|
||||
catch
|
||||
exit:{timeout,_} ->
|
||||
get_st(Pid)
|
||||
end
|
||||
end.
|
||||
|
||||
-spec get_state(flow(), timeout()) -> state() | no_return().
|
||||
get_state(Flow, Timeout) ->
|
||||
call_(Flow, get_state, Timeout).
|
||||
case do_call(Flow, get_state, Timeout) of
|
||||
{ok, St} -> St;
|
||||
Other ->
|
||||
error(Other)
|
||||
end.
|
||||
|
||||
call_(Flow, Req, Timeout) ->
|
||||
try gen_server:call(via(Flow), Req, Timeout) of
|
||||
try do_call(Flow, Req, Timeout) of
|
||||
#error{} = E ->
|
||||
error(E);
|
||||
{error, Reason} ->
|
||||
error(Reason);
|
||||
Other ->
|
||||
{ok, Other} ->
|
||||
Other
|
||||
catch
|
||||
exit:timeout ->
|
||||
exit:{timeout, _} ->
|
||||
error(timeout)
|
||||
end.
|
||||
|
||||
do_call(Flow, Req, Timeout) ->
|
||||
gen_server:call(via(Flow), Req, Timeout).
|
||||
|
||||
start_link(Flow, InitF) when is_function(InitF, 0) ->
|
||||
gen_server:start_link(via(Flow), ?MODULE, [Flow, InitF], []).
|
||||
|
||||
@@ -108,7 +124,7 @@ init([Flow, InitF]) ->
|
||||
?LOG(Flow, "Initializing flow worker", []),
|
||||
case InitF() of
|
||||
{ok, HandlerFun, St} when is_function(HandlerFun, 3) ->
|
||||
{ok, #st{flow = Flow, f = HandlerFun, f_st = St}};
|
||||
{ok, new_st(#st{flow = Flow, f = HandlerFun, f_st = St})};
|
||||
Other ->
|
||||
{error, Other}
|
||||
end.
|
||||
@@ -116,30 +132,30 @@ init([Flow, InitF]) ->
|
||||
handle_call(Req, _From, #error{} = E) ->
|
||||
case Req of
|
||||
get_state -> {reply, E, E};
|
||||
maybe_get_state -> {reply, {ok, {error, E}}, E};
|
||||
maybe_get_state -> {reply, {ok, E}, E};
|
||||
_ ->
|
||||
{reply, {error, {already_crashed, E}}, E}
|
||||
end;
|
||||
handle_call(get_state, _From, #st{f_st = FSt} = St) ->
|
||||
{reply, FSt, St};
|
||||
{reply, {ok, FSt}, St};
|
||||
handle_call(maybe_get_state, _From, #st{f_st = FSt} = St) ->
|
||||
{reply, {ok, FSt}, St};
|
||||
handle_call({call, Req}, From, #st{flow = Flow, f = F, f_st = FSt} = St) ->
|
||||
try F({call, From}, Req, FSt) of
|
||||
{reply, Res, FSt1} ->
|
||||
?LOG(Flow, "CALL: ~p -> ~p", [Req, Res]),
|
||||
{reply, Res, St#st{f_st = FSt1}};
|
||||
{reply, {ok, Res}, new_st(St#st{f_st = FSt1}, St)};
|
||||
{noreply, FSt1} ->
|
||||
{noreply, {F, FSt1}};
|
||||
{noreply, new_st({F, FSt1}, St)};
|
||||
Other ->
|
||||
E = #error{error = Other},
|
||||
?LOG(Flow, "ERROR CALL: ~p -> ~p", [Req, E]),
|
||||
{reply, E, E}
|
||||
catch
|
||||
{reply, E, new_st(E, St)}
|
||||
catch
|
||||
error:E:ST ->
|
||||
Error = #error{error = {E, ST}},
|
||||
?LOG(Flow, "CAUGHT CALL: ~p -> error:~p:~p", [Req, E, ST]),
|
||||
{reply, Error, Error}
|
||||
{reply, Error, new_st(Error, St)}
|
||||
end.
|
||||
|
||||
handle_cast(_Msg, #error{} = E) ->
|
||||
@@ -148,15 +164,15 @@ handle_cast(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) ->
|
||||
try F(cast, Msg, FSt) of
|
||||
{Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
|
||||
?LOG(Flow, "CAST ~p (~p) -> ~p", [Msg, FSt, Res]),
|
||||
{noreply, St#st{f_st = FSt1}};
|
||||
{noreply, new_st(St#st{f_st = FSt1}, St)};
|
||||
Other ->
|
||||
?LOG(Flow, "ERROR CAST: ~p -> ~p", [Msg, Other]),
|
||||
{noreply, #error{error = Other}}
|
||||
{noreply, new_st(#error{error = Other}, St)}
|
||||
|
||||
catch
|
||||
error:E:ST ->
|
||||
?LOG(Flow, "CAUGHT CAST ~p (~p) : error:~p:~p", [Msg, FSt, E, ST]),
|
||||
{noreply, #error{error = {E, ST}}}
|
||||
{noreply, new_st(#error{error = {E, ST}}, St)}
|
||||
end.
|
||||
|
||||
handle_info(_, #error{} = E) ->
|
||||
@@ -165,14 +181,14 @@ handle_info(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) ->
|
||||
try F(info, Msg, FSt) of
|
||||
{Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
|
||||
?LOG(Flow, "INFO ~p (~p) -> ~p", [Msg, FSt, Res]),
|
||||
{noreply, St#st{f_st = FSt1}};
|
||||
{noreply, new_st(St#st{f_st = FSt1}, St)};
|
||||
Other ->
|
||||
?LOG(Flow, "ERROR MSG: ~p -> ~p", [Msg, Other]),
|
||||
{noreply, #error{error = Other}}
|
||||
{noreply, new_st(#error{error = Other}, St)}
|
||||
catch
|
||||
error:E:ST ->
|
||||
?LOG(Flow, "CAUGHT MSG: ~p, error:~p:~p", [Msg, E, ST]),
|
||||
{noreply, #error{error = {E, ST}}}
|
||||
{noreply, new_st(#error{error = {E, ST}}, St)}
|
||||
end.
|
||||
|
||||
terminate(_, _) ->
|
||||
@@ -180,3 +196,25 @@ terminate(_, _) ->
|
||||
|
||||
code_change(_FromVsn, S, _Extra) ->
|
||||
{ok, S}.
|
||||
|
||||
get_st(Pid) when is_pid(Pid) ->
|
||||
case process_info(Pid, dictionary) of
|
||||
{dictionary, Dict} ->
|
||||
case lists:keyfind({?MODULE, state}, 1, Dict) of
|
||||
{_, St} ->
|
||||
{ok, St};
|
||||
false ->
|
||||
error
|
||||
end;
|
||||
_ ->
|
||||
error
|
||||
end.
|
||||
|
||||
new_st(S) ->
|
||||
put({?MODULE, state}, S),
|
||||
S.
|
||||
|
||||
new_st(S, S) ->
|
||||
S;
|
||||
new_st(NewS, _) ->
|
||||
new_st(NewS).
|
||||
|
||||
Reference in New Issue
Block a user