Creating DataFrame from Scala List or Sequence In some cases in order to test our business logic we need to have DataFrame and in most cases we would have created DataFrame from a sample file. Instead of doing that we can create a List of our sample data and we can convert it to DataFrame. Note : spark.implicits._ will be available in spark-shell by default. In case if we want to test in IDE we should import spark.implicits._ explicitly. From CSV Source From Parquet Source From Avro Source From JSON Source Using Spark StructType schema to create DataFrame on File Sources Using Spark StructType JSON Schema to create DataFrame on File Sources In some cases we may require to have a external StructType Schema in such cases we can define the StructType as JSON and store it as file and during ru...
As a ETL developer, we need to transport data between different platforms/services. It involves establishing connections between them. Below is one such use-case to connect Snowflake from AWS. Here are steps to securely connect to Snowflake using PySpark – Login to AWS EMR service and connect to Spark with below snowflake connectors pyspark --packages net.snowflake:snowflake-jdbc:3.11.1,net.snowflake:spark-snowflake_2.11:2.5.7-spark_2.4 Assumption for this article is that secret key is already created in AWS secrets manager service with SnowFlake credentials. In this example, consider the secret key is ‘test/snowflake/cluster’ Using boto3 library connect to AWS secrets manager and extract the snowflake credentials into json object. Sample code snippet below – def ge...
Application Flow How This Application Works ? When user invokes the application using spark-submit First, the application will parse and validate the input options. Instantiate new SparkSession with mongo config spark.mongodb.output.uri. Depending on the input options provided by the user DataFrame will be created for source data file. If user provided a transformation SQL a temporary view will be created on source DataFrame and transformation will be applied to form transformed DataFrame or the source DataFrame will used for writing the data to Mongo Collection. Finally, either transformed DataFrame or Source DataFrame will be written into Mongo Collection depending on the write configuration provided by user or default write configuration. Read Configuration By default, application will ...
SparkContext is Main entry point for Spark functionality. Its basically a class in Spark framework, when initialized, gets access to Spark Libraries. A SparkContext is responsible for connecting to Spark cluster, and can be used to create RDD(Resilient Distributed Dataset), to broadcast variables on that cluster and has much more useful methods. To create or initialize Spark Context, SparkConf need to be created before hand. SparkConf is basically the class used to set some configurations for Spark Applications like setting Master, App Name etc. Creating SparkContext- from pyspark import SparkConf, SparkContext conf = SparkConf().set("master", "yarn") sc = SparkContext(conf=conf) In latest versions of Spark, sparkContext is available in SparkSession (Class in Spark SQL component/Main entry...
PySpark Core Components includes – Spark Core – All functionalities built on top of Spark Core. Contains classes like SparkContext, RDD Spark SQL – Gives API for structured data processing. Contains important classes like SparkSession, DataFrame, DataSet. Spark Streaming – Gives functionality for Streaming data processing using micro-batching technique. Contains classes like Streaming Context, DStream Spark ML – Provides API to implement Machine learning algorithms.
In the data ingestion stage into Hadoop from RBDMS sources, it often requires password to hit source tables in RDBMS databases. Passing hard password directly is highly unsafe and bad practice in real time applications. So, password can be encrypted by creating JCEKS file. JCEKS is basically a keystore file saved in the Java Cryptography Extension KeyStore (JCEKS) format; used as an alternative keystore to the Java Keystore (JKS) format for the Java platform; stores encoded keys. When working on Spark application which deals with RDBMS sources JCEKS need to be decrypted to query the source tables. Below is the handy function to retrieve password from JCEKS file- Using PySpark Using Scala
Apache Spark SQL component comes with catalyst optimizer which smartly optimizes the jobs by re-arranging the order of transformations and by implementing some special joins according to datasets. Spark performs these joins internally or you can force it to perform them. It’s worthwhile to know this topic, so that it comes to rescue when optimizing the jobs according to your use case. Shuffle Hash Join Shuffle hash join shuffles the data based on join keys and then perform the join. The shuffled hash join ensures that data on each partition will contain the same keys by partitioning the second dataset with the same default partitioner as the first, so that the keys with the same hash value from both datasets are in the same partition. It follows the classic map-reduce pattern: First ...
Spark has two kinds of memory- 1.Execution Memory which is used to store temporary data of shuffles, joins, sorts, and aggregations 2. Storage Memory which is used to cache RDDs and data frames Executor has some amount of total memory, which is divided into two parts, the execution block and the storage block.This is governed by two configuration options. 1. spark.executor.memory > It is the total amount of memory which is available to executors. It is 1 gigabyte by default 2. spark.memory.fraction > Fraction of the total memory available for execution and storage. In early version of Spark, these two kinds of memory were fixed. And if your job was to fill all the execution space, Spark had to spill data to disk, reducing performance of the application. On the other hand, if your...
How to flatten whole JSON containing ArrayType and StructType in it? In order to flatten a JSON completely we don’t have any predefined function in Spark. We can write our own function that will flatten out JSON completely. We will write a function that will accept DataFrame. For each field in the DataFrame we will get the DataType. If the field is of ArrayType we will create new column with exploding the ArrayColumn using Spark explode_outer function. If the field is of StructType we will create new column with parentfield_childfield for each field in the StructType Field. This is a recursive function. Once the function doesn’t find any ArrayType or StructType. It will return the flattened DataFrame. Otherwise, It will it iterate through the schema to completely flatten out the JSON...
There is no direct library to create Dataframe on HBase table like how we read Hive table with Spark sql. This post gives the way to create dataframe on top of Hbase table. You need to add hbase-client dependency to achieve this. Below is the link to get the dependency. https://mvnrepository.com/artifact/org.apache.hbase/hbase-client/2.1.0 Lets say the hbase table is ’emp’ with rowKey as ’empID’ and columns are ‘name’ and ‘city’ under the column-family named – ‘metadata’. Case class -EmpRow is used in order to give the structure to the dataframe. newAPIHadoopRDD is the API available in Spark to create RDD on hbase, configurations need to passed as shown below. Dataframe will be created when you parse this RDD on case class. ...
You may required to add Serial number to Spark Dataframe sometimes. It can be done with the spark function called monotonically_increasing_id(). It generates a new column with unique 64-bit monotonic index for each row. But it isn’t significant, as the sequence changes based on the partition. In short, random numbers will be assigned which are out of sequence. If the goal is add serial number to the dataframe, you can use zipWithIndex method available on RDD. below is how you can achieve the same on dataframe. [code lang=”python”] from pyspark.sql.types import LongType, StructField, StructType def dfZipWithIndex (df, offset=1, colName="rowId"): ”’ Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe and preserves a ...
Partitions- The data within an RDD is split into several partitions. Properties of partitions: – Partitions never span multiple machines, i.e., tuples in the same partition are guaranteed to be on the same machine. – Each machine in the cluster contains one or more partitions. – The number of partitions to use is configurable. By default, it equals the total number of cores on all executor nodes. Two kinds of partitioning available in Spark: – Hash partitioning – Range partitioning Customizing a partitioning is only possible on Pair RDDs. Hash partitioning- Given a Pair RDD that should be grouped: val purchasesPerCust = purchasesRdd.map(p -> (p.customerId, p.price)) // Pair RDD .groupByKey() groupByKey first computes per tuple (k, v) its partition p: p = k....