From d7b052a07685bf1e1c69a76497ec675b40f16f4a Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Sat, 30 May 2026 13:15:17 +0200 Subject: [PATCH] Debugging, improved logging, etc. --- README.md | 59 ++++++++++++++- include/gm_ctflow.hrl | 3 + rebar.config | 6 ++ src/gm_ctflow.erl | 101 ++++++++++++++++++++++++++ src/gm_ctflow_app.erl | 2 + src/gm_ctflow_fun.erl | 17 +++++ src/gm_ctflow_log.erl | 31 ++++++++ src/gm_ctflow_reg.erl | 117 ++++++++++++++++++++++++++++++ src/gm_ctflow_server.erl | 59 ++++++++++++++- src/gm_ctflow_state.erl | 14 +++- src/gm_ctflow_sup.erl | 3 +- src/gm_ctflow_worker.erl | 137 ++++++++++++++++++++--------------- src/gm_ctflow_worker_sup.erl | 8 +- test/gm_ctflow_SUITE.erl | 29 ++++++++ 14 files changed, 521 insertions(+), 65 deletions(-) create mode 100644 include/gm_ctflow.hrl create mode 100644 src/gm_ctflow_log.erl create mode 100644 src/gm_ctflow_reg.erl diff --git a/README.md b/README.md index a7ff9b7..1edc3ba 100644 --- a/README.md +++ b/README.md @@ -16,13 +16,16 @@ for the duration of such a sequence, and this tends to lead to hacking a plain process with a custom protocol. This application attempts to make that a bit more structured. -In its initial version, there are two APIs: +In its initial version, there are a few different APIs: * `gm_ctflow_fun`, where one can instantiate a stateful handler fun, which can be called by test cases as needed. * `gm_ctflow_worker`, a gen_server where the server logic is given by a user-provided fun. The server is spawned under a `simple_one_for_one` supervisor. +* `gm_ctflow_reg`, used to register and find `gm_ctflow_worker` processes. + Complies with the `via` addressing scheme for OTP behaviors. +* `gm_ctflow`, providing a shared dictionary, logging and status support. The application `gm_ctflow` is intended to be started in `init_per_suite/1` and stopped in `end_per_suite/1`, or in @@ -70,6 +73,60 @@ end_per_group(_Grp, _Config) -> This resets the state and removes all helper processes for each group. +A simple way to keep track of the flow state: + +```erlang +init_per_testcase(_Case, Config) -> + gm_ctflow:status(), + Config. + +end_per_testcase(_Case, _Config) -> + gm_ctflow:status(), + ok. +``` + +This gives output like this, from the `gm_ctflow_SUITE:incr/1` testcase: +``` +*** User 2026-05-30 13:06:02.679 ***🔗 +== Summary for flow my_counter += Worker State: - none - += Fun State: 0 += State: - none - += Log History: + +2026-05-30 13:06:02.654: +New flow: my_counter + + + +*** User 2026-05-30 13:06:02.679 ***🔗 +my_counter[<0.332.0>]: F(incr, 0) -> {ok,1,1} + +*** User 2026-05-30 13:06:02.679 ***🔗 +== Summary for flow my_counter += Worker State: - none - += Fun State: 1 += State: - none - += Log History: + +2026-05-30 13:06:02.654: +New flow: my_counter + +2026-05-30 13:06:02.679: +F(incr, 0) -> {ok,1,1} +``` + +Normally, of course, there would be more going on in the test, making the +bookends a bit less dominant, but here we see the initial state of the flow +`my_counter`: The "Fun state" is `0`, while there's no worker, and no shared state. +The log history shows one previous message, from instantiating the fun. + +The next log output shows the effect of calling the fun. Log messages produced +with `gm_ctflow:ct_log/[2,3]` go both to `ct:log/2` and the log history. The +flow is derived automatically, if possible, if not provided. + +In the ending `status` output, we see the accumulated history, with timestamps +to make it easier to find the context in CT or SUT logs. Build ----- diff --git a/include/gm_ctflow.hrl b/include/gm_ctflow.hrl new file mode 100644 index 0000000..283a36a --- /dev/null +++ b/include/gm_ctflow.hrl @@ -0,0 +1,3 @@ + +-define(LOG(Fmt, Args), gm_ctflow:ct_log(Fmt, Args)). +-define(LOG(Flow, Fmt, Args), gm_ctflow:ct_log(Flow, Fmt, Args)). diff --git a/rebar.config b/rebar.config index ef763d2..692d046 100644 --- a/rebar.config +++ b/rebar.config @@ -5,3 +5,9 @@ %% {config, "config/sys.config"}, {apps, [gm_ctflow]} ]}. + +{dialyzer, [ + {warnings, [unknown]}, + {plt_apps, all_deps}, + {base_plt_apps, [erts, kernel, stdlib, common_test]} + ]}. diff --git a/src/gm_ctflow.erl b/src/gm_ctflow.erl index a360468..46204f2 100644 --- a/src/gm_ctflow.erl +++ b/src/gm_ctflow.erl @@ -6,6 +6,18 @@ , erase/1 ]). +-export([note_flow/1, + ct_log/2, %% ct_log(Fmt, Args) + ct_log/3, %% ct_log(Flow, Fmt, Args) + timestamp/0]). + +-export([ status/0 + , summarize_flow/1 + , reset_flow/1 + , reset/0 ]). + +-include("gm_ctflow.hrl"). + -type flow() :: any(). -type key() :: any(). -type value() :: any(). @@ -28,3 +40,92 @@ get(Key, Default) -> -spec erase(key()) -> 'ok'. erase(Key) -> gm_ctflow_state:erase(Key). + + +note_flow(Flow) -> + erlang:put({?MODULE, flow}, Flow). + +ct_log(Fmt, Args) -> + ct_log(erlang:get({?MODULE, flow}), Fmt, Args). + +ct_log(Flow, Fmt, Args) -> + TS = timestamp(), + case Flow of + undefined -> + save_log(undefined, TS, Fmt, Args), + ct:log(Fmt, Args); + Flow -> + save_log(Flow, TS, Fmt, Args), + ct:log("~p[~w]: " ++ Fmt, [Flow, self()|Args]) + end. + + +save_log(Flow, TS, Fmt, Args) -> + gm_ctflow_log:log(Flow, TS, Fmt, Args). + +reset_flow(Flow) -> + gm_ctflow_log:erase(Flow), + try_call(fun gm_ctflow_worker:stop/1, Flow), + try_call(fun gm_ctflow_fun:delete/1, Flow), + ok. + +reset() -> + gm_ctflow_log:erase(), + gm_ctflow_state:reset(), + gm_ctflow_worker_sup:reset(), + gm_ctflow_fun:reset(), + ok. + +status() -> + WorkerFlows = [F || {F,_} <- gm_ctflow_reg:registered_names()], + FunFlows = gm_ctflow_fun:flows(), + AllFlows = ordsets:from_list(WorkerFlows ++ FunFlows), + [summarize_flow(Flow) || Flow <- AllFlows]. + +summarize_flow(Flow) -> + LogHistory = gm_ctflow_log:history(Flow), + AllState = gm_ctflow_state:all(), + FunState = try_call(fun gm_ctflow_fun:maybe_get_state/1, Flow), + WorkerState = try_call(fun gm_ctflow_worker:maybe_get_state/1, Flow), + S = [ {worker_state, WorkerState} + , {fun_state, FunState} + , {all_state, AllState} + , {log_history, LogHistory} ], + ct:log("== Summary for flow ~p~n~s", [Flow , [summary_(S1) || S1 <- S]]). + +summary_({log_history, []}) -> fw("= Log History: - none -~n", []); +summary_({log_history, H}) -> + [ fw("= Log History:~n", []) + , [ fw("~n~s:~n~s~n", [TS, fw(Fmt, Args)]) + || {TS, Fmt, Args} <- H], "\n"]; +summary_({all_state, []}) -> fw("= State: - none -~n", []); +summary_({all_state, L}) -> + [ fw("= State:~n", []) , [fw("~p = ~p~n", [K, V]) || {K, V} <- L], "\n"]; +summary_({worker_state, S}) -> fw("= Worker State: ~s~n", [maybe_state_str(S)]); +summary_({fun_state, S}) -> fw("= Fun State: ~s~n", [maybe_state_str(S)]). + +maybe_state_str({ok, S}) -> fw("~p", [S]); +maybe_state_str({error, E}) -> fw("ERROR: ~p", [E]); +maybe_state_str(error) -> "- none -". + +fw(Fmt, Args) -> io_lib:fwrite(Fmt, Args). + +try_call(F, Flow) -> + try F(Flow) + catch + exit:E:ST -> + ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]); + error:E:ST -> + ct:log("Call to ~p(~p) failed: error:~p:~p", [F, Flow, E, ST]), + undefined + end. + +%% copy-pasted (roughly) from ct_logs.erl +timestamp() -> + {MS,S,US} = os:timestamp(), + {{Year,Month,Day}, {Hour,Min,Sec}} = + calendar:now_to_local_time({MS,S,US}), + MilliSec = trunc(US/1000), + lists:flatten(io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B " + "~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B", + [Year,Month,Day,Hour,Min,Sec,MilliSec])). diff --git a/src/gm_ctflow_app.erl b/src/gm_ctflow_app.erl index 8489b3e..f6ec1cb 100644 --- a/src/gm_ctflow_app.erl +++ b/src/gm_ctflow_app.erl @@ -10,7 +10,9 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> + gm_ctflow_reg:init_reg(), gm_ctflow_state:init(), + gm_ctflow_log:init(), gm_ctflow_sup:start_link(). stop(_State) -> diff --git a/src/gm_ctflow_fun.erl b/src/gm_ctflow_fun.erl index e7e260f..f379761 100644 --- a/src/gm_ctflow_fun.erl +++ b/src/gm_ctflow_fun.erl @@ -10,6 +10,10 @@ , call/3 %% (Flow, Req, Timeout) , get_state/1 %% (Flow) , get_state/2 %% (Flow, Timeout) + , maybe_get_state/1 + , delete/1 %% (Flow) + , reset/0 + , flows/0 ]). -spec new(Flow, F, St) -> ok @@ -36,3 +40,16 @@ get_state(Flow) -> -spec get_state(flow(), timeout()) -> state() | no_return(). get_state(Flow, Timeout) -> gm_ctflow_server:get_state(Flow, Timeout). + +-spec maybe_get_state(flow()) -> 'error' | {ok, state()}. +maybe_get_state(Flow) -> + gm_ctflow_server:maybe_get_state(Flow). + +delete(Flow) -> + gm_ctflow_server:delete(Flow). + +reset() -> + gm_ctflow_server:reset(). + +flows() -> + gm_ctflow_server:flows(). diff --git a/src/gm_ctflow_log.erl b/src/gm_ctflow_log.erl new file mode 100644 index 0000000..44f5465 --- /dev/null +++ b/src/gm_ctflow_log.erl @@ -0,0 +1,31 @@ +-module(gm_ctflow_log). + +-export([ init/0 + , log/4 + , history/1 + , erase/1 + , erase/0 ]). + +-type ts() :: string(). +-type flow() :: gm_ctflow:flow(). +-type format() :: string(). +-type args() :: list(). + +init() -> + ets:new(?MODULE, [ordered_set, public, named_table]). + +-spec log(flow(), ts(), format(), args()) -> ok. +log(Flow, TS, Format, Args) -> + ets:insert(?MODULE, {{Flow, TS}, {Format, Args}}), + ok. + +-spec history(flow()) -> [{ts(), format(), args()}]. +history(Flow) -> + ets:select(?MODULE, [{ {{Flow,'$1'}, {'$2','$3'}}, [], [{{'$1','$2','$3'}}] }]). + +erase(Flow) -> + ets:select_delete(?MODULE, [{ {{Flow,'_'}, '_'}, [], [true] }]), + ok. + +erase() -> + ets:match_delete(?MODULE, '_'). diff --git a/src/gm_ctflow_reg.erl b/src/gm_ctflow_reg.erl new file mode 100644 index 0000000..a55dc97 --- /dev/null +++ b/src/gm_ctflow_reg.erl @@ -0,0 +1,117 @@ +-module(gm_ctflow_reg). + +-behavior(gen_server). + +-export([ register_name/2 + , unregister_name/1 + , whereis_name/1 + , send/2 ]). +-export([ registered_names/0 ]). + +-export([ start_link/0 + , init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 ]). + +-export([ init_reg/0 ]). + +-type name() :: any(). +-type msg() :: any(). + +-record(st, {mrefs = #{}}). + +init_reg() -> + ets:new(?MODULE, [public, named_table]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec register_name(name(), pid()) -> yes | no. +register_name(Name, Pid) -> + call({register_name, Name, Pid}). + +-spec unregister_name(name()) -> yes | no. +unregister_name(Name) -> + call({unregister_name, Name}). + +-spec whereis_name(name()) -> pid() | 'undefined'. +whereis_name(Name) -> + case ets:lookup(?MODULE, Name) of + [{_, Pid}] -> + case is_process_alive(Pid) of + true -> + Pid; + false -> + ets:delete(?MODULE, Name), + undefined + end; + [] -> + undefined + end. + +-spec send(name(), msg()) -> pid(). +send(Name, Msg) -> + case whereis_name(Name) of + undefined -> + exit({badarg, {Name, Msg}}); + Pid -> + Pid ! Msg, + Pid + end. + +-spec registered_names() -> [{name(), pid()}]. +registered_names() -> + [X || {_,P} = X <- ets:tab2list(?MODULE), + erlang:is_process_alive(P)]. + +call(Req) -> + gen_server:call(?MODULE, Req). + +init([]) -> + {ok, #st{}}. + +handle_call({register_name, Name, Pid}, _From, #st{mrefs = MRefs} = St) -> + case register_name_(Name, Pid) of + yes -> + MRef = erlang:monitor(process, Pid), + {reply, yes, St#st{mrefs = MRefs#{MRef => {Name, Pid}}}}; + no -> + {reply, no, St} + end; +handle_call({unregister_name, Name}, _From, St) -> + Res = unregister_name(Name), + {reply, Res, St}. + +handle_cast(_Msg, St) -> + {noreply, St}. + +handle_info({'DOWN', MRef, process, Pid, _}, #st{mrefs = MRefs} = St) -> + case maps:find(MRef, MRefs) of + {ok, {Name, Pid}} -> + unregister_name_(Name), + {noreply, St#st{mrefs = maps:remove(MRef, MRefs)}}; + error -> + {noreply, St} + end; +handle_info(_, St) -> + {noreply, St}. + +terminate(_Reason, _St) -> + ok. + +code_change(_FromVsn, St, _Extra) -> + {ok, St}. + +register_name_(Name, Pid) -> + case ets:insert_new(?MODULE, {Name, Pid}) of + true -> + yes; + false -> + no + end. + +unregister_name_(Name) -> + ets:delete(?MODULE, Name). diff --git a/src/gm_ctflow_server.erl b/src/gm_ctflow_server.erl index 881bc9f..60e16c9 100644 --- a/src/gm_ctflow_server.erl +++ b/src/gm_ctflow_server.erl @@ -10,6 +10,8 @@ , terminate/2 , code_change/3 ]). +-include("gm_ctflow.hrl"). + -type flow() :: gm_ctflow:flow(). -type request() :: any(). -type result() :: any(). @@ -20,6 +22,10 @@ , call/3 %% (Flow, Req, Timeout) , get_state/1 %% (Flow) , get_state/2 %% (Flow, Timeout) + , maybe_get_state/1 %% + , delete/1 %% (Flow) + , reset/0 + , flows/0 ]). -spec new(Flow, F, St) -> ok @@ -47,6 +53,22 @@ get_state(Flow) -> get_state(Flow, Timeout) -> call_({get_state, Flow}, Timeout). +-spec maybe_get_state(flow()) -> {ok, state()} | {error, any()} | error. +maybe_get_state(Flow) -> + call_({maybe_get_state, Flow}, infinity). + +-spec delete(flow()) -> boolean(). +delete(Flow) -> + call_({delete, Flow}). + +-spec reset() -> ok. +reset() -> + call_(reset). + +-spec flows() -> [flow()]. +flows() -> + call_(flows). + call_(Req) -> call_(Req, infinity). @@ -65,9 +87,13 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init([]) -> + %% GL = test_server_io:get_gl(_Shared = true), + %% erlang:group_leader(GL, self()), + ct_util:mark_process(), {ok, #{}}. handle_call({new, Flow, F, InitSt}, _From, St) -> + ?LOG(Flow, "New flow: ~p", [Flow]), case maps:is_key(Flow, St) of true -> {reply, {error, exists}, St}; @@ -78,16 +104,22 @@ handle_call({new, Flow, F, InitSt}, _From, St) -> handle_call({call, Flow, Req}, _From, St) -> case maps:find(Flow, St) of {ok, {F, FSt}} when is_function(F, 2) -> + Prev = gm_ctflow:note_flow(Flow), try F(Req, FSt) of - {ok, Res, FSt1} -> - {reply, {ok, Res}, St#{Flow := {F, FSt1}}}; + {ok, Res, FSt1} = Result -> + ?LOG(Flow, "F(~p, ~p) -> ~p", [Req, FSt, Result]), + {reply, {ok, Res}, St#{Flow := {F, FSt1}}}; Other -> + ?LOG(Flow, "ERROR F(~p, ~p) -> ~p", [Req, FSt, Other]), Error = {error, {bad_return, Other}}, {reply, Error, St#{Flow := Error}} catch error:E:ST -> + ?LOG(Flow, "CAUGHT F(~p, ~p) ~> error:~p:~p", [Req, FSt, E, ST]), Error = {error, {E, ST}}, {reply, Error, St#{Flow := Error}} + after + gm_ctflow:note_flow(Prev) end; error -> {reply, {error, unknown_flow}, St} @@ -95,10 +127,31 @@ handle_call({call, Flow, Req}, _From, St) -> handle_call({get_state, Flow}, _From, St) -> case maps:find(Flow, St) of {ok, {F, FSt}} when is_function(F, 2) -> + ?LOG(Flow, "State = ~p", [FSt]), {reply, {ok, FSt}, St}; + {ok, Error} -> + {reply, {error, Error}, St}; error -> {reply, {error, unknown_flow}, St} - end. + end; +handle_call({maybe_get_state, Flow}, _From, St) -> + Reply = case maps:find(Flow, St) of + {ok, {F, FSt}} when is_function(F) -> + {ok, FSt}; + {ok, Error} -> + {error, Error}; + error -> + error + end, + {reply, {ok, Reply}, St}; +handle_call({delete, Flow}, _From, St) -> + Res = maps:is_key(Flow, St), + {reply, Res, maps:remove(Flow, St)}; +handle_call(reset, _From, _St) -> + {reply, {ok, ok}, #{}}; +handle_call(flows, _From, St) -> + {reply, {ok, maps:keys(St)}, St}. + handle_cast(_Mst, St) -> {noreply, St}. diff --git a/src/gm_ctflow_state.erl b/src/gm_ctflow_state.erl index d3910ad..7e4560d 100644 --- a/src/gm_ctflow_state.erl +++ b/src/gm_ctflow_state.erl @@ -4,7 +4,10 @@ , put/2 , get/1 , get/2 - , erase/1 ]). + , all/0 + , erase/1 + , reset/0 + ]). -type key() :: any(). -type value() :: any(). @@ -37,6 +40,15 @@ get(Key, Default) -> Default end. +-spec all() -> [{key(), value()}]. +all() -> + ets:tab2list(?MODULE). + erase(Key) -> ets:delete(?MODULE, Key), ok. + +-spec reset() -> ok. +reset() -> + ets:match_delete(?MODULE, '_'), + ok. diff --git a/src/gm_ctflow_sup.erl b/src/gm_ctflow_sup.erl index 2c671fa..4156de4 100644 --- a/src/gm_ctflow_sup.erl +++ b/src/gm_ctflow_sup.erl @@ -33,7 +33,8 @@ init([]) -> period => 1 }, ChildSpecs = [ - child(gm_ctflow_server, worker) + child(gm_ctflow_reg, worker) + , child(gm_ctflow_server, worker) , child(gm_ctflow_worker_sup, supervisor) ], {ok, {SupFlags, ChildSpecs}}. diff --git a/src/gm_ctflow_worker.erl b/src/gm_ctflow_worker.erl index 1298cc0..3a4f19e 100644 --- a/src/gm_ctflow_worker.erl +++ b/src/gm_ctflow_worker.erl @@ -8,6 +8,7 @@ , cast/2 , get_state/1 , get_state/2 + , maybe_get_state/1 , stop/1 ]). -export([ start_link/2 @@ -19,11 +20,7 @@ , code_change/3 ]). --export([ init_reg/0 - , register_name/2 - , unregister_name/1 - , whereis_name/1 - , send/2 ]). +-include("gm_ctflow.hrl"). -type flow() :: gm_ctflow:flow(). -type state() :: any(). @@ -37,13 +34,24 @@ -type handler_fun() :: fun( (op(), msg(), state()) -> handler_reply() ). -type init_fun() :: fun( () -> {ok, handler_fun(), state()} ). +-record(st, { flow :: flow() + , f :: handler_fun() + , f_st :: state() }). +-record(error, { time = gm_ctflow:timestamp() :: string() + , error :: any() }). + +via(Pid) when is_pid(Pid) -> + Pid; +via(Flow) -> + {via, gm_ctflow_reg, Flow}. + -spec start(flow(), init_fun()) -> {'ok', pid()}. start(Flow, InitF) when is_function(InitF, 0) -> supervisor:start_child(gm_ctflow_worker_sup, [Flow, InitF]). -spec stop(flow()) -> 'ok'. stop(Flow) -> - supervisor:terminate_child(gm_ctflow_worker_sup, whereis_name(Flow)). + supervisor:terminate_child(gm_ctflow_worker_sup, gm_ctflow_reg:whereis_name(Flow)). -spec call(Flow, Req) -> Reply | no_return() when Flow :: flow(), @@ -62,18 +70,26 @@ call(Flow, Req, Timeout) -> -spec cast(flow(), msg()) -> 'ok' | no_return(). cast(Flow, Msg) -> - gen_server:cast({via, ?MODULE, Flow}, Msg). + gen_server:cast(via(Flow), Msg). -spec get_state(flow()) -> state() | no_return(). get_state(Flow) -> call_(Flow, get_state, infinity). +maybe_get_state(Flow) -> + case gm_ctflow_reg:whereis_name(Flow) of + undefined -> + error; + Pid -> + call_(Pid, maybe_get_state, infinity) + end. + -spec get_state(flow(), timeout()) -> state() | no_return(). get_state(Flow, Timeout) -> call_(Flow, get_state, Timeout). call_(Flow, Req, Timeout) -> - try gen_server:call({via, ?MODULE, Flow}, Req, Timeout) of + try gen_server:call(via(Flow), Req, Timeout) of {error, Reason} -> error(Reason); Other -> @@ -84,42 +100,79 @@ call_(Flow, Req, Timeout) -> end. start_link(Flow, InitF) when is_function(InitF, 0) -> - gen_server:start_link({via, ?MODULE, Flow}, ?MODULE, InitF, []). + gen_server:start_link(via(Flow), ?MODULE, [Flow, InitF], []). -init(InitF) -> +init([Flow, InitF]) -> + ct_util:mark_process(), + gm_ctflow:note_flow(Flow), + ?LOG(Flow, "Initializing flow worker", []), case InitF() of {ok, HandlerFun, St} when is_function(HandlerFun, 3) -> - {ok, {HandlerFun, St}}; + {ok, #st{flow = Flow, f = HandlerFun, f_st = St}}; Other -> {error, Other} end. -handle_call(get_state, _From, {_, FSt} = St) -> +handle_call(Req, _From, #error{} = E) -> + case Req of + get_state -> {reply, E, E}; + maybe_get_state -> {reply, {ok, {error, E}}, E}; + _ -> + {reply, {error, {already_crashed, E}}, E} + end; +handle_call(get_state, _From, #st{f_st = FSt} = St) -> {reply, FSt, St}; -handle_call({call, Req}, From, {F, FSt} = St) -> - case F({call, From}, Req, FSt) of +handle_call(maybe_get_state, _From, #st{f_st = FSt} = St) -> + {reply, {ok, FSt}, St}; +handle_call({call, Req}, From, #st{flow = Flow, f = F, f_st = FSt} = St) -> + try F({call, From}, Req, FSt) of {reply, Res, FSt1} -> - {reply, Res, {F, FSt1}}; + ?LOG(Flow, "CALL: ~p -> ~p", [Req, Res]), + {reply, Res, St#st{f_st = FSt1}}; {noreply, FSt1} -> {noreply, {F, FSt1}}; Other -> - {stop, {error, Other}, St} + E = #error{error = Other}, + ?LOG(Flow, "ERROR CALL: ~p -> ~p", [Req, E]), + {reply, E, E} + catch + error:E:ST -> + Error = #error{error = {E, ST}}, + ?LOG(Flow, "CAUGHT CALL: ~p -> error:~p:~p", [Req, E, ST]), + {reply, Error, Error} end. -handle_cast(Msg, {F, FSt} = St) -> - case F(cast, Msg, FSt) of - {ok, FSt1} -> - {noreply, {F, FSt1}}; +handle_cast(_Msg, #error{} = E) -> + {noreply, E}; +handle_cast(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) -> + try F(cast, Msg, FSt) of + {Ok, FSt1} = Res when Ok == ok; Ok == noreply -> + ?LOG(Flow, "CAST ~p (~p) -> ~p", [Msg, FSt, Res]), + {noreply, St#st{f_st = FSt1}}; Other -> - {stop, {error, Other}, St} + ?LOG(Flow, "ERROR CAST: ~p -> ~p", [Msg, Other]), + {noreply, #error{error = Other}} + + catch + error:E:ST -> + ?LOG(Flow, "CAUGHT CAST ~p (~p) : error:~p:~p", [Msg, FSt, E, ST]), + {noreply, #error{error = {E, ST}}} end. -handle_info(Msg, {F, FSt} = St) -> - case F(info, Msg, FSt) of - {ok, FSt1} -> - {noreply, {F, FSt1}}; +handle_info(_, #error{} = E) -> + {noreply, E}; +handle_info(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) -> + try F(info, Msg, FSt) of + {Ok, FSt1} = Res when Ok == ok; Ok == noreply -> + ?LOG(Flow, "INFO ~p (~p) -> ~p", [Msg, FSt, Res]), + {noreply, St#st{f_st = FSt1}}; Other -> - {stop, {error, Other}, St} + ?LOG(Flow, "ERROR MSG: ~p -> ~p", [Msg, Other]), + {noreply, #error{error = Other}} + catch + error:E:ST -> + ?LOG(Flow, "CAUGHT MSG: ~p, error:~p:~p", [Msg, E, ST]), + {noreply, #error{error = {E, ST}}} end. terminate(_, _) -> @@ -127,35 +180,3 @@ terminate(_, _) -> code_change(_FromVsn, S, _Extra) -> {ok, S}. - - -init_reg() -> - ets:new(?MODULE, [public, named_table]). - -register_name(Name, Pid) -> - case ets:insert_new(?MODULE, {Name, Pid}) of - true -> - yes; - false -> - no - end. - -unregister_name(Name) -> - ets:delete(?MODULE, Name). - -whereis_name(Name) -> - case ets:lookup(?MODULE, Name) of - [{_, Pid}] -> - Pid; - [] -> - undefined - end. - -send(Name, Msg) -> - case whereis_name(Name) of - undefined -> - exit({badarg, {Name, Msg}}); - Pid -> - Pid ! Msg, - Pid - end. diff --git a/src/gm_ctflow_worker_sup.erl b/src/gm_ctflow_worker_sup.erl index 29ec739..d05e1f8 100644 --- a/src/gm_ctflow_worker_sup.erl +++ b/src/gm_ctflow_worker_sup.erl @@ -12,8 +12,15 @@ -export([init/1]). +-export([reset/0]). + -define(SERVER, ?MODULE). +reset() -> + [ok = supervisor:terminate_child(?MODULE, Id) + || {Id, _, _, _} <- supervisor:which_children(?MODULE)], + ok. + start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). @@ -27,7 +34,6 @@ start_link() -> %% type => worker(), % optional %% modules => modules()} % optional init([]) -> - gm_ctflow_worker:init_reg(), SupFlags = #{ strategy => simple_one_for_one, intensity => 0, diff --git a/test/gm_ctflow_SUITE.erl b/test/gm_ctflow_SUITE.erl index ed255e9..8d8a96b 100644 --- a/test/gm_ctflow_SUITE.erl +++ b/test/gm_ctflow_SUITE.erl @@ -24,6 +24,9 @@ , checkl1/1 , checkl2/1 , checkl3/1 + , reset/1 + , reset_flow/1 + , status/1 ]). -include_lib("stdlib/include/assert.hrl"). @@ -33,6 +36,7 @@ all() -> {group, state} , {group, statefuns} , {group, workers} + , {group, reset} ]. groups() -> @@ -47,6 +51,7 @@ groups() -> , check2 %% state = 2 , incr , check3 %% state = 3 + , status ]} , {workers, [sequence], [ init_counter , init_worker @@ -56,7 +61,19 @@ groups() -> , checkl2 %% [A0, A1] , append , checkl3 %% [A0, A1, A2] + , status ]} + , {reset, [sequence], [ init_counter + , init_worker + , append %% also increments + , check1 %% so, state = 1 + , checkl1 + , reset_flow + , checkl1 %% flow 'my_list' unaffected + , init_counter + , incr + , check1 + , status ]} ]. init_per_suite(Config) -> @@ -74,9 +91,11 @@ end_per_group(_Grp, _Config) -> ok. init_per_testcase(_Case, Config) -> + gm_ctflow:status(), Config. end_per_testcase(_Case, _Config) -> + gm_ctflow:status(), ok. %% ====================================================================== @@ -101,6 +120,7 @@ erase_a(_Cfg) -> %% easily threaded through the sequence of cases otherwise. %% init_counter(_Cfg) -> + gm_ctflow:status(), gm_ctflow_fun:new(my_counter, fun ctr/2, 0), ok. @@ -122,6 +142,12 @@ ctr(incr, N) -> NewN = N+1, {ok, NewN, NewN}. +reset_flow(_Cfg) -> + gm_ctflow:reset_flow(my_counter). + +status(_Cfg) -> + gm_ctflow:status(). + %% ====================================================================== %% Here, we make use of the `gm_ctflow_worker` it's a complete process %% (a gen_server, whose logic we instrument with the fun we provice). @@ -155,3 +181,6 @@ checkl_(L) -> lfun({call,_From}, {append, Item}, L) -> NewL = L ++ [Item], {reply, NewL, NewL}. + +reset(_Cfg) -> + gm_ctflow:reset().