Showing posts with label mapreduce. Show all posts

Friday, June 17, 2011

k-medoids Clustering

I am very excited to share that I have finished implementing the k-medoids clustering algorithm. It shares a lot with k-means, but where k-means takes the centroid to be the mean of the data-points in a cluster, k-medoids picks its medoid (centroid) from the actual data-points belonging to the cluster. The idea is that this representative data-point, the medoid, minimizes some objective function, which in most cases is an error function.

The data I chose was MovieLens, with the goal of clustering movies based on which users watched them. The data is available at: http://www.grouplens.org/node/73. I chose the set with 1 million ratings, but the structure of the data is the same across all the sizes.
Once you untar the archive, there are 3 data files.

MOVIE data
movie id :: movie title :: movie genre

USERS data
user id :: gender :: age :: occupation :: zip code

RATINGS data
user id :: movie id :: rating :: timestamp

Since I needed the data-points to be movies, each movie has to carry the list of users who watched it. This requires massaging the RATINGS data a bit.

For example, if the RATINGS data has:

1::2000::5::978300760
2::2000::3::978302109
3::2000::3::978301968
4::2000::4::978300275
5::2000::5::978824291

you would want the input fed to k-medoids to look something like this:

2000::1^2^3^4^5

Please note that I am ignoring the following fields :
USERS$gender
USERS$age
USERS$occupation
USERS$zip code

RATINGS$rating
RATINGS$timestamp

All I care about is which users watched a particular movie. After transposing the data, my input had 3,706 movies.

To accomplish this, I resorted to a one-time map-reduce job that aggregates the user list for each movie and writes it to a file, which then serves as the input to the k-medoids algorithm.
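
The transpose described above can be sketched in plain Java. This is only an in-memory stand-in for the map-reduce job, not the job itself: the class name RatingsTranspose and its methods are hypothetical, and sorting the user ids is an assumption made so the output is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical in-memory stand-in for the one-time transpose job. */
final class RatingsTranspose {

    /** "Map" side: turn "user::movie::rating::timestamp" into {movieId, userId}. */
    static int[] parse(String ratingLine) {
        String[] f = ratingLine.split("::");
        return new int[] { Integer.parseInt(f[1]), Integer.parseInt(f[0]) };
    }

    /** "Reduce" side: collect the users per movie and format "movie::u1^u2^...". */
    static List<String> transpose(List<String> ratingLines) {
        // TreeMap/TreeSet keep movies and users sorted (an assumption, for stable output).
        Map<Integer, TreeSet<Integer>> usersByMovie = new TreeMap<>();
        for (String line : ratingLines) {
            int[] kv = parse(line);
            usersByMovie.computeIfAbsent(kv[0], m -> new TreeSet<>()).add(kv[1]);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, TreeSet<Integer>> e : usersByMovie.entrySet()) {
            StringBuilder sb = new StringBuilder().append(e.getKey()).append("::");
            String sep = "";
            for (int user : e.getValue()) {
                sb.append(sep).append(user);
                sep = "^";
            }
            out.add(sb.toString());
        }
        return out;
    }
}
```

Feeding the five sample RATINGS lines above through transpose yields the single line 2000::1^2^3^4^5. The rating and timestamp fields are parsed past and dropped, matching the fields ignored above.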

As I mentioned earlier, the only thing that separates k-medoids from k-means is the way the centroids (medoids here) are re-calculated. I applied the PAM (Partitioning Around Medoids) algorithm to find k medoids, one for each of the k clusters.

The algorithm proceeds in two steps:
BUILD-step: This step sequentially selects k "centrally located" objects, to be used as initial medoids

SWAP-step: If the objective function can be reduced by interchanging (swapping) a selected object with an unselected object, then the swap is carried out. This is continued till the objective function can no longer be decreased.

The swap can be implemented in the reducer as follows:

MAPPER : (exactly as in k-means) Finds the closest medoid for the data-point and emits the medoid id as key and the data-point as value.

REDUCER : (implements the PAM swap)
best sse = sse(current medoid, values)
for each movie in values
    if making movie the new medoid improves the best sse
        swap movie with the current medoid
        best sse = sse(movie, values)
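
A rough Java sketch of the reducer logic above, for a single cluster. The class MedoidUpdate and its method names are hypothetical. Treating the SSE as the sum of squared distances from the candidate medoid to every member is an assumption, since the exact error function is not spelled out here.

```java
import java.util.List;
import java.util.function.ToDoubleBiFunction;

/** Hypothetical sketch of the per-cluster medoid update done in the reducer. */
final class MedoidUpdate {

    /** Assumed error function: sum of squared distances from candidate to all members. */
    static <T> double sse(T candidate, List<T> members, ToDoubleBiFunction<T, T> dist) {
        double total = 0.0;
        for (T m : members) {
            double d = dist.applyAsDouble(candidate, m);
            total += d * d;
        }
        return total;
    }

    /** Tries every member as the medoid and keeps the one with the lowest SSE. */
    static <T> T bestMedoid(T current, List<T> members, ToDoubleBiFunction<T, T> dist) {
        T best = current;
        double bestSse = sse(current, members, dist);
        for (T candidate : members) {
            double s = sse(candidate, members, dist);
            if (s < bestSse) {   // swap only on strict improvement
                best = candidate;
                bestSse = s;
            }
        }
        return best;
    }
}
```

Each candidate costs one pass over the cluster, so the update is quadratic in the cluster size. That is worth keeping in mind when picking k for a few thousand movies.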

I chose Jaccard similarity as the measure of how similar 2 movies are. If A and B are 2 movies, and a and b are the sets of users who watched A and B respectively, the JaccardSimilarity is given by

JaccardSimilarity(a,b) = |intersection(a, b)| / |union(a, b)|

Dissimilarity(a, b), or Distance(a, b), which is required for calculating the SSE, is simply 1 - JaccardSimilarity(a, b).
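
The two formulas above translate almost directly into Java. This is a minimal sketch: the class name JaccardDistance is hypothetical, and returning a similarity of 1 for two empty user sets is an assumption made to avoid dividing by zero.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper for Jaccard similarity and distance between user sets. */
final class JaccardDistance {

    /** |intersection(a, b)| / |union(a, b)| */
    static double similarity(Set<Integer> a, Set<Integer> b) {
        if (a.isEmpty() && b.isEmpty()) {
            return 1.0; // assumption: two empty sets count as identical
        }
        Set<Integer> intersection = new HashSet<>(a);
        intersection.retainAll(b);
        // |union| = |a| + |b| - |intersection|
        int unionSize = a.size() + b.size() - intersection.size();
        return (double) intersection.size() / unionSize;
    }

    /** 1 - JaccardSimilarity(a, b) */
    static double distance(Set<Integer> a, Set<Integer> b) {
        return 1.0 - similarity(a, b);
    }
}
```

For instance, user sets {1, 2, 3} and {2, 3, 4} share 2 of 4 distinct users, which gives a similarity of 0.5 and a distance of 0.5.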

I chose k = 37 with no a priori tuning, simply because I wanted clusters of roughly 100 movies each: 3706/37 = approx. 100.

Silhouette is an index in the range [-1, 1] indicating how good the resulting clusters are. A value close to 1 indicates the data-points are well clustered. A value close to -1 indicates the data-points have been assigned to the wrong clusters. A value close to 0 indicates that data-points are about as close to at least one other cluster as to the one they are assigned to. The Silhouette for a data-point i is defined as:

Silhouette(i) = (b(i) - a(i)) / max(a(i), b(i))
    b(i) : average distance from data-point i to all the data-points in the closest cluster (other than the one i belongs to)
    a(i) : average distance from data-point i to all the data-points in the cluster i belongs to
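
The definition above can be sketched in Java as follows. The class name Silhouette and the (cluster index, point index) addressing are hypothetical. Two further assumptions follow the usual convention: a(i) leaves out the point itself, and a point that is alone in its cluster, or has no other cluster to compare against, scores 0.

```java
import java.util.List;
import java.util.function.ToDoubleBiFunction;

/** Hypothetical sketch of the Silhouette of a single data-point. */
final class Silhouette {

    /** Silhouette of the point at index i of cluster c. */
    static <T> double of(int c, int i, List<List<T>> clusters, ToDoubleBiFunction<T, T> dist) {
        List<T> own = clusters.get(c);
        T p = own.get(i);
        if (own.size() < 2) {
            return 0.0; // assumption: singleton clusters score 0
        }
        // a(i): average distance to the other members of the point's own cluster
        double a = 0.0;
        for (int j = 0; j < own.size(); j++) {
            if (j != i) {
                a += dist.applyAsDouble(p, own.get(j));
            }
        }
        a /= own.size() - 1;
        // b(i): smallest average distance to the members of any other cluster
        double b = Double.POSITIVE_INFINITY;
        for (int k = 0; k < clusters.size(); k++) {
            if (k == c || clusters.get(k).isEmpty()) {
                continue;
            }
            double sum = 0.0;
            for (T q : clusters.get(k)) {
                sum += dist.applyAsDouble(p, q);
            }
            b = Math.min(b, sum / clusters.get(k).size());
        }
        if (Double.isInfinite(b)) {
            return 0.0; // assumption: no other cluster to compare against
        }
        double denom = Math.max(a, b);
        return denom == 0.0 ? 0.0 : (b - a) / denom;
    }
}
```

The overall Silhouette is then the average of this value over all data-points.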

The initial run with k=37 gave an overall Silhouette of 0.06555513, which indicates that most data-points are almost as close to at least one other cluster as to the cluster they belong to. This situation can be addressed in two ways:

- Post-process the resultant clusters to merge clusters that improve the overall SSE
- Choose different values of k

For the moment, I preferred to tune with different values of k.

I will be committing the code to GitHub soon. Check back later and I will post a link to both the k-means and k-medoids implementations in Java.

Thursday, June 16, 2011

k-means Clustering

I am starting off my blog with one of my favorite unsupervised algorithms: k-means.

I just finished implementing k-means on a simple data set, generated from a mixture of 3 Gaussian distributions with well-separated means and a standard deviation of 1. I have tested it both locally in a pseudo-distributed environment and on a small EC2 cluster on Amazon AWS.

Let me jot down the plain old k-means algorithm, even though I am sure most of you reading this post are already aware of it.

Iterative k-means
Step 1 : Choose k random initial cluster centroids
Step 2 : Assign each data point to the closest cluster centroid
Step 3 : Re-calculate the cluster centroids based on some aggregate function applied on the data points that belong to the same cluster
Step 4 : Repeat Steps 2 and 3 until data points no longer change clusters
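
The four steps can be sketched in plain Java. The class name IterativeKMeans is hypothetical. Step 1 is left to the caller, who passes in the initial centroids, so the sketch stays deterministic. Squared Euclidean distance, the maxIterations safety cap, and leaving an empty cluster's centroid unchanged are all assumptions.

```java
import java.util.Arrays;

/** Hypothetical single-machine sketch of iterative k-means. */
final class IterativeKMeans {

    /** Index of the centroid closest to p (squared Euclidean distance). */
    static int nearest(double[] p, double[][] centroids) {
        int best = 0;
        double bestDist = Double.POSITIVE_INFINITY;
        for (int c = 0; c < centroids.length; c++) {
            double d = 0.0;
            for (int j = 0; j < p.length; j++) {
                double diff = p[j] - centroids[c][j];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = c;
            }
        }
        return best;
    }

    /** Runs Steps 2 to 4; centroids are updated in place, assignments returned. */
    static int[] run(double[][] points, double[][] centroids, int maxIterations) {
        int k = centroids.length;
        int dim = centroids[0].length;
        int[] assignment = new int[points.length];
        Arrays.fill(assignment, -1);
        boolean changed = true;
        for (int iter = 0; changed && iter < maxIterations; iter++) {
            changed = false;
            // Step 2: assign each point to its closest centroid
            for (int i = 0; i < points.length; i++) {
                int c = nearest(points[i], centroids);
                if (c != assignment[i]) {
                    assignment[i] = c;
                    changed = true;
                }
            }
            // Step 3: recompute each centroid as the mean of its points
            double[][] sum = new double[k][dim];
            int[] count = new int[k];
            for (int i = 0; i < points.length; i++) {
                count[assignment[i]]++;
                for (int j = 0; j < dim; j++) {
                    sum[assignment[i]][j] += points[i][j];
                }
            }
            for (int c = 0; c < k; c++) {
                if (count[c] == 0) {
                    continue; // assumption: an empty cluster keeps its old centroid
                }
                for (int j = 0; j < dim; j++) {
                    centroids[c][j] = sum[c][j] / count[c];
                }
            }
        }
        return assignment;
    }
}
```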

Map-Reduce k-means
The above iterative version of k-means can be expressed in the map-reduce framework as follows:


A JVM is spawned on a slave node for every block of data, and the map method gets called for every record within the block. In the initialization step, the mapper randomly picks a number between 0..k-1 and assigns it to the data-point as its centroid id. The map method hence emits the random number indicating the centroid id as key and the data-point as value. Please note that there may be other implementations where the mapper emits the centroid id as key and a two-element tuple <sum, n> as value, where sum is the sum of the data-points assigned to that centroid and n is the number of those data-points.
M1 :: map(k1, v1)                
    choose a random number c between 0..k-1
    emit <c, v1>


In every iteration following the initialization step, the mapper finds the nearest centroid for each data-point and emits the centroid id as key and the data-point as value.
M2 :: map(k1, v1)               
    find the nearest centroid c to data-point v1
    emit <c, v1>


The actual work of the reducer starts once all the mappers have finished executing. The reducer aggregates the data-points in list v and emits the centroid id as key and the aggregate as value. The aggregate (the new centroid) in k-means is the mean of all the data-points in the cluster. The same reducer can be used to aggregate the values at initialization as well as at each iteration step. In fact, in this scenario the reducer can also serve as the combiner (the mapper-final functionality of the map-reduce framework), provided the values carry a <sum, n> pair as in the alternative described above, since a mean of partial means is not in general the overall mean.
R1 :: reduce(k2, list v)         
    emit <k2, aggregate(v)>
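
To make M2, R1 and the combiner idea concrete, here is a plain-Java simulation for 1-D data-points. It deliberately avoids the Hadoop API, so every name in it (KMeansMapReduceSketch, Partial, map, combine, reduce) is hypothetical. It uses the <sum, n> variant because partial sums and counts can be merged in any order, which is what makes a combiner safe.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of one map-reduce k-means iteration on 1-D points. */
final class KMeansMapReduceSketch {

    /** Partial aggregate <sum, n> for one centroid id. */
    static final class Partial {
        final double sum;
        final long n;

        Partial(double sum, long n) {
            this.sum = sum;
            this.n = n;
        }

        Partial merge(Partial other) {
            return new Partial(sum + other.sum, n + other.n);
        }

        double mean() {
            return sum / n;
        }
    }

    /** M2: find the id of the nearest centroid for one data-point. */
    static int map(double point, double[] centroids) {
        int best = 0;
        for (int c = 1; c < centroids.length; c++) {
            if (Math.abs(point - centroids[c]) < Math.abs(point - centroids[best])) {
                best = c;
            }
        }
        return best;
    }

    /** Combiner: fold one split's map output into a <sum, n> per centroid id. */
    static Map<Integer, Partial> combine(double[] split, double[] centroids) {
        Map<Integer, Partial> out = new HashMap<>();
        for (double p : split) {
            out.merge(map(p, centroids), new Partial(p, 1), Partial::merge);
        }
        return out;
    }

    /** R1: merge the partials from all splits and emit the new centroid per id. */
    static Map<Integer, Double> reduce(List<Map<Integer, Partial>> partials) {
        Map<Integer, Partial> merged = new HashMap<>();
        for (Map<Integer, Partial> m : partials) {
            m.forEach((id, p) -> merged.merge(id, p, Partial::merge));
        }
        Map<Integer, Double> newCentroids = new HashMap<>();
        merged.forEach((id, p) -> newCentroids.put(id, p.mean()));
        return newCentroids;
    }
}
```

With centroids {0, 10} and two splits {1, 2, 9} and {3, 11}, the combiners emit <3, 2> and <3, 1> for centroid 0, and the reducer merges them into <6, 3>, a mean of 2. Averaging the two partial means (1.5 and 3) would give 2.25 instead, which is why the count has to travel with the sum.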

As a side note, I am using the Amazon Elastic MapReduce Ruby Client to submit my job on AWS. For that, I need to create a bucket on S3 and save my input data and jar file there. The s3cmd utility is a great command-line tool for performing useful filesystem operations on S3. It shares a lot in common with the 'hadoop fs' command.
Amazon Elastic MR Ruby Client : http://aws.amazon.com/developertools/2264
s3 tools : http://s3tools.org/s3tools

One other thing to note is that when you create a jobflow on Amazon AWS, it takes a while to go from 'starting' to 'running', which gets irritating when you are developing a prototype or just fixing a minor bug and want to test it out. For this reason I would not really recommend coding against the Amazon AWS API, since your code would then depend on all inputs and outputs being on S3. If you instead pass your paths in a properties file or as command line arguments, you can run the same code locally on a single node first using the hadoop command, and then on Amazon AWS using the Elastic MapReduce Ruby client. The only difference I have really seen is with 'hadoop fs' commands in your code: for single-node execution with the hadoop command you need to specify the complete path, i.e. '/usr/local/hadoop/bin/hadoop fs -copyToLocal ...', whereas on Amazon AWS you can just say 'hadoop fs -copyToLocal ...'.
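
One way to keep the job independent of where the data lives is to take both paths from the command line, as suggested above. A minimal sketch follows; the class JobPaths and the two-argument convention are hypothetical, and the driver would hand these strings to whatever input and output configuration it uses.

```java
/** Hypothetical holder for the input and output paths of a job. */
final class JobPaths {
    final String input;
    final String output;

    private JobPaths(String input, String output) {
        this.input = input;
        this.output = output;
    }

    /** Expects: <input path> <output path>; local and S3 paths are treated alike. */
    static JobPaths fromArgs(String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException("usage: <input path> <output path>");
        }
        return new JobPaths(args[0], args[1]);
    }
}
```

The same jar can then be pointed at a local directory during development and at an S3 location on the cluster, without touching the code.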

I am currently trying out k-medoids, another prototype-based clustering method. Unlike k-means, where the centroids are an approximation of the cluster's data-points rather than actual data-points, k-medoids chooses actual data-points as the centroids that represent the clusters. I am testing k-medoids on the MovieLens data with 1 million movie ratings. I will share more insight in the next post.