2014년 12월 14일 일요일

Spark 사용기 5 - SparkSQL 실무 활용기

Spark 를 실무에서 사용하기 시작한지 어언 2달...
아직까지는 Heavy Batch 일부와 Adhoc 개발 부분에서만 사용중이지만, 쓰면 쓸수록, 그 압도적인 성능에 감탄을 금할 수 없다.

물론 시행착오가 없었던 것은 아니다. 실무 사용하기 전 검증 단계에서 사용했던, Spark + tachyon + Shark 는  Memory 관련 에러가 빈번했었고, Public 가상화 환경에서의 태스트 이긴 하였으나, Spark + Yarn 의 태스트는 Spark StandAlone 에 비하여 성능이 더 떨어지는 현상도 접한 바 있다. ( Yarn 테스트는 아직 충분히 되지 않았다. Production 환경에서 여러가지 설정을 바꾸어가며 좀더 지켜봐야 하는 부분이다.)

오늘은 Spark + Scala M/R + SparkSQL 을 실무 사용하며, 발견한 SparkSQL 의 미숙함에 대하여 블로그 해보겠다.

우선 최근에 접했던 것은 Not IN 요건.
결과부터 말하자면, SparkSQL 은 Not IN 을 아직 지원하지 않는다. 물론, 간단한 요건이면, 로딩하는 순간에 filter 로 제외 시키고 불러 올 수 있다. 그렇게 처리 가능한 루틴은 매우 빠르게 Not IN 처리가 가능하다. 단, 아래 SQL 처럼 무언가 요건이 내포된 경우의 Not IN 은 Filter 로 처리하기 힘든 요건이다.

우선 아래 쿼리는 SparkSQL 이 지원하지 않는다는 것이 다소 놀랍지는 않았다. Hive 도 아래 쿼리를 지원하지 않음을 기 알고 있었기 때문이다. (hive0.13 이상은 지원하는지 체크해보지는 않았다... 우리 Production 기준으로 Hive 도 아래 Not IN 쿼리를 지원하지 않는다.)

[1] 첫번째 시도.

         val query3 = """
                   select
                        count(distinct pcId)
                   from
                        everyClickFlat_PC
                   where
                        mbrId = '' and
                        url like '%s' and
                        pcId not in (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like '%s')
                   """.format(classRow.pc_url,classRow.pc_url)
          val results3 = sqlContext.sql(query3)

이렇게 수행하면 아래와 같은 에러가 발생한다.

java.lang.RuntimeException: [9.38] failure: string literal expected
                        pcId not in (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like 'http://www.ssg.com/')
                                     ^
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)
        at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:73)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:260)
        at $iwC$$iwC.inline(<console>:220)
        at $iwC$$iwC$$anonfun$dayJob$1.apply(<console>:80)
        at $iwC$$iwC$$anonfun$dayJob$1.apply(<console>:80)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at $iwC$$iwC.dayJob(<console>:80)
        at $iwC$$iwC$$anonfun$1.apply$mcVI$sp(<console>:56)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at $iwC$$iwC.<init>(<console>:41)
        at $iwC.<init>(<console>:345)
        at <init>(<console>:347)
        at .<init>(<console>:351)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.pasteCommand(SparkILoop.scala:782)
        at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:309)
        at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:309)
        at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
        at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
        at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:766)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

[2] 두번째 시도



그냥 시도 이다. 쿼리가 정확하게 Not In 과 동일한 원하는 결과물을 가져다 줄지 검증은 아래처럼 너무 오래걸려서 검증해보지도 못하고 kill 하였다...

 val query3 = """
                   select
                        count(distinct A.pcId)
                   from
                        everyClickFlat_PC as A, everyClickFlat_PC as B
                   where
                        A.mbrId = '' and
                        A.url like '%s' and
                        B.mbrId != '' and
                        B.url like '%s' and
                        A.pcId != B.pcId
                   """.format(classRow.pc_url,classRow.pc_url)
          val results3 = sqlContext.sql(query3)

이렇게 했더니, 에러는 안나지만.... 무쟈게 오래 걸린다. 5분정도 기다리다가 kill..

[3] 세번째 시도
이번 역시 그냥 시도이다.. Not In 과 비스무리한 결과라도 얻을라고 했던 Just 시도 이다..

         val query3 = """
                   select
                        count(distinct A.pcId)
                   from
                         (select distinct pcId from everyClickFlat_PC where mbrId = '' and url like '%s') as A, 
                         (select distinct pcId from everyClickFlat_PC where mbrId != '' and url like '%s') as B
                   where
                        A.pcId != B.pcId
                   """.format(classRow.pc_url,classRow.pc_url)
          val results3 = sqlContext.sql(query3)

이 역시 무쟈게 오래 걸린다.. 이 역시 8분 정도 기다리다가 kill...

[4] 네번째 시도

사실 Not In 은 Left Semi Join 을 이용하면 동작할것임은 어느정도 예상하고는 있었다. Not In 을 지원하는 RDBMS 에서 조차, 성능상 이유로, Not In 보다는 semi join 을 많이 사용하곤 하므로...
left semi join 사용 했더니 약 5초만에 결과가 나왔다. 참고로 저 테이블은 300Gb 가 넘는 크기의 테이블이다. (물론 성능 극대화를 위하여, 파티션 처리를, 먼저 전처리 해두었다.)

        val query3_3 = """
                   select
                          count(distinct results3_1.pcId)
                   from
                         results3_1 left semi join results3_2 on results3_1.pcId != results3_2.pcId
                   """
          val results3_3 = sqlContext.sql(query3_3)

          pc_uv2 = results3_3.collect()(0)(0).toString.toDouble

[5] 결론

물론 이번에는 실험해보지 않았지만, SparkSQL 을 우회해서 사용하지 않고, Scala 로 그냥 구현하면 Not In 을 훨씬 빠르게 구현할 수 있을 것이나, 300Gb 가 5초 정도면, SparkSQL 도 쓸만한 속도이다... 시도해보진 않았지만, 같은 쿼리를 Hive 환경에서 테스트 했다면, 수분 혹은 수십분도 걸릴 수 있었을 것이다.

Hive 나 SparkSQL 이 RDBMS 에 비하여 지원하지 않는 아쉬운 기능 중, 실무에서 제일 쉽게 접하는 것중 첫번째 정도는 inline query 가 아닐 까 한다. 두번째 정도가 Not In 일것 같다. inline 쿼리는 scala 로 처리하면, 매우 빠르게 구현이 가능했고, Not In 은 SparkSQL 이 직접 지원하진 않지만, Left Semi Join 이 대안 정도로 쓸만 한것 같다가 오늘의 결론이다...



2014년 12월 1일 월요일

Elasticsearch 설치기 1 - 기본 설치 & plug-in 설치

Elasticsearch 설치는 매우매우 쉽다.

(1) 우선 공식 홈페이지에서 zip 파일을 다운로드 받는다.

http://www.elasticsearch.org/overview/elkdownloads/

(2) 그리고 압축을 푼다.

(3) 그리고 아래처럼 실행하면 끝.... 사실 아래처럼 백그라운드 실행을 할 필요는 없다. 그러나, 최초 Getting Started 데로, 실행시 프롬프트로 빠져나가지 않았고, 프롬프트로 돌아오면 데몬이 종료됨을 발견하여, 2번째 시도에서는 아래처럼 Ctrl+z 로 나와서 backgroud 프로세스로 돌려 주었었다. 물론 그렇게 하여도, 데모나이즈 화가 잘되는 것 같았지만, 좀더 자료를 찾아보니, -d 옵션을 주면 데몬화 실행이 됨을 알 수 있었다..

백그라운드 실행 - 이렇게 할 필요는 없다.
-d 옵션을 주면 데모나이즈 실행 모드 이다.
(4) 설치 확인은 아래처럼 Restful 접근을 해보면 가능하다.

(5) 뭔가 불안하리 만큼 너무 설치가 간단하다. Web-Console 이라도 띄워보기 위해 플러그인을 뒤져 보았다. marvel 이라고 하는 걸쭉한 모니터링 도구가 바로 검색 되었다. 아래처럼 plug-in 설치도 매우 쉽다. ( marvel 은 이름이 생소하였지만, 설치하고 나서 알고보니 kibana 위에 구성된 3rd party kibana 활용 dashboard 였다.. 요즘 뜨고 있는 ELK 의 K )
marvel 설치 후에는 elasticsearch 재실행이 필요하다.
(6) marvel web console 화면은 아래와 같은 주소로 띄울 수 있다. 9200 으로 방화벽 신청이 필요할 수 있다.

http://설치서버주소:9200/_plugin/marvel/
-> kibana dashboard 페이지로 direction 된다.


일단.... 오늘은 여기까지....
이제 좀 놀다가 자야겠다....

회의는 회사에서... R&D 는 집에서....



Spark 사용기 4 - cache() 쓰고 안쓰고 비교

최근에 만든 Spark 위의 Scala M/R 배치는 3개 HDFS File 을 가지고 연산을 수행하는데, 모두 Memory 에 로딩 가능한 정도의 크기이다. 이 경우 RDD 를 Cache 할 필요가 있을까?

그래서 실험을 해 보았다.

(1) Cache 안썻을 때
-> 14초

(2) Cache 썼을 때
-> 1차 시도: 45초
-> 2차 시도: 32초

1차 시도를 했을때 , 45초나 걸려서 다소 놀랐었다. 그때는 뭔가 효율적으로 자료구조를 메모리 DB 화 하기 위해 복잡한 insert 루틴을 수행하고 있나 보다 했었다. 그런데, 그런 논리라고 한다면, 2차 시도때 훨씬 빨라 져야 정상일 것이다.
그런데, 결과는 약간 빨라지긴 했지만, 그다지 안썻을 때에 비해 전혀 개선된 것 같지가 않았다.

(3) cache() 했을 때 값이 남아 있는지 여부?

Shell 콘솔을 껏다 키면, 살짝 간격을 두고 다 없어진다. 그럼 뭔차이일까?
오히려 더 느리기만 하고.... (persist 를 쓰면, keeping 이 되긴 하겠지만...)

일단 짐작을 해 보자면, 클러스터를 Yarn 기반으로 구동 시키면, MR 코드가 Yarn 의 Job 을 수행시켰었다. 이 경우 Cache 를 적절하게 써주면, HDFS 가 아닌 Spark Memory 를 활용하며, 좀더 효율적인 연산을 하지 않을 까 기대했었다...  그 시나리오를 실험해보자, 경미하게 속도 개선이 있었지만, 역시 큰 개선은 없었다. 그러나, Job 성격에 따라 성능 향상이 많은 경우도 있을 것이다. 일단, Cache 는 그런 용도라고 짐작하고 넘어가야겠다.

Yarn 모드가 아닌 Spark Engine 자체로 Master 와 Slave 노드를 설정하고 Memory Instance 를 띄워서 Memory Cluster 를 구동한 경우에는 Memory Cluster 에 로딩 가능한 크기인 경우, 굳이 Cache 를 쓰지 않아도 메모리에서 모든 연산이 이루어지는 듯 하다. 아니, 위 실험 처럼 오히려 속도가 빠르다....
즉, 그러한 환경에서는 ( Stanalone Master & Slave Cluster 모드) Cache 를 쓰지 않는게 더 효율적인듯 하다. 가끔 pinning 할때만, persist 기능을 적절히 써주면 될듯 하다.




ps... persist 를 적절히 쓰면 10배 이상 성능이 개선된다고 한다... 아래 메뉴월 원문을 paste 해본다.  이런실험은 좀더 다양한 M/R 을 짜보고 종합적으로 실험해 보아야 할듯 하다. 일단, 오늘 코딩한 소스로는 아무것도 안쓴게 젤로 빠르다...

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes, or store it off-heap in Tachyon. These levels are set by passing a StorageLevel object (ScalaJavaPython) to persist(). The cache() method is a shorthand for using the default storage level, which isStorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:
Storage LevelMeaning
MEMORY_ONLYStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISKStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SERStore RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SERSimilar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLYStore the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental)Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory.

2014년 11월 28일 금요일

OpenSource Search Engine 비교 - Solr vs ElasticSearch

Apache Solr ElasticSearch 두 오픈 소스 검색 엔진에 대하여 비교해 보았다
( BigData 배치에서도 Search Engine 이 NoSQL 처럼 another 데이타 소스로서 의미를 가질 수 있으며, 기타, 관제, 모니터링, RealTime Alert 등 여러가지 용도로 Search Engine 의 필요성이 대두 되었기 때문이다. )

본 비교는 그러한 Needs 에서 뭘로 솔루션을 선정할까를 (사실 나는 마음이 정해져 있고, 준비도 어느정도 해오고 있었지만..) 내부에서 공유하기 위해 정리 되었다.

여기서부터는 합쇼체....

우선 두 엔진 이외에도 수많은 다른 오픈 소스 엔진들이 있긴 하지만, 위 두 엔진들이 각각 나머지 모두를 합친 것보다 인지도나 커뮤니티 활성도가 높은 상태이므로, 위 두 엔진만 비교토록 하겠습니다. (ASPSeek, BBDBot, Datapark, ebhath, Eureka, ht://Dig, Indri, ISearch, IXE, Lucene, Managing, Gigabytes (MG), MG4J, mnoGoSearch, MPS Information Server, Namazu, Nutch, Omega, OmniFind IBM Yahoo! Ed., OpenFTS, PLWeb, SWISH-E, SWISH++, Terrier, WAIS/ freeWAIS, WebGlimpse, XML Query Engine, XMLSearch, Zebra, and Zettair.)

Cloud Service Base Search Engine 도 존재합니다. SMS 나 결제 시스템 사용하듯이, 종량제 과금으로 Cloud 버전의 Search 서치 서비스를 사용할 수 있으며, 대표적인 것은 Amazon CloudSearch(http://aws.amazon.com/ko/cloudsearch/) 입니다.

[1] 사전 비교 자료 조사.

두 검색엔진을 비교한 수많은 아티클이 존재합니다. 구글 기준, 최근 트랜드와 자체 검색엔진 랭킹을 반영한, Top 5 Page 의 아티클 들은 다음과 같습니다.

[KeyWord : solr vs elasticsearch]

1.      http://solr-vs-elasticsearch.com/ : 상당히 방대한 항목에 대하여 두 검색엔진의 객관적 지표를 짧은 문구로 정리해놓고 있습니다.
[ 어느 한쪽에 치우치지 않았습니다. ]
2.      http://stackoverflow.com/questions/10213009/solr-vs-elasticsearch : 두 엔진의 장단점을 답변자가 요약해서 보여주고 있습니다.
[ elasticsearch 가 좋다에 치우쳐 있습니다. ]
3.      http://blog.sematext.com/2012/08/23/solr-vs-elasticsearch-part-1-overview/ : 6 chapter 로 나누어 각각을 비교하고 있으며, 상세한 설명이 곁들여져 있으며, 벤치마크 자료들이 첨부되어 있으나, 다소 옛날(2012-08) 자료 입니다.
4.      http://www.solrtutorial.com/solr-vs-elasticsearch.html : solr 커뮤니티에서의 두 엔진 비교 페이지 입니다. 두 엔진은 대등하다라고 표현하고 있습니다.
5.      http://www.slideshare.net/sematext/battle-of-the-giants-apache-solr-vs-elasticsearch-apachecon : 슬라이드쉐어 자료입니다. 솔라 쿡북 저자가 만든 자료입니다. 방대한 항목에 대하여 두 엔진의 방식을 설명하고 있습니다.
http://www.slideshare.net/sematext/battle-of-the-giants-round-2-apache-solr-vs-elasticsearch?next_slideshow=1 :
요게 더 최신의 2탄 슬라이드 자료 입니다.

[keyword : elasticsearch vs solr]
1.      http://solr-vs-elasticsearch.com/ : 1과 동일
4.      http://thinkbiganalytics.com/solr-vs-elastic-search/ : 8개 항목에서 4:4 의 동일 스코어를 보이며, 어느 한쪽이 승자라고 말하고 있지 않습니다.
ElasticSearch Win Category : Foundations , Automatic Shard Rebalancing , Nested Typing , Percolation Queris
Solr Win Category : Shard Splitting , Distributed Group By , Community , Vendor Support

[2] 구글 트랜드



[결론]
위 내용들을 종합한 각 엔진의 주요 장점은 다음과 같습니다.

ElasticSearch 강점
(1)   최근 트랜드 상으로는 좀더 상승세를 타고 있습니다. ( 구글 트랜드, 최신 Code Commit , Code Comment 수 등)
(2)   ElasticSearch 가 좀더 최신에 생긴 Open Source Community 이고, Solr 의 단점을 개선하는 목적의 개선 features를 많은 부분 목표로 삼고 태생하였습니다.
(3)   Nested Type, Nested Document , 분산서치에서의 Join 서포트, Lucene Query 외에도 복잡한 Structured Query 서포트가 가능합니다.
(4)   자동화된 밸런싱, HA 구성을 위한 자체적인 Master Node 선정 관리( Solr zookeepr 쿼럼에게 위임하고 있습니다. )
(5)   운영중 스키마 변경이 가능합니다. ( Solr 는 리스타트가 필요함. )
(6)   Realtime Features
A.     Solr의 인덱스 갱신 주기가 수십분 빨라도 수분 정도임에 반하여, ElasticSearch 1초정도 입니다.
B.      Percolation Query 라는 새로 생겨난 기법이 존재하여, 특정 쿼리를 등록해 두고, 해당 쿼리에 부합하는 신규 인덱스가 등록되면, 얼럿 해주는 기능이 추가되었습니다.
C.      Log 패턴을 보고 이상 징후를 감지하는 등에 활용 가능할 듯 합니다.

Solr 강점
(1)   ElasticSearch 가 신생 엔진이라고 한다면, Solr 는 이미 안정화 단계를 지난 검색엔진 입니다.
(2)   이로 인하여 많은 아티클, 경험자, 커뮤니티 아티클, 기여 기업 등이 존재합니다.
(3)   Vendor Support 가 가능합니다.
(4)   Pivot Query 기능, JMX 노출 등의 기능이 있습니다.
(5)   PHP , Python Ruby , Java 등의 Serialized Output 이 가능합니다.
(6)   Primary Shard 넘버를 바꿀 수 있습니다. (반면에, ElasticSearch default 5 이고, replication 개수는 자유롭게 늘릴수 있으나, 디폴트 샤딩 개수를 변경하지 못합니다.)

ps. 위 장단점 정리는 현 시점의 정리가 아닌 위 비교 사이트의 요약입니다. 해당 사이트의 정리 시점이 다소 과거 인지라, 현재 버전은 위 내용과 다를 수 있습니다.

2014년 11월 23일 일요일

Spark 사용기 3 - Group By, Order By 성능비교 및 Future Possibilities : Hive vs Spark M/R vs Spark SQL

이번에는 Group By 와 Sort 가 있는 집계( sum() ) 쿼리를,

(1) Hive 로
(2) Spark + Scala M/R 로
(3) Spark + Scala + SparkSQL 로

동일하게 구현해보고, 성능 비교 및 개발 생산성 비교를 해 보았다.

Test  로 사용된 테이블의 Row 수 및 컬럼 수는 아래와 같다.
해당 테이블은 성능 검증 용 Test 셈플 테이블이다. Row 수 등이 어떤 의미가 있는 데이타는 아니므로, 수치에 신경을 쓰진 말기를....

대략 1300만건 정도의 조그마한 테이블이다.
최근에 회사에서 수천억 Row 단위의 테이블들을 다루고 있긴 한데...갸들로 실험하기에는 Hive 쿼리가 수시간이 소요될 것이므로, 일단 단순한 셈플링 데이타 테이블로 실험을 시작하였다.
아래는 컬럼 갯수 참고용 스샷 이다.  스미카상  Date 로 Partition 해 두었다.
(역시 임시 생성 용 스키마 이니 너무 의미를 갖진 말기를....)


컬럼 갯수는 24개.  ( date 키로 partition )

(1) Hive

TEST SQL 은 아래와 같다.

select 
      date , inflositeno, sum(price) 
from 
      everyclick_order_mapping 
group by date,inflositeno 
order by date;

간단한 쿼리 지만, Group by , Sort , Sum 이 있는 전형적인 Full Scan 집계 쿼리 이다.
위를 수행한 결과는 아래와 같다.

42초. 

(date 는 Partition 되어 있어, 통으로 되어 있는 경우에 비해, date 로 group by 하는데 최적화 되어 있는 스키마 구조이긴 하다. 허나 어쨋든 오래 걸렸다.)

(2) Spark ( with Scala M/R )

먼저, 순수하게 위 로직을 scala 코딩으로 구현해 보았고, spark 엔진 위에서 실험 해 보았다.
코드는 아래와 같다.

val hdfsFile = sc.textFile("hdfs://master001.prod.moneymall.ssgbi.com:9000/moneymall/hivedata/everyclick_order_mapping/*")
case class EveryClickOrderClass(currdate:String , siteno:String, price:Double)
val result = hdfsFile.map(_.split("\t")).map(row => EveryClickOrderClass(row(0), row(1) , row(6).toString.toDouble ))

// scala coding.
val keyVals = result.map( clickOrder => ((clickOrder.currdate, clickOrder.siteno), (1 , clickOrder.price)))
val results = keyVals.reduceByKey( (a,b) => (a._1 + b._1 , a._2 + b._2))
results.sortByKey().collect().foreach( println)


Map 과 Reduce 단계는 거의 시간이 소요 되지 않는다, 0.3초 미만....
오히려 Sort 하는 과정이, 아무래도 Network 를 이용하기 때문에 조금 시간이 소요 된듯 하다..(코드에서 Print 시점 Sort 를 걸었다..)


그래 봤자, 결과는 0.39초...윗부분에 다른 step 들도 좀 더 있었지만, 무시할만큼 찰나의 시간에 끝난 일들이다.

전부 합쳐도 1초 미만. 경이적인 속도 이다.

(3) Spark ( with Scala & Spark SQL )

다음으로 1.x 버전에서 도입된 Spark SQL ....
Spark Scala 내부 RDD 자료 구조에다 데고, 바로 Query 를 날릴 수 있다. C# 으로 코딩하다가 Linq 기술로 C# 내 자료구조에 데고 쿼리 하던 때와 흡사한 생산성을 꽤할 수 있다.

더 매력적인 것은 HBase 나 HDFS 혹은 MySQL 등의 멀티 소스로부터 RDD 를 생성하고, 각 N개 멀티 소스를 Join 쿼리를 이용할 수 있다는 점이다.

row 를 Class 로 정의하여 DTO Value Object 클래스화 해 놓고, 해당 Data 가 담긴 Class Structure 를 Generic Class 의 Array. 즉, Java 로 치면 Generic List 를 각 Table 단위로 등록해주고 해당 Table 에 쿼리가 가능하다는 점이 매우 매력적이다.

코드는 아래와 같다.

val hdfsFile = sc.textFile("hdfs://master001.prod.moneymall.ssgbi.com:9000/moneymall/hivedata/everyclick_order_mapping/*")
case class EveryClickOrderClass(currdate:String , siteno:String, price:Double)
val result = hdfsFile.map(_.split("\t")).map(row => EveryClickOrderClass(row(0), row(1) , row(6).toString.toDouble ))

// scala sql.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
result.registerAsTable("clickAndOrder")
val results = sqlContext.sql("""select currdate, siteno, count(*) , sum(price)
  from clickAndOrder
  group by currdate,siteno
  order by currdate""")
results.collect().foreach( println)


 SparkSQL 은 sort 를 포함하여 수행하였기 때문에 모든 과정이 0.6초 정도 걸렸다... Spark 콘솔에서 한줄 한줄 실행하며, 결과를 확인 할 수 있어, Hive 나 Spark + Java 보다 매력적이기도 하다.
위처럼 SQL 을 실행하는 순간에는 실행계획이 바로 보인다.

결과 출력 시점에는 sort 를 할 필요가 없다. foreach 만 써주었기 때문인지, 결과 출력은 0.2초 만에 끝났다.


일단 전부 합쳐도 1초 미만....

일단, 너무 순간에 결과가 나와 다소 성급한 판단이긴 하지만, Spark + Scala M/R 과  Spark + Scala + Spark SQL 은 속도면에서 그리 큰 차이가 있는 거 같지는 않다.


(4) Future Work &  Spark M/R(SparkSQL) 의 가능성.

사실 이번 실험은 실험 용  Data Set 이 너무 작았다.

본 편에서 언급하진 않았지만, 실제로는 뒤이어 몇가지 실험을 더 해보긴 했는데, 사실은 6 Tera Byte 크기의 데이타 였다. (해당 결과는 Hive 로는 결과를 기다리는데 너무 오래 걸려 중간에 kill 하고 더이상의 Hive 비교를 중단 하였다. )

해당 Big 한 Data 로 재 실험을 했을 때는, Spark 로 brute force 알고리즘을 돌리기에도 다소 부담되는 바 조금더 알고리즘 적인 접근을 해보았는데..... 바로, Hash Table 형 NoSQL 과 HDFS 와 HBase 같은 Range Sorted Dictionary 를 Nested loop Join 해본 점이다.. (이부분은 기회가 되는데로 별도 편으로 블로깅 할 예정이다.)

사실 메뉴얼이 빈약하고, googling 을 해보아도 자료가 좀 빈약한데, Hbase 의 scan 쿼리가 불가능하지는 않았다... (별로 편하게 되어 있진 않지만...가능하였다.)
Scan 이 되고 안되고는 매우 중요한 요소인데, Spark 팀의 HBase 셈플은 그걸 간과 한듯 하다. 모든 셈플들은 모든 데이타를 HBase 에서 몽땅 가져다 놓고, Spark 엔진으로 모든걸 처리하고 있다. 실제 Production 환경에서 이런 경우는 빈번하지 않을 것이다.

우리의 경우만 하더라도, 메모리 클러스터에 로딩 자체가 불가능한 거대 테이블과의 부담되는  Join 이 왕왕 있다. 크기가 작은 테이블들은 HBase 에 있는 경우가 거의 없고, 그냥 HDFS 안에 존재한다.

이런 경우 컴팩트한 테이블을 메모리에 몽땅 올려 놓고, Join 바깥 쪽에서 루핑 돌며, 거대 테이블을  Nested loop Join 을 할수 밖에 없다. (물론 거대 테이블이 메모리에 모두 로딩 가능하다면, 거대 테이블을 메모리에 로딩 후 cache() 나 persist() 걸어 놓고 로직 연산을 하는 것이 더 유리 할 것이다.)

두가지 방법이 있을 것이다. 큰 데이타는 어차피 메모리에 로딩이 안된다면, 작은 테이블로 루프를 돌거나... 큰 테이블로 루프를 돌거나....
일반적인 Brute Force Nested Loop 조인이라면, 그나마 메모리 로딩이 가능한 작은 테이블을 메모리에 올려놓고, 큰 테이블로 For문을 도는 것이 훨씬 유리하다..

하지만, 내가 Hbase 의 Scan 억세스에 주목했던 이유는, 작은 테이블로 Loop 를 돌면서 큰 테이블을  Scanning 접근 했을 때... ( Like 나 시계열 범위 Join 일때) Nested Loop Join 알고리즘으로 접근해도, 내부적으로 Merge Join 처럼 동작하여, 범위 Join 이 매우 빨라 질 수 있다는 데, 있다.. 범위 Join 이 아닌 직접 1:1 매칭 Join 이라면, Redis 나 Memcache 나 MongoDB 나 Casandra 같은 Hash Table 성격의 NoSQL 을 이용하면 될 것이다. 그 경우 Nested Loop Join 알고리즘으로 접근해도, 내부적으로는 Hash Join 처럼 동작 할 것이기 때문이다.

여튼 Spark + Scala + SparkSQL  의 조합은 매우 매력적인 조합임에 틀림 없다. 앞으로 오래동안 부대낄 수 있을 것 같은 생각이 강하게 든다...


2014년 11월 4일 화요일

Spark 사용기 2 - Python vs Scala vs Java : Spark 핸들링 언어 비교

Python 과 Scala , 그리고 Java 중 가장 손에 익은 언어는 Java 임에 틀림이 없다.
그러나, Python 도 익히 손에 익어 있고, 코딩하는 재미가 Java 보다 훨씬 더 큰게 사실이다. 그리고, Scala 는 아직까지는 다소 손에 덜 익긴 했지만, Python 이상으로 코딩에 재미를 느껴 가고 있다.

Java 가 최근에 너무 지겨워 진것도 사실이긴 하지만, 그 모든 것보다, Java 로 Spark 를 하지 않아야 하는 절대 절명의 이유가 하나 있는데...

바로 Scala 도 되고, Python 도 되지만, Java 는 Interactive 한 cli 콘솔이 없다는 것이다. 코딩을 하는 동안 한줄 한줄 데이타가 어떻게 바뀌고, 현 상태의 데이타는 어떤 상태인지, cli 를 통하여 Scala 와 Python 은 바로바로 확인이 가능하지만, Java 는 처음부터 끝까지 상상속의 코딩을 하고 난 다음, 배포 스크립트를 만들고, 배포 후에나 결과를 확인 할 수 있다. 결과가 한방에 성공적으로 나와 준다면 다행이지만, 결과가 미덥지 못하기라도 한다면, 라인 단위로 디버깅이라도 하고 싶다면, 여간 불편한게 아니다...



Python 은 Scala 보다 다소 느린게 사실이긴 하지만, Python 또한 , 버리지 못하는 또하나의 이유가 있다. 바로, IPython 같은 훌륭한 확장 CLI 가 있다는 것과, 그것을 통하여, 데이타를 중간중간 시각화 해볼 수 도 있다는 것이다.




위와 같은 이유를 제외 하고도, Functional Programming 을 Java 로 하는것은 여간 불편한게 아니다. 아래와 같은 코드만 봐도... Python 이나 Scala 가 짧고 우아하게 표현하는 것을 Java 는 너무나 장황하게 표현하고 있으며, 다분히 억지 스럽워서 가독성도 떨어진다.



  • Spark with Python


  • Spark with Scala






  • Spark with Java


2014년 10월 28일 화요일

Spark 사용기 1 - Hive vs Spark vs SAS : select count(*) performance 비교

오늘은 야간에 데이타센터 님께서 작업하시므로 새벽 대기를 해야하는 날이다.

오랫만에 철야 하는 기념으로 다가, 오늘은 드디어 Code 를 돌려 보도록 하겠다.

우선 master node 에서 아래 명령어를 수행하였다.
 ./spark-shell --master spark://master003.prod.moneymall.ssgbi.com:7077

아래처럼 멋쮠 Spark 텍스트 로고가 보인다.

우선 첫번째로 해보고 싶은 것은 Hive Table 의 Seclect Count(*) 와 Spark Engine 에서의 Line Count 의 성능 비교이다. 덤으로 SAS 에서 Hadoop Connector(내부적으로 HiveQL JDBC를 이용한다.)의 성능도 함께 비교해 보았다. 

간단한 수행이므로, Hive 는 Hive Shell Console 에서, 그리고 Spark 는 Spark의 Scala Console 에서 수행 해보았다. SAS 는 SAS 서버에서 직접 Base Programe Code로 수행하였다.

[1] Hive Console 에서 수행.

수행한 쿼리는 다음과 같다.

hive> select count(*) from search_keyword_result;

결과는 다음과 같이, 15.459초



[2] SCALA Console 에서 수행.

비교를 위해 선정한 위의 Table 은 Hive 의 External Table 로서, HDFS 입장에서는 Directory 이다. 해당 디렉토리에 일배치가 끝난 파일이 하루에 한개씩 파일이 생기는 구조이며, Hive Table 입장에서는 Row 가 일단위로 Append 되는 형태의 Table 이다.

그래서, 먼저 처음 수행한 코드는 아래와 같다.

 val hdfsFile = sc.textFile("hdfs://master001.prod.moneymall.ssgbi.com:9000/user/hive/warehouse/research_keyword_result/")

혹시나 디렉토리가 바로 로딩 되진 않을까 하는 기대여서 였지만....
역시나... 아래처럼 에러가 발생한다.


서칭을 좀 해보니, 디렉토리를 통채로 로딩할때는 아래처럼 하면 된다.

 val hdfsFile = sc.textFile("hdfs://master001.prod.moneymall.ssgbi.com:9000/user/hive/warehouse/research_keyword_result/*")

count 를 위한 아래와 같은 짧은 코드를 위 코드 뒤이어 수행해 보았다.

hdfsFile.count()

결과는 놀랄 말큼 빠르다.
0.38초....

결과도 Hive 와 동일하다.


[3] SAS 에서 Hive Connector 로 수행.

(1) 첫번째 시도. ( with SAS Dataset Binding )

SAS 에서 맨처음 해본것은 Data Step 에서 Hive Table 라이브러리를 임시 테이블로 Dataset 로딩 해보는 작업이었다. 그렇게만 하여도, Observation 관측치 값을 알려주기 때문이다. 그런데, 위에서 scala가 1초도 안걸리는 수행을 임시 테이블 바인딩 작업이 무려 21분이나 소요 되었다. (현재 12시가 넘어버린 시점이라 위 시점과 아래 스샷의 시점차에 의한 결과 값이 미묘하게 다르지만, 너무 오래걸려서 저걸 다시 수행하고 싶진 않다.)


뭘 해보기도 전에 21분씩이나 초기 로딩에 소요 되는 것은 대략 안습이다. 게다가 저게 얼마 주고 산건데.... 벤츠를 3대는 뽑았을 금액인데..

(2) 두번째 시도. ( with SAS Memory Iteration )

일단 SAS 도 포기하고 싶지 않아 몇가지 실험을 더 해보기로 하였다. 사실 Hive Table 을 SAS DataSet으로 바인딩 하는 것은 Hive Table 이 매우 클 수 있음을 감안하면, 좀 무식한 접근 일 수 있다. Data Download Time 및 serialize Time 을 고려하면, 위 첫번째 시도는 그럴만도 했다고 일단은 스스로 위안을 해보도록 하자.

그래서 아래처럼 메모리 상에서 루프 돌며 Count 를 구해 보았다. 결과는 일단, 드라마틱하게 개선....

21분 걸렸던게 무려 3분8초로 단축 되었다.


(3) 세번째 시도. ( With SAS SQL )

한가지만 더 실험해 보고 싶은 욕구가 생겼다... 흑, 조금있으면 출근해야 할 정도로 시간이 많이 지나 버렸긴 하지만....잠자기는 틀린것 같고..

SAS 영업께서는 늘 말씀 하셨다. SAS 의 Hadoop Connector는 Hadoop의 자원을 쓰기 때문에 Hadoop 과 공존하며, 상호 보완적 하이브리드 Analytics 환경을 제공해 준다고...

이 말을 꼭 검증해 보고 싶었다. 사실 위가 사실이 아니라면, 나는 오늘 부로 Hadoop 데이타를 SAS 를 이용해서 핸들링하는 시도를 아예 접을 생각이었다.

결과는 우선 영업의 말이 사실이다는 점. SAS SQL 은 일단 Hadoop에게 Query 수행을 일임해준듯 한 결과가 나왔다. 이로써 느려 터진 SAS 는 Hadoop과 공존 할때, Heavy 한 업무는 Hadoop 에게 맞기고, 좀더 스마트한 계산만 SAS 로 하면 된다는 존재 이유가 생겼다.

수행시간은 무려 57초. (처음 21분에 비하면 무지 개선 되었다. Hadoop 의 힘을 빌은 결과이긴 하지만... 왠지 Hive 대신 Shark 를 Spark 위에 올리고, HiveQL을 Shark에게 날리면, 57초가 20초대 미만으로 줄어들것도 같은 생각이 들긴 한다.)


[4] Hive vs Spark vs SAS 오늘의 결론!
 
(1) Hive vs Spark vs SAS(with HiveQL)
15.45 s : 0.38 s : 57.2 s

(2) Hive vs Spark vs SAS(only SAS)
15.45 s : 0.38 s : 3m 8s

Spark Perfect Win!!!!!!!!!!!!!!!!!!!

2014년 10월 22일 수요일

Spark 설치기 5 - Cluster Installation with Cluster Launch Scripts

이레 저레 벌써 새벽 0시 22분...

잠을 청하기에 앞서... 드디어... Cluster Launch Scripts 를 가지고 Cluster 위에 재대로 (운영 수준으로) 클러스터를 설치 해보기로 하겠다...

우선 선행적으로 필요한 것은 아래와 같다.

(1) password-less ssh 세팅...
- 이미 세팅 되어 있으므로 skip..

(2) SPARK_HOME 수정.
- 우리 cluster 는 spark 0.9.1 버전을 깔았었던 이력이 있으므로, .bashrc  안에 있는 잔해를 수정해주었다.
- 그러고 보니 spark 0.9.X 버전 대에는 sbt 로 패키지 설치를 했던 기억이....

SPARK_HOME=/data01/spark/spark-1.1.0-bin-hadoop1
export SPARK_HOME

(3) 이제 본격적으로 세팅.
(ㄱ) cd $SPARK_HOME/conf
(ㄴ) mv spark-env.sh.template spark-env.sh
(ㄷ) vi slaves  하고 각 WorkNode 의 호스트명 기입.
(ㄹ) vi spark-env.sh 하고 최 하단에 아래 기입. 인스턴스 갯수를 2개로 설정했기 때문에, 한 머신당 8g 메모리가 할당 되도록 세팅하였다. 적절한 메모리 세팅은 추후 운영상황을 보며 조정하여야 할 테지만, 우선은 매우 적게 보수적으로 세팅 하였다.

SPARK_MASTER_IP=master003.prod.moneymall.ssgbi.com
SPARK_WORKER_MEMORY=4g
SPARK_WORKER_INSTANCES=2
(ㅁ) 마지막으로 마스터 노드의 sbin 디렉토리에서 아래 script 수행.
./start-all.sh


에러 하나 없이 깔끔하게 설치 되었다.

오늘은 여기까쥐.... 자야겠다.

2014년 10월 21일 화요일

Spark 설치기 4 - Stand Alone on a Cluster 설치2

[약간의 부가적인 실험]

우선 Worker Node 한곳에서 ps 를 날려 보았다...


노드 관리 상황을 실험해 보기 위해 Kill 로 해당 프로세스를 죽여 보았다...

2개 노드를 Kill 로 죽이고 죽인 Node 중 한개를 다시 수동으로 구동하였다.


kill 로 죽인경우에도 Master Node 에서는 바로 DEAD 로 감지가 되었다.
그런데, 다시 프로세스를 띄우니, 기존 DEAD 노드가 ALIVE 로 바뀌진 않았고, ALIVE 노드가 새로 추가 되었다.

그리고 나서 몇분 후.....

다시 새로 고침을 해 보았다.

음... Dead 2개가 사라졌다...

일단, 간단한 실험이었지만, 매우 Reasonable 하게 장애 노드 감지 및 처리가 이루어지는 듯 하다.


2014년 10월 19일 일요일

Spark 설치기 3 - Stand Alone on a Cluster 설치

우선 클러스터 모드 설치에 앞서 이론 공부 부터...
https://spark.apache.org/docs/latest/cluster-overview.html

그리고 설치 가이드 문서 정독하기.
https://spark.apache.org/docs/latest/spark-standalone.html


(1) 복수개의 데이타 노드와 마스터 노드에 파일 다운로드
다운로드 경로는 skip. 설치기2에서 사용했던 file 을 그대로 사용하였다.

(2) 그리고 압축 풀기.. (모든 마스터노드와 데이타노드에서..)

tar xvzf spark-1.1.0-bin-hadoop1.tgz

(3) Master Node 에서 아래 명령어 수행하여 일단 Master Node 만 띄운다.


./sbin/start-master.sh

(4) 웹UI뜨는지 확인

(default port 는 8080 인데, 우리는 storm web ui 가 해당 port 를 이미 사용중이다.)
(에러 날 줄 알았는데, 수행해보니 에러가 나지 않길래 log를 상세하게 뒤져보니 warning 만 뜨고, 해당 port 가 사용중이라 8081 로 시도한다는 문구가 보인다.)
아래 이미지 처럼 아직은 worker node 가 모두 떠 있지 않은 상태임.


(5) 각 DataNode 로 가서 Worker 구동


cd /data01/spark
tar xvzf spark-1.1.0-bin-hadoop1.tgz
cd spark-1.1.0-bin-hadoop1

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://master003.prod.moneymall.ssgbi.com:7077

ctrl+z
bg 
요부분은 메뉴얼에 없는 부분인데. 백그라운드 수행이 자동으로 되지 않고 있다. 수동으로 해주었지만, 사실, 운영시에는 요 방식이 아닌 뒤이어 실험할, Cluster Launch Scripts 방식으로 가야 할듯.

worker 노드 구동 확인

전체 노드에 대하여 위 작업 수행.

위 작업 이후 Master Node 에서 Worker 들이 모두 구동 되었는지 확인.

요기까진 아무런 막힘이 없었다. 우선 몸풀기 정도로 생각하고, 현재 세팅은 여기서 마무리...

일요일 오후... 다행히 내일이 휴가인지라... 오늘 저녁엔 저걸 다 내리고, 운영용으로 재 세팅을 할 예정이다.


2014년 10월 11일 토요일

Spark 설치기 2 - Stand Alone 설치

우선 나의 Portable 개발 환경인 CentOS 노트북에 먼저 Stand Alone 설치....

참고로 나의 CentOS 노트북에는 Hadoop 1.2.1 이 깔려 있다.

[1] 다운로드

https://spark.apache.org/downloads.html

[2] 다운로드 옵션

1.1.0
Pre-built for Hadoop1.x
Direct Download....

이렇게 옵션을 주고 spark-1.1.0-bin-hadoop1.tgz 를 받았다.

[3] pre-requiste...


Spark runs on Java 6+ and Python 2.6+. For the Scala API, Spark 1.1.0 uses Scala 2.10. You will need to use a compatible Scala version (2.10.x).

[4] 설치

설치는 걍... 압축 풀면 끝난다...

[5] 테스트 Scala 버전 M/R 실행.

spark top 폴더에서 아래 처럼 수행.
[spiccato@hoonnote spark-1.1.0-bin-hadoop1]$ ./bin/run-example SparkPi 10

결과는
Pi is roughly 3.142752

vi 로 run-example 파일을 열어보자 아래 같은 내용이 보인다.

if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
  EXAMPLE_CLASS="org.apache.spark.examples.$EXAMPLE_CLASS"
fi

"$FWDIR"/bin/spark-submit \
  --master $EXAMPLE_MASTER \
  --class $EXAMPLE_CLASS \
  "$SPARK_EXAMPLES_JAR" \
  "$@"

즉, 위 소스는 EXAMPLE_CLASS 안에 있는 듯 하다.

위 위치에서 SparkPi.scala 소스를 열어 보았다.

위 스칼라 소스를 수행하면 수행 시간은  0.620 초가 소요된다.

[6] 테스트 Python 버전 M/R 실행.

소스는 아래와 같다.

위 파이썬 소스를 수행하면 수행 시간이 scala 보다 2배정도 느린 1.270 초가 소요된다.
위 결과 만으로 절대 비교 하면, Python이 느린 것 처럼 보이긴 하지만,
만약 Python 코드를 Scala 코드로 Convert 하는 시간이 위 시간 만큼의 차이를 주고, 실제 계산 수행은 동일하다고 한다면, 대용량 데이타를 처리하는 시간은 동일하다고 할수도 있을 것이므로 이는 좀더 지켜보아야 할것 같다.

[7] 결론

로컬모드 설치는 압축만 풀면 된다...

2014년 10월 10일 금요일

Spark 설치기 1 - 잡담들.

[1] 설치에 앞서 잡담들...

사실 Spark 는 수도없이 깔아보았지만, 그동안은 익숙한 HiveQL을 함께 쓰기 위해 항상 Shark 와 함께 깔수 있는 0.9.X 버전만 깔아왔었다...(이참에는 1.1.0 을 깔것이다.)

그리고, 만났던 수많은 Heap Memory 관련 에러들....

한때, 메모리 Cache 부분을 조금더 효율적으로 사용하고자, Shark + Spark + Tachyon 조합으로 시스템을 재 구성하여 설치 시도도 해 보았었다.

결론은... 테스트로 한두개 돌리는 것은 꽤 좋은 성능을 보이고, 매우 고무적이었다. 그러나, 운영시스템에서 실제 운영용 배치로 스케줄에 걸어 여러 Job 과 함께 사용을 하면, 역쉬나 수많은 Memory 관련 에러로 유지 자체가 힘들 다는 것..


그래서 선택한 아키텍처는.... Shark 와 Tachyon 까지 없애고, 그냥... Spark 만 쓰는 것...

Simple is best 라는 말이 있듯이....기본이 불안한 경우는 기본을 불안하게 만드는 곁가지들을 모두 제거하는 것.

이제와서 Spark 를 포기하고 싶지는 않고, 그동안 Shark , Tachyon 와의 궁합때문에 어쩔수 없이 사용하지 못했던 최신 1.1.0 버전으로 다시한번 도전해 보기로 하였다.

물론 Shark 를 걷어 냈으므로, Scala 로 M/R 을 짤 예정이기도 하다.. 내가 이를 위해 정독 중인 Scala 서적...



그리고, 엊그제 Amazon 에서 구매한 Spark 서적. 따끈하다 못해 심지어 출판 예정일이 내년2월인 미출시 서적의 Early_released 버전 ebook 이다.



이놈의 기술 역마살 때문에.... 한권을 다 읽기도 전에 두번째 서적을 사서 읽기 시작하면서, 두권을 다 읽기도 전에 (운영모드 포함)설치를 동시에 착수하고 있다...

2014년 10월 5일 일요일

Spark 다시 보기...

Shark + Spark + Tachyon 의 조합에서 Hive 대비 수십배의 성능을 보고 흥분하던 순간이 있었지만, 그 흥분은 오래가지 못했다...
운영 M/R Batch 에 실적용하는 경우 Heap Memory 관련 에러가 너무 자주 발생하였고, 기타 여러가지 에러들로 인하여, 안정성이 매우 떨어 졌기 때문...

결국, 한동안 저 조합의 사용을 접고 있다가, 다시 Shark 와 Tachyon 의 콜라보레이션을 포기하고, 최신 버전의 Spark 에서 Scala 랭귀지로만 접근하여, Spark 엔진을 다시한번 우리 운영(Production) M/R Batch 에 적용 해 보기로 결심 하였다.

우선 찾은 유용한 아티클은 다음과 같다.

(1) Using Parquet and Scrooge with Spark
http://engineering.ooyala.com/blog/using-parquet-and-scrooge-spark

우리가 현재 Parquet 포맷을 쓰고 있기 때문에, 제일 먼저 검색 했던 것이, Parquet 와의 Compatibility Issue 였다. Shark에서의 Parquet Compatibility Issue 때문에 한두번 쓴맛을 본적이 있기 때문에, 아직 안심하긴 힘들지만, 어찌 되었든, 좋은 참고 모델이 생겨 안심이다.

(2) scrooge
https://twitter.github.io/scrooge/
https://github.com/twitter/scrooge
트위터는 이미 cascading 위에서 scala 기반으로 scalding 을 잘 쓰고 있어서이겠지만, 요런 유용한 scala 확장 응용모듈들을 많이 보유하고 있다. scala 로 spark 를 헨들링함에 있어, 다양한 우리쪽 다른 Data 원천과도 멀티소스 Join 이 필요할 것이므로,(예를 들어 HBase 나 MemCached 와...) scrooge 같은 thrift 제너레이터가 유용하게 쓰일 수 있을 듯 하다.