
How to connect to Snowflake from AWS EMR using PySpark

As ETL developers, we often need to move data between different platforms and services, which means establishing connections between them. Below is one such use case: connecting to Snowflake from AWS EMR.


Here are the steps to securely connect to Snowflake using PySpark:

  • Log in to the AWS EMR service and start PySpark with the Snowflake JDBC driver and Spark connector packages shown below
pyspark --packages net.snowflake:snowflake-jdbc:3.11.1,net.snowflake:spark-snowflake_2.11:2.5.7-spark_2.4
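The two coordinates in this command encode version choices that have to match the cluster: `spark-snowflake_2.11` is the build for Scala 2.11, `2.5.7` is the connector version, and the `spark_2.4` suffix targets Spark 2.4. As a rough sketch, the helper below (a hypothetical `build_packages_arg`, not part of any library) assembles the `--packages` value from those parts, which makes it easier to see what would change on a cluster running different versions.

```python
def build_packages_arg(jdbc_version, scala_version, connector_version, spark_version):
    """Return the comma-separated Maven coordinates for `pyspark --packages`."""
    # Snowflake JDBC driver
    jdbc = f"net.snowflake:snowflake-jdbc:{jdbc_version}"
    # Spark connector: artifact name carries the Scala version,
    # the version string carries the connector and Spark versions
    connector = (
        f"net.snowflake:spark-snowflake_{scala_version}:"
        f"{connector_version}-spark_{spark_version}"
    )
    return f"{jdbc},{connector}"


# Versions used in this article
packages = build_packages_arg("3.11.1", "2.11", "2.5.7", "2.4")
print("pyspark --packages " + packages)
```

When the job runs as a script rather than in the interactive shell, the same string can be supplied through the `spark.jars.packages` Spark configuration property.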

This article assumes that a secret containing the Snowflake credentials has already been created in AWS Secrets Manager. In this example, the secret name is ‘test/snowflake/cluster’.

  • Using the boto3 library, connect to AWS Secrets Manager and load the Snowflake credentials from the secret's JSON string into a dictionary. Sample code snippet below:
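import json
import boto3

def get_secret_credentials(secret_key):
  # Create a Secrets Manager client (the region is hard-coded for this example)
  session = boto3.session.Session()
  client = session.client(service_name='secretsmanager', region_name='us-east-1')
  # Fetch the secret; its JSON payload is stored under the 'SecretString' key
  secret_response = client.get_secret_value(SecretId=secret_key)
  secret = json.loads(secret_response['SecretString'])
  return secret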
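`get_secret_value` returns a dictionary whose `SecretString` entry holds the secret as a JSON string. The following is a minimal sketch of parsing that entry and failing early when an expected key is missing. The helper name `parse_secret` and the sample response are made up for illustration, and the key names are only assumed to match what is stored in your secret.

```python
import json


def parse_secret(secret_response, required_keys):
    """Parse the JSON in a get_secret_value-style response and check its keys."""
    secret = json.loads(secret_response["SecretString"])
    missing = [key for key in required_keys if key not in secret]
    if missing:
        raise KeyError(f"Secret is missing keys: {missing}")
    return secret


# Hypothetical response, shaped like the dict boto3 returns (values are fake)
sample_response = {
    "Name": "test/snowflake/cluster",
    "SecretString": json.dumps({"account": "myaccount", "test_user": "etl_user"}),
}

secret = parse_secret(sample_response, ["account", "test_user"])
print(secret["account"])
```

Checking the keys at this point gives a clearer error than a bare `KeyError` raised later while the connector options are being built.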
  • Pass the secret name to the above function and build the connector options as shown below.
secrets_json = get_secret_credentials('test/snowflake/cluster')

# The secrets_json keys below must match the key names stored in your secret
snowflake_credentials = { "sfURL" : secrets_json['account']+".snowflakecomputing.com",
                          "sfUser" : secrets_json['test_user'],
                          "sfPassword" : secrets_json['test_password'],
                          "sfRole" : secrets_json['sf_role'],
                          "sfDatabase" : "test_db",
                          "sfSchema" : "test_schema",
                          "sfWarehouse" : secrets_json['test_warehouse']
                        }
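Because the options dictionary holds the password in plain text, it is worth not printing it as-is while debugging. A small sketch of one way to do that follows; `mask_options` is a hypothetical helper, and the sample values are placeholders rather than real credentials.

```python
def mask_options(options, sensitive_keys=("sfPassword",)):
    """Return a copy of the options with sensitive values replaced by '***'."""
    return {
        key: ("***" if key in sensitive_keys else value)
        for key, value in options.items()
    }


# Placeholder options, shaped like the dictionary built in this step
example_options = {
    "sfURL": "myaccount.snowflakecomputing.com",
    "sfUser": "etl_user",
    "sfPassword": "not-a-real-password",
}

safe_options = mask_options(example_options)
print(safe_options)
```

The original dictionary is left untouched, so it can still be passed to the Spark reader afterwards.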
  • Now you can create a DataFrame from Snowflake tables using the Spark read method, passing in the above options.
my_query = "select * from sfdb.test_table limit 1"

df = spark.read.format("net.snowflake.spark.snowflake").options(**snowflake_credentials).option("query",my_query).load()

df.show()
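Besides `query`, the connector also accepts a `dbtable` option for reading a whole table. The sketch below wraps both in one function; `read_snowflake` is a hypothetical helper name, and it assumes an active `spark` session started with the connector packages and an options dictionary like the one built earlier.

```python
SNOWFLAKE_SOURCE = "net.snowflake.spark.snowflake"


def read_snowflake(spark, options, query=None, table=None):
    """Load a DataFrame from Snowflake using either a SQL query or a table name."""
    # Exactly one of the two must be given
    if (query is None) == (table is None):
        raise ValueError("Pass exactly one of query or table")
    reader = spark.read.format(SNOWFLAKE_SOURCE).options(**options)
    if query is not None:
        return reader.option("query", query).load()
    return reader.option("dbtable", table).load()
```

For example, `read_snowflake(spark, snowflake_credentials, table="test_table")` would read the table from the database and schema set in the options, while passing `query=my_query` behaves like the read call shown above.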



About the author: an ambivert, music lover, enthusiast, artist, designer, coder, gamer, and content writer. He is a professional software developer with hands-on experience in Spark, Kafka, Scala, Python, Hadoop, Hive, Sqoop, Pig, PHP, HTML, and CSS. Know more about him at www.24tutorials.com/sai
