The membrane_s3_plugin (as of version 0.1.2) has fallen behind the changes in membrane_core. Here's an S3 sink implementation that works with the latest Membrane.
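Besides membrane_core it only needs ex_aws_s3 (plus the HTTP client and XML parser that ExAws relies on) and, for the optional log redirection, logger_backends. A rough sketch of the deps; the version requirements below are my assumptions, so pin whatever is current for you:

# mix.exs
defp deps do
  [
    {:membrane_core, "~> 1.0"},
    {:ex_aws, "~> 2.5"},
    {:ex_aws_s3, "~> 2.5"},
    {:hackney, "~> 1.20"},        # HTTP client used by ExAws
    {:sweet_xml, "~> 0.7"},       # XML parser used by ExAws for S3 responses
    {:logger_backends, "~> 1.0"}  # only needed for redirect_logs_to_stderr/0
  ]
end

The sink itself: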
defmodule Membrane.S3.Sink do
@moduledoc """
Element that uploads incoming buffers to an S3 bucket using a multipart upload.

Pipeline logs are directed to standard output by default. To separate them from
the sink's output, we recommend redirecting the logger to standard error. For
simple use cases with the default logger configuration (such as stand-alone
scripts), this can be done by calling `redirect_logs_to_stderr/0`.
"""
use Membrane.Sink
alias ExAws.S3
# S3 requires every part of a multipart upload, except the last one, to be at least 5 MB
@min_chunk_size 5 * 1024 * 1024
def_options(
bucket: [
spec: String.t(),
description: "Name of the S3 bucket"
],
object: [
spec: String.t(),
description: "Object key in the S3 bucket"
]
)
def_input_pad(:input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any)
# LoggerBackends comes from the :logger_backends package.
@spec redirect_logs_to_stderr() :: :ok
def redirect_logs_to_stderr() do
:ok = :logger.remove_handler(:default)
LoggerBackends.add(LoggerBackends.Console)
LoggerBackends.configure(LoggerBackends.Console, device: :standard_error)
:ok
end
@impl true
def handle_init(_ctx, %__MODULE__{bucket: bucket, object: object}) do
# Start the multipart upload right away; parts are uploaded as data accumulates.
{:ok, upload_id} = initiate_multipart_upload(bucket, object)
{[],
%{
bucket: bucket,
object: object,
upload_id: upload_id,
part_number: 1,
buffer: <<>>,
parts: []
}}
end
@impl true
def handle_playing(_ctx, state) do
{[demand: :input], state}
end
@impl true
def handle_buffer(:input, buffer, _ctx, %{buffer: existing_buffer} = state) do
# Accumulate payloads until a full part (at least @min_chunk_size bytes) is
# available, then upload it and keep the remainder for the next part.
new_buffer = existing_buffer <> buffer.payload
if byte_size(new_buffer) >= @min_chunk_size do
{to_upload, remaining} = split_buffer(new_buffer)
part = upload_part(to_upload, state)
{[demand: :input],
%{
state
| buffer: remaining,
part_number: state.part_number + 1,
parts: [part | state.parts]
}}
else
{[demand: :input], %{state | buffer: new_buffer}}
end
end
@impl true
def handle_end_of_stream(:input, _ctx, state) do
# Flush whatever is left and complete the multipart upload.
state = finalize_upload(state)
{[], state}
end
defp initiate_multipart_upload(bucket, object) do
%{status_code: 200, body: %{upload_id: upload_id}} =
S3.initiate_multipart_upload(bucket, object)
|> ExAws.request!()
{:ok, upload_id}
end
defp upload_part(part, %{
bucket: bucket,
object: object,
upload_id: upload_id,
part_number: part_number
}) do
%{status_code: 200, headers: headers} =
S3.upload_part(bucket, object, upload_id, part_number, part)
|> ExAws.request!()
{:ok, etag} = find_etag(headers)
{part_number, etag}
end
defp find_etag(headers) do
headers
|> Enum.find_value(
{:error, :invalid_etag},
fn {key, value} ->
if String.equivalent?(String.downcase(key), "etag") do
{:ok, String.trim(value, "\"")}
end
end
)
end
defp finalize_upload(state) do
# Upload any remaining data as the final part (it may be smaller than 5 MB),
# then complete the multipart upload.
state =
if byte_size(state.buffer) > 0 do
part = upload_part(state.buffer, state)
%{state | part_number: state.part_number + 1, buffer: <<>>, parts: [part | state.parts]}
else
state
end
finalize_multipart_upload(state)
state
end
defp finalize_multipart_upload(%{
bucket: bucket,
object: object,
upload_id: upload_id,
parts: parts
}) do
S3.complete_multipart_upload(bucket, object, upload_id, Enum.reverse(parts))
|> ExAws.request!()
end
defp split_buffer(buffer) when byte_size(buffer) < @min_chunk_size do
{buffer, <<>>}
end
defp split_buffer(buffer) do
<<to_upload::binary-size(@min_chunk_size), remaining::binary>> = buffer
{to_upload, remaining}
end
end
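To try it out, here's a minimal pipeline sketch that streams a local file into a bucket. It assumes membrane_file_plugin is in your deps and that ExAws can find AWS credentials (e.g. via the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables); the bucket, object and file names are placeholders:

defmodule Example.UploadPipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, _opts) do
    spec =
      child(:source, %Membrane.File.Source{location: "input.bin"})
      |> child(:sink, %Membrane.S3.Sink{bucket: "my-bucket", object: "uploads/input.bin"})

    {[spec: spec], %{}}
  end

  @impl true
  def handle_element_end_of_stream(:sink, :input, _ctx, state) do
    # Stop the pipeline once the sink has received (and uploaded) all the data.
    {[terminate: :normal], state}
  end

  @impl true
  def handle_element_end_of_stream(_child, _pad, _ctx, state) do
    {[], state}
  end
end

# Running it from a stand-alone script:
Membrane.S3.Sink.redirect_logs_to_stderr()
{:ok, _supervisor, pipeline} = Membrane.Pipeline.start_link(Example.UploadPipeline)

# Block until the pipeline terminates, so the script doesn't exit mid-upload.
ref = Process.monitor(pipeline)

receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
end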