はじめに
こんにちは、オープンエイトの山崎です。
今回は、Akka Streams で Alpakka を使って AWS SQS と接続しデータを送受信する方法について説明します。また本記事内では AWS SQS をローカル環境でシミュレートするために ElasticMQ を使用します。ElasticMQ のセットアップについてもあわせて簡単に説明します。記事執筆時の環境は以下の通りです。
- Scala 2.13.5
- sbt 1.4.7
- Akka 2.6.13
- Alpakka SQS 2.0.2
- ElasticMQ 1.1.0
オープンエイトでは、SNS 配信効果測定サービス Insight BRAIN において内部のジョブ管理に SQS を利用していますが、その接続処理において実際に Akka Streams と Alpakka を使用しています。
Alpakka
Alpakka とは
Akka Streams で様々なリソースと接続するためのコンポーネント群を提供するプロジェクトが Alpakka です。Akka の公式プロジェクトとして開発・提供されています。たとえばローカルのテキストファイルをストリーム処理する Source や、処理結果を S3 に保存する Flow など、様々な入力ソースや出力先を Akka Streams のコンポーネントとして汎用的に使えるかたちにしてくれています。
Alpakka Documentation
https://doc.akka.io/docs/alpakka/current/
外部サービスとの連携処理が簡単に構築できてしまうのでうまく Alpakka を活用できれば開発効率の向上が見込めるでしょう。
今回はその中から AWS SQS 用のモジュールを使用します。
AWS SQS • Alpakka Documentation
https://doc.akka.io/docs/alpakka/current/sqs.html
ちなみに「Alpakka」という名前は動物のアルパカから取っているものと思われます。アルパカは英語の綴りだと「Alpaca」ですがフィンランドやノルウェーでは「Alpakka」と表記するようです。「Akka」という単語が含まれているためこの名前が採用されたのかなと思うのですが、ドキュメントでもそのあたりは特に説明されてなさそうなので正確なところはわかりませんね…。
セットアップ
Alpakka は接続対象ごとにモジュールが分かれているので必要なものだけを選んで導入します。たとえば今回は SQS モジュールが必要なので libraryDependencies
を以下のように記述します。
libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream" % "2.6.13", "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "2.0.2" )
実際のコードについては後述します。
AWS SQS
AWS SQS とは
Amazon SQS(サーバーレスアプリのためのメッセージキューサービス)| AWS
https://aws.amazon.com/jp/sqs/
SQS (Simple Queue Service) は AWS が提供するマネージド型のメッセージキューサービスです。AWS では MQ、Kinesis、MSK など複数のメッセージキュー系サービスが提供されていますが、名前の通り最もシンプルな機能を提供しているのが SQS です。プロプライエタリなサービスであり AWS にロックインされる点はネックですが、低コストで利用できる上に単純なユースケースなら大体 SQS でカバーできるのではと思います。
ElasticMQ のセットアップ
SQS は AWS のサービスなので、開発用にローカル環境で動作させるといったことができないのですが、SQS 互換 API を提供する ElasticMQ というアプリケーションを利用することで擬似的にローカル環境で SQS を動作させることができます。
softwaremill/elasticmq: In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.
https://github.com/softwaremill/elasticmq
キューのデータはインメモリで扱われるため永続化されず ElasticMQ サービスを終了すると消えてしまいます。なのであくまでも開発用のツールという位置づけで捉えておくのがよさそうです。
ElasticMQ は Alpakka SQS のテストでも利用されているようです。我々も開発環境で利用していますが、これまでのところ実際の SQS と挙動が異なって困ったというようなことは起きていません。
ちなみに ElasticMQ 自体は Scala 製です。ただし Docker イメージでも提供されているため特に Scala を意識せずに利用できます。Jar 単体で実行したりライブラリとしてアプリケーションに組み込んだり様々な利用方法が提供されていますが、今回は Docker イメージを利用することにします。
docker-compose.yml
を作成し以下のように記述します。
version: '3' services: elasticmq: image: softwaremill/elasticmq container_name: elasticmq_test ports: - 9324:9324 volumes: - ./elasticmq.conf:/opt/elasticmq.conf
設定ファイル elasticmq.conf
を以下のように作成します。今回は test-queue.fifo
という名前で FIFO キューを作成します。
include classpath("application.conf") aws { accountId = queue } queues { "test-queue.fifo" { defaultVisibilityTimeout = 10 seconds fifo = true contentBasedDeduplication = true } }
上記ファイルが用意できたら docker-compose
コマンドでコンテナを起動します。
docker-compose up -d
コンテナが起動すると test-queue.fifo
というキューが自動的に作成され localhost:9324
で接続可能な状態になります。そのエンドポイントを指定すれば AWS CLI などを使って SQS として扱うことが可能です。
# AWS CLI でキューをリストしてみる aws sqs list-queues --endpoint-url 'http://localhost:9324'
{ "QueueUrls": [ "http://localhost:9324/queue/test-queue.fifo" ] }
ちなみに、おそらくバグだと思うのですが、現行バージョンだと接続 URL の AWS アカウントの部分は queue
という文字列でないとうまく動作しないようです。たとえば elasticmq.conf
で aws.accountId = aws01
のように記述すれば http://localhost:9324/aws01/test-queue.fifo
で接続できそうに思えるのですが手元の環境では正しく動作しませんでした。
Alpakka で AWS SQS 接続
準備が整ったので実際に Alpakka で SQS (ElasticMQ) に接続してみます。
まず最初に SqsAsyncClient
を生成して SQS 接続情報を設定します。
import java.net.URI import akka.actor.ActorSystem import akka.stream.alpakka.sqs.SqsSourceSettings import com.github.matsluni.akkahttpspi.AkkaHttpClient import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.sqs.SqsAsyncClient val endpoint = "http://localhost:9324" implicit val actorSystem = ActorSystem("example") implicit val sqsClient = SqsAsyncClient .builder() .credentialsProvider( StaticCredentialsProvider.create( AwsBasicCredentials.create("AK", "SK") // (1) ) ) .endpointOverride(URI.create(endpoint)) // (2) .region(Region.AP_NORTHEAST_1) .httpClient(AkkaHttpClient.builder() .withActorSystem(actorSystem).build()) .build()
(1) 今回は本物の SQS ではなくローカルの ElasticMQ への接続なのでアクセスキーとシークレットキーは適当な値を記述しておきます。
(2) endpointOverride
でエンドポイントを指定することで接続先を AWS ではなくローカルに向けることができます。
Subscribe
SQS からメッセージをストリーム取得するには SqsSource
を使用します。このとき先程生成した SqsAsyncClient
が implicit
で参照されます。この SqsSource
はその名の通り Akka Streams の Source なので Flow や Sink とつないでデータフローを構築できます。
val queueUrl = endpoint + "/queue/test-queue.fifo" val settings = SqsSourceSettings() SqsSource(queueUrl, settings) .runWith(Sink.foreach { message: Message => val body = message.body println(s"received: ${body}") })
設定等に問題がなければこれで SQS と接続された状態になります。試しにこのアプリケーションを実行した状態で AWS CLI を使ってメッセージを送信してみます。グループ ID が必要になるのですがとりあえず適当な値で大丈夫です。
# AWS CLI で "hello" というメッセージを送信してみる aws sqs send-message \ --queue-url "http://localhost:9324/queue/test-queue.fifo" \ --endpoint-url "http://localhost:9324" \ --message-group-id "x" \ --message-body "hello"
成功すると Scala アプリケーション側のコンソールに received: hello
と表示されるはずです。
ちなみにこのままだと SQS 上のメッセージは消えずに残り続けます。なので少し時間が経つとまた同じメッセージを受信してしまいます。一度受信した処理済みのメッセージを削除するには ACK メッセージを送ってメッセージを削除する必要があります。その場合、以下のように SqsAckFlow
や SqsAckSink
を使用します。
import akka.stream.alpakka.sqs.MessageAction import akka.stream.alpakka.sqs.SqsAckResult SqsSource(queueUrl, settings) .map(MessageAction.Delete(_)) .via(SqsAckFlow(queueUrl)) .runWith(Sink.foreach { res: SqsAckResult => val body = res.messageAction.message.body println(s"received: ${body}") })
これで一度受信したメッセージは SQS 上から削除されます。
Publish
SQS にメッセージを送信する場合は SqsPublishSink
や SqsPublishFlow
を使います。
import akka.stream.alpakka.sqs.scaladsl.SqsPublishSink import akka.stream.scaladsl.Source import software.amazon.awssdk.services.sqs.model.SendMessageRequest val queueUrl = endpoint + "/queue/test-queue.fifo" val message = SendMessageRequest.builder .messageBody("hello") .messageGroupId("x") .build Source.single(message) .runWith(SqsPublishSink.messageSink(queueUrl))
これを実行すると SQS にメッセージが 1 件送信されます。
# AWS CLI でキューからメッセージを取得してみる aws sqs receive-message \ --queue-url "http://localhost:9324/queue/test-queue.fifo" \ --endpoint-url "http://localhost:9324"
{ "Messages": [ { "MessageId": "5be5527a-ec4e-4f3f-8ad2-67a713938673", "ReceiptHandle": "5be5527a-ec4e-4f3f-8ad2-67a713938673#2406b392-60b7-4019-b3b9-4a62620bdbcc", "MD5OfBody": "5d41402abc4b2a76b9719d911017c592", "Body": "hello" } ] }
さいごに
Akka Streams はロジックを Source、Flow、Sink といったかたちでモジュール化して扱うことができるためモジュールの組み合わせでデータフローを構築できる点が非常に強力だと感じています。Alpakka は SQS 接続以外にも様々なモジュールを提供しているので、要件がマッチすれば外部サービスとのデータ連携部分は Alpakka で手軽に構築してしまうことができます。その分ビジネスロジックの開発に集中できるので、うまく活用することで開発効率の向上が図れるのではないでしょうか。
ということで、今回は以上となります。非常に駆け足の内容ではありますが本記事が Akka Streams 活用の一助となれば幸いです。最後までお読みいただきありがとうございました。

Akka実践バイブル アクターモデルによる並行・分散システムの実現
- 作者:Raymond Roestenburg,Rob Bakker,Rob Williams
- 発売日: 2017/12/13
- メディア: 単行本(ソフトカバー)

- 作者:Martin Odersky,Lex Spoon,Bill Venners
- 発売日: 2016/09/20
- メディア: 単行本(ソフトカバー)