Sparkの設定パラメータ一覧 ~本体設定編
-
タグ:
- #Spark
Sparkを最近よく触っていて、設定値をデフォルトで設定して使っていましたが、もう少し使いこなしたいので一度整理するために一覧化してまとめました。
Sparkの設定値メモ
アプリケーションプロパティ
※赤字がパフォーマンスに関連しそうなプロパティ
プロパティ名 | 内容 | デフォルト |
spark.app.name | アプリケーションの名前。これはUIの中やログデータの中に現れるでしょう。 | (none) |
spark.driver.cores | ドライバープロセスのために使われるコアの数。クラスターモードのみ。 | 1 |
spark.driver.maxResultSize | 各Sparkアクション(例えば、collect)のための全てのパーティションの直列化された結果の総サイズの制限。少なくとも1Mでなければなりません。0は無制限です。総サイズがこの制限を越えるとジョブは中止されるでしょう。制限を高くsるうとドライバでout-of-memoryエラーを起こすかもしれません(spark.driver.memory およびJVMでのオブジェクトのオーバーヘッドによります)。適切な制限を設定することで、out-of-memoryエラーからドライバを守ることができます。 | 1g |
spark.driver.memory / --driver-memory | ドライバープロセスのために使われるメモリの量。例えば SparkContextが初期化される場所。(e.g. 1g, 2g). | 1g |
注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-memory コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。 | ||
spark.executor.memory / --executor-memory | executorプロセスあたりに使用するメモリ量(例えば2g, 8g)。 | 1g |
spark.extraListeners | SparkListenerを実装するクラスのカンマ区切りのリスト; SparkContextを初期化する場合に、これらのクラスが生成されSparkのリスナーバスに登録されるでしょう。SparkConfを受け付ける引数が一つのコンストラクタをクラスが持つ場合は、そのコンストラクタが呼ばれるでしょう; そうでなければ、引数を持たないコンストラクタが呼ばれるでしょう。有効なコンストラクタが見つからない場合は、SparkContextの生成は例外で失敗するでしょう。 | (none) |
spark.local.dir | Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。注意: Spark 1.0 以降では、これはクラスタマネージャーによって、SPARK_LOCAL_DIRS (スタンドアローン, Mesos) あるいは LOCAL_DIRS (YARN) 環境変数で上書きされます。 | /tmp |
spark.logConf | SparkContextが開始された時に、INFOとして有効なSparkConfを記録します。 | FALSE |
spark.master | 接続するためのクラスタマネージャー 許可されたマスターURLのリストも見てください。 | (none) |
ランタイム環境
プロパティ名 | 意味 | デフォルト |
spark.driver.extraClassPath |
ドライバーのクラスパスの先頭に追加する特別なクラスパスの登録。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-class-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
|
(none) |
spark.driver.extraJavaOptions |
ドライバに渡す特別なJVMオプションの文字列。例えば、GC設定あるいは他のログ。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
|
(none) |
spark.driver.extraLibraryPath | ドライバーJVMを起動する場合は使用する特別なライブラリパスを設定してください。 | (none) |
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-library-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。 | ||
spark.driver.userClassPathFirst | (実験的なもの) ドライバーでクラスをロードするときに、Sparkの自身のjarを超えてユーザ追加のjarを優先するかどうか。この機能はSparkの依存とユーザの依存間の衝突を和らげるために使うことができます。それは現在のところ実験的な機能です。これはクラスターモードのみで使用されます。 | FALSE |
spark.executor.extraClassPath | executorのクラスパスの先頭に追加する特別なクラスパス。これは主として古いバージョンのSparkとの後方互換性のために存在しています。ユーザは一般的にこのオプションを設定する必要はありません。 | (none) |
spark.executor.extraJavaOptions | executorに渡すための特別なJVMオプションの文字列例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいはヒープサイズの設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。ヒープサイズの瀬て地はspark.executor.memoryを使って設定することができます。 | (none) |
spark.executor.extraLibraryPath | executor JVMを起動する場合に使う特別なライブラリパスを設定する。 | (none) |
spark.executor.logs.rolling.maxRetainedFiles | システムによって保持されるだろう最新のローリングログファイルの数を設定する。古いログファイルは削除されるでしょう。デフォルトは無効です。 | (none) |
spark.executor.logs.rolling.maxSize | executorのログがロールオーバーされる最大のファイルサイズを設定します。デフォルトではローリングは無効です。古いログの自動クリーニングに関しては spark.executor.logs.rolling.maxRetainedFiles を見てください。 | (none) |
spark.executor.logs.rolling.strategy | executorログのローリングの計画を設定します。デフォルトでは無効です。"time" (時間ベースのローリング)あるいは"size" (サイズベースのローリング)に設定することができます。"time"に関しては、ローリングの間隔を設定するためにspark.executor.logs.rolling.time.interval を使用します。"size"に関しては、ローリングの最大サイズを設定するためにspark.executor.logs.rolling.size.maxBytes を使用します。 | (none) |
spark.executor.logs.rolling.time.interval | executorログがロールオーバーされる時間の間隔を設定します。デフォルトではローリングは無効です。有効な値はdaily, hourly, minutely あるいは秒単位の任意の間隔です。古いログの自動クリーングに関してはspark.executor.logs.rolling.maxRetainedFilesを見てください。 | daily |
spark.executor.userClassPathFirst | (実験的なもの) spark.driver.userClassPathFirstと同じ機能ですが、executorインスタンスに適用されます。 | FALSE |
spark.executorEnv.[EnvironmentVariableName] | EnvironmentVariableNameによって指定される環境変数をexecutorプロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。 | (none) |
spark.python.profile | Pythonワーカーでのプロファイリングを有効にする。プロファイルの結果はsc.show_profiles()によって表示されるか、ドライバーが終了する前に表示されるでしょう。sc.dump_profiles(path)によってディスクにダンプすることもできます。いくつかのプロファイルの結果が手動で表示された場合は、ドライバーが終了する前に自動的に表示されないでしょう。デフォルトでは、pyspark.profiler.BasicProfiler が使われますが、これは SparkContext コンストラクタへのパラメータとしてプロファイルクラスに渡すことで上書きすることができます。 | FALSE |
spark.python.profile.dump | ドライバーが終了する前にプロファイルの結果を出力するために使われるディレクトリ。結果は各RDDごとに分割されたファイルとしてダンプされるでしょう。それらはptats.Stats()によってロードすることができます。指定された場合は、プロファイルの結果は自動的に表示されないでしょう。 | (none) |
spark.python.worker.memory | 集約の間にpythonワーカープロセスごとに使用されるメモリの量。JVMメモリ文字列と同じフォーマット(例えば512m, 2g)。集約の間に使用されるメモリ量がこの量を超える場合は、データがディスクに流し込まれます。 | 512m |
spark.python.worker.reuse | Pythonワーカーを再利用するかしないか。yesであれば、固定数のPythonワーカーが使用されます。各タスクでPythonプロセスをfork()する必要はありません。大きくブロードキャストする場合はとても便利です。ブロードキャストは各タスクごとにJVMからPythonワーカーに転送する必要はないでしょう。 | TRUE |
シャッフルの挙動
プロパティ名 | 意味 | デフォルト |
spark.reducer.maxSizeInFlight | 各reduceタスクから同時に取り出すmap出力の最大サイズ。各出力は受け取るのにバッファを生成するため、これはreduceタスクごとの固定のメモリのオーバーヘッドを表します。ですので、メモリが多く無い場合は小さくしてください。 | 48m |
spark.shuffle.compress | map出力ファイルを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。 | TRUE |
spark.shuffle.file.buffer | 各シャッフルファイル出力ストリームのためのインメモリバッファのサイズ。これらのバッファはディスクのシークの数を減らし、中間シャッフルファイルの生成時のシステムコールを減らします。 | 32k |
spark.shuffle.io.maxRetries | (Nettyのみ) 0以外の値に設定された場合は、IOに関係する例外により失敗したフェッチは自動的に再試行されます。この再試行ロジックは、長いGCの停止あるいは一時的なネットワーク接続問題に直面した場合に、シャッフルを安定するのに役立ちます。 | 3 |
spark.shuffle.io.numConnectionsPerPeer | (Nettyのみ) ホスト間の接続は大きなクラスタのために接続の準備を減らすために再利用されます。多くのハードディスクと少ないホストのクラスタに関しては、これは不十分な並行制御が全てのディスクを一杯にする結果になります。そのためユーザはこの値を増やそうと思うかも知れません。 | 1 |
spark.shuffle.io.preferDirectBufs | (Nettyのみ) オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをNettyからオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。 | TRUE |
spark.shuffle.io.retryWait | (Nettyのみ) フェチの再試行間でどれだけ待つか。maxRetries * retryWaitとして計算される、再試行によって起きる遅延の最大はデフォルトで15秒です。 | 5s |
spark.shuffle.manager | データをシャッフルするために使用する実装。利用可能な2つの実装があります: sort および hash。ソートベースのシャッフルはメモリ効率がより良く、1.2からはデフォルトのオプションです。 | sort |
spark.shuffle.service.enabled | 外部のシャッフルサービスを有効にします。このサービスはexecutorが安全に削除されるようにexecutorによって書き込まれるシャッフルファイルを保持します。もしspark.dynamicAllocation.enabled が"true"であれば、これは有効でなければなりません。外部シャッフルサービスはそれを有効にするためにセットアップされていなければなりません。詳細はdynamic allocation configuration and setup documentation を見てください。 | FALSE |
spark.shuffle.service.port | 外部のシャッフルサービスが実行されるだろうポート。 | 7337 |
spark.shuffle.sort.bypassMergeThreshold | (上級) ソートベースのシャッフルマネージャーの中で、もしマップ側の集約が無く、最大これだけの削減パーティションがある場合は、データのmerge-sortingを避けます。 | 200 |
spark.shuffle.spill.compress | シャッフルの間に流し込まれるデータを圧縮するかどうか。圧縮はspark.io.compression.codecを使うでしょう。 | TRUE |
Spark UI
プロパティ名 | 意味 | デフォルト |
spark.eventLog.compress | もしspark.eventLog.enabled がtrueであれば、ログイベントを圧縮するかどうか。 | FALSE |
spark.eventLog.dir | spark.eventLog.enabled がtrueであれば、Sparkイベントが記録されるベースディレクトリ。このベースディレクトリ内でSparkは各アプリケーションごとにサブディレクトリを作成し、このディレクトリ内にアプリケーションに固有のイベントを記録します。ユーザは、履歴ファイルが履歴サーバによって読み込まれることができるように、HDFSディレクトリのような単一の場所に設定したがるかも知れません。 | file:///tmp/spark-events |
spark.eventLog.enabled | アプリ家k-ションが終了した後でWebUIを再構築するのに便利なSparkイベントを記録するかどうか。 | FALSE |
spark.ui.killEnabled | web uiからステージおよび対応するジョブをkillすることを許可する。 | TRUE |
spark.ui.port | 芽折および作業データを表示する、アプリケーションのダッシュボードのポート。 | 4040 |
spark.ui.retainedJobs | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのジョブを記憶するか。 | 1000 |
spark.ui.retainedStages | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのステージを記憶するか。 | 1000 |
spark.worker.ui.retainedExecutors | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutorを記憶するか。 | 1000 |
spark.worker.ui.retainedDrivers | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したドライバーを記憶するか。 | 1000 |
spark.sql.ui.retainedExecutions | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutionを記憶するか。 | 1000 |
spark.streaming.ui.retainedBatches | ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したバッチを記憶するか。 | 1000 |
圧縮および直列化
プロパティ名 | 意味 | デフォルト |
spark.broadcast.compress | ブロードキャスト変数を送信する前に圧縮するかどうか。一般的に良い考えです。 | TRUE |
spark.closure.serializer | クロージャーが使用するシリアライザークラス。現在のところJavaのシリアライザーのみがサポートされています。 | org.apache.spark.serializer. |
JavaSerializer | ||
spark.io.compression.codec | RDDパーティション、ブロードキャスト変数およびシャッフル出力のような内部データを圧縮するために使われるコーディック。デフォルトでは、Sparkは3つのコーディックを提供します: lz4, lzfおよびsnappy. コーディックを指定するために、完全修飾クラス名を使用することもできます。例えば、org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodecおよび org.apache.spark.io.SnappyCompressionCodec. | snappy |
spark.io.compression.lz4.blockSize | LZ4圧縮コーディックが使用される場合、LZ4圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、LZ4が使われる場合のシャッフルメモリの量も小さくなるでしょう。 | 32k |
spark.io.compression.snappy.blockSize | Snappy圧縮コーディックが使用される場合、Snappy圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、Snappyが使われる場合のシャッフルメモリの量も小さくなるでしょう。 | 32k |
spark.kryo.classesToRegister | Kryo シリアライゼイションを使う場合は、Kryoに登録するためにカンマ区切りのカスタムクラス名を渡してください。詳細はチューニングガイド を見てください。 | (none) |
spark.kryo.referenceTracking | Kryoを使ってデータをシリアライズした場合に同じオブジェクトへの参照を追跡するかどうか。オブジェクトグラフがループを持っている場合は必要で、同じオブジェクトの複数のコピーが含まれている場合は効率のために有用です。そうでないと分かっている場合は、パフォーマンスの改善のために無効にすることができます。 | true (Spark SQL Thrift サーバを使う場合は false) |
spark.kryo.registrationRequired | Kryoを使って登録を要求するかどうか。'true'に設定すると、Kryoは登録されていないクラスがシリアライズされると例外を投げます。false(デフォルト)に設定されると、Kryoは各オブジェクトに加えて登録されていないクラス名を書き込むでしょう。クラス名の書き込みは大きなパフォーマンスのオーバーヘッドになりえます。つまり、このオプションを有効にすることはユーザがクラスの登録を省略しないことを厳密に強制することができます。 | FALSE |
spark.kryo.registrator | Kryoシリアライズを使う場合、カスタムクラスをKryoに登録するためにこのクラスを設定してください。もしクラスを独自の方法で登録する必要がある場合は、このプロパティが有用です。例えば、独自のフィールドのシリアライザ。そうでなければ、spark.kryo.classesToRegister がもっと簡単です。KryoRegistratorを拡張したクラスにそれを設定しなければなりません。詳細は チューニングガイド を見てください。 | (none) |
spark.kryoserializer.buffer.max | kryoのシリアライズバッファの最大許容サイズ。これはシリアライズしようと思っているどのオブジェクトよりも大きくなければなりません。Kryo内で"buffer limit exceeded" 例外があった場合はこれを増やしてください。 | 64m |
spark.kryoserializer.buffer | Kryoのシリアライズバッファの初期サイズ。各ワーカー上で コアごとに1つのバッファがあることに注意してください。このバッファは必要に応じてspark.kryoserializer.buffer.max まで増加するでしょう。 | 64k |
spark.rdd.compress | シリアライズされたパーティションを圧縮するかどうか(例えば、StorageLevel.MEMORY_ONLY_SER)。ちょっとしたCPU時間の追加で相当なスペースを節約することができます。 | FALSE |
spark.serializer | ネットワークを越えて送信するか、シリアライズされた形でキャッシュされる必要があるシリアライズオブジェクトのために使うクラス。デフォルトのJava シリアライズ はどのようなシリアライズ可能なJavaオブジェクトについても動作しますが、とても遅いです。そのため、スピードが必要な場合は org.apache.spark.serializer.KryoSerializer の使用およびKryoシリアライズの設定 をお勧めします。 org.apache.spark.Serializerのどのようなサブクラスにもなることができます。 |
org.apache.spark.serializer.
JavaSerializer (org.apache.spark.serializer.
Spark SQL Thriftサーバを使う場合のKryoSerializer)
|
spark.serializer.objectStreamReset | org.apache.spark.serializer.JavaSerializerを使ってシリアライズする場合は、シリアライザーが冗長なデータの書き込みを避けるためにオブジェクトをキャッシュしますが、それらのオブジェクトのガベージコレクションを停止します。'reset'を呼ぶことで、シリアライザからその情報をフラッシュし、古いオブジェクトを収集されるようにします。この定期的なリセットをオフにするには、-1を設定します。デフォルトでは、100オブジェクトごとにシリアライザをリセットします。 | 100 |
メモリ管理
プロパティ名 | 意味 | デフォルト |
spark.memory.fraction | 実行とストレージのために (heap space - 300MB) の一部分が使われます。これを小さくすると、零れ落ちる頻度が高くなりキャッシュデータの追い出しが起こります。The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records. これはデフォルトの値にしておくことをお勧めします。詳細は this descriptionを見てください。 | 0.75 |
spark.memory.storageFraction | 立ち退きに耐性のあるストレージメモリの量。spark.memory.fractionによって取り分けられる領域のサイズの割合として表現されます。これを高くすると、実行のために利用可能な作業メモリが少なくなり、タスクがディスクにもっと頻繁に零れ落ちるかも知れません。これはデフォルトの値にしておくことをお勧めします。詳細はこの説明を見てください。 | 0.5 |
spark.memory.offHeap.enabled | trueにすると、特定の操作のためにオフヒープメモリを使おうとするでしょう。もしオフヒープメモリの利用が可能であれば、spark.memory.offHeap.size は有効でなければなりません。 | TRUE |
spark.memory.offHeap.size | オフヒープ割り当てのために使うことができる絶対メモリ量。この設定はヒープメモリの利用率に影響が無いため、もしexecutorの総消費メモリが何らかのハードリミットに合わせる必要がある場合はJVMヒープサイズをそれに応じて減らすようにしてください。spark.memory.offHeap.enabled=trueの場合は、これは正の値に設定されなければなりません。 | 0 |
spark.memory.useLegacyMode |
Spark 1.5以前で使われていた古いメモリ管理モードを有効にするかどうか。以前のモードではヒープ領域を頑なに固定サイズの領域に分割します。アプリケーションがチューニングされていない場合は過度にこぼれることになります。これが有効にされない場合、以下の非推奨のメモリー断片設定は読み込まれません: spark.shuffle.memoryFraction
spark.storage.memoryFraction
spark.storage.unrollFraction
|
FALSE |
spark.shuffle.memoryFraction | (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。シャッフルの間に集約および集団を作るために使われるJavaヒープの断片。いつでも、シャッフルに使われる全てのインメモリのマップの全体のサイズはこの制限によって束縛されます。これを超えた内容なディスクにこぼれ始めるでしょう。流し込みが頻繁に起きる場合は、spark.storage.memoryFractionを犠牲にしてこの値を増やすことを考えます。 | 0.2 |
spark.storage.memoryFraction | (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。Sparkのメモリキャッシュによって使用されるJava ヒープの断片。これはJVM内のオブジェクトの古い世代より大きくてはいけません。デフォルトではヒープの0.6ですが、古い世代のサイズを独自に設定した場合はそれを増やすことができます。 | 0.6 |
spark.storage.unrollFraction | (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。メモリ内の展開されないブロックが使う spark.storage.memoryFraction の断片。新しいブロックを完全に展開するために十分なストレージスペースが無い場合に、既存のブロックを削除することで動的に割り当てられます。 | 0.2 |
Executionの挙動
プロパティ名 | 意味 | デフォルト |
spark.broadcast.blockSize | TorrentBroadcastFactoryのためのブロックの各部分のサイズ。あまりに大きい値はブロードキャスト中の並行度が下がります(遅くなります); しかし、あまりに小さいとBlockManagerはパフォーマンスの打撃を受けるかも知れません。 | 4m |
spark.broadcast.factory | 使用するブロードキャストの実装。 | org.apache.spark.broadcast. |
TorrentBroadcastFactory | ||
spark.cleaner.ttl | どれくらい長くSparkがメタデータを覚えているかの期間(秒数) (ステージが生成されてから、タスクが生成されてからなど)。定期的な掃除により、持続時間より古いメタデータは忘れ去られるでしょう。これは長い時間/日 Sparkを実行するのに有用です (例えば、Spark ストリーミング アプリケーションの場合は 24/7 実行します)。この期間より長くメモリに存在するRDDも掃除されるだろうことに注意してください。 | (infinite) |
spark.executor.cores | 各executor上で使用されるコアの数。YARNおよびスタンドアローンモードのみ。スタンドアローンモードの場合は、ワーカー上で十分なコアがあると仮定すると、このパラメータを設定することでアプリケーションが同じワーカー上で複数のexecutorを実行することができます。そうでなければ、アプリケーションごとに1つのexecutorが各ワーカー上で実行されるでしょう。 | YARNモードの場合は1、スタンドアローンモードの場合はワーカー上の全ての利用可能なコア。 |
spark.default.parallelism | ユーザによって設定されなかった場合は、 join, reduceByKey および parallelize のような変換によって返されるRDD内のデフォルトの数。 |
reduceByKey および joinのような分散シャッフル操作については、親RDDの中の最も大きな数のパーティションです。親RDDが無い parallelizeのような操作については、クラスタマネージャーに依存します:
ローカルモード: ローカルマシーン上のコアの数
Mesos fine grained mode: 8
その他: 全てのexecutorノードのコアの総数あるいは2のどちらか大きいほう
|
spark.executor.heartbeatInterval | ドライバへの各executorのハートビートの間隔。ハートビートはドライバにexecutorがまだ生きていて実行中のタスクのためのマトリックスによってそれを更新することを知らせます。 | 10s |
spark.files.fetchTimeout | ドライバからSparkContext.addFile()を使って追加されたファイルを取り出す時に使う通信のタイムアウト。 | 60s |
spark.files.useFetchCache | true(デフォルト)に設定した場合は、ファイルの取り出しは同じアプリケーションに所属するexecutorによって共有されるローカルキャッシュを使うでしょう。これは同じホスト上で多くのexecutorを実行する場合にタスクの起動パフォーマンスを改善することができます。falseに設定すると、これらのキャッシュの最適化は無効にされ、全てのexecutorはファイルのそれらの固有のコピーを取り出すでしょう。この最適化はNFSファイルシステム上にあるSparkローカルディレクトリを使用するために無効にされるかも知れません (詳細はSPARK-6313 を見てください)。 | TRUE |
spark.files.overwrite | ターゲットのファイルが存在し、その内容が元のものとは一致しない場合に、SparkContext.addFile()を使って追加されたファイルを上書きするかどうか。 | FALSE |
spark.hadoop.cloneConf | trueに設定された場合、各タスクについて新しいHadoop設定 オブジェクトをクローンします。このオプションは設定スレッドセーフ問題を回避するために有効にすべきです (詳細はSPARK-2546 を見てください)。これらのもなぢによって影響を受けないジョブについて予期せぬパフォーマンスの低下を避けるために、デフォルトでは無効です。 | FALSE |
spark.hadoop.validateOutputSpecs | trueの場合は、saveAsHadoopFileおよび他の変数で使われる出力の仕様(例えば、出力ディレクトリが既に存在しているかを調べる)を検証します。これは既に存在する出力ディレクトリが原因の例外を沈黙させるために無効にされるかも知れません。以前のSparkのバージョンと互換性を持たせたい場合を除いて、ユーザはこれを向こうにしないことをお勧めします。単純に、手動で出力ディレクトリを削除するためにHadoopのFileSystem APIを使用します。チェックポイントリカバリの間、既存の出力ディレクトリにデータが上書きされる必要がある知れないので、Spark ストリーミングのStreamingContextによって生成されるジョブについては、この設定は無視されます。 | TRUE |
spark.storage.memoryMapThreshold | ディスクからブロックを読み込む場合のSparkメモリーマップの上のブロックサイズ。これによりSparkのメモリーマッピングがとても小さいブロックになることを防ぎます。オペレーティングシステムのページサイズに近いか少ないブロックの場合に、一般的にメモリーマッピングは高負荷になります。 | 2m |
spark.externalBlockStore.blockManager | RDDを格納する外部ブロックマネージャー(ファイルシステム)の実装。ファイルシステムURLは spark.externalBlockStore.urlによって設定されます。 | org.apache.spark.storage.TachyonBlockManager |
spark.externalBlockStore.baseDir | RDDを格納する外部ブロックストアのディレクトリ。ファイルシステムのURLは spark.externalBlockStore.url によって設定されます。Tachyonファイルシステム上の複数のディレクトリのカンマ区切りのリストもありえます。 | System.getProperty("java.io.tmpdir") |
spark.externalBlockStore.url | 外部ブロックストア内の外部ブロッカーファイルシステムの下のURL。 | tachyon://localhost:19998 for Tachyon |
ネットワーク
プロパティ名 | 意味 | デフォルト |
spark.akka.frameSize | "control plane"通信内で許される最大のメッセージ(MB); 一般的にexecutorおよびドライバー間で送信されるmapの出力サイズの情報にのみ適用されます。何千ものmapおよびreduceタスクを使うジョブを実行している場合はこれを増やしてください。フレームサイズに関するメッセージが表示されるでしょう。 | 128 |
spark.akka.heartbeat.interval | Akkaの組み込みで入る転送障害detectorを無効にするために大きな値に設定されます。この機能を使うつもりであれば、再び有効にすることができます (お勧めできません)。大きな間隔値はネットワークのオーバーヘッドを減らし、小さな値(~1s)はAkkaの障害検知のためにより参考になるかも知れません。必要であれば、これをspark.akka.heartbeat.pausesと組み合わせて調整してください。障害検知の使用の肯定的な使用例になりそうなのは: 感度が良い障害検知はいたずら好きなexecutorを素早く取り除くのに役立つかもしれません。しかし、実際のSparkクラスタではGCの休止とネットワークの遅延が起こりえるため、通常ではそうではありません。それは置いておくとしても、これを有効にするとノード間で大量のハートビートの交換がおこり、ネットワークがそれらで溢れることになります。 | 1000s |
spark.akka.heartbeat.pauses | Akkaの組み込みで入る転送障害detectorを無効にするために大きな値に設定されます。この機能を使うつもりであれば、再び有効にすることができます (お勧めできません)。Akkaのための許容できるハートビートの休止。これはGCの休止の感度を制御するために使うことができます。必要であれば、これをspark.akka.heartbeat.intervalと組み合わせて調整してください。 | 6000s |
spark.akka.threads | 通信のために使用するactorスレッドの数。ドライバが大量のCPUコアを持つ場合に、大きなクラスタでは増やした方が有効かも知れません。 | 4 |
spark.akka.timeout | Sparkノード間での通信のタイムアウト。 | 100s |
spark.blockManager.port | 全てのブロックマネージャーがlistenするポート。ドライバおよびexecutorの両方にあります。 | (random) |
spark.broadcast.port | ドライバーのHTTPブロードキャストサーバがlistenするポート。これはtorrentのブロードキャストとは関係ありません。 | (random) |
spark.driver.host | ドライバがlistenするホスト名あるいはIPアドレス。executorとスタンドアローンマスターが通信するために使われます。 | (local hostname) |
spark.driver.port | ドライバーがlistenするポート。executorとスタンドアローンマスターが通信するために使われます。 | (random) |
spark.executor.port | executorがlistenするポートドライバと通信するために使われます。 | (random) |
spark.fileserver.port | ドライバのHTTPファイルサーバがlistenするポート。 | (random) |
spark.network.timeout | 全てのネットワークの相互交流のタイムアウト。spark.core.connection.ack.wait.timeout, spark.akka.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout あるいは spark.rpc.lookupTimeout が設定されていない場合は、その代わりにこの設定が使われるでしょう。 | 120s |
spark.port.maxRetries | ポートへバインドする時に、再試行を諦める最大数。ポートに特定の値(非0)が指定された場合、続く試行では再試行する前に以前の試行で使われたポートに1を加えたものになります。これは本質的に指定された開始ポートから、ポート + maxRetries までのポートの範囲を試すことになります。 | 16 |
spark.replClassServer.port | ドライバーのHTTPクラスサーバがlistenするポート。これはSparkシェルにのみ関係があります。 | (random) |
spark.rpc.numRetries | RPCタスクが諦めるまでの再試行の数。この数の最大数までRPCタスクが実行されるでしょう。 | 3 |
spark.rpc.retry.wait | RPCのask操作が再試行するまで待つ期間。 | 3s |
spark.rpc.askTimeout | RPCのask操作がタイムアウトするまで待つ期間。 | 120s |
spark.rpc.lookupTimeout | RPCリモートエンドポイントがタイムアウトするまで待つ時間。 | 120s |
スケジューリング
プロパティ名 | 意味 | デフォルト |
spark.cores.max | <c0>スタンドアローン配備クラスタあるいは "coarse-grained"共有モードのMesos クラスターで実行している場合、アプリケーションが(各マシーンからではなく)クラスターから要求するCPUコアの最大総数。設定されていない場合は、デフォルトはSparkのスタンドアローンクラスタマネージャーでspark.deploy.defaultCores、Mesos上では無制限(利用可能な全てのコア)になります。 | (not set) |
spark.locality.wait | データーローカルタスクを諦めローカルではないノード上でそれが起動するまで待つ時間。同じ待ちが複数のローカルレベルを経由するたびに使われるでしょう(process-local, node-local, rack-local およびそれら全て)。spark.locality.wait.nodeなどを設定することで、各レベルで待ち時間をカスタマイズすることもできます。タスクに時間が掛かりほとんどローカルを見ない場合はこの設定を増やすべきですが、通常デフォルトでよく動作します。 | 3s |
spark.locality.wait.node | ノード局地性を待つための局地性をカスタマイズします。例えば、これを0にしてノード局地性をスキップし、すぐにrack局地性を探すことができます(もしクラスタがrack情報を持っている場合)。 | spark.locality.wait |
spark.locality.wait.process | プロセス局地性を待つための局地性をカスタマイズします。特定のexecutorプロセスにあるキャッシュされたデータにアクセスしようとするタスクに影響があります。 | spark.locality.wait |
spark.locality.wait.rack | rack局地性を待つための局地性をカスタマイズします。 | spark.locality.wait |
spark.scheduler.maxRegisteredResourcesWaitingTime | スケジュールが始まる前にリソースが登録するための最大待ち時間。 | 30s |
spark.scheduler.minRegisteredResourcesRatio | スケジューリングが始まる前に待つ、登録されたリソースの最小割合(登録されたリソース/期待される総リソース) (リリースはyarnモードではexecutor、スタンドアローンモードおよびMesos coarsed-grainedモード ['spark.cores.max' 値は Mesos coarse-grained モードのための総期待リソース] ではCPUコアです)。0.0と1.0の間のdoubleとして指定されます。リソースの最小の割合に達したかどうかに関係なく、スケジューリングが始まる前に待つ最大の時間は spark.scheduler.maxRegisteredResourcesWaitingTimeによって設定されます。 | YARNモードでは 0.8; スタンドアローンモードおよびMesos coarse-grained モードでは 0.0。 |
spark.scheduler.mode | 同じSparkContextにサブミットされたジョブ間のscheduling mode。次々にジョブをキューする代わりに、一様に共有するために使う FAIR を設定することができます。複数ユーザのサービスに有用です。 | FIFO |
spark.scheduler.revive.interval | スケジューラがワーカープロセスに多数をさせるために提供する間隔の長さ。 | 1s |
spark.speculation | "true"に設定すると、タスクの投機的な実行を行います。1つ以上のタスクがステージで遅く実行している場合、再起動されるだろうことを意味します。 | FALSE |
spark.speculation.interval | どれだけの頻度でSparkが投機するためにタスクをチェックするか。 | 100ms |
spark.speculation.multiplier | 平均より遅いタスクが何回投機と見なされるか。 | 1.5 |
spark.speculation.quantile | 指定のステージで投機が有効になる前にタスクのパーセンテージが終了していなければならないか。 | 0.75 |
spark.task.cpus | 各タスクごとに割り当てるコアの数。 | 1 |
spark.task.maxFailures | ジョブを諦める前の各タスクの失敗の数。1以上でなければなりません。許可された再試行の数 = この値 - 1. | 4 |
動的割り当て
プロパティ名 | 意味 | デフォルト |
spark.dynamicAllocation.enabled |
動的リソース割り当てを設定するかどうか。これはこのアプリケーションに登録されているexecutorの数を負荷に応じて上下させます。現在のところYARNノードでのみ利用可能なことに注意してください。詳細は ここの説明を見てください。
これには spark.shuffle.service.enabled が設定されることが必要です。以下の設定も関係あります: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors および spark.dynamicAllocation.initialExecutors
|
FALSE |
spark.dynamicAllocation.executorIdleTimeout | 動的な割り当てが有効でexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 | 60s |
spark.dynamicAllocation.cachedExecutorIdleTimeout | 動的な割り当てが有効でキャッシュされたデータブロックを持つexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 | infinity |
spark.dynamicAllocation.initialExecutors | 動的割り当てが有効な場合、実行するexecutorの初期の数。 | spark.dynamicAllocation.minExecutors |
spark.dynamicAllocation.maxExecutors | 動的割り当てが有効な場合のexecutorの数の上限。 | infinity |
spark.dynamicAllocation.minExecutors | 動的割り当てが有効な場合のexecutorの数の下限。 | 0 |
spark.dynamicAllocation.schedulerBacklogTimeout | もし動的割り当てが有効で、この期間内より多くの残されているタスクがある場合は、新しいexecutorがリクエストされるでしょう。詳細は ここの説明を見てください。 | 1s |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | spark.dynamicAllocation.schedulerBacklogTimeoutと同じですが、後に続くexecutorのリクエストのみに使用されます。詳細は <a0>ここ</a0>の説明を見てください。 | schedulerBacklogTimeout |
セキュリティ
プロパティ名 | 意味 | デフォルト |
spark.acls.enable | Spackのaclが有効にされるべきかどうか。友好な場合は、これはユーザがジョブを表示あるいは修正するアクセス権限をもつかどうかを調べます。これはユーザが分かっていなければならず、もしユーザがnullとして渡された場合はなにも調べられないことに注意してください。ユーザを認証および設定するために、UIを使ってフィルターを使うことができます。 | FALSE |
spark.admin.acls | 全てのSparkジョブを表示および修正アクセスを持つユーザ/管理者のカンマ区切りのリスト。もし共有クラスタ上で実行し、動作する時にデバッグを手伝ってくれる一連の管理者あるいは開発者がりう場合は、これが使われるかも知れません。リストに "*"を入れることは、全てのユーザが管理権限を持つことを意味します。 | Empty |
spark.authenticate | Sparkが内部接続を認証するかどうか。YARN上で実行しない場合はspark.authenticate.secretを見てください。 | FALSE |
spark.authenticate.secret | Sparkがコンポーネント間で認証するために使われる秘密キーを設定します。YARN上で実行しておらず、認証が有効な場合は、これが設定される必要があります。 | None |
spark.authenticate.enableSaslEncryption | 認証が有効な場合に暗号化通信を有効にする。このオプションは現在のところブロック転送サービスでのみサポートされています。 | FALSE |
spark.network.sasl.serverAlwaysEncrypt | SASL認証をサポートするサービスに対して非暗号化通信を無効にする。これは現在のところ外部シャッフルサービスでのみサポートされています。 | FALSE |
spark.core.connection.ack.wait.timeout | 接続がタイムアウトして諦めるまでにどれだけ長くackを待つか。GCのような長い休止で不本意なタイムアウトを避けるために、大きな値を設定することができます。 | 60s |
spark.core.connection.auth.wait.timeout | 接続がタイムアウトして諦めるまでにどれだけ長く認証を待つか。 | 30s |
spark.modify.acls | Sparkジョブへの修正アクセスを持つユーザのカンマ区切りのリスト。デフォルトでは、Sparkジョブを開始したユーザがそれの修正(例えばkill)アクセスを持ちます。リストに "*"を入れることは、全てのユーザが修正する権限を持つことを意味します。 | Empty |
spark.ui.filters |
Spark web UIへ適用するフィルタークラス名のカンマ区切りのリスト。フィルターは標準の javax servlet Filterでなければなりません。各フィルターへのパラメータは以下のようなjavaシステムプロパティの設定により指定することもできます:
spark.<フィルター名>.params='param1=value1,param2=value2'
例えば:
-Dspark.ui.filters=com.test.filter1
-Dspark.com.test.filter1.params='param1=foo,param2=testing'
|
None |
spark.ui.view.acls | Spark web uiへの表示アクセスを持つユーザのカンマ区切りのリスト。デフォルトでは、Sparkジョブを開始したユーザのみが表示アクセスを持ちます。リストに "*"を入れることは、全てのユーザがこのSparkジョブを見る権限を持つことを意味します。 | Empty |
暗号化
プロパティ名 | 意味 | デフォルト |
spark.ssl.enabled |
全てのサポートされたプロトコル上でSSL接続を有効にするかどうか。
spark.ssl.xxxのような全てのSSL設定、xxx は特定の設定プロパティ、は、全てのサポートされたプロトコルのためのグローバル設定を意味します。特定のプロトコルのためにグローバル設定を上書きするためには、そのプロパティがプロトコル固有の名前空間で上書きされなければなりません。
YYYで表される特定のプロトコルのためのグローバル設定を上書くためにspark.ssl.YYY.XXX 設定を使ってください。現在のところ、YYY はAkkaベース接続のための akka あるいはブロードキャストおよびファイルサーバのための fs のどちらかがありえます。
|
FALSE |
spark.ssl.enabledAlgorithms | cipherのカンマ区切りのリスト。指定されたcipherはJVMによってサポートされていなければなりません。The reference list of protocols one can find on this page. | Empty |
spark.ssl.keyPassword | キーストア内のプライベートキーのパスワード。 | None |
spark.ssl.keyStore | キーストアファイルへのパス。パスはコンポーネントが開始されたディレクトリへの絶対あるいは相対パスです。 | None |
spark.ssl.keyStorePassword | キーストアへのパスワード。 | None |
spark.ssl.protocol | プロトコル名。プロトコルはJVMによってサポートされていなければなりません。The reference list of protocols one can find on this page. | None |
spark.ssl.trustStore | trust-stoerファイルへのパス。パスはコンポーネントが開始されたディレクトリへの絶対あるいは相対パスです。 | None |
spark.ssl.trustStorePassword | trust-store のパスワード。 | None |
Spark ストリーミング
プロパティ名 | 意味 | デフォルト |
spark.streaming.backpressure.enabled | Sparkストリーミングの内部的なバックプレッシャー機構を有効または無効にします(1.5から)。これにより、Sparkストリーミングは現在のバッチのスケジュールされた遅延および処理時間に基づいた受信のレートの制御を行い、従ってシステムはシステムが処理できる分だけの速度で受信します。内部的には、これは動的にreceiverの受信レートの最小を設定します。spark.streaming.receiver.maxRateおよびspark.streaming.kafka.maxRatePerPartitionが設定されている場合に、この値で上限を制限されます (以下を見てください)。 | FALSE |
spark.streaming.blockInterval | Spark ストリーミング レシーバーによって受け取られるデータはSparkに格納される前にデータのブロックにチャンクされ、その時の間隔。お勧めの最小値 - 50ms。詳細はSpark ストリーミング プログラミング ガイドのパフォーマンス チューニングの章を見てください。 | 200ms |
spark.streaming.receiver.maxRate | 各レシーバーがデータを受け取るだろう最大レート (秒間あたりのレコードの数)。実際、各ストリームは秒間あたり最大この数のレコードを消費するでしょう。この設定を0または負数に設定すると、レートに制限をしないことになるでしょう。詳細はSparkストリーミングプログラミングガイドの 開発ガイド を見てください。 | not set |
spark.streaming.receiver.writeAheadLog.enable | レシーバーの先行書き込みログを有効にする。レシーバーによって受け取られた全ての入力データはドライバの故障後にリカバーできるように先行書き込みログに保存されるでしょう。詳細はSparkストリーミングプログラミングガイドの 開発ガイド を見てください。 | FALSE |
spark.streaming.unpersist | Sparkストリーミングで生成および永続化されているRDDがSparkのメモリから自動的に非永続化されるように強制します。Sparkストリーミングによって受け取られた生の入力データも自動的に削除されます。これをfalseにすると、自動的に削除されなかったかのように生データと永続RDDがストリーミングアプリケーション外からアクセス可能になるでしょう。しかし、Sparkでの高いメモリ利用量と引き換えになります。 | TRUE |
spark.streaming.stopGracefullyOnShutdown | trueの場合、JVMシャットダウンの時にSparkはすぐにではなく、グレースフルにStreamingContext をシャットダウンします。 | FALSE |
spark.streaming.kafka.maxRatePerPartition | 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最大レート(秒間あたりのレコード数)。詳細はKafka 統合ガイド を見てください。 | not set |
spark.streaming.kafka.maxRetries | Maximum number of consecutive retries the driver will make in order to find the latest offsets on the leader of each partition (a default value of 1 means that the driver will make a maximum of 2 attempts). stream APIを差している新しいKafkaにのみ適用されます。 | 1 |
spark.streaming.ui.retainedBatches | ガベージコレクティングの前にSparkストリーミングUIおよびステータスAPIがどれだけのバッチを記憶するか。 | 1000 |
SparkR
プロパティ名 | 意味 | デフォルト |
spark.r.numRBackendThreads | SparkRパッケージからのRPC呼び出しを処理するためにRBackendによって使用されるスレッドの数。 | 2 |
spark.r.command | ドライバーおよびワーカーの両方のためにクラスタモードでRスクリプトを実行するための実行ファイル。 | Rscript |
spark.r.driver.command | ドライバーのためのクライアントモードでRスクリプトを実行するための実行ファイル。クラスターモードでは無視されます。 | spark.r.command |
投稿日:2018-11-11
更新日:2018-11-13