Merge pull request 'Add gm_ctflow_state' (#1) from uw-add-state-api into master

Reviewed-on: #1
This commit was merged in pull request #1.
This commit is contained in:
2026-05-30 20:17:04 +09:00
14 changed files with 615 additions and 68 deletions
+58 -1
View File
@@ -16,13 +16,16 @@ for the duration of such a sequence, and this tends to lead to hacking
a plain process with a custom protocol. a plain process with a custom protocol.
This application attempts to make that a bit more structured. This application attempts to make that a bit more structured.
In its initial version, there are two APIs: In its initial version, there are a few different APIs:
* `gm_ctflow_fun`, where one can instantiate a stateful handler fun, * `gm_ctflow_fun`, where one can instantiate a stateful handler fun,
which can be called by test cases as needed. which can be called by test cases as needed.
* `gm_ctflow_worker`, a gen_server where the server logic is given * `gm_ctflow_worker`, a gen_server where the server logic is given
by a user-provided fun. The server is spawned under a by a user-provided fun. The server is spawned under a
`simple_one_for_one` supervisor. `simple_one_for_one` supervisor.
* `gm_ctflow_reg`, used to register and find `gm_ctflow_worker` processes.
Complies with the `via` addressing scheme for OTP behaviors.
* `gm_ctflow`, providing a shared dictionary, logging and status support.
The application `gm_ctflow` is intended to be started in The application `gm_ctflow` is intended to be started in
`init_per_suite/1` and stopped in `end_per_suite/1`, or in `init_per_suite/1` and stopped in `end_per_suite/1`, or in
@@ -70,6 +73,60 @@ end_per_group(_Grp, _Config) ->
This resets the state and removes all helper processes for each group. This resets the state and removes all helper processes for each group.
A simple way to keep track of the flow state:
```erlang
init_per_testcase(_Case, Config) ->
gm_ctflow:status(),
Config.
end_per_testcase(_Case, _Config) ->
gm_ctflow:status(),
ok.
```
This gives output like this, from the `gm_ctflow_SUITE:incr/1` testcase:
```
*** User 2026-05-30 13:06:02.679 ***🔗
== Summary for flow my_counter
= Worker State: - none -
= Fun State: 0
= State: - none -
= Log History:
2026-05-30 13:06:02.654:
New flow: my_counter
*** User 2026-05-30 13:06:02.679 ***🔗
my_counter[<0.332.0>]: F(incr, 0) -> {ok,1,1}
*** User 2026-05-30 13:06:02.679 ***🔗
== Summary for flow my_counter
= Worker State: - none -
= Fun State: 1
= State: - none -
= Log History:
2026-05-30 13:06:02.654:
New flow: my_counter
2026-05-30 13:06:02.679:
F(incr, 0) -> {ok,1,1}
```
Normally, of course, there would be more going on in the test, making the
bookends a bit less dominant, but here we see the initial state of the flow
`my_counter`: The "Fun state" is `0`, while there's no worker, and no shared state.
The log history shows one previous message, from instantiating the fun.
The next log output shows the effect of calling the fun. Log messages produced
with `gm_ctflow:ct_log/[2,3]` go both to `ct:log/2` and the log history. The
flow is derived automatically, if possible, if not provided.
In the ending `status` output, we see the accumulated history, with timestamps
to make it easier to find the context in CT or SUT logs.
Build Build
----- -----
+3
View File
@@ -0,0 +1,3 @@
-define(LOG(Fmt, Args), gm_ctflow:ct_log(Fmt, Args)).
-define(LOG(Flow, Fmt, Args), gm_ctflow:ct_log(Flow, Fmt, Args)).
+6
View File
@@ -5,3 +5,9 @@
%% {config, "config/sys.config"}, %% {config, "config/sys.config"},
{apps, [gm_ctflow]} {apps, [gm_ctflow]}
]}. ]}.
{dialyzer, [
{warnings, [unknown]},
{plt_apps, all_deps},
{base_plt_apps, [erts, kernel, stdlib, common_test]}
]}.
+126
View File
@@ -1,5 +1,131 @@
-module(gm_ctflow). -module(gm_ctflow).
-export([ put/2
, get/1
, get/2
, erase/1
]).
-export([note_flow/1,
ct_log/2, %% ct_log(Fmt, Args)
ct_log/3, %% ct_log(Flow, Fmt, Args)
timestamp/0]).
-export([ status/0
, summarize_flow/1
, reset_flow/1
, reset/0 ]).
-include("gm_ctflow.hrl").
-type flow() :: any(). -type flow() :: any().
-type key() :: any().
-type value() :: any().
-export_type([ flow/0 ]). -export_type([ flow/0 ]).
-spec put(key(), value()) -> 'ok'.
put(Key, Value) ->
gm_ctflow_state:put(Key, Value).
-spec get(key()) -> value() | no_return().
get(Key) ->
gm_ctflow_state:get(Key).
-spec get(key(), Default) -> value()
when Default :: value().
get(Key, Default) ->
gm_ctflow_state:get(Key, Default).
-spec erase(key()) -> 'ok'.
erase(Key) ->
gm_ctflow_state:erase(Key).
note_flow(Flow) ->
erlang:put({?MODULE, flow}, Flow).
ct_log(Fmt, Args) ->
ct_log(erlang:get({?MODULE, flow}), Fmt, Args).
ct_log(Flow, Fmt, Args) ->
TS = timestamp(),
case Flow of
undefined ->
save_log(undefined, TS, Fmt, Args),
ct:log(Fmt, Args);
Flow ->
save_log(Flow, TS, Fmt, Args),
ct:log("~p[~w]: " ++ Fmt, [Flow, self()|Args])
end.
save_log(Flow, TS, Fmt, Args) ->
gm_ctflow_log:log(Flow, TS, Fmt, Args).
reset_flow(Flow) ->
gm_ctflow_log:erase(Flow),
try_call(fun gm_ctflow_worker:stop/1, Flow),
try_call(fun gm_ctflow_fun:delete/1, Flow),
ok.
reset() ->
gm_ctflow_log:erase(),
gm_ctflow_state:reset(),
gm_ctflow_worker_sup:reset(),
gm_ctflow_fun:reset(),
ok.
status() ->
WorkerFlows = [F || {F,_} <- gm_ctflow_reg:registered_names()],
FunFlows = gm_ctflow_fun:flows(),
AllFlows = ordsets:from_list(WorkerFlows ++ FunFlows),
[summarize_flow(Flow) || Flow <- AllFlows].
summarize_flow(Flow) ->
LogHistory = gm_ctflow_log:history(Flow),
AllState = gm_ctflow_state:all(),
FunState = try_call(fun gm_ctflow_fun:maybe_get_state/1, Flow),
WorkerState = try_call(fun gm_ctflow_worker:maybe_get_state/1, Flow),
S = [ {worker_state, WorkerState}
, {fun_state, FunState}
, {all_state, AllState}
, {log_history, LogHistory} ],
ct:log("== Summary for flow ~p~n~s", [Flow , [summary_(S1) || S1 <- S]]).
summary_({log_history, []}) -> fw("= Log History: - none -~n", []);
summary_({log_history, H}) ->
[ fw("= Log History:~n", [])
, [ fw("~n~s:~n~s~n", [TS, fw(Fmt, Args)])
|| {TS, Fmt, Args} <- H], "\n"];
summary_({all_state, []}) -> fw("= State: - none -~n", []);
summary_({all_state, L}) ->
[ fw("= State:~n", []) , [fw("~p = ~p~n", [K, V]) || {K, V} <- L], "\n"];
summary_({worker_state, S}) -> fw("= Worker State: ~s~n", [maybe_state_str(S)]);
summary_({fun_state, S}) -> fw("= Fun State: ~s~n", [maybe_state_str(S)]).
maybe_state_str({ok, S}) -> fw("~p", [S]);
maybe_state_str({error, E}) -> fw("ERROR: ~p", [E]);
maybe_state_str(error) -> "- none -".
fw(Fmt, Args) -> io_lib:fwrite(Fmt, Args).
try_call(F, Flow) ->
try F(Flow)
catch
exit:E:ST ->
ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]);
error:E:ST ->
ct:log("Call to ~p(~p) failed: error:~p:~p", [F, Flow, E, ST]),
undefined
end.
%% copy-pasted (roughly) from ct_logs.erl
timestamp() ->
{MS,S,US} = os:timestamp(),
{{Year,Month,Day}, {Hour,Min,Sec}} =
calendar:now_to_local_time({MS,S,US}),
MilliSec = trunc(US/1000),
lists:flatten(io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B "
"~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B",
[Year,Month,Day,Hour,Min,Sec,MilliSec])).
+3
View File
@@ -10,6 +10,9 @@
-export([start/2, stop/1]). -export([start/2, stop/1]).
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
gm_ctflow_reg:init_reg(),
gm_ctflow_state:init(),
gm_ctflow_log:init(),
gm_ctflow_sup:start_link(). gm_ctflow_sup:start_link().
stop(_State) -> stop(_State) ->
+17
View File
@@ -10,6 +10,10 @@
, call/3 %% (Flow, Req, Timeout) , call/3 %% (Flow, Req, Timeout)
, get_state/1 %% (Flow) , get_state/1 %% (Flow)
, get_state/2 %% (Flow, Timeout) , get_state/2 %% (Flow, Timeout)
, maybe_get_state/1
, delete/1 %% (Flow)
, reset/0
, flows/0
]). ]).
-spec new(Flow, F, St) -> ok -spec new(Flow, F, St) -> ok
@@ -36,3 +40,16 @@ get_state(Flow) ->
-spec get_state(flow(), timeout()) -> state() | no_return(). -spec get_state(flow(), timeout()) -> state() | no_return().
get_state(Flow, Timeout) -> get_state(Flow, Timeout) ->
gm_ctflow_server:get_state(Flow, Timeout). gm_ctflow_server:get_state(Flow, Timeout).
-spec maybe_get_state(flow()) -> 'error' | {ok, state()}.
maybe_get_state(Flow) ->
gm_ctflow_server:maybe_get_state(Flow).
delete(Flow) ->
gm_ctflow_server:delete(Flow).
reset() ->
gm_ctflow_server:reset().
flows() ->
gm_ctflow_server:flows().
+31
View File
@@ -0,0 +1,31 @@
-module(gm_ctflow_log).
-export([ init/0
, log/4
, history/1
, erase/1
, erase/0 ]).
-type ts() :: string().
-type flow() :: gm_ctflow:flow().
-type format() :: string().
-type args() :: list().
init() ->
ets:new(?MODULE, [ordered_set, public, named_table]).
-spec log(flow(), ts(), format(), args()) -> ok.
log(Flow, TS, Format, Args) ->
ets:insert(?MODULE, {{Flow, TS}, {Format, Args}}),
ok.
-spec history(flow()) -> [{ts(), format(), args()}].
history(Flow) ->
ets:select(?MODULE, [{ {{Flow,'$1'}, {'$2','$3'}}, [], [{{'$1','$2','$3'}}] }]).
erase(Flow) ->
ets:select_delete(?MODULE, [{ {{Flow,'_'}, '_'}, [], [true] }]),
ok.
erase() ->
ets:match_delete(?MODULE, '_').
+117
View File
@@ -0,0 +1,117 @@
-module(gm_ctflow_reg).
-behavior(gen_server).
-export([ register_name/2
, unregister_name/1
, whereis_name/1
, send/2 ]).
-export([ registered_names/0 ]).
-export([ start_link/0
, init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3 ]).
-export([ init_reg/0 ]).
-type name() :: any().
-type msg() :: any().
-record(st, {mrefs = #{}}).
init_reg() ->
ets:new(?MODULE, [public, named_table]).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec register_name(name(), pid()) -> yes | no.
register_name(Name, Pid) ->
call({register_name, Name, Pid}).
-spec unregister_name(name()) -> yes | no.
unregister_name(Name) ->
call({unregister_name, Name}).
-spec whereis_name(name()) -> pid() | 'undefined'.
whereis_name(Name) ->
case ets:lookup(?MODULE, Name) of
[{_, Pid}] ->
case is_process_alive(Pid) of
true ->
Pid;
false ->
ets:delete(?MODULE, Name),
undefined
end;
[] ->
undefined
end.
-spec send(name(), msg()) -> pid().
send(Name, Msg) ->
case whereis_name(Name) of
undefined ->
exit({badarg, {Name, Msg}});
Pid ->
Pid ! Msg,
Pid
end.
-spec registered_names() -> [{name(), pid()}].
registered_names() ->
[X || {_,P} = X <- ets:tab2list(?MODULE),
erlang:is_process_alive(P)].
call(Req) ->
gen_server:call(?MODULE, Req).
init([]) ->
{ok, #st{}}.
handle_call({register_name, Name, Pid}, _From, #st{mrefs = MRefs} = St) ->
case register_name_(Name, Pid) of
yes ->
MRef = erlang:monitor(process, Pid),
{reply, yes, St#st{mrefs = MRefs#{MRef => {Name, Pid}}}};
no ->
{reply, no, St}
end;
handle_call({unregister_name, Name}, _From, St) ->
Res = unregister_name(Name),
{reply, Res, St}.
handle_cast(_Msg, St) ->
{noreply, St}.
handle_info({'DOWN', MRef, process, Pid, _}, #st{mrefs = MRefs} = St) ->
case maps:find(MRef, MRefs) of
{ok, {Name, Pid}} ->
unregister_name_(Name),
{noreply, St#st{mrefs = maps:remove(MRef, MRefs)}};
error ->
{noreply, St}
end;
handle_info(_, St) ->
{noreply, St}.
terminate(_Reason, _St) ->
ok.
code_change(_FromVsn, St, _Extra) ->
{ok, St}.
register_name_(Name, Pid) ->
case ets:insert_new(?MODULE, {Name, Pid}) of
true ->
yes;
false ->
no
end.
unregister_name_(Name) ->
ets:delete(?MODULE, Name).
+55 -2
View File
@@ -10,6 +10,8 @@
, terminate/2 , terminate/2
, code_change/3 ]). , code_change/3 ]).
-include("gm_ctflow.hrl").
-type flow() :: gm_ctflow:flow(). -type flow() :: gm_ctflow:flow().
-type request() :: any(). -type request() :: any().
-type result() :: any(). -type result() :: any().
@@ -20,6 +22,10 @@
, call/3 %% (Flow, Req, Timeout) , call/3 %% (Flow, Req, Timeout)
, get_state/1 %% (Flow) , get_state/1 %% (Flow)
, get_state/2 %% (Flow, Timeout) , get_state/2 %% (Flow, Timeout)
, maybe_get_state/1 %%
, delete/1 %% (Flow)
, reset/0
, flows/0
]). ]).
-spec new(Flow, F, St) -> ok -spec new(Flow, F, St) -> ok
@@ -47,6 +53,22 @@ get_state(Flow) ->
get_state(Flow, Timeout) -> get_state(Flow, Timeout) ->
call_({get_state, Flow}, Timeout). call_({get_state, Flow}, Timeout).
-spec maybe_get_state(flow()) -> {ok, state()} | {error, any()} | error.
maybe_get_state(Flow) ->
call_({maybe_get_state, Flow}, infinity).
-spec delete(flow()) -> boolean().
delete(Flow) ->
call_({delete, Flow}).
-spec reset() -> ok.
reset() ->
call_(reset).
-spec flows() -> [flow()].
flows() ->
call_(flows).
call_(Req) -> call_(Req) ->
call_(Req, infinity). call_(Req, infinity).
@@ -65,9 +87,13 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) -> init([]) ->
%% GL = test_server_io:get_gl(_Shared = true),
%% erlang:group_leader(GL, self()),
ct_util:mark_process(),
{ok, #{}}. {ok, #{}}.
handle_call({new, Flow, F, InitSt}, _From, St) -> handle_call({new, Flow, F, InitSt}, _From, St) ->
?LOG(Flow, "New flow: ~p", [Flow]),
case maps:is_key(Flow, St) of case maps:is_key(Flow, St) of
true -> true ->
{reply, {error, exists}, St}; {reply, {error, exists}, St};
@@ -78,16 +104,22 @@ handle_call({new, Flow, F, InitSt}, _From, St) ->
handle_call({call, Flow, Req}, _From, St) -> handle_call({call, Flow, Req}, _From, St) ->
case maps:find(Flow, St) of case maps:find(Flow, St) of
{ok, {F, FSt}} when is_function(F, 2) -> {ok, {F, FSt}} when is_function(F, 2) ->
Prev = gm_ctflow:note_flow(Flow),
try F(Req, FSt) of try F(Req, FSt) of
{ok, Res, FSt1} -> {ok, Res, FSt1} = Result ->
?LOG(Flow, "F(~p, ~p) -> ~p", [Req, FSt, Result]),
{reply, {ok, Res}, St#{Flow := {F, FSt1}}}; {reply, {ok, Res}, St#{Flow := {F, FSt1}}};
Other -> Other ->
?LOG(Flow, "ERROR F(~p, ~p) -> ~p", [Req, FSt, Other]),
Error = {error, {bad_return, Other}}, Error = {error, {bad_return, Other}},
{reply, Error, St#{Flow := Error}} {reply, Error, St#{Flow := Error}}
catch catch
error:E:ST -> error:E:ST ->
?LOG(Flow, "CAUGHT F(~p, ~p) ~> error:~p:~p", [Req, FSt, E, ST]),
Error = {error, {E, ST}}, Error = {error, {E, ST}},
{reply, Error, St#{Flow := Error}} {reply, Error, St#{Flow := Error}}
after
gm_ctflow:note_flow(Prev)
end; end;
error -> error ->
{reply, {error, unknown_flow}, St} {reply, {error, unknown_flow}, St}
@@ -95,10 +127,31 @@ handle_call({call, Flow, Req}, _From, St) ->
handle_call({get_state, Flow}, _From, St) -> handle_call({get_state, Flow}, _From, St) ->
case maps:find(Flow, St) of case maps:find(Flow, St) of
{ok, {F, FSt}} when is_function(F, 2) -> {ok, {F, FSt}} when is_function(F, 2) ->
?LOG(Flow, "State = ~p", [FSt]),
{reply, {ok, FSt}, St}; {reply, {ok, FSt}, St};
{ok, Error} ->
{reply, {error, Error}, St};
error -> error ->
{reply, {error, unknown_flow}, St} {reply, {error, unknown_flow}, St}
end. end;
handle_call({maybe_get_state, Flow}, _From, St) ->
Reply = case maps:find(Flow, St) of
{ok, {F, FSt}} when is_function(F) ->
{ok, FSt};
{ok, Error} ->
{error, Error};
error ->
error
end,
{reply, {ok, Reply}, St};
handle_call({delete, Flow}, _From, St) ->
Res = maps:is_key(Flow, St),
{reply, Res, maps:remove(Flow, St)};
handle_call(reset, _From, _St) ->
{reply, {ok, ok}, #{}};
handle_call(flows, _From, St) ->
{reply, {ok, maps:keys(St)}, St}.
handle_cast(_Mst, St) -> handle_cast(_Mst, St) ->
{noreply, St}. {noreply, St}.
+54
View File
@@ -0,0 +1,54 @@
-module(gm_ctflow_state).
-export([ init/0
, put/2
, get/1
, get/2
, all/0
, erase/1
, reset/0
]).
-type key() :: any().
-type value() :: any().
init() ->
ets:new(?MODULE, [ordered_set, public, named_table]).
-spec put(key(), value()) -> 'ok'.
put(Key, Value) ->
ets:insert(?MODULE, {Key, Value}),
ok.
-spec get(key()) -> value() | no_return().
get(Key) ->
case ets:lookup(?MODULE, Key) of
[] ->
error({no_such_key, Key});
[{_, Value}] ->
Value
end.
-spec get(key(), Default) -> value()
when Default :: value().
get(Key, Default) ->
case ets:lookup(?MODULE, Key) of
[{_, Value}] ->
Value;
[] ->
Default
end.
-spec all() -> [{key(), value()}].
all() ->
ets:tab2list(?MODULE).
erase(Key) ->
ets:delete(?MODULE, Key),
ok.
-spec reset() -> ok.
reset() ->
ets:match_delete(?MODULE, '_'),
ok.
+2 -1
View File
@@ -33,7 +33,8 @@ init([]) ->
period => 1 period => 1
}, },
ChildSpecs = [ ChildSpecs = [
child(gm_ctflow_server, worker) child(gm_ctflow_reg, worker)
, child(gm_ctflow_server, worker)
, child(gm_ctflow_worker_sup, supervisor) , child(gm_ctflow_worker_sup, supervisor)
], ],
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.
+79 -58
View File
@@ -8,6 +8,7 @@
, cast/2 , cast/2
, get_state/1 , get_state/1
, get_state/2 , get_state/2
, maybe_get_state/1
, stop/1 ]). , stop/1 ]).
-export([ start_link/2 -export([ start_link/2
@@ -19,11 +20,7 @@
, code_change/3 , code_change/3
]). ]).
-export([ init_reg/0 -include("gm_ctflow.hrl").
, register_name/2
, unregister_name/1
, whereis_name/1
, send/2 ]).
-type flow() :: gm_ctflow:flow(). -type flow() :: gm_ctflow:flow().
-type state() :: any(). -type state() :: any().
@@ -37,13 +34,24 @@
-type handler_fun() :: fun( (op(), msg(), state()) -> handler_reply() ). -type handler_fun() :: fun( (op(), msg(), state()) -> handler_reply() ).
-type init_fun() :: fun( () -> {ok, handler_fun(), state()} ). -type init_fun() :: fun( () -> {ok, handler_fun(), state()} ).
-record(st, { flow :: flow()
, f :: handler_fun()
, f_st :: state() }).
-record(error, { time = gm_ctflow:timestamp() :: string()
, error :: any() }).
via(Pid) when is_pid(Pid) ->
Pid;
via(Flow) ->
{via, gm_ctflow_reg, Flow}.
-spec start(flow(), init_fun()) -> {'ok', pid()}. -spec start(flow(), init_fun()) -> {'ok', pid()}.
start(Flow, InitF) when is_function(InitF, 0) -> start(Flow, InitF) when is_function(InitF, 0) ->
supervisor:start_child(gm_ctflow_worker_sup, [Flow, InitF]). supervisor:start_child(gm_ctflow_worker_sup, [Flow, InitF]).
-spec stop(flow()) -> 'ok'. -spec stop(flow()) -> 'ok'.
stop(Flow) -> stop(Flow) ->
supervisor:terminate_child(gm_ctflow_worker_sup, whereis_name(Flow)). supervisor:terminate_child(gm_ctflow_worker_sup, gm_ctflow_reg:whereis_name(Flow)).
-spec call(Flow, Req) -> Reply | no_return() -spec call(Flow, Req) -> Reply | no_return()
when Flow :: flow(), when Flow :: flow(),
@@ -62,18 +70,26 @@ call(Flow, Req, Timeout) ->
-spec cast(flow(), msg()) -> 'ok' | no_return(). -spec cast(flow(), msg()) -> 'ok' | no_return().
cast(Flow, Msg) -> cast(Flow, Msg) ->
gen_server:cast({via, ?MODULE, Flow}, Msg). gen_server:cast(via(Flow), Msg).
-spec get_state(flow()) -> state() | no_return(). -spec get_state(flow()) -> state() | no_return().
get_state(Flow) -> get_state(Flow) ->
call_(Flow, get_state, infinity). call_(Flow, get_state, infinity).
maybe_get_state(Flow) ->
case gm_ctflow_reg:whereis_name(Flow) of
undefined ->
error;
Pid ->
call_(Pid, maybe_get_state, infinity)
end.
-spec get_state(flow(), timeout()) -> state() | no_return(). -spec get_state(flow(), timeout()) -> state() | no_return().
get_state(Flow, Timeout) -> get_state(Flow, Timeout) ->
call_(Flow, get_state, Timeout). call_(Flow, get_state, Timeout).
call_(Flow, Req, Timeout) -> call_(Flow, Req, Timeout) ->
try gen_server:call({via, ?MODULE, Flow}, Req, Timeout) of try gen_server:call(via(Flow), Req, Timeout) of
{error, Reason} -> {error, Reason} ->
error(Reason); error(Reason);
Other -> Other ->
@@ -84,42 +100,79 @@ call_(Flow, Req, Timeout) ->
end. end.
start_link(Flow, InitF) when is_function(InitF, 0) -> start_link(Flow, InitF) when is_function(InitF, 0) ->
gen_server:start_link({via, ?MODULE, Flow}, ?MODULE, InitF, []). gen_server:start_link(via(Flow), ?MODULE, [Flow, InitF], []).
init(InitF) -> init([Flow, InitF]) ->
ct_util:mark_process(),
gm_ctflow:note_flow(Flow),
?LOG(Flow, "Initializing flow worker", []),
case InitF() of case InitF() of
{ok, HandlerFun, St} when is_function(HandlerFun, 3) -> {ok, HandlerFun, St} when is_function(HandlerFun, 3) ->
{ok, {HandlerFun, St}}; {ok, #st{flow = Flow, f = HandlerFun, f_st = St}};
Other -> Other ->
{error, Other} {error, Other}
end. end.
handle_call(get_state, _From, {_, FSt} = St) -> handle_call(Req, _From, #error{} = E) ->
case Req of
get_state -> {reply, E, E};
maybe_get_state -> {reply, {ok, {error, E}}, E};
_ ->
{reply, {error, {already_crashed, E}}, E}
end;
handle_call(get_state, _From, #st{f_st = FSt} = St) ->
{reply, FSt, St}; {reply, FSt, St};
handle_call({call, Req}, From, {F, FSt} = St) -> handle_call(maybe_get_state, _From, #st{f_st = FSt} = St) ->
case F({call, From}, Req, FSt) of {reply, {ok, FSt}, St};
handle_call({call, Req}, From, #st{flow = Flow, f = F, f_st = FSt} = St) ->
try F({call, From}, Req, FSt) of
{reply, Res, FSt1} -> {reply, Res, FSt1} ->
{reply, Res, {F, FSt1}}; ?LOG(Flow, "CALL: ~p -> ~p", [Req, Res]),
{reply, Res, St#st{f_st = FSt1}};
{noreply, FSt1} -> {noreply, FSt1} ->
{noreply, {F, FSt1}}; {noreply, {F, FSt1}};
Other -> Other ->
{stop, {error, Other}, St} E = #error{error = Other},
?LOG(Flow, "ERROR CALL: ~p -> ~p", [Req, E]),
{reply, E, E}
catch
error:E:ST ->
Error = #error{error = {E, ST}},
?LOG(Flow, "CAUGHT CALL: ~p -> error:~p:~p", [Req, E, ST]),
{reply, Error, Error}
end. end.
handle_cast(Msg, {F, FSt} = St) -> handle_cast(_Msg, #error{} = E) ->
case F(cast, Msg, FSt) of {noreply, E};
{ok, FSt1} -> handle_cast(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) ->
{noreply, {F, FSt1}}; try F(cast, Msg, FSt) of
{Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
?LOG(Flow, "CAST ~p (~p) -> ~p", [Msg, FSt, Res]),
{noreply, St#st{f_st = FSt1}};
Other -> Other ->
{stop, {error, Other}, St} ?LOG(Flow, "ERROR CAST: ~p -> ~p", [Msg, Other]),
{noreply, #error{error = Other}}
catch
error:E:ST ->
?LOG(Flow, "CAUGHT CAST ~p (~p) : error:~p:~p", [Msg, FSt, E, ST]),
{noreply, #error{error = {E, ST}}}
end. end.
handle_info(Msg, {F, FSt} = St) -> handle_info(_, #error{} = E) ->
case F(info, Msg, FSt) of {noreply, E};
{ok, FSt1} -> handle_info(Msg, #st{flow = Flow, f = F, f_st = FSt} = St) ->
{noreply, {F, FSt1}}; try F(info, Msg, FSt) of
{Ok, FSt1} = Res when Ok == ok; Ok == noreply ->
?LOG(Flow, "INFO ~p (~p) -> ~p", [Msg, FSt, Res]),
{noreply, St#st{f_st = FSt1}};
Other -> Other ->
{stop, {error, Other}, St} ?LOG(Flow, "ERROR MSG: ~p -> ~p", [Msg, Other]),
{noreply, #error{error = Other}}
catch
error:E:ST ->
?LOG(Flow, "CAUGHT MSG: ~p, error:~p:~p", [Msg, E, ST]),
{noreply, #error{error = {E, ST}}}
end. end.
terminate(_, _) -> terminate(_, _) ->
@@ -127,35 +180,3 @@ terminate(_, _) ->
code_change(_FromVsn, S, _Extra) -> code_change(_FromVsn, S, _Extra) ->
{ok, S}. {ok, S}.
init_reg() ->
ets:new(?MODULE, [public, named_table]).
register_name(Name, Pid) ->
case ets:insert_new(?MODULE, {Name, Pid}) of
true ->
yes;
false ->
no
end.
unregister_name(Name) ->
ets:delete(?MODULE, Name).
whereis_name(Name) ->
case ets:lookup(?MODULE, Name) of
[{_, Pid}] ->
Pid;
[] ->
undefined
end.
send(Name, Msg) ->
case whereis_name(Name) of
undefined ->
exit({badarg, {Name, Msg}});
Pid ->
Pid ! Msg,
Pid
end.
+7 -1
View File
@@ -12,8 +12,15 @@
-export([init/1]). -export([init/1]).
-export([reset/0]).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
reset() ->
[ok = supervisor:terminate_child(?MODULE, Id)
|| {Id, _, _, _} <- supervisor:which_children(?MODULE)],
ok.
start_link() -> start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []). supervisor:start_link({local, ?SERVER}, ?MODULE, []).
@@ -27,7 +34,6 @@ start_link() ->
%% type => worker(), % optional %% type => worker(), % optional
%% modules => modules()} % optional %% modules => modules()} % optional
init([]) -> init([]) ->
gm_ctflow_worker:init_reg(),
SupFlags = #{ SupFlags = #{
strategy => simple_one_for_one, strategy => simple_one_for_one,
intensity => 0, intensity => 0,
+55 -3
View File
@@ -11,7 +11,10 @@
]). ]).
%% test cases %% test cases
-export([ init_counter/1 -export([ put_a/1
, get_a/1
, erase_a/1
, init_counter/1
, incr/1 , incr/1
, check1/1 , check1/1
, check2/1 , check2/1
@@ -21,24 +24,34 @@
, checkl1/1 , checkl1/1
, checkl2/1 , checkl2/1
, checkl3/1 , checkl3/1
, reset/1
, reset_flow/1
, status/1
]). ]).
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
all() -> all() ->
[ [
{group, statefuns} {group, state}
, {group, statefuns}
, {group, workers} , {group, workers}
, {group, reset}
]. ].
groups() -> groups() ->
[ {statefuns, [sequence], [ init_counter [ {state, [sequence], [ put_a
, get_a
, erase_a
]}
, {statefuns, [sequence], [ init_counter
, incr , incr
, check1 %% state = 1 , check1 %% state = 1
, incr , incr
, check2 %% state = 2 , check2 %% state = 2
, incr , incr
, check3 %% state = 3 , check3 %% state = 3
, status
]} ]}
, {workers, [sequence], [ init_counter , {workers, [sequence], [ init_counter
, init_worker , init_worker
@@ -48,7 +61,19 @@ groups() ->
, checkl2 %% [A0, A1] , checkl2 %% [A0, A1]
, append , append
, checkl3 %% [A0, A1, A2] , checkl3 %% [A0, A1, A2]
, status
]} ]}
, {reset, [sequence], [ init_counter
, init_worker
, append %% also increments
, check1 %% so, state = 1
, checkl1
, reset_flow
, checkl1 %% flow 'my_list' unaffected
, init_counter
, incr
, check1
, status ]}
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
@@ -66,11 +91,28 @@ end_per_group(_Grp, _Config) ->
ok. ok.
init_per_testcase(_Case, Config) -> init_per_testcase(_Case, Config) ->
gm_ctflow:status(),
Config. Config.
end_per_testcase(_Case, _Config) -> end_per_testcase(_Case, _Config) ->
gm_ctflow:status(),
ok. ok.
%% ======================================================================
%% State update commands
put_a(_Cfg) ->
ok = gm_ctflow:put(a, 1),
?assertMatch(1, gm_ctflow:get(a)).
get_a(_Cfg) ->
?assertMatch(1, gm_ctflow:get(a)).
erase_a(_Cfg) ->
ok = gm_ctflow:erase(a),
?assertException(error, {no_such_key,a}, gm_ctflow:get(a)),
?assertMatch(undefined, gm_ctflow:get(a, undefined)).
%% ====================================================================== %% ======================================================================
%% This is our 'ctflow_fun' instance, which floats above some simple %% This is our 'ctflow_fun' instance, which floats above some simple
%% test cases. In this test suite, we only exercise the flow funs, but %% test cases. In this test suite, we only exercise the flow funs, but
@@ -78,6 +120,7 @@ end_per_testcase(_Case, _Config) ->
%% easily threaded through the sequence of cases otherwise. %% easily threaded through the sequence of cases otherwise.
%% %%
init_counter(_Cfg) -> init_counter(_Cfg) ->
gm_ctflow:status(),
gm_ctflow_fun:new(my_counter, fun ctr/2, 0), gm_ctflow_fun:new(my_counter, fun ctr/2, 0),
ok. ok.
@@ -99,6 +142,12 @@ ctr(incr, N) ->
NewN = N+1, NewN = N+1,
{ok, NewN, NewN}. {ok, NewN, NewN}.
reset_flow(_Cfg) ->
gm_ctflow:reset_flow(my_counter).
status(_Cfg) ->
gm_ctflow:status().
%% ====================================================================== %% ======================================================================
%% Here, we make use of the `gm_ctflow_worker` it's a complete process %% Here, we make use of the `gm_ctflow_worker` it's a complete process
%% (a gen_server, whose logic we instrument with the fun we provice). %% (a gen_server, whose logic we instrument with the fun we provice).
@@ -132,3 +181,6 @@ checkl_(L) ->
lfun({call,_From}, {append, Item}, L) -> lfun({call,_From}, {append, Item}, L) ->
NewL = L ++ [Item], NewL = L ++ [Item],
{reply, NewL, NewL}. {reply, NewL, NewL}.
reset(_Cfg) ->
gm_ctflow:reset().