
How To Run Multi Threaded Jobs In Apache Spark Using Scala Or Python?

I am facing a problem related to concurrency in Spark that is stopping me from using it in production, but I know there is a way out of it. I am trying to run Spark ALS on 7 millio

Solution 1:

It is pretty clear that the bottleneck in your program is the search for candidates. Given the Spark architecture, it severely limits your ability to parallelize and adds substantial overhead by starting a Spark job for each user.

Assuming a typical scenario, with 7 million users and a billion products, most of the time you'll predict over the whole range of products minus the few already bought by the user. At least in my opinion, the important question is why you should bother with filtering at all. Even if you recommend a product that has been bought before, is it really harmful?

Unless you have very strict requirements, I would simply ignore the problem and use MatrixFactorizationModel.recommendProductsForUsers, which does pretty much all the work for you except the data export. After that you can perform a bulk export and you're good to go.

Now let's say you have a clear no-duplicates policy. Working under the assumption that a typical user has purchased only a relatively small number of products, you can start by obtaining a set of products for each user:

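// buy_values holds (user, product, _) triples; collect each user's purchases into a Set
val userProdSet = buy_values
    .map { case (user, product, _) => (user, product) }
    // seqOp adds one product within a partition, combOp merges sets across partitions
    .aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)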

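Next, instead of querying the model once per user, you can compute candidates for all users in bulk, join them with userProdSet, and filter out what each user has already bought:

// Number of predictions for each user
val nPred = 30

// MatrixFactorizationModel keeps its factors in RDDs, so calling
// model.recommendProducts inside userProdSet.map would be a nested RDD
// operation, which Spark does not support. Use the bulk API instead
// (available since Spark 1.4) and request enough extra candidates that
// nPred products remain after removing every user's purchases.
val maxBought = userProdSet.map(_._2.size).max()

val recommended = model
    .recommendProductsForUsers(nPred + maxBought)
    .join(userProdSet)
    .mapValues { case (ratings, prodSet) =>
        ratings
            // Filter to remove already purchased products
            .filter(rating => !prodSet.contains(rating.product))
            // Sort by predicted rating, descending, and limit
            .sortBy(-_.rating)
            .take(nPred)
    }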
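The filter-and-rank step itself can be tried locally without Spark. This is a minimal sketch that uses plain Scala collections in place of RDDs; `Rec` is a hypothetical stand-in for MLlib's `Rating(user, product, rating)`, and `FilterPurchased`/`topUnseen` are illustrative names.

```scala
// Hypothetical stand-in for org.apache.spark.mllib.recommendation.Rating
final case class Rec(user: Int, product: Int, rating: Double)

object FilterPurchased {
  /** Drops already-purchased products, then keeps the nPred highest-rated ones. */
  def topUnseen(candidates: Seq[Rec], purchased: Set[Int], nPred: Int): Seq[Rec] =
    candidates
      .filterNot(r => purchased.contains(r.product)) // remove products the user already bought
      .sortWith(_.rating > _.rating)                 // highest predicted rating first
      .take(nPred)

  def main(args: Array[String]): Unit = {
    val purchased = Set(2, 5)
    val nPred = 2
    // nPred + purchased.size = 4 candidates, as if returned by the model
    val candidates = Seq(Rec(1, 5, 4.9), Rec(1, 2, 4.7), Rec(1, 8, 4.1), Rec(1, 3, 3.6))
    println(topUnseen(candidates, purchased, nPred).map(_.product)) // List(8, 3)
  }
}
```

Requesting nPred + prodSet.size candidates is what guarantees nPred results: even if every purchased product shows up among the candidates, nPred products are still left after filtering.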

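You can improve this further by using mutable sets for the aggregation and, if the product factors fit in memory, by broadcasting them rather than relying on the model object inside a closure, but that's the general idea.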
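The mutable-set variant of the aggregation can be sketched locally. Here nested `Seq`s stand in for RDD partitions, and the code mimics what `aggregateByKey` does: a per-partition `seqOp` followed by a cross-partition `combOp`. The partition layout and the `MutableAggregation` name are illustrative assumptions. Spark's `aggregateByKey` documentation allows both functions to modify and return their first argument, which is what makes mutable sets safe there. On an RDD, the corresponding call would be `.aggregateByKey(mutable.HashSet.empty[Int])((s, e) => s += e, (s1, s2) => s1 ++= s2)`.

```scala
import scala.collection.mutable

object MutableAggregation {
  type Buy = (Int, Int) // (user, product)

  // seqOp: add one product to a partition-local set, mutating and returning it
  def seqOp(s: mutable.HashSet[Int], product: Int): mutable.HashSet[Int] = s += product

  // combOp: merge two partial sets, mutating and returning the first
  def combOp(s1: mutable.HashSet[Int], s2: mutable.HashSet[Int]): mutable.HashSet[Int] = s1 ++= s2

  /** Simulates aggregateByKey over partitions held as local sequences. */
  def aggregate(partitions: Seq[Seq[Buy]]): Map[Int, Set[Int]] = {
    // Within each partition: fold records into one mutable set per user
    val partials: Seq[mutable.Map[Int, mutable.HashSet[Int]]] = partitions.map { part =>
      val acc = mutable.Map.empty[Int, mutable.HashSet[Int]]
      part.foreach { case (user, product) =>
        acc(user) = seqOp(acc.getOrElse(user, mutable.HashSet.empty[Int]), product)
      }
      acc
    }
    // Across partitions: merge the partial sets of each user
    val merged = mutable.Map.empty[Int, mutable.HashSet[Int]]
    partials.foreach(_.foreach { case (user, set) =>
      merged(user) = merged.get(user).fold(set)(combOp(_, set))
    })
    // Freeze into immutable sets for the caller
    merged.map { case (user, set) => user -> set.toSet }.toMap
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq((1, 10), (2, 20), (1, 11)), Seq((1, 10), (2, 21)))
    println(aggregate(partitions))
  }
}
```

Mutating in place avoids allocating a new immutable set for every purchase record, which matters when each partition holds many records per user.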

If the number of users in user_ids is lower than the number of users in the whole set (buy_values), you can simply filter userProdSet to keep only that subset of users.

Solution 2:

Spark 1.4 has recommendAll for generating all recommendations in bulk, so that they can be served through key-value stores.
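Before bulk recommendations can be served from a key-value store, they need to be encoded as keys and values. Below is a minimal local sketch. Its input is shaped like the (user, Array[Rating]) pairs that `recommendProductsForUsers` returns, simplified to (product, score) tuples. The `user:<id>` key scheme and the comma-separated value format are hypothetical choices; neither Spark nor any particular store prescribes them.

```scala
object KvExport {
  /** Hypothetical key scheme for the store. */
  def key(user: Int): String = s"user:$user"

  /** Encodes each user's products in descending score order as a comma-separated string. */
  def toKv(recs: Map[Int, Seq[(Int, Double)]]): Map[String, String] =
    recs.map { case (user, ranked) =>
      key(user) -> ranked.sortWith(_._2 > _._2).map(_._1).mkString(",")
    }

  def main(args: Array[String]): Unit = {
    val recs = Map(42 -> Seq((7, 4.8), (3, 4.9), (9, 2.0)))
    println(toKv(recs)) // Map(user:42 -> 3,7,9)
  }
}
```

In a real job, the same encoding would run inside a map over the recommendation RDD before writing to the store, so the serving side only performs one lookup per user.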
