erlbusコードリーディング
pub-subでメッセージ受け渡しをどういう設計でやっているかの参考にする。
erlbusはElixirのPhoenix frameworkで提供するPhoenix.pubsubモジュールの移植とのこと。 ドキュメントを読むと、プロセス間のメッセージパッシングや、Redisをつかったpub-subのクライアント用途として使う模様。
%% supervisorを登録
supervisor(Phoenix.PubSub.Redis, [:my_redis_pubsub, host: "192.168.100.1"])
%% subscribe/publish
ex> PubSub.subscribe MyApp.PubSub, self, "user:123"
:ok
iex> Process.info(self)[:messages]
[]
iex> PubSub.broadcast MyApp.PubSub, "user:123", {:user_update, %{id: 123, name:
"Shane"}}
:ok
iex> Process.info(self)[:messages]
{:user_update, %{id: 123, name: "Shane"}}
Process design
┌───────────┐
│ ebus │
└───────────┘
│
│
┌──────────────────────────────┐
│ ebus_ps_local_sup │
└──────────────────────────────┘
│
│
┌──────────────────────────────┐
│ ebus_supervisor │
└──────────────────────────────┘
│
│
┌────────────────┴─────────────────────────────┐
│ │
│ │
┌──────────────────────────────┐ ┌───────────────────────────────────────────────────────┐
│ ebus_ps_local │ │ │
└──────────────────────────────┘ │ ebus_ps_gc │
│ <gen_server> │
│ │
│ (1) down(GCServer, Pid) │
│ Remove an object from management table │
│ of subscribers and topics │
│ when subscriber is down. │
│ │
│ (2) unsubscribe(Pid, Topic, TopicsTable, PidsTable) │
│ Remove subscriber info from topics table │
│ and remove subscriber info │
│ from subscriber table also │
│ │
│ │
└───────────────────────────────────────────────────────┘
ebus.erl
application behaviour実装。
start/2
でebus_supを実行している。
start(_StartType, _StartArgs) -> ebus_sup:start_link().
ebus_sup.erl
トップレベルのsupervisor。ebus_ps_pg2
にebus_ps
を渡して、ebus_ps
のsupervisor-childrenと
ebus_ps
のweokersを起動する。
-spec start_link() -> supervisor:startlink_ret().
start_link() ->
%% ebusのpubsubパラメータを読み込む。デフォルト値は[]
PubSub = application:get_env(ebus, pubsub, []),
%% PubSubリストから、nameキーの値を取得する
%% デフォルト値(PubSubが[]だった場合も)はebus:default_ps_server()で`ebus_ps`を返す
Name = ebus_common:keyfind(name, PubSub, ebus:default_ps_server()),
%% 上記と同様。デフォルトはebus_ps_pg2
Adapter = ebus_common:keyfind(adapter, PubSub, ebus_ps_pg2),
%% デフォルトの場合はebus_ps_pg2にebus_psを渡して起動
Adapter:start_link(Name, PubSub).
ebus_ps_pg2.erl
受け取ったServer
(デフォルトではebus_ps
)を
- supervisor-childrenツリー
- サーバプロセス
の2種類で立ち上げる。
%%%===================================================================
%%% API functions
%%%===================================================================
-spec start_link(atom(), [term()]) -> supervisor:startlink_ret().
start_link(Name, Opts) ->
%% `ebus_common:build_name(List, Separator)`はリストをSeparatorで区切って
%% atomとして返す
%% Nameがebus_psなのでsup_ebus_psというatomが返る
SupName = ebus_common:build_name([Name, <<"sup">>], <<"_">>),
%% start_link(SupName, Mdule, Args)
%% ArgsはModuleの`init/1`に渡すパラメータ
%% この場合はsup_ebus_psがSupName, Nameがebus_ps
supervisor:start_link({local, SupName}, ?MODULE, [Name, Opts]).
%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
%% @hidden
init([Server, Opts]) ->
PoolSize = ebus_common:keyfind(pool_size, Opts, 1),
DispatchRules = [{broadcast, ebus_ps_pg2_server, [Server, PoolSize]}],
Children = [
%% supervisor用のspecを返すutil
%% ebus_ps_local_supはebus_psを子プロセスとして起動
ebus_supervisor_spec:supervisor(
ebus_ps_local_sup, [Server, PoolSize, DispatchRules]
),
%% worker用のspecを返すutil
%% ebus_ps_pg2_serverを起動
ebus_supervisor_spec:worker(ebus_ps_pg2_server, [Server])
],
%% strategyを作成
ebus_supervisor_spec:supervise(Children, #{strategy => rest_for_one}).
ebus_ps_local_sup.erl
Used by PubSub adapters to handle “local” subscriptions. Defines an ets dispatch table for routing subscription requests. Extendable by PubSub adapters by providing a list of `dispatch_rules' to extend the dispatch table.
PubSub adaptors(デフォルトではebus_ps_pg2
)local(おそらくネットワークを介さない1ノード上限定でのメッセージ交換と思われる)での購読を管理させるために利用される。
ETSのdispatchテーブルを定義する。これは、subscription requestsのルーティングのため。
このモジュールはPubSub adaptorsに拡張される。拡張は、dispatch_rules
のリストを与えられることによって発生し、これはdispatch tablesを拡張する。
%%%===================================================================
%%% API functions
%%%===================================================================
-spec start_link(
atom(), pos_integer(), [dispatch_rule()]
) -> supervisor:startlink_ret().
start_link(Server, PoolSize, DispatchRules) ->
supervisor:start_link(?MODULE, [Server, PoolSize, DispatchRules]).
%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
%% @hidden
init([Server, PoolSize, DispatchRules]) ->
% Define a dispatch table so we don't have to go through
% a bottleneck to get the instruction to perform.
% dispatch tablesを定義することによって
% 実行時のボトルネックに対処する必要がなくなる
%% etsにsubscribe情報を格納する
%% read が多いのでread_concurrencyをtrueにしているものと思われる
Server = ets:new(Server, [set, named_table, {read_concurrency, true}]),
true = ets:insert(Server, {subscribe, ebus_ps_local, [Server, PoolSize]}),
true = ets:insert(Server, {unsubscribe, ebus_ps_local, [Server, PoolSize]}),
true = ets:insert(Server, {subscribers, ebus_ps_local, [Server, PoolSize]}),
true = ets:insert(Server, {list, ebus_ps_local, [Server, PoolSize]}),
true = ets:insert(Server, DispatchRules),
%% Shardは数字。
ChildrenFun = fun(Shard) ->
%% Shard=1なら、ebus_ps_local_1のようになる
LocalShardName = ebus_ps_local:local_name(Server, Shard),
%% Shard=1なら、ebus_ps_gc_1のようになる
GCShardName = ebus_ps_local:gc_name(Server, Shard),
%% Shardテーブルに登録する
true = ets:insert(Server, {Shard, {LocalShardName, GCShardName}}),
%% Shardのspecを作成する
ShardChildren = [
ebus_supervisor_spec:worker(ebus_ps_gc, [GCShardName, LocalShardName]),
ebus_supervisor_spec:worker(ebus_ps_local, [LocalShardName, GCShardName])
],
%% Shardのspecは、ebus_supervisourの子プロセスとして使うようにして
%% supervisor specを生成
ebus_supervisor_spec:supervisor(
ebus_supervisor,
[ShardChildren, #{strategy => one_for_all}],
#{id => Shard}
)
end,
%% プールサイズ分Shard数をChildrenFunに渡す
Children = [ChildrenFun(C) || C <- lists:seq(0, PoolSize - 1)],
ebus_supervisor_spec:supervise(Children, #{strategy => one_for_one}).
ebus_supervisor.erl
省略
ebus_ps_gc.erl
- gen_server behavior
- etsにpids tables(おそらくsubscriber table)とtopicsテーブルがあり、unsubscribe/4を呼ばれたときか、subscriberのdownを明示的に教えるdown/2が呼ばれた時にそれぞれのテーブル中のひも付けレコードを削除するためのサーバプロセス
以下いろいろ省略してるメモ
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @hidden
init([ServerName, LocalName]) ->
%% ServerNameは`ebus_ps_gc_1`のような形式, LocalNameは`ebus_ps_local_1`のような形式
{ok, #{topics => LocalName, pids => ServerName}}.
%% @hidden
%% ebus_ps_localからgen_server:callされる
handle_call({subscription, Pid}, _From, #{pids := Pids} = State) ->
try
{reply, ets:lookup_element(Pids, Pid, 2), State}
catch
error:badarg -> {reply, [], State}
end;
handle_call(_Request, _From, State) ->
{reply, ok, State}.
%% @hidden
%% pidsはServerName, topicsはLocalName
%% down()が呼ばれたらetsのebus_ps_local_X(Topis)、ebus_pa_gc_X(Pids)テーブルから
%% 該当Pidのvalueを削除
handle_cast({down, Pid}, #{pids := Pids, topics := Topics} = State) ->
try
%% e.g. ebus_ps_gc_1テーブルからPidを探す
%% keyにはtopicsのリストがvalueとして格納されていると思われる
Topics0 = ets:lookup_element(Pids, Pid, 2),
%% e.g. ebus_ps_local_1テーブル(トピック管理テーブル?)から、
%% 該当Pidのtopicと該当pidのタプルを削除
lists:foreach(fun(Topic) ->
true = ets:match_delete(Topics, {Topic, {Pid, '_'}})
end, Topics0),
%% e.g. ebus_ps_gc_1テーブル(Pidとトピックのひも付けテーブル?)から、
%% 該当Pidを削除
true = ets:match_delete(Pids, {Pid, '_'})
catch
error:badarg -> badarg
end,
{noreply, State};
handle_cast(_Request, State) ->
{noreply, State}.
ebus_ps_local.erl
疑問点
- 今のところetsを素で使っているようなので、テーブル情報はnodeの中でしか共有できない。ebusは単一ノードのみのユースケースを考えている?
- もしそうなら、複数ノードでも動くような実装をつくりたい
- https://github.com/cabol/erlbus には複数nodeで動くようなサンプルが載っているでどこかでできるのかなあ
- 内部ではmnesiaを使っていない。素のetsのみ
- ただErlangのnodeのconnectを使ったらpidsが共有できるようになるのでどっかでhandleされているのかも
- そうすれば、それぞれのnodeで別のetsのスペースに対して同じ操作(insertとかdelete)すればよいと思われる
erlbus examples ソースコードリーディング
ebus_proc:spawn_handler
にhandle_msg
をFun, 自分のプロセスをArgとして渡すhanlde_msg
は、publishされたメッセージをMassageとして受け取り、Contextをself()(websocket handlerプロセスとして実行するhandle_msg
は、Context(この場合websocket handlerプロセス)に対して {message_published, Msg} メッセージを送信する
websocket_init(_Type, Req, _Opts) ->
% Create the handler from our custom callback
Handler = ebus_proc:spawn_handler(fun chat_erlbus_handler:handle_msg/2, [self()]),
ebus:sub(Handler, ?CHATROOM_NAME),
{ok, Req, #state{name = get_name(Req), handler = Handler}, ?TIMEOUT}.
%% ...中略
websocket_info({message_published, {Sender, Msg}}, Req, State) ->
{reply, {text, jiffy:encode({[{sender, Sender}, {msg, Msg}]})}, Req, State};
-module(chat_erlbus_handler).
%% API
-export([handle_msg/2]).
handle_msg(Msg, Context) ->
Context ! {message_published, Msg}.
spawn_handler
には関数を渡すことができる。
spawn_handler
は内部でhandle関数を呼び出すが、これは受け取ったModule, Funにreceive
で受け取ったMessageを第一引数として渡している。
%% @equiv spawn_handler(Fun, [])
spawn_handler(Fun) ->
spawn_handler(Fun, []).
%% @equiv spawn_handler(Fun, Args, [])
spawn_handler(Fun, Args) ->
spawn_handler(Fun, Args, []).
%% @equiv spawn_callback_handler(erlang, apply, [Fun, Args], Opts)
%% @doc
%% Same as `spawn_callback_handler/4', but receives a `fun' as
%% callback. This `fun' is invoked as:
%%
%% '''
%% apply(erlang, apply, [Fun, [Message | Args]])
%% '''
%%
%% Where `Message' is inserted as 1st argument in the `fun' args.
%% @end
spawn_handler(Fun, Args, Opts) ->
spawn_callback_handler(erlang, apply, [Fun, Args], Opts).
%% @equiv spawn_callback_handler(Module, Fun, Args, [])
spawn_callback_handler(Module, Fun, Args) ->
spawn_callback_handler(Module, Fun, Args, []).
%% @doc
%% Spawns a process that stays receiving messages, and when a message
%% is received, it applies the given callback `{Mod, Fun, Args}'.
%%
%% Options:
%% <ul>
%% <li>`Opts': Spawn process options. See `erlang:spawn_opt/2'.</li>
%% </ul>
%% @see erlang:spawn_opt/2.
%% @end
-spec spawn_callback_handler(
module(), atom(), [term()], [term()]
) -> pid() | {pid(), reference()}.
spawn_callback_handler(Module, Fun, Args, Opts)
when is_atom(Module), is_atom(Fun) ->
%% handle(erlang, apply, [chat_erlbus_handler:handle_msg/2, [self()]])
spawn_opt(fun() -> handle(Module, Fun, Args) end, Opts).
%% @private
handle(erlang, apply, [Fun, FunArgs] = Args) ->
receive
Message ->
%% Messageを第一引数にしてFunを実行
apply(erlang, apply, [Fun, [Message | FunArgs]]),
handle(erlang, apply, Args)
end;
handle(Module, Fun, Args) ->
receive
Message ->
apply(Module, Fun, [Message | Args]),
handle(Module, Fun, Args)
end.