Building a Membrane Pipeline to talk to Google Gemini

This post was also published on the Software Mansion blog.

Large Language Models like Google’s Gemini 2.0 Flash and OpenAI’s GPT-4o Realtime are multimodal, meaning users can chat with them via text, talk to them directly like in a conversation, or even send a live video feed.

Compared to text-only LLMs, this introduces new challenges for developers building on top of Google’s and OpenAI’s APIs. Suddenly, they need to create multimedia pipelines that can handle audio and video streams from users and send them to the LLM provider.

Elixir, together with the Membrane framework, is well-suited for building pipelines that enable seamless communication between users and large language models. In this post, we’ll explore how Membrane helps orchestrate network transport, audio processing, and interactions with Google Gemini.

Here’s a video demo of what we’ll build:

If you’d like to try the example yourself, grab a Gemini API key and clone the Livebook from https://github.com/samrat/gemini_live_with_membrane. After running all the cells, you’ll be able to chat with the Gemini Flash model. It also “sees” you, so feel free to ask questions and test its ability to understand video too.

The architecture

Here’s the high-level overview of the Membrane pipeline we’ll be building:

We establish two dedicated WebRTC connections — one for upstream media (user to server) and one for downstream audio (server to user).

The media from the source is sent (after some processing) to Gemini via a WebSocket connection. Gemini then sends its audio response back, base64-encoded, over the same connection. We capture this audio and send it to a PlaybackQueue.

The purpose of the PlaybackQueue is to buffer Gemini’s audio response. If the user interrupts while we’re playing the response, we can clear the PlaybackQueue buffer to prevent it from being played. Since the Gemini API generates and sends audio faster than real-time, even though it stops generating a response when an interruption is detected, we still need to clear out any audio already buffered on our end.

The Membrane pipeline

Let’s take a closer look at how this pipeline is implemented in code. Don’t worry if some parts seem complex — we’ll break down the key components for clarity:

defmodule GeminiPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    spec = [
      # WebRTC Source -> Gemini -> PlaybackQueue -> WebRTC Sink
      child(:webrtc_source, %Membrane.WebRTC.Source{
        signaling: {:websocket, port: opts[:webrtc_source_ws_port]}
      })
      |> via_out(:output, options: [kind: :audio])
      |> child(:input_opus_parser, Membrane.Opus.Parser)
      |> child(:opus_decoder, %Membrane.Opus.Decoder{sample_rate: 16_000})
      |> via_in(:audio)
      |> child(:gemini, %GeminiEndpoint{
        websocket_opts: opts[:gemini_ws_opts]
      })
      |> child(:playback_queue, PlaybackQueue)
      |> child(:raw_audio_parser, %Membrane.RawAudioParser{overwrite_pts?: true})
      |> via_in(:input, target_queue_size: 1_000_000_000, toilet_capacity: 1_000_000_000)
      |> child(:realtimer, Membrane.Realtimer)
      |> child(:opus_encoder, Membrane.Opus.Encoder)
      |> via_in(:input, options: [kind: :audio])
      |> child(:webrtc_sink, %Membrane.WebRTC.Sink{
        tracks: [:audio],
        signaling: {:websocket, port: opts[:webrtc_sink_ws_port]}
      }),

      # Send video to Gemini
      get_child(:webrtc_source)
      |> via_out(:output, options: [kind: :video])
      |> child(:h264_parser, %Membrane.H264.Parser{})
      |> child(:h264_decoder, Membrane.H264.FFmpeg.Decoder)
      |> child(:jpeg_encoder, Turbojpeg.Filter)
      |> via_in(:video)
      |> get_child(:gemini)
    ]

    {[spec: spec], %{}}
  end
end

Creating a Membrane pipeline involves defining a graph of elements (each child in the spec above is a Membrane Element) that dictates how data flows through the pipeline. If you’re new to Membrane, there’s a great tutorial to help you get started.
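
For completeness, here’s roughly how the pipeline could be started once the modules are defined. Treat this as a sketch: the port numbers are placeholders, and the exact shape of gemini_ws_opts depends on how GeminiEndpoint opens its WebSocket connection.

# Starting the pipeline. The option names match the spec above; the ports are
# arbitrary and `gemini_ws_opts` depends on GeminiEndpoint's implementation.
gemini_ws_opts = [api_key: System.fetch_env!("GEMINI_API_KEY")]

{:ok, _supervisor, _pipeline} =
  Membrane.Pipeline.start_link(GeminiPipeline,
    webrtc_source_ws_port: 8829,
    webrtc_sink_ws_port: 8831,
    gemini_ws_opts: gemini_ws_opts
  )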

Audio processing and playback

Let’s walk through the audio processing part of the pipeline. We begin with our source WebRTC element. Since Gemini accepts raw 16-bit PCM audio at 16 kHz, we first take the audio stream from the WebRTC element, parse and decode it, and then pass it as the audio input to the GeminiEndpoint element.
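
Inside GeminiEndpoint, the handler for that audio pad can mirror the video handler we’ll see further down: base64-encode the PCM payload and send it to Gemini as a realtime_input chunk. The exact code in the repo may differ, but a sketch could look like this (audio/pcm;rate=16000 is the MIME type the Gemini Live API uses for raw PCM input):

  # Sketch of GeminiEndpoint's audio handler: the decoded 16-bit, 16 kHz PCM
  # is base64-encoded and sent over the WebSocket as a realtime_input chunk.
  @impl true
  def handle_buffer(:audio, buffer, _ctx, state) do
    audio = Base.encode64(buffer.payload)

    frame =
      %{
        realtime_input: %{
          media_chunks: [
            %{
              mime_type: "audio/pcm;rate=16000",
              data: audio
            }
          ]
        }
      }
      |> Jason.encode!()

    :ok = GeminiWebSocket.send_frame(state.ws, frame)
    {[], state}
  end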

The GeminiEndpoint Membrane element maintains the WebSocket connection to the Gemini API and forwards the media buffers it receives to Gemini. It also receives Gemini’s audio responses as WebSocket messages and sends them down the pipeline. Additionally, if the Gemini API detects that the user has spoken during playback, it notifies us, and we use that notification to clear the PlaybackQueue’s buffer.
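
How those WebSocket messages reach the element depends on the GeminiWebSocket implementation, but assuming the socket process hands each frame to the element as a {:websocket_frame, json} message (a name made up for this sketch), the handling could look roughly like this. The serverContent, modelTurn and inlineData fields come from the Gemini Live API’s response format, and PlaybackQueue.Flush is an assumed custom event, sketched in the next section:

  # Sketch of handling messages from the Gemini WebSocket inside GeminiEndpoint.
  @impl true
  def handle_info({:websocket_frame, json}, _ctx, state) do
    case Jason.decode!(json) do
      # Gemini detected the user speaking over the response: tell the
      # downstream PlaybackQueue to drop whatever audio is still queued.
      %{"serverContent" => %{"interrupted" => true}} ->
        {[event: {:output, %PlaybackQueue.Flush{}}], state}

      # A chunk of Gemini's spoken response: decode the base64 PCM and
      # forward it down the pipeline as regular buffers.
      %{"serverContent" => %{"modelTurn" => %{"parts" => parts}}} ->
        buffers =
          for %{"inlineData" => %{"data" => data}} <- parts do
            {:buffer, {:output, %Membrane.Buffer{payload: Base.decode64!(data)}}}
          end

        {buffers, state}

      _other ->
        {[], state}
    end
  end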

We’ve already briefly discussed the PlaybackQueue: it buffers the audio and, whenever GeminiEndpoint sends an interruption event, clears that buffer.
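
The actual element lives in the repo, but a minimal sketch of the idea might look like the following. The Flush event is the assumed counterpart of the interruption handling above, and the demand handling here is deliberately simplified; the real element may manage its queue and demands differently:

defmodule PlaybackQueue do
  use Membrane.Filter

  defmodule Flush do
    # Assumed custom event used to clear the queue. Membrane events need to
    # implement Membrane.EventProtocol; deriving it picks up the defaults.
    @derive Membrane.EventProtocol
    defstruct []
  end

  def_input_pad :input, accepted_format: _any, flow_control: :manual, demand_unit: :buffers
  def_output_pad :output, accepted_format: _any, flow_control: :manual

  @impl true
  def handle_init(_ctx, _opts), do: {[], %{queue: []}}

  @impl true
  def handle_buffer(:input, buffer, _ctx, state) do
    # Hold on to the buffer; handle_demand decides when it gets released.
    {[redemand: :output], %{state | queue: state.queue ++ [buffer]}}
  end

  @impl true
  def handle_demand(:output, size, _unit, _ctx, state) do
    {to_send, rest} = Enum.split(state.queue, size)
    buffer_actions = Enum.map(to_send, &{:buffer, {:output, &1}})
    # Keep pulling from upstream, since Gemini produces audio faster than real time.
    {buffer_actions ++ [demand: {:input, 10}], %{state | queue: rest}}
  end

  @impl true
  def handle_event(:input, %Flush{}, _ctx, state) do
    # The user interrupted: drop everything that hasn't been played yet.
    {[], %{state | queue: []}}
  end

  def handle_event(_pad, event, _ctx, state), do: {[forward: event], state}
end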

From here on, we have the audio that needs to be sent back to the user, and the second half of the pipeline looks like the inverse of the first. We pass the audio through the Realtimer element, which ensures it’s played back in real time rather than as fast as Gemini produces it, then encode it as Opus and send it to the audio input of the WebRTC sink.

Sending video

We follow a similar setup for sending video, but you’ll notice the process is much simpler because we’re only sending video and don’t have to handle receiving a video stream.

The H.264 video is parsed and decoded, then re-encoded as JPEG. This ensures that the GeminiEndpoint receives JPEG buffers, which is exactly what we need to send to the Gemini API. Here’s the GeminiEndpoint handler that forwards those video buffers over the WebSocket:

  @impl true
  def handle_buffer(:video, buffer, _ctx, state) do
    video = Base.encode64(buffer.payload)

    frame =
      %{
        realtime_input: %{
          media_chunks: [
            %{
              mime_type: "image/jpeg",
              data: video
            }
          ]
        }
      }
      |> Jason.encode!()

    :ok = GeminiWebSocket.send_frame(state.ws, frame)
    {[], state}
  end

Gemini should now have access to both the video stream and your microphone audio, so you can ask it what it’s seeing.

Conclusion

And there you have it! We’ve explored how large language models like Google Gemini are gaining audio and video capabilities, and how Membrane makes it easier to build a multimedia pipeline for real-time interactions with these LLMs.

If you want to try it out yourself, definitely give the Livebook a go: https://github.com/samrat/gemini_live_with_membrane