Apache Spark

How to create Spark DataFrame from different sources

Creating a DataFrame from a Scala List or Sequence
To test business logic we often need a DataFrame, and in most cases we would create it from a sample file. Instead, we can build a List of sample data and convert it to a DataFrame. Note: spark.implicits._ is available in spark-shell by default. To test in an IDE, import spark.implicits._ explicitly.
From CSV Source
From Parquet Source
From Avro Source
From JSON Source
Using Spark StructType schema to create DataFrame on File Sources
Using Spark StructType JSON Schema to create DataFrame on File Sources
In some cases we may need an external StructType schema. In such cases we can define the StructType as JSON, store it as a file, and during ru...

How to connect to Snowflake from AWS EMR using PySpark

As ETL developers, we need to move data between different platforms and services, which involves establishing connections between them. Below is one such use case: connecting to Snowflake from AWS. Here are the steps to securely connect to Snowflake using PySpark:
Log in to the AWS EMR service and start PySpark with the Snowflake connectors below:
pyspark --packages net.snowflake:snowflake-jdbc:3.11.1,net.snowflake:spark-snowflake_2.11:2.5.7-spark_2.4
This article assumes that a secret containing the Snowflake credentials has already been created in AWS Secrets Manager. In this example, the secret key is 'test/snowflake/cluster'.
Using the boto3 library, connect to AWS Secrets Manager and extract the Snowflake credentials into a JSON object. Sample code snippet below: def ge...

Reusable Spark Scala application to export files from HDFS/S3 into Mongo Collection

Application Flow
How does this application work? When the user invokes the application using spark-submit:
First, the application parses and validates the input options.
It instantiates a new SparkSession with the Mongo config spark.mongodb.output.uri.
Depending on the input options provided by the user, a DataFrame is created for the source data file.
If the user provided a transformation SQL, a temporary view is created on the source DataFrame and the transformation is applied to form the transformed DataFrame. Otherwise, the source DataFrame is used for writing the data to the Mongo collection.
Finally, either the transformed DataFrame or the source DataFrame is written into the Mongo collection, using the write configuration provided by the user or the default write configuration.
Read Configuration
By default, application will ...

Apache Spark Interview Questions For 2020

1. What is the version of Spark you are using?
Check the Spark version you are using before going to the interview. As of 2020, the latest version of Spark is 2.4.x.
2. What is the difference between RDD, DataFrame, and Dataset?
RDD: RDD stands for Resilient Distributed Dataset. It is the fundamental data structure of Spark, an immutable collection of records partitioned across the nodes of a cluster. It allows us to perform in-memory computations on large clusters in a fault-tolerant manner. Unlike a DataFrame or Dataset, an RDD does not hold a schema. It holds only the data. If the user wants to apply a schema to an RDD, the user has to create a case class and map the data onto it. We will use RDD in the cases below:
-When our data is unstructured, such as streams of text or media streams.
-When we don't...

Handy Methods in SparkContext Object while writing Spark Applications

SparkContext is the main entry point for Spark functionality. It is a class in the Spark framework that, once initialized, gives access to the Spark libraries. A SparkContext is responsible for connecting to the Spark cluster, and it can be used to create RDDs (Resilient Distributed Datasets), to broadcast variables on that cluster, and it has many more useful methods. To create or initialize a SparkContext, a SparkConf needs to be created beforehand. SparkConf is the class used to set configuration for Spark applications, such as the master and the app name. Creating SparkContext:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("yarn")
sc = SparkContext(conf=conf)
In the latest versions of Spark, sparkContext is available in SparkSession (Class in Spark SQL component/Main entry...

PySpark – Components

PySpark core components include:
-Spark Core: all other functionality is built on top of Spark Core. Contains classes like SparkContext and RDD.
-Spark SQL: gives an API for structured data processing. Contains important classes like SparkSession, DataFrame, and DataSet.
-Spark Streaming: gives functionality for streaming data processing using a micro-batching technique. Contains classes like StreamingContext and DStream.
-Spark ML: provides an API to implement machine learning algorithms.

How to Retrieve Password from JCEKS file in Spark

When ingesting data into Hadoop from RDBMS sources, a password is often required to reach the source tables in the RDBMS databases. Passing a hard-coded password directly is highly unsafe and bad practice in real-world applications. Instead, the password can be stored in encrypted form in a JCEKS file. JCEKS is a keystore file saved in the Java Cryptography Extension KeyStore (JCEKS) format. It is used as an alternative to the Java KeyStore (JKS) format on the Java platform and stores encoded keys. A Spark application that deals with RDBMS sources needs to read the password back from the JCEKS file in order to query the source tables. Below is a handy function to retrieve the password from a JCEKS file:
Using PySpark
Using Scala

Joins in Spark SQL- Shuffle Hash, Sort Merge, BroadCast

The Apache Spark SQL component comes with the Catalyst optimizer, which optimizes jobs by re-arranging the order of transformations and by choosing specialized join strategies according to the datasets. Spark picks these joins internally, or you can force it to use one of them. It is worthwhile to know this topic, because it comes to the rescue when optimizing jobs for your use case.
Shuffle Hash Join
Shuffle hash join shuffles the data based on the join keys and then performs the join. The shuffled hash join ensures that each partition contains the same keys by partitioning the second dataset with the same default partitioner as the first, so that keys with the same hash value from both datasets land in the same partition. It follows the classic map-reduce pattern: First ...
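
The shuffle-then-join idea can be modelled in a few lines of plain Python. This is only a sketch, not Spark code: the function names shuffle and shuffle_hash_join, the sample rows, and the choice of four partitions are made up for illustration, and Python's built-in hash() stands in for Spark's partitioner.

```python
from collections import defaultdict


def shuffle(rows, num_partitions):
    """Route each (key, value) row to a partition chosen by hashing its key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in rows:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


def shuffle_hash_join(left, right, num_partitions=4):
    """Inner-join two lists of (key, value) rows, one partition at a time."""
    # Both sides use the same partitioning function, so equal keys
    # always end up in the partition with the same index.
    left_parts = shuffle(left, num_partitions)
    right_parts = shuffle(right, num_partitions)

    joined = []
    for left_part, right_part in zip(left_parts, right_parts):
        # Build a hash table from one side of the partition ...
        table = defaultdict(list)
        for key, value in right_part:
            table[key].append(value)
        # ... then probe it with the rows of the other side.
        for key, left_value in left_part:
            for right_value in table.get(key, []):
                joined.append((key, left_value, right_value))
    return joined


# Hypothetical sample data: (empID, name) and (empID, city).
employees = [(1, "Asha"), (2, "Ravi"), (3, "Mei")]
cities = [(1, "Pune"), (3, "Osaka"), (4, "Lima")]
result = sorted(shuffle_hash_join(employees, cities))
```

Because each partition is joined independently, no partition ever needs to look at rows held by another one, which is what lets the real join run in parallel across executors.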

Memory Management in Spark and its tuning

Spark has two kinds of memory:
1. Execution memory, which is used to store temporary data for shuffles, joins, sorts, and aggregations
2. Storage memory, which is used to cache RDDs and DataFrames
An executor has some amount of total memory, which is divided into two parts, the execution block and the storage block. This is governed by two configuration options:
1. spark.executor.memory: the total amount of memory available to each executor. It is 1 gigabyte by default.
2. spark.memory.fraction: the fraction of the total memory available for execution and storage.
In early versions of Spark, these two kinds of memory were fixed. If your job filled all the execution space, Spark had to spill data to disk, reducing the performance of the application. On the other hand, if your...
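
To get a feel for the numbers, here is a rough back-of-the-envelope calculation in plain Python. It is a sketch under stated assumptions, not Spark's own code: it assumes the unified memory model of Spark 2.x, where about 300 MB of the heap is reserved, spark.memory.fraction defaults to 0.6, and spark.memory.storageFraction (an option from the Spark configuration docs, not named in this post) defaults to 0.5. The function name unified_memory_mb is made up for this example, and the real heap size reported by the JVM is usually a little below spark.executor.memory.

```python
RESERVED_MB = 300  # assumption: memory Spark sets aside before applying the fraction


def unified_memory_mb(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Estimate the execution and storage regions for a given executor heap size."""
    # Space shared by execution and storage.
    unified = (executor_memory_mb - RESERVED_MB) * memory_fraction
    # Part of the shared space where cached blocks are protected from eviction.
    storage = unified * storage_fraction
    # The rest is the initial share for shuffles, joins, sorts and aggregations.
    execution = unified - storage
    return {"unified": unified, "storage": storage, "execution": execution}


# Default executor size mentioned above: 1 gigabyte.
regions = unified_memory_mb(1024)
```

With the 1 GB default this gives roughly 434 MB of shared space, split into about 217 MB each for storage and execution, which shows how little room a default-sized executor really has.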

How to flatten JSON in Spark Dataframe

How do we flatten a whole JSON document that contains ArrayType and StructType fields? Spark has no predefined function to flatten JSON completely, so we can write our own. We will write a function that accepts a DataFrame. For each field in the DataFrame we get the DataType. If the field is of ArrayType, we create a new column by exploding the array column with Spark's explode_outer function. If the field is of StructType, we create a new column named parentfield_childfield for each field in the StructType field. This is a recursive function. Once the function doesn't find any ArrayType or StructType, it returns the flattened DataFrame. Otherwise, it iterates through the schema again to completely flatten out the JSON...
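
The recursion is easier to follow on a small example. The sketch below models it in plain Python on a single nested dict instead of a DataFrame, so it is an illustration of the idea and not the Spark function from the post: lists play the role of ArrayType, dicts play the role of StructType, and the function name flatten_record and the sample record are made up.

```python
def flatten_record(record):
    """Return the list of flat rows produced from one nested dict."""
    for name, value in record.items():
        if isinstance(value, list):
            # Array field: one output row per element. An empty list still
            # yields one row holding None, mirroring explode_outer.
            rows = []
            for element in value or [None]:
                rows.extend(flatten_record({**record, name: element}))
            return rows
        if isinstance(value, dict):
            # Struct field: replace it with parentfield_childfield columns.
            expanded = {k: v for k, v in record.items() if k != name}
            for child, child_value in value.items():
                expanded[f"{name}_{child}"] = child_value
            return flatten_record(expanded)
    # No list or dict left: the row is already flat.
    return [record]


# Hypothetical sample record with a struct, an array of structs and an empty array.
record = {
    "id": 1,
    "name": {"first": "Ana", "last": "Diaz"},
    "phones": [
        {"type": "home", "number": "111"},
        {"type": "work", "number": "222"},
    ],
    "tags": [],
}
rows = flatten_record(record)
```

The record comes back as two flat rows, one per phone, each carrying name_first, name_last, phones_type, phones_number, and a tags value of None.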

How to create Spark Dataframe on HBase table[Code Snippets]

There is no direct library to create a DataFrame on an HBase table the way we read a Hive table with Spark SQL. This post shows a way to create a DataFrame on top of an HBase table. You need to add the hbase-client dependency to achieve this. Below is the link to get the dependency. https://mvnrepository.com/artifact/org.apache.hbase/hbase-client/2.1.0 Let's say the HBase table is 'emp' with rowKey 'empID' and columns 'name' and 'city' under the column family named 'metadata'. The case class EmpRow is used to give structure to the DataFrame. newAPIHadoopRDD is the API available in Spark to create an RDD on HBase, and the configurations need to be passed as shown below. The DataFrame is created when you map this RDD onto the case class. ...

How to Add Serial Number to Spark Dataframe

You may sometimes be required to add a serial number to a Spark DataFrame. It can be done with the Spark function monotonically_increasing_id(), which generates a new column with a unique 64-bit monotonic index for each row. But the values are not a usable serial number, because the sequence depends on the partition. In short, the numbers assigned are unique and increasing but not consecutive. If the goal is to add a serial number to the DataFrame, you can use the zipWithIndex method available on RDD. Below is how you can achieve the same on a DataFrame. [code lang="python"] from pyspark.sql.types import LongType, StructField, StructType def dfZipWithIndex (df, offset=1, colName="rowId"): ''' Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe and preserves a ...
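
Why the generated IDs jump around becomes clear once the bit layout is spelled out. The plain-Python sketch below imitates the layout described in the Spark documentation, with the partition index in the upper bits and the row position inside the partition in the lower 33 bits, and compares it with consecutive numbering across partitions. It is a model for illustration only: both function names and the sample partitions are made up, and no Spark API is called.

```python
def monotonically_increasing_ids(partitions):
    """Imitate the ID layout: partition index shifted left by 33 bits,
    plus the position of the row inside its partition."""
    return [
        [(partition_index << 33) + position for position in range(len(rows))]
        for partition_index, rows in enumerate(partitions)
    ]


def zip_with_index(partitions, offset=1):
    """Number rows consecutively across partitions, starting at offset."""
    numbered = []
    index = offset
    for rows in partitions:
        for row in rows:
            numbered.append((index, row))
            index += 1
    return numbered


# Hypothetical data set spread over three partitions.
partitions = [["a", "b"], ["c"], ["d", "e"]]
ids = monotonically_increasing_ids(partitions)
serials = zip_with_index(partitions)
```

The IDs restart at a multiple of 2**33 for every partition, so they are unique and increasing but full of gaps, while the second function hands out 1, 2, 3, 4, 5 regardless of how the rows are partitioned.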



24 Tutorials