Blog

Apache Spark 을 이용한 빅데이터 분석 (6)

Spark

SQL, MLib, GraphX 을 사용하더라도 기본적으로 Spark 은 RDD 베이스로 동작한다
RDD reduceByKey 는 교환법칙, 결합법칙이 성립되어야 한다
RDD 로 데이터 분석을 할 수 있지만, 그거보다는 Data Fame, SQL 로 작업하는게 편하고 확장하기 좋다
특별한 경우가 아니면 SQL 을 사용하는게 정신건강에 이롭다

Broadcast

Driver 에 선언된 변수를 Executor 에 효과적으로 전달할 때 사용한다
Broadcast 를 명시하지 않아도 자동으로 Broadcast 가 되어 변수를 사용할 수 있지만, Task 개수만큼 복제되기 때문에 불필요한 비용들이 증가하게된다
Broadcast 를 사용하게되면 Executor 개수만큼만 전달하기 때문에 메모리 사용량이나 네트워크 통신 비용들이 줄어들어서 효율적으로 사용할 수 있다.
Example
val bc_airports = sc.broadcast(rdd_airports.collectAsMap)
Scala
복사

Map-Side Join

join 으로 사용할 때 성능이 생각보다 안나와서 broadcast + Map-Side Join 으로 성능을 개선해볼 수 있다
데이터 크기가 클 경우에는 Driver 에서 OOM 이 발생할 수 있기 때문에 주의해야한다.
SQL 을 사용할 경우 데이터 크기에 따라서 Map-Side Join 혹은 Left Outer Join 등을 알아서 선택해준다. ( 매우 편리함 )
Example
val bc_carriers = sc.broadcast(rdd_carriers.collectAsMap) val bc_airports = sc.broadcast(rdd_airports.collectAsMap) val rdd_join_bc = rdd_flight.map({ case ((carrier, origin, dest), row) => ( (carrier, origin, dest), (carrier, bc_carriers.value.getOrElse(carrier, None)), (origin, bc_airports.value.getOrElse(origin, None)), (dest, bc_airports.value.getOrElse(dest, None)), row ) })
Scala
복사

Cache

자주 사용하는 데이터를 메모리 혹은 디스크에 캐시해두면 훨씬 비용 & 성능 효율적으로 사용할 수 있다.
캐시는 Action 이 발생할 때 저장된다.
RDD 캐시를 사용하지 말고 DataFrame 을 캐시하는게 더 좋다

캐시 적용

val rdd = sc.textFile("hdfs:///data/test.csv" rdd.name = "rdd_cache" rdd.cache rdd.top(5)
Scala
복사

캐시 해제

rdd.unpersist()
Scala
복사

타입별 메모리 크기 비교

Int (39.2 KiB)
val rdd_int = sc.parallelize(1 to 10000) rdd_int.name = "rdd_int" rdd_int.cache rdd_int.count
Scala
복사
String ( 489.2 KiB )
val rdd_str = rdd_int.map(_.toString) rdd_str.name = "rdd_str" rdd_str.cache rdd_str.count
Scala
복사
CaseStr ( 714.4 KiB )
case class CaseStr(str: String) val rdd_case_str = rdd_int.map(x => CaseStr(x.toString)) rdd_case_str.name = "rdd_case_str" rdd_case_str.cache rdd_case_str.count
Scala
복사

Persist vs Cache

cachepersist 는 모두 데이터를 저장공간에 올려두는 동일한 작업을 한다
RDD.cache: persist(StorageLevel.MEMORY_ONLY)
DF.cache: persist(StorageLevel.MEMORY_AND_DISK)

Storage Level

메모리에 저장할지, 디스크에 저장할지, Serialize 를 할지 안할지를 선택할 수 있다
Serialized 를 하면 저장 공간을 줄일 수 있지만 CPU 사용률이 증가한다