Blog

Apache Spark 을 이용한 빅데이터 분석 (12)

Spark

Word2vec

단어를 벡터로 바꿔주는 알고리즘
벡터의 차원이 많아질수록 메모리를 많이 사용하게 되기 때문에, driver 와 executor 의 메모리를 충분히 확보해줘야한다.
Spark 는 Skip-gram 만 지원한다.
CBOW 보다는 Skip-gram 이 좀 더 좋다.
일반적으로 Skip-gram 을 사용하는것을 권장한다
예시 사이트: https://word2vec.kr/search/

Prerequisite

Spark 메모리 확보
spark.driver.memory: 6g
spark.executor.memory: 6g
선형대수(linear algebra) 라이브러리 다운로드
$ wget -O $HOME/spark3/jblas-1.2.4.jar https://repo1.maven.org/maven2/org/jblas/jblas/1.2.4/jblas-1.2.4.jar
Plain Text
복사
Spark jars 설정
spark.jars: ...$HOME/spark3/jblas-1.2.4.jar
위키피디아 데이터
다운로드 사이트: https://dumps.wikimedia.org/kowiki/
$ mkdir -p $HOME/data/wiki $ wget -O $HOME/data/wiki/kowiki-latest-pages-articles.xml.bz2 https://dumps.wikimedia.org/kowiki/latest/kowiki-latest-pages-articles.xml.bz2 $ sudo pip install wikiextractor $ python -m wikiextractor.WikiExtractor $HOME/data/wiki/kowiki-latest-pages-articles.xml.bz2 $ hdfs dfs -mkdir -p /data/wiki $ hdfs dfs -put -f /$HOME/data/wiki/text/ /data/wiki/
Plain Text
복사

모델 준비 실습

위키피디아 데이터 불러오기

val ko_wiki_df = spark .read .option('lineSep', '</doc>') .textFile('hdfs://master-01:9000/data/wiki/text/*/wiki*')
Scala
복사

제목과 내용 분리

val ko_wiki_df2 = ko_wiki_df .select(split($"value", " title=\"", 2)(1).as("value")) .select(split($"value", "\">", 2).as("value")) .select($"value"(0).as("title"), trim($"value"(1)).as("content")) ko_wiki_df2.createOrReplaceTempView("ko_wiki_parsed")
Scala
복사

NLTK, KoNLPy 설치

$ pip install -U nltk konlpy $ PYTHON_COMMAND_TEMP=""" import nltk, konlpy nltk.download('punkt') nltk.download('averaged_perceptron_tagger') "" $ python -c "$PYTHON_COMMAND_TEMP"
Plain Text
복사

NLTK, KoNLPy 테스트

from nltk.tokenize import word_tokenize from nltk.tag import pos_tag from konlpy.tag import Okt, Kkma text = "Hello World. My name is blabla" print(word_tokenize(text)) # ['Hello', 'World', '.', 'My', 'name', 'is', 'blabla'] print(pos_tag(word_tokenize(text))) # [('Hello', 'NNP'), ('World', 'NNP'), ('.', '.'), ('My', 'PRP$'), ('name', 'NN'), ('is', 'VBZ'), ('blabla', 'JJ')] text_ko = '안녕하세요. 블라블라블라입니다. 여행을 가봐요' okt = Okt() print(okt.morphs(text_ko)) # ['안녕하세요', '.', '블라블라', '블', '라', '입니다', '.', '여행', '을', '가봐요'] print(okt.pos(text_ko)) # [('안녕하세요', 'Adjective'), ('.', 'Punctuation'), ('블라블라', 'Noun'), ('블', 'Noun'), ('라', 'Josa'), ('입니다', 'Adjective'), ('.', 'Punctuation'), ('여행', 'Noun'), ('을', 'Josa'), ('가봐요', 'Verb')] kkma = Kkma() print(kkma.morphs(text_ko)) # ['안녕', '하', '세요', '.', '블라', '블라', '블라', '이', 'ㅂ니다', '.', '여행', '을', '가보', '아요'] print(kkma.pos(text_ko)) # [('안녕', 'NNG'), ('하', 'XSV'), ('세요', 'EFN'), ('.', 'SF'), ('블라', 'NNG'), ('블라', 'NNG'), ('블라', 'NNG'), ('이', 'VCP'), ('ㅂ니다', 'EFN'), ('.', 'SF'), ('여행', 'NNG'), ('을', 'JKO'), ('가보', 'VV'), ('아요', 'EFN')]
Python
복사

학습데이터 형태소 분석

from pyspark.sql.functions import * from pyspark.sql.types import Row from konlpy.tag import Okt okt = Okt() ko_wiki_train_data_df = spark \ .table("ko_wiki_parsed") \ .where(col("title").isNotNull()) \ .where(col("content").isNotNull()) \ .rdd \ .map(lambda row: Row(title=row.title, content=row.content, text=Okt().nouns(row.content))) \ .toDF() ko_wiki_train_data_df = ko_wiki_train_data_df.rdd.coalesce(10).toDF() ko_wiki_train_data_df.createOrReplaceTempView("ko_wiki_train_data") ko_wiki_train_data_df \ .write \ .format("parquet") \ .mode("overwrite") \ .saveAsTable("ko_wiki_train_data_table")
Python
복사

학습 데이터 캐시

val ko_wiki_train_data_hdfs_df = spark.read.load("hdfs://master-01:9000/data/warehouse/ko_wiki_train_data_table") ko_wiki_train_data_hdfs_df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
Scala
복사

Word2vec 트레이닝

import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel} val word2vec = new Word2Vec() .setInputCol("text") .setOutputCol("result") .setStepSize(0.025) .setMaxSentenceLength(1000) .setMinCount(5) .setSeed(1L) .setVectorSize(100) .setWindowSize(5) val model = word2vec.fit(ko_wiki_train_data_hdfs_df) val ko_wiki_predicted_df = model.transform(ko_wiki_train_data_hdfs_df) model .write .overwrite() .save("hdfs://master-01:9000/model/wirki/word2vc/ml_v100")
Scala
복사

모델 사용 실습

Word2vec 모델 불러오기

import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel} val sameModel = Word2VecModel.load("hdfs://master-01:9000/model/wiki/word2vec/ml_v100") val df_vectors = sameModel.getVectors.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
Scala
복사

벡터 연산을 위해 DoubleMatrix 생성 함수

spark 에서 제공하는 vector 에는 연산이 없기 때문에 jblas 에 있는 DoubleMatrix 으로 변환
DenseVector 가 조금 더 메모리 효율적임
import org.jblas.DoubleMatrix import org.jblas.{DoubleMatrix => DM} import breeze.linalg.DenseVector import breeze.linalg.{DenseVector => DV} import org.apache.spark.ml.linalg.Vector def word2DM(word: String) = { new DM(df_vectors.where($"word" === word).head.getAs[Vector]("vector").toArray) } def word2DV(word: String) = { new DV(df_vectors.where('word === word).head.getAs[Vector]("vector").toArray) }
Scala
복사

연산 함수 정의

import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.DataFrame def searchByRelation(positiveWord: String, positiveWord2: String, num: Int): DataFrame = { val target = word2DM(positiveWord).add(word2DM(positiveWord2)) val result = sameModel.findSynonyms(Vectors.dense(target.toArray), num) result } def searchByRelation(positiveWord: String, negativeWord: String, positiveWord2: String, num: Int): DataFrame = { val target = word2DM(positiveWord).sub(word2DM(negativeWord)).add(word2DM(positiveWord2)) val result = sameModel.findSynonyms(Vectors.dense(target.toArray), num) result }
Scala
복사

검색

val positive_word_01 = "서울" val negative_word_02 = "대한민국" val positive_word_03 = "일본" val result = searchByRelation(positive_word_01, negative_word_02, positive_word_03, 1) result.show() +----+-----------------+ |word| similarity| +----+-----------------+ |도쿄|0.658542811870575| +----+-----------------+
Scala
복사

차원 축소 ( PCA )

import org.apache.spark.ml.feature.PCA import org.apache.spark.ml.linalg.DenseMatrix import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.linalg.Vector val pca = new PCA() .setInputCol("vector") .setOutputCol("pca_features") .setK(2) val pca_model = pca.fit(df_vectors) val sameModelVectorsPCA = pca_model.transform(df_vectors) val seq2: Seq[(String, Vector)] = sameModelVectorsPCA .map(row => (row.getString(0), row.getAs[Vector](2))) .collect() .toSeq
Scala
복사

추천 시스템

Collaborative Filtering ( 협업 필터링 )
유사한 행위를 한 사람들을 대상으로 세그먼트로 분류하여 추천한다
matrix completion 라고도 불린다
순서
데이터 수집 → 데이터 탐색 및 정렬 → 모델 생성 → 추천 → 모델 평가 → 하이퍼 파라미터 튜닝