Big Data - Advanced

Apache Spark – A Deep Dive – Series 9 of N – Analysis of Most Popular Movies – Using SparkSQL


Problem:

  • Find the most popular movies in a more optimized way:
    • Spark Core has efficient map, reduce and other functions for analysing complex data, BUT
      • to get the output we had to write a lot of logic to create key-value pairs,
      • and a lot of lambda operations to aggregate the data, etc.
      • the data was not in a structured format; a structured format would let Spark optimize the queries and would make exporting data to, and importing it from, other databases much easier
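To make the key-value style concrete, here is a plain-Python sketch (no Spark involved) of the pattern assumed from the earlier posts: map each rating line to a (movieID, 1) pair, reduce by key with a lambda, then flip and sort by hand. The helper `reduce_by_key` is hypothetical and only mimics `RDD.reduceByKey`; the sample lines are made up and merely follow the u.data layout (userID, movieID, rating, timestamp, tab-separated).

```python
# Plain-Python emulation (no Spark) of the Spark Core key-value pattern, roughly:
#   rdd.map(lambda line: (int(line.split()[1]), 1)).reduceByKey(lambda a, b: a + b)
# The sample lines are made up; they only follow the u.data layout:
# userID <tab> movieID <tab> rating <tab> timestamp
sample_lines = [
    "1\t50\t5\t874965758",
    "2\t100\t4\t888550871",
    "3\t50\t3\t889237269",
    "4\t181\t4\t892003000",
    "5\t100\t5\t875635800",
    "6\t50\t4\t883601000",
]

# Step 1 ("map"): turn every line into a (movieID, 1) key-value pair.
pairs = [(int(line.split()[1]), 1) for line in sample_lines]


def reduce_by_key(kv_pairs, func):
    """Hypothetical helper mimicking RDD.reduceByKey: combine values per key with func."""
    grouped = {}
    for key, value in kv_pairs:
        grouped[key] = func(grouped[key], value) if key in grouped else value
    return grouped


# Step 2 ("reduceByKey"): sum the 1s per movieID with a lambda.
counts = reduce_by_key(pairs, lambda a, b: a + b)

# Step 3: flip each pair to (count, movieID) and sort descending by count.
ranked = sorted(((count, movie_id) for movie_id, count in counts.items()), reverse=True)
print(ranked)
```

Every step here is positional bookkeeping on tuples, which is exactly the overhead SparkSQL's named columns remove.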

Strategy:

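  • In addition to Spark Core, we will use SparkSQL
    • SparkSQL lets us give the data a structure (a schema)
    • We will use two terms a lot: DataFrames and Datasets
    • DataFrame
      • a schema (table-like) view of an RDD
      • in our earlier RDD code, each element was a key-value pair
      • in a DataFrame, each row is a Row object with named columns
    • Dataset
      • a typed, object-oriented view of an RDD
      • in a Dataset, each row is an object of a specific type with named fields
      • a DataFrame is simply a Dataset of generic Row objects; the typed Dataset API is available in Scala and Java, while in PySpark we work with DataFrames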
  • Advantages of using SparkSQL
    • it abstracts away the internal intricacies of an RDD by exposing higher-level APIs for handling the data
    • it can be extended with user-defined functions (UDFs)
    • because each record is a Row object, you can use SQL-like queries to process data across a cluster as if it were a single database
    • data can be imported and exported using JDBC, JSON, etc.
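The difference between a key-value pair and a Row object can be shown without a cluster. The sketch below uses a `collections.namedtuple` as a stand-in for `pyspark.sql.Row` (an assumption for illustration: both are tuples whose fields can also be read by name). The `Rating` type and its values are hypothetical. It also shows why named fields make export formats such as JSON straightforward.

```python
# Stand-in for a pyspark.sql.Row, assumed here for illustration: like Row,
# a namedtuple is a tuple whose fields can also be read by name.
import json
from collections import namedtuple

# Key-value style (Spark Core): the position carries the meaning.
kv_record = (50, 1)  # (movieID, count); you have to remember the order

# Row style (SparkSQL): the field name travels with the value.
Rating = namedtuple("Rating", ["userID", "movieID", "rating"])  # hypothetical schema
row_record = Rating(userID=1, movieID=50, rating=5)

print(kv_record[0])        # 50, but only if you know index 0 is movieID
print(row_record.movieID)  # 50, self-describing

# Named fields make structured export easy, e.g. one JSON object per row.
rows = [Rating(1, 50, 5), Rating(2, 100, 4)]
json_lines = [json.dumps(r._asdict()) for r in rows]
print("\n".join(json_lines))
```

In Spark itself the export would go through the DataFrame writer (for example `DataFrame.write.json(path)`) rather than hand-built strings.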

Solution:

  • Explanation of the code
    • Row object: instead of returning a key-value pair, the function returns a Row object with a single column named movieID, so the resulting RDD holds one column of movie IDs
      • # python function to return a Ratings Row Object
        def processRatings(line):
            fields = line.split()
            mvID = int(fields[1])
            return Row(movieID=mvID)

    • DataFrame: the RDD of Row objects is converted into a DataFrame
      • ratingsDataset = session.createDataFrame(ratings)

    • Processing the DataFrame: a single line applies SQL-like logic to the data using groupBy, count, orderBy, etc.
      • topMostMovieIDs = ratingsDataset.groupBy("movieID").count().orderBy("count", ascending=False).cache()

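    • SparkSQL statements: register the DataFrame (not the RDD) as a temporary view, then query it with plain SQL. Spark SQL uses LIMIT rather than TOP, and an alias makes the count easy to sort on:
      • ratingsDataset.createOrReplaceTempView("tblRatings")

      • session.sql("SELECT movieID, COUNT(movieID) AS ratingCount FROM tblRatings GROUP BY movieID ORDER BY ratingCount DESC LIMIT 5").show()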

  • Download the code from one of these locations:
    • wget https://testbucket786786.s3.amazonaws.com/spark/sparkTopMostMoviesUsingSparkSQL.py
    • wget https://testbucket786786.s3.amazonaws.com/spark/sparkTopMostMoviesUsingSparkSQLQuery.py
    • OR
    • git clone https://gist.github.com/naeemmohd/1d645ccdef3cbb0d564fe4cb483810af
    • OR
    • # import SparkSession, Row and functions from the pyspark.sql module
      from pyspark.sql import SparkSession
      from pyspark.sql import Row
      from pyspark.sql import functions

      # python function to return a Movie Dictionary (movieID -> movie name)
      def processMovies():
          movies = {}
          # u.item is not UTF-8 encoded, so read it as ISO-8859-1
          with open("/home/user/bigdata/datasets/ml-100k/u.item", encoding="ISO-8859-1") as mfile:
              for line in mfile:
                  fields = line.split("|")
                  movieID = int(fields[0])
                  movieName = fields[1]
                  movies[movieID] = movieName
          return movies

      # python function to return a Ratings Row Object
      def processRatings(line):
          fields = line.split()
          mvID = int(fields[1])
          return Row(movieID=mvID)

      # python function to print results
      def printResults(results):
          for result in results:
              print("\n%s:\t%d " % (moviesDictionary[result[0]], result[1]))

      # create a SparkSession
      session = SparkSession.builder.appName("MostPopularMovies").getOrCreate()
      # load the movies
      moviesDictionary = processMovies()
      # load the raw ratings data
      rawData = session.sparkContext.textFile("/home/user/bigdata/datasets/ml-100k/u.data")
      # convert the ratings to an RDD of Row objects
      ratings = rawData.map(processRatings)
      # convert the RDD of Row objects into a DataFrame
      ratingsDataset = session.createDataFrame(ratings)
      # process the DataFrame
      topMostMovieIDs = ratingsDataset.groupBy("movieID").count().orderBy("count", ascending=False).cache()
      # show the top rows of topMostMovieIDs (show() prints 20 rows by default)
      topMostMovieIDs.show()
      # collect the topmost 5 movies
      topMost5MovieIDs = topMostMovieIDs.take(5)
      # print the movie names with their rating counts
      printResults(topMost5MovieIDs)
      # stop the Spark session
      session.stop()
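To see what the script computes without a cluster, here is a plain-Python sketch of the same pipeline on tiny in-memory samples: count ratings per movieID (what `groupBy("movieID").count()` does), sort by count descending (`orderBy`), keep the first n (`take`), and look up titles as `processMovies()` does. The function names `load_movie_names` and `top_movies` are hypothetical, and all sample lines and titles are made up; they only follow the u.data and u.item layouts.

```python
# Plain-Python sketch (no Spark) of what the script computes:
#   groupBy("movieID").count().orderBy("count", ascending=False).take(n)
# followed by a lookup of each movieID in a {movieID: title} dictionary.
# All sample lines below are made up; they only follow the u.data / u.item layouts.
from collections import Counter

ratings_lines = [
    "1\t50\t5\t874965758",
    "2\t100\t4\t888550871",
    "3\t50\t3\t889237269",
    "4\t181\t4\t892003000",
    "5\t100\t5\t875635800",
    "6\t50\t4\t883601000",
]
item_lines = [
    "50|Title Fifty (1977)|01-Jan-1977",
    "100|Title Hundred (1996)|14-Feb-1997",
    "181|Title One-Eighty-One (1983)|14-Mar-1997",
]


def load_movie_names(lines):
    """Same idea as processMovies(): map movieID -> title from pipe-separated lines."""
    names = {}
    for line in lines:
        fields = line.split("|")
        names[int(fields[0])] = fields[1]
    return names


def top_movies(lines, n):
    """Count ratings per movieID and return the n most-rated as (movieID, count)."""
    counts = Counter(int(line.split()[1]) for line in lines)
    # Sort by count descending, then movieID ascending so ties are deterministic.
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]


names = load_movie_names(item_lines)
top = top_movies(ratings_lines, 2)
for movie_id, count in top:
    print(f"{names[movie_id]}:\t{count}")
```

The secondary sort key is a deliberate choice: Spark's `orderBy("count", ascending=False)` does not guarantee any particular order among movies with the same count. If stable output matters, one option would be `orderBy(functions.desc("count"), "movieID")`.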

The Output:

  • [Screenshot of the program output]