Spark Cache
RDD Cache
비직렬화
Executor 에 Cache 할 메모리가 부족하면 Cache 를 하지 않는다. 이를 최대한 쥐어짜기 위해 repartition 를 사용해서 조금 더 메모리를 사용할 수0 있지만, 트레이드 오프가 생기는걸 감안해야 한다.
•
Fraction Cached: 17%
val rdd_string_array = sc
.textFile("hdfs://data/file.csv")
.filter(row => !row.startsWith("Year"))
.map(row => row.split(","))
.setName("rdd_start_array")
.persist()
println(rdd_string_array.count())
> 7453215
println(rdd_string_array.getNumPartitions)
> 100
Scala
복사
•
Fraction Cached: 23%
val rdd_string_array = sc
.textFile("hdfs://data/file.csv", 100)
.filter(row => !row.startsWith("Year"))
.map(row => row.split(","))
.setName("rdd_start_array")
.persist()
println(rdd_string_array.count())
> 7453215
println(rdd_string_array.getNumPartitions)
> 100
Scala
복사
직렬화
MEMORY_ONLY_SER Storage Level 를 설정하게 되면 훨씬 더 메모리 효율적으로 Cache할 수 있다
•
Fraction Cached: 100%
val rdd_string_array = sc
.textFile("hdfs://data/file.csv", 100)
.filter(row => !row.startsWith("Year"))
.map(row => row.split(","))
.setName("rdd_start_array")
.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
println(rdd_string_array.count())
> 7453215
println(rdd_string_array.getNumPartitions)
> 6
Scala
복사
DataFrame Cache
DataFrame 은 Columnar 형식으로 캐시하기 때문에 훨씬 캐시 용량을 줄일 수 있다.
일반적으로 Parquet 포맷을 사용한다
•
스키마 추론 X, Fraction Cached: 100%, 321MiB
val df_string = spark
.read
.option("header", true)
.option("inferSchema", false)
.csv("hdfs://data/file.csv")
.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
df_string.printSchema
root
|-- Year: string (nullable = true)
|-- Month: string (nullable = true)
|-- DayofMonth: string (nullable = true)
|-- DayOfWeek: string (nullable = true)
|-- DepTime: string (nullable = true)
...
println(df_string.count())
> 7453215
Scala
복사
•
스키마 추론 O, Fraction Cached: 100%, 272.4MiB
val df_string = spark
.read
.option("header", true)
.option("inferSchema", true)
.csv("hdfs://data/file.csv")
.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
df_string.printSchema
root
|-- Year: integer (nullable = true)
|-- Month: integer (nullable = true)
|-- DayofMonth: integer (nullable = true)
|-- DayOfWeek: integer (nullable = true)
|-- DepTime: string (nullable = true)
...
println(df_string.count())
> 7453215
Scala
복사
•
직렬화, 스키마 추론 O, Fraction Cached: 100%, 270.8MiB
val df_string = spark
.read
.option("header", true)
.option("inferSchema", true)
.csv("hdfs://data/file.csv")
.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
df_string.printSchema
root
|-- Year: integer (nullable = true)
|-- Month: integer (nullable = true)
|-- DayofMonth: integer (nullable = true)
|-- DayOfWeek: integer (nullable = true)
|-- DepTime: string (nullable = true)
...
println(df_string.count())
> 7453215
Scala
복사
결론
RDD 보다는 DataFrame 을 사용하고, 메모리 최적화가 필요한 경우 직렬화(SER) 를 해야 한다.
Delimiter
일반적으로 new line (\n, \r\n) 으로 데이터를 구분하지만, 특정 상황에서는 Delimiter 를 바꿔야하는 경우가 생긴다. ( 특정 키워드, 바이트 사이즈 등 )
[READ] RDD Hadoop Configuraiton
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
var conf = new Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "custom_delimiter")
val rdd = sc
.newAPIHadoopFile("hdfs://data/file.csv", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
.map{case (key, value) => (key.toString.toLong, value.toString)}
.map(_._2)
.map(_ + "custom_end_line")
Scala
복사
[WRITE] DataFrameWriter Option
RDD를 DataFrame 으로 변환한 후 option 에 lineSep 을 추가해주면 된다.
•
lineSep (default \n): defines the line separator that should be used for writing.
sc.textFile("hdfs://data/file.csv")
.toDF
.write
.mode("overwrite")
.option("lineSep", "custom_delimiter")
.text("hdfs://data/file1.csv")
Scala
복사
PySpark
[READ] RDD
rdd = sc \
.newAPIHadoopFile(
"hdfs://data/file.csv",
"org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text",
conf = {
"textinputformat.record.delimiter": "____<my delimiter>____"
}
) \
.map(lambda tup: (int(tup[0]), str(tup[1]))) \
.map(lambda tup: tup[1]) \
.map(lambda value: value + "custom_delimiter")
print(">>> rdd.count(): {}".format(rdd.count()))
for x in rdd.collect():
print(x)
Python
복사
[READ] DataFrame
df = sc \
.read \
.option("lineSep", "custom_delimiter") \
.text("hdfs://data/file1.csv")
Python
복사
[WRITE] DataFrame
sc.textFile("hdfs://data/file.csv") \
.map(lambda x: (x, )) \
.toDF() \
.write \
.mode("overwrite") \
.option("lineSep", "custom_delimiter") \
.text("hdfs://data/file1.csv")
Python
복사
Spark SQL
•
Ansi SQL (SQL2003) 포맷 혹은 Data Frame 으로 사용할 수 있음
◦
DataSet: Java, Scala 에서만 나옴 ( R, Python 에서는 사용 불가 )
▪
RDD + DataFrame
•
Hive, Avro, Parquet, ORC, JSON, JDBC 등에 대한 데이터 소스에 접근 가능
•
Spark SQL 은 Hive 와 호환됨
•
BI ( Business Intelligence ) 에 연결하기 위한 JDBC / ODBC 드라이버를 제공함
◦
Thrift Server 를 띄워서 사용 가능
히스토리
•
RDD (2011년): 비정형 데이터 처리
◦
JVM 객체 분산 컬렉션
◦
함수형 기반의 작업 ( map, filter, etc )
•
DataFrame (2013년): 정형 데이터 처리
◦
표현식 기반의 작업, 사용자 정의 함수 (UDFs)
◦
논리적 실행 계획 및 최적화 프로그램
◦
빠르고 효율적인 내부 표현
•
DataSet (2015년): DataFrame 장점 + RDD 장점
◦
내부적으로는 Row 로 처리하지만, 외부적으로는 JVM 객체로 처리함
◦
Type Safe + Fast
Why Spark SQL
•
Catalyst Optimizer 로 성능 향상
•
Columnar 포맷, 압축, 파티션 등으로 효율적으로 데이터를 읽을 수 있음
•
직렬화가 빠름 ( Encoder vs Java Seerialization or Kryo )
•
직렬화된 데이터에 대한 작업이 가능함 ( 필터링, 정렬, 해시 등을 역직렬화를 하지 않고 가능함 )
•
Tungsten 실행 엔진 사용
•
DataFrame 으로 실행하게 되면 Python, Java/Scala, R 등으로 작성하더라도 Logical Plan 을 통해 JVM 에서 직접 실행되기 때문에 언어에 따른 성능 차이가 없음
•
상대적으로 코드를 간결하게 작성할 수 있음 ( Hadoop vs RDD vs SQL )
◦
RDD
data = sc.textFile(...).split("\t")
data \
.map(lambda x: (x[0], [int(x[1]), 1]) \
.reduceByKey(lambda x, y: [x[0] + y[0], x[1] + y[1]]) \
.map(lambda x: [x[0], x[1][0] / x[1][1]]) \
.collect()
Python
복사
◦
SQL
SELECT name, ave(age)
FROM people
GROUP BY name
SQL
복사
◦
DataFrame
sqlCtx \
.table("people") \
.groupBy("name") \
.agg("name", avg("age")) \
.colect()
Python
복사
사용해보기
•
Parquet 바로 호출
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resoures/users.parquet`")
Scala
복사
•
임시 뷰 테이블 등록
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM peope")
Scala
복사
•
DataFrame
df.select($"name", $"age").filter($"age" > 21).groupBy("age").collection()
Scala
복사
•
DataSet
df.map(row => row.getAs[Long]("age") + 10
Scala
복사