2014년 12월 14일 일요일

Spark 사용기 5 - SparkSQL 실무 활용기

Spark 를 실무에서 사용하기 시작한지 어언 2달...
아직까지는 Heavy Batch 일부와 Adhoc 개발 부분에서만 사용중이지만, 쓰면 쓸수록, 그 압도적인 성능에 감탄을 금할 수 없다.

물론 시행착오가 없었던 것은 아니다. 실무 사용하기 전 검증 단계에서 사용했던, Spark + tachyon + Shark 는  Memory 관련 에러가 빈번했었고, Public 가상화 환경에서의 태스트 이긴 하였으나, Spark + Yarn 의 태스트는 Spark StandAlone 에 비하여 성능이 더 떨어지는 현상도 접한 바 있다. ( Yarn 테스트는 아직 충분히 되지 않았다. Production 환경에서 여러가지 설정을 바꾸어가며 좀더 지켜봐야 하는 부분이다.)

오늘은 Spark + Scala M/R + SparkSQL 을 실무 사용하며, 발견한 SparkSQL 의 미숙함에 대하여 블로그 해보겠다.

우선 최근에 접했던 것은 Not IN 요건.
결과부터 말하자면, SparkSQL 은 Not IN 을 아직 지원하지 않는다. 물론, 간단한 요건이면, 로딩하는 순간에 filter 로 제외 시키고 불러 올 수 있다. 그렇게 처리 가능한 루틴은 매우 빠르게 Not IN 처리가 가능하다. 단, 아래 SQL 처럼 무언가 요건이 내포된 경우의 Not IN 은 Filter 로 처리하기 힘든 요건이다.

우선 아래 쿼리는 SparkSQL 이 지원하지 않는다는 것이 다소 놀랍지는 않았다. Hive 도 아래 쿼리를 지원하지 않음을 기 알고 있었기 때문이다. (hive0.13 이상은 지원하는지 체크해보지는 않았다... 우리 Production 기준으로 Hive 도 아래 Not IN 쿼리를 지원하지 않는다.)

[1] 첫번째 시도.

         val query3 = """
                   select
                        count(distinct pcId)
                   from
                        everyClickFlat_PC
                   where
                        mbrId = '' and
                        url like '%s' and
                        pcId not in (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like '%s')
                   """.format(classRow.pc_url,classRow.pc_url)
          val results3 = sqlContext.sql(query3)

이렇게 수행하면 아래와 같은 에러가 발생한다.

java.lang.RuntimeException: [9.38] failure: string literal expected
                        pcId not in (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like 'http://www.ssg.com/')
                                     ^
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
        at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260)
        at $iwC$$iwC.inline(<console>:220)
        at $iwC$$iwC$$anonfun$dayJob$1.apply(<console>:80)
        at $iwC$$iwC$$anonfun$dayJob$1.apply(<console>:80)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at $iwC$$iwC.dayJob(<console>:80)
        at $iwC$$iwC$$anonfun$1.apply$mcVI$sp(<console>:56)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at $iwC$$iwC.<init>(<console>:41)
        at $iwC.<init>(<console>:345)
        at <init>(<console>:347)
        at .<init>(<console>:351)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.pasteCommand(SparkILoop.scala:782)
        at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:309)
        at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:309)
        at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
        at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
        at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:766)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

[2] 두번째 시도



그냥 시도 이다. 쿼리가 정확하게 Not In 과 동일한 원하는 결과물을 가져다 줄지 검증은 아래처럼 너무 오래걸려서 검증해보지도 못하고 kill 하였다...

 val query3 = """
                   select
                        count(distinct A.pcId)
                   from
                        everyClickFlat_PC as A, everyClickFlat_PC as B
                   where
                        A.mbrId = '' and
                        A.url like '%s' and
                        B.mbrId != '' and
                        B.url like '%s' and
                        A.pcId != B.pcId
                   """.format(classRow.pc_url,classRow.pc_url)
          val results3 = sqlContext.sql(query3)

이렇게 했더니, 에러는 안나지만.... 무쟈게 오래 걸린다. 5분정도 기다리다가 kill..

[3] 세번째 시도
이번 역시 그냥 시도이다.. Not In 과 비스무리한 결과라도 얻을라고 했던 Just 시도 이다..

         val query3 = """
                   select
                        count(distinct A.pcId)
                   from
                         (select distinct pcId from everyClickFlat_PC where mbrId = '' and url like '%s') as A, 
                         (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like '%s') as B
                   where
                        A.pcId != B.pcId
                   """.format(classRow.pc_url,classRow.pc_url)
          val results3 = sqlContext.sql(query3)

이 역시 무쟈게 오래 걸린다.. 이 역시 8분 정도 기다리다가 kill...

[4] 네번째 시도

사실 Not In 은 Left Semi Join 을 이용하면 동작할것임은 어느정도 예상하고는 있었다. Not In 을 지원하는 RDBMS 에서 조차, 성능상 이유로, Not In 보다는 semi join 을 많이 사용하곤 하므로...
left semi join 사용 했더니 약 5초만에 결과가 나왔다. 참고로 저 테이블은 300Gb 가 넘는 크기의 테이블이다. (물론 성능 극대화를 위하여, 파티션 처리를, 먼저 전처리 해두었다.)

        val query3_3 = """
                   select
                          count(distinct results3_1.pcId)
                   from
                         results3_1 left semi join results3_2 on results3_1.pcId != results3_2.pcId
                   """
          val results3_3 = sqlContext.sql(query3_3)

          pc_uv2 = results3_3.collect()(0)(0).toString.toDouble

[5] 결론

물론 이번에는 실험해보지 않았지만, SparkSQL 을 우회해서 사용하지 않고, Scala 로 그냥 구현하면 Not In 을 훨씬 빠르게 구현할 수 있을 것이나, 300Gb 가 5초 정도면, SparkSQL 도 쓸만한 속도이다... 시도해보진 않았지만, 같은 쿼리를 Hive 환경에서 테스트 했다면, 수분 혹은 수십분도 걸릴 수 있었을 것이다.

Hive 나 SparkSQL 이 RDBMS 에 비하여 지원하지 않는 아쉬운 기능 중, 실무에서 제일 쉽게 접하는 것중 첫번째 정도는 inline query 가 아닐 까 한다. 두번째 정도가 Not In 일것 같다. inline 쿼리는 scala 로 처리하면, 매우 빠르게 구현이 가능했고, Not In 은 SparkSQL 이 직접 지원하진 않지만, Left Semi Join 이 대안 정도로 쓸만 한것 같다가 오늘의 결론이다...



댓글 없음:

댓글 쓰기