
Friday, January 10, 2014

Cassandra Data Modeling

Forget everything about relational modeling and model your queries instead

In Cassandra data modeling we model query results: all the data a query needs must reside in a single table, because joins are not supported. Only columns that are part of the primary key, or that are indexed, can be used in the WHERE clause of an ad-hoc query, and aggregation/GROUP BY is not supported either (an important consideration when modeling query results). Indexes make writes slower, so they should be avoided where possible.

Keys in Cassandra data modeling:
  • Primary Key [Partition Key + Clustering Keys + Uniqueness]:
    • Decides how data is partitioned [Partition Key(s)]. The first column is the partition key. If several columns are needed as the partition key, enclose them in an extra pair of parentheses. These columns cannot be updated.
    • Decides how data is ordered/grouped [Clustering Keys]. All primary key columns other than the partition key are clustering keys. Within a partition, rows are sorted by the clustering keys in the order they are declared.
    • Uniquely identifies rows. The primary key columns together identify a row uniquely (so at least one column must ensure uniqueness).
    • Syntax:
                           CREATE TABLE table1 (
                                  column1 type1,
                                  column2 type1,
                                  column3 type1,
                                  column4 type1,
                                  .....
                                  PRIMARY KEY ((column1,column2),column3,column4)
                           );
  • An ad-hoc query must use the primary key columns from left to right without gaps, omitting columns only from the right. For example, it can filter on column1, column2 and column3, but not on column1, column3 or on column1, column2, column4.
  • Depending on the queries, we may need to create multiple tables that store the same data with different keys and orderings.
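
To picture why the primary key columns have to be used from left to right, here is a minimal in-memory sketch in plain Java. It is not how Cassandra is implemented, and the class and method names (PrimaryKeySketch, byPartition, byPartitionAndColumn3) are made up for illustration. The partition key (column1, column2) picks one sorted map, and rows inside it are sorted by column3, then column4.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of PRIMARY KEY ((column1, column2), column3, column4).
// Hypothetical names; this only mirrors the lookup rules, not Cassandra internals.
class PrimaryKeySketch {
    // (column1, column2) -> column3 -> column4 -> row payload
    private final Map<List<String>, TreeMap<String, TreeMap<String, String>>> partitions = new HashMap<>();

    void insert(String c1, String c2, String c3, String c4, String payload) {
        partitions.computeIfAbsent(Arrays.asList(c1, c2), k -> new TreeMap<>())
                  .computeIfAbsent(c3, k -> new TreeMap<>())
                  .put(c4, payload);
    }

    // Like WHERE column1=? AND column2=? : one hash lookup, rows come back in clustering order.
    List<String> byPartition(String c1, String c2) {
        List<String> rows = new ArrayList<>();
        TreeMap<String, TreeMap<String, String>> partition =
                partitions.getOrDefault(Arrays.asList(c1, c2), new TreeMap<>());
        for (TreeMap<String, String> group : partition.values()) {
            rows.addAll(group.values());
        }
        return rows;
    }

    // Like WHERE column1=? AND column2=? AND column3=? : narrows to one group in the partition.
    List<String> byPartitionAndColumn3(String c1, String c2, String c3) {
        TreeMap<String, TreeMap<String, String>> partition =
                partitions.getOrDefault(Arrays.asList(c1, c2), new TreeMap<>());
        return new ArrayList<>(partition.getOrDefault(c3, new TreeMap<>()).values());
    }

    public static void main(String[] args) {
        PrimaryKeySketch table = new PrimaryKeySketch();
        table.insert("a", "b", "y", "2", "row-3");
        table.insert("a", "b", "x", "2", "row-2");
        table.insert("a", "b", "x", "1", "row-1");
        table.insert("a", "c", "x", "1", "row-4");
        System.out.println(table.byPartition("a", "b"));                // prints [row-1, row-2, row-3]
        System.out.println(table.byPartitionAndColumn3("a", "b", "x")); // prints [row-1, row-2]
    }
}
```

In this model there is no cheap way to look up by column1 alone (the map key needs both partition columns) or by column4 without column3 (it would mean scanning every group in the partition), which is the same shape as the restriction on ad-hoc queries.
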
Consider the playlist example from Datastax Virtual Training:
We need to store the following music track information:
Id
Title
Album
Artist
Genre
Track length
Rating

And we need the following queries:
  • Get a particular track (track by id).
  • Get all songs from an album (tracks in an album, ordered by rating).
  • Search songs by genre (tracks of a genre, grouped by album and ordered by rating).
  • Get songs by artist (tracks by an artist, grouped by album and ordered by rating).
  • Get listen counts grouped by album.
  • Get listen counts grouped by album and track.
 In relational modeling there would be normalized tables such as artist_detail, album_detail etc., and the track table would have an N:1 relation to each of them, holding only reference ids and requiring joins to fetch a result. In Cassandra we have to store all details in one table (and one such table per query). This is similar to a materialized view in relational databases.

Create a keyspace, say "playlist":
 CREATE KEYSPACE playlist WITH REPLICATION = {
            'class':'SimpleStrategy',
            'replication_factor':3
 };

Create Tables:
CREATE TABLE track_by_id (
id int PRIMARY KEY,
title text,
album text,
artist text,
genre text,
track_length int,
rating int
);

CREATE TABLE tracks_by_album (
id int,
title text,
album text,
artist text,
genre text,
track_length int,
rating int,
PRIMARY KEY(album,rating,id)
) WITH CLUSTERING ORDER BY (rating DESC, id ASC);

CREATE TABLE tracks_by_genre (
id int,
title text,
album text,
artist text,
genre text,
track_length int,
rating int,
PRIMARY KEY(genre,album,rating,id)
) WITH CLUSTERING ORDER BY (album ASC, rating DESC, id ASC);

CREATE TABLE tracks_by_artist (
id int,
title text,
album text,
artist text,
genre text,
track_length int,
rating int,
PRIMARY KEY(artist,album,rating,id)
) WITH CLUSTERING ORDER BY (album ASC, rating DESC, id ASC);
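
The ordering that the clustering columns of tracks_by_artist impose inside one artist partition (album first, then rating descending, then id) can be pictured as a comparator chain. The sketch below is plain Java and only an analogy. The Track class and idsInStoredOrder method are hypothetical names, and Cassandra keeps rows sorted on disk rather than sorting them at read time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Analogy for clustering columns (album, rating, id) with rating descending.
// Hypothetical names; not part of any Cassandra API.
class ClusteringOrderSketch {
    static final class Track {
        final int id;
        final String album;
        final int rating;

        Track(int id, String album, int rating) {
            this.id = id;
            this.album = album;
            this.rating = rating;
        }
    }

    // album ascending, then rating descending, then id ascending
    static final Comparator<Track> CLUSTERING_ORDER =
            Comparator.comparing((Track t) -> t.album)
                      .thenComparing((Track t) -> t.rating, Comparator.<Integer>reverseOrder())
                      .thenComparingInt(t -> t.id);

    // Returns track ids in the order the rows of one partition would be read back.
    static List<Integer> idsInStoredOrder(List<Track> partition) {
        List<Track> sorted = new ArrayList<>(partition);
        sorted.sort(CLUSTERING_ORDER);
        List<Integer> ids = new ArrayList<>();
        for (Track t : sorted) {
            ids.add(t.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Track> artistPartition = new ArrayList<>();
        artistPartition.add(new Track(1, "Album2", 3));
        artistPartition.add(new Track(2, "Album1", 2));
        artistPartition.add(new Track(3, "Album1", 5));
        artistPartition.add(new Track(4, "Album1", 5));
        System.out.println(idsInStoredOrder(artistPartition)); // prints [3, 4, 2, 1]
    }
}
```

Tracks of the same album end up next to each other, best rated first, which is what "grouped by album and ordered by rating" asks for. The id at the end only breaks ties and keeps rows unique.
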

CREATE TABLE album_listen_count (
album text,
listen_count counter,
PRIMARY KEY(album)
);
Update the album count every time a track is requested from track_by_id. In a counter table every column outside the primary key must be a counter, and counter columns are only ever updated (incremented or decremented), never inserted.

The next table counts listens per track within an album and can be used to drill down into the album-level counts above. The keys album and track_id would be sufficient, but because tables are denormalized we also add title for display to the user; as a non-counter column in a counter table, it has to be part of the primary key. We could use title alone and drop track_id; both are kept here only to emphasize denormalization.

CREATE TABLE album_track_listen_count (
album text,
track_id int,
title text,
listen_count counter,
PRIMARY KEY(album,title,track_id)
);
Six tables for six queries. The number of tables could be reduced by creating indexes, but in Cassandra duplicating data is preferred over paying the performance cost of indexes.

Insert data into all four track tables atomically:
BEGIN BATCH
INSERT INTO track_by_id (id, title, album, artist, genre, track_length, rating) VALUES (1,'Song1','Album1','Artist1','g1',300,3);
INSERT INTO tracks_by_album (id, title, album, artist, genre, track_length, rating) VALUES (1,'Song1','Album1','Artist1','g1',300,3);
INSERT INTO tracks_by_genre (id, title, album, artist, genre, track_length, rating) VALUES (1,'Song1','Album1','Artist1','g1',300,3);
INSERT INTO tracks_by_artist (id, title, album, artist, genre, track_length, rating) VALUES (1,'Song1','Album1','Artist1','g1',300,3);
APPLY BATCH;

This guarantees atomicity but costs roughly 30% in performance. If atomicity is not required, the inserts can be run independently outside a BATCH.

Queries to fetch data:
SELECT * FROM track_by_id WHERE id=1;

Along with this query, always run the two UPDATE statements below from application code to populate the group-by tables:

UPDATE album_track_listen_count SET listen_count = listen_count+1 WHERE 
album=? AND track_id=? AND title=?;
UPDATE album_listen_count SET listen_count = listen_count+1 WHERE album=?;

These updates maintain the aggregate data in the count tables. Query them like any other table to get listen counts, the equivalent of GROUP BY + COUNT in SQL.
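
The idea of keeping the count up to date at write time, instead of computing it with GROUP BY at read time, can be sketched in plain Java. This is only an illustration with a map standing in for album_listen_count. The class and method names (ListenCountSketch, recordListen, listenCount) are hypothetical and no Cassandra driver is involved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A map standing in for the album_listen_count table: album -> listen_count.
// Hypothetical names; not a Cassandra API.
class ListenCountSketch {
    private final Map<String, Long> albumListenCount = new TreeMap<>();

    // Plays the role of:
    // UPDATE album_listen_count SET listen_count = listen_count+1 WHERE album=?;
    void recordListen(String album) {
        albumListenCount.merge(album, 1L, Long::sum);
    }

    // Plays the role of reading listen_count for one album; no grouping needed at read time.
    long listenCount(String album) {
        return albumListenCount.getOrDefault(album, 0L);
    }

    public static void main(String[] args) {
        ListenCountSketch counts = new ListenCountSketch();
        // Each element represents one track request that belongs to the given album.
        List<String> requests = Arrays.asList("Album1", "Album2", "Album1", "Album1");
        for (String album : requests) {
            counts.recordListen(album);
        }
        System.out.println(counts.listenCount("Album1")); // prints 3
        System.out.println(counts.listenCount("Album2")); // prints 1
    }
}
```

The read is a single key lookup, at the price of one extra write per track request.
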

SELECT * FROM tracks_by_album WHERE album='Album1';
SELECT * FROM tracks_by_genre WHERE genre='g1';
SELECT * FROM tracks_by_artist WHERE artist='Artist1';

Java code to do this using Datastax Java Driver:
package dao;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class ArtistsDAO {
        
         private static void insertTrack(Session session, int id, String title, String album,
                       String artist, String genre, int track_length, int rating){
                BatchStatement batch = new BatchStatement();
                PreparedStatement byIdPs = session.prepare("INSERT INTO track_by_id (id, title, album, artist, genre, track_length, rating) VALUES (?,?,?,?,?,?,?);");
                PreparedStatement byAlbumPs = session.prepare("INSERT INTO tracks_by_album (id, title, album, artist, genre, track_length, rating) VALUES (?,?,?,?,?,?,?);");
                PreparedStatement byGenrePs = session.prepare("INSERT INTO tracks_by_genre (id, title, album, artist, genre, track_length, rating) VALUES (?,?,?,?,?,?,?);");
                PreparedStatement byArtistPs = session.prepare("INSERT INTO tracks_by_artist (id, title, album, artist, genre, track_length, rating) VALUES (?,?,?,?,?,?,?);");
               
                batch.add(byIdPs.bind(id, title, album, artist, genre, track_length, rating));
                batch.add(byAlbumPs.bind(id, title, album, artist, genre, track_length, rating));
                batch.add(byGenrePs.bind(id, title, album, artist, genre, track_length, rating));
                batch.add(byArtistPs.bind(id, title, album, artist, genre, track_length, rating));
               
                session.execute(batch);
         }
        
         private static void getTracksByGenre(Session session, String genre){
                PreparedStatement ps = session.prepare("SELECT * FROM tracks_by_genre WHERE genre=?;");
                //one genre can have thousands of songs, so we process the result in chunks (of size 100)
                //setFetchSize sets the chunk (page) size used by automatic paging
                ResultSet result = session.execute(ps.bind(genre).setFetchSize(100));
                for(Row row : result){
                       //process row, iteration will iterate whole result in chunks.
                }
         }
        
         public static void main(String[] args) {
              Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
              Session session = cluster.connect("playlist");
              insertTrack(session, 1,"Song1","Album1","Artist1","g1",300,3);
              getTracksByGenre(session, "g1");
              //driver 2.0 (needed for BatchStatement) uses close() instead of shutdown()
              session.close();
              cluster.close();
       }
      
}

This is a simplified introduction meant for readers coming from the relational world.
