Thursday, 14 January 2016

Spark: Reading Avro serialized Data

Avro is a serialization format of choice in the Hadoop ecosystem and is highly prevalent in legacy MapReduce/Hive data pipelines, so it is often necessary to be able to read and process Avro-serialized data in Spark.

Below is the complete code snippet, along with a description, to read Avro data and output every unique PARTN_NBR id together with its count of occurrences.

SparkConf sparkConf = new SparkConf().setAppName("RDD-Usecase1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

In the above code snippet, we initialize the sparkConf object and set the name of the application. In the next line, we create the Spark application context, sc, which uses the Spark configuration object, sparkConf, for its initialization. Here, JavaSparkContext is the Java-friendly wrapper around the Scala-implemented SparkContext.

As of Spark 1.5, the rule of thumb for SparkContext is that there cannot be multiple SparkContext instances in the same JVM. Implementation-wise, when the SparkContext constructor is called, it ensures that no other SparkContext instance is running: it throws an exception if a running context is detected and logs a warning if another thread is in the middle of constructing a SparkContext.


JavaPairRDD<AvroKey, NullWritable> records = sc.newAPIHadoopFile(
        avroFilePath, AvroKeyInputFormat.class, AvroKey.class,
        NullWritable.class, new Configuration());

Here, the variable avroFilePath holds the string value of the HDFS path of the input Avro file. An important point to note is that this approach doesn't support reading partitioned Avro data.
SparkContext's read function, "newAPIHadoopFile", is a generic read function which has five input parameters:
1) HDFS path of the input file
2) Data Input Format class
3) Key Format class
4) Value Format class
5) Hadoop Configuration instance
The output of the function is a JavaPairRDD instance, which abstracts the input data as an RDD of key-value pairs.

JavaPairRDD<String, Integer> ones = records.mapToPair(new PairFunction<Tuple2<AvroKey,NullWritable>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<AvroKey, NullWritable> record) throws Exception {
        return new Tuple2<String, Integer>((String) ((GenericRecord)(record._1.datum())).get("PARTN_NBR"), 1);
    }
});

"mapToPair" is the standard RDD transformation function to extract key-value pairs of a RDD. Input parameter is the overridden implementation of the call function of the interface, PairFunction.
"call" function implementation has logic on how to read avro serialized object and extract one of the field, "PARTN_NBR". The input to call function is the "record" object which is avro serialized key-value object. "record._1" gives the avro-serialized key object and "record._2" gives avro-serialized value object. "datum()" function cast the object to GenericRecord instance from which we can extract any field by giving it as parameter to get() function.
Output object, Tuple2's value is integer,1 keeping with the program logic.
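The effect of this mapToPair step can be sketched in plain Java, using a `Map<String, Object>` as a stand-in for the Avro GenericRecord (the sample PARTN_NBR values here are illustrative, not from real data):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MapToPairSketch {
    public static void main(String[] args) {
        // Stand-in for the RDD of Avro records; each map plays the role of a GenericRecord
        List<Map<String, Object>> records = List.of(
                Map.of("PARTN_NBR", "P100"),
                Map.of("PARTN_NBR", "P200"),
                Map.of("PARTN_NBR", "P100"));

        // Equivalent of mapToPair: emit a (PARTN_NBR, 1) pair for every record
        List<SimpleEntry<String, Integer>> ones = new ArrayList<>();
        for (Map<String, Object> record : records) {
            ones.add(new SimpleEntry<>((String) record.get("PARTN_NBR"), 1));
        }
        System.out.println(ones); // [P100=1, P200=1, P100=1]
    }
}
```

The only real work per record is the field extraction and the pairing with 1; Spark merely applies it in parallel across partitions.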

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
});

"reduceByKey" is the standard RDD transformation function whose working is similar to reducer. It is used to aggregate data on key.

List<Tuple2<String, Integer>> output = counts.collect();

for (Tuple2<?,?> tuple : output) {
    System.out.println(tuple._1() + ": " + tuple._2());
}

"collect" is the standard RDD action function. It will result in the trigger of all the above transformation functions and collect the final output in the in-memory data structure.
The output can be printed on console or saved in a file.
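Outside Spark, the same count-of-occurrences result can be reproduced with a Java stream, which may help to verify a small sample locally before running the job (the keys are illustrative):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LocalCountSketch {
    public static void main(String[] args) {
        // Sample PARTN_NBR values extracted from the records
        List<String> partnNbrs = List.of("P100", "P200", "P100");

        // groupingBy + counting mirrors mapToPair(…, 1) followed by reduceByKey(sum)
        Map<String, Long> counts = partnNbrs.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        counts.forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```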

sc.stop();
sc.close();

It is important to stop and close the SparkContext, as this clears the complete application context and releases all the resources held by the application.
