1. 들어가며
Apache Spark를 활용해 여러 애플리케이션을 만들어보는 경험을 하고 있는 현재, 여러 오류를 마주하고 해결하는 과정의 연속이라 기록용도로 작성해보고자 합니다.
이번 포스팅에서는 Apache Spark를 활용하는 데 있어서 발생될 수 있는 오류 중 disk 가 가득 찬 경우입니다.
2. 원인 분석
우선 일단 Apache Spark의 Strucutred streaming을 활용할 때 누적되어 저장되게 되는 데이터를 알아봅시다.
2-1. logs file 문제?
첫 번째로, spark의 logs 파일입니다.
Spark는 자체 로깅을 위한 표준 라이브러리로 log4 j 사용합니다. Spark 내부에서 발생하는 모든 일은 셸 콘솔과 구성된 기본 저장소에 기록됩니다. Spark는 또한 앱 작성자를 위한 템플릿을 제공하므로 동일한 log4j 라이브러리를 사용하여 Spark의 기존 및 적절한 로깅 구현에 원하는 메시지를 추가할 수 있습니다 [2].
말 그대로 spark를 로그는 디버깅 및 추적 가능성뿐 아니라 비즈니스 인텔리전스를 위한 것입니다. 따라서 누적되어 저장되게 되고 이 과정에서 disk에 가득 차는 경우도 발생됩니다. 하지만 애플리케이션이 종료될 정도의 직접적인 영향을 주지 않습니다. (disk에 가득하게 하겠지만)
2-2. work dict 문제?
두 번째로, spark의 work 디렉토리입니다.
Stack overflow 내용 중 [3]
....
According to the documentation on Spark Standalone Mode the work directory is described as: Directory to run applications in, which will include both logs and scratch space (default: SPARK_HOME/work).
Here scratch space means that it is "including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks."
In the work folder you will find for each application the .jar libraries such that the executor have access to the libraries. In addition, it contains some temporary data based on the processing logic and actual data (not on the amount of processing triggers). The sub-folders 0, 1 are incremental for different jobs/stages or runs of the same application. (To be frank, I am not fully knowledgeable about those sub-folders.)
....
위 내용을 보면 애플리케이션이 동작하는데 있어서 활용되는 RDD 자체를 저장하는 과정입니다. 따라서 누적되어 저장되게 되고 이 과정에서 disk에 가득 차는 경우도 발생됩니다. 하지만 애플리케이션이 종료될 정도의 직접적인 영향을 주지 않습니다. (disk에 가득하게 하겠지만)
추가적으로 스파크 공식문서에 의하면 주기적으로 제거할 수 있는 옵션이 존재합니다. [4]
SPARK_WORKER_OPTS supports the following system properties:
spark.worker.cleanup.enabled | false | Enable periodic cleanup of worker / application directories. Note that this only affects standalone mode, as YARN works differently. Only the directories of stopped applications are cleaned up. This should be enabled if spark.shuffle.service.db.enabled is "true" | 1.0.0 |
spark.worker.cleanup.interval | 1800 (30 minutes) | Controls the interval, in seconds, at which the worker cleans up old application work dirs on the local machine. | 1.0.0 |
spark.worker.cleanup.appDataTtl | 604800 (7 days, 7 * 24 * 3600) | The number of seconds to retain application work directories on each worker. This is a Time To Live and should depend on the amount of available disk space you have. Application logs and jars are downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space, especially if you run jobs very frequently. | 1.0.0 |
spark.shuffle.service.db.enabled | true | Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will automatically reload info on current executors. This only affects standalone mode (yarn always has this behavior enabled). You should also enable spark.worker.cleanup.enabled, to ensure that the state eventually gets cleaned up. This config may be removed in the future. | 3.0.0 |
spark.storage.cleanupFilesAfterExecutorExit | true | Enable cleanup non-shuffle files(such as temp. shuffle blocks, cached RDD/broadcast blocks, spill files, etc) of worker directories following executor exits. Note that this doesn't overlap with `spark.worker.cleanup.enabled`, as this enables cleanup of non-shuffle files in local directories of a dead executor, while `spark.worker.cleanup.enabled` enables cleanup of all files/subdirectories of a stopped and timeout application. This only affects Standalone mode, support of other cluster managers can be added in the future. | 2.4.0 |
spark.worker.ui.compressedLogFileLengthCacheSize | 100 | For compressed log files, the uncompressed file can only be computed by uncompressing the files. Spark caches the uncompressed file size of compressed log files. This property controls the cache size. | 2.0.2 |
2-3. state store 문제?
마지막으로, stateful operation 입니다.
Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation [1]
위 글은 Spark 공식문서 내용 중 하나로, Spark Structured Streaming에서 stateful operation은 이전 데이터를 활용해야 되는 집계와 같은 operation을 말하며, 이를 위해 state라는 data를 디스크 및 메모리에 저장을 해야 합니다.
Driver stack trace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:274)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
... 34 more
Caused by: java.io.IOException: No space left on device
at java.base/java.io.FileOutputStream.writeBytes(Native Method)
at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:182)
at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:191)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.$anonfun$writeIndexFileAndCommit$3(IndexShuffleBlockResolver.scala:169)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1369)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:169)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:164)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.io.IOException: No space left on device
at java.base/java.io.FileOutputStream.writeBytes(Native Method)
at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
at java.base/java.io.DataOutputStream.flush(DataOutputStream.java:123)
at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:182)
... 13 more
위 코드 내용을 보면 No space left on device와 같은 오류 내용을 확인 할 수 있습니다. 이는 state라는 데이터를 저장하는데 과정에 있어서 disk가 꽉차는 경우로 spark 애플리케이션 자체가 종료되며 아래와 같은 오류를 마주 할 수 있습니다.
4. 해결 방안
일단 서버에 disk 내용을 확인해봅시다.
## 디스크 내용 확인
df -k
df -h
위 명령어를 통해 아래 사진과 같은 결과를 확인할 수 있습니다.
위 사진 중 /dev/sda 부분에 문제가 발생 됐을 가능성이 높습니다. (저의 경우 3.6T가 가득 차 있어서 정리가 된 화면입니다. )
## root로 이동
cd /
## 용량을 많이 차지하는 상위 20개 sorting
du -max / | sort -rn | head -20
루트로 이동 후 용량을 많이 차지하는 상위 20개의 파일을 검색해 본 결과 아래 사진과 같이 디스크 내에 꽤나 많은 용량을 차지하는 것을 확인할 수 있습니다.
이제 어디서 용량이 많이 발생됐는지 확인도 했으니 안전하게 제거하면 될 것 같습니다.
## 제거
rm -rf {dict}
이제 다시 애플리케이션을 동작하면 문제없이 동작하는 것을 확인 할 수 있습니다.
5. 마치며
이번 포스팅에서는 spark 애플리케이션을 운영하면서 누적될 수 있는 데이터로 인해서 애플리케이션이 종료되는 것을 확인했고, 그 문제를 해결하는 과정까지 보았습니다.
이러한 문제는 흔하지 않겠습니다만, 혹시 과정을 정리해보았습니다.
REFERENCE
Structured Streaming Programming Guide - Spark 3.3.2 Documentation
Structured Streaming Programming Guide Overview Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on s
spark.apache.org
[2]. https://medium.com/hackernoon/how-to-log-in-apache-spark-f4204fad78a
How to log in Apache Spark
An important part of any application is the underlying log system we incorporate into it. Logs are not only for debugging and traceability…
medium.com
Purpose of spark worker directories in /work/app-xxxxxxx/{0, 1, 2, ...} and periodic cleanup
I'm running a Spark 3.4 long running structured streaming job. Whenever the job starts, an application directory of the form app-xxxxxxxxxx is created for the job in the work directory. However wit...
stackoverflow.com
[4]. https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts