Sparkの設定パラメータ一覧 ~本体設定編

Sparkを最近よく触っていて、設定値をデフォルトで設定して使っていましたが、もう少し使いこなしたいので一度整理するために一覧化してまとめました。

Sparkの設定値メモ

アプリケーションプロパティ

※赤字がパフォーマンスに関連しそうなプロパティ

プロパティ名 内容 デフォルト
spark.app.name アプリケーションの名前。これはUIの中やログデータの中に現れるでしょう。 (none)
spark.driver.cores ドライバープロセスのために使われるコアの数。クラスターモードのみ。 1
spark.driver.maxResultSize 各Sparkアクション(例えば、collect)のための全てのパーティションの直列化された結果の総サイズの制限。少なくとも1Mでなければなりません。0は無制限です。総サイズがこの制限を越えるとジョブは中止されるでしょう。制限を高くsるうとドライバでout-of-memoryエラーを起こすかもしれません(spark.driver.memory およびJVMでのオブジェクトのオーバーヘッドによります)。適切な制限を設定することで、out-of-memoryエラーからドライバを守ることができます。 1g
spark.driver.memory / --driver-memory ドライバープロセスのために使われるメモリの量。例えば SparkContextが初期化される場所。(e.g. 1g2g).  1g
  注意: ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定は アプリケーション内で直接SparkConfを使って設定するべきではありません。代わりに、--driver-memory コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。  
spark.executor.memory / --executor-memory executorプロセスあたりに使用するメモリ量(例えば2g8g)。 1g
spark.extraListeners SparkListenerを実装するクラスのカンマ区切りのリスト; SparkContextを初期化する場合に、これらのクラスが生成されSparkのリスナーバスに登録されるでしょう。SparkConfを受け付ける引数が一つのコンストラクタをクラスが持つ場合は、そのコンストラクタが呼ばれるでしょう; そうでなければ、引数を持たないコンストラクタが呼ばれるでしょう。有効なコンストラクタが見つからない場合は、SparkContextの生成は例外で失敗するでしょう。 (none)
spark.local.dir Sparkが"スクラッチ"するためのディレクトリ。mapの出力ファイルやディスクに格納されるRDDが含まれます。これはシステムの高速でローカルのディスク上になければなりません。異なるディスク上の複数のディレクトリのカンマ区切りのリストもありえます。注意: Spark 1.0 以降では、これはクラスタマネージャーによって、SPARK_LOCAL_DIRS (スタンドアローン, Mesos) あるいは LOCAL_DIRS (YARN) 環境変数で上書きされます。 /tmp
spark.logConf SparkContextが開始された時に、INFOとして有効なSparkConfを記録します。 FALSE
spark.master 接続するためのクラスタマネージャー 許可されたマスターURLのリストも見てください。 (none)
 

ランタイム環境

プロパティ名 意味 デフォルト
spark.driver.extraClassPath
ドライバーのクラスパスの先頭に追加する特別なクラスパスの登録。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-class-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
(none)
spark.driver.extraJavaOptions
ドライバに渡す特別なJVMオプションの文字列。例えば、GC設定あるいは他のログ。
注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-java-options コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。
(none)
spark.driver.extraLibraryPath ドライバーJVMを起動する場合は使用する特別なライブラリパスを設定してください。  (none)
  注意:ドライバーJVMがこの時点で既に開始されているため、クライアントモードでは、この設定はアプリケーション内で直接SparkConfを使って設定すべきではありません。代わりに、--driver-library-path コマンドラインオプションあるいはデフォルトのプロパティファイルの中で設定してください。  
spark.driver.userClassPathFirst (実験的なもの) ドライバーでクラスをロードするときに、Sparkの自身のjarを超えてユーザ追加のjarを優先するかどうか。この機能はSparkの依存とユーザの依存間の衝突を和らげるために使うことができます。それは現在のところ実験的な機能です。これはクラスターモードのみで使用されます。 FALSE
spark.executor.extraClassPath executorのクラスパスの先頭に追加する特別なクラスパス。これは主として古いバージョンのSparkとの後方互換性のために存在しています。ユーザは一般的にこのオプションを設定する必要はありません。 (none)
spark.executor.extraJavaOptions executorに渡すための特別なJVMオプションの文字列例えば、GC設定あるいは他のログ。このオプションを使ってSparkのプロパティあるいはヒープサイズの設定をすることは違反だということに注意してください。Spark のプロパティはSparkConfオブジェクトを使用するかspark-submitスクリプトと一緒に使われるspark-defaults.confファイルを使用して設定されるべきです。ヒープサイズの瀬て地はspark.executor.memoryを使って設定することができます。 (none)
spark.executor.extraLibraryPath executor JVMを起動する場合に使う特別なライブラリパスを設定する。 (none)
spark.executor.logs.rolling.maxRetainedFiles システムによって保持されるだろう最新のローリングログファイルの数を設定する。古いログファイルは削除されるでしょう。デフォルトは無効です。 (none)
spark.executor.logs.rolling.maxSize executorのログがロールオーバーされる最大のファイルサイズを設定します。デフォルトではローリングは無効です。古いログの自動クリーニングに関しては spark.executor.logs.rolling.maxRetainedFiles を見てください。 (none)
spark.executor.logs.rolling.strategy executorログのローリングの計画を設定します。デフォルトでは無効です。"time" (時間ベースのローリング)あるいは"size" (サイズベースのローリング)に設定することができます。"time"に関しては、ローリングの間隔を設定するためにspark.executor.logs.rolling.time.interval を使用します。"size"に関しては、ローリングの最大サイズを設定するためにspark.executor.logs.rolling.size.maxBytes を使用します。 (none)
spark.executor.logs.rolling.time.interval executorログがロールオーバーされる時間の間隔を設定します。デフォルトではローリングは無効です。有効な値はdailyhourlyminutely あるいは秒単位の任意の間隔です。古いログの自動クリーングに関してはspark.executor.logs.rolling.maxRetainedFilesを見てください。 daily
spark.executor.userClassPathFirst (実験的なもの) spark.driver.userClassPathFirstと同じ機能ですが、executorインスタンスに適用されます。 FALSE
spark.executorEnv.[EnvironmentVariableName] EnvironmentVariableNameによって指定される環境変数をexecutorプロセスに追加します。ユーザは複数の環境変数を設定するために複数のそれらを指定することができます。 (none)
spark.python.profile Pythonワーカーでのプロファイリングを有効にする。プロファイルの結果はsc.show_profiles()によって表示されるか、ドライバーが終了する前に表示されるでしょう。sc.dump_profiles(path)によってディスクにダンプすることもできます。いくつかのプロファイルの結果が手動で表示された場合は、ドライバーが終了する前に自動的に表示されないでしょう。デフォルトでは、pyspark.profiler.BasicProfiler が使われますが、これは SparkContext コンストラクタへのパラメータとしてプロファイルクラスに渡すことで上書きすることができます。 FALSE
spark.python.profile.dump ドライバーが終了する前にプロファイルの結果を出力するために使われるディレクトリ。結果は各RDDごとに分割されたファイルとしてダンプされるでしょう。それらはptats.Stats()によってロードすることができます。指定された場合は、プロファイルの結果は自動的に表示されないでしょう。 (none)
spark.python.worker.memory 集約の間にpythonワーカープロセスごとに使用されるメモリの量。JVMメモリ文字列と同じフォーマット(例えば512m2g)。集約の間に使用されるメモリ量がこの量を超える場合は、データがディスクに流し込まれます。 512m
spark.python.worker.reuse Pythonワーカーを再利用するかしないか。yesであれば、固定数のPythonワーカーが使用されます。各タスクでPythonプロセスをfork()する必要はありません。大きくブロードキャストする場合はとても便利です。ブロードキャストは各タスクごとにJVMからPythonワーカーに転送する必要はないでしょう。 TRUE

 

シャッフルの挙動

プロパティ名 意味 デフォルト
spark.reducer.maxSizeInFlight 各reduceタスクから同時に取り出すmap出力の最大サイズ。各出力は受け取るのにバッファを生成するため、これはreduceタスクごとの固定のメモリのオーバーヘッドを表します。ですので、メモリが多く無い場合は小さくしてください。 48m
spark.shuffle.compress map出力ファイルを圧縮するかどうか。一般的に良い考えです。圧縮はspark.io.compression.codecを使うでしょう。 TRUE
spark.shuffle.file.buffer 各シャッフルファイル出力ストリームのためのインメモリバッファのサイズ。これらのバッファはディスクのシークの数を減らし、中間シャッフルファイルの生成時のシステムコールを減らします。 32k
spark.shuffle.io.maxRetries (Nettyのみ) 0以外の値に設定された場合は、IOに関係する例外により失敗したフェッチは自動的に再試行されます。この再試行ロジックは、長いGCの停止あるいは一時的なネットワーク接続問題に直面した場合に、シャッフルを安定するのに役立ちます。 3
spark.shuffle.io.numConnectionsPerPeer (Nettyのみ) ホスト間の接続は大きなクラスタのために接続の準備を減らすために再利用されます。多くのハードディスクと少ないホストのクラスタに関しては、これは不十分な並行制御が全てのディスクを一杯にする結果になります。そのためユーザはこの値を増やそうと思うかも知れません。 1
spark.shuffle.io.preferDirectBufs (Nettyのみ) オフヒープバッファはシャッフルおよびキャッシュブロック転送の間にガベージコレクションを減らすために使われます。オフヒープメモリが厳しく制限されている環境では、ユーザは全ての割り当てをNettyからオンヒープへ強制するためにこれをオフにしたいと思うかも知れません。 TRUE
spark.shuffle.io.retryWait (Nettyのみ) フェチの再試行間でどれだけ待つか。maxRetries * retryWaitとして計算される、再試行によって起きる遅延の最大はデフォルトで15秒です。 5s
spark.shuffle.manager データをシャッフルするために使用する実装。利用可能な2つの実装があります: sort および hash。ソートベースのシャッフルはメモリ効率がより良く、1.2からはデフォルトのオプションです。 sort
spark.shuffle.service.enabled 外部のシャッフルサービスを有効にします。このサービスはexecutorが安全に削除されるようにexecutorによって書き込まれるシャッフルファイルを保持します。もしspark.dynamicAllocation.enabled が"true"であれば、これは有効でなければなりません。外部シャッフルサービスはそれを有効にするためにセットアップされていなければなりません。詳細はdynamic allocation configuration and setup documentation を見てください。 FALSE
spark.shuffle.service.port 外部のシャッフルサービスが実行されるだろうポート。 7337
spark.shuffle.sort.bypassMergeThreshold (上級) ソートベースのシャッフルマネージャーの中で、もしマップ側の集約が無く、最大これだけの削減パーティションがある場合は、データのmerge-sortingを避けます。 200
spark.shuffle.spill.compress シャッフルの間に流し込まれるデータを圧縮するかどうか。圧縮はspark.io.compression.codecを使うでしょう。 TRUE

 

Spark UI

プロパティ名 意味 デフォルト
spark.eventLog.compress もしspark.eventLog.enabled がtrueであれば、ログイベントを圧縮するかどうか。 FALSE
spark.eventLog.dir spark.eventLog.enabled がtrueであれば、Sparkイベントが記録されるベースディレクトリ。このベースディレクトリ内でSparkは各アプリケーションごとにサブディレクトリを作成し、このディレクトリ内にアプリケーションに固有のイベントを記録します。ユーザは、履歴ファイルが履歴サーバによって読み込まれることができるように、HDFSディレクトリのような単一の場所に設定したがるかも知れません。 file:///tmp/spark-events
spark.eventLog.enabled アプリ家k-ションが終了した後でWebUIを再構築するのに便利なSparkイベントを記録するかどうか。 FALSE
spark.ui.killEnabled web uiからステージおよび対応するジョブをkillすることを許可する。 TRUE
spark.ui.port 芽折および作業データを表示する、アプリケーションのダッシュボードのポート。 4040
spark.ui.retainedJobs ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのジョブを記憶するか。 1000
spark.ui.retainedStages ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけのステージを記憶するか。 1000
spark.worker.ui.retainedExecutors ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutorを記憶するか。 1000
spark.worker.ui.retainedDrivers ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したドライバーを記憶するか。 1000
spark.sql.ui.retainedExecutions ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したexecutionを記憶するか。 1000
spark.streaming.ui.retainedBatches ガベージコレクティングの前にSparkUIおよびステータスAPIがどれだけの終了したバッチを記憶するか。 1000

 

圧縮および直列化

プロパティ名 意味 デフォルト
spark.broadcast.compress ブロードキャスト変数を送信する前に圧縮するかどうか。一般的に良い考えです。 TRUE
spark.closure.serializer クロージャーが使用するシリアライザークラス。現在のところJavaのシリアライザーのみがサポートされています。 org.apache.spark.serializer.
    JavaSerializer
spark.io.compression.codec RDDパーティション、ブロードキャスト変数およびシャッフル出力のような内部データを圧縮するために使われるコーディック。デフォルトでは、Sparkは3つのコーディックを提供します: lz4lzfおよびsnappy. コーディックを指定するために、完全修飾クラス名を使用することもできます。例えば、org.apache.spark.io.LZ4CompressionCodecorg.apache.spark.io.LZFCompressionCodecおよび org.apache.spark.io.SnappyCompressionCodec. snappy
spark.io.compression.lz4.blockSize LZ4圧縮コーディックが使用される場合、LZ4圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、LZ4が使われる場合のシャッフルメモリの量も小さくなるでしょう。 32k
spark.io.compression.snappy.blockSize Snappy圧縮コーディックが使用される場合、Snappy圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、Snappyが使われる場合のシャッフルメモリの量も小さくなるでしょう。 32k
spark.kryo.classesToRegister Kryo シリアライゼイションを使う場合は、Kryoに登録するためにカンマ区切りのカスタムクラス名を渡してください。詳細はチューニングガイド を見てください。 (none)
spark.kryo.referenceTracking Kryoを使ってデータをシリアライズした場合に同じオブジェクトへの参照を追跡するかどうか。オブジェクトグラフがループを持っている場合は必要で、同じオブジェクトの複数のコピーが含まれている場合は効率のために有用です。そうでないと分かっている場合は、パフォーマンスの改善のために無効にすることができます。 true (Spark SQL Thrift サーバを使う場合は false)
spark.kryo.registrationRequired Kryoを使って登録を要求するかどうか。'true'に設定すると、Kryoは登録されていないクラスがシリアライズされると例外を投げます。false(デフォルト)に設定されると、Kryoは各オブジェクトに加えて登録されていないクラス名を書き込むでしょう。クラス名の書き込みは大きなパフォーマンスのオーバーヘッドになりえます。つまり、このオプションを有効にすることはユーザがクラスの登録を省略しないことを厳密に強制することができます。 FALSE
spark.kryo.registrator Kryoシリアライズを使う場合、カスタムクラスをKryoに登録するためにこのクラスを設定してください。もしクラスを独自の方法で登録する必要がある場合は、このプロパティが有用です。例えば、独自のフィールドのシリアライザ。そうでなければ、spark.kryo.classesToRegister がもっと簡単です。KryoRegistratorを拡張したクラスにそれを設定しなければなりません。詳細は チューニングガイド を見てください。 (none)
spark.kryoserializer.buffer.max kryoのシリアライズバッファの最大許容サイズ。これはシリアライズしようと思っているどのオブジェクトよりも大きくなければなりません。Kryo内で"buffer limit exceeded" 例外があった場合はこれを増やしてください。 64m
spark.kryoserializer.buffer Kryoのシリアライズバッファの初期サイズ。各ワーカー上で コアごとに1つのバッファがあることに注意してください。このバッファは必要に応じてspark.kryoserializer.buffer.max まで増加するでしょう。 64k
spark.rdd.compress シリアライズされたパーティションを圧縮するかどうか(例えば、StorageLevel.MEMORY_ONLY_SER)。ちょっとしたCPU時間の追加で相当なスペースを節約することができます。 FALSE
spark.serializer ネットワークを越えて送信するか、シリアライズされた形でキャッシュされる必要があるシリアライズオブジェクトのために使うクラス。デフォルトのJava シリアライズ はどのようなシリアライズ可能なJavaオブジェクトについても動作しますが、とても遅いです。そのため、スピードが必要な場合は org.apache.spark.serializer.KryoSerializer の使用およびKryoシリアライズの設定 をお勧めします。 org.apache.spark.Serializerのどのようなサブクラスにもなることができます。
org.apache.spark.serializer.
JavaSerializer (org.apache.spark.serializer.
Spark SQL Thriftサーバを使う場合のKryoSerializer)
spark.serializer.objectStreamReset org.apache.spark.serializer.JavaSerializerを使ってシリアライズする場合は、シリアライザーが冗長なデータの書き込みを避けるためにオブジェクトをキャッシュしますが、それらのオブジェクトのガベージコレクションを停止します。'reset'を呼ぶことで、シリアライザからその情報をフラッシュし、古いオブジェクトを収集されるようにします。この定期的なリセットをオフにするには、-1を設定します。デフォルトでは、100オブジェクトごとにシリアライザをリセットします。 100

 

メモリ管理

プロパティ名 意味 デフォルト
spark.memory.fraction 実行とストレージのために (heap space - 300MB) の一部分が使われます。これを小さくすると、零れ落ちる頻度が高くなりキャッシュデータの追い出しが起こります。The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records. これはデフォルトの値にしておくことをお勧めします。詳細は this descriptionを見てください。 0.75
spark.memory.storageFraction 立ち退きに耐性のあるストレージメモリの量。spark.memory.fractionによって取り分けられる領域のサイズの割合として表現されます。これを高くすると、実行のために利用可能な作業メモリが少なくなり、タスクがディスクにもっと頻繁に零れ落ちるかも知れません。これはデフォルトの値にしておくことをお勧めします。詳細はこの説明を見てください。 0.5
spark.memory.offHeap.enabled trueにすると、特定の操作のためにオフヒープメモリを使おうとするでしょう。もしオフヒープメモリの利用が可能であれば、spark.memory.offHeap.size は有効でなければなりません。 TRUE
spark.memory.offHeap.size オフヒープ割り当てのために使うことができる絶対メモリ量。この設定はヒープメモリの利用率に影響が無いため、もしexecutorの総消費メモリが何らかのハードリミットに合わせる必要がある場合はJVMヒープサイズをそれに応じて減らすようにしてください。spark.memory.offHeap.enabled=trueの場合は、これは正の値に設定されなければなりません。 0
spark.memory.useLegacyMode
Spark 1.5以前で使われていた古いメモリ管理モードを有効にするかどうか。以前のモードではヒープ領域を頑なに固定サイズの領域に分割します。アプリケーションがチューニングされていない場合は過度にこぼれることになります。これが有効にされない場合、以下の非推奨のメモリー断片設定は読み込まれません: spark.shuffle.memoryFraction
        spark.storage.memoryFraction
        spark.storage.unrollFraction
FALSE
spark.shuffle.memoryFraction (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。シャッフルの間に集約および集団を作るために使われるJavaヒープの断片。いつでも、シャッフルに使われる全てのインメモリのマップの全体のサイズはこの制限によって束縛されます。これを超えた内容なディスクにこぼれ始めるでしょう。流し込みが頻繁に起きる場合は、spark.storage.memoryFractionを犠牲にしてこの値を増やすことを考えます。 0.2
spark.storage.memoryFraction (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。Sparkのメモリキャッシュによって使用されるJava ヒープの断片。これはJVM内のオブジェクトの古い世代より大きくてはいけません。デフォルトではヒープの0.6ですが、古い世代のサイズを独自に設定した場合はそれを増やすことができます。 0.6
spark.storage.unrollFraction (非推奨) spark.memory.useLegacyMode が有効な場合のみ読み込まれます。メモリ内の展開されないブロックが使う spark.storage.memoryFraction の断片。新しいブロックを完全に展開するために十分なストレージスペースが無い場合に、既存のブロックを削除することで動的に割り当てられます。 0.2

 

Executionの挙動

プロパティ名 意味 デフォルト
spark.broadcast.blockSize TorrentBroadcastFactoryのためのブロックの各部分のサイズ。あまりに大きい値はブロードキャスト中の並行度が下がります(遅くなります); しかし、あまりに小さいとBlockManagerはパフォーマンスの打撃を受けるかも知れません。 4m
spark.broadcast.factory 使用するブロードキャストの実装。 org.apache.spark.broadcast.
    TorrentBroadcastFactory
spark.cleaner.ttl どれくらい長くSparkがメタデータを覚えているかの期間(秒数) (ステージが生成されてから、タスクが生成されてからなど)。定期的な掃除により、持続時間より古いメタデータは忘れ去られるでしょう。これは長い時間/日 Sparkを実行するのに有用です (例えば、Spark ストリーミング アプリケーションの場合は 24/7 実行します)。この期間より長くメモリに存在するRDDも掃除されるだろうことに注意してください。 (infinite)
spark.executor.cores 各executor上で使用されるコアの数。YARNおよびスタンドアローンモードのみ。スタンドアローンモードの場合は、ワーカー上で十分なコアがあると仮定すると、このパラメータを設定することでアプリケーションが同じワーカー上で複数のexecutorを実行することができます。そうでなければ、アプリケーションごとに1つのexecutorが各ワーカー上で実行されるでしょう。 YARNモードの場合は1、スタンドアローンモードの場合はワーカー上の全ての利用可能なコア。
spark.default.parallelism ユーザによって設定されなかった場合は、 joinreduceByKey および parallelize のような変換によって返されるRDD内のデフォルトの数。
reduceByKey および joinのような分散シャッフル操作については、親RDDの中の最も大きな数のパーティションです。親RDDが無い parallelizeのような操作については、クラスタマネージャーに依存します:
ローカルモード: ローカルマシーン上のコアの数
Mesos fine grained mode: 8
その他: 全てのexecutorノードのコアの総数あるいは2のどちらか大きいほう
spark.executor.heartbeatInterval ドライバへの各executorのハートビートの間隔。ハートビートはドライバにexecutorがまだ生きていて実行中のタスクのためのマトリックスによってそれを更新することを知らせます。 10s
spark.files.fetchTimeout ドライバからSparkContext.addFile()を使って追加されたファイルを取り出す時に使う通信のタイムアウト。 60s
spark.files.useFetchCache true(デフォルト)に設定した場合は、ファイルの取り出しは同じアプリケーションに所属するexecutorによって共有されるローカルキャッシュを使うでしょう。これは同じホスト上で多くのexecutorを実行する場合にタスクの起動パフォーマンスを改善することができます。falseに設定すると、これらのキャッシュの最適化は無効にされ、全てのexecutorはファイルのそれらの固有のコピーを取り出すでしょう。この最適化はNFSファイルシステム上にあるSparkローカルディレクトリを使用するために無効にされるかも知れません (詳細はSPARK-6313 を見てください)。 TRUE
spark.files.overwrite ターゲットのファイルが存在し、その内容が元のものとは一致しない場合に、SparkContext.addFile()を使って追加されたファイルを上書きするかどうか。 FALSE
spark.hadoop.cloneConf trueに設定された場合、各タスクについて新しいHadoop設定 オブジェクトをクローンします。このオプションは設定スレッドセーフ問題を回避するために有効にすべきです (詳細はSPARK-2546 を見てください)。これらのもなぢによって影響を受けないジョブについて予期せぬパフォーマンスの低下を避けるために、デフォルトでは無効です。 FALSE
spark.hadoop.validateOutputSpecs trueの場合は、saveAsHadoopFileおよび他の変数で使われる出力の仕様(例えば、出力ディレクトリが既に存在しているかを調べる)を検証します。これは既に存在する出力ディレクトリが原因の例外を沈黙させるために無効にされるかも知れません。以前のSparkのバージョンと互換性を持たせたい場合を除いて、ユーザはこれを向こうにしないことをお勧めします。単純に、手動で出力ディレクトリを削除するためにHadoopのFileSystem APIを使用します。チェックポイントリカバリの間、既存の出力ディレクトリにデータが上書きされる必要がある知れないので、Spark ストリーミングのStreamingContextによって生成されるジョブについては、この設定は無視されます。 TRUE
spark.storage.memoryMapThreshold ディスクからブロックを読み込む場合のSparkメモリーマップの上のブロックサイズ。これによりSparkのメモリーマッピングがとても小さいブロックになることを防ぎます。オペレーティングシステムのページサイズに近いか少ないブロックの場合に、一般的にメモリーマッピングは高負荷になります。 2m
spark.externalBlockStore.blockManager RDDを格納する外部ブロックマネージャー(ファイルシステム)の実装。ファイルシステムURLは spark.externalBlockStore.urlによって設定されます。 org.apache.spark.storage.TachyonBlockManager
spark.externalBlockStore.baseDir RDDを格納する外部ブロックストアのディレクトリ。ファイルシステムのURLは spark.externalBlockStore.url によって設定されます。Tachyonファイルシステム上の複数のディレクトリのカンマ区切りのリストもありえます。 System.getProperty("java.io.tmpdir")
spark.externalBlockStore.url 外部ブロックストア内の外部ブロッカーファイルシステムの下のURL。 tachyon://localhost:19998 for Tachyon

 

ネットワーク

プロパティ名 意味 デフォルト
spark.akka.frameSize "control plane"通信内で許される最大のメッセージ(MB); 一般的にexecutorおよびドライバー間で送信されるmapの出力サイズの情報にのみ適用されます。何千ものmapおよびreduceタスクを使うジョブを実行している場合はこれを増やしてください。フレームサイズに関するメッセージが表示されるでしょう。 128
spark.akka.heartbeat.interval Akkaの組み込みで入る転送障害detectorを無効にするために大きな値に設定されます。この機能を使うつもりであれば、再び有効にすることができます (お勧めできません)。大きな間隔値はネットワークのオーバーヘッドを減らし、小さな値(~1s)はAkkaの障害検知のためにより参考になるかも知れません。必要であれば、これをspark.akka.heartbeat.pausesと組み合わせて調整してください。障害検知の使用の肯定的な使用例になりそうなのは: 感度が良い障害検知はいたずら好きなexecutorを素早く取り除くのに役立つかもしれません。しかし、実際のSparkクラスタではGCの休止とネットワークの遅延が起こりえるため、通常ではそうではありません。それは置いておくとしても、これを有効にするとノード間で大量のハートビートの交換がおこり、ネットワークがそれらで溢れることになります。 1000s
spark.akka.heartbeat.pauses Akkaの組み込みで入る転送障害detectorを無効にするために大きな値に設定されます。この機能を使うつもりであれば、再び有効にすることができます (お勧めできません)。Akkaのための許容できるハートビートの休止。これはGCの休止の感度を制御するために使うことができます。必要であれば、これをspark.akka.heartbeat.intervalと組み合わせて調整してください。 6000s
spark.akka.threads 通信のために使用するactorスレッドの数。ドライバが大量のCPUコアを持つ場合に、大きなクラスタでは増やした方が有効かも知れません。 4
spark.akka.timeout Sparkノード間での通信のタイムアウト。 100s
spark.blockManager.port 全てのブロックマネージャーがlistenするポート。ドライバおよびexecutorの両方にあります。 (random)
spark.broadcast.port ドライバーのHTTPブロードキャストサーバがlistenするポート。これはtorrentのブロードキャストとは関係ありません。 (random)
spark.driver.host ドライバがlistenするホスト名あるいはIPアドレス。executorとスタンドアローンマスターが通信するために使われます。 (local hostname)
spark.driver.port ドライバーがlistenするポート。executorとスタンドアローンマスターが通信するために使われます。 (random)
spark.executor.port executorがlistenするポートドライバと通信するために使われます。 (random)
spark.fileserver.port ドライバのHTTPファイルサーバがlistenするポート。 (random)
spark.network.timeout 全てのネットワークの相互交流のタイムアウト。spark.core.connection.ack.wait.timeoutspark.akka.timeoutspark.storage.blockManagerSlaveTimeoutMsspark.shuffle.io.connectionTimeoutspark.rpc.askTimeout あるいは spark.rpc.lookupTimeout が設定されていない場合は、その代わりにこの設定が使われるでしょう。 120s
spark.port.maxRetries ポートへバインドする時に、再試行を諦める最大数。ポートに特定の値(非0)が指定された場合、続く試行では再試行する前に以前の試行で使われたポートに1を加えたものになります。これは本質的に指定された開始ポートから、ポート + maxRetries までのポートの範囲を試すことになります。 16
spark.replClassServer.port ドライバーのHTTPクラスサーバがlistenするポート。これはSparkシェルにのみ関係があります。 (random)
spark.rpc.numRetries RPCタスクが諦めるまでの再試行の数。この数の最大数までRPCタスクが実行されるでしょう。 3
spark.rpc.retry.wait RPCのask操作が再試行するまで待つ期間。 3s
spark.rpc.askTimeout RPCのask操作がタイムアウトするまで待つ期間。 120s
spark.rpc.lookupTimeout RPCリモートエンドポイントがタイムアウトするまで待つ時間。 120s

 

スケジューリング

プロパティ名 意味 デフォルト
spark.cores.max <c0>スタンドアローン配備クラスタあるいは "coarse-grained"共有モードのMesos クラスターで実行している場合、アプリケーションが(各マシーンからではなく)クラスターから要求するCPUコアの最大総数。設定されていない場合は、デフォルトはSparkのスタンドアローンクラスタマネージャーでspark.deploy.defaultCores、Mesos上では無制限(利用可能な全てのコア)になります。 (not set)
spark.locality.wait データーローカルタスクを諦めローカルではないノード上でそれが起動するまで待つ時間。同じ待ちが複数のローカルレベルを経由するたびに使われるでしょう(process-local, node-local, rack-local およびそれら全て)。spark.locality.wait.nodeなどを設定することで、各レベルで待ち時間をカスタマイズすることもできます。タスクに時間が掛かりほとんどローカルを見ない場合はこの設定を増やすべきですが、通常デフォルトでよく動作します。 3s
spark.locality.wait.node ノード局地性を待つための局地性をカスタマイズします。例えば、これを0にしてノード局地性をスキップし、すぐにrack局地性を探すことができます(もしクラスタがrack情報を持っている場合)。 spark.locality.wait
spark.locality.wait.process プロセス局地性を待つための局地性をカスタマイズします。特定のexecutorプロセスにあるキャッシュされたデータにアクセスしようとするタスクに影響があります。 spark.locality.wait
spark.locality.wait.rack rack局地性を待つための局地性をカスタマイズします。 spark.locality.wait
spark.scheduler.maxRegisteredResourcesWaitingTime スケジュールが始まる前にリソースが登録するための最大待ち時間。 30s
spark.scheduler.minRegisteredResourcesRatio スケジューリングが始まる前に待つ、登録されたリソースの最小割合(登録されたリソース/期待される総リソース) (リリースはyarnモードではexecutor、スタンドアローンモードおよびMesos coarsed-grainedモード ['spark.cores.max' 値は Mesos coarse-grained モードのための総期待リソース] ではCPUコアです)。0.0と1.0の間のdoubleとして指定されます。リソースの最小の割合に達したかどうかに関係なく、スケジューリングが始まる前に待つ最大の時間は spark.scheduler.maxRegisteredResourcesWaitingTimeによって設定されます。 YARNモードでは 0.8; スタンドアローンモードおよびMesos coarse-grained モードでは 0.0。
spark.scheduler.mode 同じSparkContextにサブミットされたジョブ間のscheduling mode。次々にジョブをキューする代わりに、一様に共有するために使う FAIR を設定することができます。複数ユーザのサービスに有用です。 FIFO
spark.scheduler.revive.interval スケジューラがワーカープロセスに多数をさせるために提供する間隔の長さ。 1s
spark.speculation "true"に設定すると、タスクの投機的な実行を行います。1つ以上のタスクがステージで遅く実行している場合、再起動されるだろうことを意味します。 FALSE
spark.speculation.interval どれだけの頻度でSparkが投機するためにタスクをチェックするか。 100ms
spark.speculation.multiplier 平均より遅いタスクが何回投機と見なされるか。 1.5
spark.speculation.quantile 指定のステージで投機が有効になる前にタスクのパーセンテージが終了していなければならないか。 0.75
spark.task.cpus 各タスクごとに割り当てるコアの数。 1
spark.task.maxFailures ジョブを諦める前の各タスクの失敗の数。1以上でなければなりません。許可された再試行の数 = この値 - 1. 4

 

動的割り当て

プロパティ名 意味 デフォルト
spark.dynamicAllocation.enabled
動的リソース割り当てを設定するかどうか。これはこのアプリケーションに登録されているexecutorの数を負荷に応じて上下させます。現在のところYARNノードでのみ利用可能なことに注意してください。詳細は ここの説明を見てください。
これには spark.shuffle.service.enabled が設定されることが必要です。以下の設定も関係あります: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors および spark.dynamicAllocation.initialExecutors
FALSE
spark.dynamicAllocation.executorIdleTimeout 動的な割り当てが有効でexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 60s
spark.dynamicAllocation.cachedExecutorIdleTimeout 動的な割り当てが有効でキャッシュされたデータブロックを持つexecutorがこの期間以上仕事をしていない場合、executorは削除されるでしょう。詳細は ここの説明を見てください。 infinity
spark.dynamicAllocation.initialExecutors 動的割り当てが有効な場合、実行するexecutorの初期の数。 spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors 動的割り当てが有効な場合のexecutorの数の上限。 infinity
spark.dynamicAllocation.minExecutors 動的割り当てが有効な場合のexecutorの数の下限。 0
spark.dynamicAllocation.schedulerBacklogTimeout もし動的割り当てが有効で、この期間内より多くの残されているタスクがある場合は、新しいexecutorがリクエストされるでしょう。詳細は ここの説明を見てください。 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout spark.dynamicAllocation.schedulerBacklogTimeoutと同じですが、後に続くexecutorのリクエストのみに使用されます。詳細は <a0>ここ</a0>の説明を見てください。 schedulerBacklogTimeout

 

セキュリティ

プロパティ名 意味 デフォルト
spark.acls.enable Spackのaclが有効にされるべきかどうか。友好な場合は、これはユーザがジョブを表示あるいは修正するアクセス権限をもつかどうかを調べます。これはユーザが分かっていなければならず、もしユーザがnullとして渡された場合はなにも調べられないことに注意してください。ユーザを認証および設定するために、UIを使ってフィルターを使うことができます。 FALSE
spark.admin.acls 全てのSparkジョブを表示および修正アクセスを持つユーザ/管理者のカンマ区切りのリスト。もし共有クラスタ上で実行し、動作する時にデバッグを手伝ってくれる一連の管理者あるいは開発者がりう場合は、これが使われるかも知れません。リストに "*"を入れることは、全てのユーザが管理権限を持つことを意味します。 Empty
spark.authenticate Sparkが内部接続を認証するかどうか。YARN上で実行しない場合はspark.authenticate.secretを見てください。 FALSE
spark.authenticate.secret Sparkがコンポーネント間で認証するために使われる秘密キーを設定します。YARN上で実行しておらず、認証が有効な場合は、これが設定される必要があります。 None
spark.authenticate.enableSaslEncryption 認証が有効な場合に暗号化通信を有効にする。このオプションは現在のところブロック転送サービスでのみサポートされています。 FALSE
spark.network.sasl.serverAlwaysEncrypt SASL認証をサポートするサービスに対して非暗号化通信を無効にする。これは現在のところ外部シャッフルサービスでのみサポートされています。 FALSE
spark.core.connection.ack.wait.timeout 接続がタイムアウトして諦めるまでにどれだけ長くackを待つか。GCのような長い休止で不本意なタイムアウトを避けるために、大きな値を設定することができます。 60s
spark.core.connection.auth.wait.timeout 接続がタイムアウトして諦めるまでにどれだけ長く認証を待つか。 30s
spark.modify.acls Sparkジョブへの修正アクセスを持つユーザのカンマ区切りのリスト。デフォルトでは、Sparkジョブを開始したユーザがそれの修正(例えばkill)アクセスを持ちます。リストに "*"を入れることは、全てのユーザが修正する権限を持つことを意味します。 Empty
spark.ui.filters
Spark web UIへ適用するフィルタークラス名のカンマ区切りのリスト。フィルターは標準の javax servlet Filterでなければなりません。各フィルターへのパラメータは以下のようなjavaシステムプロパティの設定により指定することもできます:
spark.<フィルター名>.params='param1=value1,param2=value2'
例えば: 
-Dspark.ui.filters=com.test.filter1 
-Dspark.com.test.filter1.params='param1=foo,param2=testing'
None
spark.ui.view.acls Spark web uiへの表示アクセスを持つユーザのカンマ区切りのリスト。デフォルトでは、Sparkジョブを開始したユーザのみが表示アクセスを持ちます。リストに "*"を入れることは、全てのユーザがこのSparkジョブを見る権限を持つことを意味します。 Empty

 

暗号化

プロパティ名 意味 デフォルト
spark.ssl.enabled
全てのサポートされたプロトコル上でSSL接続を有効にするかどうか。
spark.ssl.xxxのような全てのSSL設定、xxx は特定の設定プロパティ、は、全てのサポートされたプロトコルのためのグローバル設定を意味します。特定のプロトコルのためにグローバル設定を上書きするためには、そのプロパティがプロトコル固有の名前空間で上書きされなければなりません。
YYYで表される特定のプロトコルのためのグローバル設定を上書くためにspark.ssl.YYY.XXX 設定を使ってください。現在のところ、YYY はAkkaベース接続のための akka あるいはブロードキャストおよびファイルサーバのための fs のどちらかがありえます。
FALSE
spark.ssl.enabledAlgorithms cipherのカンマ区切りのリスト。指定されたcipherはJVMによってサポートされていなければなりません。The reference list of protocols one can find on this page. Empty
spark.ssl.keyPassword キーストア内のプライベートキーのパスワード。 None
spark.ssl.keyStore キーストアファイルへのパス。パスはコンポーネントが開始されたディレクトリへの絶対あるいは相対パスです。 None
spark.ssl.keyStorePassword キーストアへのパスワード。 None
spark.ssl.protocol プロトコル名。プロトコルはJVMによってサポートされていなければなりません。The reference list of protocols one can find on this page. None
spark.ssl.trustStore trust-stoerファイルへのパス。パスはコンポーネントが開始されたディレクトリへの絶対あるいは相対パスです。 None
spark.ssl.trustStorePassword trust-store のパスワード。 None

 

Spark ストリーミング

プロパティ名 意味 デフォルト
spark.streaming.backpressure.enabled Sparkストリーミングの内部的なバックプレッシャー機構を有効または無効にします(1.5から)。これにより、Sparkストリーミングは現在のバッチのスケジュールされた遅延および処理時間に基づいた受信のレートの制御を行い、従ってシステムはシステムが処理できる分だけの速度で受信します。内部的には、これは動的にreceiverの受信レートの最小を設定します。spark.streaming.receiver.maxRateおよびspark.streaming.kafka.maxRatePerPartitionが設定されている場合に、この値で上限を制限されます (以下を見てください)。 FALSE
spark.streaming.blockInterval Spark ストリーミング レシーバーによって受け取られるデータはSparkに格納される前にデータのブロックにチャンクされ、その時の間隔。お勧めの最小値 - 50ms。詳細はSpark ストリーミング プログラミング ガイドのパフォーマンス チューニングの章を見てください。 200ms
spark.streaming.receiver.maxRate 各レシーバーがデータを受け取るだろう最大レート (秒間あたりのレコードの数)。実際、各ストリームは秒間あたり最大この数のレコードを消費するでしょう。この設定を0または負数に設定すると、レートに制限をしないことになるでしょう。詳細はSparkストリーミングプログラミングガイドの 開発ガイド を見てください。 not set
spark.streaming.receiver.writeAheadLog.enable レシーバーの先行書き込みログを有効にする。レシーバーによって受け取られた全ての入力データはドライバの故障後にリカバーできるように先行書き込みログに保存されるでしょう。詳細はSparkストリーミングプログラミングガイドの 開発ガイド を見てください。 FALSE
spark.streaming.unpersist Sparkストリーミングで生成および永続化されているRDDがSparkのメモリから自動的に非永続化されるように強制します。Sparkストリーミングによって受け取られた生の入力データも自動的に削除されます。これをfalseにすると、自動的に削除されなかったかのように生データと永続RDDがストリーミングアプリケーション外からアクセス可能になるでしょう。しかし、Sparkでの高いメモリ利用量と引き換えになります。 TRUE
spark.streaming.stopGracefullyOnShutdown trueの場合、JVMシャットダウンの時にSparkはすぐにではなく、グレースフルにStreamingContext をシャットダウンします。 FALSE
spark.streaming.kafka.maxRatePerPartition 新しいKafkaがストリームAPIを差している場合の各Kafkaパーティションから読み込まれるであろうデータの最大レート(秒間あたりのレコード数)。詳細はKafka 統合ガイド を見てください。 not set
spark.streaming.kafka.maxRetries Maximum number of consecutive retries the driver will make in order to find the latest offsets on the leader of each partition (a default value of 1 means that the driver will make a maximum of 2 attempts). stream APIを差している新しいKafkaにのみ適用されます。 1
spark.streaming.ui.retainedBatches ガベージコレクティングの前にSparkストリーミングUIおよびステータスAPIがどれだけのバッチを記憶するか。 1000

 

SparkR

プロパティ名 意味 デフォルト
spark.r.numRBackendThreads SparkRパッケージからのRPC呼び出しを処理するためにRBackendによって使用されるスレッドの数。 2
spark.r.command ドライバーおよびワーカーの両方のためにクラスタモードでRスクリプトを実行するための実行ファイル。 Rscript
spark.r.driver.command ドライバーのためのクライアントモードでRスクリプトを実行するための実行ファイル。クラスターモードでは無視されます。 spark.r.command


投稿日:2018-11-11    更新日:2018-11-13

[スポンサーリンク]

[スポンサーリンク]

  
サイト内検索
プロフィール

プロフィール

[Name : POCO(@PocoIt2019)]
都内で社内SEをしているおじさん。
仕事で得られる知識だけでは限界を感じ、 WEBの勉強がてらITブログを開始。
サーバからWEBサイトまでフルスクラッチで開発しました。
現在は勉強のモチベーションを保つために活用中。
興味があることを雑記的に書いていきます。

[スポンサーリンク]

カテゴリ


タグ

[スポンサーリンク]

最近の記事