ITANDI TECH BLOG

イタンジのスタッフブログです。イベントや技術情報などを発信しています。

ExecutionContextの状態を取得する

RailsScalaとインフラの三刀流と言えば聞こえはいいけど実態は雑用係に近いぽんこつです。最近はPlayframework/ScalaのプロジェクトのExecutionContextの状態を監視しようと色々やってて、その第一段としてExecutionContextの状態を取得するところからやります。

ExecutionContextとは

ScalaにはExecutionContextという並行並列実行に使う便利なものがありまして、適当にRunnableなクラスを投げ込むとmainとは別のスレッドでよしなに実行してくれます。便利ですね。また、そのようなものなので、殆どの場合、実装として内部にThreadPoolを持ってます。みんながよく書いてる

import scala.concurrent.ExecutionContext.Implicits.global

これはデフォルトで生成されるExecutionContextをimplicitで読めるようにするための魔法のimportです。勿論このようなものを自作することができて、以下のように作れます

val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(64))

便利ですね。上記で作っているExecutionContextの内部のThreadPoolはThread数が64で固定されたもので、Executorsの各種メソッドを呼び出せば、全く違うようなものを作ることができます。ちなみに、ExecutorsはJavaAPIなので、JavaでもExecutionContextが無い以外は同じようなことができます。

モチベーション

ここからが本題ですが、先程作ったExecutionContext、Thread数が64固定で設定しましたが、このThreadPoolがどの程度使われているか興味ありませんか。ぼくは大変興味があります。これが分かるとThread枯渇系の障害対応もできますし、パラメータチューニングも捗りそうです。計測できるようにしましょう。

Threadのリストを取得する

Java APIには全ThreadのStackTraceのリストを取得するAPIがあります。

Thread.getAllStackTraces(); // 返り値はMap[Thread, Array[StackTraceElement]

これを使えば現在動いてる全Threadの状態が取れます。StackTraceも同時に取得していてオーバヘッドが気になりますが、今は気にしないことにしましょう。Threadからは以下のような情報が取れます。

  • id
  • name
  • state
  • priority

stateを見るとWAITING/RUNNABLEなどの情報が取れます。特定のThreadPoolで使われているThreadのWAITINGとRUNNABLEの比率を見ると、そのThreadPoolの忙しさ具合が取れそうです。

問題はこのThreadのリストは本当に全てのThreadを含んでいて、GC用に確保されたThreadも数に含まれていたりします。厄介なのでnameでフィルターできると便利そうです。

ExecutionContextが立てるThreadに名前を付ける

実はExecutionContextが内部で使うThreadPoolのThreadには上記で取得できる名前を任意に付けることができます。Executors.newHogeThreadPoolの最後の引数にはThreadFactoryというclassを渡すことができます。

Executors.newFixedThreadPool(64, threadFactory);

自分で好きな名前を付けるThreadFactoryを作り、生成するThreadに好きな名前を付けられるようにしましょう。

import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger

class NamedThreadFactory(name: String) extends ThreadFactory {
  private val counter: AtomicInteger = new AtomicInteger

  override def newThread(runnable: Runnable): Thread = {
    val tName = s"${name}-${counter.incrementAndGet()}"
    new Thread(null, runnable, tName)
  }
}

このようにnewThreadメソッドをoverrideして任意のThreadを返すようにすればOKです。上記のclassは${name}-${number}みたいな名前のThreadを自動で生成してくれるScalaのclassです。

あとは先程の方法でThreadのリストを取得し、名前で分類すれば、特定ThreadPoolのThreadのみを集めることができます。ちなみに私の場合はRunnableとそれ以外の状態のThreadの数を取得するWebAPIを作りました。これを使って可視化するところまでやる話は完全にインフラの話になるので、後編にまわします。お楽しみに!

既存Railsプロダクトのdocker化

どうも、プログラマとして入社したのになぜか最近コードあまり書いてないぽんこつです。ここ数ヶ月で既存プロジェクトを頑張ってdocker化してECSにデプロイしたのでまとめます。

ただ社内でもあまりノウハウが蓄積されているわけではなく、試行錯誤の連続で、一旦公開することにしましたが、今後も随時変更されていくと思いますし、是非「弊社はこうしたぜ〜」とかあれば参考にしたいので、教えていただけると幸いです。

設計

今回Docker化したプロジェクトは、unicornとsidekiqを使ったサービスで、静的ファイルはnginxが管理しているようなサービスです。一般的なよくあるRailsの構成だとおもいます。

これをdocker化してECSのサービスとして載せるにあたり、ECSのサービスとしてはweb-appとsidekiqの2つ、web-appは内部でnginxとunicornのコンテナをそれぞれ持つようにしています。sidekiqとunicornのイメージは同一で、commandの指定でそれぞれunicorn、sidekiqを起動しています。

既にあるALBからのリクエストを切り替える感じになります。

Terraform

ECSの構成が複雑になることが予想されていたので、terraformで構成管理をすることにしました。terraformは既存の構成をtfファイルに落とすような使い方は難しそうだったので、新規で構成した部分のみの対応です。インフラの変更をGitHubのPullRequestベースで管理できるようになったのは大きな進歩だと思っています。

静的ファイル

非常に悩んだのですが、静的ファイルは結局nginxのコンテナに載せることにしました。速度的にunicorn管理下だと辛いと思ったのが理由なのですが、結果的にnginxのimageを管理する手間が1つ増えることになりました。

後でこの話を別の人にしたら、前段にCloudFront置いてunicorn管理下に置いてるという話を聞いて、そっちの方が良かったかも、と今では思ってます。

ちなみにS3管理案も考えたのですが、切り戻しを考えると面倒が多いのではないかという気がします。

Docker imageの作成

Railsは既に開発環境だけdocker-composeに移行していたので、本番でも動くように若干の修正をくわえただけです。nginxはassets:precompileした後にそのassetsをコピーして設定ファイル封じこめてdocker-compose buildみたいなシェルを書きました。強引。

ちなみにUUIDでもなんでもいいのでタグ付けは必須です。latestだと切り戻しできなくなります。注意!

Capistranoの置換

RailsのDocker化最大の難所はCapistranoの再実装です。半分以上はECSが肩代わりしてくれますが、なかなかそうもいかない箇所もあります。ECSが全部やってくれないものとしては以下のようなものがあります。

DBのマイグレーション

自前でやる必要があります。悩んだ結果、db:migrate用のタスクを単体でECSで実行することにしました。新しいマイグレーションファイルのあるimageでcommandを弄ればできます。

当初はunicornが起動するタスクのcommandにdb:migrateを書いてしまおうと思ったのですが、appサーバの台数分同時にdb:migrateが走ったときに何が起こるのが想像が付かなかったのでやめました。PostgreSQLならALTER TABLEでtransactionが有効になってるので問題無い気がしますが、MySQL系はALTER TABLEでのTransactionが効きません。

Deployまわり

既存のRailsプロジェクトのdeployはCircleCIでcapistranoを叩くような形式になっていました。デグレしないためにはこれを再現する必要があります。

色々悩みましたが、最終的にこれはシェルスクリプトでやるしかないという結論になりました。circleci.ymlに直で書いても良かったのですが、手元でもdeployができる状態を維持したかったのでシェルスクリプトを作り、それをCircleCIで叩く方法になっています。

シェルスクリプトもそんな複雑なことはしておらず、

  • docker-composeでimage作成
  • imageにタグ付け
  • taskのイメージを変更してterraform apply
  • db:migrateタスクの起動

を順番に実行するだけです。

Wheneverの置き換え

地味に面倒くさかったのが、Wheneverの置き換えです。Wheneverは自動でrakeタスクを実行するようにcronを設定する仕組みですが、多くの場合複数のマシンで動かすのは困るので1台だけで実行する必要があります。また、えてしてcronとdockerは相性がよくありません。

方法の1つはシンプルにDBマイグレーションでやったようにECSのタスクとして実行する方法ですが、こと我々のプロジェクトに限っては非常にコストが掛かる方法でした。なぜかというと1分毎に実行しているpollingタスクがあったからです。

結果的に私が選んだのは、rakeタスクをcontrollerから実行できるエンドポイントを作り、lambdaから定期的にHTTPリクエストを投げるという方法です。HTTPアクセスはserverlessのプロジェクトを作ることで実現し、これは当初不安定でしたが今は問題なく動いているように見えます。

rakeタスクをcontrollerから叩くのは以下のようなコードで実現できます。

Rails.application.load_tasks

Rake::Task[task_path].execute

Rake::Task[task_path].clear

しかしこのコードは一方で「rakeタスクを実行するたびにtaskのツリーを全部読みにいっている」ようです。実際これは実行に数秒かかるため、Controller直下ではなく、sidekiqで実行するようにしています。このあたりが原因かは分かりませんが、ひどく不安定で、今同じことをやるなら回避します。今更ですが、きちんとcron用のECSサービスを作り、wheneverをそのまま使って展開した方が良かったかもしれないです。

現状の問題点

このように既存のRailsプロジェクトをDocker化してECSに載せた訳ですが、現在分かっている問題の1つにdeploy時間があります。

CircleCI上でassets:precompileをかけ、2つのdocker imageを、キャッシュの効かないCircleCI上で動かしているので問題があります。これはキャッシュが効かないというよりキャッシュを活用してない方に問題があります。

ArchLinuxでMySQLビルド

どうも、仕事でArchLinuxを使っているぽんこつです。公式パッケージがMySQLからMariaDBに変わってしまったのでAUR管理下の自前ビルドMySQLに移行して開発していましたが、MySQLが8.0に上がったタイミングで色々困ったことがあったのでまとめておきます。

No such file libpalm.so

これ再現する人は少ないと思うんですが、libpalmが偶然アップデートしてAURが使えなくなりました。これの原因と対処が以下に書いてあるとおりです。助かりました。

libalpm が 10 になったせいで AUR が検索すらできない件 - 想像力の欠如は深刻な欠点の一つである。

tmpfs溢れ

MySQL8.0になって大分プログラムが大きくなったのか、yaourtがデフォルトで使う/tmpディレクトリが溢れるようになりました。/tmpはデフォルトでtmpfsとしてメインメモリを使う設定になっており、デフォルトでメインメモリの半分に設定されています。自分の環境では8GBがtmpfsに割り当てられていましたが、8GBだと不足するという問題が発生します。

メモリを増設したりtmpfsの容量を増やしてもよかったのですが、ディスクがNVMeなので普通にディスクに書くように修正しました。yaourtの設定ファイル/etc/yaourtrcの

#TMPDIR="/tmp"

 を別のディレクトリにすればいいでしょう。私は/buildというディレクトリを権限777で新規に作り、そこに設定しました。

ccache有効化

どの程度効果があるかは分からないですが、コンパイル速度を早めるためにccacheを有効化しました。ccacheパッケージをインストールした後に、/etc/makepkg.confの

 BUILDENV=(!distcc color !ccache check !sign)

 のccacheにかかっている!を外せば有効になります。

※実際ビルドしてccache -s でcache hit rateを見ましたが、3.89%とか出ているので、MySQLのビルドそのものだとあまり意味は無いかもしれません

その他のmakepkgの高速化

以下に纏めてあります。

Ryzenで爆速yaourt

yaourtのパッケージの圧縮を高速化

ScalaでDNS lookupする

イタンジのなんでも屋エンジニアぽんこつです。最近業務の基幹機能がScalaで置き換えられることによってようやく沢山Scalaが書けるよやったね!

メールアドレスのホストの妥当性をチェックしたいときなど、DNSのlookupをする機会はまあまあありますが、それをScalaでやるときの話です。

基本的にはJavaに同梱されているjavax.naming.directoryを使えばいいです。すごいAPIまわりが古くさくてしんどいですが、枯れているので普通に使う分には問題ないでしょう。

ついでに今回要件としては

  • メールアドレスのホストの妥当性をチェックしたいのでMXを使う
  • 何度もDNSを引きたくないのでヒープにLRUCacheを持つ

の2つがあり、結果的に以下のように若干複雑な感じになりました。

import java.util
import java.util.Collections
import javax.naming.directory.InitialDirContext

import scala.util.Try
import scala.collection.JavaConverters._

object Main {
def main(args: Array[String]): Unit = {
val dns = new DNS()
println(s"itandi.co.jp: ${dns.hasMx("itandi.co.jp")}")
println(s"itandi.co.jp: ${dns.hasMx("itandi.co.jp")}")
println(s"example.com: ${dns.hasMx("example.com")}")
println(s"example.com: ${dns.hasMx("example.com")}")
}
}

class DNS {
val cache = Collections.synchronizedMap(new LRUCache[String, Boolean](1000))
val EnvTable = Map("java.naming.factory.initial" -> "com.sun.jndi.dns.DnsContextFactory")
val Env = new util.Hashtable[String, String](EnvTable.asJava)
val ictx = new InitialDirContext(Env)

def hasMx(host: String): Boolean = {
Option(cache.get(host)).getOrElse {
val attrs = Try { ictx.getAttributes(host, Array("MX", "A")) }.toOption
val res = attrs.exists(0 < _.size)
cache.put(host, res)
res
}
}
}

class LRUCache[K, V](limit: Int) extends java.util.LinkedHashMap[K, V](16, 0.75f, true) {
override def removeEldestEntry(entry: java.util.Map.Entry[K, V]): Boolean =
limit < size()
}

こんな感じで書けます。

注意事項とか

Cacheについて

真面目にやるならTTLでCacheを失効させるべきで、自前でDNSキャッシュサーバを立てて運用する方が良いです。ただメール送信前のホスト名のチェックという用途なら然程問題ではないと判断して今回はこうなってます。

ホストの存在は到着を保証しない

このチェックはMXレコードかAレコードの存在をチェックしますが、あったとしてもメールが受信できる状態になっているかは分かりません。なので不達を減らす効果しかありません。

Rubyでちゃんと文字列からURIを抽出する

イタンジでエンジニアをしているぽんこつです。フロントからインフラまでなんでもやってます。

なにか文字列の塊からURI/URLを抽出しなければいけない、という案件はRubyに限らずよくでてくると思いますが、実際に真面目にやらないといけないとなるとかなりしんどいです。日本語URL、invalidなホスト名etc...

RubyであればURI.extractという、URIを文字列中から探して、ヒットしたURIを全部返してくれるメソッドがあるのですが、

  • obsolateである
  • matchした文字列でしか返ってこない

という問題があり、あまり使いたくはありません。

ところでURI.extractですが、Rubyソースコードを読んでみると、

  1. RFC2396_Parser.extractを呼び出す
  2. make_regexp正規表現を生成
  3. `string`.scan

しているだけということが分かります。make_regexp正規表現さえあれば、なんでもできそうですね!表題のパターンだとこうなります。

regexp = URI::DEFAULT_PARSER.make_regexp(['http', 'https'])
text.to_enum(:scan, regexp).map { Regexp.last_match }.each { |match|
  ...
}

URIスキームを引数に持つのでftpは除外したいとかでも大丈夫。どこぞで公開されていた謎正規表現と違ってRFCに準拠しているという安心感があります。やはり正規表現は自作したら負けですね。

digdag+embulkを使ってテーブル毎にカラムを設定してBigQueryに流し込んでみた

横澤です、平素より格別のご高配を賜り、厚く御礼申し上げます。

先日DWH系のネタについてAthenafluentdの二つを書いたのですが、結論としてdigdag+embulkという構成に落ち着いてしまったのでここにご報告させて頂きます。

ここに至った細かい経緯は本記事の最後にまとめておきますので、まずは設定ファイル例を公開しようと思います。なお、digdagやembulkの細かい使い方についてはWEB上に良記事があるので割愛します。

まずメインの呼び出し先となる.digファイルです、定期実行タイミングをscheduleで登録し、!includeを使って登録DBの接続情報を外に出してgitignoreで管理しています。digdagにはsecret機能もあるのですが、server利用でないと扱えないようなので取り敢えずこのような形でgithubに上がらないように逃しています。

timezone: UTC

schedule:
  daily>: 10:00:00

+main:
  !include : my-db.dig

step1では全カラムを転送するテーブルを並べています、embulk用のymlファイルについては後ほど解説します。

  +step1:
    for_each>:
      table: [
        users
        , products
      ]
    _do:
      embulk>: mysql_to_bigquery.yml

step2では全カラムではなく、特定のカラム指定や特定の行指定を行いたいテーブルをembulk側のqueryで設定しています。データ量が大して大きくないテーブルならば特に考えずに全カラム、全行を投げても良いのですが、データ量がそこそこ大きいテーブルで全部やってしまうと稼働するインスタンスのコストが増加してしまいます。重厚長大なテーブルについてはMECEにデータを指定する事で転送用のインスタンスコストを抑えつつ転送速度を確保させています。

  +step2:
    embulk>: mysql_to_bigquery_orders.yml

続いては接続設定用の.digファイルです、これはdigdagワークフロー内で変数を展開するexport機能を使って指定するだけです。

_export:
  host: 'localhost'
  user: 'me'
  password: 'my-pass'
  database: 'my-database'
  project_id: 'my-projectid'
  dataset: 'my-dataset'

最後にembulk用のymlファイルです、digdag側から変数設定されて呼び出すのでシンプルにワンファイルで住むのが魅力的ですね。またbigqueryの転送にはembulk-output-bigqueryを使っているのですが、テンポラリテーブルを作ってスキーマに合わせたloadをしてくれるので楽且つ安全で嬉しい感じです。

in:
  type: mysql
  host: ${host}
  user: ${user}
  password: '${password}'
  database: ${database}
  table: ${table}
out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: ./my-bigquery.json
  project: ${project_id}
  dataset: ${dataset}
  auto_create_table: true
  table: ${table}
  allow_quoted_newlines: true

また.digで設定したstep2用のembulk設定ファイルを用意します。こちらでは前述したデータ範囲を指定する為にembulk-input-mysqlのqueryキーワードを使って取得カラムや範囲を指定しています。

in:
  type: mysql
  host: ${host}
  user: ${user}
  password: '${password}'
  database: ${database}
  query: |
    select id,user_id,product_id from orders
out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: ./my-bigquery.json
  project: ${project_id}
  dataset: ${dataset}
  auto_create_table: true
  table: orders
  allow_quoted_newlines: true

embulk-input-mysqlにはselectやwhereといった設定キーワードもあるのでもっと綺麗な感じに出来ないか試行錯誤していたのですが、取り敢えず動かす事を目標に泥臭く書いています。embulkプラグインのオプションは豊富に用意されているのでよく読むと様々な使い方を発見できそうです。

そもそもAthenaは「使ってみた」という感じなのでDWHとしての本番採用は最初から考えておらず、この業界特有の雑多なCSVを良い感じにリレーショナルDB的に扱えるかどうか試してみる、という目的で使っていました。 fluentdについてはしばらく運用していたのですが、バッファリングサーバーの可用性や拡張性を維持するのが結構大変で、途中で運用を止めたりデータを減らしたりと思うような運用が出来ておりませんでした。 しかし、よくよく考えると今回のDWHはアドホックな分析に使われる目的のみを期待されているので、リアルタイムにデータ連携される必要はなく、fluentdによるストリーミング処理は廃止してembulkによるバッチ処理に切り替える事を決めました。 embulkをshellなりコマンドで呼んでも良いのですが、複数DBやテーブル毎に微妙にカラム構造を変えたい、という要件があったのでワークフローエンジンを使って上手い事出来ないかを調べたらdigdagがどうやら良さそうだったので、取り敢えずdigdag+embulkで構成を作ってみました。 現在のところはこの構成で上手く回っており、redashからもGBQに接続出来るようにしたので今まではエンジニアでなければ出来なかった(エンジニアでもかなり面倒でした)サービスを横断したデータ取得が出来るようになりQualityOfDataLifeが向上したような予感がしています。

イタンジでは多くのデータを活用してサービスのグロースにコミットするプロダクトエンジニアと、探索的にデータを活用して全く別の兆候を発見する機械学習エンジニアを募集しております!

AWS Data PipelineとAthenaを使ってお手軽DWHを3分クッキング

横澤です、いつもお世話になっております。

本当は5月に書こうと思っていたネタなのですが、最近は時空の歪みの影響を強く受ける事があり気づいたら6月になってしまいました。

本題です、本エントリではData Pipelineを使ってRDSデータをS3にエクスポートし、去年のre:inventで発表のあったAthenaを使ってDWHっぽいものを作ってみるエントリです。Athenaについては説明記事やチュートリアル記事が沢山あるので気になったらググって見て下さい。Data Pipelineはデータ処理に特化したワークフローエンジンの様なサービスです。名前の通りデータの抽出や加工、集約等についてAWS内部の様々なリソースを活用してワークフローを組み立てる事が出来ます。今回はRDS(MySQL)に入ってるデータをCSVファイルとしてS3に吐き出す部分をData Pipelineで実行し、Athenaで読み込めるようにしてみました。

【最初の1分:ジョブの登録】

RDSからS3にCSVを出力する処理はテンプレートが用意されているので簡単に作成できます。 テンプレートを選択したらDBへの接続情報、バッチスケジュール、S3のバケットを指定すれば初期設定は完了です。

このタイミングで幾つか注意点があります。

  • Athenaは現時点(2017-06-08)ではUSリージョンのみサービス提供となっているので、読み込み先となるS3バケットもUSリージョンに作成する必要があります。

  • Data Pipelineは「DataPipelineDefaultRole」と「DataPipelineDefaultResourceRole」というロールを使うのでこれらにS3やRDSへのアクセス権を付与する必要があります

  • データエクスポートを実行するEC2インスタンスについて、デフォルトだとt1.microが選ばれていますがt系インスタンスVPC上への作成がデフォなので接続先がVPCに置かれていない場合はm系のインスタンスを使うなどの工夫が必要です

【間の1分:ジョブの編集】

登録されたジョブはこのような感じでワークツリーで可視化されます、それぞれのタスクを選択すると右側でconfigを修正する事が出来ます。 RDSでsubnetを設定している場合はEc2Resourceタスクを開いてオプションでsubnet-idを設定しないとconnectionエラーになってしまいます。またEC2からRDSへ上手く接続出来ない時にはデバッグ目的でEC2にsshで入りたくなる事もあるのでキーペアも登録しておくと後々幸せになれるかもです。

【最後の1分:Athenaでクエリ発行】

これでジョブは登録されたのでスケジュールをon-demandに設定してactivateするとワークフローが実行され、成功するとS3にCSVが吐き出されます。ワークフローエンジンらしく各ジョブはこんな感じで結果成否やログが見れるのでハマった時も修正がかけやすいです。 最後にAthenaのQueryEditorを使ってHiveQL形式のCREATE TABLE文を発行すればDWHっぽいテーブルの完成です!一点ハマったポイントとしてLOCATIONは「s3://[バケット名]/[パス名]」と指定しなければならず、当初はリージョンURLを含めて指定していたせいでエラーが起きてました・・・

以上で3分クッキングは完了です、実際にはもっと時間かかりましたが本記事を読んで頂く事で3分くらいで作れるようになると嬉しい限りです。そしてここまで書いておいてなんですが、イタンジではDWHっぽい事を実現するツールとしてはGoogle Big Queryを使っており、最近はDigdagというワークフローエンジン経由でembulkを動作させてData Pipelineと似たような事をやっています。なので今回はあくまでもData PipelineとAthenaを実験する目的でやってみた的なネタなので実運用するとどうなるかは未知数だったりします。

イタンジ株式会社ではこのようなデータエンジニアリングにテンションが上がるエンジニアや、集約管理されたデータを使って探索的にデータ解析したいエンジニアを募集しております。