-module(gm_ctflow). -export([ put/2 , get/1 , get/2 , erase/1 ]). -export([ note_flow/1 , get_flow/0 , ct_log/2 %% ct_log(Fmt, Args) , ct_log/3 %% ct_log(Flow, Fmt, Args) , timestamp/0 ]). -export([ spawn/1 , spawn_opt/2 ]). -export([ status/0 , summarize_flow/1 , reset_flow/1 , reset/0 ]). -include("gm_ctflow.hrl"). -type flow() :: any(). -type key() :: any(). -type value() :: any(). -export_type([ flow/0 ]). -spec put(key(), value()) -> 'ok'. put(Key, Value) -> gm_ctflow_state:put(Key, Value). -spec get(key()) -> value() | no_return(). get(Key) -> gm_ctflow_state:get(Key). -spec get(key(), Default) -> value() when Default :: value(). get(Key, Default) -> gm_ctflow_state:get(Key, Default). -spec erase(key()) -> 'ok'. erase(Key) -> gm_ctflow_state:erase(Key). -spec note_flow(flow()) -> 'ok'. note_flow(Flow) -> erlang:put({?MODULE, flow}, Flow), ok. -spec get_flow() -> flow() | 'undefined'. get_flow() -> erlang:get({?MODULE, flow}). ct_log(Fmt, Args) -> ct_log(erlang:get({?MODULE, flow}), Fmt, Args). ct_log(Flow, Fmt, Args) -> TS = timestamp(), case Flow of undefined -> maybe_save_log_no_flow(TS, Fmt, Args), ct:log(Fmt, Args); Flow -> save_log(Flow, TS, Fmt, Args), ct:log("~p[~w]: " ++ Fmt, [Flow, self()|Args]) end. maybe_save_log_no_flow(TS, Fmt, Args) -> try save_log(undefined, TS, Fmt, Args) catch error:_ -> ok end. save_log(Flow, TS, Fmt, Args) -> gm_ctflow_log:log(Flow, TS, Fmt, Args). reset_flow(Flow) -> gm_ctflow_log:erase(Flow), try_call(fun gm_ctflow_worker:stop/1, Flow), try_call(fun gm_ctflow_fun:delete/1, Flow), ok. reset() -> gm_ctflow_log:erase(), gm_ctflow_state:reset(), gm_ctflow_worker_sup:reset(), gm_ctflow_fun:reset(), ok. status() -> WorkerFlows = [F || {F,_} <- gm_ctflow_reg:registered_names()], FunFlows = gm_ctflow_fun:flows(), AllFlows = ordsets:from_list(WorkerFlows ++ FunFlows), [summarize_flow(Flow) || Flow <- AllFlows]. summarize_flow(Flow) -> LogHistory = gm_ctflow_log:history(Flow), AllState = gm_ctflow_state:all(), FunState = try_call(fun gm_ctflow_fun:maybe_get_state/1, Flow), WorkerState = try_call(fun gm_ctflow_worker:maybe_get_state/1, Flow), S = [ {worker_state, WorkerState} , {fun_state, FunState} , {all_state, AllState} , {log_history, LogHistory} ], ct:log("== Summary for flow ~p~n~s", [Flow , [summary_(S1) || S1 <- S]]). summary_({log_history, []}) -> fw("= Log History: - none -~n", []); summary_({log_history, H}) -> [ fw("= Log History:~n", []) , [ fw("~n~s:~n~s~n", [TS, fw(Fmt, Args)]) || {TS, Fmt, Args} <- H], "\n"]; summary_({all_state, []}) -> fw("= State: - none -~n", []); summary_({all_state, L}) -> [ fw("= State:~n", []) , [fw("~p = ~p~n", [K, V]) || {K, V} <- L], "\n"]; summary_({worker_state, S}) -> fw("= Worker State: ~s~n", [maybe_state_str(S)]); summary_({fun_state, S}) -> fw("= Fun State: ~s~n", [maybe_state_str(S)]). maybe_state_str({ok, S}) -> fw("~p", [S]); maybe_state_str({error, E}) -> fw("ERROR: ~p", [E]); maybe_state_str(undefined) -> "- none -"; maybe_state_str(error) -> "- none -". fw(Fmt, Args) -> io_lib:fwrite(Fmt, Args). try_call(F, Flow) -> try F(Flow) catch exit:E:ST -> ct:log("Call to ~p(~p) failed: exit:~p:~p", [F, Flow, E, ST]), undefined; error:E:ST -> ct:log("Call to ~p(~p) failed: error:~p:~p", [F, Flow, E, ST]), undefined end. %% copy-pasted (roughly) from ct_logs.erl timestamp() -> {MS,S,US} = os:timestamp(), {{Year,Month,Day}, {Hour,Min,Sec}} = calendar:now_to_local_time({MS,S,US}), MilliSec = trunc(US/1000), lists:flatten(io_lib:format("~4.10.0B-~2.10.0B-~2.10.0B " "~2.10.0B:~2.10.0B:~2.10.0B.~3.10.0B", [Year,Month,Day,Hour,Min,Sec,MilliSec])). spawn(F) -> Flow = get_flow(), proc_lib:spawn(fun() -> maybe_inherit_flow(Flow), ct_util:mark_process(), F() end). spawn_opt(F, Opts) -> Flow = get_flow(), proc_lib:spawn_opt(fun() -> maybe_inherit_flow(Flow), ct_util:mark_process(), F() end, Opts). maybe_inherit_flow(undefined) -> ok; maybe_inherit_flow(Flow) -> note_flow(Flow).