앞전 포스팅에서 spark를 설치해보았고, 잘 동작하는지 테스트하기위해
pyspark를 통해 워드카운트를 실행해보겠습니다.
spark가 설치되어있고,
python3버전이 설치되어있음을 가정합니다.
https://developer-woong.tistory.com/49
https://developer-woong.tistory.com/35
1. 워드카운트에 사용할 파일 HDFS 업로드
action server: mast01
user: hadoop
pwd: /opt/apps/spark-3.2.3
cmd:
hdfs dfs -put /opt/apps/spark-3.2.3/README.md /user/spark/
2. pyspark shell 진입
action server: mast01
user: hadoop
pwd: /opt/apps/spark-3.2.3
cmd:
pyspark --master yarn
3. 소스 작성
action server: mast01
user: hadoop
pwd: /opt/apps/spark-3.2.3
cmd:
import pyspark.sql.functions as f
spark = SparkSession.builder.appName("wordcount").getOrCreate()
textFile = spark.read.text("hdfs://NNHA/user/spark/README.md")
textFile.withColumn('word', f.explode(f.split(f.col('value'), ' ')))\
.groupBy('word')\
.count()\
.sort('count', ascending=False)\
.show()
결과확인
4. .py 생성 후 spark-submit으로 실행시켜보기
action server: mast01
user: hadoop
pwd: /opt/apps/spark-3.2.3
cmd:
# pyspark-shell 빠져나오기
Ctrl + D
# hdfs://NNHA/user/spark/README.md를 워드카운트하여
# hdfs://NNHA/user/spark/resultcount 경로 하위에 결과를 저장하는 스크립트
vi /opt/apps/spark-3.2.3/testwordcount.py
import pyspark
import pyspark.sql.functions as f
spark = pyspark.sql.SparkSession.builder.appName("wordcountSAVE").getOrCreate()
textFile = spark.read.text("hdfs://NNHA/user/spark/README.md")
textFile.withColumn('word', f.explode(f.split(f.col('value'), ' ')))\
.groupBy('word')\
.count()\
.sort('count', ascending=False)\
.write.format("csv").option("header", "false").mode("append").save("hdfs://NNHA/user/spark/resultcount")
5. spark-submit 실행
action server: mast01
user: hadoop
pwd: /opt/apps/spark-3.2.3
cmd:
spark-submit --master yarn --deploy-mode cluster /opt/apps/spark-3.2.3/testwordcount.py
결과확인
mast02:38088 리소스매니저 UI 확인
mast01:9870 resultcount 폴더 확인
반응형