
How to create Spark DataFrame from different sources

Creating DataFrame from Scala List or Sequence

In some cases, to test our business logic, we need a DataFrame, and in most cases we would create it from a sample file. Instead of doing that, we can build a List of our sample data and convert it into a DataFrame.

Note: spark.implicits._ is available in spark-shell by default. If we want to test in an IDE, we have to import spark.implicits._ explicitly.
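
A minimal sketch, assuming a local SparkSession and hypothetical employee sample data:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DataFrameFromList")
  .master("local[*]")
  .getOrCreate()

// needed for the toDF() helper on Scala collections
import spark.implicits._

// hypothetical sample data as a List of tuples
val employees = List(
  (1, "John", "Sales"),
  (2, "Jane", "Marketing"),
  (3, "Sam", "Engineering")
)

val employeeDF = employees.toDF("id", "name", "department")
employeeDF.show()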

From CSV Source
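
A minimal sketch, assuming a hypothetical CSV file at /data/employees.csv with a header row:

val csvDF = spark.read
  .option("header", "true")      // first line contains column names
  .option("inferSchema", "true") // let Spark infer column types (extra pass over the data)
  .csv("/data/employees.csv")

csvDF.printSchema()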

From Parquet Source
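
A minimal sketch, assuming a hypothetical Parquet path; Parquet files carry their own schema, so no extra options are required:

val parquetDF = spark.read.parquet("/data/employees.parquet")
parquetDF.show(5)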

From Avro Source
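
A minimal sketch, assuming the external spark-avro package is on the classpath (for example via --packages org.apache.spark:spark-avro_2.12:<spark-version>) and a hypothetical file path:

val avroDF = spark.read
  .format("avro")
  .load("/data/employees.avro")

avroDF.show(5)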

From JSON Source
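
A minimal sketch, assuming a hypothetical file with one JSON record per line; for a single multi-line JSON document, add option("multiLine", "true"):

val jsonDF = spark.read.json("/data/employees.json")
jsonDF.printSchema()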

Using Spark StructType schema to create DataFrame on File Sources
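
A minimal sketch of supplying an explicit StructType instead of relying on schema inference, using a hypothetical schema that matches the sample CSV file:

import org.apache.spark.sql.types._

val employeeSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("department", StringType, nullable = true)
))

val csvWithSchemaDF = spark.read
  .option("header", "true")
  .schema(employeeSchema) // no schema inference pass needed
  .csv("/data/employees.csv")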

Using Spark StructType JSON Schema to create DataFrame on File Sources

In some cases we may need an external StructType schema. In such cases we can define the StructType as JSON, store it in a file, and at runtime read the JSON contents and convert them into a Spark StructType.

Note: If we externalize the Spark StructType schema as a JSON file, we don't need to rebuild the jar whenever the schema changes.
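
A minimal sketch, assuming the StructType was previously exported with employeeSchema.json (or .prettyJson) and stored at a hypothetical path:

import org.apache.spark.sql.types.{DataType, StructType}
import scala.io.Source

// read the JSON representation of the schema from the external file
val schemaSource = Source.fromFile("/schemas/employee_schema.json")
val schemaJson = try schemaSource.mkString finally schemaSource.close()

// rebuild the StructType from its JSON representation
val schemaFromJson = DataType.fromJson(schemaJson).asInstanceOf[StructType]

val externalSchemaDF = spark.read
  .option("header", "true")
  .schema(schemaFromJson)
  .csv("/data/employees.csv")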

Creating DataFrame on Hive Tables
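
A minimal sketch, assuming the SparkSession was built with .enableHiveSupport() and a hypothetical database and table name:

// read a Hive table directly into a DataFrame
val hiveDF = spark.table("employee_db.employees")

// equivalently, through Spark SQL
val hiveSqlDF = spark.sql("SELECT * FROM employee_db.employees")

hiveDF.show(5)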

Creating DataFrame on JDBC Source

JDBC DataFrame using Spark Default Partition Configuration
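
A minimal sketch with hypothetical MySQL connection details; note that no partitioning options are set, so Spark reads the entire table through a single partition:

val mysql = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/testdb")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "employees")
  .option("user", "test_user")
  .option("password", "test_password")
  .load()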

JDBC Read Optimization

When we created the JDBC DataFrame for the first time, we didn't define the numPartitions option. If we don't specify it, Spark by default uses only one partition to read all the data from the source, which creates a performance issue for bigger tables. To overcome this, the Spark documentation says we have to use the partitionColumn, numPartitions, lowerBound and upperBound options.

Investigate MySQL DataFrame Partitions (Data Skew)

// mysql DataFrame number of partitions
println(s"Number of Spark Partitions for mysql DataFrame: ${mysql.rdd.getNumPartitions}")

import org.apache.spark.sql.functions.spark_partition_id

// count rows per Spark partition to check for skew
val dataSkew = mysql.groupBy(spark_partition_id().alias("partition_id")).count()

display(dataSkew) // All the data is loaded into a single partition

Boundary Query – Get Boundary Value for ID Column
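
A minimal sketch that pushes a MIN/MAX aggregation down to MySQL (the table and column names are hypothetical) to find the boundaries of the id column:

val boundaryDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/testdb")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "(SELECT MIN(id) AS min_id, MAX(id) AS max_id FROM employees) AS boundary")
  .option("user", "test_user")
  .option("password", "test_password")
  .load()

val boundaryRow = boundaryDF.collect().head
val lowerBound = boundaryRow.getAs[Number]("min_id").longValue()
val upperBound = boundaryRow.getAs[Number]("max_id").longValue()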

Create DataFrame on JDBC – Optimized Way
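
A minimal sketch using the boundaries from the previous query; the partition column and partition count are hypothetical and should be tuned for the actual table:

val mysqlOptimized = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/testdb")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "employees")
  .option("user", "test_user")
  .option("password", "test_password")
  .option("partitionColumn", "id")  // must be numeric, date, or timestamp
  .option("lowerBound", lowerBound) // minimum value of the partition column
  .option("upperBound", upperBound) // maximum value of the partition column
  .option("numPartitions", 8)       // number of parallel JDBC reads
  .load()

println(s"Number of Spark Partitions after optimization: ${mysqlOptimized.rdd.getNumPartitions}")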

Things to remember:

  1. partitionColumn, lowerBound, upperBound and numPartitions should all be used together.
  2. partitionColumn type should be numeric, date, or timestamp.

Creating DataFrame on MongoDB Collection

Note: The Spark MongoDB Connector must be added to the cluster.

The Spark options below should be added to spark-submit and be available in the Spark session to connect to MongoDB. Here, I have added them to the Spark cluster properties.

spark.mongodb.output.uri <connection-string>
spark.mongodb.input.uri <connection-string>
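
A minimal sketch, assuming the MongoDB Spark Connector (pre-10.x) is attached to the cluster and the connection string is already set through the properties above; the database and collection names are hypothetical:

val mongoDF = spark.read
  .format("mongo")                   // short name registered by the MongoDB Spark Connector
  .option("database", "testdb")
  .option("collection", "employees")
  .load()

mongoDF.printSchema()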


