Blog

Apache Spark 을 이용한 빅데이터 분석 (7)

Spark Cache

RDD Cache

비직렬화

Executor 에 Cache 할 메모리가 부족하면 Cache 를 하지 않는다. 이를 최대한 쥐어짜기 위해 repartition 를 사용해서 조금 더 메모리를 사용할 수0 있지만, 트레이드 오프가 생기는걸 감안해야 한다.
Fraction Cached: 17%
val rdd_string_array = sc .textFile("hdfs://data/file.csv") .filter(row => !row.startsWith("Year")) .map(row => row.split(",")) .setName("rdd_start_array") .persist() println(rdd_string_array.count()) > 7453215 println(rdd_string_array.getNumPartitions) > 100
Scala
복사
Fraction Cached: 23%
val rdd_string_array = sc .textFile("hdfs://data/file.csv", 100) .filter(row => !row.startsWith("Year")) .map(row => row.split(",")) .setName("rdd_start_array") .persist() println(rdd_string_array.count()) > 7453215 println(rdd_string_array.getNumPartitions) > 100
Scala
복사

직렬화

MEMORY_ONLY_SER Storage Level 를 설정하게 되면 훨씬 더 메모리 효율적으로 Cache할 수 있다
Fraction Cached: 100%
val rdd_string_array = sc .textFile("hdfs://data/file.csv", 100) .filter(row => !row.startsWith("Year")) .map(row => row.split(",")) .setName("rdd_start_array") .persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER) println(rdd_string_array.count()) > 7453215 println(rdd_string_array.getNumPartitions) > 6
Scala
복사

DataFrame Cache

DataFrame 은 Columnar 형식으로 캐시하기 때문에 훨씬 캐시 용량을 줄일 수 있다.
일반적으로 Parquet 포맷을 사용한다
스키마 추론 X, Fraction Cached: 100%, 321MiB
val df_string = spark .read .option("header", true) .option("inferSchema", false) .csv("hdfs://data/file.csv") .persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY) df_string.printSchema root |-- Year: string (nullable = true) |-- Month: string (nullable = true) |-- DayofMonth: string (nullable = true) |-- DayOfWeek: string (nullable = true) |-- DepTime: string (nullable = true) ... println(df_string.count()) > 7453215
Scala
복사
스키마 추론 O, Fraction Cached: 100%, 272.4MiB
val df_string = spark .read .option("header", true) .option("inferSchema", true) .csv("hdfs://data/file.csv") .persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY) df_string.printSchema root |-- Year: integer (nullable = true) |-- Month: integer (nullable = true) |-- DayofMonth: integer (nullable = true) |-- DayOfWeek: integer (nullable = true) |-- DepTime: string (nullable = true) ... println(df_string.count()) > 7453215
Scala
복사
직렬화, 스키마 추론 O, Fraction Cached: 100%, 270.8MiB
val df_string = spark .read .option("header", true) .option("inferSchema", true) .csv("hdfs://data/file.csv") .persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER) df_string.printSchema root |-- Year: integer (nullable = true) |-- Month: integer (nullable = true) |-- DayofMonth: integer (nullable = true) |-- DayOfWeek: integer (nullable = true) |-- DepTime: string (nullable = true) ... println(df_string.count()) > 7453215
Scala
복사

결론

RDD 보다는 DataFrame 을 사용하고, 메모리 최적화가 필요한 경우 직렬화(SER) 를 해야 한다.

Delimiter

일반적으로 new line (\n, \r\n) 으로 데이터를 구분하지만, 특정 상황에서는 Delimiter 를 바꿔야하는 경우가 생긴다. ( 특정 키워드, 바이트 사이즈 등 )

[READ] RDD Hadoop Configuraiton

import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text var conf = new Configuration(sc.hadoopConfiguration) conf.set("textinputformat.record.delimiter", "custom_delimiter") val rdd = sc .newAPIHadoopFile("hdfs://data/file.csv", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) .map{case (key, value) => (key.toString.toLong, value.toString)} .map(_._2) .map(_ + "custom_end_line")
Scala
복사

[WRITE] DataFrameWriter Option

RDD를 DataFrame 으로 변환한 후 optionlineSep 을 추가해주면 된다.
lineSep (default \n): defines the line separator that should be used for writing.
sc.textFile("hdfs://data/file.csv") .toDF .write .mode("overwrite") .option("lineSep", "custom_delimiter") .text("hdfs://data/file1.csv")
Scala
복사

PySpark

[READ] RDD

rdd = sc \ .newAPIHadoopFile( "hdfs://data/file.csv", "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", conf = { "textinputformat.record.delimiter": "____<my delimiter>____" } ) \ .map(lambda tup: (int(tup[0]), str(tup[1]))) \ .map(lambda tup: tup[1]) \ .map(lambda value: value + "custom_delimiter") print(">>> rdd.count(): {}".format(rdd.count())) for x in rdd.collect(): print(x)
Python
복사

[READ] DataFrame

df = sc \ .read \ .option("lineSep", "custom_delimiter") \ .text("hdfs://data/file1.csv")
Python
복사

[WRITE] DataFrame

sc.textFile("hdfs://data/file.csv") \ .map(lambda x: (x, )) \ .toDF() \ .write \ .mode("overwrite") \ .option("lineSep", "custom_delimiter") \ .text("hdfs://data/file1.csv")
Python
복사

Spark SQL

Ansi SQL (SQL2003) 포맷 혹은 Data Frame 으로 사용할 수 있음
DataSet: Java, Scala 에서만 나옴 ( R, Python 에서는 사용 불가 )
RDD + DataFrame
Hive, Avro, Parquet, ORC, JSON, JDBC 등에 대한 데이터 소스에 접근 가능
Spark SQL 은 Hive 와 호환됨
BI ( Business Intelligence ) 에 연결하기 위한 JDBC / ODBC 드라이버를 제공함
Thrift Server 를 띄워서 사용 가능

히스토리

RDD (2011년): 비정형 데이터 처리
JVM 객체 분산 컬렉션
함수형 기반의 작업 ( map, filter, etc )
DataFrame (2013년): 정형 데이터 처리
표현식 기반의 작업, 사용자 정의 함수 (UDFs)
논리적 실행 계획 및 최적화 프로그램
빠르고 효율적인 내부 표현
DataSet (2015년): DataFrame 장점 + RDD 장점
내부적으로는 Row 로 처리하지만, 외부적으로는 JVM 객체로 처리함
Type Safe + Fast

Why Spark SQL

Catalyst Optimizer 로 성능 향상
Columnar 포맷, 압축, 파티션 등으로 효율적으로 데이터를 읽을 수 있음
직렬화가 빠름 ( Encoder vs Java Seerialization or Kryo )
직렬화된 데이터에 대한 작업이 가능함 ( 필터링, 정렬, 해시 등을 역직렬화를 하지 않고 가능함 )
Tungsten 실행 엔진 사용
DataFrame 으로 실행하게 되면 Python, Java/Scala, R 등으로 작성하더라도 Logical Plan 을 통해 JVM 에서 직접 실행되기 때문에 언어에 따른 성능 차이가 없음
상대적으로 코드를 간결하게 작성할 수 있음 ( Hadoop vs RDD vs SQL )
RDD
data = sc.textFile(...).split("\t") data \ .map(lambda x: (x[0], [int(x[1]), 1]) \ .reduceByKey(lambda x, y: [x[0] + y[0], x[1] + y[1]]) \ .map(lambda x: [x[0], x[1][0] / x[1][1]]) \ .collect()
Python
복사
SQL
SELECT name, ave(age) FROM people GROUP BY name
SQL
복사
DataFrame
sqlCtx \ .table("people") \ .groupBy("name") \ .agg("name", avg("age")) \ .colect()
Python
복사

사용해보기

Parquet 바로 호출
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resoures/users.parquet`")
Scala
복사
임시 뷰 테이블 등록
df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM peope")
Scala
복사
DataFrame
df.select($"name", $"age").filter($"age" > 21).groupBy("age").collection()
Scala
복사
DataSet
df.map(row => row.getAs[Long]("age") + 10
Scala
복사