Spark Cassandra Module

This module includes Spark support for Cassandra. Users can perform read-write and query operations over Cassandra data.

Support

To use it, add the following dependency to your pom.xml:

<dependency>
     <groupId>com.impetus.kundera.client</groupId>
     <artifactId>kundera-spark-cassandra</artifactId>
     <version>${kundera.version}</version>
</dependency>

Persistence unit configuration

<persistence-unit name="spark_cass_pu">
   <provider>com.impetus.kundera.KunderaPersistence</provider>
   <class>com.impetus.client.spark.Book</class>
   <properties>
      <property name="kundera.nodes" value="localhost" />
      <property name="kundera.port" value="7077" />
      <property name="kundera.keyspace" value="sparktest" />
      <property name="kundera.dialect" value="spark" />
      <property name="kundera.client" value="cassandra" />
      <property name="kundera.client.lookup.class" value="com.impetus.spark.client.SparkClientFactory" />
      <property name="kundera.client.property" value="KunderaSparkCassProperties.xml" />
   </properties>
</persistence-unit>

Spark Related Properties

Spark-related properties are supplied through an XML file. For example, the persistence.xml above references KunderaSparkCassProperties.xml.

Sample Property File:

<?xml version="1.0" encoding="UTF-8"?>
<clientProperties>
   <datastores>
      <dataStore>
         <name>cassandra</name>
         <connection>
            <properties>
               <property name="spark.executor.memory" value="1g" />
               <property name="spark.cassandra.connection.host" value="localhost" />
               <property name="spark.cassandra.connection.native.port" value="9042" />
               <property name="spark.cassandra.connection.rpc.port" value="9160" />
               <property name="spark.master" value="local" />
               <property name="spark.app.name" value="testspark" />
               <property name="spark.driver.allowMultipleContexts" value="true" />
            </properties>
         </connection>
      </dataStore>
   </datastores>
</clientProperties>

Here "spark.master" and "spark.app.name" properties are mandatory. User can add more [spark related properties] (http://spark.apache.org/docs/latest/configuration.html#available-properties) as per their need.

Entity

import java.io.Serializable;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "spark_book")
public class Book implements Serializable
{

    /** The Constant serialVersionUID. */
    private static final long serialVersionUID = 1L;

    /** The id. */
    @Id
    private String id;

    /** The title. */
    @Column
    private String title;

    /** The author. */
    @Column
    private String author;

    /** The category. */
    @Column
    private String category;

    /** The num pages. */
    @Column
    private int numPages;

   // setters and getters. 
}

Read-Write Operation

    import javax.persistence.EntityManager;
    import javax.persistence.EntityManagerFactory;
    import javax.persistence.Persistence;

    EntityManagerFactory emf = Persistence.createEntityManagerFactory("spark_cass_pu");
    EntityManager em = emf.createEntityManager();

    Book book = new Book();
    book.setId("1");
    book.setTitle("A Tale of Two Cities");
    book.setAuthor("Charles Dickens");
    book.setCategory("History");
    book.setNumPages(441);

    // save data
    em.persist(book);

    // clear the persistence context so the find below hits the datastore
    em.clear();

    // read data back by id
    Book bookFound = em.find(Book.class, "1");

    em.close();
    emf.close();

Query Operation

Select all:

String query = "select * from spark_book"; 
List results = em.createNativeQuery(query).getResultList();
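
The exact result-list typing is not pinned down here; a minimal sketch of consuming the results, assuming each row materializes as a Book entity (the cast is an assumption, not documented behavior):

    for (Object row : results) {
        Book b = (Book) row; // assumption: rows map to Book entities
        System.out.println(b.getTitle() + " by " + b.getAuthor());
    }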

Select with WHERE:

String query = "select * from spark_book where numPages > 450";
List results = em.createNativeQuery(query).getResultList();

Select with LIKE:

String query = "select * from spark_book where title like 'The%'";
List results = em.createNativeQuery(query).getResultList();

Sum (aggregation):

String query = "select sum(numPages) from spark_book";
List results = em.createNativeQuery(query).getResultList();
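
Whether the scalar aggregate comes back as a Number or as a single-column row may vary by version; a hedged sketch of reading it:

    if (!results.isEmpty()) {
        // assumption: the sum is the first element of the result list;
        // it may be a Number or the first column of an Object[] row
        System.out.println("Total pages: " + results.get(0));
    }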

Saving data after Querying

Details of the syntax can be found here.

To save in Cassandra:

    String query = "INSERT INTO cassandra.sparktest.spark_book_copy FROM (select * from spark_book)";
    Query q = em.createNativeQuery(query, Book.class);
    q.executeUpdate();

To save in FS as CSV:

    String query = "INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_book)";
    Query q = em.createNativeQuery(query, Book.class);
    q.executeUpdate();

To save in FS as JSON:

    query = "INSERT INTO fs.[src/test/resources/testspark_json] AS JSON FROM (select * from spark_book)";
    q = em.createNativeQuery(query, Book.class);
    q.executeUpdate();

For more details, see the test case.

Limitations:

Please refer to this page for limitations.
