Table of Contents
- Creating DataFrame from Scala List or Sequence
- From CSV Source
- From Parquet Source
- From Avro Source
- From JSON Source
- Using Spark StructType schema to create DataFrame on File Sources
- Using Spark StructType JSON Schema to create DataFrame on File Sources
- Creating DataFrame on Hive Tables
- Creating DataFrame on JDBC Source
- Creating DataFrame on MongoDB Collection
Creating DataFrame from Scala List or Sequence
In some cases, to test our business logic, we need a DataFrame, and in most cases we would create it from a sample file. Instead of doing that, we can create a List of our sample data and convert it to a DataFrame.
Note: spark.implicits._ is available in spark-shell by default. If we want to test in an IDE, we should import spark.implicits._ explicitly.
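A minimal sketch of converting a Scala Seq to a DataFrame with toDF (column names and sample values here are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("create-dataframe-examples")
  .master("local[*]")
  .getOrCreate()

// Needed for the toDF() syntax on Scala collections
import spark.implicits._

// Sample data as a Seq of tuples (illustrative values)
val employees = Seq(
  (1, "John", "Sales", 4000.0),
  (2, "Maria", "Finance", 5500.0),
  (3, "Ahmed", "IT", 6200.0)
)

// Convert the Seq to a DataFrame with named columns
val df = employees.toDF("id", "name", "department", "salary")
df.show()
df.printSchema()
```

The same `spark` session (or the one provided by spark-shell) is reused in the examples that follow.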
From CSV Source
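A sketch of a CSV read, assuming a header row and a placeholder path:

```scala
// Read a CSV file with a header row, letting Spark infer column types
// (the path is a placeholder)
val csvDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/employees.csv")

csvDF.show(5)
```

Note that inferSchema triggers an extra pass over the data; the StructType sections below show how to avoid it.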
From Parquet Source
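A sketch of a Parquet read (the path is a placeholder):

```scala
// Parquet files carry their own schema, so no schema options are needed
val parquetDF = spark.read.parquet("/data/employees.parquet")
parquetDF.printSchema()
```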
From Avro Source
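A sketch of an Avro read, assuming the spark-avro package (org.apache.spark:spark-avro) is available on the cluster and the path is a placeholder:

```scala
// Avro is an external data source; add the spark-avro package to the cluster
val avroDF = spark.read.format("avro").load("/data/employees.avro")
avroDF.show(5)
```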
From JSON Source
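A sketch of a JSON read (the path is a placeholder):

```scala
// Reads newline-delimited JSON by default; use .option("multiLine", "true")
// if each record spans multiple lines
val jsonDF = spark.read.json("/data/employees.json")
jsonDF.printSchema()
```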
Using Spark StructType schema to create DataFrame on File Sources
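A sketch of defining a StructType in code and applying it to a file source (column names and types are illustrative):

```scala
import org.apache.spark.sql.types._

// Explicit schema instead of inferSchema
val employeeSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("department", StringType, nullable = true),
  StructField("salary", DoubleType, nullable = true)
))

// Apply the schema while reading the file source
val csvWithSchemaDF = spark.read
  .option("header", "true")
  .schema(employeeSchema)
  .csv("/data/employees.csv")

csvWithSchemaDF.printSchema()
```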
Using Spark StructType JSON Schema to create DataFrame on File Sources
In some cases we may require an external StructType schema. In such cases we can define the StructType as JSON, store it in a file, and at runtime read the contents of the JSON file and convert it into a Spark StructType.
Note: We don’t need to rebuild the jar whenever a schema change happens if we externalize the Spark StructType schema as a JSON file.
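A sketch of loading an externalized schema, assuming the JSON file contains a StructType serialized with its .json method (the file path and column names are placeholders):

```scala
import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}

// Read the schema JSON from an external file at runtime
val schemaSource = Source.fromFile("/conf/employee_schema.json")
val schemaJson = try schemaSource.mkString finally schemaSource.close()

// Convert the JSON string into a Spark StructType
val schemaFromJson = DataType.fromJson(schemaJson).asInstanceOf[StructType]

// Use the externalized schema while creating the DataFrame
val dfWithExternalSchema = spark.read
  .option("header", "true")
  .schema(schemaFromJson)
  .csv("/data/employees.csv")
```

To produce the schema file in the first place, `employeeSchema.json` (from the previous section) returns the JSON representation that `DataType.fromJson` can read back.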
Creating DataFrame on Hive Tables
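A sketch of reading a Hive table, assuming Hive support is enabled on the SparkSession; the table name is a placeholder:

```scala
// Hive support must be enabled when building the session, e.g.
// SparkSession.builder().enableHiveSupport().getOrCreate()
val hiveDF = spark.table("sales_db.employees")

// Equivalent SQL form
val hiveSqlDF = spark.sql("SELECT * FROM sales_db.employees")
```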
Creating DataFrame on JDBC Source
JDBC DataFrame using Spark Default Partition Configuration
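A sketch of a JDBC read with Spark's default partitioning, using MySQL as the source (connection details are placeholders):

```scala
// No partitioning options: Spark reads the whole table through one connection
val mysql = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://<host>:3306/<database>")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "employees")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
```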
JDBC Read Optimization
When we created the JDBC DataFrame for the first time, we didn’t define the numPartitions option. If we don’t specify it, Spark by default uses only one partition to read all the data from the source, which creates a performance issue for bigger tables. To overcome this, the Spark documentation says we have to use the partitionColumn, numPartitions, lowerBound and upperBound options.
Investigate mysql df partitions (Data Skew)
    // mysql DataFrame num partitions
    println(s"Number of Spark Partitions for mysql DataFrame: ${mysql.rdd.getNumPartitions}")

    import org.apache.spark.sql.functions.spark_partition_id

    val dataSkew = mysql.groupBy(spark_partition_id.alias("partition_id")).count
    display(dataSkew) // All the data is loaded into a single partition
Boundary Query – Get Boundary Value for ID Column
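A sketch of the boundary query, pushing a MIN/MAX aggregate down to MySQL so only one small row comes back (connection details, table and column names are placeholders):

```scala
// Fetch the boundary values of the partition column from the source table
val boundaryDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://<host>:3306/<database>")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "(SELECT MIN(id) AS min_id, MAX(id) AS max_id FROM employees) AS bounds")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

val bounds = boundaryDF.head
val lowerBound = bounds.getAs[Number]("min_id").longValue
val upperBound = bounds.getAs[Number]("max_id").longValue
```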
Create DataFrame on JDBC – Optimized Way
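A sketch of the partitioned read, reusing the boundary values from the previous sketch; the partition column and numPartitions value are illustrative:

```scala
// Spark issues numPartitions parallel queries, each covering a slice of
// the id range [lowerBound, upperBound]
val mysqlOptimized = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://<host>:3306/<database>")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "employees")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("partitionColumn", "id")
  .option("lowerBound", lowerBound.toString)
  .option("upperBound", upperBound.toString)
  .option("numPartitions", "10")
  .load()

println(s"Number of Spark Partitions for mysql DataFrame: ${mysqlOptimized.rdd.getNumPartitions}")
```

Note that lowerBound and upperBound only control how the id range is split across partitions; rows outside the range are still read.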
Things to remember:
- partitionColumn, lowerBound, upperBound and numPartitions options should all be used together.
- partitionColumn type should be numeric, date, or timestamp.
Creating DataFrame on MongoDB Collection
Note: The Spark MongoDB Connector must be added to the cluster.
The Spark options below should be added to spark-submit (or otherwise made available in the Spark session) to connect to MongoDB. Here, I have added them to the Spark cluster properties.
spark.mongodb.output.uri <connection-string>
spark.mongodb.input.uri <connection-string>
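A sketch of reading a collection, assuming the MongoDB Spark Connector 3.x short name "mongo" (older 2.x versions use the full class name com.mongodb.spark.sql.DefaultSource); database and collection names are placeholders:

```scala
// The connection string is picked up from spark.mongodb.input.uri
val mongoDF = spark.read
  .format("mongo")
  .option("database", "sales_db")
  .option("collection", "employees")
  .load()

mongoDF.printSchema()
mongoDF.show(5)
```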