아직까지는 Heavy Batch 일부와 Adhoc 개발 부분에서만 사용중이지만, 쓰면 쓸수록, 그 압도적인 성능에 감탄을 금할 수 없다.
물론 시행착오가 없었던 것은 아니다. 실무 사용하기 전 검증 단계에서 사용했던, Spark + tachyon + Shark 는 Memory 관련 에러가 빈번했었고, Public 가상화 환경에서의 태스트 이긴 하였으나, Spark + Yarn 의 태스트는 Spark StandAlone 에 비하여 성능이 더 떨어지는 현상도 접한 바 있다. ( Yarn 테스트는 아직 충분히 되지 않았다. Production 환경에서 여러가지 설정을 바꾸어가며 좀더 지켜봐야 하는 부분이다.)
오늘은 Spark + Scala M/R + SparkSQL 을 실무 사용하며, 발견한 SparkSQL 의 미숙함에 대하여 블로그 해보겠다.
우선 최근에 접했던 것은 Not IN 요건.
결과부터 말하자면, SparkSQL 은 Not IN 을 아직 지원하지 않는다. 물론, 간단한 요건이면, 로딩하는 순간에 filter 로 제외 시키고 불러 올 수 있다. 그렇게 처리 가능한 루틴은 매우 빠르게 Not IN 처리가 가능하다. 단, 아래 SQL 처럼 무언가 요건이 내포된 경우의 Not IN 은 Filter 로 처리하기 힘든 요건이다.
우선 아래 쿼리는 SparkSQL 이 지원하지 않는다는 것이 다소 놀랍지는 않았다. Hive 도 아래 쿼리를 지원하지 않음을 기 알고 있었기 때문이다. (hive0.13 이상은 지원하는지 체크해보지는 않았다... 우리 Production 기준으로 Hive 도 아래 Not IN 쿼리를 지원하지 않는다.)
[1] 첫번째 시도.
val query3 = """
select
count(distinct pcId)
from
everyClickFlat_PC
where
mbrId = '' and
select
count(distinct pcId)
from
everyClickFlat_PC
where
mbrId = '' and
url like '%s' and
pcId not in (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like '%s')
""".format(classRow.pc_url,classRow.pc_url)
val results3 = sqlContext.sql(query3)
""".format(classRow.pc_url,classRow.pc_url)
val results3 = sqlContext.sql(query3)
이렇게 수행하면 아래와 같은 에러가 발생한다.
java.lang.RuntimeException: [9.38] failure: string literal expected
pcId not in (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like 'http://www.ssg.com/')
^
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260)
at $iwC$$iwC.inline(<console>:220)
at $iwC$$iwC$$anonfun$dayJob$1.apply(<console>:80)
at $iwC$$iwC$$anonfun$dayJob$1.apply(<console>:80)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at $iwC$$iwC.dayJob(<console>:80)
at $iwC$$iwC$$anonfun$1.apply$mcVI$sp(<console>:56)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at $iwC$$iwC.<init>(<console>:41)
at $iwC.<init>(<console>:345)
at <init>(<console>:347)
at .<init>(<console>:351)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.pasteCommand(SparkILoop.scala:782)
at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:309)
at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:309)
at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:766)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[2] 두번째 시도
그냥 시도 이다. 쿼리가 정확하게 Not In 과 동일한 원하는 결과물을 가져다 줄지 검증은 아래처럼 너무 오래걸려서 검증해보지도 못하고 kill 하였다...
val query3 = """
select
count(distinct A.pcId)
from
everyClickFlat_PC as A, everyClickFlat_PC as B
where
A.mbrId = '' and
select
count(distinct A.pcId)
from
everyClickFlat_PC as A, everyClickFlat_PC as B
where
A.mbrId = '' and
A.url like '%s' and
B.mbrId != '' and
B.url like '%s' and
A.pcId != B.pcId
""".format(classRow.pc_url,classRow.pc_url)
val results3 = sqlContext.sql(query3)
val results3 = sqlContext.sql(query3)
이렇게 했더니, 에러는 안나지만.... 무쟈게 오래 걸린다. 5분정도 기다리다가 kill..
[3] 세번째 시도
이번 역시 그냥 시도이다.. Not In 과 비스무리한 결과라도 얻을라고 했던 Just 시도 이다..
val query3 = """
select
count(distinct A.pcId)
from
(select distinct pcId from everyClickFlat_PC where mbrId = '' and url like '%s') as A,
select
count(distinct A.pcId)
from
(select distinct pcId from everyClickFlat_PC where mbrId = '' and url like '%s') as A,
(select distinct pcId from everyClickFlat_PC where mbrId != '' and url like '%s') as B
where
A.pcId != B.pcId
""".format(classRow.pc_url,classRow.pc_url)
val results3 = sqlContext.sql(query3)
val results3 = sqlContext.sql(query3)
이 역시 무쟈게 오래 걸린다.. 이 역시 8분 정도 기다리다가 kill...
[4] 네번째 시도
사실 Not In 은 Left Semi Join 을 이용하면 동작할것임은 어느정도 예상하고는 있었다. Not In 을 지원하는 RDBMS 에서 조차, 성능상 이유로, Not In 보다는 semi join 을 많이 사용하곤 하므로...
left semi join 사용 했더니 약 5초만에 결과가 나왔다. 참고로 저 테이블은 300Gb 가 넘는 크기의 테이블이다. (물론 성능 극대화를 위하여, 파티션 처리를, 먼저 전처리 해두었다.)
val query3_3 = """
select
count(distinct results3_1.pcId)
from
select
count(distinct results3_1.pcId)
from
results3_1 left semi join results3_2 on results3_1.pcId != results3_2.pcId
"""
val results3_3 = sqlContext.sql(query3_3)
val results3_3 = sqlContext.sql(query3_3)
pc_uv2 = results3_3.collect()(0)(0).toString.toDouble
[5] 결론
물론 이번에는 실험해보지 않았지만, SparkSQL 을 우회해서 사용하지 않고, Scala 로 그냥 구현하면 Not In 을 훨씬 빠르게 구현할 수 있을 것이나, 300Gb 가 5초 정도면, SparkSQL 도 쓸만한 속도이다... 시도해보진 않았지만, 같은 쿼리를 Hive 환경에서 테스트 했다면, 수분 혹은 수십분도 걸릴 수 있었을 것이다.
Hive 나 SparkSQL 이 RDBMS 에 비하여 지원하지 않는 아쉬운 기능 중, 실무에서 제일 쉽게 접하는 것중 첫번째 정도는 inline query 가 아닐 까 한다. 두번째 정도가 Not In 일것 같다. inline 쿼리는 scala 로 처리하면, 매우 빠르게 구현이 가능했고, Not In 은 SparkSQL 이 직접 지원하진 않지만, Left Semi Join 이 대안 정도로 쓸만 한것 같다가 오늘의 결론이다...
댓글 없음:
댓글 쓰기