Friday, April 27, 2018

Step by Step guide on how to load data using Spark streaming to Cassandra

Objective: To perform the integration between the below components
and perform the data ingestion to Cassandra tables.
  1. zookeeper-3.4.10 
  2. kafka_2.12-1.0.0.tgz
  3. scala-2.11.6.tgz
  4. spark-2.2.1-bin-hadoop2.7.tgz 
  5. apache-cassandra-3.0.14-bin.tar.gz

  1. Component Integration testing

Prerequisite:

Kafka:

Kafka-Zookeeper:
To start the Kafka application, first start ZooKeeper and then the Kafka server.

To start the zookeeper:

bin/zkServer.sh start

To start the kafka server:

bin/kafka-server-start.sh config/server.properties

Step 1: Topic creation:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bivibes.sample

With the above command a Kafka topic named “bivibes.sample” has been created for this project.
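
Kafka only accepts topic names made of letters, digits, ".", "_" and "-", up to 249 characters, and it warns that names differing only in "." versus "_" can collide in metric names. The sketch below mirrors those rules in plain Python so a name such as bivibes.sample can be checked before it is created; the function names are made up for illustration and are not part of any Kafka client.

```python
import re

# Characters Kafka allows in a topic name, and the maximum length.
_LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+")
MAX_TOPIC_LENGTH = 249


def is_valid_topic_name(name):
    """Return True if the name follows Kafka's topic naming rules."""
    if name in ("", ".", ".."):
        return False
    if len(name) > MAX_TOPIC_LENGTH:
        return False
    return _LEGAL_TOPIC.fullmatch(name) is not None


def names_collide(first, second):
    """Return True if two names differ only by '.' versus '_'."""
    return first.replace(".", "_") == second.replace(".", "_")


print(is_valid_topic_name("bivibes.sample"))
print(names_collide("bivibes.sample", "bivibes_sample"))
```

The second check is the reason to write the topic name exactly the same way (bivibes.sample) in every command.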

Step 2: Topic listing:

bin/kafka-topics.sh --list --zookeeper localhost:2181


This step will list the topic that we created.

Step 3: Kafka built-in producer/consumer testing

NB: This step is just for testing purposes.

To test this, open two separate terminals. In one terminal, run the command that starts the built-in console producer; in the other, run the command that starts the built-in console consumer. Then type something in the producer terminal; the same text is printed in the consumer terminal. A side-by-side screenshot of both terminals is given below.

Kafka Console Producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bivibes.sample

Kafka Console consumer:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bivibes.sample

  2. Sample Data Ingestion

Step 1:  KAFKA custom producer to read input files and produce to topics

Now that we have tested Kafka's functionality in Step 3 of the first section, we can do the actual data ingestion to Cassandra. The data flow is as given below:

Source CSV → Kafka producer → Topic → Kafka consumer → Spark Streaming → Cassandra

To perform the data ingestion, we created a custom producer and a consumer application. Both listings below are PySpark scripts; the commands later in this guide also invoke the producer as a packaged jar (prod.jar).

Producer application we used:


import json
import os
import re
import sys

# Kafka client (kafka-python)
from kafka import KafkaProducer
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext

# Maven coordinates use the form group:artifact:version
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0 pyspark-shell'
os.environ['PYTHONPATH'] = '/bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7/bin/:/bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7/bin/pyspark_cassandra.zip'

topic = sys.argv[1]          # Kafka topic to produce to
inputfilepath = sys.argv[2]  # directory watched for new input files
producer = KafkaProducer(bootstrap_servers='localhost:9092')


def sendMessage(msg):
    # Send every line of the current batch to the topic as a JSON string
    records = msg.collect()
    co = len(records)
    print('count is ', co)
    if co > 0:
        for record in records:
            producer.send(topic, json.dumps(record).encode('utf-8'))
            #producer.send(topic,str(records))
            print(record)
        # Flush once per batch so buffered messages are actually sent
        producer.flush()
    else:
        print('no data to produce')


def main():
    sc = SparkContext(appName="KafkaProducer")
    ssc = StreamingContext(sc, 1)  # 1-second batches
    # textFileStream picks up files that appear in the directory after the job starts
    FileStream = ssc.textFileStream(inputfilepath)
    # Not used below: the raw lines in FileStream are what gets sent
    rdd = FileStream.map(lambda l: re.sub(',', '|', l)).map(lambda li: li.split('\t'))
    #rdd = FileStream.map(lambda li: li.split(','))
    #rdd.pprint()
    FileStream.foreachRDD(sendMessage)
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
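
The producer wraps each input line with json.dumps before sending it, which is why the message on the topic carries an extra pair of double quotes. The short sketch below shows that encoding and its inverse using only the standard library; the helper names and the sample line are made up for illustration.

```python
import json


def encode_record(line):
    """Serialize one input line the same way the producer does."""
    return json.dumps(line).encode("utf-8")


def decode_record(payload):
    """Undo encode_record: bytes from the topic back to the original line."""
    return json.loads(payload.decode("utf-8"))


# Illustrative CSV line; real input depends on your file.
sample_line = "1,Asha,Finance"
payload = encode_record(sample_line)
print(payload)
print(decode_record(payload) == sample_line)
```

Decoding with json.loads recovers the line exactly, including any quotes that were part of the original data.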

 ******************************************************************************************

Consumer application we used:

import os
import sys

from pyspark import SparkContext, SparkConf
from pyspark.sql.context import SQLContext
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
#import aimodule
#import PythonHttpClient
sys.path.append('/bivibes_install_directory/Modelling/Final/')
#import json
#from uuid import uuid1
#import pyspark_cassandra
#from pyspark_cassandra import CassandraSparkContext, saveToCassandra

# Maven coordinates use the form group:artifact:version
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.2.0 pyspark-shell'
os.environ['PYTHONPATH'] = '/bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7/bin/:/bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7/bin/pyspark_cassandra.zip'

topic = sys.argv[1]  # Kafka topic to consume from
#inputfilepath = sys.argv[2]


def main():
    conf = SparkConf().setAppName("spark-streaming")
    sc = SparkContext.getOrCreate(conf)
    # sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 1)  # 1-second batches
    #scc = CassandraSparkContext(conf=conf)
    # Receiver-based stream: ZooKeeper quorum, consumer group, {topic: threads}
    kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {topic: 1})
    # Each message is a (key, value) pair; keep only the value
    kafkaStreamRdd = kafkaStream.map(lambda x: x[1])
    kafkaStreamRdd.foreachRDD(process)
    ssc.start()
    ssc.awaitTermination()
    sc.stop()


def process(kafkaStreamRdd):
    # Convert RDD[List] to DataFrame
    print('converting to dataframe')
    sc = SparkContext.getOrCreate()
    sqlctxt = SQLContext(sc)
    delim = ','
    count = kafkaStreamRdd.count()
    if count > 0:
        # Strip the JSON quotes and brackets, then split each line into fields
        newrdd = (kafkaStreamRdd
                  .map(lambda l: l.replace('"', ''))
                  .map(lambda l: l.replace('[', ''))
                  .map(lambda l: l.replace(']', ''))
                  .map(lambda l: l.split(delim)))

        schemaString = "colname1,colname2..etc"  # depends on the input file you use
        fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(delim)]
        schema = StructType(fields)
        #print(len(fields),'length of fileds in DataFrame &&&&&&&    ')
        # Drop the header row
        newrdd1 = newrdd.filter(lambda line: line != schemaString.split(delim))

        newrddDataFrame = sqlctxt.createDataFrame(newrdd1, schema)
        # This format needs the Spark Cassandra connector on the classpath
        cassandraWrite = newrddDataFrame.write.format("org.apache.spark.sql.cassandra")
        # Save the DataFrame in the Cassandra table
        cassandraWrite.options(table="sample_data", keyspace="bivibes").mode("append").save()
    else:
        print('no data received')


if __name__ == "__main__":
    main()
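
The parsing done inside process() can be tried without Spark or Kafka. The sketch below repeats the consumer's clean-and-split logic on plain strings and, for comparison, shows an alternative that undoes the JSON encoding and lets the csv module split the line. The function names and the id,name,dept columns are assumptions made for illustration.

```python
import csv
import json

DELIM = ","


def parse_like_consumer(message, delim=DELIM):
    """Mirror the consumer's map chain: drop quotes and brackets, then split."""
    cleaned = message.replace('"', "").replace("[", "").replace("]", "")
    return cleaned.split(delim)


def parse_with_json_and_csv(message, delim=DELIM):
    """Alternative sketch: decode the JSON string, then split with the csv module."""
    line = json.loads(message)
    return next(csv.reader([line], delimiter=delim))


def drop_header(rows, schema_string, delim=DELIM):
    """Remove any row equal to the header, as the consumer's filter does."""
    header = schema_string.split(delim)
    return [row for row in rows if row != header]


# Messages as they would arrive from the topic (JSON-encoded lines).
messages = [json.dumps("id,name,dept"), json.dumps("1,Asha,Finance")]
rows = drop_header([parse_like_consumer(m) for m in messages], "id,name,dept")
print(rows)
```

The simple replace-and-split approach works for plain values, but a field that itself contains a comma or a quote is split into extra pieces; the json plus csv variant keeps such a field intact.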


Copy the following three files to the Linux box using WinSCP:
  • Producer.py
  • sparkConsumer.py
  • Input.csv 
Step 1.1:

Invoke the custom Kafka producer to read the input data and produce it to the topic we created:

Command used:

java -jar prod.jar bivibes.sample /bivibes_install_directory/input

/bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7/bin/spark-submit --jars /bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7/jars/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar --driver-class-path /bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7/jars/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar /bivibes_install_directory/kafkaProd.py bivibes.sample /bivibes_install_directory/input


bivibes.sample → topic we created
/bivibes_install_directory/input → directory of the input file
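
Because the spark-submit command is long and repeats the same paths, it can help to assemble it in a small script. The sketch below builds the argument list from the paths used in this guide; the function name is made up, and it assumes the same 0-8 assembly jar is passed to both --jars and --driver-class-path.

```python
import shlex

SPARK_HOME = "/bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7"
KAFKA_JAR = SPARK_HOME + "/jars/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar"


def build_submit_command(script, *script_args, spark_home=SPARK_HOME, jar=KAFKA_JAR):
    """Return the spark-submit invocation as a list of arguments."""
    return [
        spark_home + "/bin/spark-submit",
        "--jars", jar,
        "--driver-class-path", jar,
        script,
    ] + list(script_args)


cmd = build_submit_command(
    "/bivibes_install_directory/kafkaProd.py",
    "bivibes.sample",                    # topic we created
    "/bivibes_install_directory/input",  # directory of the input file
)
# Print a copy-and-paste friendly version of the command.
print(" ".join(shlex.quote(part) for part in cmd))
```

The list can be passed to subprocess.run(cmd, check=True) to launch the job from Python instead of typing the command by hand.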



When the command succeeds, the producer prints the record count and each record it sent to the topic.

Step 2:  Spark streaming to consume the data from the topic:

In Step 1 we verified that the custom Kafka producer produces the input data to the topic. In this step we start Spark Streaming to consume the data from the topic. That can be done using the below command:

/bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7/bin/spark-submit --jars /bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7/jars/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar --driver-class-path /bivibes_install_directory/spark/spark-2.2.0-bin-hadoop2.7/jars/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar /bivibes_install_directory/sparkConsumer.py bivibes.sample

The last argument, bivibes.sample (the topic name), is the input parameter for this command.

After running the above command open another terminal and run the command to produce the data to topic again.

java -jar prod.jar bivibes.sample /bivibes_install_directory/input

The last two arguments, bivibes.sample and /bivibes_install_directory/input, are the input parameters for this command.

Once you produce the data by running the above command, you can see it arriving in the first (consumer) terminal, since the code includes print statements for testing purposes.



Step 3:  Connect to Cassandra and create the table

Navigate to the Cassandra installation directory (the parent of bin) and type the below command:

bin/cqlsh

Step 3.1:

Type the below CQL commands one by one. Don’t forget to end each command with a semicolon (;).

Create the keyspace:

create keyspace bivibes WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 } AND DURABLE_WRITES = false;

Use the keyspace:

use bivibes;

Create the table:

create table sample(<column names along with their data types>);

ex: create table anzsample(id int PRIMARY KEY, name text, dept text);
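
The table's columns have to line up with the schemaString used in sparkConsumer.py, so it can be convenient to derive both from a single column list. The sketch below generates the create table statement and the matching schema string; the helper names are made up, and the columns are the ones from the anzsample example.

```python
def build_create_table(table, columns, primary_key):
    """Build a CQL create table statement from (name, type) pairs."""
    names = [name for name, _ in columns]
    if primary_key not in names:
        raise ValueError("primary key must be one of the columns")
    parts = []
    for name, cql_type in columns:
        suffix = " PRIMARY KEY" if name == primary_key else ""
        parts.append("{} {}{}".format(name, cql_type, suffix))
    return "create table {}({});".format(table, ", ".join(parts))


def build_schema_string(columns, delim=","):
    """Build the comma-separated column list expected by the consumer."""
    return delim.join(name for name, _ in columns)


# Columns taken from the example table; adjust to your input file.
columns = [("id", "int"), ("name", "text"), ("dept", "text")]
print(build_create_table("anzsample", columns, "id"))
print(build_schema_string(columns))
```

Keeping one column list as the source for both strings avoids the table and the consumer drifting apart when the input file changes.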

NB: Now that the table exists in Cassandra, we can write the data produced by Kafka to Cassandra. In the sparkConsumer.py file we have edited the code to write to the Cassandra table; the snippet below is the line from sparkConsumer.py that writes the data. The table and keyspace options must match the table and keyspace you created (the snippet uses sample_data). After editing this code, repeat Step 2 so that the Kafka producer again produces the data and Spark Streaming consumes it and writes it to Cassandra.

       cassandraWrite.options(table="sample_data", keyspace="bivibes").mode("append").save()


Step 3.2: To view the data in the Cassandra table after Step 3.1:
Connect to Cassandra and run the query:
select * from sample;

