Debugging, improved logging, etc.
This commit is contained in:
@@ -16,13 +16,16 @@ for the duration of such a sequence, and this tends to lead to hacking
|
||||
a plain process with a custom protocol.
|
||||
|
||||
This application attempts to make that a bit more structured.
|
||||
In its initial version, there are two APIs:
|
||||
In its initial version, there are a few different APIs:
|
||||
|
||||
* `gm_ctflow_fun`, where one can instantiate a stateful handler fun,
|
||||
which can be called by test cases as needed.
|
||||
* `gm_ctflow_worker`, a gen_server where the server logic is given
|
||||
by a user-provided fun. The server is spawned under a
|
||||
`simple_one_for_one` supervisor.
|
||||
* `gm_ctflow_reg`, used to register and find `gm_ctflow_worker` processes.
|
||||
Complies with the `via` addressing scheme for OTP behaviors.
|
||||
* `gm_ctflow`, providing a shared dictionary, logging and status support.
|
||||
|
||||
The application `gm_ctflow` is intended to be started in
|
||||
`init_per_suite/1` and stopped in `end_per_suite/1`, or in
|
||||
@@ -70,6 +73,60 @@ end_per_group(_Grp, _Config) ->
|
||||
|
||||
This resets the state and removes all helper processes for each group.
|
||||
|
||||
A simple way to keep track of the flow state:
|
||||
|
||||
```erlang
|
||||
init_per_testcase(_Case, Config) ->
|
||||
gm_ctflow:status(),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_Case, _Config) ->
|
||||
gm_ctflow:status(),
|
||||
ok.
|
||||
```
|
||||
|
||||
This gives output like this, from the `gm_ctflow_SUITE:incr/1` testcase:
|
||||
```
|
||||
*** User 2026-05-30 13:06:02.679 ***🔗
|
||||
== Summary for flow my_counter
|
||||
= Worker State: - none -
|
||||
= Fun State: 0
|
||||
= State: - none -
|
||||
= Log History:
|
||||
|
||||
2026-05-30 13:06:02.654:
|
||||
New flow: my_counter
|
||||
|
||||
|
||||
|
||||
*** User 2026-05-30 13:06:02.679 ***🔗
|
||||
my_counter[<0.332.0>]: F(incr, 0) -> {ok,1,1}
|
||||
|
||||
*** User 2026-05-30 13:06:02.679 ***🔗
|
||||
== Summary for flow my_counter
|
||||
= Worker State: - none -
|
||||
= Fun State: 1
|
||||
= State: - none -
|
||||
= Log History:
|
||||
|
||||
2026-05-30 13:06:02.654:
|
||||
New flow: my_counter
|
||||
|
||||
2026-05-30 13:06:02.679:
|
||||
F(incr, 0) -> {ok,1,1}
|
||||
```
|
||||
|
||||
Normally, of course, there would be more going on in the test, making the
|
||||
bookends a bit less dominant, but here we see the initial state of the flow
|
||||
`my_counter`: The "Fun state" is `0`, while there's no worker, and no shared state.
|
||||
The log history shows one previous message, from instantiating the fun.
|
||||
|
||||
The next log output shows the effect of calling the fun. Log messages produced
|
||||
with `gm_ctflow:ct_log/[2,3]` go both to `ct:log/2` and the log history. The
|
||||
flow is derived automatically, if possible, if not provided.
|
||||
|
||||
In the ending `status` output, we see the accumulated history, with timestamps
|
||||
to make it easier to find the context in CT or SUT logs.
|
||||
|
||||
Build
|
||||
-----
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
|
||||
-define(LOG(Fmt, Args), gm_ctflow:ct_log(Fmt, Args)).
|
||||
-define(LOG(Flow, Fmt, Args), gm_ctflow:ct_log(Flow, Fmt, Args)).
|
||||
@@ -5,3 +5,9 @@
|
||||
%% {config, "config/sys.config"},
|
||||
{apps, [gm_ctflow]}
|
||||
]}.
|
||||
|
||||
{dialyzer, [
|
||||
{warnings, [unknown]},
|
||||
{plt_apps, all_deps},
|
||||
{base_plt_apps, [erts, kernel, stdlib, common_test]}
|
||||
]}.
|
||||
|
||||
@@ -6,6 +6,18 @@
|
||||
, erase/1
|
||||
]).
|
||||
|
||||
-export([note_flow/1,
|
||||
ct_log/2, %% ct_log(Fmt, Args)
|
||||
ct_log/3, %% ct_log(Flow, Fmt, Args)
|
||||
timestamp/0]).
|
||||
|
||||
-export([ status/0
|
||||
, summarize_flow/1
|
||||
, reset_flow/1
|
||||
, reset/0 ]).
|
||||
|
||||
-include("gm_ctflow.hrl").
|
||||
|
||||
-type flow() :: any().
|
||||
-type key() :: any().
|
||||
-type value() :: any().
|
||||
@@ -28,3 +40,92 @@ get(Key, Default) ->
|
||||
-spec erase(key()) -> 'ok'.
|
||||
erase(Key) ->
|
||||
gm_ctflow_state:erase(Key).
|
||||
|
||||
|
||||
note_flow(Flow) ->
|
||||
erlang:put({?MODULE, flow}, Flow).
|
||||
|
||||
ct_log(Fmt, Args) ->
|
||||
ct_log(erlang:get({?MODULE, flow}), Fmt, Args).
|
||||
|
||||
ct_log(Flow, Fmt, Args) ->
|
||||
TS = timestamp(),
|
||||
case Flow of
|
||||
undefined ->
|
||||
save_log(undefined, TS, Fmt, Args),
|
||||
ct:log(Fmt, Args);
|
||||
Flow ->
|
||||
save_log(Flow, TS, Fmt, Args),
|
||||
ct:log("~p[~w]: " ++ Fmt, [Flow, self()|Args])
|
||||
end.
|
||||
|
||||
|
||||
save_log(Flow, TS, Fmt, Args) ->
|
||||
gm_ctflow_log:log(Flow, TS, Fmt, Args).
|
||||
|
||||
reset_flow(Flow) ->
|
||||
gm_ctflow_log:erase(Flow),
|
||||
try_call(fun gm_ctflow_worker:stop/1, Flow),
|
||||
try_call(fun gm_ctflow_fun:delete/1, Flow),
|
||||
ok.
|
||||
|
||||
reset() ->
|
||||
gm_ctflow_log:erase(),
|
||||
gm_ctflow_state:reset(),
|
||||
gm_ctflow_worker_sup:reset(),
|
||||
gm_ctflow_fun:reset(),
|
||||
ok.
|
||||
|
||||
status() ->
|
||||
WorkerFlows = [F || {F,_} <- gm_ctflow_reg:registered_names()],
|
||||
FunFlows = gm_ctflow_fun:flows(),
|
||||
AllFlows = ordsets:from_list(WorkerFlows ++ FunFlows),
|
||||
[summarize_flow(Flow) || Flow <- AllFlows].
|
||||
|
||||
summarize_flow(Flow) ->
|
||||
LogHistory = gm_ctflow_log:history(Flow),
|
||||
AllState = gm_ctflow_state:all(),
|
||||
FunState = try_call(fun gm_ctflow_fun:maybe_get_state/1, Flow),
|
||||
WorkerState = try_call(fun gm_ctflow_worker:maybe_get_state/1, Flow),
|
||||
S = [ {worker_state, WorkerState}
|
||||
, {fun_state, FunState}
|
||||
, {all_state, AllState}
|
||||
, {log_history, LogHistory} ],
|
||||
ct:log("== Summary for flow ~p~n~s", [Flow , [summary_(S1) || S1 <- S]]).
|
||||
|
||||
summary_({log_history, []}) -> fw("= Log History: - none -~n", []);
|
||||
summary_({log_history, H}) ->
|
||||
[ fw("= Log History:~n", [])
|
||||
, [ fw("~n~s:~n~s~n", [TS, fw(Fmt, Args)])
|
||||
|| {TS, Fmt, Args} <- H], "\n"];
|
||||
summary_({all_state, []}) -> fw("= State: - none -~n", []);
|
||||
summary_({all_state, L}) ->
|
||||
[ fw("= State:~n", []) , [fw("~p = ~p~n", [K, V]) || {K, V} <- L], "\n"];
|
||||
summary_({worker_state, S}) -> fw("= Worker State: ~s~n", [maybe_state_str(S)]);
|
||||
summary_({fun_state, S}) -> fw("= Fun State: ~s~n", [maybe_state_str(S)]).
|
||||
|
||||
maybe_state_str({ok, S}) -> fw("~p", [S]);
|
||||
maybe_state_str({error, E}) -> fw("ERROR: ~p", [E]);
|
||||
maybe_state_str(error) -> "- none -".
|
||||
|
||||
fw(Fmt, Args) -> io_lib:fwrite(Fmt, Args).
|
||||
|
||||
try_call(F, Flow) ->
|
||||
try F(Flow)
|
||||
catch
|
||||
exit:E:ST ->
|
||||
ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]);
|
||||
error:E:ST ->
|
||||
ct:log("Call to ~p(~p) failed: error:~p:~p", [F, Flow, E, ST]),
|
||||
undefined
|
||||
end.
|
||||
|
||||
%% copy-pasted (roughly) from ct_logs.erl
|
||||
timestamp() ->
|
||||
{MS,S,US} = os:timestamp(),
|
||||
{{Year,Month,Day}, {Hour,Min,Sec}} =
|
||||
calendar:now_to_local_time({MS,S,US}),
|
||||
MilliSec = trunc(US/1000),
|
||||
lists:flatten(io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B "
|
||||
"~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B",
|
||||
[Year,Month,Day,Hour,Min,Sec,MilliSec])).
|
||||
|
||||
@@ -10,7 +10,9 @@
|
||||
-export([start/2, stop/1]).
|
||||
|
||||
start(_StartType, _StartArgs) ->
|
||||
gm_ctflow_reg:init_reg(),
|
||||
gm_ctflow_state:init(),
|
||||
gm_ctflow_log:init(),
|
||||
gm_ctflow_sup:start_link().
|
||||
|
||||
stop(_State) ->
|
||||
|
||||
@@ -10,6 +10,10 @@
|
||||
, call/3 %% (Flow, Req, Timeout)
|
||||
, get_state/1 %% (Flow)
|
||||
, get_state/2 %% (Flow, Timeout)
|
||||
, maybe_get_state/1
|
||||
, delete/1 %% (Flow)
|
||||
, reset/0
|
||||
, flows/0
|
||||
]).
|
||||
|
||||
-spec new(Flow, F, St) -> ok
|
||||
@@ -36,3 +40,16 @@ get_state(Flow) ->
|
||||
-spec get_state(flow(), timeout()) -> state() | no_return().
|
||||
get_state(Flow, Timeout) ->
|
||||
gm_ctflow_server:get_state(Flow, Timeout).
|
||||
|
||||
-spec maybe_get_state(flow()) -> 'error' | {ok, state()}.
|
||||
maybe_get_state(Flow) ->
|
||||
gm_ctflow_server:maybe_get_state(Flow).
|
||||
|
||||
delete(Flow) ->
|
||||
gm_ctflow_server:delete(Flow).
|
||||
|
||||
reset() ->
|
||||
gm_ctflow_server:reset().
|
||||
|
||||
flows() ->
|
||||
gm_ctflow_server:flows().
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
-module(gm_ctflow_log).
|
||||
|
||||
-export([ init/0
|
||||
, log/4
|
||||
, history/1
|
||||
, erase/1
|
||||
, erase/0 ]).
|
||||
|
||||
-type ts() :: string().
|
||||
-type flow() :: gm_ctflow:flow().
|
||||
-type format() :: string().
|
||||
-type args() :: list().
|
||||
|
||||
init() ->
|
||||
ets:new(?MODULE, [ordered_set, public, named_table]).
|
||||
|
||||
-spec log(flow(), ts(), format(), args()) -> ok.
|
||||
log(Flow, TS, Format, Args) ->
|
||||
ets:insert(?MODULE, {{Flow, TS}, {Format, Args}}),
|
||||
ok.
|
||||
|
||||
-spec history(flow()) -> [{ts(), format(), args()}].
|
||||
history(Flow) ->
|
||||
ets:select(?MODULE, [{ {{Flow,'$1'}, {'$2','$3'}}, [], [{{'$1','$2','$3'}}] }]).
|
||||
|
||||
erase(Flow) ->
|
||||
ets:select_delete(?MODULE, [{ {{Flow,'_'}, '_'}, [], [true] }]),
|
||||
ok.
|
||||
|
||||
erase() ->
|
||||
ets:match_delete(?MODULE, '_').
|
||||
@@ -0,0 +1,117 @@
|
||||
-module(gm_ctflow_reg).
|
||||
|
||||
-behavior(gen_server).
|
||||
|
||||
-export([ register_name/2
|
||||
, unregister_name/1
|
||||
, whereis_name/1
|
||||
, send/2 ]).
|
||||
-export([ registered_names/0 ]).
|
||||
|
||||
-export([ start_link/0
|
||||
, init/1
|
||||
, handle_call/3
|
||||
, handle_cast/2
|
||||
, handle_info/2
|
||||
, terminate/2
|
||||
, code_change/3 ]).
|
||||
|
||||
-export([ init_reg/0 ]).
|
||||
|
||||
-type name() :: any().
|
||||
-type msg() :: any().
|
||||
|
||||
-record(st, {mrefs = #{}}).
|
||||
|
||||
init_reg() ->
|
||||
ets:new(?MODULE, [public, named_table]).
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
-spec register_name(name(), pid()) -> yes | no.
|
||||
register_name(Name, Pid) ->
|
||||
call({register_name, Name, Pid}).
|
||||
|
||||
-spec unregister_name(name()) -> yes | no.
|
||||
unregister_name(Name) ->
|
||||
call({unregister_name, Name}).
|
||||
|
||||
-spec whereis_name(name()) -> pid() | 'undefined'.
|
||||
whereis_name(Name) ->
|
||||
case ets:lookup(?MODULE, Name) of
|
||||
[{_, Pid}] ->
|
||||
case is_process_alive(Pid) of
|
||||
true ->
|
||||
Pid;
|
||||
false ->
|
||||
ets:delete(?MODULE, Name),
|
||||
undefined
|
||||
end;
|
||||
[] ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
-spec send(name(), msg()) -> pid().
|
||||
send(Name, Msg) ->
|
||||
case whereis_name(Name) of
|
||||
undefined ->
|
||||
exit({badarg, {Name, Msg}});
|
||||
Pid ->
|
||||
Pid ! Msg,
|
||||
Pid
|
||||
end.
|
||||
|
||||
-spec registered_names() -> [{name(), pid()}].
|
||||
registered_names() ->
|
||||
[X || {_,P} = X <- ets:tab2list(?MODULE),
|
||||
erlang:is_process_alive(P)].
|
||||
|
||||
call(Req) ->
|
||||
gen_server:call(?MODULE, Req).
|
||||
|
||||
init([]) ->
|
||||
{ok, #st{}}.
|
||||
|
||||
handle_call({register_name, Name, Pid}, _From, #st{mrefs = MRefs} = St) ->
|
||||
case register_name_(Name, Pid) of
|
||||
yes ->
|
||||
MRef = erlang:monitor(process, Pid),
|
||||
{reply, yes, St#st{mrefs = MRefs#{MRef => {Name, Pid}}}};
|
||||
no ->
|
||||
{reply, no, St}
|
||||
end;
|
||||
handle_call({unregister_name, Name}, _From, St) ->
|
||||
Res = unregister_name(Name),
|
||||
{reply, Res, St}.
|
||||
|
||||
handle_cast(_Msg, St) ->
|
||||
{noreply, St}.
|
||||
|
||||
handle_info({'DOWN', MRef, process, Pid, _}, #st{mrefs = MRefs} = St) ->
|
||||
case maps:find(MRef, MRefs) of
|
||||
{ok, {Name, Pid}} ->
|
||||
unregister_name_(Name),
|
||||
{noreply, St#st{mrefs = maps:remove(MRef, MRefs)}};
|
||||
error ->
|
||||
{noreply, St}
|
||||
end;
|
||||
handle_info(_, St) ->
|
||||
{noreply, St}.
|
||||
|
||||
terminate(_Reason, _St) ->
|
||||
ok.
|
||||
|
||||
code_change(_FromVsn, St, _Extra) ->
|
||||
{ok, St}.
|
||||
|
||||
register_name_(Name, Pid) ->
|
||||
case ets:insert_new(?MODULE, {Name, Pid}) of
|
||||
true ->
|
||||
yes;
|
||||
false ->
|
||||
no
|
||||
end.
|
||||
|
||||
unregister_name_(Name) ->
|
||||
ets:delete(?MODULE, Name).
|
||||
@@ -10,6 +10,8 @@
|
||||
, terminate/2
|
||||
, code_change/3 ]).
|
||||
|
||||
-include("gm_ctflow.hrl").
|
||||
|
||||
-type flow() :: gm_ctflow:flow().
|
||||
-type request() :: any().
|
||||
-type result() :: any().
|
||||
@@ -20,6 +22,10 @@
|
||||
, call/3 %% (Flow, Req, Timeout)
|
||||
, get_state/1 %% (Flow)
|
||||
, get_state/2 %% (Flow, Timeout)
|
||||
, maybe_get_state/1 %%
|
||||
, delete/1 %% (Flow)
|
||||
, reset/0
|
||||
, flows/0
|
||||
]).
|
||||
|
||||
-spec new(Flow, F, St) -> ok
|
||||
@@ -47,6 +53,22 @@ get_state(Flow) ->
|
||||
get_state(Flow, Timeout) ->
|
||||
call_({get_state, Flow}, Timeout).
|
||||
|
||||
-spec maybe_get_state(flow()) -> {ok, state()} | {error, any()} | error.
|
||||
maybe_get_state(Flow) ->
|
||||
call_({maybe_get_state, Flow}, infinity).
|
||||
|
||||
-spec delete(flow()) -> boolean().
|
||||
delete(Flow) ->
|
||||
call_({delete, Flow}).
|
||||
|
||||
-spec reset() -> ok.
|
||||
reset() ->
|
||||
call_(reset).
|
||||
|
||||
-spec flows() -> [flow()].
|
||||
flows() ->
|
||||
call_(flows).
|
||||
|
||||
call_(Req) ->
|
||||
call_(Req, infinity).
|
||||
|
||||
@@ -65,9 +87,13 @@ start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
init([]) ->
|
||||
%% GL = test_server_io:get_gl(_Shared = true),
|
||||
%% erlang:group_leader(GL, self()),
|
||||
ct_util:mark_process(),
|
||||
{ok, #{}}.
|
||||
|
||||
handle_call({new, Flow, F, InitSt}, _From, St) ->
|
||||
?LOG(Flow, "New flow: ~p", [Flow]),
|
||||
case maps:is_key(Flow, St) of
|
||||
true ->
|
||||
{reply, {error, exists}, St};
|
||||
@@ -78,16 +104,22 @@ handle_call({new, Flow, F, InitSt}, _From, St) ->
|
||||
handle_call({call, Flow, Req}, _From, St) ->
|
||||
case maps:find(Flow, St) of
|
||||
{ok, {F, FSt}} when is_function(F, 2) ->
|
||||
Prev = gm_ctflow:note_flow(Flow),
|
||||
try F(Req, FSt) of
|
||||
{ok, Res, FSt1} ->
|
||||
{reply, {ok, Res}, St#{Flow := {F, FSt1}}};
|
||||
{ok, Res, FSt1} = Result ->
|
||||
?LOG(Flow, "F(~p, ~p) -> ~p", [Req, FSt, Result]),
|
||||
{reply, {ok, Res}, St#{Flow := {F, FSt1}}};
|
||||
Other ->
|
||||
?LOG(Flow, "ERROR F(~p, ~p) -> ~p", [Req, FSt, Other]),
|
||||
Error = {error, {bad_return, Other}},
|
||||
{reply, Error, St#{Flow := Error}}
|
||||
catch
|
||||
error:E:ST ->
|
||||
?LOG(Flow, "CAUGHT F(~p, ~p) ~> error:~p:~p", [Req, FSt, E, ST]),
|
||||
Error = {error, {E, ST}},
|
||||
{reply, Error, St#{Flow := Error}}
|
||||
after
|
||||
gm_ctflow:note_flow(Prev)
|
||||
end;
|
||||
error ->
|
||||
{reply, {error, unknown_flow}, St}
|
||||
@@ -95,10 +127,31 @@ handle_call({call, Flow, Req}, _From, St) ->
|
||||
handle_call({get_state, Flow}, _From, St) ->
|
||||
case maps:find(Flow, St) of
|
||||
{ok, {F, FSt}} when is_function(F, 2) ->
|
||||
?LOG(Flow, "State = ~p", [FSt]),
|
||||
{reply, {ok, FSt}, St};
|
||||
{ok, Error} ->
|
||||
{reply, {error, Error}, St};
|
||||
error ->
|
||||
{reply, {error, unknown_flow}, St}
|
||||
end.
|
||||
end;
|
||||
handle_call({maybe_get_state, Flow}, _From, St) ->
|
||||
Reply = case maps:find(Flow, St) of
|
||||
{ok, {F, FSt}} when is_function(F) ->
|
||||
{ok, FSt};
|
||||
{ok, Error} ->
|
||||
{error, Error};
|
||||
error ->
|
||||
error
|
||||
end,
|
||||
{reply, {ok, Reply}, St};
|
||||
handle_call({delete, Flow}, _From, St) ->
|
||||
Res = maps:is_key(Flow, St),
|
||||
{reply, Res, maps:remove(Flow, St)};
|
||||
handle_call(reset, _From, _St) ->
|
||||
{reply, {ok, ok}, #{}};
|
||||
handle_call(flows, _From, St) ->
|
||||
{reply, {ok, maps:keys(St)}, St}.
|
||||
|
||||
|
||||
handle_cast(_Mst, St) ->
|
||||
{noreply, St}.
|
||||
|
||||
+13
-1
@@ -4,7 +4,10 @@
|
||||
, put/2
|
||||
, get/1
|
||||
, get/2
|
||||
, erase/1 ]).
|
||||
, all/0
|
||||
, erase/1
|
||||
, reset/0
|
||||
]).
|
||||
|
||||
-type key() :: any().
|
||||
-type value() :: any().
|
||||
@@ -37,6 +40,15 @@ get(Key, Default) ->
|
||||
Default
|
||||
end.
|
||||
|
||||
-spec all() -> [{key(), value()}].
|
||||
all() ->
|
||||
ets:tab2list(?MODULE).
|
||||
|
||||
erase(Key) ->
|
||||
ets:delete(?MODULE, Key),
|
||||
ok.
|
||||
|
||||
-spec reset() -> ok.
|
||||
reset() ->
|
||||
ets:match_delete(?MODULE, '_'),
|
||||
ok.
|
||||
|
||||
@@ -33,7 +33,8 @@ init([]) ->
|
||||
period => 1
|
||||
},
|
||||
ChildSpecs = [
|
||||
child(gm_ctflow_server, worker)
|
||||
child(gm_ctflow_reg, worker)
|
||||
, child(gm_ctflow_server, worker)
|
||||
, child(gm_ctflow_worker_sup, supervisor)
|
||||
],
|
||||
{ok, {SupFlags, ChildSpecs}}.
|
||||
|
||||
+79
-58
@@ -8,6 +8,7 @@
|
||||
, cast/2
|
||||
, get_state/1
|
||||
, get_state/2
|
||||
, maybe_get_state/1
|
||||
, stop/1 ]).
|
||||
|
||||
-export([ start_link/2
|
||||
@@ -19,11 +20,7 @@
|
||||
, code_change/3
|
||||
]).
|
||||
|
||||
-export([ init_reg/0
|
||||
, register_name/2
|
||||
, unregister_name/1
|
||||
, whereis_name/1
|
||||
, send/2 ]).
|
||||
-include("gm_ctflow.hrl").
|
||||
|
||||
-type flow() :: gm_ctflow:flow().
|
||||
-type state() :: any().
|
||||
@@ -37,13 +34,24 @@
|
||||
-type handler_fun() :: fun( (op(), msg(), state()) -> handler_reply() ).
|
||||
-type init_fun() :: fun( () -> {ok, handler_fun(), state()} ).
|
||||
|
||||
-record(st, { flow :: flow()
|
||||
, f :: handler_fun()
|
||||
, f_st :: state() }).
|
||||
-record(error, { time = gm_ctflow:timestamp() :: string()
|
||||
, error :: any() }).
|
||||
|
||||
via(Pid) when is_pid(Pid) ->
|
||||
Pid;
|
||||
via(Flow) ->
|
||||
{via, gm_ctflow_reg, Flow}.
|
||||
|
||||
-spec start(flow(), init_fun()) -> {'ok', pid()}.
|
||||
start(Flow, InitF) when is_function(InitF, 0) ->
|
||||
supervisor:start_child(gm_ctflow_worker_sup, [Flow, InitF]).
|
||||
|
||||
-spec stop(flow()) -> 'ok'.
|
||||
stop(Flow) ->
|
||||
supervisor:terminate_child(gm_ctflow_worker_sup, whereis_name(Flow)).
|
||||
supervisor:terminate_child(gm_ctflow_worker_sup, gm_ctflow_reg:whereis_name(Flow)).
|
||||
|
||||
-spec call(Flow, Req) -> Reply | no_return()
|
||||
when Flow :: flow(),
|
||||
@@ -62,18 +70,26 @@ call(Flow, Req, Timeout) ->
|
||||
|
||||
-spec cast(flow(), msg()) -> 'ok' | no_return().
|
||||
cast(Flow, Msg) ->
|
||||
gen_server:cast({via, ?MODULE, Flow}, Msg).
|
||||
gen_server:cast(via(Flow), Msg).
|
||||
|
||||
-spec get_state(flow()) -> state() | no_return().
|
||||
get_state(Flow) ->
|
||||
call_(Flow, get_state, infinity).
|
||||
|
||||
maybe_get_state(Flow) ->
|
||||
case gm_ctflow_reg:whereis_name(Flow) of
|
||||
undefined ->
|
||||
error;
|
||||
Pid ->
|
||||
call_(Pid, maybe_get_state, infinity)
|
||||
end.
|
||||
|
||||
-spec get_state(flow(), timeout()) -> state() | no_return().
|
||||
get_state(Flow, Timeout) ->
|
||||
call_(Flow, get_state, Timeout).
|
||||
|
||||
call_(Flow, Req, Timeout) ->
|
||||
try gen_server:call({via, ?MODULE, Flow}, Req, Timeout) of
|
||||
try gen_server:call(via(Flow), Req, Timeout) of
|
||||
{error, Reason} ->
|
||||
error(Reason);
|
||||
Other ->
|
||||
@@ -84,42 +100,79 @@ call_(Flow, Req, Timeout) ->
|
||||
end.
|
||||
|
||||
start_link(Flow, InitF) when is_function(InitF, 0) ->
|
||||
gen_server:start_link({via, ?MODULE, Flow}, ?MODULE, InitF, []).
|
||||
gen_server:start_link(via(Flow), ?MODULE, [Flow, InitF], []).
|
||||
|
||||
init(InitF) ->
|
||||
init([Flow, InitF]) ->
|
||||
ct_util:mark_process(),
|
||||
gm_ctflow:note_flow(Flow),
|
||||
?LOG(Flow, "Initializing flow worker", []),
|
||||
case InitF() of
|
||||
{ok, HandlerFun, St} when is_function(HandlerFun, 3) ->
|
||||
{ok, {HandlerFun, St}};
|
||||
{ok, #st{flow = Flow, f = HandlerFun, f_st = St}};
|
||||
Other ->
|
||||
{error, Other}
|
||||
end.
|
||||
|
||||
handle_call(get_state, _From, {_, FSt} = St) ->
|
||||
handle_call(Req, _From, #error{} = E) ->
|
||||
case Req of
|
||||
get_state -> {reply, E, E};
|
||||
maybe_get_state -> {reply, {ok, {error, E}}, E};
|
||||
_ ->
|
||||
{reply, {error, {already_crashed, E}}, E}
|
||||
end;
|
||||
handle_call(get_state, _From, #st{f_st = FSt} = St) ->
|
||||
{reply, FSt, St};
|
||||
handle_call({call, Req}, From, {F, FSt} = St) ->
|
||||
case F({call, From}, Req, FSt) of
|
||||
handle_call(maybe_get_state, _From, #st{f_st = FSt} = St) ->
|
||||
{reply, {ok, FSt}, St};
|
||||
handle_call({call, Req}, From, #st{flow = Flow, f = F, f_st = FSt} = St) ->
|
||||
try F({call, From}, Req, FSt) of
|
||||
{reply, Res, FSt1} ->
|
||||
{reply, Res, {F, FSt1}};
|
||||
?LOG(Flow, "CALL: ~p -> ~p", [Req, Res]),
|
||||
{reply, Res, St#st{f_st = FSt1}};
|
||||
{noreply, FSt1} ->
|
||||
{noreply, {F, FSt1}};
|
||||
Other ->
|
||||
{stop, {error, Other}, St}
|
||||
E = #error{error = Other},
|
||||
?LOG(Flow, "ERROR CALL: ~p -> ~p", [Req, E]),
|
||||
{reply, E, E}
|
||||
catch
|
||||
error:E:ST ->
|
||||
Error = #error{error = {E, ST}},
|
||||
?LOG(Flow, "CAUGHT CALL: ~p -> error:~p:~p", [Req, E, ST]),
|
||||
{reply, Error, Error}
|
||||
end.
|
||||
|
||||
handle_cast(Msg, {F, FSt} = St) ->
|
||||
case F(cast, Msg, FSt) of
|
||||
{ok, FSt1} ->
|
||||
{noreply, {F, FSt1}};
|
||||
handle_cast(_Msg, #error{} = E) ->
|
||||
{noreply, E};
|
||||
handle_cast(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) ->
|
||||
try F(cast, Msg, FSt) of
|
||||
{Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
|
||||
?LOG(Flow, "CAST ~p (~p) -> ~p", [Msg, FSt, Res]),
|
||||
{noreply, St#st{f_st = FSt1}};
|
||||
Other ->
|
||||
{stop, {error, Other}, St}
|
||||
?LOG(Flow, "ERROR CAST: ~p -> ~p", [Msg, Other]),
|
||||
{noreply, #error{error = Other}}
|
||||
|
||||
catch
|
||||
error:E:ST ->
|
||||
?LOG(Flow, "CAUGHT CAST ~p (~p) : error:~p:~p", [Msg, FSt, E, ST]),
|
||||
{noreply, #error{error = {E, ST}}}
|
||||
end.
|
||||
|
||||
handle_info(Msg, {F, FSt} = St) ->
|
||||
case F(info, Msg, FSt) of
|
||||
{ok, FSt1} ->
|
||||
{noreply, {F, FSt1}};
|
||||
handle_info(_, #error{} = E) ->
|
||||
{noreply, E};
|
||||
handle_info(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) ->
|
||||
try F(info, Msg, FSt) of
|
||||
{Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
|
||||
?LOG(Flow, "INFO ~p (~p) -> ~p", [Msg, FSt, Res]),
|
||||
{noreply, St#st{f_st = FSt1}};
|
||||
Other ->
|
||||
{stop, {error, Other}, St}
|
||||
?LOG(Flow, "ERROR MSG: ~p -> ~p", [Msg, Other]),
|
||||
{noreply, #error{error = Other}}
|
||||
catch
|
||||
error:E:ST ->
|
||||
?LOG(Flow, "CAUGHT MSG: ~p, error:~p:~p", [Msg, E, ST]),
|
||||
{noreply, #error{error = {E, ST}}}
|
||||
end.
|
||||
|
||||
terminate(_, _) ->
|
||||
@@ -127,35 +180,3 @@ terminate(_, _) ->
|
||||
|
||||
code_change(_FromVsn, S, _Extra) ->
|
||||
{ok, S}.
|
||||
|
||||
|
||||
init_reg() ->
|
||||
ets:new(?MODULE, [public, named_table]).
|
||||
|
||||
register_name(Name, Pid) ->
|
||||
case ets:insert_new(?MODULE, {Name, Pid}) of
|
||||
true ->
|
||||
yes;
|
||||
false ->
|
||||
no
|
||||
end.
|
||||
|
||||
unregister_name(Name) ->
|
||||
ets:delete(?MODULE, Name).
|
||||
|
||||
whereis_name(Name) ->
|
||||
case ets:lookup(?MODULE, Name) of
|
||||
[{_, Pid}] ->
|
||||
Pid;
|
||||
[] ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
send(Name, Msg) ->
|
||||
case whereis_name(Name) of
|
||||
undefined ->
|
||||
exit({badarg, {Name, Msg}});
|
||||
Pid ->
|
||||
Pid ! Msg,
|
||||
Pid
|
||||
end.
|
||||
|
||||
@@ -12,8 +12,15 @@
|
||||
|
||||
-export([init/1]).
|
||||
|
||||
-export([reset/0]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
reset() ->
|
||||
[ok = supervisor:terminate_child(?MODULE, Id)
|
||||
|| {Id, _, _, _} <- supervisor:which_children(?MODULE)],
|
||||
ok.
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
|
||||
|
||||
@@ -27,7 +34,6 @@ start_link() ->
|
||||
%% type => worker(), % optional
|
||||
%% modules => modules()} % optional
|
||||
init([]) ->
|
||||
gm_ctflow_worker:init_reg(),
|
||||
SupFlags = #{
|
||||
strategy => simple_one_for_one,
|
||||
intensity => 0,
|
||||
|
||||
@@ -24,6 +24,9 @@
|
||||
, checkl1/1
|
||||
, checkl2/1
|
||||
, checkl3/1
|
||||
, reset/1
|
||||
, reset_flow/1
|
||||
, status/1
|
||||
]).
|
||||
|
||||
-include_lib("stdlib/include/assert.hrl").
|
||||
@@ -33,6 +36,7 @@ all() ->
|
||||
{group, state}
|
||||
, {group, statefuns}
|
||||
, {group, workers}
|
||||
, {group, reset}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
@@ -47,6 +51,7 @@ groups() ->
|
||||
, check2 %% state = 2
|
||||
, incr
|
||||
, check3 %% state = 3
|
||||
, status
|
||||
]}
|
||||
, {workers, [sequence], [ init_counter
|
||||
, init_worker
|
||||
@@ -56,7 +61,19 @@ groups() ->
|
||||
, checkl2 %% [A0, A1]
|
||||
, append
|
||||
, checkl3 %% [A0, A1, A2]
|
||||
, status
|
||||
]}
|
||||
, {reset, [sequence], [ init_counter
|
||||
, init_worker
|
||||
, append %% also increments
|
||||
, check1 %% so, state = 1
|
||||
, checkl1
|
||||
, reset_flow
|
||||
, checkl1 %% flow 'my_list' unaffected
|
||||
, init_counter
|
||||
, incr
|
||||
, check1
|
||||
, status ]}
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
@@ -74,9 +91,11 @@ end_per_group(_Grp, _Config) ->
|
||||
ok.
|
||||
|
||||
init_per_testcase(_Case, Config) ->
|
||||
gm_ctflow:status(),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_Case, _Config) ->
|
||||
gm_ctflow:status(),
|
||||
ok.
|
||||
|
||||
%% ======================================================================
|
||||
@@ -101,6 +120,7 @@ erase_a(_Cfg) ->
|
||||
%% easily threaded through the sequence of cases otherwise.
|
||||
%%
|
||||
init_counter(_Cfg) ->
|
||||
gm_ctflow:status(),
|
||||
gm_ctflow_fun:new(my_counter, fun ctr/2, 0),
|
||||
ok.
|
||||
|
||||
@@ -122,6 +142,12 @@ ctr(incr, N) ->
|
||||
NewN = N+1,
|
||||
{ok, NewN, NewN}.
|
||||
|
||||
reset_flow(_Cfg) ->
|
||||
gm_ctflow:reset_flow(my_counter).
|
||||
|
||||
status(_Cfg) ->
|
||||
gm_ctflow:status().
|
||||
|
||||
%% ======================================================================
|
||||
%% Here, we make use of the `gm_ctflow_worker` it's a complete process
|
||||
%% (a gen_server, whose logic we instrument with the fun we provice).
|
||||
@@ -155,3 +181,6 @@ checkl_(L) ->
|
||||
lfun({call,_From}, {append, Item}, L) ->
|
||||
NewL = L ++ [Item],
|
||||
{reply, NewL, NewL}.
|
||||
|
||||
reset(_Cfg) ->
|
||||
gm_ctflow:reset().
|
||||
|
||||
Reference in New Issue
Block a user