# -*- coding: utf-8 -*-
"""K-Means & PCA.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1cREVMJ4-CtQqyhr_4B4dXOvkolxOCuU8
# K-Means & PCA
### Setup
Let's set up Spark in the Colab environment. Run the cell below!
"""
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
"""Now we import some of the libraries usually needed by our workload.
---
"""
# Commented out IPython magic to ensure Python compatibility.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# %matplotlib inline
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
"""Let's initialize the Spark context."""
# create the session
conf = SparkConf().set("spark.ui.port", "4050")
# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
"""I can easily check the current version and get the link of the web interface. In the Spark UI, I can monitor the progress of my job and debug the performance bottlenecks (if my Colab is running with a **local runtime**)."""
spark
"""If I run this on the Google colab hosted runtime, the cell below will create a *ngrok* tunnel which will allow me to still check the Spark UI."""
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
"import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"
"""### Data Preprocessing
In this notebook, rather than downloading a file from somewhere, I will load a famous machine learning dataset, the [Breast Cancer Wisconsin dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_breast_cancer.html), using the ```scikit-learn``` datasets loader.
"""
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()
"""For convenience, given that the dataset is small, I will first construct a Pandas dataframe, tune the schema, and then convert it into a Spark dataframe."""
pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
pd_df.head()
df = spark.createDataFrame(pd_df)
from pyspark.ml.linalg import Vectors

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    # Mark the selected columns as (non-)nullable and rebuild the dataframe with the updated schema
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

df = set_df_columns_nullable(spark, df, df.columns)
df = df.withColumn('features', array(df.columns))
vectors = df.rdd.map(lambda row: Vectors.dense(row.features))
df.printSchema()
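"""As an optional sanity check (a minimal sketch, not required for the rest of the notebook), the cell below verifies that every row of the ```features``` array column has the same length and peeks at the first dense vector; ```size``` here is the standard array-length function from ```pyspark.sql.functions```."""
df.select(size('features').alias('n_features')).distinct().show()
print(vectors.first())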
"""With the next cell, I am going build the two datastructures that we will be using throughout this Notebook:
* ```features```, a dataframe of Dense vectors, containing all the original features in the dataset;
* ```labels```, a series of binary labels indicating if the corresponding set of features belongs to a subject with breast cancer, or not.
"""
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row), ["features"])
labels = pd.Series(breast_cancer.target)
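"""A quick optional check that the two data structures line up: ```features``` should contain one row per sample and ```labels``` one binary label per sample."""
print(features.count(), 'feature vectors')
print(len(labels), 'labels')
print(labels.value_counts())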
"""### Building machine learning model
Now I am ready to cluster the data with the [K-means](https://spark.apache.org/docs/latest/ml-clustering.html) algorithm included in MLlib (Spark's Machine Learning library).
Also, I am setting the ```k``` parameter to **2**, fitting the model, and then computing the [Silhouette score](https://en.wikipedia.org/wiki/Silhouette_(clustering)) (i.e., a measure of the quality of the obtained clustering).
**IMPORTANT:** I am using the MLlib implementation of the Silhouette score (via ```ClusteringEvaluator```).
"""
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(features)
# Make predictions
predictions = model.transform(features)
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f'Silhouette: {silhouette}')
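"""Besides the Silhouette score, it can be useful to look at the cluster sizes and the learned centroids; the sketch below uses the standard ```KMeansModel``` accessors for that."""
# Number of points assigned to each cluster
predictions.groupBy('prediction').count().show()
# Learned cluster centers (one 30-dimensional centroid per cluster)
for center in model.clusterCenters():
    print(center)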
"""Next, I will take the predictions produced by K-means, and compare them with the ```labels``` variable (i.e., the ground truth from our dataset).
Then, I will compute how many data points in the dataset have been clustered correctly (i.e., positive cases in one cluster, negative cases in the other).
I am using ```np.count_nonzero(series_a == series_b)``` to quickly compute the element-wise comparison of two series.
**IMPORTANT**: K-means is a clustering algorithm, so it will not output a label for each data point, but just a cluster identifier! As such, label ```0``` does not necessarily match the cluster identifier ```0```.
"""
predictions_df = predictions.toPandas()
# Swap the cluster identifiers (0 <-> 1) before comparing them with the ground-truth labels
converted_pre = predictions_df['prediction'].apply(lambda x: 0 if x else 1)
np.count_nonzero(converted_pre.values == labels.values)
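"""Since the cluster identifiers are arbitrary, a mapping-agnostic way to count correctly clustered points is to try both possible assignments of clusters to labels and keep the better one. This is a minimal sketch of that idea."""
raw_matches = np.count_nonzero(predictions_df['prediction'].values == labels.values)
# With two clusters, the only other mapping is the complement of the first one
best_matches = max(raw_matches, len(labels) - raw_matches)
print(f'Correctly clustered points (best label mapping): {best_matches} / {len(labels)}')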
"""Now I am performing dimensionality reduction on the ```features``` using the [PCA](https://spark.apache.org/docs/latest/ml-features.html#pca) statistical procedure, available as well in MLlib.
I am setting the ```k``` parameter to **2**, effectively reducing the dataset size by a factor of **15** (from 30 features down to 2 principal components).
"""
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
pca = PCA(k=2, inputCol="features", outputCol="pca")
model = pca.fit(features)
result = model.transform(features).select("pca")
result.show(truncate=False)
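"""A useful side effect of PCA is that the fitted ```PCAModel``` exposes how much variance each principal component retains; the sketch below simply prints its ```explainedVariance``` vector."""
print(model.explainedVariance)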
"""Now running K-means with the same parameters as above, but on the ```pcaFeatures``` produced by the PCA reduction that I just executed.
I am also computing the Silhouette score, as well as the number of data points that have been clustered correctly.
"""
kmeans = KMeans(featuresCol='pca').setK(2).setSeed(1)
model = kmeans.fit(result)
pca_predictions = model.transform(result)
pca_evaluator = ClusteringEvaluator(featuresCol='pca')
pca_silhouette = pca_evaluator.evaluate(pca_predictions)
print(f'Silhouette after PCA: {pca_silhouette}')
pca_predictions_df = pca_predictions.toPandas()
# Swap the cluster identifiers (0 <-> 1) again before comparing them with the ground-truth labels
pca_converted_pre = pca_predictions_df['prediction'].apply(lambda x: 0 if x else 1)
np.count_nonzero(pca_converted_pre.values == labels.values)
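"""To visualize the result, a minimal sketch: scatter-plot the two principal components and color each point by its K-means cluster, using the ```pca``` and ```prediction``` columns collected above."""
pca_points = np.array([v.toArray() for v in pca_predictions_df['pca']])
plt.scatter(pca_points[:, 0], pca_points[:, 1], c=pca_predictions_df['prediction'], s=10)
plt.xlabel('PC 1')
plt.ylabel('PC 2')
plt.title('K-means clusters in the 2-D PCA space')
plt.show()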
# Stop the Spark environment
sc.stop()