erlbus reading

Aug 9, 2016 ( Feb 11, 2022 更新 )

erlbusコードリーディング

pub-subでメッセージ受け渡しをどういう設計でやっているかの参考にする。

erlbusはElixirのPhoenix frameworkで提供するPhoenix.pubsubモジュールの移植とのこと。 ドキュメントを読むと、プロセス間のメッセージパッシングや、Redisをつかったpub-subのクライアント用途として使う模様。

%% supervisorを登録
supervisor(Phoenix.PubSub.Redis, [:my_redis_pubsub, host: "192.168.100.1"])

%% subscribe/publish
ex> PubSub.subscribe MyApp.PubSub, self, "user:123"
:ok
iex> Process.info(self)[:messages]
[]
iex> PubSub.broadcast MyApp.PubSub, "user:123", {:user_update, %{id: 123, name:
"Shane"}}
:ok
iex> Process.info(self)[:messages]
{:user_update, %{id: 123, name: "Shane"}}

Process design


                                            ┌───────────┐
                                            │   ebus    │
                                            └───────────┘
                                                  │
                                                  │
                                  ┌──────────────────────────────┐
                                  │      ebus_ps_local_sup       │
                                  └──────────────────────────────┘
                                                  │
                                                  │
                                  ┌──────────────────────────────┐
                                  │       ebus_supervisor        │
                                  └──────────────────────────────┘
                                                  │
                                                  │
                                 ┌────────────────┴─────────────────────────────┐
                                 │                                              │
                                 │                                              │
                 ┌──────────────────────────────┐   ┌───────────────────────────────────────────────────────┐
                 │        ebus_ps_local         │   │                                                       │
                 └──────────────────────────────┘   │                      ebus_ps_gc                       │
                                                    │                     <gen_server>                      │
                                                    │                                                       │
                                                    │                (1) down(GCServer, Pid)                │
                                                    │        Remove an object from management table         │
                                                    │               of subscribers and topics               │
                                                    │               when subscriber is down.                │
                                                    │                                                       │
                                                    │  (2) unsubscribe(Pid, Topic, TopicsTable, PidsTable)  │
                                                    │       Remove subscriber info from topics table        │
                                                    │              and remove subscriber info               │
                                                    │              from subscriber table also               │
                                                    │                                                       │
                                                    │                                                       │
                                                    └───────────────────────────────────────────────────────┘

ebus.erl

application behaviour実装。

start/2でebus_supを実行している。

start(_StartType, _StartArgs) -> ebus_sup:start_link().

ebus_sup.erl

トップレベルのsupervisor。ebus_ps_pg2ebus_psを渡して、ebus_psのsupervisor-childrenと ebus_psのweokersを起動する。

-spec start_link() -> supervisor:startlink_ret().
start_link() ->
  %% ebusのpubsubパラメータを読み込む。デフォルト値は[]
  PubSub = application:get_env(ebus, pubsub, []),
  %% PubSubリストから、nameキーの値を取得する
  %% デフォルト値(PubSubが[]だった場合も)はebus:default_ps_server()で`ebus_ps`を返す
  Name = ebus_common:keyfind(name, PubSub, ebus:default_ps_server()),
  %% 上記と同様。デフォルトはebus_ps_pg2
  Adapter = ebus_common:keyfind(adapter, PubSub, ebus_ps_pg2),
  %% デフォルトの場合はebus_ps_pg2にebus_psを渡して起動
  Adapter:start_link(Name, PubSub).

ebus_ps_pg2.erl

受け取ったServer(デフォルトではebus_ps)を

  • supervisor-childrenツリー
  • サーバプロセス

の2種類で立ち上げる。

%%%===================================================================
%%% API functions
%%%===================================================================

-spec start_link(atom(), [term()]) -> supervisor:startlink_ret().
start_link(Name, Opts) ->
  %% `ebus_common:build_name(List, Separator)`はリストをSeparatorで区切って
  %% atomとして返す
  %% Nameがebus_psなのでsup_ebus_psというatomが返る
  SupName = ebus_common:build_name([Name, <<"sup">>], <<"_">>),
  %% start_link(SupName, Mdule, Args)
  %% ArgsはModuleの`init/1`に渡すパラメータ
  %% この場合はsup_ebus_psがSupName, Nameがebus_ps
  supervisor:start_link({local, SupName}, ?MODULE, [Name, Opts]).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================

%% @hidden
init([Server, Opts]) ->
  PoolSize = ebus_common:keyfind(pool_size, Opts, 1),
  DispatchRules = [{broadcast, ebus_ps_pg2_server, [Server, PoolSize]}],

  Children = [
    %% supervisor用のspecを返すutil
    %% ebus_ps_local_supはebus_psを子プロセスとして起動
    ebus_supervisor_spec:supervisor(
      ebus_ps_local_sup, [Server, PoolSize, DispatchRules]
    ),
    %% worker用のspecを返すutil
    %% ebus_ps_pg2_serverを起動
    ebus_supervisor_spec:worker(ebus_ps_pg2_server, [Server])
  ],

  %% strategyを作成
  ebus_supervisor_spec:supervise(Children, #{strategy => rest_for_one}).

ebus_ps_local_sup.erl

Used by PubSub adapters to handle “local” subscriptions. Defines an ets dispatch table for routing subscription requests. Extendable by PubSub adapters by providing a list of `dispatch_rules' to extend the dispatch table.

PubSub adaptors(デフォルトではebus_ps_pg2)local(おそらくネットワークを介さない1ノード上限定でのメッセージ交換と思われる)での購読を管理させるために利用される。

ETSのdispatchテーブルを定義する。これは、subscription requestsのルーティングのため。 このモジュールはPubSub adaptorsに拡張される。拡張は、dispatch_rulesのリストを与えられることによって発生し、これはdispatch tablesを拡張する。

%%%===================================================================
%%% API functions
%%%===================================================================

-spec start_link(
  atom(), pos_integer(), [dispatch_rule()]
) -> supervisor:startlink_ret().
start_link(Server, PoolSize, DispatchRules) ->
  supervisor:start_link(?MODULE, [Server, PoolSize, DispatchRules]).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================

%% @hidden
init([Server, PoolSize, DispatchRules]) ->
  % Define a dispatch table so we don't have to go through
  % a bottleneck to get the instruction to perform.
  % dispatch tablesを定義することによって
  % 実行時のボトルネックに対処する必要がなくなる

  %% etsにsubscribe情報を格納する
  %% read が多いのでread_concurrencyをtrueにしているものと思われる
  Server = ets:new(Server, [set, named_table, {read_concurrency, true}]),
  true = ets:insert(Server, {subscribe, ebus_ps_local, [Server, PoolSize]}),
  true = ets:insert(Server, {unsubscribe, ebus_ps_local, [Server, PoolSize]}),
  true = ets:insert(Server, {subscribers, ebus_ps_local, [Server, PoolSize]}),
  true = ets:insert(Server, {list, ebus_ps_local, [Server, PoolSize]}),
  true = ets:insert(Server, DispatchRules),

  %% Shardは数字。
  ChildrenFun = fun(Shard) ->
    %% Shard=1なら、ebus_ps_local_1のようになる
    LocalShardName = ebus_ps_local:local_name(Server, Shard),
    %% Shard=1なら、ebus_ps_gc_1のようになる
    GCShardName    = ebus_ps_local:gc_name(Server, Shard),
    %% Shardテーブルに登録する
    true = ets:insert(Server, {Shard, {LocalShardName, GCShardName}}),

    %% Shardのspecを作成する
    ShardChildren = [
      ebus_supervisor_spec:worker(ebus_ps_gc, [GCShardName, LocalShardName]),
      ebus_supervisor_spec:worker(ebus_ps_local, [LocalShardName, GCShardName])
    ],

    %% Shardのspecは、ebus_supervisourの子プロセスとして使うようにして
    %% supervisor specを生成
    ebus_supervisor_spec:supervisor(
      ebus_supervisor,
      [ShardChildren, #{strategy => one_for_all}],
      #{id => Shard}
    )
  end,
  %% プールサイズ分Shard数をChildrenFunに渡す
  Children = [ChildrenFun(C) || C <- lists:seq(0, PoolSize - 1)],

  ebus_supervisor_spec:supervise(Children, #{strategy => one_for_one}).

ebus_supervisor.erl

省略

ebus_ps_gc.erl

  • gen_server behavior
  • etsにpids tables(おそらくsubscriber table)とtopicsテーブルがあり、unsubscribe/4を呼ばれたときか、subscriberのdownを明示的に教えるdown/2が呼ばれた時にそれぞれのテーブル中のひも付けレコードを削除するためのサーバプロセス

以下いろいろ省略してるメモ

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% @hidden
init([ServerName, LocalName]) ->
  %% ServerNameは`ebus_ps_gc_1`のような形式, LocalNameは`ebus_ps_local_1`のような形式
  {ok, #{topics => LocalName, pids => ServerName}}.

%% @hidden
%% ebus_ps_localからgen_server:callされる
handle_call({subscription, Pid}, _From, #{pids := Pids} = State) ->
  try
    {reply, ets:lookup_element(Pids, Pid, 2), State}
  catch
    error:badarg -> {reply, [], State}
  end;
handle_call(_Request, _From, State) ->
  {reply, ok, State}.

%% @hidden
%% pidsはServerName, topicsはLocalName
%% down()が呼ばれたらetsのebus_ps_local_X(Topis)、ebus_pa_gc_X(Pids)テーブルから
%% 該当Pidのvalueを削除
handle_cast({down, Pid}, #{pids := Pids, topics := Topics} = State) ->
  try
    %% e.g. ebus_ps_gc_1テーブルからPidを探す
    %% keyにはtopicsのリストがvalueとして格納されていると思われる
    Topics0 = ets:lookup_element(Pids, Pid, 2),
    %% e.g. ebus_ps_local_1テーブル(トピック管理テーブル?)から、
    %% 該当Pidのtopicと該当pidのタプルを削除
    lists:foreach(fun(Topic) ->
      true = ets:match_delete(Topics, {Topic, {Pid, '_'}})
    end, Topics0),
    %% e.g. ebus_ps_gc_1テーブル(Pidとトピックのひも付けテーブル?)から、
    %% 該当Pidを削除
    true = ets:match_delete(Pids, {Pid, '_'})
  catch
    error:badarg -> badarg
  end,
  {noreply, State};
handle_cast(_Request, State) ->
  {noreply, State}.

ebus_ps_local.erl

疑問点

  • 今のところetsを素で使っているようなので、テーブル情報はnodeの中でしか共有できない。ebusは単一ノードのみのユースケースを考えている?
    • もしそうなら、複数ノードでも動くような実装をつくりたい
    • https://github.com/cabol/erlbus には複数nodeで動くようなサンプルが載っているでどこかでできるのかなあ
    • 内部ではmnesiaを使っていない。素のetsのみ
    • ただErlangのnodeのconnectを使ったらpidsが共有できるようになるのでどっかでhandleされているのかも
    • そうすれば、それぞれのnodeで別のetsのスペースに対して同じ操作(insertとかdelete)すればよいと思われる

erlbus examples ソースコードリーディング

  • ebus_proc:spawn_handlerhandle_msgをFun, 自分のプロセスをArgとして渡す
  • hanlde_msgは、publishされたメッセージをMassageとして受け取り、Contextをself()(websocket handlerプロセスとして実行する
  • handle_msgは、Context(この場合websocket handlerプロセス)に対して {message_published, Msg} メッセージを送信する
websocket_init(_Type, Req, _Opts) ->
  % Create the handler from our custom callback
  Handler = ebus_proc:spawn_handler(fun chat_erlbus_handler:handle_msg/2, [self()]),
  ebus:sub(Handler, ?CHATROOM_NAME),
  {ok, Req, #state{name = get_name(Req), handler = Handler}, ?TIMEOUT}.

%% ...中略
websocket_info({message_published, {Sender, Msg}}, Req, State) ->
  {reply, {text, jiffy:encode({[{sender, Sender}, {msg, Msg}]})}, Req, State};
-module(chat_erlbus_handler).

%% API
-export([handle_msg/2]).

handle_msg(Msg, Context) ->
  Context ! {message_published, Msg}.

spawn_handlerには関数を渡すことができる。 spawn_handlerは内部でhandle関数を呼び出すが、これは受け取ったModule, Funにreceiveで受け取ったMessageを第一引数として渡している。

%% @equiv spawn_handler(Fun, [])
spawn_handler(Fun) ->
  spawn_handler(Fun, []).

%% @equiv spawn_handler(Fun, Args, [])
spawn_handler(Fun, Args) ->
  spawn_handler(Fun, Args, []).

%% @equiv spawn_callback_handler(erlang, apply, [Fun, Args], Opts)
%% @doc
%% Same as `spawn_callback_handler/4', but receives a `fun' as
%% callback. This `fun' is invoked as:
%%
%% '''
%% apply(erlang, apply, [Fun, [Message | Args]])
%% '''
%%
%% Where `Message' is inserted as 1st argument in the `fun' args.
%% @end
spawn_handler(Fun, Args, Opts) ->
  spawn_callback_handler(erlang, apply, [Fun, Args], Opts).

%% @equiv spawn_callback_handler(Module, Fun, Args, [])
spawn_callback_handler(Module, Fun, Args) ->
  spawn_callback_handler(Module, Fun, Args, []).

%% @doc
%% Spawns a process that stays receiving messages, and when a message
%% is received, it applies the given callback `{Mod, Fun, Args}'.
%%
%% Options:
%% <ul>
%% <li>`Opts': Spawn process options. See `erlang:spawn_opt/2'.</li>
%% </ul>
%% @see erlang:spawn_opt/2.
%% @end
-spec spawn_callback_handler(
  module(), atom(), [term()], [term()]
) -> pid() | {pid(), reference()}.
spawn_callback_handler(Module, Fun, Args, Opts)
    when is_atom(Module), is_atom(Fun) ->
  %% handle(erlang, apply, [chat_erlbus_handler:handle_msg/2, [self()]])
  spawn_opt(fun() -> handle(Module, Fun, Args) end, Opts).

%% @private
handle(erlang, apply, [Fun, FunArgs] = Args) ->
  receive
    Message ->
      %% Messageを第一引数にしてFunを実行
      apply(erlang, apply, [Fun, [Message | FunArgs]]),
      handle(erlang, apply, Args)
  end;
handle(Module, Fun, Args) ->
  receive
    Message ->
      apply(Module, Fun, [Message | Args]),
      handle(Module, Fun, Args)
  end.
Return to top