Spark
READ/WRITE Data Format
•
Built-In
◦
Parquet, JDBC, JSON, ORC, Hive
◦
CSV ( 2.0+ Built In )
◦
Avro ( 2.4+ Built In )
◦
HDFS, Amazon S3, H2
•
External ( Packages )
◦
AWS Redshift, MongoDB, Cassandra, HBase, Couchbase, etc
◦
Available 45+ Packages
JDBC ( MySQL )
Prerequisite
•
MySQL 8.0.24
◦
user: root
◦
database: clud
◦
table: test
•
MySQL JDBC Driver ( https://dev.mysql.com/downloads/connector/j/)
Zeppelin 설정
•
Interpreter 추가
- Interpreter Name: mysql
- Interpreter group: jdbc
Scala
복사
•
Properties
- default.url: jdbc:mysql://{HOST}:3306/
- default.user: root
- default.password: {HIDDEN}
- default.driver: com.mysql.cj.jdbc.Driver
Scala
복사
•
Dependencies
- Artifact: mysql:mysql-connector-java:8.0.24
Scala
복사
Zeppelin Interpreter 확인
%mysql
show databases;
SQL
복사
Spark Interpreter 수정
다운로드 받은 JDBC Driver 경로를 입력
- spark.jars: $HOME/downloads/mysql-connector-java-8.0.24.jar
SQL
복사
Spark Warehouse 설정
•
테스트하기 위해 DerbyDB 를 사용했으나, 보통은 MySQL, PostgreSQL 을 사용함
경로: $HOME/spark3/conf/hive-site.xml
$ vi $HOME/spark3/conf/hive-site.xml
<!-- derby....-->
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:derby:;databaseName=/clud/metastore_db/metastore_db;create=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.apache.derby.jdbc.EmbeddedDriver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>hdfs://master-01:9000/clud/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
</configuration>
Shell
복사
Spark JDBC
val df = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://{HOST}:3306/clud")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("user", "root")
.option("password", "")
.option("dbtable", "test")
.load()
df.printSchema()
df.show(false)
Scala
복사
실습
SQL
df.createOrReplaceTempView("tmp_test")
val df_sql = spark.sql("SELECT * FROM tmp_test WHERE id > 3")
df_sql.show()
// 테이블 내용 조회
spark.table("tmp_test").show()
// 테이블 목록 조회
spark.catalog.listTables.show()
Scala
복사
Table by SubQuery
원본 데이터에서 일부 데이터만 조회해서 처리해야할 때 사용한다
val df_sub = spark
.read
.format("jdbc")
.option("url", "jdbc:mysql://{HOST}:3306/")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("user", "root")
.option("password", "")
.option("dbtable", "(SELECT * FROM clud.test WHERE id > 3) as subtest")
.load()
df_sub.show()
Scala
복사
Properties 분리
val prop: java.util.Properties = new java.util.Properties()
prop.setProperty("user", "root")
prop.setProperty("password", ")
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")
df_sub
.select("name", "website", "manager")
.write
.mode(org.apache.spark.sql.SaveMode.Append)
.jdbc("jdbc:mysql://{HOST}:3306/clud", "test_new", prop)
Scala
복사
UDF ( User Defined Function )
val extractSuffixUDF = spark
.udf
.register(
"myUDF",
(arg1: String) => arg1.substring(arg1.lastIndexOf(".") + 1)
)
df_sub
.select("id", "name", "website", "manager")
.withColumn("website_suffix", extractSuffixUDF(col("website")) // UDF 를 직접 호출 가능
.write
.mode("overwrite")
.jdbc("jdbc:mysql://{HOST}:3306/clud", "test_new", prop)
df_sub
.select("id", "name", "website", "manager")
.withColumn("website_suffix", callUDF("myUDF", 'website) // callUDF 함수로 호출 가능
.write
.mode("append")
.jdbc("jdbc:mysql://{HOST}:3306/clud", "test_new", prop)
spark
.catalog
.listFunctions
.where($"name" like "%myUDF%")
.show(500, false)
Scala
복사
BroadcastHashJoin
df_people = spark.read.json("hdfs://master-01:9000/data/resources/people.json")
// BroadcastHashJoin
df_people
.filter("age > 11")
.join(df, df_people("name") === df("manager"))
.show()
// BroadcastHashJoin + Predicate Pushdown
df_people
.join(df, df_people("name") === df("manager"))
.filter("age > 11")
.show()
Scala
복사
•
데이터가 적으면 Spark 가 알아서 BroadcastHashJoin 을 수행한다
•
SparkSQL Optimizer 가 최적화해서 처리하기 때문에 filter + join 과 join + filter 는 동일하게 실행된다
SQL Join
df.createOrReplaceTempView("test")
df_people.createOrReplaceTempView("people")
spark.sql("""
SELECT website, avg(age), max(id)
FROM people a
JOIN test b ON a.name = b.manager
WHERE a.age > 11
GROUP BY b.website
""").show
Scala
복사
Partition
df
.write
.partitionBy("manager", "name")
.mode("overwrite")
.saveAsTable("partition_table")
sql("SELECT * FROM partition_table").show(false)
Scala
복사
결론
•
RDD로 직접 작성하는것보다 Spark SQL 로 작성하는게 좋다 (정신건강에 이롭다)
•
파티션은 성능 향상을 위해 사용한다
•
Spark 에서 derby 를 설정해주지 않으면 애플리케이션이 종료됨과 동시에 삭제되기 때문에 꼭 hive-site 를 설정해야 한다