ie_test
2018年7月22日日曜日
luigi をデータハンドリングに使ってみた
# 目的 データ分析をしていると、色々なデータを join( merge ) したり、 フィルターかけたり、一部のカラムだけ取ってきたりといった作業が多くなる。 そういった作業の末に作り上げたデータなどを作り直したり、また一部の工程だけを 違ったやり方にして実行するといったことを頼まれた時に四苦八苦してしまうことが多い。 luigi はそういった工程管理をスクリプト化することで管理しやすくする。 (少なくとも,そういった作業を簡易にしたいと思って開発されたものである。) # 他のツールなど 同じような目的のツールはビルドのためのツールが真っ先に思い浮かぶ。 make コマンドや python の scons など。 make はかなり汎用といえば汎用だが記述がシェルスクリプト前提であるため少し辛い。 scons は python で記述できるので競合たり得るかもしれない。 違いとしてはネットワーク越しの生成物のチェックや hadoop などの他ツールとの統合性が良さげらしい。 (あんまりピンとこないのでまたおいおい調べる必要があるだろう。。) 他にも色々あるかもだが著者の調査不足により、この議論はこれまでにとどめる。 # ツールインストール python のライブラリで, pip install luigi とすればインストールできる。 # ツールの簡単な使い方 以下は train_cust_item.csv, test_cust_item.csv という (キャストID,アイテムID) がレコードであるような csv ファイルから 正例、負例のレコードを作成し、必要なカラムを取ってきてジョインし、 学習用データと確認用データである train_data.csv, test_data.csv を作る工程を luigi で書いたものである。 (実際には train_cust_item.csv, test_cust_item.csv を作る部分も luigi でコード化しているのだが 説明には長すぎるのでそこはカット.) make_minus_data, make_fill_data, my_catenate は別途実装が必要. またかなりベタに書いてるのでもっと綺麗に書く方法があると思われる。 luigi_sample.py ``` class TestTrainSplit(luigi.Task): def output(self): return { "train_fpath" : luigi.LocalTarget("train_cust_item.csv"), "test_fpath" : luigi.LocalTarget("test_cust_item.csv") } class TestDataPlus(luigi.Task): def requires(self): return TestTrainSplit() def run(self): print("TestDataPlus") def output(self): return luigi.LocalTarget("test_cust_item.csv") class TrainDataPlus(luigi.Task): def requires(self): return TestTrainSplit() def run(self): print("TrainDataPlus") def output(self): return luigi.LocalTarget("train_cust_item.csv") class TestDataMinus(luigi.Task): def requires(self): return {"item_fpath": MakeItemData(), "cust_item_fpath": TestDataPlus() } def run(self): make_minus_data( self.input()["item_fpath"].path, self.input()["cust_item_fpath"].path, self.output().path ) def output(self): return luigi.LocalTarget("test_cust_item_minus.csv") class TrainDataMinus(luigi.Task): def requires(self): return {"item_fpath": MakeItemData(), "cust_item_fpath": TrainDataPlus() } def run(self): make_minus_data( self.input()["item_fpath"].path, self.input()["cust_item_fpath"].path, self.output().path ) def output(self): return luigi.LocalTarget("train_cust_item_minus.csv") class TestDataFill(luigi.Task): def requires(self): return {"test_data_plus_path": TestDataPlus(), "test_data_minus_path": TestDataMinus(), "user_data_path": MakeUserData(), "item_data_path": MakeItemData() } def run(self): make_fill_data( self.input()["user_data_path"].path, self.input()["item_data_path"].path, self.input()["test_data_plus_path"].path, self.output()["plus_out_path"].path, self.input()["test_data_minus_path"].path, self.output()["minus_out_path"].path ) def output(self): return {"plus_out_path":luigi.LocalTarget("test_cust_item_fill.csv"), "minus_out_path": luigi.LocalTarget("test_cust_item_minus_fill.csv") } class TrainDataFill(luigi.Task): def requires(self): return {"train_data_plus_path": TrainDataPlus(), "train_data_minus_path": TrainDataMinus(), "user_data_path": MakeUserData(), "item_data_path": MakeItemData() } def run(self): make_fill_data( self.input()["user_data_path"].path, self.input()["item_data_path"].path, self.input()["train_data_plus_path"].path, self.output()["plus_out_path"].path, self.input()["train_data_minus_path"].path, self.output()["minus_out_path"].path ) def output(self): return {"plus_out_path":luigi.LocalTarget("train_cust_item_fill.csv"), "minus_out_path": luigi.LocalTarget("train_cust_item_minus_fill.csv") } class MakeTestData(luigi.Task): def requires(self): return TestDataFill() def run(self): fpath_list = [ self.input()[input].path for input in self.input() ] my_catenate( fpath_list, self.output().path ) def output(self): return luigi.LocalTarget("test_data.csv") class MakeTrainData(luigi.Task): def requires(self): return TrainDataFill() def run(self): fpath_list = [ self.input()[input].path for input in self.input() ] my_catenate( fpath_list, self.output().path ) def output(self): return luigi.LocalTarget("tain_data.csv") class MakeTrainTestData(luigi.Task): def requires(self): return [ MakeTrainData(), MakeTestData() ] def run(self): print("MakeTrainTestData ok") ``` # コードの説明 通常、作業内容は「class MakeTrainTestData(luigi.Task)」のようにクラスで定義していく。 このクラスは三つのメソッドから成り立つ。 requires は、このタスクを開始する前に終わっておくべきタスクを指定する。 output はこのタスクの終了でどういったものが生成されるかを記述する。 私は細かい使い方がわからなかったので全てファイルを生成物として指定する使い方だけしている。 # 使い方 以下のコマンドで luigi_sample.py にある class MakeTrainData の output で指定されている "train_data.csv" が生成されるまでの作業が全て実行される。 ``` PYTHONPATH="." luigi --local-scheduler --module luigi_sample MakeTrainData ``` ただし、依存途中で指定されているファイルがある場合は、そのファイルの生成を行う class の run メソッドは実行されない。 もしくは 他のタームで $ luigid とすると localhost:8082 で luigi のデーモンが動く. すると --local-scheduler の部分を省略しても実行できる。 ``` $ PYTHONPATH="." luigi --module luigi_ver5 MakeTrainTestData ``` これはデーモンに処理を投げるようになるということである。 このように実行すると実行の詳細な情報をブラウザで確認できる。
0 件のコメント:
コメントを投稿
次の投稿
前の投稿
ホーム
登録:
コメントの投稿 (Atom)
0 件のコメント:
コメントを投稿