• LinkedIn

  • Follow via Facebook

  • Follow via Twitter

  • Submit RFP

  • Contact Us

Apache Spark connectivity with Redshift

Posted by BDD BigData Practice


Apache Spark connectivity with Redshift

The Spark SQL Data Sources API was introduced in Apache Spark 1.2 to provide a pluggable mechanism for integration with structured data sources of all kinds. Spark users can read data from a variety of sources such as Hive tables, JSON files, columnar Parquet tables, and many others. This post discusses a new Spark data source for accessing the Amazon Redshift Service. Redshift Data Source for Spark is a package maintained by Databricks, with community contributions from SwiftKey and other companies.

Reading from Redshift

Say you want to process an entire table (or a query which returns a large number of rows) in Spark and combine it with a dataset from another large data source such as Hive. The set of commands to load the Redshift table (query) data into a schema compliant DataFrame instance is:

val jdbcURL = “jdbc:redshift://test-redshift.czac2vcs84ci.us-east-.redshift.amazonaws.com:5439/


val tempS3Dir = “s3n://spark-redshift-testing/temp/”

val salesDF = sqlContext.read .format(“com.databricks.spark.redshift”)

.option(“url”, jdbcURL) //Provide the JDBC URL

.option(“tempdir”, tempS3Dir) //User provides a temporary S3 folder

.option(“dbtable”, “sales”) //or use

.option(“query”,”select * from sales”) .load()

The above command provides a DataFrame instance for the Redshift table (query). The user only needs to provide the JDBC URL, temporary S3 folder to which this package unloads Redshift data, and the name of the table or query.

Writing to Redshift

Spark Data Sources API is a powerful ETL tool. A common use case in Big Data systems is to source large scale data from one system, apply transformations on it in a distributed manner, and store it back in another system. For example, it is typical to source data from Hive tables in HDFS and copy the tables into Redshift to allow for interactive processing. This package is perfectly suited for this use case.

Assume that a transaction table sourced from Hive is available in the Spark environment and needs to be copied to a corresponding Redshift table redshift_transaction. The following command achieves this goal:

sqlContext.sql("SELECT * FROM transaction")
          .option("url", jdbcURL)
          .option("tempdir", tempS3Dir) 
          .option("dbtable", "redshift_transaction") 

Screenshot from 2017-04-06 21:33:16Note the mode(SaveMode.Overwrite) in the Scala code above. This indicates to Redshift Data Source for Spark to overwrite the table if it exists. By default (only mode available in SQL CLI mode) this package will throw an error if the table already exists.(SaveMode.ErrorIfExists) There is also a SaveMode.Append mode that creates the table if it does not exist and appends to the table if it does exist. The last mode is SaveMode.Ignore which creates the table if it does not exist and quietly ignores the entire command if the table already exists.

Under the hood, Redshift Data Source for Spark will first create the table in Redshift using JDBC. It then copies the partitioned RDD encapsulated by the source DataFrame (a Hive table in our example) instance to the temporary S3 folder. Finally, it executes the Redshift COPY command that performs a high performance distributed copy of S3 folder contents to the newly created Redshift table.



Leave a Reply