ie_test
2018年8月1日水曜日
luigi の機能色々
luigi で色々行う際にちょっとハマったことなどを書いてます。 luigi の基本的な取り扱いに関しては下記を参照 https://ietoa.blogspot.com/2018/07/luigi.html # luigi で google cloud strage(以下 gcs) を依存先に設定する方法 gcs 上のファイルを依存先として設定する方法を調査した。 ## 課題設定 gcs 上の a-test バケット上の test_dir フォルダの中の tmp_test.txt ファイルを a-test2 バケット上に tmp_test2.txt ファイルとしてコピーする。 ## コード ``` import os import luigi import luigi.contrib.gcs class Orig(luigi.Task): def output(self): return luigi.contrib.gcs.GCSTarget("gs://a-test/test_dir/tmp_test.txt") class Testup(luigi.Task): def requires(self): return Orig() def run(self): input_path = self.input().path output_path = self.output().path os.system( "gsutil cp %s %s"%(input_path, output_path ) ) def output(self): return luigi.contrib.gcs.GCSTarget("gs://a-test2/tmp_test2.txt") ``` 実際の実行には gsutil コマンドを使っている。 (gsutil コマンドの設定に関しては以下を参照,gcloud コマンドの設定ができれば使える。 https://ietoa.blogspot.com/2018/07/gcp-google-cloud-platform.html ) 依存の設定に luigi.contrib.gcs.GCSTarget を使っているが その前に import luigi.contrib.gcs という記述がないとエラーが生じるので注意。 # luigi で Bigquery のテーブルを依存先に設定する方法 Bigquery 上のテーブルを依存先として設定する方法を調査した。 ## コード 以下は Bigquery において, プロジェクトコードが watashi_no_project, dataset が "temp_results"で、 "my_table" -> "my_table_filter" -> "my_table_count" のように中間テーブルの作成が依存している場合のコードとなっている. BigqueryTarget は table.dataset_id, table.table_id によって dataset と table を表す文字列を取得できる。 (https://github.com/spotify/luigi/blob/master/luigi/contrib/bigquery.py) ``` import luigi from luigi.contrib.bigquery import BigqueryTarget PROJECT_ID = 'watashi_no_project' ... class SelectQuery( luigi.Task ): dest_dataset_id = "temp_results" dest_table_id = "my_table" def run(): print("this is given!") exit(1) def output(self): return BigqueryTarget( PROJECT_ID, self.dest_dataset_id, self.dest_table_id ) class FilterQuery( luigi.Task ): dest_dataset_id = "temp_results" dest_table_id = "my_table_filter" start_date = "2018-04-01" def requires(self): return SelectQuery() def run(self): exec_filter_query_by_date( self.start_date, self.input().table.dataset_id, self.input().table.table_id, self.output().table.dataset_id, self.output().table.table_id ) def output(self): return BigqueryTarget( PROJECT_ID, self.dest_dataset_id, self.dest_table_id ) class KaimonoCountQuery( luigi.Task ): dest_dataset_id = "temp_results" dest_table_id = "my_table_count" def requires(self): return FilterQuery() def run(self): exec_count_query( self.input().table.dataset_id, self.input().table.table_id, self.output().table.dataset_id, self.output().table.table_id ) def output(self): return BigqueryTarget( PROJECT_ID, self.dest_dataset_id, self.dest_table_id ) ``` # luigi で同じ task を並列に実行したい場合 下記のコードのように各タスクにパラメータを設定してやって、 あるタスクの requires に違うパラメータでインスタンスを設定する。 さらに設定ファイル、実行コマンドにおいて指定する要素がある。 ## コード ``` #coding:utf-8 import os import luigi class TestTask(luigi.Task): n = luigi.IntParameter() # インスタンス毎に異なるパラメータ. def run(self): os.system( "echo hello %d >%s"%( self.n, self.output().path) ) def output(self): return luigi.LocalTarget( "test_task_%d.txt"%self.n ) class TestTaskTotal(luigi.Task): def requires(self): ret_list = [] for i in range(4): ret_list.append( TestTask(n=i) ) return ret_list def run(self): print("ok") ``` ## 設定ファイルと実行コマンド 並列実行する場合、さらに以下のように「設定ファイル」と「実行コマンド」における設定が必要 ### 設定ファイル 4 並列したい場合は以下のように設定する必要がある。(デフォルトは実行するマシンのコア数になっている。) luigi.cfg ``` [core] parallel-scheduling-processes=4 ``` ### 実行コマンド 4 並列したい場合、以下のように実行する必要がある。(設定がないと実行はされるが並列実行にはならないので計算時間の改善にならない)。 gcloud コマンドで複数マシンに並列実行させる場合などで有効。 ``` PYTHONPATH="." luigi --local-scheduler --module mytmp TestTaskTotal --workers=4 ``` # 並列実行に関する話題 以下の記事のよると windows 上での並列実行では問題が生じる可能性がある。(未確認) https://qiita.com/soyiharu/items/e54bd963f9faf22c6cae また並列実行ジョブの個数と速度感については以下を参照。 https://qiita.com/yanagi3150/items/15b2928b5018b197ab45 思想としてもある程度の粗い粒度でのジョブ実行を想定していることがわかる。 https://luigi.readthedocs.io/en/stable/design_and_limitations.html ## 注意 joblib と併用すると問題になる可能性がある。(未検証) # オンメモリでの取り扱いについて luigi はオンメモリでの依存処理は推奨していないようだ。(以下の記事参照) https://qiita.com/yanagi3150/items/6dd88af31ad55d98bb33 # 依存設定のカスタマイズ luigi はデフォルトだと依存解決にターゲットファイルがあるかないかしか見ない。make がファイルの更新時間を見るのと比べるとだいぶてきとう。多分、ネットワーク越しの依存などを見るため、時間の同期などを保証できないだろうと想定しているからと思われる。 その代わり依存解決のカスタマイズができる。 https://qiita.com/ngr_t/items/b928bc13457571e25519 と https://gist.github.com/demoray/b503c887518941d264b0 を見る限りでは task クラスの complete メソッドを定義してやれば良い。 ## 依存関係の実装 上記のリンク先の記事を参考に書いてみた。 以下は test_task.txt が test_task2.txt より新しければ TestTask2 が実行される. ``` #coding:utf-8 import os import datetime import luigi class TestTask(luigi.Task): def run(self): os.system( "echo hello 111 >%s"%( self.output().path) ) def output(self): return luigi.LocalTarget( "test_task.txt" ) class TestTask2(luigi.Task): def requires(self): return TestTask() def run(self): os.system( "cat %s >%s"%( self.input().path, self.output().path) ) def output(self): return luigi.LocalTarget( "test_task_2.txt" ) def complete(self): output_fpath = self.output().path if os.path.exists( output_fpath ) == False: return False input_fpath = self.input().path if os.path.exists( input_fpath ) == False: return False input_datetime = datetime.datetime.fromtimestamp( \ os.path.getmtime( input_fpath ) ) output_datetime = datetime.datetime.fromtimestamp( \ os.path.getmtime( output_fpath ) ) if input_datetime > output_datetime: return False return True ``` なんとなくだけれども全てのタスクにここまで書かせるのはきついだろうと思うんで、何か良いライブラリやクラスがあるんではないかとも思う。 とりあえずここでは実装できて、実際に機能するということだけ確かめておいた。 # 全てのターゲットを消去して実行したいときなど 以下のページで方法についてだいぶ議論している。まだ追えてない。。時間のあるときに再調査! https://github.com/spotify/luigi/issues/595
0 件のコメント:
コメントを投稿
次の投稿
前の投稿
ホーム
登録:
コメントの投稿 (Atom)
0 件のコメント:
コメントを投稿