De-mystifying Recommender Systems

by Patrick Cloutier

March 7, 2017.

When I first heard about machine learning, I wondered what happened behind the scenes to make it work. In this blog I try to explain my logic as we build a custom recommender from relatively straightforward concepts. We will be using Scala, and more specifically the Spark shell, to run our commands.

We will be looking at data gathered from a movie site, with user ids, user ratings and movie ids. The idea here is to create a system that tries to “guess” the rating that someone will give a movie they haven’t seen. The first step is to test these guesses against movies they have already rated. Once we are satisfied with the predicted ratings, we will move on to showing our top recommended choices.

The first thing I like to do when working with Scala in the shell is to reduce the amount of noise being sent to the screen. This command sets things up very well:

sc.setLogLevel("ERROR")

The next step is to import some libraries (the full list is in the code listing at the end of this post).

Now we can get started! Of course, the first question should be: what exactly are we doing? To start tackling the problem, let us look at an existing algorithm that does exactly this. Slope One is a family of algorithms designed for this problem, and the algorithm that follows is a tweaked version of it. Let us say we have a matrix of movies and users in which User 1 has rated Movies B and C but not Movie A, while four other users have rated all three.

Our question becomes, how do we figure out how User 1 would rate Movie A?

The general idea is to compare the user’s ratings to the average ratings of the others. For movie B, we would get 4 – (3 + 2 + 5 + 2)/4 = 4 – 3 = 1

For movie C, we would get 2 – (2 + 3 + 5 + 1)/4 = 2 – 2.75 = -0.75

Taking the average of these two differences, we get (1 + (-0.75))/2 = 0.125. This means that, on average, User 1 rates things 0.125 above the mean. Taking this information, we do:

0.125 + (1 + 2 + 3 + 4)/4 = 0.125 + 2.5 = 2.625

This would be the estimate for this user. Essentially all this algorithm is doing is measuring the distance from the mean, and applying that to the average rating on the unknown movie. This is in essence Slope One. Now that we have a specific example, I’ll generalize it a little to play around with the formula.
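The arithmetic above can be checked with a few lines of plain Scala. This is only a sketch: the ratings are the ones that appear in the sums above, and the names `others`, `user1` and `mean` are made up for illustration.

```scala
// The four other users' ratings per movie, as used in the sums above.
val others: Map[String, Seq[Double]] = Map(
  "A" -> Seq(1.0, 2.0, 3.0, 4.0),
  "B" -> Seq(3.0, 2.0, 5.0, 2.0),
  "C" -> Seq(2.0, 3.0, 5.0, 1.0)
)
// User 1 has rated B and C, but not A.
val user1: Map[String, Double] = Map("B" -> 4.0, "C" -> 2.0)

def mean(xs: Seq[Double]): Double = xs.sum / xs.size

// How far User 1 sits from the other users' average, movie by movie.
val deviations = user1.toSeq.map { case (movie, rating) => rating - mean(others(movie)) }
val avgDeviation = mean(deviations)   // (1 + (-0.75)) / 2 = 0.125

// Apply that offset to the average rating of the unseen movie.
val predictionA = avgDeviation + mean(others("A"))
println(predictionA)                  // prints 2.625
```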

Let us call Avg(i) the average rating for Movie i, and U(i) the rating that the user we are predicting for gave to Movie i.

This means we have (U(B) – Avg(B) + U(C) – Avg(C))/2 + Avg(A)

Since we want to simplify this as much as possible, as we are programming this into Scala, we make this:

(U(B) + U(C) – (Avg(B) + Avg(C)) + Avg(A)x2)/2

Now you may be asking at this point, how is this simpler than what we did above?

Essentially we did the following: Sum(the user’s ratings) – Sum(the average ratings of the movies the user has rated) + the number of movies the user rated x Avg(the movie the user hasn’t rated), and then we divide everything by the number of movies the user rated. The reason I consider this simpler is that when we program this, it is as general a formula as we can get. Regardless of how many movies there are, we can use this formula and don’t have to worry about separate calculations for every line.
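Written out with symbols, the rearrangement looks like this. The notation is shorthand introduced only for this sketch: R is the set of movies the user has rated, n is the number of movies in R, and A is the movie we want a prediction for.

```latex
\begin{aligned}
\hat{U}(A) &= \frac{1}{n}\sum_{i \in R}\bigl(U(i) - \mathrm{Avg}(i)\bigr) + \mathrm{Avg}(A) \\
           &= \frac{\sum_{i \in R} U(i) \;-\; \sum_{i \in R} \mathrm{Avg}(i) \;+\; n\,\mathrm{Avg}(A)}{n}
\end{aligned}
```

With R = {B, C} and n = 2 this is the two-movie formula from above, and plugging in the example numbers gives (6 – 5.75 + 2 x 2.5)/2 = 2.625.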

Of course, we are dealing with a ridiculously large amount of data. The data we are using has around 250,000 users and over 30,000 movies. To make things easier on our end, we will take the average of everyone, including the user. Since we are taking a lot of people, adding the extra user will change the average very minutely. Now let’s program this in Scala!

The first step is to create the data frame structure. A data frame is what houses the data in Spark, and it lets us run SQL-like statements to further refine the data. Then we import the data.

Let me briefly explain the commands involved.

case class creates the form the data will be imported into.

textFile grabs the information from a text file, row by row.

zipWithIndex() is a function that numbers each row.

map performs a function on a row by row basis.

The first mapping puts a comma between the rest of the line and the row number, which is appended at the end. The second one splits the row on commas (I am being paranoid here, but a comma inside quotation marks won’t be counted as a separator, and that should explain the odd expression there).

filter reduces the data to the set we have defined.

We filter the ratings to only show ratings that exist. Non-existent entries will cause issues.

We then convert each field to the appropriate numeric type of the structure we defined previously.

Finally, we convert the result into a data frame.
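To see what the numbering, splitting and filtering steps do without starting Spark, here is a rough sketch on a local Scala collection. The sample lines and the name `csvSplit` are made up for illustration, and a plain `Seq` stands in for the RDD.

```scala
// Made-up sample lines; the real file layout may differ.
val lines = Seq(
  "UserID,Title,Rating",
  "1,\"Movie, The\",4.0",
  "2,Another Movie,3.5"
)

// Split on commas that are followed by an even number of double quotes,
// so a comma inside a quoted field is left alone.
val csvSplit = ",(?=([^\"]*\"[^\"]*\")*[^\"]*$)"

val parsed = lines.zipWithIndex                    // number each row, starting at 0
  .map { case (line, i) => line + "," + i }        // append the row number as the last field
  .map(_.split(csvSplit))                          // split the line into fields
  .filter(fields => fields.last.trim.toInt > 0)    // drop row 0 (the header in this sample)

parsed.foreach(fields => println(fields.mkString(" | ")))
```

The quoted title survives as a single field even though it contains a comma, which is the whole point of the odd-looking expression.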

Next we create a list of all possible users and take a random user (where is the fun in taking one we know?).

Next we get all the movie ids and get the average ratings for every movie. This is the first tweak to the algorithm above. This is where we get the average movie rating for all movies including what the user in question rated. The user’s rating will be lost in the “noise” in this case.
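The per-movie average can be sketched on a local collection as follows. The triples are invented sample data, and `movieAvg` is a name chosen for this example only.

```scala
// Hypothetical (userId, movieId, rating) triples standing in for the ratings data.
val ratings = Seq(
  (1, 10, 4.0), (2, 10, 2.0),
  (1, 20, 5.0), (2, 20, 3.0), (3, 20, 4.0)
)

// Group by movie id, then average the ratings in each group.
val movieAvg: Map[Int, Double] = ratings
  .groupBy { case (_, movie, _) => movie }
  .map { case (movie, rs) => movie -> (rs.map(_._3).sum / rs.size) }

println(movieAvg(10))   // prints 3.0
println(movieAvg(20))   // prints 4.0
```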

Next we create a list to store our predicted rating for each movie. We also need the information for the user in question, along with the sum of all their ratings and the number of movies they have rated.

Next we add up the average ratings, over all users, of the movies that the user has rated.

Now we do the predictions.

Now we have the list of predictions based on Slope One.
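As a rough illustration of the prediction step, the general formula can be applied to every movie at once. The numbers are the averages and ratings from the worked example earlier; the variable names are made up for this sketch.

```scala
// Average rating per movie and the target user's own ratings (from the worked example).
val movieAvg   = Map("A" -> 2.5, "B" -> 3.0, "C" -> 2.75)
val userRating = Map("B" -> 4.0, "C" -> 2.0)

val n       = userRating.size
val userSum = userRating.values.sum                     // sum of the user's ratings
val avgSum  = userRating.keys.toSeq.map(movieAvg).sum   // sum of the averages of those same movies

// (userSum - avgSum + n * Avg(movie)) / n, for every movie we have an average for.
val predictions: Map[String, Double] =
  movieAvg.map { case (movie, avg) => movie -> ((userSum - avgSum + n * avg) / n) }

println(predictions("A"))   // prints 2.625
```

Note that `userSum` and `avgSum` are computed once and reused for every movie, which is why the rearranged formula is convenient.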

Of course the accuracy here could be improved. Right now, all users are created equal. However we know that people don’t all vote the same ways, especially when it comes to movies.

The next step is to compare our user only against people who rate in a similar way. There are multiple ways of doing this, and this should be considered the final tweak to the Slope One algorithm. In our case, we will use simple statistics: the Pearson coefficient. It should also be noted that this is where most of the time is spent. The actual predictions are easy and fast. Finding the similar users is more complicated, as we will see shortly.

As was mentioned, we will only look at people who have a voting pattern similar to that of the user we are analyzing. Here we find the movies that the user has rated and pick out only the users that have rated at least one of the same movies.

Let me explain the logic a little. First we get a list of all the movies the user has rated. This is represented by MoviesVoted. (Take the distinct movies and make a list of them.)

Next we get a list of all the users that have voted on any of those movies. This is represented by UsersToBeConsidered. (Take our original data and filter it by the list we just created.)

The final step here is to take all the data from these users and stick them into a container for further reference.
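The three steps above can be sketched on a local collection like this. The data is invented, and the lower-case names only mirror the ones used in the listing at the end.

```scala
// Hypothetical (userId, movieId, rating) triples.
val ratings = Seq(
  (1, 10, 4.0), (1, 20, 2.0),
  (2, 10, 5.0), (2, 30, 3.0),
  (3, 30, 1.0),
  (4, 20, 2.5)
)
val target = 1

// Movies the target user has rated.
val moviesVoted: Set[Int] = ratings.filter(_._1 == target).map(_._2).toSet

// Every user with at least one rating on those movies (this still includes the target).
val usersToBeConsidered: Set[Int] =
  ratings.collect { case (user, movie, _) if moviesVoted(movie) => user }.toSet

// All ratings by those users on those movies, kept for the similarity step.
val reducedUsers = ratings.filter { case (user, movie, _) =>
  usersToBeConsidered(user) && moviesVoted(movie)
}

println(usersToBeConsidered)
```

User 3 drops out here because their only rating is for a movie the target has not rated.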

Since our ultimate goal is to grab a list of similar users, we will create the containers for them in the form of an ArrayBuffer (an array you don’t have to set the size of).

Now we use the Pearson coefficient to tell us if one user is similar to another. It gives a value between -1 and 1, where 1 means the two users’ ratings rise and fall together, -1 means they move in opposite directions, and values near 0 mean there is no linear relationship. Spark has a built-in statistics function that computes this for two columns of a data frame.

Here is the basic logic of the loop:

Check to see if the user isn’t himself/herself.

Make a matrix of all the movies in question with the original user and another user as columns.

Put those columns into the Pearson Coefficient algorithm.

If the value is greater than 0.3, store it in our first container.

If the value is greater than 0, store it in our second container.

When either the first container hits 15 users (strong similarity) or the second container hits 50 users (weak and strong similarities) stop filling the containers.
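To make the loop logic concrete, here is a small sketch in plain Scala. It computes the Pearson coefficient by hand instead of calling Spark, uses a `while` loop with an early exit rather than `breakable`, and runs on invented ratings; `pearson`, `strong` and `weak` are names made up for this example. The target user is simply left out of the sample data, which stands in for the "isn't himself/herself" check.

```scala
import scala.collection.mutable.ArrayBuffer

// Pearson correlation of two equally long rating sequences.
// Returns NaN when either sequence has no variance.
def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
  require(xs.size == ys.size && xs.nonEmpty, "need two non-empty sequences of equal length")
  val mx = xs.sum / xs.size
  val my = ys.sum / ys.size
  val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
  val sx = math.sqrt(xs.map(x => (x - mx) * (x - mx)).sum)
  val sy = math.sqrt(ys.map(y => (y - my) * (y - my)).sum)
  cov / (sx * sy)
}

// Hypothetical data: the target's ratings and other users' ratings on the same three movies.
val target = Seq(4.0, 2.0, 5.0)
val others = Seq(
  2 -> Seq(5.0, 3.0, 5.0),   // rates in the same direction as the target
  3 -> Seq(1.0, 5.0, 2.0),   // rates in the opposite direction
  4 -> Seq(2.0, 4.0, 5.0)    // only loosely similar
)

val strong = ArrayBuffer.empty[Int]   // coefficient above 0.3
val weak   = ArrayBuffer.empty[Int]   // coefficient above 0 (includes the strong ones)

// Stop as soon as either container is full.
val it = others.iterator
while (it.hasNext && strong.size < 15 && weak.size < 50) {
  val (user, rs) = it.next()
  val r = pearson(target, rs)
  if (r > 0.3) strong += user
  if (r > 0) weak += user
}

println(s"strong: $strong, weak: $weak")
```

A NaN coefficient fails both comparisons, so users whose ratings have no variance are skipped without any special handling.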

To do this loop we will need to import another library with the following command:

import scala.util.control.Breaks._

Next we create the loop.

Now there is a chance that this list of similar users is blank. If there are no strong relationships, then we should use the weak and strong relationships. If there are none of those, we will use all the users that have voted on the same movies as our user as the grouping.
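The fallback order can be written as one small function. This is just a sketch of the rule described above, and `chooseGroup` is a hypothetical helper, not something from the listing.

```scala
// Pick the first non-empty group: strong matches, then weak-and-strong, then every co-rater.
def chooseGroup(strong: Seq[Int], weak: Seq[Int], all: Seq[Int]): Seq[Int] =
  if (strong.nonEmpty) strong
  else if (weak.nonEmpty) weak
  else all

// No strong matches here, so the weak-and-strong list is used.
println(chooseGroup(Seq.empty, Seq(7, 9), Seq(3, 7, 9)))
```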

Now we have a list of users that are similar to the one we want to do predictions on.

The first step is to grab the list of movies that can be predicted (a smaller subset now, since it only covers movies that the similar users have rated) and to build a mapping from each movie to its average rating within that group.

Next we create the container for the ratings of all the movies, the ratings of our user, the sum of all the user’s ratings and the number of movies that have been seen.

We now get the average ratings of all other users in the group over all the movies that have been seen by the user.

And now finally we do the predictions.

The thing you will notice here is that some of the ratings are higher than 5. This means that on average, our user rated over the average by a certain amount and the average was already high on that particular movie. These should all be reset to 5 (as there is no movie rating higher than 5).
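Resetting the overshooting predictions is a one-liner on a local collection. The pairs below are invented, and `maxRating` is a name chosen for this sketch.

```scala
// Hypothetical (movieId, predictedRating) pairs, one of them above the scale.
val raw = Seq((10, 4.2), (20, 5.6), (30, 3.9))

// Cap every prediction at the top of the rating scale.
val maxRating = 5.0
val capped = raw.map { case (movie, rating) => (movie, math.min(rating, maxRating)) }

println(capped)
```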

And there you have it. We have created a list of movies and ratings of that user. All that needs to be done is to take the top movies that have been highly rated (that the user hasn’t seen) and you have your own recommender system created from scratch. I won’t go through the details of doing this, since the hard part has already been completed (however it is in the example code below).
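For readers who want the gist of that last step without Spark, here is a minimal sketch on a local collection. The predictions are invented and `topN` is a hypothetical helper.

```scala
// Hypothetical predictions (movieId -> rating) and the movies the user has already rated.
val predictions = Map(10 -> 4.8, 20 -> 5.0, 30 -> 3.1, 40 -> 4.9)
val alreadyRated = Set(20)

def topN(preds: Map[Int, Double], seen: Set[Int], n: Int): Seq[(Int, Double)] =
  preds.toSeq
    .filterNot { case (movie, _) => seen(movie) }   // drop movies the user has rated
    .sortBy { case (_, rating) => -rating }         // highest prediction first
    .take(n)

println(topN(predictions, alreadyRated, 2))
```

Movie 20 has the highest prediction but is excluded because the user has already rated it.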

Next is the code that I created for the second part. This should make it easier if people are interested in taking parts of it (rather than typing everything out)

sc.setLogLevel("ERROR")

//Using a model section

//library imports

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.rdd._

import sqlContext.implicits._

import scala.util.matching.Regex

import org.apache.spark.mllib.evaluation.MulticlassMetrics

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.util.MLUtils

import Array._

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel._

import math._

import org.apache.spark.sql.functions.{avg, lit, sum} //avg and sum are used in the aggregations below

import scala.util.control.Breaks._

import scala.collection.mutable

//Custom Classes for Data Frames

case class Rating(UserID: Int, MovieID: Int, Rating: Double, Row: Int)

case class UserListing(UserID: Int)

case class UserPrediction(MovieID: Int, Rating: Double)

case class MovieNames(MovieID: Int, Name: String)

//load data from ratings file into an rdd

//x(4) is the appended row number; keeping values > 0 skips the first line of the file

val Ratings = sc.textFile("file:///root/Test/ratings.csv").zipWithIndex().map{case(d, i)=>d + "," + i.toString}.

map(_.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)")).filter(x => x(4).trim.toInt > 0).

map(p => Rating(p(0).trim.toInt, p(1).trim.toInt, p(2).trim.toDouble, p(4).trim.toInt)).toDF().cache()

val User = Ratings.select("UserID").distinct.collect() //array of user ids

//pick a random user; random is in [0, 1), so the index stays between 0 and User.size - 1

val UserToBeAdded = User((random*User.size).toInt)(0).asInstanceOf[Int]

val TargetInfo = Ratings.filter($"UserID" === UserToBeAdded)

val MoviesVoted = TargetInfo.select("MovieID").distinct.map(x=>x(0).asInstanceOf[Int]).collect()

val UsersToBeConsidered = Ratings.where($"MovieID".in(MoviesVoted.map(lit(_)):_*)).groupBy("UserID").count().

map(x=>x(0).asInstanceOf[Int]).collect()

val ReducedUsers = Ratings.where($"MovieID".in(MoviesVoted.map(lit(_)):_*)).where($"UserID".in(UsersToBeConsidered.map(lit(_)):_*))

var SimilarUsers = ArrayBuffer.empty[Int]

var SimilarUsersValue = 0

var SimilarUsers1 = ArrayBuffer.empty[Int]

var SimilarUsers1Value = 0

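breakable { for (j <- 0 to UsersToBeConsidered.size - 1){

//skip the user we are predicting for

if (UsersToBeConsidered(j) != UserToBeAdded){

//NOTE: the listing as published does not show how SimilarityValue is computed. The next two statements are a

//reconstruction from the loop description above and assume Spark's DataFrame join and stat.corr (Pearson) functions.

val Pair = TargetInfo.select($"MovieID", $"Rating".alias("TargetRating")).

join(ReducedUsers.filter($"UserID" === UsersToBeConsidered(j)).select($"MovieID", $"Rating".alias("OtherRating")), "MovieID")

val SimilarityValue = Pair.stat.corr("TargetRating", "OtherRating")

if (SimilarityValue > 0.3){

SimilarUsers += UsersToBeConsidered(j)

SimilarUsersValue += 1

}

if (SimilarityValue > 0){

SimilarUsers1 += UsersToBeConsidered(j)

SimilarUsers1Value += 1

}

}

if (SimilarUsersValue >= 15 || SimilarUsers1Value >= 50) break

}}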

if (SimilarUsers.size == 0){

SimilarUsers = SimilarUsers1

}

if (SimilarUsers.size == 0){

SimilarUsers = UsersToBeConsidered.to[mutable.ArrayBuffer]

}

//Do predictions

val Movies = Ratings.filter($"UserID".in(SimilarUsers.map(lit(_)):_*)).select("MovieID").distinct.collect() //array of movie ids

val Prod = Ratings.filter($"UserID".in(SimilarUsers.map(lit(_)):_*)).groupBy("MovieID").agg(avg("Rating")).rdd.map(x => (x(0),x(1))).collectAsMap()

//Create variables

val userRatings = ArrayBuffer.empty[(Int,Double)]

val UserDF = Ratings.filter($"UserID" === UserToBeAdded).select("MovieID","UserID","Rating")

val UserDFSum = UserDF.agg(sum("Rating")).collect()(0)(0).asInstanceOf[Double]

val UserDFSize = UserDF.count().toInt

//sum of the group's average ratings over the movies the user has rated

val UserMovies = UserDF.collect() //collect once instead of on every loop iteration

var AvgMovieRating = 0.0

for (i <- 0 to UserDFSize - 1){

AvgMovieRating += Prod.getOrElse(UserMovies(i)(0).asInstanceOf[Int],-1.0).asInstanceOf[Double]

}

//prediction for each movie: (user's sum - sum of averages + n * movie average) / n

for (i <- 0 to Movies.size - 1){

userRatings += ((Movies(i)(0).asInstanceOf[Int],

((UserDFSum - AvgMovieRating) + 1.0*UserDFSize*Prod.getOrElse(Movies(i)(0),-999.0).asInstanceOf[Double])/(1.0*UserDFSize)))

}

//cap predictions at 5, the top of the rating scale

for (i <- 0 to userRatings.size - 1){

if (userRatings(i)._2 > 5){

userRatings.update(i,((Movies(i)(0).asInstanceOf[Int],5.0)))

}}

val userRatingsDF = sc.parallelize(userRatings).map(p => UserPrediction(p._1, p._2)).toDF()

val MovieNamesDF = sc.textFile("file:///root/Test/movies.csv").zipWithIndex().map{case(d, i)=>d + "," + i.toString}.

map(_.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)")).

map(p => MovieNames(p(0).trim.toInt, p(1).trim)).toDF()

//join predictions to movie names, filter out the movies the user has already rated, and sort by predicted rating

//(the published version called drop here, which removes columns rather than rows)

val userRatingsNamesDF = userRatingsDF.as('a).join(MovieNamesDF.as('b), $"a.MovieID" <=> $"b.MovieID").

filter(!$"a.MovieID".in(MoviesVoted.map(lit(_)):_*)).

select($"b.Name".alias("Name"),$"a.Rating".alias("Rating")).

orderBy($"Rating".desc)

userRatingsNamesDF.show(50)