Blog

Apache Spark 을 이용한 빅데이터 분석 (8)

Spark

READ/WRITE Data Format

Built-In
Parquet, JDBC, JSON, ORC, Hive
CSV ( 2.0+ Built In )
Avro ( 2.4+ Built In )
HDFS, Amazon S3, H2
External ( Packages )
AWS Redshift, MongoDB, Cassandra, HBase, Couchbase, etc
Available 45+ Packages

JDBC ( MySQL )

Prerequisite

MySQL 8.0.24
user: root
database: clud
table: test

Zeppelin 설정

Interpreter 추가
- Interpreter Name: mysql - Interpreter group: jdbc
Scala
복사
Properties
- default.url: jdbc:mysql://{HOST}:3306/ - default.user: root - default.password: {HIDDEN} - default.driver: com.mysql.cj.jdbc.Driver
Scala
복사
Dependencies
- Artifact: mysql:mysql-connector-java:8.0.24
Scala
복사

Zeppelin Interpreter 확인

%mysql show databases;
SQL
복사

Spark Interpreter 수정

다운로드 받은 JDBC Driver 경로를 입력
- spark.jars: $HOME/downloads/mysql-connector-java-8.0.24.jar
SQL
복사

Spark Warehouse 설정

테스트하기 위해 DerbyDB 를 사용했으나, 보통은 MySQL, PostgreSQL 을 사용함
경로: $HOME/spark3/conf/hive-site.xml
$ vi $HOME/spark3/conf/hive-site.xml <!-- derby....--> <configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:derby:;databaseName=/clud/metastore_db/metastore_db;create=true</value> <description>JDBC connect string for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>org.apache.derby.jdbc.EmbeddedDriver</value> <description>Driver class name for a JDBC metastore</description> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>hdfs://master-01:9000/clud/warehouse</value> <description>location of default database for the warehouse</description> </property> </configuration>
Shell
복사

Spark JDBC

val df = spark .read .format("jdbc") .option("url", "jdbc:mysql://{HOST}:3306/clud") .option("driver", "com.mysql.cj.jdbc.Driver") .option("user", "root") .option("password", "") .option("dbtable", "test") .load() df.printSchema() df.show(false)
Scala
복사

실습

SQL

df.createOrReplaceTempView("tmp_test") val df_sql = spark.sql("SELECT * FROM tmp_test WHERE id > 3") df_sql.show() // 테이블 내용 조회 spark.table("tmp_test").show() // 테이블 목록 조회 spark.catalog.listTables.show()
Scala
복사

Table by SubQuery

원본 데이터에서 일부 데이터만 조회해서 처리해야할 때 사용한다
val df_sub = spark .read .format("jdbc") .option("url", "jdbc:mysql://{HOST}:3306/") .option("driver", "com.mysql.cj.jdbc.Driver") .option("user", "root") .option("password", "") .option("dbtable", "(SELECT * FROM clud.test WHERE id > 3) as subtest") .load() df_sub.show()
Scala
복사

Properties 분리

val prop: java.util.Properties = new java.util.Properties() prop.setProperty("user", "root") prop.setProperty("password", ") prop.setProperty("driver", "com.mysql.cj.jdbc.Driver") df_sub .select("name", "website", "manager") .write .mode(org.apache.spark.sql.SaveMode.Append) .jdbc("jdbc:mysql://{HOST}:3306/clud", "test_new", prop)
Scala
복사

UDF ( User Defined Function )

val extractSuffixUDF = spark .udf .register( "myUDF", (arg1: String) => arg1.substring(arg1.lastIndexOf(".") + 1) ) df_sub .select("id", "name", "website", "manager") .withColumn("website_suffix", extractSuffixUDF(col("website")) // UDF 를 직접 호출 가능 .write .mode("overwrite") .jdbc("jdbc:mysql://{HOST}:3306/clud", "test_new", prop) df_sub .select("id", "name", "website", "manager") .withColumn("website_suffix", callUDF("myUDF", 'website) // callUDF 함수로 호출 가능 .write .mode("append") .jdbc("jdbc:mysql://{HOST}:3306/clud", "test_new", prop) spark .catalog .listFunctions .where($"name" like "%myUDF%") .show(500, false)
Scala
복사

BroadcastHashJoin

df_people = spark.read.json("hdfs://master-01:9000/data/resources/people.json") // BroadcastHashJoin df_people .filter("age > 11") .join(df, df_people("name") === df("manager")) .show() // BroadcastHashJoin + Predicate Pushdown df_people .join(df, df_people("name") === df("manager")) .filter("age > 11") .show()
Scala
복사
데이터가 적으면 Spark 가 알아서 BroadcastHashJoin 을 수행한다
SparkSQL Optimizer 가 최적화해서 처리하기 때문에 filter + joinjoin + filter 는 동일하게 실행된다

SQL Join

df.createOrReplaceTempView("test") df_people.createOrReplaceTempView("people") spark.sql(""" SELECT website, avg(age), max(id) FROM people a JOIN test b ON a.name = b.manager WHERE a.age > 11 GROUP BY b.website """).show
Scala
복사

Partition

df .write .partitionBy("manager", "name") .mode("overwrite") .saveAsTable("partition_table") sql("SELECT * FROM partition_table").show(false)
Scala
복사

결론

RDD로 직접 작성하는것보다 Spark SQL 로 작성하는게 좋다 (정신건강에 이롭다)
파티션은 성능 향상을 위해 사용한다
Spark 에서 derby 를 설정해주지 않으면 애플리케이션이 종료됨과 동시에 삭제되기 때문에 꼭 hive-site 를 설정해야 한다