PySpark / Spark

Handy Methods in SparkContext Object while writing Spark Applications

SparkContext is the main entry point for Spark functionality. It is essentially a class in the Spark framework that, when initialized, gives access to the Spark libraries.

A SparkContext is responsible for connecting to the Spark cluster. It can be used to create RDDs (Resilient Distributed Datasets), to broadcast variables on that cluster, and it exposes many more useful methods.
To create or initialize a SparkContext, a SparkConf needs to be created beforehand. SparkConf is the class used to set configurations for Spark applications, such as the master URL, the application name, etc.


Creating SparkContext-

from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("yarn").setAppName("Test")
sc = SparkContext(conf=conf)

In recent versions of Spark, the sparkContext is available through SparkSession (the class in the Spark SQL component that is the main entry point for DataFrame and SQL functionality) and can be obtained as shown below –

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("Test") \
    .getOrCreate()
sc = spark.sparkContext

Below are important methods available in SparkContext which come in handy while writing Spark applications –

i) Methods to create RDDs

parallelize – Used to create an RDD from a Python collection, such as a list.
sc.parallelize([1,2,3])
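The optional numSlices argument controls how many partitions the collection is split into; a minimal sketch:
rdd = sc.parallelize([1, 2, 3, 4], numSlices=2)  # split the list into 2 partitions
rdd.getNumPartitions()  # 2
rdd.collect()           # [1, 2, 3, 4]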

textFile – Creates an RDD by reading a text file from HDFS or the local file system.
sc.textFile(path)

sequenceFile – Creates an RDD by reading a Hadoop SequenceFile of key/value pairs.
sc.sequenceFile(folder_path)

wholeTextFiles – Reads a directory of text files from HDFS or a local file system and returns an RDD of (filename, content) pairs.
rdd = sc.wholeTextFiles('hdfs://a-hdfs-path')

emptyRDD – Creates an empty RDD that has no partitions or elements.
empty_rdd = sc.emptyRDD()

binaryFiles – Reads a directory of binary files from HDFS or a local file system.
rdd = sc.binaryFiles(folder_path)

newAPIHadoopRDD – Reads a ‘new API’ Hadoop InputFormat with arbitrary key and value classes, from an arbitrary Hadoop configuration, which is passed in as a Python dict and converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
                         "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                         "org.apache.hadoop.hbase.client.Result", conf=conf)
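As a rough sketch, the conf dict passed to a call like the one above would carry the HBase connection details; the quorum host and table name below are placeholders, not values from the original example:
conf = {
    "hbase.zookeeper.quorum": "zk-host",        # placeholder ZooKeeper quorum
    "hbase.mapreduce.inputtable": "some_table"  # placeholder HBase table name
}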

ii) Methods for setting configurations/properties

addFile – Adds a file so that it is available on every node for the Spark job. The path passed can be either a local file, a file in HDFS, or an HTTP, HTTPS or FTP URI.
sc.addFile(path)
To access the file in Spark jobs, use SparkFiles.get("test.txt") with the filename to find its download location.
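A minimal sketch of this pattern, assuming a local file /tmp/test.txt has been added (the path and filename are just examples):
from pyspark import SparkFiles

sc.addFile("/tmp/test.txt")  # ship the file to every node

def first_line(_):
    # SparkFiles.get resolves the file's download location on the worker
    with open(SparkFiles.get("test.txt")) as f:
        return f.readline()

sc.parallelize([1]).map(first_line).collect()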

addPyFile – Similar to the addFile method, but used to add a .py or .zip dependency for all tasks to be executed on this SparkContext.
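For example (the archive name below is hypothetical):
sc.addPyFile("hdfs:///libs/helpers.zip")  # tasks can now import modules packaged in helpers.zip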

setLogLevel – Controls the Spark log level. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN.
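For example, to keep only warnings and errors in the output:
sc.setLogLevel("WARN")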

stop() – Stops the SparkContext.

applicationId – A unique ID that gets created for the Spark application; read it as sc.applicationId.

setSystemProperty – Sets a Java system property, such as spark.executor.memory. This must be invoked before instantiating SparkContext.
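A minimal sketch with an example memory value; setSystemProperty is called on the class itself, before the context exists:
from pyspark import SparkContext

SparkContext.setSystemProperty("spark.executor.memory", "2g")  # example value; set before creating the context
sc = SparkContext(master="local", appName="Test")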

defaultMinPartitions – Default minimum number of partitions for Hadoop RDDs when not given by the user.

defaultParallelism – Default level of parallelism to use when not given by the user.

uiWebUrl – Returns the URL of the Spark UI started by this SparkContext.
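All three of these are read as plain properties on the context; for example (the values shown in the comments are illustrative and depend on the cluster):
print(sc.defaultMinPartitions)  # e.g. 2
print(sc.defaultParallelism)    # e.g. the number of cores available to the job
print(sc.uiWebUrl)              # e.g. http://driver-host:4040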

iii) Methods for creating shared variables (broadcast variables and accumulators)

broadcast – Used to create a read-only variable that is available across the cluster. The variable will be sent to each node only once. A broadcast variable is created with SparkContext.broadcast(), and its value can be accessed through the value attribute.
temp = sc.broadcast([1, 2, 3, 4, 5])
temp.value
[1, 2, 3, 4, 5]
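A minimal sketch of using a broadcast value inside a transformation; the lookup table here is invented for illustration:
lookup = sc.broadcast({"a": 1, "b": 2})       # example lookup table
rdd = sc.parallelize(["a", "b", "a"])
rdd.map(lambda k: lookup.value[k]).collect()  # [1, 2, 1]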

accumulator – A shared variable that can be accumulated, i.e., has a commutative and associative “add” operation. Worker tasks on a Spark cluster can add values to an Accumulator with the += operator, but only the driver program is allowed to access its value, using the value attribute. Updates from the workers are propagated automatically to the driver program.
>>> b = sc.accumulator(0)
>>> rdd = sc.parallelize([1, 2, 3])
>>> def g(x):
...     b.add(x)
...
>>> rdd.foreach(g)
>>> b.value
6
