Merge pull request 'Debugged status inspection, add spawn/1, spawn_opt/2' (#2) from uw-status-inspection into master

Reviewed-on: #2
This commit was merged in pull request #2.
This commit is contained in:
2026-06-04 15:14:07 +09:00
4 changed files with 81 additions and 25 deletions
+2
View File
@@ -26,6 +26,8 @@ In its initial version, there are a few different APIs:
* `gm_ctflow_reg`, used to register and find `gm_ctflow_worker` processes. * `gm_ctflow_reg`, used to register and find `gm_ctflow_worker` processes.
Complies with the `via` addressing scheme for OTP behaviors. Complies with the `via` addressing scheme for OTP behaviors.
* `gm_ctflow`, providing a shared dictionary, logging and status support. * `gm_ctflow`, providing a shared dictionary, logging and status support.
It also exports `spawn/1` and `spawn_opt/2`, which work as expected, but
also register with Common Test to enable use of `ct:log/2`.
The application `gm_ctflow` is intended to be started in The application `gm_ctflow` is intended to be started in
`init_per_suite/1` and stopped in `end_per_suite/1`, or in `init_per_suite/1` and stopped in `end_per_suite/1`, or in
+22 -5
View File
@@ -6,10 +6,13 @@
, erase/1 , erase/1
]). ]).
-export([note_flow/1, -export([ note_flow/1
ct_log/2, %% ct_log(Fmt, Args) , ct_log/2 %% ct_log(Fmt, Args)
ct_log/3, %% ct_log(Flow, Fmt, Args) , ct_log/3 %% ct_log(Flow, Fmt, Args)
timestamp/0]). , timestamp/0 ]).
-export([ spawn/1
, spawn_opt/2 ]).
-export([ status/0 -export([ status/0
, summarize_flow/1 , summarize_flow/1
@@ -106,6 +109,7 @@ summary_({fun_state, S}) -> fw("= Fun State: ~s~n", [maybe_state_str(S)]).
maybe_state_str({ok, S}) -> fw("~p", [S]); maybe_state_str({ok, S}) -> fw("~p", [S]);
maybe_state_str({error, E}) -> fw("ERROR: ~p", [E]); maybe_state_str({error, E}) -> fw("ERROR: ~p", [E]);
maybe_state_str(undefined) -> "- none -";
maybe_state_str(error) -> "- none -". maybe_state_str(error) -> "- none -".
fw(Fmt, Args) -> io_lib:fwrite(Fmt, Args). fw(Fmt, Args) -> io_lib:fwrite(Fmt, Args).
@@ -114,7 +118,8 @@ try_call(F, Flow) ->
try F(Flow) try F(Flow)
catch catch
exit:E:ST -> exit:E:ST ->
ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]); ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]),
undefined;
error:E:ST -> error:E:ST ->
ct:log("Call to ~p(~p) failed: error:~p:~p", [F, Flow, E, ST]), ct:log("Call to ~p(~p) failed: error:~p:~p", [F, Flow, E, ST]),
undefined undefined
@@ -129,3 +134,15 @@ timestamp() ->
lists:flatten(io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B " lists:flatten(io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B "
"~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B", "~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B",
[Year,Month,Day,Hour,Min,Sec,MilliSec])). [Year,Month,Day,Hour,Min,Sec,MilliSec])).
spawn(F) ->
proc_lib:spawn(fun() ->
ct_util:mark_process(),
F()
end).
spawn_opt(F, Opts) ->
proc_lib:spawn_opt(fun() ->
ct_util:mark_process(),
F()
end, Opts).
-1
View File
@@ -152,7 +152,6 @@ handle_call(reset, _From, _St) ->
handle_call(flows, _From, St) -> handle_call(flows, _From, St) ->
{reply, {ok, maps:keys(St)}, St}. {reply, {ok, maps:keys(St)}, St}.
handle_cast(_Mst, St) -> handle_cast(_Mst, St) ->
{noreply, St}. {noreply, St}.
+57 -19
View File
@@ -40,6 +40,8 @@
-record(error, { time = gm_ctflow:timestamp() :: string() -record(error, { time = gm_ctflow:timestamp() :: string()
, error :: any() }). , error :: any() }).
-type error_ret() :: {error, any()} | #error{}.
via(Pid) when is_pid(Pid) -> via(Pid) when is_pid(Pid) ->
Pid; Pid;
via(Flow) -> via(Flow) ->
@@ -76,29 +78,43 @@ cast(Flow, Msg) ->
get_state(Flow) -> get_state(Flow) ->
call_(Flow, get_state, infinity). call_(Flow, get_state, infinity).
-spec maybe_get_state(flow()) -> {'ok', state() | error_ret()} | error.
maybe_get_state(Flow) -> maybe_get_state(Flow) ->
case gm_ctflow_reg:whereis_name(Flow) of case gm_ctflow_reg:whereis_name(Flow) of
undefined -> undefined ->
error; error;
Pid -> Pid ->
call_(Pid, maybe_get_state, infinity) try do_call(Flow, maybe_get_state, 100)
catch
exit:{timeout,_} ->
get_st(Pid)
end
end. end.
-spec get_state(flow(), timeout()) -> state() | no_return(). -spec get_state(flow(), timeout()) -> state() | no_return().
get_state(Flow, Timeout) -> get_state(Flow, Timeout) ->
call_(Flow, get_state, Timeout). case do_call(Flow, get_state, Timeout) of
{ok, St} -> St;
Other ->
error(Other)
end.
call_(Flow, Req, Timeout) -> call_(Flow, Req, Timeout) ->
try gen_server:call(via(Flow), Req, Timeout) of try do_call(Flow, Req, Timeout) of
#error{} = E ->
error(E);
{error, Reason} -> {error, Reason} ->
error(Reason); error(Reason);
Other -> {ok, Other} ->
Other Other
catch catch
exit:timeout -> exit:{timeout, _} ->
error(timeout) error(timeout)
end. end.
do_call(Flow, Req, Timeout) ->
gen_server:call(via(Flow), Req, Timeout).
start_link(Flow, InitF) when is_function(InitF, 0) -> start_link(Flow, InitF) when is_function(InitF, 0) ->
gen_server:start_link(via(Flow), ?MODULE, [Flow, InitF], []). gen_server:start_link(via(Flow), ?MODULE, [Flow, InitF], []).
@@ -108,7 +124,7 @@ init([Flow, InitF]) ->
?LOG(Flow, "Initializing flow worker", []), ?LOG(Flow, "Initializing flow worker", []),
case InitF() of case InitF() of
{ok, HandlerFun, St} when is_function(HandlerFun, 3) -> {ok, HandlerFun, St} when is_function(HandlerFun, 3) ->
{ok, #st{flow = Flow, f = HandlerFun, f_st = St}}; {ok, new_st(#st{flow = Flow, f = HandlerFun, f_st = St})};
Other -> Other ->
{error, Other} {error, Other}
end. end.
@@ -116,30 +132,30 @@ init([Flow, InitF]) ->
handle_call(Req, _From, #error{} = E) -> handle_call(Req, _From, #error{} = E) ->
case Req of case Req of
get_state -> {reply, E, E}; get_state -> {reply, E, E};
maybe_get_state -> {reply, {ok, {error, E}}, E}; maybe_get_state -> {reply, {ok, E}, E};
_ -> _ ->
{reply, {error, {already_crashed, E}}, E} {reply, {error, {already_crashed, E}}, E}
end; end;
handle_call(get_state, _From, #st{f_st = FSt} = St) -> handle_call(get_state, _From, #st{f_st = FSt} = St) ->
{reply, FSt, St}; {reply, {ok, FSt}, St};
handle_call(maybe_get_state, _From, #st{f_st = FSt} = St) -> handle_call(maybe_get_state, _From, #st{f_st = FSt} = St) ->
{reply, {ok, FSt}, St}; {reply, {ok, FSt}, St};
handle_call({call, Req}, From, #st{flow = Flow, f = F, f_st = FSt} = St) -> handle_call({call, Req}, From, #st{flow = Flow, f = F, f_st = FSt} = St) ->
try F({call, From}, Req, FSt) of try F({call, From}, Req, FSt) of
{reply, Res, FSt1} -> {reply, Res, FSt1} ->
?LOG(Flow, "CALL: ~p -> ~p", [Req, Res]), ?LOG(Flow, "CALL: ~p -> ~p", [Req, Res]),
{reply, Res, St#st{f_st = FSt1}}; {reply, {ok, Res}, new_st(St#st{f_st = FSt1}, St)};
{noreply, FSt1} -> {noreply, FSt1} ->
{noreply, {F, FSt1}}; {noreply, new_st({F, FSt1}, St)};
Other -> Other ->
E = #error{error = Other}, E = #error{error = Other},
?LOG(Flow, "ERROR CALL: ~p -> ~p", [Req, E]), ?LOG(Flow, "ERROR CALL: ~p -> ~p", [Req, E]),
{reply, E, E} {reply, E, new_st(E, St)}
catch catch
error:E:ST -> error:E:ST ->
Error = #error{error = {E, ST}}, Error = #error{error = {E, ST}},
?LOG(Flow, "CAUGHT CALL: ~p -> error:~p:~p", [Req, E, ST]), ?LOG(Flow, "CAUGHT CALL: ~p -> error:~p:~p", [Req, E, ST]),
{reply, Error, Error} {reply, Error, new_st(Error, St)}
end. end.
handle_cast(_Msg, #error{} = E) -> handle_cast(_Msg, #error{} = E) ->
@@ -148,15 +164,15 @@ handle_cast(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) ->
try F(cast, Msg, FSt) of try F(cast, Msg, FSt) of
{Ok, FSt1} = Res when Ok == ok; Ok == noreply -> {Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
?LOG(Flow, "CAST ~p (~p) -> ~p", [Msg, FSt, Res]), ?LOG(Flow, "CAST ~p (~p) -> ~p", [Msg, FSt, Res]),
{noreply, St#st{f_st = FSt1}}; {noreply, new_st(St#st{f_st = FSt1}, St)};
Other -> Other ->
?LOG(Flow, "ERROR CAST: ~p -> ~p", [Msg, Other]), ?LOG(Flow, "ERROR CAST: ~p -> ~p", [Msg, Other]),
{noreply, #error{error = Other}} {noreply, new_st(#error{error = Other}, St)}
catch catch
error:E:ST -> error:E:ST ->
?LOG(Flow, "CAUGHT CAST ~p (~p) : error:~p:~p", [Msg, FSt, E, ST]), ?LOG(Flow, "CAUGHT CAST ~p (~p) : error:~p:~p", [Msg, FSt, E, ST]),
{noreply, #error{error = {E, ST}}} {noreply, new_st(#error{error = {E, ST}}, St)}
end. end.
handle_info(_, #error{} = E) -> handle_info(_, #error{} = E) ->
@@ -165,14 +181,14 @@ handle_info(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) ->
try F(info, Msg, FSt) of try F(info, Msg, FSt) of
{Ok, FSt1} = Res when Ok == ok; Ok == noreply -> {Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
?LOG(Flow, "INFO ~p (~p) -> ~p", [Msg, FSt, Res]), ?LOG(Flow, "INFO ~p (~p) -> ~p", [Msg, FSt, Res]),
{noreply, St#st{f_st = FSt1}}; {noreply, new_st(St#st{f_st = FSt1}, St)};
Other -> Other ->
?LOG(Flow, "ERROR MSG: ~p -> ~p", [Msg, Other]), ?LOG(Flow, "ERROR MSG: ~p -> ~p", [Msg, Other]),
{noreply, #error{error = Other}} {noreply, new_st(#error{error = Other}, St)}
catch catch
error:E:ST -> error:E:ST ->
?LOG(Flow, "CAUGHT MSG: ~p, error:~p:~p", [Msg, E, ST]), ?LOG(Flow, "CAUGHT MSG: ~p, error:~p:~p", [Msg, E, ST]),
{noreply, #error{error = {E, ST}}} {noreply, new_st(#error{error = {E, ST}}, St)}
end. end.
terminate(_, _) -> terminate(_, _) ->
@@ -180,3 +196,25 @@ terminate(_, _) ->
code_change(_FromVsn, S, _Extra) -> code_change(_FromVsn, S, _Extra) ->
{ok, S}. {ok, S}.
get_st(Pid) when is_pid(Pid) ->
case process_info(Pid, dictionary) of
{dictionary, Dict} ->
case lists:keyfind({?MODULE, state}, 1, Dict) of
{_, St} ->
{ok, St};
false ->
error
end;
_ ->
error
end.
new_st(S) ->
put({?MODULE, state}, S),
S.
new_st(S, S) ->
S;
new_st(NewS, _) ->
new_st(NewS).