From 7503463ff2a321831b9781e6919d78b2dd5e1833 Mon Sep 17 00:00:00 2001 From: Jarvis Carroll Date: Sun, 26 Oct 2025 00:25:01 +0000 Subject: [PATCH] message dispatch stub This is probably about as far as I can go without actual manager processes... I need something that knows what channels are in use!! --- src/msp_channel.erl | 219 +++++++++++++++++++++++++++++++++++++++++ src/msp_connection.erl | 17 +++- src/msp_tests.erl | 4 +- 3 files changed, 235 insertions(+), 5 deletions(-) create mode 100644 src/msp_channel.erl diff --git a/src/msp_channel.erl b/src/msp_channel.erl new file mode 100644 index 0000000..54c38fc --- /dev/null +++ b/src/msp_channel.erl @@ -0,0 +1,219 @@ +%%% @doc +%%% Minimal Stream Protocol: Channel Worker +%%% @end + +-module(msp_channel). +-vsn("0.1.0"). +-behavior(gen_server). +-author("Jarvis Carroll "). +-copyright("Jarvis Carroll "). +-license("MIT"). + +-export([new_connection/2]). + +%% gen_server +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2]). +-include("$zx_include/zx_logger.hrl"). + + +%%% Opcodes + +% Fragment/message opcodes: +% _____________________________________________________________________________ +% OP_FRAGMENT | Part of a message/stream +% OP_OPEN | Open a new channel, and send the first fragment of the +% | first message in it. +% OP_END_MESSAGE | End of a message/stream, expects a response +% OP_OPEN_END | Open a new channel, and send an entire message +% OP_CLOSE | End of a message/stream, do not respond, close the channel +% OP_OPEN_CLOSE | Open a channel, send an entire message, and close the +% | channel, all in one datagram. Sort of like +% | gen_server:cast/2, or if UDP were reliable. +% +% Transport Control Stuff +% ----------------------------------------------------------------------------- +% OP_ACK | Acknowledge that instructions up to N have been received. +% OP_STATUS | Request an ACK for this exact command. Has many uses - +% | diagnostics, benchmarking, keepalive, dropped ACKs, or just +% | to stop the receiver from coalescing ACKs too much. +% OP_NACK | Instruction N has not been received. +% OP_CANCEL | Sender is hoping that messages M through N weren't +% | delivered, and can be cancelled. +% OP_ACK_CANCEL | Respond to OP_CANCEL. +% +% Stream Negotiation +% _____________________________________________________________________________ +% OP_REQ_MOVE | Indicate that a newly opened channel lost the race, and +% | should be moved to a different ID. +% +% Lossy +% _____________________________________________________________________________ +% OP_OPEN_DIRTY | Like OP_OPEN, but marks the channel as 'dirty', meaning +% | it is allowed to send lossy datagrams too, for voip, etc. +% OP_LOSSY | A lossy datagram, for the application to deal with. + +% Numbers aren't final. +-define(OP_FRAGMENT, 0). +-define(OP_OPEN, 1). +-define(OP_END_MESSAGE, 2). +-define(OP_OPEN_END, 3). +-define(OP_CLOSE, 4). +-define(OP_OPEN_CLOSE, 5). +-define(OP_ACK, 6). +-define(OP_STATUS, 7). +-define(OP_NACK, 8). +-define(OP_CANCEL, 9). +-define(OP_ACK_CANCEL, 10). +-define(OP_REQ_MOVE, 11). +-define(OP_OPEN_DIRTY, 12). +-define(OP_LOSSY, 13). + +%%% Type and Record Definitions + +-record(s, + {}). + + +-type state() :: none | #s{}. + + + +%%% Interface + +new_connection(Peer, Packet) -> + {ok, Pid} = start_link(), + configure_channel(Pid, Peer), + handle_datagram(Pid, Packet). + +configure_channel(Id, Peer) -> + gen_server:cast(Id, {configure_channel, Peer}). + +handle_datagram(Id, Packet) -> + gen_server:cast(Id, {handle_datagram, Packet}). + +%%% gen_server + +-spec start_link() -> Result + when Result :: {ok, pid()} + | {error, Reason :: term()}. + +start_link() -> + gen_server:start_link(?MODULE, none, []). + + +init(none) -> + {ok, none}. + + +handle_call(Unexpected, From, State) -> + ok = log(warning, "Unexpected call from ~tp: ~tp", [From, Unexpected]), + {noreply, State}. + + +handle_cast({configure_channel, Peer}, none) -> + State = #s{}, + {noreply, State}; +handle_cast({handle_datagram, Packet}, State) -> + NewState = do_handle_datagram(State, Packet), + {noreply, NewState}; +handle_cast(Unexpected, State) -> + ok = log(warning, "Unexpected cast: ~tp", [Unexpected]), + {noreply, State}. + + +handle_info(Unexpected, State) -> + ok = log(warning, "Unexpected info: ~tp", [Unexpected]), + {noreply, State}. + + +-spec code_change(OldVersion, State, Extra) -> {ok, NewState} | {error, Reason} + when OldVersion :: Version | {old, Version}, + Version :: term(), + State :: state(), + Extra :: term(), + NewState :: term(), + Reason :: term(). + +code_change(_, State, _) -> + {ok, State}. + + +terminate(_, _) -> + ok. + + + +%%% Doer Functions + +do_handle_datagram(State, <>) -> + do_handle_lossy(State, Data); +do_handle_datagram(State, <>) -> + do_handle_reliable(State, Op, Seq, Rest); +do_handle_datagram(State, Unexpected) -> + io:format("Got datagram ~p which is too short.~n", [Unexpected]), + State. + +do_handle_lossy(State, Data) -> + io:format("Got lossy datagram: ~p~n", [Data]), + State. + +do_handle_reliable(State, ?OP_FRAGMENT, Seq, Payload) -> + do_handle_fragment(State, false, continue, Seq, Payload); +do_handle_reliable(State, ?OP_OPEN, Seq, Payload) -> + do_handle_fragment(State, true, continue, Seq, Payload); +do_handle_reliable(State, ?OP_END_MESSAGE, Seq, Payload) -> + do_handle_fragment(State, false, end_message, Seq, Payload); +do_handle_reliable(State, ?OP_OPEN_END, Seq, Payload) -> + do_handle_fragment(State, true, end_message, Seq, Payload); +do_handle_reliable(State, ?OP_CLOSE, Seq, Payload) -> + do_handle_fragment(State, false, close_channel, Seq, Payload); +do_handle_reliable(State, ?OP_OPEN_CLOSE, Seq, Payload) -> + do_handle_fragment(State, true, close_channel, Seq, Payload); +do_handle_reliable(State, ?OP_ACK, Seq, Payload) -> + do_handle_ack(State, Seq, Payload); +do_handle_reliable(State, ?OP_STATUS, Seq, Payload) -> + do_handle_status(State, Seq, Payload); +do_handle_reliable(State, ?OP_NACK, Seq, Payload) -> + do_handle_nack(State, Seq, Payload); +do_handle_reliable(State, ?OP_CANCEL, Seq, Payload) -> + do_handle_cancel(State, Seq, Payload); +do_handle_reliable(State, ?OP_ACK_CANCEL, Seq, Payload) -> + do_handle_ack_cancel(State, Seq, Payload); +do_handle_reliable(State, ?OP_REQ_MOVE, Seq, Payload) -> + do_handle_req_move(State, Seq, Payload); +do_handle_reliable(State, ?OP_OPEN_DIRTY, Seq, Payload) -> + do_handle_fragment(State, true, dirty, Seq, Payload); +do_handle_reliable(State, Unknown, _, _) -> + io:format("Got unexpected opcode ~p~n", [Unknown]), + State. + +do_handle_fragment(State, NewChannel, Mod, Seq, Payload) -> + io:format("Got fragment ~p with index ~p, and flags ~p and ~p~n", [Payload, Seq, NewChannel, Mod]), + State. + +do_handle_ack(State, Seq, Payload) -> + io:format("Unimplemented command. Index ~p, payload ~p~n"), + State. + +do_handle_status(State, Seq, Payload) -> + io:format("Unimplemented command. Index ~p, payload ~p~n"), + State. + +do_handle_nack(State, Seq, Payload) -> + io:format("Unimplemented command. Index ~p, payload ~p~n"), + State. + +do_handle_cancel(State, Seq, Payload) -> + io:format("Unimplemented command. Index ~p, payload ~p~n"), + State. + +do_handle_ack_cancel(State, Seq, Payload) -> + io:format("Unimplemented command. Index ~p, payload ~p~n"), + State. + +do_handle_req_move(State, Seq, Payload) -> + io:format("Unimplemented command. Index ~p, payload ~p~n"), + State. + diff --git a/src/msp_connection.erl b/src/msp_connection.erl index 9bda4f8..b727e1f 100644 --- a/src/msp_connection.erl +++ b/src/msp_connection.erl @@ -105,14 +105,25 @@ terminate(_, _) -> %%% Doer Functions do_begin_msp(Sock, Peer, Side) -> - ok = inet:setopts(Sock, [{active, once}]), + ok = inet:setopts(Sock, [{active, once}, {mode, binary}]), State = #s{socket = Sock, peer = Peer, side = Side}, State. -do_dispatch(State = #s{socket = Sock}, Packet) -> - io:format("Got data: ~p~n", [Packet]), +do_dispatch(State = #s{socket = Sock, peer = Peer, connections = Conns}, <>) -> ok = inet:setopts(Sock, [{active, once}]), + case maps:find(ID, Conns) of + {ok, Conn} -> + erlang:send(Conn, {msp_fragment, Packet}), + State; + error -> + io:format("Opening connection ~p~n", [ID]), + Conn = spawn_link(msp_channel, new_connection, [Peer, Packet]), + NewConns = maps:put(Peer, Conn, Conns), + State#s{connections = NewConns} + end; +do_dispatch(State, <<>>) -> + % Empty datagram? State. diff --git a/src/msp_tests.erl b/src/msp_tests.erl index 549912e..bcb090e 100644 --- a/src/msp_tests.erl +++ b/src/msp_tests.erl @@ -34,8 +34,8 @@ send_test() -> PortB = 6666, {A, SockA} = make_connection(PortA, IP, PortB, 0), {B, SockB} = make_connection(PortB, IP, PortA, 1), - gen_udp:send(SockA, {IP, PortB}, <<"message sent from A to B">>), - gen_udp:send(SockB, {IP, PortA}, <<"message sent from B to A">>), + gen_udp:send(SockA, {IP, PortB}, << 0:8, 5:8, 0:16, "message sent from A to B">>), + gen_udp:send(SockB, {IP, PortA}, <<128:8, 5:8, 0:16, "message sent from B to A">>), timer:sleep(10), ok.