Read and write data to SQL Server from Spark using pyspark


Apache Spark is a powerful general-purpose distributed computing framework. It provides several data abstractions, such as RDDs, DataFrames, and Datasets, on top of a distributed collection of data. Spark is a highly scalable big data processing engine that can run on anything from a single node to a cluster of thousands of nodes. To follow this exercise, we can install Spark on our local machine and use Jupyter notebooks to write code interactively. In this post, “Read and write data to SQL Server from Spark using pyspark”, we demonstrate how to use Apache Spark to read data from and write data to a SQL Server table.

Read SQL Server table to DataFrame using Spark SQL JDBC connector – pyspark

Spark SQL APIs can read data from any relational data source for which a JDBC driver is available. We can read the data of a SQL Server table as a Spark DataFrame or a Spark temporary view and then apply Spark transformations and actions to it. We can also join this data with other data sources. In this demo, we use PySpark, the Python library for Spark programming, to read and write SQL Server data through Spark SQL. To connect to SQL Server and read a table, we need to supply the usual JDBC connection details: the driver class name, the connection string (URL), the user name, and the password.

In addition to these, the Spark SQL JDBC connector exposes other useful configuration options that control the read/write operation. A few useful properties are:

  1. driver – The class name of the JDBC driver used to connect to the source system, for example “com.microsoft.sqlserver.jdbc.SQLServerDriver”. This class is loaded on the master and worker nodes.
  2. url – The JDBC URL of the source system, for example jdbc:sqlserver://localhost:1433;databaseName=TestDB.
  3. dbtable – The name of a table, view, or subquery (any database object which can be used in the FROM clause of a SQL query).
  4. query – A SQL query whose result is read as the source data. Only one of dbtable or query can be provided at a time.
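
As a minimal sketch of the dbtable/query rule above, the helper below builds exactly one of the two options and rejects a call that supplies both. The names source_option and read_source are hypothetical helpers introduced for illustration, and spark is assumed to be an existing SparkSession.

```python
def source_option(table=None, sql=None):
    """Return the JDBC option naming the data to read: dbtable or query, never both."""
    if (table is None) == (sql is None):
        raise ValueError("Provide exactly one of table or sql")
    return {"dbtable": table} if table is not None else {"query": sql}


def read_source(spark, url, user, password, **source):
    """Read from SQL Server with an existing SparkSession (passed in as spark)."""
    options = {
        "url": url,
        "user": user,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }
    # Adds either {"dbtable": ...} or {"query": ...} to the connection options
    options.update(source_option(**source))
    return spark.read.format("jdbc").options(**options).load()


# Example calls (they need a running SQL Server, so they are left commented out):
# url = "jdbc:sqlserver://localhost:1433;databaseName=TestDB;"
# df1 = read_source(spark, url, "test", "*****", table="dbo.tbl_spark_df")
# df2 = read_source(spark, url, "test", "*****", sql="SELECT TOP 10 * FROM dbo.tbl_spark_df")
```

The query option is handy when only a subset of the table is needed, because the filtering then happens in SQL Server rather than in Spark.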

Apart from the above options, we have some other advanced properties:

  1. partitionColumn, lowerBound, and upperBound – partitionColumn is the column used to split the read into parallel partitions, and lowerBound and upperBound are used only to compute the partition stride (they do not filter rows). These three properties must always be specified together, along with numPartitions. Also, the partitionColumn must be of a numeric, date, or timestamp type.
  2. numPartitions – The maximum number of partitions used for parallel reads and writes, which also caps the number of concurrent connections to the JDBC data source.
  3. batchsize – The JDBC batch size, which controls the number of rows written per round trip to the target table. It applies only to write operations and defaults to 1000.
  4. queryTimeout – The number of seconds the driver waits for a statement to execute. It defaults to 0 (no limit).
  5. pushDownPredicate – Controls whether the filters will be pushed down to the source system or not. It defaults to true.
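
To make the partitioning properties more concrete, here is a simplified sketch of how a partitioned read might be configured and how the bounds translate into per-partition filters. The function partition_predicates only approximates the behaviour for non-negative integer bounds and is not Spark's actual implementation. The column name id and both helper names are assumptions made for illustration.

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the WHERE clause each partition of a JDBC read would use."""
    if num_partitions <= 1 or lower_bound >= upper_bound:
        return [None]  # a single partition reads the whole table
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if lower and upper:
            predicates.append(f"{lower} AND {upper}")
        elif upper:
            # The first partition also picks up NULLs and values below lowerBound
            predicates.append(f"{upper} OR {column} IS NULL")
        else:
            # The last partition also picks up values above upperBound
            predicates.append(lower)
    return predicates


def partitioned_read_options(column, lower_bound, upper_bound, num_partitions):
    """Build the four options that must be supplied together for a parallel read."""
    return {
        "partitionColumn": column,
        "lowerBound": str(lower_bound),
        "upperBound": str(upper_bound),
        "numPartitions": str(num_partitions),
    }


for predicate in partition_predicates("id", 0, 100, 4):
    print(predicate)

# With an existing reader, the options could be applied as follows:
# spark.read.format("jdbc").options(**partitioned_read_options("id", 0, 100, 4))
```

The first and last predicates are open-ended, which is why rows outside the lowerBound/upperBound range are still read. Badly chosen bounds therefore produce skewed partitions rather than missing data.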

Suppose we need to read a SQL Server table named tbl_spark_df from the TestDB database. We can use the pyspark code below to read it:

#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")

#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
#set variable to be used to connect the database
database = "TestDB"
table = "dbo.tbl_spark_df"
user = "test"
password  = "*****"

#read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://localhost:1433;databaseName={database};") \
    .option("dbtable", table) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

#show the data loaded into dataframe
jdbcDF.show()

In the above code, we build the connection string and then read the SQL Server table data into a Spark DataFrame. Below is the output:

SQL Table to Spark DataFrame

Fix “ClassNotFoundException” exception

If the SQL Server driver jar file is not available in Spark’s jars folder, Spark raises a “ClassNotFoundException”:

“Py4JJavaError: An error occurred while calling o223.save.
: java.lang.ClassNotFoundException: com.microsoft.sqlserver.jdbc.SQLServerDriver
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
at scala.Option.foreach(Option.scala:257)……….”

To fix this issue, we need to download the appropriate jar file from Microsoft. For SQL Server 2017, we can download it from here.

  1. Download the driver file.
  2. Unzip it and take the “sqljdbc42.jar” file from the “sqljdbc_6.0\enu\jre8” location (if we are using Java 8).
  3. Copy it to Spark’s jars folder. In our case, it is C:\Spark\spark-2.4.3-bin-hadoop2.7\jars.
  4. Start a new SparkSession if required.
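
Before restarting Spark, it can help to confirm that the driver jar is actually where Spark looks for it. The sketch below is one possible check, plus an alternative that points Spark at the jar through the spark.jars configuration property instead of copying the file. The helper names and the file-name patterns are assumptions for illustration. The spark.jars setting only takes effect if no Spark context is already running in the process.

```python
from pathlib import Path


def find_sqlserver_driver_jars(jars_dir):
    """Return SQL Server JDBC driver jars found in the given folder, sorted by name."""
    folder = Path(jars_dir)
    # Assumed naming patterns: older drivers ship as sqljdbc*.jar, newer ones as mssql-jdbc*.jar
    patterns = ("sqljdbc*.jar", "mssql-jdbc*.jar")
    return sorted(p for pattern in patterns for p in folder.glob(pattern))


def build_session_with_driver(jar_path, app_name="My app"):
    """Start a local SparkSession that loads the driver jar through spark.jars."""
    # Imported here so that the check above can run even where pyspark is not installed
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .master("local")
        .appName(app_name)
        .config("spark.jars", str(jar_path))
        .getOrCreate()
    )


# Example (paths taken from the steps above):
# jars = find_sqlserver_driver_jars(r"C:\Spark\spark-2.4.3-bin-hadoop2.7\jars")
# if not jars:
#     spark = build_session_with_driver(r"C:\sqljdbc_6.0\enu\jre8\sqljdbc42.jar")
```

An empty result from find_sqlserver_driver_jars suggests the copy step was missed or the jar went to a different Spark installation.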

Write DataFrame data to SQL Server table using Spark SQL JDBC connector – pyspark

To write data from a Spark DataFrame into a SQL Server table, we need a SQL Server JDBC connector. Also, we need to provide basic configuration property values like connection string, user name, and password as we did while reading the data from SQL Server.

Suppose we have a CSV file named “sample-spark-sql.csv”, which we read into a Spark DataFrame and then load into a SQL Server table named tbl_spark_df. Below is the code to write the Spark DataFrame data into a SQL Server table using Spark SQL in pyspark:

#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")

#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

#read csv file in a dataframe
df = spark.read.csv(path="C:\\sample-spark-sql.csv", header = True, sep = ",")

#set variable to be used to connect the database
database = "TestDB"
table = "dbo.tbl_spark_df"
user = "test"
password  = "*****"

#write the dataframe into a sql table
df.write.mode("overwrite") \
    .format("jdbc") \
    .option("url", f"jdbc:sqlserver://localhost:1433;databaseName={database};") \
    .option("dbtable", table) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .save()

Let’s verify it in SQL Server:

Spark DataFrame to SQL Table

We can see that the data has been loaded successfully into a SQL Server table. As we are using overwrite mode, Spark creates a new table if it does not exist. If the table already exists, its data is overwritten; by default, Spark does this by dropping and recreating the table.
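
Overwrite is not the only save mode. As a hedged sketch, the helper below builds the same connection options as the write example and optionally sets the JDBC truncate option, which asks Spark to truncate an existing table in overwrite mode instead of dropping and recreating it. The function names jdbc_write_options and write_table are hypothetical, and df is assumed to be an existing DataFrame.

```python
def jdbc_write_options(database, table, user, password, truncate=False):
    """Build JDBC options for writing a DataFrame to a local SQL Server instance."""
    options = {
        "url": f"jdbc:sqlserver://localhost:1433;databaseName={database};",
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }
    if truncate:
        # Only meaningful with mode("overwrite"): keep the table definition, replace the rows
        options["truncate"] = "true"
    return options


def write_table(df, options, mode="append"):
    """Write df to SQL Server; mode can be append, overwrite, ignore, or error."""
    df.write.mode(mode).format("jdbc").options(**options).save()


# Example calls (they need a running SQL Server, so they are left commented out):
# write_table(df, jdbc_write_options("TestDB", "dbo.tbl_spark_df", "test", "*****"))
# write_table(df, jdbc_write_options("TestDB", "dbo.tbl_spark_df", "test", "*****", truncate=True), mode="overwrite")
```

Append mode adds rows to an existing table. Overwrite with truncate is worth considering when the table has indexes or column types that a drop-and-recreate would lose.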

Thanks for reading. Please share your input in the comment section.

Gopal Krishna Ranjan

About Gopal Krishna Ranjan

Gopal is a passionate Data Engineer and Data Analyst. He has implemented many end to end solutions using Big Data, Machine Learning, OLAP, OLTP, and cloud technologies. He loves to share his experience at https://www.sqlrelease.com/. Connect with Gopal on LinkedIn at https://www.linkedin.com/in/ergkranjan/.
