この記事は、fukuoka.ex Elixir/Phoenix Advent Calendar 2019 の7日目です。 昨日は @sym_num さんの「ElixirでPrologコンパイラを作ったお話」でした。

この記事ではElixirのデータ処理パイプラインの構築ツールである「Broadway」を触ってみたので、その過程を書いてみようと思います。

Broadwayとは

公式リポジトリ冒頭では、

Build concurrent and multi-stage data ingestion and data processing pipelines with Elixir. It allows developers to consume data efficiently from different sources, known as producers, such as Amazon SQS, RabbitMQ, and others.

と書かれています。

Google翻訳に投げてみると、「Elixirを使用して、同時および多段階のデータ取り込みおよびデータ処理パイプラインを構築します。開発者は、Amazon SQS、RabbitMQなどのプロデューサーと呼ばれるさまざまなソースからデータを効率的に使用できます。」 とのこと。和訳するとなんとなくイメージが湧きそうですね。

データ処理パイプラインはおよそ以下のような流れです。

image-1

  1. ユーザーの投稿したデータなどがメッセージとしてキューに溜まる
  2. キューからメッセージを取り出し、メッセージの内容に応じてデータの加工を行う

    • 画像のサムネイル化、minifyなど
  3. 加工した結果を別の場所に保存する

    • ストレージ、DBなど

という手順です。

この図での、「データソースからメッセージを取り出して、値に応じて処理を分岐させて、結果を格納する」という役割を担うのがBroadwayです。

image-2

Broadwayに関する詳細な説明はgumiTECHさんの記事が非常によくまとまっているので、そちらをご覧ください。

ちなみに名前の由来はニューヨークのブロードウェイにあやかってるそうです。プロデューサー、ステージなど確かにブロードウェイっぽい用語がでてきます。素敵ですね!

The name Broadway was taken from the famous Broadway street in New York City, as we hope to be equally renowned by our stages and producers. :)

ローカルで試してみる

mix new から実際にBroadwayを動かし、queueに溜まったメッセージの内容をDBに保存するまでをゴールとして、環境を作ります。

執筆時のElixirのversionは次の通りです。

$ elixir -v
Erlang/OTP 21 [erts-10.3.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

Elixir 1.9.1 (compiled with Erlang/OTP 20)

前提

以下の3点を前提としています。

  • Elixirの実行環境があること
  • docker/docker-composeを利用できること
  • aws cliが実行できること

mixプロジェクトの作成

いつも通り、mix newからプロジェクトを作成します。--supをつけてOTPアプリケーションのガワも合わせて生成します。

$ mix new broadway_example --sup

depsの追加

次のdepsを追加します。

# mix.exs
   defp deps do
     [
-      # {:dep_from_hexpm, "~> 0.3.0"},
-      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
+      {:broadway, "~> 0.5.0"},
+      {:broadway_sqs, "~> 0.5.0"},
+      {:hackney, "~> 1.15.2"},
+      {:jason, "~> 1.1.2"},
+      {:sweet_xml, "~> 0.6.6"},
+      {:ecto_sql, "~> 3.0"},
+      {:postgrex, ">= 0.0.0"}
     ]
   end
 end

hackney,jason,sweet_xmlはbroadway_sqsが依存関係として持つex_awsを利用するのに必要です。

depsの追加が終わったら、deps.getしておきます

$ mix deps.get

docker-composeの用意

の2つのコンテナを構築するcomposeファイルを作成します。LocalstackはAWS SQSのモック作成用です。

プロジェクトのルート(mix.exsがあるディレクトリ)にdocker-compose.ymlを作成し、次のように記載します。

version: "3"

services:
  localstack:
    image: localstack/localstack
    ports:
      - "4576:4576"
      - "9000:9000"
    environment:
      - SERVICES=sqs
      - DATA_DIR=/tmp/localstack/data
      - DEFAULT_REGION=ap-northeast-1
      - PORT_WEB_UI=9000
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - ./volume/localstack:/tmp/localstack
  postgres:
    image: postgres:alpine
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=default
      - POSTGRES_USER=default
      - POSTGRES_PASSWORD=secret
    volumes:
      - ./volume/postgres/data:/var/lib/postgresql/data

SQSのモックを作成するのでLocalstackのSERVICESにsqsを指定しています。さらにvolume用のディレクトリを作成します。

$ mkdir -p volume/localstack
$ mkdir -p volume/postgres

次のような状態であればOKです。

 .
 ├── README.md
 ├── _build
 ├── config
 ├── deps
+├── docker-compose.yml
 ├── lib
 ├── mix.exs
 ├── mix.lock
 ├── test
+└── volume
+    ├── localstack
+    └── postgres

composeが用意できたら、コンテナを起動します。

$ docker-compose up -d

docker-compose psでコンテナがUpとなっていればOKです。

            Name                           Command              State                                       Ports                                    
-----------------------------------------------------------------------------------------------------------------------------------------------------
broadway_example_localstack_1   docker-entrypoint.sh            Up      4567/tcp, 4568/tcp, 4569/tcp, 4570/tcp, 4571/tcp, 4572/tcp, 4573/tcp,        
                                                                        4574/tcp, 4575/tcp, 0.0.0.0:4576->4576/tcp, 4577/tcp, 4578/tcp, 4579/tcp,    
                                                                        4580/tcp, 4581/tcp, 4582/tcp, 4583/tcp, 4584/tcp, 4585/tcp, 4586/tcp,        
                                                                        4587/tcp, 4588/tcp, 4589/tcp, 4590/tcp, 4591/tcp, 4592/tcp, 4593/tcp,        
                                                                        4594/tcp, 4595/tcp, 4596/tcp, 4597/tcp, 8080/tcp, 0.0.0.0:9000->9000/tcp     
broadway_example_postgres_1     docker-entrypoint.sh postgres   Up      0.0.0.0:5432->5432/tcp                                                       

Ectoのセットアップ

Ectoの設定を行います。Repoモジュールとconfigの生成用のコマンドが提供されているので実行します。Repoモジュール名はプロジェクト名に合わせてBroadwayExample.Repoとします。

$ mix ecto.gen.repo -r BroadwayExample.Repo

* creating lib/broadway_example
* creating lib/broadway_example/repo.ex
* creating config/config.exs
Don't forget to add your new repo to your supervision tree
(typically in lib/broadway_example/application.ex):

    {BroadwayExample.Repo, []}

And to add it to the list of ecto repositories in your
configuration files (so Ecto tasks work as expected):

    config :broadway_example,
      ecto_repos: [BroadwayExample.Repo]

config.exsが生成されているので、先ほどdocker-compose.ymlに記載した接続情報を設定します。また、ecto.gen.repoの実行結果のメッセージにあるように、ecto_reposの設定も加えておきます。これを書くことで、mix ecto.xxxで実行するEctoのタスクから指定したRepoモジュールに対する操作が可能になります。

 import Config
 
 config :broadway_example, BroadwayExample.Repo,
-  database: "broadway_example_repo",
-  username: "user",
-  password: "pass",
-  hostname: "localhost"
+  database: "default",
+  username: "default",
+  password: "secret",
+  hostname: "localhost",
+  port: 5432
+
+config :broadway_example,
+  ecto_repos: [BroadwayExample.Repo]

設定が完了したら、DBの作成コマンドを実行してみて疎通確認をしましょう。

$ mix ecto.create
Compiling 3 files (.ex)
Generated broadway_example app
The database for BroadwayExample.Repo has already been created

問題なく接続され、すでにデータベースが作成済みであればOKです。postgresのDockerイメージは所定の環境変数を指定した状態で起動するとあらかじめデータベースを初期化した状態で起動してくれるので、すでに作成済みとなっています。

環境変数について確認したい方はコチラをご覧ください。

続いて、lib/broadway_example/application.exを開き、supervision treeにRepoモジュールを追加します。

# lib/broadway_example/application.ex
   def start(_type, _args) do
     children = [
+      {BroadwayExample.Repo, []}
       # Starts a worker by calling: BroadwayExample.Worker.start_link(arg)
       # {BroadwayExample.Worker, arg}
     ]

User schemaの作成

後ほど利用するサンプル用のユーザースキーマをこのタイミングで作っておきます。

Ectoのecto.gen.migrationからマイグレーションファイルを生成します

$ mix ecto.gen.migration create_users

作成されたマイグレーションファイルを次のように記述します。

defmodule BroadwayExample.Repo.Migrations.CreateUsers do
  use Ecto.Migration

  def change do

    create table(:users) do
      add :name, :string
      add :age, :integer

      timestamps()
    end

  end
end

名前と年齢だけを持ったシンプルなテーブルです。記述後、migrateを実行します。

$ mix ecto.migrate

16:17:31.049 [info]  == Running 20191205161630 BroadwayExample.Repo.Migrations.CreateUsers.change/0 forward

16:17:31.051 [info]  create table users

16:17:31.082 [info]  == Migrated 20191205161630 in 0.0s

migrateが無事に実行できたら、テーブルに対応するschemaのファイルも追加しておきます。 lib/broadway_example/schema/user.exのファイルを作成し、次のようにschemaを記述します。

defmodule BroadwayExample.Schema.User do
  use Ecto.Schema
  import Ecto.Changeset

  schema "users" do
    field(:name, :string)
    field(:age, :integer)

    timestamps()
  end

  def changeset(user, attrs \\ %{}) do
    user
    |> cast(attrs, [:name, :age])
    |> validate_required([:name, :age])
  end
end

これでEctoの設定は終わりです。

SQSのセットアップ

続いて、localstackで用意したSQSのモックに対してキューを作成します。

acccess keyとsecret keyの環境変数を指定してaws cliを動作させましょう。値自体は適当な値でOKです。

direnvを利用した場合は、次のような設定になります。

export AWS_ACCESS_KEY_ID=dummy_access_key_id
export AWS_SECRET_ACCESS_KEY=dummy_secret_access_key
export AWS_DEFAULT_REGION=ap-northeast-1

設定ができたら、aws cliを使ってqueueを作成します。エラーとなったメッセージを貯めるためのデッドレターキューも作成します。

# メッセージを格納するキューを作成
$ aws sqs create-queue --queue-name broadway-example --endpoint-url http://localhost:4576
# デッドレターキューの作成
# https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html
$ aws sqs create-queue --queue-name broadway-example-dlq --endpoint-url http://localhost:4576
# デッドレターキューの設定
$ aws sqs set-queue-attributes \
    --queue-url http://localhost:4576/queue/broadway-example \
    --endpoint-url http://localhost:4576 \
    --attributes '{"RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:ap-northeast-1:queue:broadway-example-dlq\",\"maxReceiveCount\":\"3\"}"}'

これでSQSのモックが用意できました。

ex_awsの設定

次のようにex_awsのconfigを設定します。

# config/config.exs
 config :broadway_example,
   ecto_repos: [BroadwayExample.Repo]
+
+config :ex_aws,
+  json_codec: Jason
+
+config :ex_aws, :sqs,
+  scheme: "http://",
+  host: "localhost",
+  port: 4576

ex_awsではAWS CLIと同じく環境変数から認証情報を取得してくれるので、access keyとsecret keyに関するconfigは記載不要です。

この状態でiexからモジュールが動作するか確認しておきましょう。先ほど作成したqueueの情報が返って来ていればOKです。

$ iex -S mix
Erlang/OTP 21 [erts-10.3.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

Interactive Elixir (1.9.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> ExAws.SQS.list_queues() |> ExAws.request!()
%{  
  body: %{
    queues: ["http://localhost:4576/queue/broadway-example-dlq",
     "http://localhost:4576/queue/broadway-example"],
    request_id: "00000000-0000-0000-0000-000000000000"
  },
  headers: [
    {"Server", "BaseHTTP/0.6 Python/3.6.8"},
    {"Date", "Sat, 07 Dec 2019 10:46:25 GMT"},
    {"Connection", "close"},
    {"Content-Type", "text/plain; charset=UTF-8"},
    {"content-length", "480"},
    {"Access-Control-Allow-Origin", "*"},
    {"Access-Control-Allow-Methods", "HEAD,GET,PUT,POST,DELETE,OPTIONS,PATCH"},
    {"Access-Control-Allow-Headers",
     "authorization,content-type,content-md5,cache-control,x-amz-content-sha256,x-amz-date,x-amz-security-token,x-amz-user-agent,x-amz-acl,x-amz-version-id"},
    {"Access-Control-Expose-Headers", "x-amz-version-id"}
  ],
  status_code: 200
}

Broadwayモジュールの実装

やっと本題のBroadwayのファイルを書いていきます。Broadwayモジュールという呼び方が正しいかはわかりませんが、use Broadwayを書いて記述していくモジュールのことをそのように呼ぶことにします。

lib/broadway_example.exを開き、デフォルトのコードをすべて上書きして次のように記述します。

defmodule BroadwayExample do
  use Broadway

  alias Broadway.Message
  alias BroadwayExample.Schema.User

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        # queue_urlに監視するqueueの値をセットする。ここでは先ほど作成したbroadway-exampleキューを指定している
        module: {
          BroadwaySQS.Producer,
          [
            queue_url: "http://localhost:4576/queue/broadway-example",
            # hexdocsには:allと指定すれば良いと書いてあるが2019/12/06時点で動作しない、、ので"All"と文字列で指定する
            message_attribute_names: ["All"]
          ]
        }
      ],
      processors: [
        default: []
      ],
      batchers: [
        db: []
      ]
    )
  end

  # メッセージが届いた時に発火する
  # message変数に%Broadway.Message{}構造体が入ってくる
  def handle_message(
        _processor_name,
        message,
        _context
      ) do
    # Broadway.Message.put_batcher/2でバッチ処理を分岐させる
    Message.put_batcher(message, :db)
  end

  def handle_batch(:db, messages, _batch_info, _context) do
    messages
    |> Enum.map(fn %Message{metadata: %{message_attributes: message_attributes}} = message ->
      user_attrs = make_user_attrs_from_message_attributes(message_attributes)

      %User{}
      |> User.changeset(user_attrs)
      |> BroadwayExample.Repo.insert()
      |> case do
        # insertに成功したらmessageをそのまま返却
        # 失敗したらMessage.failed/2を呼び出して失敗として記録する
        # エラーに落としておくことで、SQS上でデッドレターキューに移動させることができる
        # https://hexdocs.pm/broadway_sqs/BroadwaySQS.Producer.html#module-acknowledgments
        {:ok, _} -> message
        {:error, %Ecto.Changeset{} = error} -> Message.failed(message, error)
      end
    end)
  end

  def handle_failed(messages, _context) do
    IO.inspect(messages)

    messages
  end

  # SQSのメッセージに設定できるメッセージ属性から値を取得する
  defp make_user_attrs_from_message_attributes(message_attributes) do
    name =
      message_attributes
      |> Map.get("Name")
      |> get_value_from_attribute()

    age =
      message_attributes
      |> Map.get("Age")
      |> get_value_from_attribute()

    %{
      name: name,
      age: age
    }
  end

  defp get_value_from_attribute(nil), do: nil
  defp get_value_from_attribute(%{value: value}), do: value
end

ポイントは次の通りです。

  • start_link/1でBroadwayによるデータ処理パイプラインの挙動を定義しています。さまざまなoptionがあるので、使うときは一度hexdocsを眺めた方が良さそうです。
  • handle_message/3で個々のメッセージを受け取ったときに行いたい処理を記述します。データのdecodeなど、メッセージに対する前処理や、メッセージに応じてバッチ処理の内容の分岐はここに記述します。
  • handle_batch/4でメッセージに対する一括処理を行います。handle_messageでメッセージを前処理→handle_batchで前処理されたメッセージたちをまとめて処理するイメージです。この例ではUserの情報を持ったメッセージを受け取り、そのままDBへinsert、失敗したらメッセージを失敗として処理するフローを書いています。
  • handle_failed/2でエラーとなったメッセージのみ後処理できます。

Broadwayモジュールを記述したら、Ectoと同じくsupervision treeに追加します。lib/broadway_example/application.exを開き、次のようにBroadwayモジュールを追加します。

# lib/broadway_example/application.ex

   def start(_type, _args) do
     children = [
       {BroadwayExample.Repo, []},
+      {BroadwayExample, []}
       # Starts a worker by calling: BroadwayExample.Worker.start_link(arg)
       # {BroadwayExample.Worker, arg}
     ]

これで、やっとBroadwayによってSQSのメッセージを処理させる準備が整いました。

メッセージを送るスクリプトを用意する

SQSへメッセージを送るスクリプトを用意しましょう。script/send_message.exsを作成し、次のように記述します(ディレクトリ、ファイル名はなんでもOKです)

ExAws.SQS.send_message(
  "http://localhost:4576/queue/broadway-example",
  "user message",
  message_attributes: [
    %{
      name: "Name",
      data_type: :string,
      value: "test name"
    },
    %{
      name: "Age",
      data_type: :number,
      value: 20
    }
  ]
)
|> ExAws.request!()

これでメッセージを送る準備ができました。

メッセージを送る

まずはアプリケーションの起動が必要です。iexから起動しておきます。

$ iex -S mix

この状態で別terminalから先ほど作ったスクリプトを実行します。

$ mix run script/send_message.exs 

実行後、iexで立ち上げていたアプリケーションのログにinsertのデバックログが見えていればOKです。

send_message

iexからユーザーのレコードを取得してみると、確かにinsertされていることがわかります。SQSのメッセージをデータソースとして、DBへのinsertができました!

iex(2)> BroadwayExample.Repo.all(BroadwayExample.Schema.User)

17:57:39.367 [debug] QUERY OK source="users" db=6.9ms queue=1.8ms
SELECT u0."id", u0."name", u0."age", u0."inserted_at", u0."updated_at" FROM "use
rs" AS u0 []
[
  %BroadwayExample.Schema.User{
    __meta__: #Ecto.Schema.Metadata<:loaded, "users">,
    age: 20,
    id: 1,
    inserted_at: ~N[2019-12-07 08:54:44],
    name: "test name",
    updated_at: ~N[2019-12-07 08:54:44]
  }
]

バリデーションを足してみる

試しに年齢を20以上のみ通すようにしましょう。lib/broadway_example/schema/user.exを開き、validate_numberを追加します。

     user
     |> cast(attrs, [:name, :age])
     |> validate_required([:name, :age])
+    |> validate_number(:age, greater_than_or_equal_to: 20)
   end
 end

バリデーションを追加したら、エラーとなる値を送るようにスクリプトを修正します。

# script/send_message.exs
 ExAws.SQS.send_message("http://localhost:4576/queue/broadway-example", "user message",
   message_attributes: [
     %{
       name: "Name",
       data_type: :string,
       value: "test name"
     },
     %{
       name: "Age",
       data_type: :number,
+       value: 10
     }
   ]
 )
 |> ExAws.request!()

この状態でメッセージを送信します。

$ mix run script/send_message.exs 

iexのコンソールにはinsertのdebugログではなく、handle_failed/2に記述したIO.inspectの結果が表示され、statusのtupleにErrorのchangesetが入っていることがわかります。

iex(24)> [
  %Broadway.Message{
    acknowledger: {BroadwaySQS.ExAwsClient,
     #Reference<0.3935415440.1661730823.90834>,
     %{
       receipt: %{
         id: "62f74563-ef3c-4f00-8415-bccb9a8714ae",
         receipt_handle: "62f74563-ef3c-4f00-8415-bccb9a8714ae#5f14c20d-92ba-4001-8695-33541ddc080b"
       }
     }},
    batch_key: :default,
    batch_mode: :bulk,
    batcher: :db,
    data: "user message",
    metadata: %{ 
      attributes: [],
      md5_of_body: "f8ec55cf268ca6106fb23a98523b7549",
      message_attributes: %{
        "Age" => %{
          binary_value: "",
          data_type: "Number",
          name: "Age",
          string_value: "10",
          value: 10
        },
        "Name" => %{
          binary_value: "", 
          data_type: "String",
          name: "Name",
          string_value: "test name",
          value: "test name"
        }
      },
      message_id: "62f74563-ef3c-4f00-8415-bccb9a8714ae",
      receipt_handle: "62f74563-ef3c-4f00-8415-bccb9a8714ae#5f14c20d-92ba-4001-8695-33541ddc080b"
    },
    status: {:failed,
     #Ecto.Changeset<
       action: :insert,
       changes: %{age: 10, name: "test name"},
       errors: [
         age: {"must be greater than or equal to %{number}",
          [validation: :number, kind: :greater_than_or_equal_to, number: 20]}
       ],
       data: #BroadwayExample.Schema.User<>,
       valid?: false
     >}
  }
]

この表示が3回走ったあと、デッドレターキューをみてみると、メッセージが1つ格納されています。maxReceiveCountを3に設定していたので、4回目の取得のタイミングでデッドレターキューに移動しているようです。

iex(28)> ExAws.SQS.receive_message("http://localhost:4576/queue/broadway-example-dlq") |> ExAws.request!()
%{
  body: %{
    messages: [ 
      %{
        attributes: [],
        body: "user message",
        md5_of_body: "f8ec55cf268ca6106fb23a98523b7549",
        message_attributes: [],
        message_id: "62f74563-ef3c-4f00-8415-bccb9a8714ae",
        receipt_handle: "62f74563-ef3c-4f00-8415-bccb9a8714ae#ae823242-b172-43e4-9ba2-68ee1f31459a"
      }
    ], 
    request_id: "00000000-0000-0000-0000-000000000000"
  },
  headers: [
    {"Server", "BaseHTTP/0.6 Python/3.6.8"},
    {"Date", "Sat, 07 Dec 2019 10:41:22 GMT"},
    {"Connection", "close"},
    {"Content-Type", "text/plain; charset=UTF-8"},
    {"Content-Length", "760"},
    {"Access-Control-Allow-Origin", "*"},
    {"Access-Control-Allow-Methods", "HEAD,GET,PUT,POST,DELETE,OPTIONS,PATCH"},
    {"Access-Control-Allow-Headers",
     "authorization,content-type,content-md5,cache-control,x-amz-content-sha256,x-amz-date,x-amz-security-token,x-amz-user-agent,x-amz-acl,x-amz-version-id"},
    {"Access-Control-Expose-Headers", "x-amz-version-id"}
  ],
  status_code: 200
}

これでエラーとなった場合はエラー用のキューにメッセージが溜まるので、問題の切り分けがしやすいですね。

まとめ

Broadwayを使ってAWS SQSのメッセージを処理する一連の流れを試してみました。前処理もバッチ処理も柔軟に分岐できるので、「画像だったらdecodeしてs3へ格納」「テキストメッセージであればDBへ保存」など、色々な種類のメッセージをスムーズに処理することができそうです。関数パターンマッチとの相性が最高ですね🤤

まさにElixirらしいツールで、今後も要注目で使っていきたいと思います!ぜひお試しください!

明日12/8は @koyo-miyamura さんの「PhoenixでRedisを使った簡単ランキングの実装」です。お楽しみに!