This documentation is not maintained. Please refer to doc.castsoftware.com/technologies to find the latest updates.

Extension ID

com.castsoftware.java.spark

What's new?

Please see Apache Spark JDBC - 1.0 - Release Notes for more information.

Description

Apache Spark is an open-source unified analytics engine for large-scale data processing. This extension provides support for Apache Spark JDBC connections in Java. Spark SQL is a component built on top of Spark Core; its JDBC data source options are set through the DataFrameReader and DataFrameWriter read and write methods.
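
For example, a JDBC data source can be read into a DataFrame by setting those options (a minimal sketch, assuming an existing SparkSession named spark; the connection URL and table name are placeholders):

Dataset<Row> df = spark.read()
    .format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")  // placeholder connection URL
    .option("dbtable", "SCHEMA.TABLE")          // placeholder table name
    .option("user", "username")
    .option("password", "password")
    .load();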

In what situation should you install this extension?

If your Java application contains source code that uses the Apache Spark JDBC API and you want to view these object types and their links, then you should install this extension. More specifically, the extension will identify:

  • callLinks from Java methods to Apache Spark JDBC objects

Technology support

Apache Spark

The following Apache Spark releases are supported by this extension:

Release          Supported    Supported Technology
2.0 and above    Yes          Java

AIP Core compatibility

This extension is compatible with:

AIP Core release    Supported
8.3.x               Yes

Supported DBMS servers

This extension is compatible with the following DBMS servers:

DBMS                                Supported
CAST Storage Service / PostgreSQL   Yes

Download and installation instructions

For JEE applications using Apache Spark JDBC, the extension is automatically installed by CAST Console; this has been the case since October 2023.

When upgrading, if the Extension Strategy is not set to Auto update, you can install the extension manually using the Application - Extensions interface.

What results can you expect?

Once the analysis/snapshot generation has completed, you can view the results. The following objects and links will be displayed in CAST Enlighten:

Objects

The following objects are displayed in CAST Enlighten:

Type                   When is this object created?
Spark JDBC Read        An object is created for each database table name found in the DataFrameReader API and resolved in a JDBC method call
Spark JDBC Write       An object is created for each database table name found in the DataFrameWriter API and resolved in a JDBC method call
Spark JDBC SQL Query   An object is created for each SQL query found and resolved in a JDBC method call
Links

Link type   Caller type            Callee type                APIs supported / Comment
callLink    Java method            Apache Spark JDBC object   org.apache.spark.sql.DataFrameReader.jdbc
                                   (Spark JDBC Read,          org.apache.spark.sql.DataFrameWriter.jdbc
                                   Spark JDBC Write, or       org.apache.spark.sql.DataFrameReader.load
                                   Spark JDBC SQL Query)      org.apache.spark.sql.DataFrameWriter.save
                                                              org.apache.spark.sql.SQLContext.jdbc
                                                              org.apache.spark.sql.SQLContext.load
useLink     Spark JDBC SQL Query   Table, View                Created by SQL Analyzer when DDL source files are analyzed
callLink    Spark JDBC SQL Query   Procedure                  Created by SQL Analyzer when DDL source files are analyzed
useLink     Spark JDBC SQL Query   Missing Table              Created by the Missing tables and procedures for JEE extension when the object is not analyzed
callLink    Spark JDBC SQL Query   Missing Procedure          Created by the Missing tables and procedures for JEE extension when the object is not analyzed
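
As an illustration of the SQL query links above, the hedged sketch below (placeholder URL, credentials, and table name, assuming an existing SparkSession named spark) reads through the "query" option; the SELECT statement would yield a Spark JDBC SQL Query object, with a useLink to the EMPLOYEE table when its DDL is analyzed, or to a Missing Table otherwise:

// Minimal sketch (assumed names): the SELECT below becomes a Spark JDBC SQL Query object
Dataset<Row> employees = spark.read()
    .format("jdbc")
    .option("url", "jdbc:postgresql:dbserver")  // placeholder connection URL
    .option("query", "select NAME, AGE from EMPLOYEE")
    .option("user", "username")
    .option("password", "password")
    .load();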

Code Examples

Read Operation

Read operation through .jdbc() API
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark SQL data sources example")
        .config("spark.some.config.option", "some-value")
        .getOrCreate();

    String table_name = "CAR";
    sparkRead(spark, table_name);
    spark.stop();
}

private static void sparkRead(SparkSession spark, String table_name) {
    Properties connectionProperties = new Properties();
    connectionProperties.put("user", "username");
    connectionProperties.put("password", "password");

    // The table name passed to DataFrameReader.jdbc() ("CAR") becomes a Spark JDBC Read object
    Dataset<Row> jdbcDF = spark.read()
        .jdbc("jdbc:postgresql:dbserver", table_name, connectionProperties);
}

Read operation through .load() API
using dbtable
providing table name as dbtable
private static void sparkReadLoad(SparkSession spark) {
    String tab = "BIKE";
    String opt = "dbtable";
    // The "dbtable" option value ("BIKE") is resolved to a Spark JDBC Read object
    Dataset<Row> jdbcDF = spark.read()
        .format("jdbc")
        .option("url", "jdbc:postgresql:dbserver")
        .option(opt, tab)
        .option("user", "username")
        .option("password", "password")
        .load();
}

providing query as dbtable
public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark SQL data sources example")
        .config("spark.some.config.option", "some-value")
        .getOrCreate();

    String sql_query = "INSERT INTO EMPLOYEE (NAME, AGE, DEPT) VALUES ('Har', 23, 'SDE')";
    sparkReadQueryLoad(spark, sql_query);
    spark.stop();
}

private static void sparkReadQueryLoad(SparkSession spark, String sql_query) {
    // The SQL query passed as the "dbtable" option is resolved to a Spark JDBC SQL Query object
    Dataset<Row> jdbcDF = spark.read()
        .format("jdbc")
        .option("url", "jdbc:postgresql:dbserver")
        .option("dbtable", sql_query)
        .option("user", "username")
        .option("password", "password")
        .load();
}

using query
private static void sparkQuery(SparkSession spark) {
    String opt = "query";
    String query = "select * from MANAGER where rownum<=20";
    // The SQL query passed as the "query" option is resolved to a Spark JDBC SQL Query object
    Dataset<Row> jdbcDF = spark.read()
        .format("jdbc")
        .option("url", "jdbc:postgresql:dbserver")
        .option(opt, query)
        .option("user", "username")
        .option("password", "password")
        .load();
}

Read operation using Map
public List<String> getTableNames(String JDBC_URI, String JDBC_username, String JDBC_password) {
    List<String> tablesList = mysql_getTablesName(JDBC_URI, JDBC_username, JDBC_password);
    return tablesList;
}

public List<String> mysql_getTablesName(String uri, String username, String password) {
    SparkConf sc = new SparkConf().setAppName("MySQL");
    JavaSparkContext ctx = new JavaSparkContext(sc);
    SQLContext sqlContext = new SQLContext(ctx);

    String dbt = "CAR";
    Map<String, String> optionsReadMap = new HashMap<String, String>();
    optionsReadMap.put("url", "MYSQL_CONNECTION_URL");
    optionsReadMap.put("driver", "MYSQL_DRIVER");
    optionsReadMap.put("dbtable", dbt);
    optionsReadMap.put("user", "MYSQL_USERNAME");
    optionsReadMap.put("password", "MYSQL_PWD");

    // The "dbtable" entry of the options map ("CAR") is resolved to a Spark JDBC Read object
    Dataset<Row> jdbcDF = sqlContext.read().format("jdbc").options(optionsReadMap).load();
    return Arrays.asList(jdbcDF.columns()); // illustrative return value so the snippet compiles
}

Read operation through SQLContext.load() API using Map
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("MySQL");
    JavaSparkContext ctx = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(ctx);

    // Data source options
    Map<String, String> options = new HashMap<>();
    options.put("driver", "MYSQL_DRIVER");
    options.put("url", "MYSQL_CONNECTION_URL");
    options.put("dbtable",
                "(select emp_no, concat_ws(' ', first_name, last_name) as full_name from employees) as employees_name");
    options.put("partitionColumn", "emp_no");
    options.put("lowerBound", "10001");
    options.put("upperBound", "499999");
    options.put("numPartitions", "10");

    // Load the MySQL query result as a DataFrame (legacy Spark 1.x SQLContext.load API)
    DataFrame jdbcDF = sqlContext.load("jdbc", options);

    List<Row> employeeFullNameRows = jdbcDF.collectAsList();

    for (Row employeeFullNameRow : employeeFullNameRows) {
        System.out.println(employeeFullNameRow);
    }
}
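
Read operation through SQLContext.jdbc() API

A minimal sketch of the legacy Spark 1.x SQLContext.jdbc call (deprecated in favor of read().jdbc() in later releases), assuming an existing sqlContext; the connection URL and the EMPLOYEE table name are placeholders:

// Legacy Spark 1.x API: reads the given database table as a DataFrame.
// "EMPLOYEE" is a placeholder table name that would be resolved to a Spark JDBC Read object.
DataFrame employeeDF = sqlContext.jdbc("jdbc:mysql://dbserver:3306/hr", "EMPLOYEE");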

Write Operation

Write operation through .jdbc() API
private static void sparkWrite(Dataset<Row> jdbcDF) {
    Properties connectionProperties = new Properties();
    connectionProperties.put("user", "username");
    connectionProperties.put("password", "password");

    // The table name "STUDENT" passed to DataFrameWriter.jdbc() becomes a Spark JDBC Write object
    jdbcDF.write()
        .jdbc("jdbc:postgresql:dbserver", "STUDENT", connectionProperties);
}

Write operation through .save() API
private static void sparkWriteSave(Dataset<Row> jdbcDF) {
    // The "dbtable" option value ("FACULTY") becomes a Spark JDBC Write object
    jdbcDF.write()
        .format("jdbc")
        .option("url", "jdbc:postgresql:dbserver")
        .option("dbtable", "FACULTY")
        .option("user", "username")
        .option("password", "password")
        .save();
}

Write operation using Map
public void writeTable(Dataset<Row> DF, String JDBC_URI, String JDBC_username, String JDBC_password) {
    mysql_writeTable(DF, "SELECT * FROM AUTO", "RED", "username", "password");
}

public void mysql_writeTable(Dataset<Row> DF, String query, String uri, String username, String password) {
    Map<String, String> optionsWriteMap = new HashMap<String, String>();
    optionsWriteMap.put("url", "MYSQL_CONNECTION_URL");
    optionsWriteMap.put("driver", "MYSQL_DRIVER");
    optionsWriteMap.put("dbtable", query);
    optionsWriteMap.put("user", "MYSQL_USERNAME");
    optionsWriteMap.put("password", "MYSQL_PWD");

    // The query passed as the "dbtable" entry is resolved to a Spark JDBC SQL Query object
    DF.write().format("jdbc").options(optionsWriteMap).save();
}