
Big Data Analysis with Apache Spark (11)

Spark

Structured Streaming

Data is processed based on the timestamp carried by the incoming records, independent of the system time.
A window slices the data on the basis of window start time ≤ event time < window end time. (The engine documentation describes this boundary incorrectly.)
To keep old (late) data out of an aggregation, use a Watermark; see the sketch below.
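For reference, a minimal PySpark sketch of an event-time window with a watermark (not from the original post; the socket source, host/port, and the 10-minute/5-minute window and watermark durations are placeholder assumptions):
from pyspark.sql.functions import window, col

# Placeholder source for illustration: a socket stream with an attached ingest timestamp
lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .option("includeTimestamp", "true")   # adds a "timestamp" column per record
         .load())

windowed_counts = (lines
    # Drop state for records more than 10 minutes older than the latest event time seen
    .withWatermark("timestamp", "10 minutes")
    # Window boundaries follow: start time <= event time < end time
    .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("value"))
    .count())

query = (windowed_counts.writeStream
         .outputMode("update")
         .format("console")
         .start())
Python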

Twitter

Prerequisite

Python 3.7

Prepare

all nodes
$ sudo python3.7 -m pip install -U numpy pandas sklearn matplotlib hdfs
$ sudo python3.7 -m pip list
Shell
master node
$ wget -O $HOME/spark3/spark-streaming-twitter_2.12-2.4.0.jar https://repo1.maven.org/maven2/org/apache/bahir/spark-streaming-twitter_2.12/2.4.0/spark-streaming-twitter_2.12-2.4.0.jar
$ wget -O $HOME/spark3/spark-tags_2.12-3.1.1.jar https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.1.1/spark-tags_2.12-3.1.1.jar
$ wget -O $HOME/spark3/twitter4j-stream-4.0.6.jar https://repo1.maven.org/maven2/org/twitter4j/twitter4j-stream/4.0.6/twitter4j-stream-4.0.6.jar
$ wget -O $HOME/spark3/twitter4j-core-4.0.6.jar https://repo1.maven.org/maven2/org/twitter4j/twitter4j-core/4.0.6/twitter4j-core-4.0.6.jar
$ wget -O $HOME/spark3/gson-2.8.5.jar https://repo1.maven.org/maven2/com/google/code/gson/gson/2.8.5/gson-2.8.5.jar
Shell
Add the five jars above to the Zeppelin Interpreter settings.

Hands-on

Twitter authentication module
object Utils extends Serializable {
  import twitter4j.auth.OAuthAuthorization
  import twitter4j.conf.ConfigurationBuilder

  // Build an OAuth authorization from the four Twitter API credentials (placeholders below)
  def getAuth = {
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) =
      Array("{{ consumerKey }}", "{{ consumerSecret }}", "{{ accessToken }}", "{{ accessTokenSecret }}")
    val builder: ConfigurationBuilder = new ConfigurationBuilder()
    builder.setOAuthConsumerKey(consumerKey)
    builder.setOAuthConsumerSecret(consumerSecret)
    builder.setOAuthAccessToken(accessToken)
    builder.setOAuthAccessTokenSecret(accessTokenSecret)
    Some(new OAuthAuthorization(builder.build()))
  }
}
Scala
Collecting Twitter data
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import com.google.gson.Gson
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization

val Array(outputDirectory, numTweetsToCollect, intervalSecs, partitionsEachInterval) =
  Array("hdfs://master-01:9000/data/twitter/tweets_", "2000", "5", "1")

val ssc = new StreamingContext(sc, Seconds(intervalSecs.toLong))
val tweetInputDStream = TwitterUtils.createStream(ssc, Utils.getAuth)

// Serialize each twitter4j Status to a JSON string
val tweetStream = tweetInputDStream.mapPartitions(iter => {
  var gson = new Gson()
  iter.map(gson.toJson(_))
})

var numTweetsCollected = 0L
tweetStream.foreachRDD((rdd, time) => {
  val count = rdd.count()
  if (count > 0) {
    // Save each non-empty micro-batch under a directory suffixed with the batch time
    rdd.coalesce(partitionsEachInterval.toInt)
      .saveAsTextFile(outputDirectory + time.milliseconds.toString)
    numTweetsCollected += count
    if (numTweetsCollected >= numTweetsToCollect.toLong) {
      ssc.stop(false)
    }
  }
  println(s"${numTweetsCollected} tweets collected")
})

ssc.start()  // start the receiver; collection stops once numTweetsToCollect is reached
Scala
Reading the data
import pyspark

df_tweet = spark \
    .read \
    .json("hdfs://master-01:9000/data/twitter/tweets_*") \
    .persist(pyspark.StorageLevel.MEMORY_ONLY)
df_tweet.createOrReplaceTempView("df_tweet")
Python
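As a quick sanity check (a hypothetical query, not part of the original), the registered temp view can be queried with Spark SQL, for example to count tweets per language:
# Hypothetical sanity check on the df_tweet temp view registered above
spark.sql("""
    SELECT lang, COUNT(*) AS cnt
    FROM df_tweet
    GROUP BY lang
    ORDER BY cnt DESC
""").show(10, truncate=False)
Python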
Training
from pyspark.sql.functions import *
from pyspark.sql.types import Row
from pyspark.ml.feature import HashingTF
from pyspark.ml.clustering import KMeans, KMeansModel

tweet_path, model_path, num_cluster, num_iteration = \
    "hdfs://master-01:9000/data/twitter/tweets_*", "hdfs://master-01:9000/model/twitter/kmeans", 100, 50

df_tweet = spark.read.json(tweet_path)
rdd_texts = df_tweet.rdd.map(lambda row: row['text'])

# Split each tweet text into character 2-grams
n = 2
rdd_terms = rdd_texts.map(lambda text: [text[i:i+n] for i in range(len(text)-(n-1))])
df_terms = rdd_terms.map(lambda x: Row(terms=x)).toDF()

# Hash the n-grams into fixed-size feature vectors
hashingTF = HashingTF(inputCol="terms", outputCol="features", numFeatures=1000)
df_features = hashingTF.transform(df_terms)
df_features.cache()

kmeans = KMeans() \
    .setFeaturesCol("features") \
    .setPredictionCol("prediction") \
    .setK(num_cluster) \
    .setInitMode("k-means||") \
    .setMaxIter(num_iteration) \
    .setSeed(11)

model = kmeans.fit(df_features)
model.write().overwrite().save(model_path)
df_features.unpersist()
Python
Selecting the target cluster
# Vectorize a 2,000-tweet sample with the same 2-gram + HashingTF pipeline,
# then print a few tweets per cluster to find the cluster of interest
n = 2
df_some_tweets = spark \
    .sql("SELECT lang, text FROM df_tweet LIMIT 2000") \
    .rdd \
    .map(lambda row: Row(lang=row["lang"], text=row["text"],
                         text_ngram=[row["text"][i:i+n] for i in range(len(row["text"])-(n-1))])) \
    .toDF()

hashingTF2 = HashingTF(inputCol="text_ngram", outputCol="features", numFeatures=1000)
df_some_features = hashingTF2.transform(df_some_tweets)

model = KMeansModel.load(model_path)
df_some_transformed = model.transform(df_some_features)
df_some_transformed.cache()

for num in range(num_cluster):
    df_some_transformed \
        .where(col('prediction') == num) \
        .select("lang", regexp_replace("text", "\n", " ").alias("text"), "prediction") \
        .show(3, truncate=False)

df_some_transformed.unpersist()
Python
From the output, identify the cluster number where the desired tweets are grouped.
Applying it to a service (Predict, Filter, Save ...)
import org.apache.spark.ml.clustering.KMeansModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.sql.functions._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

// intervalSecs and Utils are defined in the earlier paragraphs
val Array(modelPath, clusterNumber) = Array("hdfs://master-01:9000/model/twitter/kmeans", "63")

val ssc = new StreamingContext(sc, Seconds(intervalSecs.toLong))
val tweets = TwitterUtils.createStream(ssc, Utils.getAuth)
val tweetsText = tweets.map(status => (status.getLang, status.getText))

val model = KMeansModel.load(modelPath)
val hashingTF = new HashingTF()
  .setInputCol("text_ngram")
  .setOutputCol("features")
  .setNumFeatures(1000)

tweetsText.foreachRDD((rdd, time) => {
  // Build the character 2-gram DataFrame for each micro-batch
  val df_tweets = rdd
    .map(lang_text => (lang_text._1, lang_text._2, lang_text._2.sliding(2).toSeq, time.milliseconds))
    .toDF("lang", "text", "text_ngram", "time")

  val df_features = hashingTF.transform(df_tweets)
  val df_transformed = model.transform(df_features)

  // Keep only tweets assigned to the target cluster, then expose and persist them
  val df_tweet_filtered = df_transformed
    .where($"prediction" === clusterNumber.toInt)
    .select($"lang", regexp_replace($"text", "\n", " ").as("text"), $"time")
  df_tweet_filtered.createOrReplaceTempView("df_tweet_filtered")
  df_tweet_filtered.write.mode("append").saveAsTable("df_tweet_filtered_append")
})

ssc.start()
Scala
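Once the streaming job is running, the accumulated output can be inspected from another notebook paragraph; a hypothetical check on the append table, assuming the same SparkSession and warehouse are used:
# Hypothetical check on the filtered tweets accumulated by the streaming job
spark.sql("""
    SELECT lang, text, time
    FROM df_tweet_filtered_append
    ORDER BY time DESC
""").show(10, truncate=False)
Python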