TZWZ's personal page
How to allow LiveComponents to subscribe to PubSubs
13.07.2025
Elixir
JavaScript
Phoenix
Phoenix LiveView

One of Phoenix's greatest features is PubSub, which lets any process that wants them receive updates about anything happening in the application as soon as it happens. Sadly, PubSub subscriptions are made at the process level. A LiveComponent runs inside its parent LiveView's process, so it can't really subscribe on its own: any message it subscribes to is delivered to the parent LiveView, which then has to handle it.

This creates problems when one LiveView hosts multiple LiveComponents that each manage some data and the updates to that data. That's a reasonable use case for me, because sometimes a LiveView has a lot of LiveComponents. Routing messages to all of them and managing their subscriptions by hand would be messy and hard to maintain. Another problem is that a LiveComponent can be used on multiple pages, and then every LiveView that uses it needs the same handling logic. Of course, you can make your life easier with helper modules or helper functions in the LiveComponent, but in the end there will still be repetition you have to remember about and deal with.

So let's find a solution that allows our LiveComponents to behave, as far as PubSub is concerned, as much like separate processes as possible.

MessagePasser - passing messages to LiveComponent

To start, we need some way to pass messages to components and let them handle them. LiveComponent doesn't have handle_info, handle_cast and the like. A LiveView has them because it is its own process, but a LiveComponent isn't one: it lives inside its parent LiveView's process. Luckily, Phoenix gives us the send_update function, so we can trigger the update callback with arbitrary new data for any mounted LiveComponent.

There's also send_update_after in Phoenix, so you can implement delayed messages too. But I will leave it as an exercise for you to do if you need it.

Let's create a MessagePasser module to implement this behavior.

defmodule MyAppWeb.LiveComponent.MessagePasser do
  def send_message(pid \\ self(), module, id, msg) do
    Phoenix.LiveView.send_update(pid, module, id: id, __message__: msg)
  end
end

For now, this module has a single function that sends a message to the LiveComponent of the given module with the given id. The pid argument is the LiveView process that hosts the component, so this works even if that component is mounted somewhere else (including a different computer entirely, if you need something like that). Now, to make it more useful for LiveComponents, let's turn it into a behaviour with a __using__ macro.

defmodule MyAppWeb.LiveComponent.MessagePasser do
  # previous code...
  alias Phoenix.LiveView.Socket

  defmacro __using__(_) do
    quote do
      @behaviour MyAppWeb.LiveComponent.MessagePasser

      def send_message!(id, msg) do
        MyAppWeb.LiveComponent.MessagePasser.send_message(__MODULE__, id, msg)
      end

      def update(%{__message__: msg} = new_assigns, socket) do
        {:ok, socket} = update(Map.delete(new_assigns, :__message__), socket)
        {:noreply, socket} = handle_message(msg, socket)
        {:ok, socket}
      end

      def update(new_assigns, socket) do
        if Map.get(socket.assigns, :__initial_load__, true) do
          socket |> assign(new_assigns) |> assign(__initial_load__: false) |> initial_load()
        else
          do_update(new_assigns, socket)
        end
      end

      def do_update(new_assigns, socket), do: {:ok, assign(socket, new_assigns)}

      # Default implementation: fail loudly with a proper exception.
      def handle_message(msg, _),
        do: raise("#{inspect(__MODULE__)} is missing implementation for #{inspect(msg)} message")

      defoverridable do_update: 2, handle_message: 2
    end
  end

  @doc """
  Load data from DB.
  """
  @callback initial_load(socket :: Socket.t()) :: {:ok, socket :: Socket.t()}

  @doc """
  Handle message(s).
  """
  @callback handle_message(msg :: any(), socket :: Socket.t()) :: {:noreply, socket :: Socket.t()}

  @doc """
  Perform update differently than by doing merge of assigns into socket.
  """
  @callback do_update(new_assigns :: map(), Socket.t()) :: {:ok, Socket.t()}
end

The first thing we add to all modules that use MessagePasser is a simpler function to send messages, so we can just write send_message!(assigns.id, msg) instead of having to pass __MODULE__.

After that we override the update callback of the LiveComponent. If the new assigns contain the :__message__ key, the update was sent by this module, so we first assign the remaining keys as usual and then handle the message as if it were a cast or info, not just an assigns update.
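The dispatch on the :__message__ key can be tried out without Phoenix. Below is a minimal sketch, assuming a plain map %{assigns: ...} as a stand-in for the socket; Demo.Dispatch and the {:increment, by} message are hypothetical names used only for illustration.

```elixir
defmodule Demo.Dispatch do
  # Plain-map stand-in for a LiveComponent; the "socket" is just %{assigns: map}.

  # An update carrying :__message__: assign the other keys first, then
  # hand the message to handle_message/2.
  def update(%{__message__: msg} = new_assigns, socket) do
    {:ok, socket} = update(Map.delete(new_assigns, :__message__), socket)
    {:noreply, socket} = handle_message(msg, socket)
    {:ok, socket}
  end

  # A regular update: merge the new assigns into the socket.
  def update(new_assigns, socket) do
    {:ok, %{socket | assigns: Map.merge(socket.assigns, new_assigns)}}
  end

  # Hypothetical message handled by this component.
  def handle_message({:increment, by}, socket) do
    {:noreply, update_in(socket.assigns.count, &(&1 + by))}
  end
end

socket = %{assigns: %{id: "counter", count: 0}}

# This is the shape of assigns that send_message/4 passes to send_update.
{:ok, socket} =
  Demo.Dispatch.update(%{id: "counter", __message__: {:increment, 2}}, socket)
```

After this call socket.assigns.count has been incremented by the handler, and the :__message__ key never ends up in the assigns.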

There's also the initial_load callback, which is called on the first update, after the component has been mounted and the assigns have been written into the socket. With this, we can safely subscribe to PubSub using data we expect from the parent LiveView/LiveComponent. That isn't the case for LiveComponent's normal mount callback, as it's called before any data gets passed into the LiveComponent's update.

And if we need a custom handler for update, there's still the do_update callback, which is called when values change after the initial load. That's just how it was most useful for me; you may as well call do_update on the initial load too if that's something you want, but remember that in such a case you won't have anything in assigns yet on the first call to do_update.

The first downside of this system is that we lose the ability to use the update_many callback, but on the other hand, it's probably for the best if you need to subscribe to many similar PubSubs... You may as well create a parent LiveComponent that will manage subscribed data and a child LiveComponent to manage look and behavior related to that data. Alternatively, you can improve this system to better fit your needs.

SubscribeServer - GenServer to manage subscriptions to PubSubs

Having implemented a way to send messages to LiveComponents, we need to create a way that will allow us to manage subscriptions from LiveComponents. I did it using a GenServer to avoid a situation where some LiveComponent subscribes to the same topic as parent LiveView and then LiveView doesn't get the message. This shouldn't really happen much, as you already have that information in LiveView so you may as well pass it to LiveComponent. But just to be sure, to avoid hard-to-track bugs and for peace of mind, let's use GenServer. It makes our LiveViews be 2 processes instead of 1, but processes are cheap anyway, and we will make SubscribeServer start only when the current LiveView mounts a LiveComponent that needs it.

Let's start writing our module.

defmodule MyAppWeb.LiveComponent.SubscribeServer do
  use GenServer

  defstruct [:parent_pid, subs: %{}, comps: %{}]

  def start_link(parent_pid) do
    GenServer.start_link(__MODULE__, parent_pid)
  end

  def init(parent_pid) do
    Process.monitor(parent_pid)
    {:ok, %__MODULE__{parent_pid: parent_pid}}
  end

  def handle_info({:DOWN, _, :process, _, _}, state) do
    {:stop, :normal, state}
  end
end

Our GenServer starts by monitoring the parent process, so that it stops together with it and we avoid zombie processes.

As for the state of our SubscribeServer, we need to know the pid of the LiveView this server is attached to. Then we need a map of subscriptions (subs) in which we will store the subscription module, the arguments for (un)subscribing, and which LiveComponents use it. We also need to store information about components (comps): which subscriptions each component ID holds.
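To make the shape of that state concrete, here is a sketch of what it could look like after two components subscribe to the same topic. It is written as a plain map rather than the struct so it runs on its own, and Demo.Orders, Demo.OrderRow and Demo.OrderSummary are hypothetical module names.

```elixir
# Hypothetical snapshot of the SubscribeServer state.
state = %{
  parent_pid: self(),
  # subs: subscription module => %{{sub_function, args} => {unsub_function, components}}
  subs: %{
    Demo.Orders => %{
      {:subscribe, [5]} =>
        {:unsubscribe,
         [
           # {component module, component id, filter callback or nil}
           {Demo.OrderSummary, "summary", nil},
           {Demo.OrderRow, "order-5", nil}
         ]}
    }
  },
  # comps: component id => [{sub_module, sub_function, args}]
  comps: %{
    "order-5" => [{Demo.Orders, :subscribe, [5]}],
    "summary" => [{Demo.Orders, :subscribe, [5]}]
  }
}

# Which components should receive a message sent by Demo.Orders?
{_unsub_function, comps} = state.subs[Demo.Orders][{:subscribe, [5]}]
interested_ids = Enum.map(comps, fn {_module, id, _filter} -> id end)
```

Both lookups are needed later: subs answers "who wants this message?", while comps answers "what does this component have to be removed from?".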

Subscription message

LiveComponents will want to subscribe to PubSubs, so we have to have a message and handler for that.

defmodule MyAppWeb.LiveComponent.SubscribeServer do
  # previous code...

  def handle_cast({:subscribe, sub_module, args, comp_module, comp_id, opts}, state) do
    args = prepare_args(args)
    {sub_function, unsub_function, filter_callback} = parse_subscribe_opts(opts)
    sub_key = {sub_module, sub_function, args}
    comp_key = {comp_module, comp_id, filter_callback}

    state
    |> insert_sub(sub_module, sub_function, unsub_function, args, comp_key)
    |> save_component(comp_id, sub_key)
    |> then(&{:noreply, &1})
  end

  defp prepare_args(args) do
    if is_list(args) do
      args
    else
      [args]
    end
  end

  defp parse_subscribe_opts(opts) do
    opts = Map.new(opts)

    {
      Map.get(opts, :sub_function, :subscribe),
      Map.get(opts, :unsub_function, :unsubscribe),
      Map.get(opts, :filter_callback)
    }
  end
end

To allow single and multiple arguments for (un)subscription functions, I wrote the prepare_args function, so in LiveComponents we can just pass a single argument instead of wrapping it in square brackets.

parse_subscribe_opts sets default values for input data like (un)subscription function name and callback that filters messages to only allow those matching what given LiveComponent wants/needs (more on that later).

Then we need to subscribe to PubSub and save info in state's subs and comps. This is what we do in these pipes at the end of the handle_cast function. Let's implement them.

defmodule MyAppWeb.LiveComponent.SubscribeServer do
  # previous code...

  defp insert_sub(state, sub_module, sub_function, unsub_function, args, comp_key) do
    cond do
      not Map.has_key?(state.subs, sub_module) ->
        apply(sub_module, sub_function, args)

        put_in(
          state.subs,
          Map.put(state.subs, sub_module, %{{sub_function, args} => {unsub_function, [comp_key]}})
        )

      not Map.has_key?(state.subs[sub_module], {sub_function, args}) ->
        apply(sub_module, sub_function, args)

        put_in(
          state.subs[sub_module],
          Map.put(state.subs[sub_module], {sub_function, args}, {unsub_function, [comp_key]})
        )

      true ->
        {unsub_function, comps} = state.subs[sub_module][{sub_function, args}]
        put_in(state.subs[sub_module][{sub_function, args}], {unsub_function, [comp_key | comps]})
    end
  end

  defp save_component(state, comp_id, sub_key) do
    cond do
      not Map.has_key?(state.comps, comp_id) ->
        put_in(state.comps, Map.put(state.comps, comp_id, [sub_key]))

      true ->
        put_in(state.comps[comp_id], [sub_key | state.comps[comp_id]])
    end
  end
end

Since all LiveComponent subscriptions live in one process, we may end up in a situation where two (or more) components subscribe through the same function with the same arguments. Subscribing blindly would result in the same message being received multiple times and passed forward, polluting message queues and slowing everything down. To avoid that, we call the subscribe function only when state's subs has no entry for the subscription module yet, or when no other LiveComponent has subscribed to that module through the same function with the same arguments. Otherwise, we only add the new LiveComponent's information to subs.
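The deduplication can be checked in isolation. The following is a simplified sketch, not the implementation above: it drops unsub_function, works on a bare map, and uses a hypothetical Demo.FakePubSub whose subscribe/1 only reports that a real subscription happened.

```elixir
defmodule Demo.FakePubSub do
  # Hypothetical stand-in for a real subscribe function: it only reports
  # each actual subscription to the calling process.
  def subscribe(topic), do: send(self(), {:subscribed, topic})
end

defmodule Demo.Dedup do
  # Simplified insert_sub: no unsub_function, subs is a bare map.
  def insert_sub(subs, sub_module, sub_function, args, comp_key) do
    key = {sub_function, args}
    module_subs = Map.get(subs, sub_module, %{})

    case module_subs do
      %{^key => comps} ->
        # Already subscribed with these arguments: only record the component.
        Map.put(subs, sub_module, Map.put(module_subs, key, [comp_key | comps]))

      _ ->
        # First subscriber for this module/function/arguments: really subscribe.
        apply(sub_module, sub_function, args)
        Map.put(subs, sub_module, Map.put(module_subs, key, [comp_key]))
    end
  end

  # Counts the {:subscribed, _} messages currently waiting in the mailbox.
  def count_subscriptions(n \\ 0) do
    receive do
      {:subscribed, _topic} -> count_subscriptions(n + 1)
    after
      0 -> n
    end
  end
end

subs =
  %{}
  |> Demo.Dedup.insert_sub(Demo.FakePubSub, :subscribe, ["orders"], {Demo.CompA, "a", nil})
  |> Demo.Dedup.insert_sub(Demo.FakePubSub, :subscribe, ["orders"], {Demo.CompB, "b", nil})

real_subscriptions = Demo.Dedup.count_subscriptions()
```

Two components end up registered under the same key, but the fake subscribe function is called only once.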

In save_component we save information about LiveComponent that subscribed and to which PubSubs it's subscribed, so we can unsubscribe from them all when it unmounts (or not if other LiveComponents still use that).

Unsubscription message

defmodule MyAppWeb.LiveComponent.SubscribeServer do
  # previous code...

  def handle_cast({:unsubscribe, sub_module, args, comp_id, opts}, state) do
    args = prepare_args(args)
    sub_function = parse_unsubscribe_opts(opts)

    Map.get(state.comps, comp_id)
    |> remove_one_sub(sub_module, {sub_function, args}, comp_id, state)
  end

  defp parse_unsubscribe_opts(opts) do
    Keyword.get(opts, :sub_function, :subscribe)
  end

  defp remove_one_sub(nil, _, _, _, state), do: {:noreply, state}

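  defp remove_one_sub(
         comp_subs,
         sub_module,
         {sub_f, args} = sub_key,
         comp_id,
         state
       ) do
    key = {sub_module, sub_f, args}
    # Keep comp_subs intact; `remaining` is what the component still holds afterwards.
    remaining = Enum.reject(comp_subs, &(&1 == key))

    if Enum.empty?(remaining) do
      # Last subscription of this component: clean up `key` and drop the component.
      # (Passing the already-emptied list here would skip the cleanup entirely.)
      remove_all_subs([key], comp_id, state)
    else
      subs = cleanup_sub(state.subs, comp_id, sub_module, sub_key)

      {:noreply,
       %__MODULE__{
         state
         | comps: Map.put(state.comps, comp_id, remaining),
           subs: subs
       }}
    end
  end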

  defp cleanup_sub(subs, comp_id, sub_module, {_, args} = sub_key) do
    {unsub_function, comps} = subs[sub_module][sub_key]
    comps = Enum.reject(comps, &(elem(&1, 1) == comp_id))

    if Enum.empty?(comps) do
      if not is_nil(unsub_function) do
        apply(sub_module, unsub_function, args)
      end

      put_in(subs[sub_module], Map.delete(subs[sub_module], sub_key))
    else
      put_in(subs[sub_module][sub_key], {unsub_function, comps})
    end
  end
end

When unsubscribing, we have to remove information about LiveComponent from the subs key in state and maybe unsubscribe from the given PubSub module and topic if no other LiveComponent uses it. We do this in the cleanup_sub function.

We also have to remove subscription information from the comps key in state. If the subscription we're removing is the last one for a given LiveComponent then we have to remove this LiveComponent's key entirely. We rely on the remove_all_subs function in this case. It will be introduced in the next part.

LiveComponent termination message

defmodule MyAppWeb.LiveComponent.SubscribeServer do
  # previous code...

  def handle_cast({:terminate, comp_id}, state) do
    Map.get(state.comps, comp_id) |> remove_all_subs(comp_id, state)
  end

  defp remove_all_subs(nil, _comp_id, state), do: {:noreply, state}

  defp remove_all_subs(comp_subs, comp_id, state) do
    subs =
      for {sub_module, sub_function, args} <- comp_subs, reduce: state.subs do
        subs ->
          cleanup_sub(subs, comp_id, sub_module, {sub_function, args})
      end

    {:noreply, %__MODULE__{state | comps: Map.delete(state.comps, comp_id), subs: subs}}
  end
end

In case a LiveComponent terminates (unmounts), we have to remove it from all its subs in state and possibly unsubscribe from PubSub too. To deal with it, we use the cleanup_sub function from the previous section and run it for every subscription of the terminating LiveComponent. Then we delete that LiveComponent's information from the comps key in state.

In the previous section we relied on remove_all_subs when a LiveComponent drops its last subscription: at that point there's no reason to keep information about that LiveComponent in memory, so we unsubscribe and delete its entry from comps.

Receiving and sending messages to LiveComponents

We have handlers to manage subscriptions for our LiveComponents. Now it's time to deal with received messages and forward them to the correct LiveComponents. The way I send messages in PubSubs is a tuple like {MyApp.Some.Module, msg}, where msg is whatever data is broadcast. So every message carries information about which module sent it. It could be a three-element tuple, or msg could be one of several different structs, but it's always a tuple with a module in the first position. This way it's easier to distinguish messages as they arrive and to debug who sent them. In our case it also lets us make some assumptions about messages and reduce the number of LiveComponents we have to ask whether they want a given message.

defmodule MyAppWeb.LiveComponent.SubscribeServer do
  # previous code...
  alias MyAppWeb.LiveComponent.MessagePasser

  def handle_info(msg, state) when is_tuple(msg) and is_atom(elem(msg, 0)) do
    module_name = elem(msg, 0)

    Map.get(state.subs, module_name)
    |> get_module_subs()
    |> validate_subs(msg)
    |> send_messages(msg, state.parent_pid)

    {:noreply, state}
  end

  defp get_module_subs(nil), do: []

  defp get_module_subs(module_subs) do
    module_subs |> Map.values() |> Enum.flat_map(fn {_unsub_fn, comps} -> comps end)
  end

  defp validate_subs(comps, msg) do
    Enum.filter(comps, fn
      {_, _, filter_callback} when is_function(filter_callback) ->
        try do
          filter_callback.(msg)
        rescue
          _ -> false
        end

      _ ->
        true
    end)
  end

  defp send_messages(comps, msg, parent_pid) do
    for {comp_module, comp_id, _filter_callback} <- comps do
      MessagePasser.send_message(parent_pid, comp_module, comp_id, msg)
    end
  end
end

When a message arrives that isn't handled by anything else we wrote before, we assume it's a message from PubSub. We don't really have any other choice, as there's no way to distinguish PubSub messages from other messages. Then we take the first element from the message tuple to figure out which module sent the message. With that, we can get all LiveComponents that want messages from this PubSub module.
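For reference, a broadcasting module that follows this {Module, msg} convention might look like the sketch below. To keep it runnable without dependencies it uses Elixir's built-in Registry as a stand-in for Phoenix.PubSub; Demo.Orders, Demo.PubSubRegistry and the order payload are hypothetical.

```elixir
defmodule Demo.Orders do
  # Hypothetical context module. Registry (with duplicate keys) stands in
  # for Phoenix.PubSub so that the example runs on its own.
  @registry Demo.PubSubRegistry

  def start_registry, do: Registry.start_link(keys: :duplicate, name: @registry)

  # These are the functions SubscribeServer would call through apply/3.
  def subscribe(order_id), do: Registry.register(@registry, {:order, order_id}, nil)
  def unsubscribe(order_id), do: Registry.unregister(@registry, {:order, order_id})

  # Every broadcast is tagged with the sending module in the first position.
  def broadcast(order_id, payload) do
    Registry.dispatch(@registry, {:order, order_id}, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {__MODULE__, payload})
    end)
  end
end

{:ok, _registry} = Demo.Orders.start_registry()
{:ok, _owner} = Demo.Orders.subscribe(5)
:ok = Demo.Orders.broadcast(5, %{id: 5, status: :paid})

received =
  receive do
    {Demo.Orders, payload} -> payload
  after
    100 -> :nothing
  end
```

In a real application subscribe/1 and broadcast/2 would presumably wrap Phoenix.PubSub.subscribe/2 and Phoenix.PubSub.broadcast/3 with the application's own PubSub name and a topic string; the tagging of the message stays the same.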

We can't figure out which subscription a message came from (that is, which subscribe-function arguments caused our SubscribeServer to receive it). That's why a component can pass a filter_callback when subscribing: it receives the message and should return true if the component is interested in it. The call is wrapped in try ... rescue, so a callback with no clause matching the message counts as "not interested" instead of crashing the server. This lets the callback define only the happy path instead of dealing with all cases, e.g.

# We need to pass only this
fn {Some.Module, %Some.Module{ id: 5 }} -> true end

# Instead of this
fn
  {Some.Module, %Some.Module{ id: 5 }} -> true
  _ -> false
end

This may seem small, but that's one line instead of four for the same functionality. And it only contains what matters from the LiveComponent's perspective, instead of dealing with edge cases that would otherwise crash SubscribeServer.
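The effect of the rescue can be verified in a few lines. This is a small sketch that mirrors the wrapping used in validate_subs; Demo.Filter and the message shapes are hypothetical.

```elixir
defmodule Demo.Filter do
  # A callback with no clause matching the message raises
  # FunctionClauseError; we rescue it and treat it as "not interested".
  def wants?(filter_callback, msg) when is_function(filter_callback, 1) do
    try do
      filter_callback.(msg)
    rescue
      _ -> false
    end
  end

  # No filter given: every message from the subscribed module is wanted.
  def wants?(_no_filter, _msg), do: true
end

# Happy-path-only callback, as a LiveComponent would pass it.
only_order_5 = fn {Demo.Orders, %{id: 5}} -> true end

matching = Demo.Filter.wants?(only_order_5, {Demo.Orders, %{id: 5, status: :paid}})
not_matching = Demo.Filter.wants?(only_order_5, {Demo.Orders, %{id: 6}})
no_filter = Demo.Filter.wants?(nil, {Demo.Orders, %{id: 6}})
```

The trade-off is that the rescue also hides genuine bugs inside a callback: any exception it raises is silently read as false.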

After that filtering, we send a message to all LiveComponents that have shown interest in it by using MessagePasser we defined at the start of the post.

__using__ macro

As it stands, we would have to deal with GenServer.cast() and large amounts of arguments to use our SubscribeServer. Let's make our life easier by creating some public API and __using__ macro.

defmodule MyAppWeb.LiveComponent.SubscribeServer do
  # previous code...

  defmacro __using__(_) do
    quote do
      use MyAppWeb.LiveComponent.MessagePasser

      def subscribe!(sub_module, args, comp_id, opts \\ []) do
        send(
          self(),
          {MyAppWeb.LiveHook.LiveComponentSubscribe, :subscribe, sub_module, args, __MODULE__,
           comp_id, opts}
        )
      end

      def unsubscribe!(sub_module, args, comp_id, opts \\ []) do
        send(
          self(),
          {MyAppWeb.LiveHook.LiveComponentSubscribe, :unsubscribe, sub_module, args, comp_id,
           opts}
        )
      end
    end
  end

  @doc """
  Allows live component to subscribe to `PubSub`.
  `sub_module` is a module to be subscribed to.
  `args` are list of arguments to sub/unsub function.
  `comp_module` is module of live component.
  `comp_id` is live component's id.
  `opts` can contain:
  - `sub_function` - subscription function as an atom, defaults to `:subscribe`
  - `unsub_function` - unsubscription function as an atom, defaults to `:unsubscribe`
  - `filter_callback` - callback taking message and returning `true` if that message should be forwarded to module,
    defaults to `true` as long as `sub_module` matches.
    This is useful in case where you have eg. `{Some.Module, :some_key, ...}` and `{Some.Module, :some_other_key, ...}`
    subscriptions and you only want `:some_key`.
  """
  def subscribe(pid, sub_module, args, comp_module, comp_id, opts)
      when is_pid(pid) and is_atom(sub_module) and is_atom(comp_module) and
             is_list(opts) do
    GenServer.cast(pid, {:subscribe, sub_module, args, comp_module, comp_id, opts})
  end

  def unsubscribe(pid, sub_module, args, comp_id, opts)
      when is_pid(pid) and is_atom(sub_module) and is_list(opts) do
    GenServer.cast(pid, {:unsubscribe, sub_module, args, comp_id, opts})
  end

  def clear_component(pid, comp_id) do
    GenServer.cast(pid, {:terminate, comp_id})
  end
end

Our __using__ macro gives away that we will use a LiveView hook to talk to SubscribeServer, but other than that, a LiveComponent can now just do:

defmodule MyAppWeb.LiveComponent.SomeComponent do
  use MyAppWeb, :live_component
  use MyAppWeb.LiveComponent.SubscribeServer

  def initial_load(socket) do
    if connected?(socket) do
      subscribe!(
        Some.Module,
        some_argument,
        socket.assigns.id,
        filter_callback: fn {X.Y, %X.Y{ id: 5 }} -> true end
      )

      subscribe!(Other.Module, [], socket.assigns.id)
    end

    {:ok, socket}
  end

  # Rest of module...
end

This way the component doesn't have to pass __MODULE__ or remember the message tuple the hook expects.

Auto unsubscribing when LiveComponent unmounts

This is the biggest problem in this entire feature. A normal LiveView has a terminate callback, but a LiveComponent doesn't. I couldn't find any way to make a LiveComponent announce that it's unmounting. Luckily, JavaScript hooks have a destroyed callback, which is called when the hook's element vanishes from the page. If we put the hook on the topmost element of a LiveComponent, then this element vanishing means that the LiveComponent vanished. LiveComponents only vanish when they are unmounted, because otherwise they are just updated in place. With this information we know when a LiveComponent unmounts and can drop all subscriptions it still has active in the LiveView.

export const UnsubOnDelete = {
  destroyed() {
    this.pushEvent("__sub_comp_rm__", { id: this.el.id });
  },
};

The hook itself is pretty simple. All it has to do is send the LiveComponent's id to the backend, which deals with the rest. The id is something every LiveComponent is required to have in its assigns; we just have to put it in an HTML attribute. ids have to be unique per page, because otherwise we will have errors.

With this hook, we have to enable it in assets/js/app.js and then use it like:

defmodule MyAppWeb.LiveComponent.SomeComponent do
  use MyAppWeb, :live_component

  # Using SubscribeServer, initial_load, etc.

  def render(assigns) do
    ~H'''
    <div phx-hook="UnsubOnDelete" id={@id}>
      <!-- Rest of component -->
    </div>
    '''
  end
end

It's another caveat of this solution: we have to remember to add this hook to every LiveComponent that subscribes. Out of all the caveats it's probably the biggest one for me, as most of the others are either not a real problem or could even be considered features. I couldn't find any other way to make it work, though, and I don't think it's that big of a deal in the end.

LiveView's hook to deal with SubscribeServer

As I hinted in one of the previous sections, we will create a LiveView hook to abstract dealing with SubscribeServer away. Let's do that now.

defmodule MyAppWeb.LiveHook.LiveComponentSubscribe do
  alias MyAppWeb.LiveComponent.SubscribeServer
  use MyAppWeb, :live_view

  def on_mount(:default, _params, _session, socket) do
    {:cont,
     socket
     |> assign(__live_comp_sub_server__: nil)
     |> attach_hook(:live_component_subscribe, :handle_info, &handle_info/2)
     |> attach_hook(:live_component_sub_terminate, :handle_event, &handle_event/3)}
  end

  def handle_event("__sub_comp_rm__", %{"id" => comp_id}, socket) do
    if not is_nil(socket.assigns.__live_comp_sub_server__) do
      SubscribeServer.clear_component(
        socket.assigns.__live_comp_sub_server__,
        comp_id
      )
    end

    {:halt, socket}
  end

  def handle_event(_, _, socket) do
    {:cont, socket}
  end

  def handle_info({__MODULE__, :subscribe, sub_module, args, comp_module, comp_id, opts}, socket) do
    socket = spawn_live_comp_server(socket)

    SubscribeServer.subscribe(
      socket.assigns.__live_comp_sub_server__,
      sub_module,
      args,
      comp_module,
      comp_id,
      opts
    )

    {:halt, socket}
  end

  def handle_info({__MODULE__, :unsubscribe, sub_module, args, comp_id, opts}, socket) do
    if not is_nil(socket.assigns.__live_comp_sub_server__) do
      SubscribeServer.unsubscribe(
        socket.assigns.__live_comp_sub_server__,
        sub_module,
        args,
        comp_id,
        opts
      )
    end

    {:halt, socket}
  end

  def handle_info(_, socket) do
    {:cont, socket}
  end

  defp spawn_live_comp_server(socket) when is_nil(socket.assigns.__live_comp_sub_server__) do
    {:ok, pid} = SubscribeServer.start_link(self())
    assign(socket, __live_comp_sub_server__: pid)
  end

  defp spawn_live_comp_server(socket), do: socket
end

Here we start a SubscribeServer instance only if some LiveComponent needs it in the current LiveView. Other than that, the hook just calls the public API of SubscribeServer to deal with requests. I've done it this way because we need to know the pid of SubscribeServer and, if it's not started yet, start it exactly once; this gives us one place to deal with these cases. Another solution would be to use Registry or Process.put(...) and read/write that data in each component, but in such a case our __using__ macro in SubscribeServer would insert more code into every LiveComponent. And I doubt that this solution would be a bottleneck at any point, realistically.
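For comparison, the Process.put(...) alternative mentioned above could be sketched roughly like this. It is not what this post uses; Demo.LazyServer is a hypothetical name, and an Agent stands in for SubscribeServer so the snippet runs on its own.

```elixir
defmodule Demo.LazyServer do
  # Key under which the server pid is kept in the process dictionary.
  @key :__live_comp_sub_server__

  # Starts the server on first use and returns the same pid afterwards.
  # LiveComponents run in their LiveView's process, so all components of
  # one LiveView would share this process dictionary entry.
  def ensure_started do
    case Process.get(@key) do
      nil ->
        # Agent is only a stand-in for SubscribeServer.start_link(self()).
        {:ok, pid} = Agent.start_link(fn -> %{} end)
        Process.put(@key, pid)
        pid

      pid when is_pid(pid) ->
        pid
    end
  end
end

first = Demo.LazyServer.ensure_started()
second = Demo.LazyServer.ensure_started()
```

Every subscribe!/unsubscribe! injected by the __using__ macro would have to go through such a function, which is the extra per-component code the hook-based approach avoids.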

With our LiveHook ready, we can just attach it to the live_session or live page in lib/my_app_web/router.ex like so:

defmodule MyAppWeb.Router do
  # aliases, imports, pipelines, etc.

  scope "/", MyAppWeb do
    pipe_through [:browser]

    live_session :a_session,
      on_mount: [
        LiveHook.Misc,
        LiveHook.LiveComponentSubscribe
      ] do
      live "/", Live.PageWithLiveComponent
    end
  end
end

With that, we finally have the ability in our LiveComponents to subscribe to PubSubs. There are some caveats and downsides, but there's also separation of concerns and modularization. And probably other technobabble to justify this solution. We also don't have to spawn nested LiveViews that spawn new processes every time and don't update when a value passed from parent LiveView changes.
