[Troubleshooting] Spark2 UDF NPE Cases
Spark 2 UDF에서의 NullPointerException Cases
Hive QL보다 Spark가 가지는 장점 중 하나는 자유도 높게 원하는 동작을 구현할 수 있다는 점이다. 그것이 가능한 하나의 이유는 UDF (User Defined Function)일 것이고, 일반적인 개발자라면 쉽게 작은 함수 블록을 선언 및 구현 후 Spark DataFrame에 적용할 수 있다. 다만, 이런 UDF의 경우 디버깅이 쉽지 않고 에러 메시지를 만나면 워낙 많은 케이스가 존재해서 내가 겪었던 몇 가지 사례를 정리해서 올려본다.
UDF가 참조하는 Column이 null 값을 가지고 있는 경우
가장 빨리 의심해봐야 하는 부분이다. 입력으로 들어가는 값이 null이라면 연산 과정에서 OOM이 발생한다.
아래와 같이 null 값을 가지고 있는 DataFrame이 있다고 하자.
val testDF = spark.createDataFrame(Seq(
"hello", null, "world"
).map(Tuple1.apply)).toDF("text")
testDF.show
// +-----+
// | text|
// +-----+
// |hello|
// | null|
// |world|
// +-----+
여러 가지 방법이 있겠지만, 표적 하는 Column에서 null을 가지는 record를 제거하는 방법이 있다
testDF.filter(col("text").isNotNull).show
// +-----+
// | text|
// +-----+
// |hello|
// |world|
// +-----+
또는 UDF 내부에서 null 체크를 하는 방법도 있다.
val myUpperUDF = udf[String, String] { input =>
input match {
case null => "NA"
case _ => input.toUpperCase
}
}
testDF.withColumn("upper_text", myUpperUDF(col("text"))).show
// +-----+----------+
// | text|upper_text|
// +-----+----------+
// |hello| HELLO|
// | null| NA|
// |world| WORLD|
// +-----+----------+
마지막으로, 첫 번째 방법처럼 Column 하나하나에 대해서 점검하는 것이 아니라 DataFrame에서 하나라도 null이나 missing value를 가지는 record들을 처리하는 방법도 있다
naDroppedDF = testDF.na.drop
naDroppedDF.show
// +-----+
// | text|
// +-----+
// |hello|
// |world|
// +-----+
UDF의 Input/Output이 호출/반환하는 Column과 Type이 일치하지 않는 경우
마찬가지로 자주 실수할 수 있는 부분이다. 대부분 UDF Input/Output Type과 호출/반환 Column Type이 다르면 빌드 시 에러가 뜬다. 하지만 간혹 실제 실행 전까지 UDF의 Type과 호출 인자의 Type이 다르더라도 문제없이 컴파일 및 빌드가 되는 경우가 있었다.
case class Geultto(word: String, rate: Double, success: Int)
val testDF = spark.createDataFrame(Seq(
Geultto("hello", 1.0, 3),
Geultto("world", 1.5, 4),
Geultto("geultto", 12.4, 5)
)).toDF
testDF.show
// +-------+----+-------+
// | word|rate|success|
// +-------+----+-------+
// | hello| 1.0| 3|
// | world| 1.5| 4|
// |geultto|12.4| 5|
// +-------+----+-------+
printSchema를 통해서 Type 확인이 가능하니, OOM이 발생한다면 우선 Null Value 체크 후 Null Value가 없다면 Type이 일치하는지 확인해보자
testDF.printSchema
// root
// |-- word: string (nullable = true)
// |-- rate: double (nullable = false)
// |-- success: integer (nullable = false)
UDF 내에서 DataFrame 연산
입사 초기에 내가 겪은 바보 같지만 있을 법한 실수였다. 코드를 통해서 예제를 보자.
case class Target(wordListOne: Seq[String], WordListTwo: Seq[String])
case class WordSimilarity(first: String, second: String, similarity: Double)
val targetData = Seq(
Target(Seq("Spark", "Wrong", "Something"), Seq("Java", "Grape", "Banana")),
Target(Seq("Java", "Scala"), Seq("Scala", "Banana")),
Target(Seq(""), Seq("Grape", "Banana")),
Target(Seq(""), Seq("")))
val targets = spark.createDataset(targetData)
targets.show(truncate = false)
// +-------------------------+---------------------+
// |wordListOne |WordListTwo |
// +-------------------------+---------------------+
// |[Spark, Wrong, Something]|[Java, Grape, Banana]|
// |[Java, Scala] |[Scala, Banana] |
// |[] |[Grape, Banana] |
// |[] |[] |
// +-------------------------+---------------------+
val similarityData = Seq(
WordSimilarity("Spark", "Java", 0.8),
WordSimilarity("Scala", "Spark", 0.9),
WordSimilarity("Java", "Scala", 0.9),
WordSimilarity("Apple", "Grape", 0.66),
WordSimilarity("Scala", "Apple", -0.1),
WordSimilarity("Gine", "Spark", 0.1))
val dict = spark.createDataset(similarityData)
dict.show(truncate = false)
// +-----+------+----------+
// |first|second|similarity|
// +-----+------+----------+
// |Spark|Java |0.8 |
// |Scala|Spark |0.9 |
// |Java |Scala |0.9 |
// |Apple|Grape |0.66 |
// |Scala|Apple |-0.1 |
// |Gine |Spark |0.1 |
// +-----+------+----------+
val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) =>
dict.filter(
(($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7
).count
)
val countDF = targets.withColumn("positive_count", countPositiveSimilarity($"wordListOne", $"wordListTwo"))
// Expected result (?)
// +-------------------------+---------------------+--------------+
// |wordListOne |WordListTwo |positive_count|
// +-------------------------+---------------------+--------------+
// |[Spark, Wrong, Something]|[Java, Grape, Banana]|1 |
// |[Java, Scala] |[Scala, Banana] |1 |
// |[] |[Grape, Banana] |0 |
// |[] |[] |0 |
// +-------------------------+---------------------+--------------+
회사 내의 Confidential한 내용을 예제 코드로 바꾼 내용이다. 하고 싶었던 것은 String의 List Column 2개에 대해서 사전 정보의 Similarity가 일정 값 이상인 Pair의 개수를 구하는 것이 목적인 코드이다. 예를 들어 Expected result의 첫 번째 줄은 wordListOne의 Spark와 wordListTwo의 Java는 dict에 존재하기 때문에 positive_count가 1이 생기게 된다.
이 코드는 언뜻 봤을 때 그럴듯하다고 생각할 수도 있다 (내 경우 Spark를 제대로 공부하지 않고 실무에 들어가서 그렇게 생각했었다...) 하지만 Spark는 Nested operation을 DataFrame에 대해서 지원하지 않기 때문에 이런 방식의 UDF는 사용할 수 없다. 쉽게 얘기해서 UDF내에서 DataFrame 연산을 사용할 수 없다.
따라서 이런 경우 미리 DataFrame을 Join 등으로 합친 후 UDF를 새로 구성하거나, 원하는 동작의 요구 사항 및 구조를 새로 정의해야 한다. 내 경우 비슷한 작업을 LSH를 통해서 해결하였다.
참고 자료 (Spark UDF Nested Operation 관련)
- https://stackoverflow.com/questions/47111607/why-does-this-spark-code-make-nullpointerexception?noredirect=1#comment81173197_47111607
- https://stackoverflow.com/questions/38244766/getting-null-pointer-exception-while-accessing-broadcasted-dataframe?noredirect=1&lq=1
TL;DR
- UDF를 굳이 만들지 않고 해결할 수 있다면 Spark에서 제공하는 함수를 활용하자
- UDF를 사용한다면, 해당 Column의 null 값 체크 및 호출/반환의 Type이 일치하는지 확인하자
- UDF 내에서는 DataFrame 연산을 할 수 없다. 설계를 바꾸거나 방법을 우회하자
댓글
이 글 공유하기
다른 글
-
Adapter Pattern
Adapter Pattern
2024.03.31Adapter pattern? 여러분들은 어댑터라는 용어를 어디서 들어보셨나요? 저는 해외여행 시 필수품 중 하나인 110v 어댑터가 먼저 생각이 나는데요. 110v 어댑터는 우리가 한국에서 사용하는 220v에 디자인 되어있는 전자기기를 110v 단자에서도 사용할 수 있게 도와주는 중간 매개체 정도로 표현할 수 있을 것 같습니다. 소프트웨어 개발에서도 Adapter Pattern이라는 것이 존재하고, 위에서 얘기한 110v 어댑터와 비슷한 역할을 위해서 도와주는 하나의 디자인 패턴이라고 볼 수 있겠는데요. 여러 디자인 패턴과 마찬가지로 어댑터 패턴 역시 객체지향 설계의 여러 원칙을 준수하기 위해서, 그리고 코드의 재사용 및 유연성을 위해서 도입된 패턴입니다. 좀 더 자세한 내용은 아래에서 알아보겠습니다 … -
PyTorch의 모듈 import는 어떻게 동작하는 걸까?
PyTorch의 모듈 import는 어떻게 동작하는 걸까?
2024.02.18nn.Linear(….)? 저를 포함하여 PyTorch를 사용하는 대부분은 아래처럼 필요한 torch 관련 패키지를 import 하여 사용하는 것에 아주 익숙할 것입니다 import torch from torch import nn m = nn.Linear(20, 30) input = torch.randn(128, 20) output = m(input) print(output.size()) nn 패키지에서는 Linear 뿐만 아니라 PyTorch에서 제공하는 다양한 Layer (e.g., Dropout, BatchNorm 등)과 Loss (e.g., KLD) 그리고 Container (ModuleList) 등을 사용할 수 있는데요. 어느 날 회사 업무 중 PyTorch 내부 코드 및 구조를 살펴볼 일이 … -
Singleton Pattern
Singleton Pattern
2024.02.04What is Singleton pattern? 싱글턴 패턴은 클래스가 하나의 유일한 인스턴스만 가지면서, 해당 인스턴스에 대해 전역 액세스를 제공하는 디자인 패턴입니다. 데이터베이스 객체처럼 프로그램 전반에 걸쳐서 단 하나의 유일한 객체만 존재하며, 여러 클라이언트에서 호출이 되어야 하는 경우 싱글턴 패턴을 고려해 볼 수 있습니다. 또한 전역 변수와 비슷한 효과를 지니지만, 좀 더 엄밀한 제어가 가능합니다. 이번 포스트에서는 싱글턴 패턴의 목적과 구현 방법, 장/단점 그리고 실제 사용 사례를 다뤄보겠습니다. How to implement? 싱글턴 패턴은 GoF에서 소개하는 여러 가지 디자인 패턴 중 구현 난이도가 쉬운 편에 속하는데요. 우선 클래스 다이어그램을 먼저 보고 실제 구현된 코드를 같이 보면서… -
한국어 형태소 분석기 성능 비교
한국어 형태소 분석기 성능 비교
2018.12.10형태소 분석기 비교 자연언어처리 모델 설계 전 데이터 전처리는 매우 중요합니다. 특히, 한국어 자연언어처리에서는 문장 분해의 여러 가지 최소 단위를 가질 수 있습니다. 한국어에서 최소 단위는 자소, 음절, 형태소 등이 될 수 있고 자소나 음절 분해보다 형태소 분해는 문장에서의 위치나 문맥에 따라 달라지므로 쉽지 않은 문제가 있는데요. 예를 들어 '하늘을 나는 자동차'와 '나는 밥을 먹는다'에서 '나는'은 문맥에 따라 주어 '나'를 의미할 수도 있고 '날다'를 의미할 수도 있습니다. 최근 카카오에서 딥러닝 기반의 형태소 분석기 khaiii를 발표했습니다. 기존에도 여러 형태소 분석기(한나눔, KOMORAN 등)들이 존재했지만, 딥러닝 기반의 형태소 분석기는 제가 아는 한 처음이었기 때문에 관심이 생겨 여러…
댓글을 사용할 수 없습니다.