技術関連

[PySpark]ビッグデータを分析用にPySparkを使用してみる

更新日:

概要

作成したもの

  • ワインの品質用のデータセットをPySparkで前処理

使用した技術・ライブラリ

  • Pyspark
  • EMR

動機

仕事でTBサイズのCSVファイルを扱うことがあり、前処理でPySparkを使用したのでその忘備録として今回の記事を書きました。

なぜPySparkを使用するのか

1GB程度のデータ分析では通常Pandasを使用しますが、それ以上のサイズのデータになった場合、
処理に膨大な時間がかかったり、そもそもメモリ上に乗せることができず処理を回すことができなくなります。

そうした場合に、Hadoop等の分散基盤を利用することでデータを分割して並列処理が可能になるので、
大きなデータサイズでも処理を高速に回すことが出来ます。

ただし、分散基盤の処理の記述に使用されるHIVEやPresto、Sparkは、SQLベースの言語になっている為、
普段SQLを書きなれていない分析者にとって非常に使いづらいです。

書きやすい文法で分散基盤の処理を記述したい場合にPySparkを使用することで、
Pandasと似た文法で書くことが出来るようになります。

PySparkの記述例

分散基盤をAWSのEMRを使用して構築しましたが、今回環境構築については割愛させていただきます。

使用データ

kaggleからワインの品質データセットを利用しました。本来であれば100GBクラスのデータでないと、分散基盤の恩恵を受けられませんが、今回は動作の確認のため小さなデータを使用しました。https://kagle.com/sh6147782/winequalityred
ダウンロードしたCSVをGZIPで分割圧縮し、S3に格納しました。
大きなデータを使用する場合は、1.5GBずつくらいでファイルを分割すると分散基盤で早く読み込めます。

データの読み込み

S3に格納した複数のGZIPをまとめて読み込むことができます。
Pandasと違い、Jupyter上でそのまま実行しても実行結果はきれいに表示することはできません。

# 複数のgzipファイルをそのまま読み込むことが可能
df = spark.read.csv("s3://xxxbucket/*.gz",header=True, sep=',', encoding='utf8')
df.show()

結果

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|          0|           1.9|    0.076|                 11|                  34| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|          0|           2.6|    0.098|                 25|                  67| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|                 15|                  54|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|                 17|                  60|  0.998|3.16|     0.58|    9.8|      6|
|          7.4|             0.7|          0|           1.9|    0.076|                 11|                  34| 0.9978|3.51|     0.56|    9.4|      5|
|          7.4|            0.66|          0|           1.8|    0.075|                 13|                  40| 0.9978|3.51|     0.56|    9.4|      5|
|          7.9|             0.6|       0.06|           1.6|    0.069|                 15|                  59| 0.9964| 3.3|     0.46|    9.4|      5|
|          7.3|            0.65|          0|           1.2|    0.065|                 15|                  21| 0.9946|3.39|     0.47|     10|      7|
|          7.8|            0.58|       0.02|             2|    0.073|                  9|                  18| 0.9968|3.36|     0.57|    9.5|      7|
|          7.5|             0.5|       0.36|           6.1|    0.071|                 17|                 102| 0.9978|3.35|      0.8|   10.5|      5|
|          6.7|            0.58|       0.08|           1.8|    0.097|                 15|                  65| 0.9959|3.28|     0.54|    9.2|      5|
|          7.5|             0.5|       0.36|           6.1|    0.071|                 17|                 102| 0.9978|3.35|      0.8|   10.5|      5|
|          5.6|           0.615|          0|           1.6|    0.089|                 16|                  59| 0.9943|3.58|     0.52|    9.9|      5|
|          7.8|            0.61|       0.29|           1.6|    0.114|                  9|                  29| 0.9974|3.26|     1.56|    9.1|      5|
|          8.9|            0.62|       0.18|           3.8|    0.176|                 52|                 145| 0.9986|3.16|     0.88|    9.2|      5|
|          8.9|            0.62|       0.19|           3.9|     0.17|                 51|                 148| 0.9986|3.17|     0.93|    9.2|      5|
|          8.5|            0.28|       0.56|           1.8|    0.092|                 35|                 103| 0.9969| 3.3|     0.75|   10.5|      7|
|          8.1|            0.56|       0.28|           1.7|    0.368|                 16|                  56| 0.9968|3.11|     1.28|    9.3|      5|
|          7.4|            0.59|       0.08|           4.4|    0.086|                  6|                  29| 0.9974|3.38|      0.5|      9|      4|
|          7.9|            0.32|       0.51|           1.8|    0.341|                 17|                  56| 0.9969|3.04|     1.08|    9.2|      6|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
only showing top 20 rows

データの前処理

指定のデータ型やカラム名に変更することが可能です。

df = df.withColumnRenamed('fixed acidity', 'fixed_acidity') # カラム名の変更
df = df.withColumn('quality', df.quality.cast('int')) # 型の変更

集計

groupByの関数を使用することで、値ごとに結果を集計することが出来ます。
品質ごとに数を集計してから、数が多い順にソートしています。

df.groupBy('quality').count().sort('count', ascending=False).show() # クオリティごとに集計

結果

+-------+-----+
|quality|count|
+-------+-----+
|      5|  680|
|      6|  637|
|      7|  199|
|      4|   52|
|      8|   18|
|      3|   10|
+-------+-----+

保存

クオリティでパーティションを作成し、クオリティごとに結果をS3に格納しています。
クオリティ単位でCSVファイルが作成され結果が保存されます。

# クオリティごとにCSVファイルを分割して保存
re_df = df.repartition('quality')# クオリティでパーティションを作成
re_df.write.partitionBy(['quality']).mode("overwrite").csv('s3://xxxbucket/output/', header=True)

結論・感想

PySparkを利用したデータの処理を簡単に解説してみました。
実際は大きなデータで処理を回そうとすると、記述の違いでパフォーマンスに大きな違いがでます。
ベースがSQLであるため複雑な処理を書こうとすると分散基盤でも時間がかかる場合がありますが、
書き方によって1時間かかる処理が数秒で書けてしまうことがあるので、次回はこの辺を解説してみたいと思います。

-技術関連

Copyright© AIなんて気合いダッ! , 2020 All Rights Reserved Powered by AFFINGER5.