RECIPE

GenStage (Elixir) primer

GenStage is a back-pressure-aware producer-consumer abstraction from the Elixir core team, shipped as the separate gen_stage package rather than in the standard library. It lets you wire concurrent pipelines where each stage pulls only as much work as it can handle, which makes it well suited to streaming Meridian model inferences without flooding downstream sinks.

1. The three stage roles

Every pipeline is built from three role primitives: a :producer that emits events on demand, a :consumer that pulls and processes them, and an optional :producer_consumer that sits in the middle and transforms the stream. Demand always flows upstream while events flow downstream.
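To make the middle role concrete, here is a minimal sketch of a :producer_consumer. The Doubler module and its doubling logic are hypothetical, chosen only for illustration, and the sketch assumes gen_stage is already listed as a dependency of the project.

```elixir
# Hypothetical middle stage: doubles every integer event it receives.
# Assumes {:gen_stage, "~> 1.0"} is in the project's deps.
defmodule Doubler do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    # A :producer_consumer receives events from upstream and emits
    # events downstream, so it implements handle_events/3 only.
    {:producer_consumer, :no_state}
  end

  @impl true
  def handle_events(events, _from, state) do
    # The second tuple element is the list of events sent downstream.
    {:noreply, Enum.map(events, &(&1 * 2)), state}
  end
end
```

Unlike a :consumer, which returns an empty event list from handle_events/3, this stage returns the transformed events so they continue down the pipeline.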

2. A minimal producer plus consumer

The snippet below boots a counter producer and a printer consumer. The printer subscribes to the counter, and GenStage handles the demand signaling for you. Notice that the consumer never asks for events explicitly: it declares its subscription and the runtime does the rest.

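defmodule Counter do
  use GenStage

  def start_link(initial) do
    # Registered under the module name so consumers can subscribe by name.
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  @impl true
  def init(counter), do: {:producer, counter}

  # Called whenever a consumer asks for `demand` more events.
  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    # Emit exactly `demand` consecutive integers, starting at `counter`.
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end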

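defmodule Printer do
  use GenStage

  def start_link(_) do
    GenStage.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    # Subscribing in init/1 is what makes GenStage send demand upstream.
    {:consumer, :the_state, subscribe_to: [Counter]}
  end

  @impl true
  def handle_events(events, _from, state) do
    IO.inspect(events)
    # Consumers never emit events, so the event list is always empty.
    {:noreply, [], state}
  end
end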
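One way to try the two modules, for example from an IEx session, is sketched below. It assumes Counter and Printer are already compiled and that gen_stage is available. The variable names and the 100 ms pause are arbitrary choices for illustration.

```elixir
# Start the producer first: Printer subscribes to Counter by name in
# init/1, so Counter should already be running at that point.
{:ok, counter} = Counter.start_link(0)
{:ok, printer} = Printer.start_link(:ok)

# Counter never runs out of integers, so the pipeline prints until it
# is stopped. Let it run briefly, then shut both stages down.
Process.sleep(100)
GenStage.stop(printer)
GenStage.stop(counter)
```

In an application you would more likely list both stages as children of a supervisor, with the producer ahead of the consumer in the child list.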

3. Tuning back-pressure

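Pass min_demand and max_demand on subscription to control batching. A consumer with max_demand: 50 and min_demand: 25 asks for 50 events up front and requests more once the number still outstanding falls to 25, so at most 50 events are ever in flight toward it. If you omit the options, max_demand defaults to 1000 and min_demand to half of max_demand. For Meridian-style fan-out, pair GenStage with Flow, a separate library built on GenStage, to get partitioning and windowed aggregation without writing those stages by hand.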
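As a sketch of how those demand options are passed, the consumer below subscribes to the Counter producer from section 2 with explicit bounds. TunedPrinter is a hypothetical name, and the code assumes Counter is compiled and gen_stage is a dependency.

```elixir
# Hypothetical consumer with explicit demand bounds.
defmodule TunedPrinter do
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok) do
    # Per-subscription options go in a {producer, options} tuple.
    {:consumer, :no_state,
     subscribe_to: [{Counter, max_demand: 50, min_demand: 25}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Print how many events arrived in this call to observe batching.
    IO.inspect(length(events), label: "batch size")
    {:noreply, [], state}
  end
end
```

Lower bounds reduce memory held per consumer and suit slow or expensive events. Higher bounds cut message-passing overhead when events are cheap.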