Solution
Algorithm Steps:
- Step 1: Initialize K cluster centroids randomly.
- Step 2: Assign a cluster id to each data point based on the closest cluster centroid.
- Step 3: Update each cluster centroid to the mean of all data points assigned to that cluster in Step 2.
- Step 4: Repeat Steps 2 and 3 until convergence (see the note below the list).
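For reference (standard K-Means background, not stated in the original problem): each iteration lowers the within-cluster sum of squared distances

J = \sum_{i=1}^{n} \lVert x_i - \mu_{c_i} \rVert^2,

where Step 2 sets c_i = \arg\min_{k} \lVert x_i - \mu_k \rVert^2 for every point x_i, and Step 3 sets each centroid \mu_k to the mean of the points currently assigned to cluster k.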
Code
import numpy as np

np.random.seed(10)
def initialize_cluster(X: np.ndarray, K: int) -> np.ndarray:
    """
    Initialize K cluster centroids; each centroid has m elements.
    Args:
        X: ndarray of (n x m)
        K: number of cluster centroids
    Return:
        ndarray of (K x m)
    """
    # Pick K distinct data points at random as the initial centroids.
    # Using indices avoids shuffling X in place, which would reorder the caller's data.
    random_indices = np.random.choice(X.shape[0], size=K, replace=False)
    initial_centroids = X[random_indices, :]
    return initial_centroids
def compute_cluster_centroid(X: np.ndarray, cluster_ids: np.ndarray, K: int) -> np.ndarray:
    """
    Compute cluster centroids from the given cluster_ids.
    Args:
        X: ndarray of (n x m)
        cluster_ids: ndarray of (n x 1), ith element denotes which cluster X[i] belongs to
        K: number of clusters; each element in cluster_ids has a value in the range [0, K-1]
    Return:
        ndarray of (K x m): cluster centroids, the mean of all the points in each cluster
    """
    cluster_centroids = np.zeros((K, X.shape[1]))  # Initialize cluster centroids
    for k in range(K):
        # Select data points belonging to cluster k; ravel the (n x 1) ids so the
        # boolean mask has shape (n,) and correctly indexes the rows of X
        cluster_points = X[cluster_ids.ravel() == k]
        if len(cluster_points) > 0:
            # Mean along the first axis (rows) gives the centroid of cluster k;
            # an empty cluster keeps its zero-initialized centroid
            cluster_centroids[k] = np.mean(cluster_points, axis=0)
    return cluster_centroids
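# Quick sanity check (illustrative toy data, not part of the original solution):
# the centroid of each cluster should simply be the mean of its member points.
X_toy = np.array([[0.0, 0.0], [2.0, 0.0], [10.0, 10.0], [12.0, 10.0]])
ids_toy = np.array([[0], [0], [1], [1]])
print(compute_cluster_centroid(X_toy, ids_toy, K=2))
# Expected output:
# [[ 1.  0.]
#  [11. 10.]]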
def compute_closest_cluster(x: np.ndarray, cluster_centroids: np.ndarray) -> int:
    """
    Find the cluster centroid closest to a given data point.
    Args:
        x: ndarray (1 x m), a single data point
        cluster_centroids: ndarray (K x m), K cluster centroids
    Return:
        Index of the cluster centroid which is closest to the data point x
    """
    # Compute the Euclidean distance between x and each cluster centroid
    distances = np.linalg.norm(cluster_centroids - x, axis=1)
    # Find the index of the closest cluster centroid
    closest_cluster_index = int(np.argmin(distances))
    return closest_cluster_index
def update_cluster_ids(X: np.ndarray, cluster_centroids: np.ndarray) -> np.ndarray:
    """
    Update cluster_ids for all data points.
    Args:
        X: ndarray (n x m), dataset with n rows and m columns
        cluster_centroids: ndarray (K x m), K cluster centroids
    Return:
        cluster_ids: ndarray (n x 1), where each element denotes the closest cluster centroid
    """
    n = X.shape[0]  # Number of data points
    cluster_ids = np.zeros((n, 1), dtype=int)  # Initialize cluster_ids
    # Update cluster_ids for each data point
    for i in range(n):
        cluster_ids[i] = compute_closest_cluster(X[i], cluster_centroids)
    return cluster_ids
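# Optional vectorized variant (a sketch of my own, not part of the original solution):
# computes the same assignment for every point at once using NumPy broadcasting.
def update_cluster_ids_vectorized(X: np.ndarray, cluster_centroids: np.ndarray) -> np.ndarray:
    # X[:, None, :] has shape (n x 1 x m); the centroids broadcast to (1 x K x m),
    # so the norm along the last axis yields an (n x K) matrix of point-to-centroid distances
    distances = np.linalg.norm(X[:, None, :] - cluster_centroids[None, :, :], axis=2)
    # Closest centroid index per data point, reshaped to (n x 1) to match the loop version above
    return np.argmin(distances, axis=1).reshape(-1, 1)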
def train_KMeans(X: np.ndarray, K: int, number_of_iterations: int):
    """
    Train the K-Means algorithm.
    Args:
        X: ndarray (n x m), dataset with n rows and m columns
        K: number of clusters
        number_of_iterations: number of iterations for training
    Return:
        cluster_ids of each sample, final cluster centroids
    """
    # Step 1: Initialize cluster centroids
    cluster_centroids = initialize_cluster(X, K)
    # Run the loop for the given number of iterations
    for _ in range(number_of_iterations):
        # Step 2: Assign each data point to its closest cluster centroid
        cluster_ids = update_cluster_ids(X, cluster_centroids)
        # Step 3: Recompute cluster centroids from the new assignments
        cluster_centroids = compute_cluster_centroid(X, cluster_ids, K)
    return cluster_ids, cluster_centroids
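A minimal end-to-end run on synthetic data (the blob centers, spread, K, and iteration count below are arbitrary illustrative choices, not part of the original problem):

# Two Gaussian blobs of 50 points each, centered near (0, 0) and (5, 5)
X_demo = np.vstack([
    np.random.randn(50, 2),
    np.random.randn(50, 2) + np.array([5.0, 5.0]),
])
cluster_ids_demo, centroids_demo = train_KMeans(X_demo, K=2, number_of_iterations=10)
print("Final centroids:\n", centroids_demo)                           # should land near (0, 0) and (5, 5)
print("Points per cluster:", np.bincount(cluster_ids_demo.ravel()))   # roughly [50 50]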