From 03ec6669985b7d43264f7ebcd73d09e24a34b954 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Thu, 4 Jun 2026 08:12:09 +0200 Subject: [PATCH] Debugged status inspection, add spawn/1, spawn_opt/2 --- README.md | 2 ++ src/gm_ctflow.erl | 27 +++++++++++--- src/gm_ctflow_server.erl | 1 - src/gm_ctflow_worker.erl | 76 ++++++++++++++++++++++++++++++---------- 4 files changed, 81 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 1edc3ba..3e1c4fc 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,8 @@ In its initial version, there are a few different APIs: * `gm_ctflow_reg`, used to register and find `gm_ctflow_worker` processes. Complies with the `via` addressing scheme for OTP behaviors. * `gm_ctflow`, providing a shared dictionary, logging and status support. + It also exports `spawn/1` and `spawn_opt/2`, which work as expected, but + also register with Common Test to enable use of `ct:log/2`. The application `gm_ctflow` is intended to be started in `init_per_suite/1` and stopped in `end_per_suite/1`, or in diff --git a/src/gm_ctflow.erl b/src/gm_ctflow.erl index 46204f2..a010875 100644 --- a/src/gm_ctflow.erl +++ b/src/gm_ctflow.erl @@ -6,10 +6,13 @@ , erase/1 ]). --export([note_flow/1, - ct_log/2, %% ct_log(Fmt, Args) - ct_log/3, %% ct_log(Flow, Fmt, Args) - timestamp/0]). +-export([ note_flow/1 + , ct_log/2 %% ct_log(Fmt, Args) + , ct_log/3 %% ct_log(Flow, Fmt, Args) + , timestamp/0 ]). + +-export([ spawn/1 + , spawn_opt/2 ]). -export([ status/0 , summarize_flow/1 @@ -106,6 +109,7 @@ summary_({fun_state, S}) -> fw("= Fun State: ~s~n", [maybe_state_str(S)]). maybe_state_str({ok, S}) -> fw("~p", [S]); maybe_state_str({error, E}) -> fw("ERROR: ~p", [E]); +maybe_state_str(undefined) -> "- none -"; maybe_state_str(error) -> "- none -". fw(Fmt, Args) -> io_lib:fwrite(Fmt, Args). @@ -114,7 +118,8 @@ try_call(F, Flow) -> try F(Flow) catch exit:E:ST -> - ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]); + ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]), + undefined; error:E:ST -> ct:log("Call to ~p(~p) failed: error:~p:~p", [F, Flow, E, ST]), undefined @@ -129,3 +134,15 @@ timestamp() -> lists:flatten(io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B " "~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B", [Year,Month,Day,Hour,Min,Sec,MilliSec])). + +spawn(F) -> + proc_lib:spawn(fun() -> + ct_util:mark_process(), + F() + end). + +spawn_opt(F, Opts) -> + proc_lib:spawn_opt(fun() -> + ct_util:mark_process(), + F() + end, Opts). diff --git a/src/gm_ctflow_server.erl b/src/gm_ctflow_server.erl index 60e16c9..5361faa 100644 --- a/src/gm_ctflow_server.erl +++ b/src/gm_ctflow_server.erl @@ -152,7 +152,6 @@ handle_call(reset, _From, _St) -> handle_call(flows, _From, St) -> {reply, {ok, maps:keys(St)}, St}. - handle_cast(_Mst, St) -> {noreply, St}. diff --git a/src/gm_ctflow_worker.erl b/src/gm_ctflow_worker.erl index 3a4f19e..7775c47 100644 --- a/src/gm_ctflow_worker.erl +++ b/src/gm_ctflow_worker.erl @@ -40,6 +40,8 @@ -record(error, { time = gm_ctflow:timestamp() :: string() , error :: any() }). +-type error_ret() :: {error, any()} | #error{}. + via(Pid) when is_pid(Pid) -> Pid; via(Flow) -> @@ -76,29 +78,43 @@ cast(Flow, Msg) -> get_state(Flow) -> call_(Flow, get_state, infinity). +-spec maybe_get_state(flow()) -> {'ok', state() | error_ret()} | error. maybe_get_state(Flow) -> case gm_ctflow_reg:whereis_name(Flow) of undefined -> error; Pid -> - call_(Pid, maybe_get_state, infinity) + try do_call(Flow, maybe_get_state, 100) + catch + exit:{timeout,_} -> + get_st(Pid) + end end. -spec get_state(flow(), timeout()) -> state() | no_return(). get_state(Flow, Timeout) -> - call_(Flow, get_state, Timeout). + case do_call(Flow, get_state, Timeout) of + {ok, St} -> St; + Other -> + error(Other) + end. call_(Flow, Req, Timeout) -> - try gen_server:call(via(Flow), Req, Timeout) of + try do_call(Flow, Req, Timeout) of + #error{} = E -> + error(E); {error, Reason} -> error(Reason); - Other -> + {ok, Other} -> Other catch - exit:timeout -> + exit:{timeout, _} -> error(timeout) end. +do_call(Flow, Req, Timeout) -> + gen_server:call(via(Flow), Req, Timeout). + start_link(Flow, InitF) when is_function(InitF, 0) -> gen_server:start_link(via(Flow), ?MODULE, [Flow, InitF], []). @@ -108,7 +124,7 @@ init([Flow, InitF]) -> ?LOG(Flow, "Initializing flow worker", []), case InitF() of {ok, HandlerFun, St} when is_function(HandlerFun, 3) -> - {ok, #st{flow = Flow, f = HandlerFun, f_st = St}}; + {ok, new_st(#st{flow = Flow, f = HandlerFun, f_st = St})}; Other -> {error, Other} end. @@ -116,30 +132,30 @@ init([Flow, InitF]) -> handle_call(Req, _From, #error{} = E) -> case Req of get_state -> {reply, E, E}; - maybe_get_state -> {reply, {ok, {error, E}}, E}; + maybe_get_state -> {reply, {ok, E}, E}; _ -> {reply, {error, {already_crashed, E}}, E} end; handle_call(get_state, _From, #st{f_st = FSt} = St) -> - {reply, FSt, St}; + {reply, {ok, FSt}, St}; handle_call(maybe_get_state, _From, #st{f_st = FSt} = St) -> {reply, {ok, FSt}, St}; handle_call({call, Req}, From, #st{flow = Flow, f = F, f_st = FSt} = St) -> try F({call, From}, Req, FSt) of {reply, Res, FSt1} -> ?LOG(Flow, "CALL: ~p -> ~p", [Req, Res]), - {reply, Res, St#st{f_st = FSt1}}; + {reply, {ok, Res}, new_st(St#st{f_st = FSt1}, St)}; {noreply, FSt1} -> - {noreply, {F, FSt1}}; + {noreply, new_st({F, FSt1}, St)}; Other -> E = #error{error = Other}, ?LOG(Flow, "ERROR CALL: ~p -> ~p", [Req, E]), - {reply, E, E} - catch + {reply, E, new_st(E, St)} + catch error:E:ST -> Error = #error{error = {E, ST}}, ?LOG(Flow, "CAUGHT CALL: ~p -> error:~p:~p", [Req, E, ST]), - {reply, Error, Error} + {reply, Error, new_st(Error, St)} end. handle_cast(_Msg, #error{} = E) -> @@ -148,15 +164,15 @@ handle_cast(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) -> try F(cast, Msg, FSt) of {Ok, FSt1} = Res when Ok == ok; Ok == noreply -> ?LOG(Flow, "CAST ~p (~p) -> ~p", [Msg, FSt, Res]), - {noreply, St#st{f_st = FSt1}}; + {noreply, new_st(St#st{f_st = FSt1}, St)}; Other -> ?LOG(Flow, "ERROR CAST: ~p -> ~p", [Msg, Other]), - {noreply, #error{error = Other}} + {noreply, new_st(#error{error = Other}, St)} catch error:E:ST -> ?LOG(Flow, "CAUGHT CAST ~p (~p) : error:~p:~p", [Msg, FSt, E, ST]), - {noreply, #error{error = {E, ST}}} + {noreply, new_st(#error{error = {E, ST}}, St)} end. handle_info(_, #error{} = E) -> @@ -165,14 +181,14 @@ handle_info(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) -> try F(info, Msg, FSt) of {Ok, FSt1} = Res when Ok == ok; Ok == noreply -> ?LOG(Flow, "INFO ~p (~p) -> ~p", [Msg, FSt, Res]), - {noreply, St#st{f_st = FSt1}}; + {noreply, new_st(St#st{f_st = FSt1}, St)}; Other -> ?LOG(Flow, "ERROR MSG: ~p -> ~p", [Msg, Other]), - {noreply, #error{error = Other}} + {noreply, new_st(#error{error = Other}, St)} catch error:E:ST -> ?LOG(Flow, "CAUGHT MSG: ~p, error:~p:~p", [Msg, E, ST]), - {noreply, #error{error = {E, ST}}} + {noreply, new_st(#error{error = {E, ST}}, St)} end. terminate(_, _) -> @@ -180,3 +196,25 @@ terminate(_, _) -> code_change(_FromVsn, S, _Extra) -> {ok, S}. + +get_st(Pid) when is_pid(Pid) -> + case process_info(Pid, dictionary) of + {dictionary, Dict} -> + case lists:keyfind({?MODULE, state}, 1, Dict) of + {_, St} -> + {ok, St}; + false -> + error + end; + _ -> + error + end. + +new_st(S) -> + put({?MODULE, state}, S), + S. + +new_st(S, S) -> + S; +new_st(NewS, _) -> + new_st(NewS).