spark-udf-oom

Spark 2 UDF에서의 NullPointerException Cases

Hive QL보다 Spark가 가지는 장점 중 하나는 자유도 높게 원하는 동작을 구현할 수 있다는 점이다. 그것이 가능한 하나의 이유는 UDF (User Defined Function)일 것이고, 일반적인 개발자라면 쉽게 작은 함수 블록을 선언 및 구현 후 Spark DataFrame에 적용할 수 있다. 다만, 이런 UDF의 경우 디버깅이 쉽지 않고 에러 메시지를 만나면 워낙 많은 케이스가 존재해서 내가 겪었던 몇 가지 사례를 정리해서 올려본다.

UDF가 참조하는 Column이 null 값을 가지고 있는 경우

가장 빨리 의심해봐야 하는 부분이다. 입력으로 들어가는 값이 null이라면 연산 과정에서 OOM이 발생한다.
아래와 같이 null 값을 가지고 있는 DataFrame이 있다고 하자.

val testDF = spark.createDataFrame(Seq(
  "hello", null, "world"
).map(Tuple1.apply)).toDF("text")

testDF.show
// +-----+
// | text|
// +-----+
// |hello|
// | null|
// |world|
// +-----+

여러 가지 방법이 있겠지만, 표적 하는 Column에서 null을 가지는 record를 제거하는 방법이 있다

testDF.filter(col("text").isNotNull).show
// +-----+
// | text|
// +-----+
// |hello|
// |world|
// +-----+

또는 UDF 내부에서 null 체크를 하는 방법도 있다.

val myUpperUDF = udf[String, String] { input =>
    input match {
        case null => "NA"
        case _ => input.toUpperCase
    }
}

testDF.withColumn("upper_text", myUpperUDF(col("text"))).show
// +-----+----------+
// | text|upper_text|
// +-----+----------+
// |hello|     HELLO|
// | null|        NA|
// |world|     WORLD|
// +-----+----------+

마지막으로, 첫 번째 방법처럼 Column 하나하나에 대해서 점검하는 것이 아니라 DataFrame에서 하나라도 null이나 missing value를 가지는 record들을 처리하는 방법도 있다

naDroppedDF = testDF.na.drop

naDroppedDF.show
// +-----+
// | text|
// +-----+
// |hello|
// |world|
// +-----+

UDF의 Input/Output이 호출/반환하는 Column과 Type이 일치하지 않는 경우

마찬가지로 자주 실수할 수 있는 부분이다. 대부분 UDF Input/Output Type과 호출/반환 Column Type이 다르면 빌드 시 에러가 뜬다. 하지만 간혹 실제 실행 전까지 UDF의 Type과 호출 인자의 Type이 다르더라도 문제없이 컴파일 및 빌드가 되는 경우가 있었다.

case class Geultto(word: String, rate: Double, success: Int)

val testDF = spark.createDataFrame(Seq(
  Geultto("hello", 1.0, 3), 
  Geultto("world", 1.5, 4), 
  Geultto("geultto", 12.4, 5)
)).toDF

testDF.show
// +-------+----+-------+
// |   word|rate|success|
// +-------+----+-------+
// |  hello| 1.0|      3|
// |  world| 1.5|      4|
// |geultto|12.4|      5|
// +-------+----+-------+

printSchema를 통해서 Type 확인이 가능하니, OOM이 발생한다면 우선 Null Value 체크 후 Null Value가 없다면 Type이 일치하는지 확인해보자

testDF.printSchema
// root
//  |-- word: string (nullable = true)
//  |-- rate: double (nullable = false)
//  |-- success: integer (nullable = false)

UDF 내에서 DataFrame 연산

입사 초기에 내가 겪은 바보 같지만 있을 법한 실수였다. 코드를 통해서 예제를 보자.

case class Target(wordListOne: Seq[String], WordListTwo: Seq[String])

case class WordSimilarity(first: String, second: String, similarity: Double)

val targetData = Seq(
  Target(Seq("Spark", "Wrong", "Something"), Seq("Java", "Grape", "Banana")),
  Target(Seq("Java", "Scala"), Seq("Scala", "Banana")),
  Target(Seq(""), Seq("Grape", "Banana")),
  Target(Seq(""), Seq("")))
val targets = spark.createDataset(targetData)

targets.show(truncate = false)
// +-------------------------+---------------------+
// |wordListOne              |WordListTwo          |
// +-------------------------+---------------------+
// |[Spark, Wrong, Something]|[Java, Grape, Banana]|
// |[Java, Scala]            |[Scala, Banana]      |
// |[]                       |[Grape, Banana]      |
// |[]                       |[]                   |
// +-------------------------+---------------------+

val similarityData = Seq(
  WordSimilarity("Spark", "Java", 0.8),
  WordSimilarity("Scala", "Spark", 0.9),
  WordSimilarity("Java", "Scala", 0.9),
  WordSimilarity("Apple", "Grape", 0.66),
  WordSimilarity("Scala", "Apple", -0.1),
  WordSimilarity("Gine", "Spark", 0.1))
val dict = spark.createDataset(similarityData)

dict.show(truncate = false)
// +-----+------+----------+
// |first|second|similarity|
// +-----+------+----------+
// |Spark|Java  |0.8       |
// |Scala|Spark |0.9       |
// |Java |Scala |0.9       |
// |Apple|Grape |0.66      |
// |Scala|Apple |-0.1      |
// |Gine |Spark |0.1       |
// +-----+------+----------+

val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) =>
  dict.filter(
    (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7
  ).count
)

val countDF = targets.withColumn("positive_count", countPositiveSimilarity($"wordListOne", $"wordListTwo"))
// Expected result (?)
// +-------------------------+---------------------+--------------+
// |wordListOne              |WordListTwo          |positive_count|
// +-------------------------+---------------------+--------------+
// |[Spark, Wrong, Something]|[Java, Grape, Banana]|1             |
// |[Java, Scala]            |[Scala, Banana]      |1             |
// |[]                       |[Grape, Banana]      |0             |
// |[]                       |[]                   |0             |
// +-------------------------+---------------------+--------------+

회사 내의 Confidential한 내용을 예제 코드로 바꾼 내용이다. 하고 싶었던 것은 String의 List Column 2개에 대해서 사전 정보의 Similarity가 일정 값 이상인 Pair의 개수를 구하는 것이 목적인 코드이다. 예를 들어 Expected result의 첫 번째 줄은 wordListOne의 Spark와 wordListTwo의 Java는 dict에 존재하기 때문에 positive_count가 1이 생기게 된다.

이 코드는 언뜻 봤을 때 그럴듯하다고 생각할 수도 있다 (내 경우 Spark를 제대로 공부하지 않고 실무에 들어가서 그렇게 생각했었다...) 하지만 Spark는 Nested operation을 DataFrame에 대해서 지원하지 않기 때문에 이런 방식의 UDF는 사용할 수 없다. 쉽게 얘기해서 UDF내에서 DataFrame 연산을 사용할 수 없다.

따라서 이런 경우 미리 DataFrame을 Join 등으로 합친 후 UDF를 새로 구성하거나, 원하는 동작의 요구 사항 및 구조를 새로 정의해야 한다. 내 경우 비슷한 작업을 LSH를 통해서 해결하였다.

참고 자료 (Spark UDF Nested Operation 관련)

TL;DR

  1. UDF를 굳이 만들지 않고 해결할 수 있다면 Spark에서 제공하는 함수를 활용하자
  2. UDF를 사용한다면, 해당 Column의 null 값 체크 및 호출/반환의 Type이 일치하는지 확인하자
  3. UDF 내에서는 DataFrame 연산을 할 수 없다. 설계를 바꾸거나 방법을 우회하자