Spring Batchの概要メモ
-
カテゴリ:
- Java
-
タグ:
- #Spring
Spring Frameworkて覚えることがいっぱいありますよね。やっとSpringMVCに慣れたと思ったら今度はSpringBatchを勉強しないといけなくなりました。
:;(∩´﹏`∩);:
今回はSpringBacthに関わる用語のメモ的なもので、自分が見返した時に思い出しの作業の負荷を少しでも軽減させるためのものなので、そこんとこ、ご了承ください。
SpringBatchとは
SpringBatchとは、SpringFrameworkに含まれるバッチアプリケーションフレームワークです。バッチ処理に必要なものがいろいろ入っており、SpringのDIコンテナやAOPなども利用できます。
SpringBatchの構成要素
大きな構成要素としては以下のようなもんがあります。
Job
バッチ処理の実行単位。SpringBatchの1つの処理に対して1つのJobが必要になります。
JobLauncher
Jobを起動するためのインターフェースです。まずこいつが最初に動いてバッチ処理に必要なJobやSTEPを管理していくれます。
STEP
Jobを構成する1つの処理がSTEPになります。Jobに対してSTEPが1対多の関係になり、データ取込みや加工、出力などの処理をそれぞれSTEPで作成していくことになります。
STEPの種類にはTasklet(タスクレット)モデルとチャックモデルの2種類がある。
JobRepository
JobやSTEPの実行状況を管理してくれます。実行情報はデータベースで永続的に保存され、JobやSTEPはそれぞれIDで紐づかれ管理されます。
STEPの並列処理や逐次処理をするとき、前処理の完了ステータスを見て実行するか判断する際もJobRepositoryの情報を参照しています。
実行ログとしても重宝します。
JobInstance
JobIDとJobパラメータを使ってバッチジョブの管理をしてくれる機構。同じJobIDとJobパラメータで複数バッチを実行しても、最初のバッチが実行中なら2つ目のバッチは実行されず、また最初のバッチが完了していた場合はエラーになる。
launch-context.xml
SpringBatchの設定ファイルです。「jobLauncher」、「jobExplorer」、「jobLoader」、「jobRepository」、「dataSource」などをどのオブジェクトを使用するか定義しています。
module-context.xml
バッチの構成を定義する設定ファイルです。自分の場合は1バッチ(Job)に対して1つmodule-context.xmlを作りイメージ。
Jobを構成するSTEPや参照するオブジェクト、処理の順番などを設定します。
タスクレットモデルとチャンクモデル
上記にも述べましたが、SpringBatchのSTEPにはタスクレットモデルとチャックモデルの2つに分類されます。
それぞれ使う目的も造りの構成も全く違うので区別して理解しましょう。
チャンクモデル
バッチ処理で比較的登場頻度の多い処理である「取込」、「加工/集計」、「出力」を一連の流れで実行できる仕組みがチャンクモデルです。
利用シーンとしては1件ずつ処理をするバッチではなく、大量のデータを塊で処理する際によく使います。
・ItemReaderインターフェイス
「取込」処理の際に使われるインターフェース。必ずread()メソッドが呼ばれる。
・ItemProcessorインターフェース
「加工/集計」処理の際に使われるインターフェース。必ずprocess()メソッドが呼ばれる。
・ItemWriterインターフェース
「出力」処理の際に使われるインターフェース。必ずwrite()メソッドが呼ばれる。
タスクレットモデル
1つのSTEPに対して、チャンクモデルの型にはまらないような細かな処理や特殊な動きをする処理に使います。どんな形の処理でも良いためとても自由度が高いです。
module-context.xmlの例
まず、module-context.xmlのメインとなる部分を説明します。最初に読み込まれ部分となり、メインフローとよく言われます。
<!-- メインフロー呼び出し. -->
<batch:job id="import.csv" incrementer="jobParametersIncrementer">
<batch:validator ref="MainJobParametersValidator" />
<batch:step id="subFlowStep" next="moveProcessedFiles">
<batch:tasklet start-limit="100" >
<batch:chunk reader="ItemReader" processor="ItemProcessor" writer="ItemWriter" commit-interval="1" />
</batch:tasklet>
</batch:step>
<batch:step id="moveProcessedFiles">
<batch:tasklet ref="moveProcessedFilesTasklet" />
</batch:step>
</batch:job>
<!-- インクリメント -->
<bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" />
<!-- Jobパラメータチェック -->
<bean id="MainJobParametersValidator" class="jp.co.batch.item.main.validator.MainJobParametersValidator" />
1行目にある「batch:job」のタグでJobの情報を記載します。「incrementer」ではバッチの実行情報を保持するJobParameterを指定します。15行目にJobParameterのオブジェクトを定義しています。
3行目にバリデーターを設定しています。バッチ時に渡されるパラメータのバリデーションチェックをするオブジェクトを定義します。「ref」はreference(参照)の略で、設定を外出しもの指定するときに使います。ここでは17行目を参照しています。
4~8行目の「batch:step」でSTEPに関する内容を設定します。「id="subFlowStep"」でSTEPの識別名を設定しいます。「next」ではsubFlowStepの後続の処理を設定しています。subFlowStep⇒moveProcessedFilesの順番で処理が実行されるイメージです。
5行目の「batch:tasklet」でタスクレットモデルを定義しています。「start-limit」はSTEPの最大実行回数を指定します。ここではバッチのリスタートなどにより再実行が100まで可能です。
6行目に「batch:chunk」でチャンクモデルを定義しています。「reader」、「processor」、「writer」でそれぞれのオブジェクトを指定しているのが分かると思います。「commit-interval」でコミット間隔を設定しています。このコミットとは単純にトランザクションのコミットではなく「reader->processor->writer」の一連の塊を指しています。
具体的にはItemReaderでデータがコミット数になるまで読み込み、上限に達したらItemProcessorに渡します。次にItemProcessorでコミット数になるまで処理を続け、上限になったら今度は「commit-interval="1"」とすると1件ずつコミットをしてくれます。
<!-- 複数ファイル読込 -->
<bean id="ItemReader" class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step" lazy-init="true">
<property name="resources" value="file:#{jobParameters[filePath]}/imports/#{jobParameters[fileName]}" />
<property name="delegate" ref="fileItemReader" />
</bean>
<!-- CSVファイル読込 -->
<bean id="fileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="linesToSkip" value="1" />
<property name="encoding" value="MS932" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="jp.co.batch.item.main.transform.CsvMainDelimitedLineTokenizer" />
</property>
<property name="fieldSetMapper">
<bean class="jp.co.batch.item.main.mapping.CsvMainFieldSetMapper" />
</property>
</bean>
</property>
</bean>
続いてItemReaderのサンプルを説明します。メインフローの「batch:chunk」のreaderが、上記サンプルの2行目のidと一致しています。
ItemReaderでファイル取込みをする処理を指定しますが、このサンプルではCSVファイルを取り込む想定のものになっています。
3行目にある「property name="resources"」取り込む対象のCSVファイルの置き場所を設定します。正規表現を使って複数ファイルを取込み対象にすることができます。
4行目の「property name="delegate"」では取込処理を実行するItemReaderのオブジェクトを指定します。
<!-- 加工/集計 -->
<bean id="ItemProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
<property name="delegates">
<list>
<ref bean="validationProcessor" />
<ref bean="convertCsvMainModelProcessor" />
</list>
</property>
</bean>
<!-- 入力チェックItemProcessor -->
<bean id="validationProcessor" class="org.springframework.batch.item.validator.ValidatingItemProcessor" scope="step">
<property name="validator">
<bean id="validator" class="jp.co.batch.item.main.validator.CsvMainModelValidationValidator" />
</property>
</bean>
<!-- CSVデータ変換ItemProcessor -->
<bean id="convertCsvMainModelProcessor" class="jp.co.batch.item.main.ConvertCsvMainModelItemProcessor" scope="step">
<property name="techGeneCd" value="#{jobParameters[target]}" />
<property name="userId" value="#{jobParameters[userId]}" />
</bean>
お次はItemProcessorのサンプルになります。
ItemReaderの例と構成は変わらないので重複している部分の説明は省きますが、違いがあるポイントとしては3~8行目の「property name="delegates"」内で「list」タグを使って読み出すオブジェクトを複数指定しています。
<!-- CompositeItemWriter -->
<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
<property name="delegates">
<list>
<ref bean="dbItemWriter" />
</list>
</property>
</bean>
<!-- DB出力クラス -->
<bean id="dbItemWriter" class="jp.co.batch.item.main.CsvMainItemWriter" />
最後にItemWriterの例です。
上記の例は取り込んだCSVファイルのデータをデータベースに登録するサンプルですが、仮にファイルに出力する場合は「property name="resources"」で出力先のパスを指定する流れになります。
JobRepositoryのテーブル定義
・BATCH_Job_INSTANCE
Jobに関するすべての情報が格納されています。JobRepositoryの最上層の親データです。
・BATCH_Job_EXECUTION
BATCH_Job_INSTANCEと紐づき、JobExecutionオブジェクトに関する情報が格納されています。Job単位でデータが作成されます。
・BATCH_Job_EXECUTION_SEQ
BATCH_Job_INSTANCEの主キーとなるカラムのシーケンス値が保存されています。
・BATCH_Job_EXECUTION_CONTEXT
Job実行時に渡されたパラメータの情報がJOSN形式で保存されています。
・BATCH_Job_EXECUTION_PARAMS
Jobに対するJobParameterに保持されている情報が格納されています。コマンドラインから渡された情報や、SpringMVC内で「JobParameters jobParameters = jobParametersConverter.getjobParameters(エンティティ);」というように設定される情報だと思う、、、
・BATCH_STEP_EXECUTION
StepExecutionオブジェクトに関連する情報が格納されているテーブルです。STEP単位でデータが作成されます。
・BATCH_STEP_EXECUTION_SEQ
BATCH_Job_INSTANCEの主キーとなるカラムのシーケンス値が保存されています。
・BATCH_STEP_EXECUTION_CONTEXT
STEP実行時に渡されたパラメータの情報がJOSN形式で格納されています。
サブフロー
サブフローとは、例えば「Bの処理はAの処理が完了してから実行する」ように処理の実行順序に制約がある場合に用います。
<!-- メインフロー呼び出し. -->
<batch:job id="main.batch" incrementer="jobParametersIncrementer">
<batch:validator ref="BatchJobParametersValidator" />
<batch:step id="step1" next="subFlow.decider" >
<batch:tasklet start-limit="10">
<batch:chunk reader="targetCenterReader" writer="jobParameterWriter" commit-interval="1" />
</batch:tasklet>
</batch:step>
<!-- サブフロー呼び出し. -->
<batch:decision id="subFlow.decider" decider="subFlowParameters" >
<batch:next on="CONTINUE" to="subFlow.call"/>
<batch:end on="COMPLETED"/>
</batch:decision>
<batch:step id="subFlow.call" next="subFlow.decider">
<batch:job ref="import.csv" job-parameters-extractor="subFlowParameters"/>
</batch:step>
</batch:job>
STEPのスキップ(batch:skippable-exception-classes、batch:listener)
現在勉強中です。。。。( ・`◡・´)
<batch:job id="import.csv" incrementer="jobParametersIncrementer">
<batch:validator ref="importMainCsvJobParametersValidator" />
<batch:step id="subFlowStep" next="moveProcessedFiles">
<batch:tasklet start-limit="100" >
<batch:chunk reader="ItemReader" processor="ItemProcessor" writer="ItemWriter" commit-interval="1" skip-limit="10000">
<batch:skippable-exception-classes>
<batch:include class="org.springframework.batch.item.validator.ValidationException"/>
<batch:include class="jp.co.batch.exception.NotExistException"/>
<batch:include class="jp.co.batch.exception.AlreadyExistsException"/>
<batch:exclude class="java.io.FileNotFoundException"/>
</batch:skippable-exception-classes>
</batch:chunk>
<batch:listeners>
<batch:listener ref="skipListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
<batch:step id="moveProcessedFiles">
<batch:tasklet ref="moveProcessedFilesTasklet" />
</batch:step>
</batch:job>
<!-- SKIPリスナクラス -->
<bean id="skipListener" class="jp.co.batch.item.main.listener.CsvExceptionSkipListener" scope="step">
<property name="encode" value="MS932" />
<property name="lineAggregator">
<bean class="jp.co.batch.item.main.transform.CsvLineAggregator" />
</property>
<property name="headerAggregator">
<bean class="jp.co.batch.item.main.transform.CsvHeaderAggregator" />
</property>
</bean>
5~15行目にスキップ関連の記載があります。5行目の「skip-limit」はスキップ対象になったitem(ItemReaderで取り込まれたデータ)が条件になったした際にエラーになります。上記例では10,000件を超えるとFAILが返ってきます。
6~11行目の「batch:skippable-exception-classes」にスキップ対象の例外(Exception)を定義しています。「batch:include」で定義した例外が発生した場合、処理がスキップし後続の処理が動きます。「batch:exclude」で定義した例外が発生した場合が、スキップ対象から除外させFAILとなります。
スキップ処理中に例外が発生した際、別の処理に渡したい場合は「batch:listener」で指定します。これによりスキップされたitemを受け取ることが可能です。
参考サイト:
https://sites.google.com/site/soracane/home/springnitsuite/spring-batch/1-spring-batchno-kihon
https://terasoluna-batch.github.io/guideline/5.0.0.RELEASE/ja/Ch02_SpringBatchArchitecture.html