Spark et Cassandra
Cassandra est une base de données distribuée capable de stocker de gros volumes de données. Si son modèle de données constitué de tables et de colonnes, et son langage de requêtage CQL imitent fortement les bases de données relationnelles, la ressemblance s’arrête là.
Les possibilités de requêtage dépendent intrinsèquement de la manière dont sont stockées/modélisées les données.
Datastax fournit un connecteur Spark qui permet manipuler des tables Cassandra sous forme de RDD, concept que nous avons présenté lors du précédent article d’introduction à Spark. Par le biais de ce connecteur, Spark apporte à Cassandra des capacités de requêtage analytique.
D’une manière plus générale, nous étudierons l’intégration de Cassandra et Spark. Ce sera aussi l’occasion de découvrir les possibilités de Spark SQL et Spark Streaming.
Spark classique
Pour commencer avec le connecteur Cassandra Spark, les Jobs Spark doivent pouvoir se connecter à Cassandra. Pour cela on place dans la configuration les identifiants de connexion à Cassandra:
SparkConf conf = new SparkConf(true) .setMaster("local") .setAppName("Zenika") .set("spark.cassandra.connection.host", "node1") .set("spark.cassandra.connection.username", "spark") .set("spark.cassandra.connection.password", "spark");
Pour lire les données dans Cassandra, on décrit au moyen de la classe CassandraJavaUtil
une requête de type select
comme en CQL. On obtient ainsi un RDD de lignes Cassandra:
CassandraJavaRDD<CassandraRow> metricRdd = javaFunctions(sparkContext) .cassandraTable("metrics", "metric") .select("host", "name", "date", "value");
Au passage, notez qu’en Scala, les conversions implicites auraient permis d’éviter l’appel à javaFunctions
.
Le connecteur sait automatiser le mapping ligne/objet à la manière d’un ORM (voir mapRowTo
), la columnMap
contient la correspondance colonne/propriété:
CassandraJavaRDD<Metric> metricRdd = javaFunctions(sparkContext) .cassandraTable("metrics", "metric", mapRowTo(Metric.class, columnMap)) .select("host", "name", "date", "value");
Le chemin inverse, l’écriture d’un RDD dans Cassandra, est très similaire à la lecture. La méthode saveToCassandra
est une action Spark tout comme saveAsTextFile
:
javaFunctions(metricStatRdd) .writerBuilder("metrics", "metric_stat", mapToRow(MetricStat.class, columnMap)) .saveToCassandra();
De manière à optimiser les accès base, il est de bon ton de placer les esclaves Spark sur les mêmes machines que les noeuds Cassandra.
Le connecteur Cassandra-Spark amène chaque exécutant Spark à préférer lire les partitions Cassandra du noeud sur lequel il est, évitant ainsi de coûteux allers-retours. Autrement dit, le connecteur sait optimiser les lectures, dès lors que Spark et Cassandra cohabitent sur un même noeud.
Spark SQL
Pour mémoire, Spark SQL est une extension de Spark qui permet d’exprimer les traitements sur un RDD dans un langage inspiré de SQL. Sous le capot, Spark SQL utilise un type de RDD spécialisé, nommé SchemaRDD
, dans lequel les données seront structurées sous forme d’un tableau. Le SchemaRDD
est constitué de lignes (Row
), toutes ayant les mêmes colonnes. Pour passer d’un RDD
tout simple à un SchemaRDD
, on décrit les colonnes, puis on lui affecte un nom de table:
// Un RDD tout simple List<Metric> metrics = ... JavaRDD<Metric> simpleRdd = sparkContext.parallelize(metrics); // Un SchemaRDD JavaSQLContext sqlContext = new JavaSQLContext(sparkContext); JavaSchemaRDD schemaRDD = sqlContext.applySchema(simpleRdd, Metric.class); schemaRDD.registerTempTable("metric"); JavaSchemaRDD resultRDD = sqlContext.sql("select host, avg(value) from metric where name='cpu.total' group by host");
Une fois tables et colonnes déclarées dans le contexte, Spark SQL permet d’exécuter du pseudo-SQL sur n’importe quel RDD. En enregistrant plusieurs tables dans le context Spark SQL, on aurait pu faire faire des jointures entre elles.
Revenons à Cassandra, avec le connecteur Cassandra Spark, on peut utiliser Spark SQL pour requêter directement la base de données et produire des SchemaRDD
s.
JavaCassandraSQLContext cassandraSQLContext = new JavaCassandraSQLContext(sc); JavaSchemaRDD metricRDD = cassandraSQLContext.sql("select name from metrics.metric where name like 'cpu%'"); long count = metricRDD .map(row -> row.getString(0)) distinct().count();
On peut aussi appliquer des transformations habituelles (map, filter), du cache, etc., sur le résultat de la requête comme sur tout RDD qui se respecte.
Pourquoi utiliser Spark SQL alors qu’on dispose déjà de CQL? Spark SQL permet de:
- De requêter les données même si le modèle de données ne si prête pas. En effet, en CQL, on ne pourra pas exprimer une requête si le modèle ne s’y prête pas.
- De faire des jointures et d’utiliser de nombreux opérateurs comme
like
ougroup by
dans les exemples ci-dessous
Bien que plus riche et permissive, l’approche Spark SQL est aussi beaucoup plus coûteuse que du CQL. En effet, Spark va généralement balayer un gros volume de données à la façon d’un full scan. Pour limiter l’impact de Spark sur les performances de Cassandra, il est vivement conseillé de créer un datacenter logique au niveau de Cassandra de manière à isoler cette charge.
Au final, Spark SQL ne remplace pas CQL, mais le complète pour tout ce qui est requêtes analytiques.
Spark Streaming
Si le coeur de Spark est focalisé sur les traitements en masse de gros volumes de données, Spark Streaming propose une API similaire pour des traitements de données au fil de l’eau. On troque les forts volumes de données pour de forts débits, et les RDD pour des DStreams
(Discrete Streams). Mais l’API reste très proche, map
, filter
sont toujours de la partie:
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, new Duration(10000)); JavaDStream<Metric> metricDStream = streamingContext .socketTextStream("metric-source", 7075) .map(Metric::parseStream) .filter(metric -> metric.getValue() != null);
Ces DStreams seront alimentés par des systèmes orientés message/événement comme Kafka, ZeroMQ, Akka, etc. Cependant, les traitements seront effectués de manière périodique (toutes les 10 secondes dans l’exemple ci-dessus). Spark Streaming est un framework de micro-batch, il traite les données sous forme de petits RDDs toutes les N secondes (N entre 1 seconde et 1 minute):
Le fait d’avoir des RDDs ouvre pas mal de possibilités comme factoriser du code avec les traitements batch classiques, faire des jointures…
Et Cassandra dans tout ça? Et bien le connecteur permet de persister un DStream
dans une table de la même manière qu’un RDD:
javaFunctions(metricDStream) .writerBuilder("metrics", "metric", mapRowTo(Metric.class, columnMap)) .saveToCassandra();
Cassandra pour le stockage et le requêtage temps réel, Spark pour le requêtage analytique, les traitements de masse et l’intégration au fil de l’eau. Ce redoutable tandem est capable d’absorber des données en grande quantité et procéder à des analyses de données à grande échelle. Spark apporte à Cassandra des capacités de requêtage et de fouille des données, Cassandra apporte à Spark une solution pour persister
les RDDs de manière distribuée et structurée.