One of Phoenix's greatest features is PubSub, which lets any process that needs or wants them
receive updates about anything happening in the application, immediately and at any point in time.
Sadly, PubSub subscriptions are made at the process level.
That means LiveComponents can't really subscribe on their own: they run inside the parent LiveView's
process, so every message is delivered to the parent LiveView to handle.
This creates problems because you may have one LiveView
with multiple LiveComponents that manage some data, updates to that data, etc.
It's a reasonable use case for me, because sometimes there's a LiveView
that has a lot of LiveComponents. Having to route messages around them all,
managing subscriptions, etc., would be problematic, messy, and hard to deal with.
Another problem is that some LiveComponent could be used on multiple pages.
In such a case it would require writing the same handling logic for all of these modules
that use that LiveComponent. Of course you can make your life easier by creating helper modules
or helper functions in LiveComponent but in the end there will be repetition you have
to remember about and deal with.
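To see what "subscribing at the process level" means in practice, here is a minimal sketch that uses a duplicate-key Registry from the standard library as a stand-in for a PubSub. The names DemoPubSub and "posts" are made up for the illustration, and Phoenix.PubSub itself is not involved.

```elixir
# A duplicate-key Registry from the standard library stands in for a PubSub here.
{:ok, _registry} = Registry.start_link(keys: :duplicate, name: DemoPubSub)

# "Subscribing" registers the calling process (self()) under the topic.
{:ok, _owner} = Registry.register(DemoPubSub, "posts", nil)

# "Broadcasting" sends the message to every process registered under the topic.
Registry.dispatch(DemoPubSub, "posts", fn entries ->
  for {pid, _value} <- entries, do: send(pid, {:post_updated, 1})
end)

# The message lands in the subscriber's mailbox. In LiveView that mailbox belongs
# to the LiveView process, no matter which component made the subscribe call.
received =
  receive do
    {:post_updated, id} -> {:post_updated, id}
  after
    100 -> :nothing
  end

# The subscription is keyed by pid, not by any component identity.
subscribers = Registry.lookup(DemoPubSub, "posts")
```

The only identity the subscription knows about is a pid, which is why a component inside a LiveView cannot be addressed by it directly.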
So let's find a solution that will allow our LiveComponents to behave as much as possible like
separate processes as far as PubSub is concerned.
MessagePasser - passing messages to LiveComponent
To start, we need some way to pass messages to components and to let them handle them.
LiveComponent doesn't have handle_info, handle_cast and the like.
LiveView does because it is a separate process, while a LiveComponent is not.
Luckily, Phoenix gives us the send_update function,
so we can trigger the update callback with arbitrary, new data for any mounted LiveComponent.
There's also send_update_after in Phoenix, so you can implement delayed messages too.
But I will leave it as an exercise for you to do if you need it.
Let's create a MessagePasser module to implement this behavior.
defmodule MyAppWeb.LiveComponent.MessagePasser do
def send_message(pid \\ self(), module, id, msg) do
Phoenix.LiveView.send_update(pid, module, id: id, __message__: msg)
end
end
For now, this module has a single function that lets us send a message to the component
of a given module with a given id, even if that component is mounted in a different LiveView process
(including one running on a different computer entirely, if you need something like that).
Now, to make it more useful for LiveComponents, let's turn it into a behaviour with a __using__ macro.
defmodule MyAppWeb.LiveComponent.MessagePasser do
# previous code...
alias Phoenix.LiveView.Socket
defmacro __using__(_) do
quote do
@behaviour MyAppWeb.LiveComponent.MessagePasser
def send_message!(id, msg) do
MyAppWeb.LiveComponent.MessagePasser.send_message(__MODULE__, id, msg)
end
def update(%{__message__: msg} = new_assigns, socket) do
{:ok, socket} = update(Map.delete(new_assigns, :__message__), socket)
{:noreply, socket} = handle_message(msg, socket)
{:ok, socket}
end
def update(new_assigns, socket) do
if Map.get(socket.assigns, :__initial_load__, true) do
socket |> assign(new_assigns) |> assign(__initial_load__: false) |> initial_load()
else
do_update(new_assigns, socket)
end
end
def do_update(new_assigns, socket), do: {:ok, assign(socket, new_assigns)}
def handle_message(msg, _),
do: throw("#{__MODULE__} is missing implementation for #{inspect(msg)} message")
defoverridable do_update: 2, handle_message: 2
end
end
@doc """
Load data from DB.
"""
@callback initial_load(socket :: Socket.t()) :: {:ok, socket :: Socket.t()}
@doc """
Handle message(s).
"""
@callback handle_message(msg :: any(), socket :: Socket.t()) :: {:noreply, socket :: Socket.t()}
@doc """
Perform update differently than by doing merge of assigns into socket.
"""
@callback do_update(new_assigns :: map(), Socket.t()) :: {:ok, Socket.t()}
end
The first thing we add to all modules that use MessagePasser is a simpler function to send messages,
so we can just write send_message!(assigns.id, msg) instead of having to pass __MODULE__.
After that we override the update callback of LiveComponents. If the update contains the :__message__ key,
it means that it's a message passed by this module, so we have to handle it as if it were a cast or info,
not just update keys (which we still do for the other keys).
There's also the initial_load callback, which is called on the first update, after the component has been
mounted and its assigns have been written into the socket.
With this, we can safely subscribe to PubSub using data we expect from the parent LiveView/LiveComponent,
which isn't possible in the regular mount callback of a LiveComponent,
as that one is called before data gets passed into LiveComponent's update.
And if we need a custom handler for update, there's still the do_update callback,
which is called when values change after the initial load.
That's just how it was most useful for me; you may as well call do_update on initial load
if that's something you want, but remember that in such a case on the first call
to do_update you won't have anything in assigns.
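The order in which these callbacks fire can be traced with a small stand-alone model. This is only a sketch: a plain map stands in for the Phoenix socket, the private assign/2 and log/2 helpers are invented for the illustration, and the callbacks do nothing except record that they ran.

```elixir
defmodule UpdateFlowSketch do
  # A plain map stands in for %Phoenix.LiveView.Socket{}; only :assigns is modelled.
  def new_socket, do: %{assigns: %{}, log: []}

  # An update carrying :__message__ is a forwarded message, not a regular update.
  def update(%{__message__: msg} = new_assigns, socket) do
    {:ok, socket} = update(Map.delete(new_assigns, :__message__), socket)
    {:noreply, socket} = handle_message(msg, socket)
    {:ok, socket}
  end

  # The first regular update triggers initial_load, later ones go to do_update.
  def update(new_assigns, socket) do
    if Map.get(socket.assigns, :__initial_load__, true) do
      socket
      |> assign(new_assigns)
      |> assign(%{__initial_load__: false})
      |> initial_load()
    else
      do_update(new_assigns, socket)
    end
  end

  def initial_load(socket), do: {:ok, log(socket, :initial_load)}
  def do_update(new_assigns, socket), do: {:ok, socket |> assign(new_assigns) |> log(:do_update)}
  def handle_message(msg, socket), do: {:noreply, log(socket, {:handle_message, msg})}

  # Hypothetical helpers, not part of Phoenix.
  defp assign(socket, new), do: %{socket | assigns: Map.merge(socket.assigns, new)}
  defp log(socket, entry), do: %{socket | log: socket.log ++ [entry]}
end

socket = UpdateFlowSketch.new_socket()
{:ok, socket} = UpdateFlowSketch.update(%{id: "c1", title: "a"}, socket)
{:ok, socket} = UpdateFlowSketch.update(%{id: "c1", title: "b"}, socket)
{:ok, socket} = UpdateFlowSketch.update(%{id: "c1", __message__: :ping}, socket)

# socket.log is [:initial_load, :do_update, :do_update, {:handle_message, :ping}]
```

Note that a forwarded message also passes through do_update with just the id before handle_message runs, so a custom do_update should tolerate assigns that contain nothing but the id.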
The first downside of this system is that we lose the ability to use the update_many callback,
but on the other hand, it's probably for the best if you need to subscribe to many similar PubSubs...
You may as well create a parent LiveComponent that will manage subscribed data and a child LiveComponent
to manage look and behavior related to that data.
Alternatively, you can improve this system to better fit your needs.
SubscribeServer - GenServer to manage subscriptions to PubSubs
Having implemented a way to send messages to LiveComponents,
we need to create a way that will allow us to manage subscriptions from LiveComponents.
I did it using a GenServer to avoid a situation where some LiveComponent subscribes to the same
topic as parent LiveView and then LiveView doesn't get the message.
This shouldn't really happen much, as you already have that information in LiveView
so you may as well pass it to LiveComponent.
But just to be sure, to avoid hard-to-track bugs and for peace of mind, let's use GenServer.
It makes our LiveViews be 2 processes instead of 1, but processes are cheap anyway,
and we will make SubscribeServer start only when the current LiveView mounts a LiveComponent that needs it.
Let's start writing our module.
defmodule MyAppWeb.LiveComponent.SubscribeServer do
use GenServer
defstruct [:parent_pid, subs: %{}, comps: %{}]
def start_link(parent_pid) do
GenServer.start_link(__MODULE__, parent_pid)
end
def init(parent_pid) do
Process.monitor(parent_pid)
{:ok, %__MODULE__{parent_pid: parent_pid}}
end
def handle_info({:DOWN, _, :process, _, _}, state) do
{:stop, :normal, state}
end
end
Our GenServer starts by monitoring the parent process
so that it stops together with it, which avoids zombie processes.
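The monitor matters even though the server is started with start_link: a linked process that exits with reason :normal does not take down a GenServer that isn't trapping exits. The sketch below shows the monitor doing the work in that case. MonitorSketch and the throwaway parent process are made up for the demonstration.

```elixir
defmodule MonitorSketch do
  use GenServer

  def start_link(parent_pid), do: GenServer.start_link(__MODULE__, parent_pid)

  @impl true
  def init(parent_pid) do
    # Ask the VM to notify us when the parent goes away, whatever the reason.
    Process.monitor(parent_pid)
    {:ok, parent_pid}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    {:stop, :normal, state}
  end
end

test_pid = self()

# A throwaway process plays the role of the LiveView.
parent =
  spawn(fn ->
    {:ok, server} = MonitorSketch.start_link(self())
    send(test_pid, {:server, server})

    receive do
      :stop -> :ok
    end
  end)

server =
  receive do
    {:server, pid} -> pid
  end

ref = Process.monitor(server)

# The parent now exits normally; the link alone would leave the server running.
send(parent, :stop)

reason =
  receive do
    {:DOWN, ^ref, :process, ^server, reason} -> reason
  after
    1_000 -> :timeout
  end
```

Here reason ends up as :normal, meaning the server shut itself down in response to the :DOWN message.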
As for the state of our SubscribeServer, we need to know the pid of the LiveView
this server is attached to. Then we need a map of subscriptions (subs)
in which we will store information about the subscription module,
the arguments for (un)subscribing, and which LiveComponents use it.
We also need a map of components (comps) that records, for each component id,
the subscriptions that component holds.
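To make that shape concrete, here is roughly what the state could look like after two components subscribe to the same topic. A plain map is used instead of the struct so the snippet stands alone, and MyApp.Posts, MyAppWeb.PostCard, MyAppWeb.PostStats and the ids are hypothetical names.

```elixir
state = %{
  parent_pid: self(),
  # subs: subscription module => {function, args} => {unsub function, interested components}
  subs: %{
    MyApp.Posts => %{
      {:subscribe, [42]} =>
        {:unsubscribe,
         [
           # {component module, component id, filter callback or nil}
           {MyAppWeb.PostStats, "stats-42", nil},
           {MyAppWeb.PostCard, "post-42", nil}
         ]}
    }
  },
  # comps: component id => list of {subscription module, function, args}
  comps: %{
    "post-42" => [{MyApp.Posts, :subscribe, [42]}],
    "stats-42" => [{MyApp.Posts, :subscribe, [42]}]
  }
}
```

The two maps are mirror images of each other: subs answers "who wants this message?", while comps answers "what does this component hold?", which is what cleanup needs.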
Subscription message
LiveComponents will want to subscribe to PubSubs, so we have to have a message and handler for that.
defmodule MyAppWeb.LiveComponent.SubscribeServer do
# previous code...
def handle_cast({:subscribe, sub_module, args, comp_module, comp_id, opts}, state) do
args = prepare_args(args)
{sub_function, unsub_function, filter_callback} = parse_subscribe_opts(opts)
sub_key = {sub_module, sub_function, args}
comp_key = {comp_module, comp_id, filter_callback}
state
|> insert_sub(sub_module, sub_function, unsub_function, args, comp_key)
|> save_component(comp_id, sub_key)
|> then(&{:noreply, &1})
end
defp prepare_args(args) do
if is_list(args) do
args
else
[args]
end
end
defp parse_subscribe_opts(opts) do
opts = Map.new(opts)
{
Map.get(opts, :sub_function, :subscribe),
Map.get(opts, :unsub_function, :unsubscribe),
Map.get(opts, :filter_callback)
}
end
end
To allow both single and multiple arguments for (un)subscription functions, I wrote
the prepare_args function, so in LiveComponents we can just send a single argument
instead of wrapping it in square brackets.
parse_subscribe_opts sets default values for optional input such as the (un)subscription
function names and the callback that filters messages so that only those matching
what a given LiveComponent wants/needs get through (more on that later).
Then we need to subscribe to PubSub and save info in state's subs and comps.
This is what we do in these pipes at the end of the handle_cast function.
Let's implement them.
defmodule MyAppWeb.LiveComponent.SubscribeServer do
# previous code...
defp insert_sub(state, sub_module, sub_function, unsub_function, args, comp_key) do
cond do
not Map.has_key?(state.subs, sub_module) ->
apply(sub_module, sub_function, args)
put_in(
state.subs,
Map.put(state.subs, sub_module, %{{sub_function, args} => {unsub_function, [comp_key]}})
)
not Map.has_key?(state.subs[sub_module], {sub_function, args}) ->
apply(sub_module, sub_function, args)
put_in(
state.subs[sub_module],
Map.put(state.subs[sub_module], {sub_function, args}, {unsub_function, [comp_key]})
)
true ->
{unsub_function, comps} = state.subs[sub_module][{sub_function, args}]
put_in(state.subs[sub_module][{sub_function, args}], {unsub_function, [comp_key | comps]})
end
end
defp save_component(state, comp_id, sub_key) do
cond do
not Map.has_key?(state.comps, comp_id) ->
put_in(state.comps, Map.put(state.comps, comp_id, [sub_key]))
true ->
put_in(state.comps[comp_id], [sub_key | state.comps[comp_id]])
end
end
end
Since we have all LiveComponents' subscriptions in one process, we may end up in a situation where
two (or more) of them subscribe through the same function with the same arguments.
Subscribing blindly would result in multiple messages being received and passed forward,
polluting message queues and slowing everything down.
To avoid that, we call subscribe only when the subs key in state
doesn't have a subscription module and/or none of the other LiveComponents
subscribed to the same module and function with the same arguments.
If some did, we only add new LiveComponent information into subs.
In save_component we save information about LiveComponent that subscribed
and to which PubSubs it's subscribed, so we can unsubscribe from them all when it unmounts
(or not if other LiveComponents still use that).
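The deduplication idea can be shown in isolation. The sketch below is a simplified variant, not the code above: it uses one flat map keyed by {module, function, args}, and a zero-arity function stands in for the real apply(sub_module, sub_function, args) call. DedupSketch and the Agent-based counter exist only for the demonstration.

```elixir
defmodule DedupSketch do
  # `subs` maps {module, function, args} to the list of interested component ids.
  def add(subs, sub_key, comp_id, subscribe) do
    case subs do
      %{^sub_key => comp_ids} ->
        # Someone already subscribed with identical arguments: only record interest.
        Map.put(subs, sub_key, [comp_id | comp_ids])

      _ ->
        # First subscriber for this key: perform the real subscription once.
        subscribe.()
        Map.put(subs, sub_key, [comp_id])
    end
  end
end

# Count how often the "real" subscribe call happens.
{:ok, counter} = Agent.start_link(fn -> 0 end)
subscribe = fn -> Agent.update(counter, &(&1 + 1)) end

key = {MyApp.Posts, :subscribe, [42]}

subs =
  %{}
  |> DedupSketch.add(key, "post-42", subscribe)
  |> DedupSketch.add(key, "stats-42", subscribe)

real_subscriptions = Agent.get(counter, & &1)
```

Both components end up listed under the same key, while the subscribe function runs only once.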
Unsubscription message
defmodule MyAppWeb.LiveComponent.SubscribeServer do
# previous code...
def handle_cast({:unsubscribe, sub_module, args, comp_id, opts}, state) do
args = prepare_args(args)
sub_function = parse_unsubscribe_opts(opts)
Map.get(state.comps, comp_id)
|> remove_one_sub(sub_module, {sub_function, args}, comp_id, state)
end
defp parse_unsubscribe_opts(opts) do
Keyword.get(opts, :sub_function, :subscribe)
end
defp remove_one_sub(nil, _, _, _, state), do: {:noreply, state}
defp remove_one_sub(
comp_subs,
sub_module,
{sub_f, args} = sub_key,
comp_id,
state
) do
key = {sub_module, sub_f, args}
comp_subs = Enum.reject(comp_subs, &(&1 == key))
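if Enum.empty?(comp_subs) do
# comp_subs is already empty here, so pass the removed key explicitly;
# otherwise cleanup_sub never runs and the PubSub subscription leaks.
remove_all_subs([key], comp_id, state)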
else
subs = cleanup_sub(state.subs, comp_id, sub_module, sub_key)
{:noreply,
%__MODULE__{
state
| comps: Map.put(state.comps, comp_id, comp_subs),
subs: subs
}}
end
end
defp cleanup_sub(subs, comp_id, sub_module, {_, args} = sub_key) do
{unsub_function, comps} = subs[sub_module][sub_key]
comps = Enum.reject(comps, &(elem(&1, 1) == comp_id))
if Enum.empty?(comps) do
if not is_nil(unsub_function) do
apply(sub_module, unsub_function, args)
end
put_in(subs[sub_module], Map.delete(subs[sub_module], sub_key))
else
put_in(subs[sub_module][sub_key], {unsub_function, comps})
end
end
end
When unsubscribing, we have to remove information about LiveComponent from the subs key in state
and maybe unsubscribe from the given PubSub module and topic if no other LiveComponent uses it.
We do this in the cleanup_sub function.
We also have to remove subscription information from the comps key in state.
If the subscription we're removing is the last one for a given LiveComponent then we have to remove this
LiveComponent's key entirely. We rely on the remove_all_subs function in this case.
It will be introduced in the next part.
LiveComponent termination message
defmodule MyAppWeb.LiveComponent.SubscribeServer do
# previous code...
def handle_cast({:terminate, comp_id}, state) do
Map.get(state.comps, comp_id) |> remove_all_subs(comp_id, state)
end
defp remove_all_subs(nil, _comp_id, state), do: {:noreply, state}
defp remove_all_subs(comp_subs, comp_id, state) do
subs =
for {sub_module, sub_function, args} <- comp_subs, reduce: state.subs do
subs ->
cleanup_sub(subs, comp_id, sub_module, {sub_function, args})
end
{:noreply, %__MODULE__{state | comps: Map.delete(state.comps, comp_id), subs: subs}}
end
end
In case a LiveComponent terminates (unmounts), we have to remove it from all its subs in state
and possibly unsubscribe from PubSub too. To deal with it, we will use function cleanup_sub
from the previous section and run it for all subscriptions in terminating LiveComponent.
Then we have to delete that LiveComponent information from the comps key in state.
In the previous section we relied on the remove_all_subs function to unsubscribe and
delete LiveComponent information from comps in the case when it's the last subscription
because if it's the last subscription, then there's no reason to keep information about that
LiveComponent in memory.
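The cleanup rule (drop the component everywhere, unsubscribe only where nobody is left) amounts to reference counting. Here is a minimal sketch of it under simplifying assumptions: a flat subs map keyed by {module, function, args} that holds component ids only, and an unsubscribe function passed in as an argument. ReleaseSketch and all names in the example data are hypothetical.

```elixir
defmodule ReleaseSketch do
  # Removes `comp_id` from one subscription and unsubscribes when nobody is left.
  def release(subs, sub_key, comp_id, unsubscribe) do
    case subs do
      %{^sub_key => comp_ids} ->
        case List.delete(comp_ids, comp_id) do
          [] ->
            unsubscribe.(sub_key)
            Map.delete(subs, sub_key)

          remaining ->
            Map.put(subs, sub_key, remaining)
        end

      _ ->
        # Unknown subscription: nothing to clean up.
        subs
    end
  end

  # Runs `release/4` for every subscription a terminating component holds.
  def release_all(subs, sub_keys, comp_id, unsubscribe) do
    Enum.reduce(sub_keys, subs, &release(&2, &1, comp_id, unsubscribe))
  end
end

posts = {MyApp.Posts, :subscribe, [42]}
users = {MyApp.Users, :subscribe, []}
subs = %{posts => ["stats-42", "post-42"], users => ["post-42"]}

# Record which keys were really unsubscribed.
{:ok, dropped} = Agent.start_link(fn -> [] end)
unsubscribe = fn key -> Agent.update(dropped, &[key | &1]) end

subs = ReleaseSketch.release_all(subs, [posts, users], "post-42", unsubscribe)
unsubscribed = Agent.get(dropped, & &1)
```

After "post-42" terminates, the posts subscription survives because "stats-42" still uses it, and only the users subscription is actually dropped.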
Receiving and sending messages to LiveComponents
We have handlers to manage subscriptions for our LiveComponents.
Now it's time to deal with received messages and forward them to the correct LiveComponents.
The way I send messages in PubSubs is as a tuple like {MyApp.Some.Module, msg}, where msg is whatever
data is broadcast. So every message carries information about which module sent it.
It could be a three-element tuple, or msg could be one of several different structs,
but it's always a tuple with a module in the first position.
This way it's easier to distinguish messages as they arrive and debug who sent them.
In our case it also lets us make some assumptions about messages and reduce the number of
LiveComponents we have to ask whether they want a given message.
defmodule MyAppWeb.LiveComponent.SubscribeServer do
# previous code...
def handle_info(msg, state) when is_tuple(msg) and is_atom(elem(msg, 0)) do
module_name = elem(msg, 0)
Map.get(state.subs, module_name)
|> get_module_subs()
|> validate_subs(msg)
|> send_messages(msg, state.parent_pid)
{:noreply, state}
end
defp get_module_subs(nil), do: []
defp get_module_subs(module_subs) do
module_subs |> Map.values() |> Enum.flat_map(fn {_unsub_fn, comps} -> comps end)
end
defp validate_subs(comps, msg) do
Enum.filter(comps, fn
{_, _, filter_callback} when is_function(filter_callback) ->
try do
filter_callback.(msg)
rescue
_ -> false
end
_ ->
true
end)
end
defp send_messages(comps, msg, parent_pid) do
for {comp_module, comp_id, _filter_callback} <- comps do
MyAppWeb.LiveComponent.MessagePasser.send_message(parent_pid, comp_module, comp_id, msg)
end
end
end
When a message arrives that isn't handled by anything else we wrote before,
we assume it's a message from PubSub.
We don't really have any other choice, as there's no way to distinguish PubSub
messages from other messages. Then we take the first element from the message tuple
to figure out which module sent the message.
With that, we can get all LiveComponents that want messages from this PubSub module.
We can't figure out which subscription we received the message from
(what values of subscribe function arguments caused our SubscribeServer to receive this message);
thus during subscription to SubscribeServer we take in filter_callback which receives the message
and should return true if it's interested in the given message.
Call to it is wrapped in try ... rescue to make it possible for this callback to only define the happy path
instead of dealing with all cases, e.g.
# We need to pass only this
fn {Some.Module, %Some.Module{ id: 5 }} -> true end
# Instead of this
fn
{Some.Module, %Some.Module{ id: 5 }} -> true
_ -> false
end
This may seem small, but that's one line instead of four for the same amount of functionality.
And it only contains things important from the perspective of LiveComponent instead of dealing
with edge cases that would crash SubscribeServer.
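The happy-path-only filter can be tried out on its own. The sketch below is a slight variation on the code above, offered as an assumption rather than a drop-in replacement: it rescues only FunctionClauseError (so other bugs inside a callback still surface) and treats anything other than true as "not interested". FilterSketch and the message shapes are made up for the example.

```elixir
defmodule FilterSketch do
  # No callback means "forward everything from this module".
  def wants?(nil, _msg), do: true

  def wants?(filter, msg) when is_function(filter, 1) do
    try do
      filter.(msg) == true
    rescue
      # Raised when none of the callback's clauses match the message.
      FunctionClauseError -> false
    end
  end
end

# Only the happy path is spelled out; there is no catch-all clause.
only_post_5 = fn {MyApp.Posts, %{id: 5}} -> true end

results = [
  FilterSketch.wants?(only_post_5, {MyApp.Posts, %{id: 5, title: "hi"}}),
  FilterSketch.wants?(only_post_5, {MyApp.Posts, %{id: 6}}),
  FilterSketch.wants?(only_post_5, {MyApp.Users, :deleted}),
  FilterSketch.wants?(nil, {MyApp.Users, :deleted})
]
```

The first and last calls return true; the two in the middle fail to match the single clause and are turned into false instead of crashing the caller.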
After that filtering, we send a message to all LiveComponents that have shown interest in it by using
MessagePasser we defined at the start of the post.
__using__ macro
As it stands, we would have to deal with GenServer.cast() and a large number of arguments to
use our SubscribeServer. Let's make our life easier by creating a public API and a __using__
macro.
defmodule MyAppWeb.LiveComponent.SubscribeServer do
# previous code...
defmacro __using__(_) do
quote do
use MyAppWeb.LiveComponent.MessagePasser
def subscribe!(sub_module, args, comp_id, opts \\ []) do
send(
self(),
{MyAppWeb.LiveHook.LiveComponentSubscribe, :subscribe, sub_module, args, __MODULE__,
comp_id, opts}
)
end
def unsubscribe!(sub_module, args, comp_id, opts \\ []) do
send(
self(),
{MyAppWeb.LiveHook.LiveComponentSubscribe, :unsubscribe, sub_module, args, comp_id,
opts}
)
end
end
end
@doc """
Allows live component to subscribe to `PubSub`.
`sub_module` is a module to be subscribed to.
`args` is a list of arguments (or a single argument) for the sub/unsub function.
`comp_module` is module of live component.
`comp_id` is live component's id.
`opts` can contain:
- `sub_function` - subscription function as an atom, defaults to `:subscribe`
- `unsub_function` - unsubscription function as an atom, defaults to `:unsubscribe`
- `filter_callback` - callback taking a message and returning `true` if that message should be forwarded to the module;
when it is not given, every message whose `sub_module` matches is forwarded.
This is useful when you have e.g. `{Some.Module, :some_key, ...}` and `{Some.Module, :some_other_key, ...}`
subscriptions and you only want `:some_key`.
"""
def subscribe(pid, sub_module, args, comp_module, comp_id, opts)
when is_pid(pid) and is_atom(sub_module) and is_atom(comp_module) and
is_list(opts) do
GenServer.cast(pid, {:subscribe, sub_module, args, comp_module, comp_id, opts})
end
def unsubscribe(pid, sub_module, args, comp_id, opts)
when is_pid(pid) and is_atom(sub_module) and is_list(opts) do
GenServer.cast(pid, {:unsubscribe, sub_module, args, comp_id, opts})
end
def clear_component(pid, comp_id) do
GenServer.cast(pid, {:terminate, comp_id})
end
end
Our __using__ macro gives away that we will use a hook to talk to SubscribeServer,
but other than that, a LiveComponent can now just do:
defmodule MyAppWeb.LiveComponent.SomeComponent do
use MyAppWeb, :live_component
use MyAppWeb.LiveComponent.SubscribeServer
def initial_load(socket) do
if connected?(socket) do
subscribe!(
Some.Module,
some_argument,
socket.assigns.id,
filter_callback: fn {X.Y, %X.Y{ id: 5 }} -> true end
)
subscribe!(Other.Module, [], socket.assigns.id)
end
{:ok, socket}
end
# Rest of module...
end
This way the component doesn't have to pass __MODULE__ or remember the message tuple the hook expects.
Auto unsubscribing when LiveComponent unmounts
This is the biggest problem in this entire feature.
A normal LiveView has a terminate callback, but LiveComponent doesn't.
I couldn't find any way to make a LiveComponent announce to anything that it's
unmounting. Luckily, JavaScript hooks have a
destroyed callback. It's called when an element vanishes from the page.
If we use this on the topmost element in LiveComponent then this element
vanishing means that LiveComponent vanished.
LiveComponents only vanish when they are unmounted, because otherwise they
are just updated in place.
With this information we can know when an element unmounts and thus unsubscribe
from all subscriptions still active from given LiveComponent in LiveView.
export const UnsubOnDelete = {
destroyed() {
this.pushEvent("__sub_comp_rm__", { id: this.el.id });
},
};
The hook itself is pretty simple. All we have to do is send the LiveComponent's id
(which every LiveComponent is required to have in its assigns; we just have to put it in an HTML attribute)
to the backend, which deals with the cleanup. The ids have to be unique per page, because otherwise we will have errors.
Once the hook is written, we have to register it in assets/js/app.js and then use it like:
defmodule MyAppWeb.LiveComponent.SomeComponent do
use MyAppWeb, :live_component
# Using SubscribeServer, initial_load, etc.
def render(assigns) do
~H'''
<div phx-hook="UnsubOnDelete" id={@id}>
<!-- Rest of component -->
</div>
'''
end
end
It's another caveat of this solution: we have to remember to add this hook to every LiveComponent that subscribes.
Out of all the caveats it's probably the biggest one for me, as most of the others are either not a problem or could even be considered features.
I couldn't find any other way to make it work, though,
and I don't think it's that big of a deal in the end.
LiveView's hook to deal with SubscribeServer
As I hinted in one of the previous sections, we will create a LiveView hook to abstract dealing with
SubscribeServer away. Let's do that now.
defmodule MyAppWeb.LiveHook.LiveComponentSubscribe do
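# A hook module is not a LiveView itself, so import only what it needs.
import Phoenix.Component, only: [assign: 2]
import Phoenix.LiveView, only: [attach_hook: 4]
alias MyAppWeb.LiveComponent.SubscribeServer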
def on_mount(:default, _params, _session, socket) do
{:cont,
socket
|> assign(__live_comp_sub_server__: nil)
|> attach_hook(:live_component_subscribe, :handle_info, &handle_info/2)
|> attach_hook(:live_component_sub_terminate, :handle_event, &handle_event/3)}
end
def handle_event("__sub_comp_rm__", %{"id" => comp_id}, socket) do
if not is_nil(socket.assigns.__live_comp_sub_server__) do
SubscribeServer.clear_component(
socket.assigns.__live_comp_sub_server__,
comp_id
)
end
{:halt, socket}
end
def handle_event(_, _, socket) do
{:cont, socket}
end
def handle_info({__MODULE__, :subscribe, sub_module, args, comp_module, comp_id, opts}, socket) do
socket = spawn_live_comp_server(socket)
SubscribeServer.subscribe(
socket.assigns.__live_comp_sub_server__,
sub_module,
args,
comp_module,
comp_id,
opts
)
{:halt, socket}
end
def handle_info({__MODULE__, :unsubscribe, sub_module, args, comp_id, opts}, socket) do
if not is_nil(socket.assigns.__live_comp_sub_server__) do
SubscribeServer.unsubscribe(
socket.assigns.__live_comp_sub_server__,
sub_module,
args,
comp_id,
opts
)
end
{:halt, socket}
end
def handle_info(_, socket) do
{:cont, socket}
end
defp spawn_live_comp_server(socket) when is_nil(socket.assigns.__live_comp_sub_server__) do
{:ok, pid} = SubscribeServer.start_link(self())
assign(socket, __live_comp_sub_server__: pid)
end
defp spawn_live_comp_server(socket), do: socket
end
Here we start a SubscribeServer instance only if some LiveComponent needs it in the current LiveView.
Other than that, the hook just calls the public API in SubscribeServer to deal with requests.
I've done it this way because we need to know the pid of SubscribeServer and, if it's not started yet,
start it exactly once. This way there's one place to deal with these cases.
Another solution would be to use Registry or Process.put(...) and read/write that data
in each component, but in such a case our __using__ macro in SubscribeServer would
insert more code into every LiveComponent.
And I doubt that this solution would be a bottleneck at any point, realistically.
With our LiveHook ready, we can just attach it to the live_session or live page
in lib/my_app_web/router.ex like so:
defmodule MyAppWeb.Router do
# aliases, imports, pipelines, etc.
scope "/", MyAppWeb do
pipe_through [:browser]
live_session :a_session,
on_mount: [
LiveHook.Misc,
LiveHook.LiveComponentSubscribe
] do
live "/", Live.PageWithLiveComponent
end
end
end
With that, we finally have the ability in our LiveComponents to subscribe to PubSubs.
There are some caveats and downsides, but there's also separation of concerns and modularization.
And probably other technobabble to justify this solution.
We also don't have to spawn nested LiveViews that spawn new processes every time and don't update
when a value passed from parent LiveView changes.