Sunday, 10 September 2017

Git settings for a project whose end-of-line (EOL) convention is LF, with check-in and check-out from Windows

$ git config core.autocrlf false
$ git config --global core.eol lf

With core.autocrlf set to false, Git performs no CRLF conversion at all, so the repository's LF line endings are preserved verbatim during both check-in and check-out; core.eol lf additionally tells Git to use LF for any files it does normalize as text.
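To make the policy travel with the repository itself, the same intent can also be pinned in a .gitattributes file (a minimal sketch; adjust the pattern to your project):

* text=auto eol=lf

With this attribute in place, files Git detects as text are checked out with LF endings regardless of each contributor's local core.autocrlf setting.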

Thursday, 23 March 2017

Managing multiple versions of java in workspace

Found a very useful and comprehensive read on managing multiple versions of Java in a workspace:

https://andrew-jones.com/blog/managing-multiple-versions-of-java-on-os-x/

Thursday, 14 January 2016

Spark: Reading Avro serialized Data

Avro being the serialization format of choice in the hadoop ecosystem, and highly prevalent in legacy mapreduce/hive data pipelines, it becomes necessary to be able to read and process avro-serialized data in spark.

Below is the complete code snippet, along with a description, to read avro data and output all unique PARTN_NBR ids along with their counts of occurrences.
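For reference, the snippets below assume roughly these imports (from the spark-core, avro and hadoop artifacts; the exact artifacts depend on your build):

import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;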

SparkConf sparkConf = new SparkConf().setAppName("RDD-Usecase1");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

In the above code snippet, we initialize the sparkConf object and set the name of the application. In the next line, we create the spark application context, sc, from the spark configuration object sparkConf. Here, JavaSparkContext is the java wrapper around the scala-implemented SparkContext.

As of spark 1.5, the thumb rule for SparkContext is that there can't be multiple SparkContext instances in the same JVM. Implementation-wise, when the SparkContext constructor is called, it ensures that no other SparkContext instance is running: it throws an exception if a running context is detected, and logs a warning if another thread is constructing a SparkContext.


JavaPairRDD<AvroKey, NullWritable> records = sc.newAPIHadoopFile(
        avroFilePath, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, new Configuration());

Here, the variable avroFilePath holds the string value of the HDFS path of the input avro file. An important point to note is that this doesn't support reading partitioned avro data.
SparkContext's read function "newAPIHadoopFile" is a generic read function which has five input parameters:
1) HDFS path of the input file
2) Data Input Format class
3) Key Format class
4) Value Format class
5) Hadoop Configuration instance
The output of the function is a JavaPairRDD instance, which abstracts the input data as an RDD of key-value pairs.

JavaPairRDD<String, Integer> ones = records.mapToPair(new PairFunction<Tuple2<AvroKey,NullWritable>, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(Tuple2<AvroKey, NullWritable> record) throws Exception {
        // Avro string fields come back as Utf8, so convert via toString() rather than casting
        return new Tuple2<String, Integer>(((GenericRecord) record._1().datum()).get("PARTN_NBR").toString(), 1);
    }
});

"mapToPair" is the standard RDD transformation function to extract key-value pairs of a RDD. Input parameter is the overridden implementation of the call function of the interface, PairFunction.
"call" function implementation has logic on how to read avro serialized object and extract one of the field, "PARTN_NBR". The input to call function is the "record" object which is avro serialized key-value object. "record._1" gives the avro-serialized key object and "record._2" gives avro-serialized value object. "datum()" function cast the object to GenericRecord instance from which we can extract any field by giving it as parameter to get() function.
Output object, Tuple2's value is integer,1 keeping with the program logic.

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
});

"reduceByKey" is the standard RDD transformation function whose working is similar to reducer. It is used to aggregate data on key.

List<Tuple2<String, Integer>> output = counts.collect();

for (Tuple2<?,?> tuple : output) {
    System.out.println(tuple._1() + ": " + tuple._2());
}

"collect" is the standard RDD action function. It will result in the trigger of all the above transformation functions and collect the final output in the in-memory data structure.
The output can be printed on console or saved in a file.
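For instance, instead of collecting everything to the driver, the result can be written straight back to HDFS (a sketch; outputPath is a hypothetical HDFS path string):

counts.saveAsTextFile(outputPath);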

sc.stop();

It is mandatory to stop the SparkContext at the end, as this closes and clears the complete application context and releases all its resources. (JavaSparkContext also implements Closeable; its close() simply delegates to stop(), so calling stop() once is enough.)

Wednesday, 6 January 2016

Configuring spark history server for running on Yarn in CDH

In Spark-on-Yarn mode, each running spark application on yarn launches its own web UI, which can be accessed from the Yarn Resource Manager UI via the "tracking url" link. This web UI has all the data on the running spark application: event timeline, jobs, stages, tasks, task metrics, etc.
With the default configuration, we can only see this web UI for running jobs. To enable the same for completed jobs, the spark history server has to be started and configured.
The spark history server maintains and visualizes the event logs of spark applications after they have finished running on Yarn.

I tested this on CDH 5.3 and 5.5, which ship spark versions 1.3 and 1.5 respectively.

1) Check whether spark-history-server is running
$ /etc/init.d/spark-history-server status
If it is not running, start it using
$ /etc/init.d/spark-history-server start

2) Configuring spark-history-server
We need to know two configuration properties of spark-history-server:
- spark.history.fs.logDirectory : the directory where the history server expects the application event logs
- spark.history.ui.port : the port on which it runs

These properties are configured in file "/etc/default/spark".
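On the quickstart VM, the relevant entries look like this (values may differ on your cluster):

export SPARK_HISTORY_SERVER_LOG_DIR=hdfs:///user/spark/applicationHistory
export SPARK_HISTORY_SERVER_WEBUI_PORT=18088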


So we know that SPARK_HISTORY_SERVER_WEBUI_PORT is 18088 and SPARK_HISTORY_SERVER_LOG_DIR is hdfs:///user/spark/applicationHistory.

If we want to change either of these properties, we can edit this file and restart the spark-history-server.

PS: There is a quicker way to find the values of these spark history server properties:
$ ps -ef | grep HistoryServer
mapred    2595     1  0 01:54 ?        00:01:54 /usr/java/jdk1.7.0_67-cloudera/bin/java -Dproc_historyserver -Xmx1000m -Dhadoop.log.dir=/var/log/hadoop-mapreduce -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str= -Dhadoop.root.logger=INFO,console -Djava.library.path=/usr/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/var/log/hadoop-mapreduce -Dhadoop.log.file=hadoop.log -Dhadoop.root.logger=INFO,console -Dhadoop.id.str=mapred -Dhadoop.log.dir=/var/log/hadoop-mapreduce -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/usr/lib/hadoop -Dhadoop.id.str= -Dhadoop.root.logger=INFO,console -Djava.library.path=/usr/lib/hadoop/lib/native -Dhadoop.policy.file=hadoop-policy.xml -Djava.net.preferIPv4Stack=true -Dhadoop.log.dir=/var/log/hadoop-mapreduce -Dhadoop.log.file=mapred-mapred-historyserver-quickstart.cloudera.log -Dhadoop.root.logger=INFO,console -Dmapred.jobsummary.logger=INFO,JSA -Dhadoop.security.logger=INFO,NullAppender org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer
spark     4174     1  0 01:55 ?        00:03:02 /usr/java/jdk1.7.0_67-cloudera/bin/java -cp /etc/spark/conf/:/usr/lib/spark/lib/spark-assembly-1.5.0-cdh5.5.0-hadoop2.6.0-cdh5.5.0.jar:/etc/hadoop/conf/:/usr/lib/spark/lib/spark-assembly.jar:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/lib/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/lib/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/lib/*:/usr/lib/hadoop-yarn/*:/usr/lib/hive/lib/*:/usr/lib/flume-ng/lib/*:/usr/lib/paquet/lib/*:/usr/lib/avro/lib/* -Dspark.history.fs.logDirectory=hdfs:///user/spark/applicationHistory -Dspark.history.ui.port=18088 -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.history.HistoryServer
cloudera 18659  6018  0 09:06 pts/0    00:00:00 grep HistoryServer 

3) Configuring spark
Add the following properties in "/etc/spark/conf/spark-defaults.conf" (in this file, keys and values are separated by whitespace characters).
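For example, on the quickstart VM the entries would look like this (substitute your own history-server host and log directory; note that spark.yarn.historyServer.address takes no http:// scheme):

spark.eventLog.enabled true
spark.eventLog.dir hdfs:///user/spark/applicationHistory
spark.yarn.historyServer.address quickstart.cloudera:18088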



Note that the port in "spark.yarn.historyServer.address" should be equal to "SPARK_HISTORY_SERVER_WEBUI_PORT" set in history-server. Similarly, the value of "spark.eventLog.dir" should be equal to "SPARK_HISTORY_SERVER_LOG_DIR" set in history-server.

4) Now run a spark application.
After the application has completed, go to the yarn UI for that application and click on the "History" link. It will take you to the spark web UI for that application.

Tuesday, 5 January 2016

Unit Testing Spark Job

During my time writing lots of mapreduce code and building distributed applications over the hadoop framework, one thing I understood is that it needs a different unit testing approach from the standard one for java applications. As mapreduce code executes on the distributed hadoop framework, we can't do both functional and performance testing as part of unit testing. Functional testing can be done with the MRUnit framework, in which we can simulate the working of a mapper/reducer on mocked input data. But for performance testing of a mapreduce job/workflow, we still have to run the job with different configurations and input data loads to find the best fit of all of them.

Some time back, when I was developing a distributed application using spark, I applied the same testing approach that I described above for Hadoop applications. I am not sure whether it is by intent of the spark developers, but unit testing a spark job is much more obvious, and easier to think through and implement, than a mapreduce job.

A Spark job can be unit tested using the following "local" deploy modes:

- local : Run Spark locally with one worker thread (i.e. no parallelism at all).
- local[K] : Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
- local[*] : Run Spark locally with as many worker threads as logical cores on your machine.

As the first deploy mode, "local", runs with one worker thread, there will be only one executor; it can be used for functional unit testing.
After functional testing, we can run the code with the second deploy mode, "local[K]". For example, running with local[2] means two worker threads, which represents "minimal" parallelism; it can help in detecting bugs that only exist when we run in a distributed context.

Another major usability factor is the ease of testing the code with these deploy modes. In contrast to MRUnit, which is a third-party dependency and needs a lot of boilerplate code, spark's deploy modes are sweet and simple.
In the JUnit test case for a spark job, you can use the "setMaster" method of the sparkConf object to set the deploy mode, like below:

SparkConf sparkConf = new SparkConf().setAppName("Testcase-1");
sparkConf.setMaster("local[2]");
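Putting it together, a minimal JUnit 4 test case might look like this (a sketch; the class name, test body, and input data are illustrative):

import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SparkJobTest {
    private JavaSparkContext sc;

    @Before
    public void setUp() {
        // local[2]: two worker threads, the "minimal" parallelism discussed above
        SparkConf sparkConf = new SparkConf().setAppName("Testcase-1").setMaster("local[2]");
        sc = new JavaSparkContext(sparkConf);
    }

    @Test
    public void countsAllElements() {
        // Replace this with a call into the job's actual transformation logic
        long count = sc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
        assertEquals(4L, count);
    }

    @After
    public void tearDown() {
        // Only one SparkContext may live in a JVM, so stop it after every test
        sc.stop();
    }
}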


For performance testing, the process is similar to the one adopted for Hadoop: I ran the spark job with different values of configuration properties and different data loads to find the best fit.


Friday, 18 December 2015

Spark's new Memory Manager, Unified Memory Manager

Starting with the 1.6 release, Spark moves on to a new memory manager implementation, the Unified Memory Manager. Compared to the current implementation, it aims at better memory efficiency for running programs and far fewer configuration parameters to be passed by the user.

Starting from basics: the memory manager in Spark is responsible for dividing the executor's heap memory between the storage and shuffle fractions.

1) Storage Memory Fraction: This fraction holds the partitions of the RDDs being processed. It acts as an in-memory LRU cache for the data, which means data doesn't stay in memory for long: when the storage fraction is full and new data arrives, the oldest data is dropped.
There are multiple kinds of RDDs that are stored in storage memory:
1. Input RDD: the input of the program, which is lazily processed on the execution of an action function. The partitions are only loaded for the scope of execution of the first function.
2. Intermediate RDD: created as the output of one function and consumed by the next function in the DAG. The partitions are only loaded for the scope of execution of the next function.
3. Output RDD: the output of the last function in the DAG. It stays available for the scope of the program.
4. Persisted RDD: if an intermediate RDD has to be used again later, it is recommended to persist it in memory; otherwise the DAG is executed again to recompute it. An RDD can be persisted using the persist() function of the Spark API, as sketched below.
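For example (a sketch; sc is the JavaSparkContext and the input path is hypothetical):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;

JavaRDD<String> lines = sc.textFile("hdfs:///tmp/events.log");
// Cache the intermediate RDD so the two actions below don't recompute it from the input
lines.persist(StorageLevel.MEMORY_AND_DISK());
long total = lines.count();
long nonEmpty = lines.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String s) {
        return !s.isEmpty();
    }
}).count();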
The following are the levels of RDD persistence (sourced from spark's programming guide):

- MEMORY_ONLY : Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
- MEMORY_AND_DISK : Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
- MEMORY_ONLY_SER : Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
- MEMORY_AND_DISK_SER : Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
- DISK_ONLY : Store the RDD partitions only on disk.
- MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. : Same as the levels above, but replicate each partition on two cluster nodes.
- OFF_HEAP (experimental) : Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable; thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off-heap store, Spark is compatible with Tachyon out of the box.
If your RDD is small and needs a small number of calculation steps, MEMORY_ONLY is recommended.
If your RDD is small and needs a large number of calculation steps, MEMORY_AND_DISK is recommended.
If your RDD is large and needs a small number of calculation steps, MEMORY_ONLY_SER is recommended.
If your RDD is large and needs a large number of calculation steps, MEMORY_AND_DISK_SER is recommended.
If you have multiple RDDs, each large in size and with a large number of execution steps, and you want to share them across multiple spark applications, using TACHYON is recommended.
Storage Memory Distribution among Tasks: The division of storage memory among tasks is implemented logically; it is not physically enforced in the JVM. Each of the n parallel tasks is allocated a minimum of 1/(2n) of the storage memory, up to a maximum of 1/n of it, but a task can still physically load data in excess of its allocation. That's why optimized execution of spark jobs depends significantly on keeping the value of "spark.executor.memory" in sync with the number of parallel tasks (spark.executor.cores / spark.task.cpus) running on each executor.
Thus, if the value of "spark.executor.memory" is low, the number of parallel tasks per executor has to be tuned accordingly.

2) Shuffle Memory Fraction: When a dataset is aggregated/reduced by a key, its partitions are shuffled to create a sorted dataset. This sorting needs some memory/buffer to keep the sorted chunks of data, and the amount of memory used depends on the algorithm in play. This memory buffer used for sorting during the shuffle phase is called the shuffle memory fraction.

Executor JVM Heap Initialization
When a Spark application is submitted in Spark-on-Yarn mode, the amount of memory to be used by each of the executors (--executor-memory flag or spark.executor.memory parameter) is specified, as is the amount of memory to be used by the driver application (--driver-memory flag or spark.driver.memory parameter).

When executing, there will be spark.executor.instances spark executors, each running as a Java process in an isolated Yarn container. Each spark executor's Java process launches a JVM of spark.executor.memory MB, but the Yarn container for a spark executor occupies more memory than spark.executor.memory MB, by max(384 MB, 0.10 * spark.executor.memory).
This difference is the memory overhead of launching the Yarn container, as Yarn needs some memory for internal execution and maintaining state. If this overhead were not added to the request, the container would not survive, as YARN strictly enforces container memory limits: a container that uses more memory than was requested for it is killed.
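For example, with spark.executor.memory set to 4096 MB, the overhead is max(384 MB, 0.10 * 4096 MB) = 410 MB (approximately), so each executor's Yarn container is sized at roughly 4096 + 410 = 4506 MB.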


Current Memory Manager Implementation- Static Memory Manager:
This is the only memory manager currently supported. In this implementation:
1) The ratio of the two fractions, storage and shuffle, is statically defined by the parameter "spark.storage.memoryFraction". Because of the statically defined boundary, neither fraction can use the other fraction's space even when that space is idle. Overall, this results in heavy under-utilization of heap space.
2) To optimize the utilization of heap space, the end user has to estimate the storage memory and shuffle memory requirements of the program and then set memory manager configuration parameters like "spark.storage.memoryFraction", "spark.shuffle.memoryFraction", etc. This activity has to be repeated for each application and each execution environment. As it is more a trial-and-error way of optimizing the application, it can be frustrating for developers.

For a more exhaustive read on the static memory manager, refer to http://0x0fff.com/spark-architecture/

New Memory Manager Implementation- Unified Memory Manager
This implementation aims to mitigate the above two disadvantages of the static memory manager: under-utilization of java heap memory, and manual intervention in optimizing the usage of java heap memory.



It enforces a soft boundary between shuffle and storage memory such that either side can borrow memory from the other.
The region shared between shuffle and storage is a fraction of the total heap space, configurable through `spark.memory.fraction` (default 0.75). The position of the boundary within this space is further determined by `spark.memory.storageFraction` (default 0.5).
This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default. So, if heap size is 512 MB, the storage memory size will be 192 MB. But this is the minimum memory which can be used for storage purpose, as more memory can be borrowed from shuffle as per availability.
Storage can borrow as much shuffle/execution memory as is free until shuffle reclaims its space. When this happens, cached blocks will be evicted from memory until sufficient borrowed memory is released to satisfy the shuffle memory request.
Similarly, shuffle can borrow as much storage memory as is free. However, shuffle memory is never evicted by storage due to the complexities involved in implementing this. The implication is that attempts to cache blocks may fail if shuffle has already eaten up most of the storage space, in which case the new blocks will be evicted immediately according to their respective storage levels.
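The two knobs above can be tuned without code changes, for example at submit time (a sketch; the values shown are just the defaults):

$ spark-submit --conf spark.memory.fraction=0.75 --conf spark.memory.storageFraction=0.5 ...

In practice, though, the point of the unified manager is that the defaults already work well for most workloads.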

The Unified Memory Manager further helps in reducing JVM heap space issues, as the heap space is not statically divided. Because the boundary is fluid, execution and storage can utilize each other's idle space, and each side can reclaim its own space when needed. This results in optimum utilization of the JVM heap space.

Wednesday, 9 December 2015

container-launch exception in CDH 5.3 while launching spark-shell or submitting spark job in YARN deploy mode

It is a kind of bug in the CDH 5.3 spark configuration, but a couple of months back, while running my first spark job, it was a big mysterious hurdle. I am adding a post on it as I came across a couple of people at my workplace facing the same issue. I am hoping it might help others stepping into the spark world through CDH 5.3.

Below is the sequence of the issue's occurrence and its resolution:


  • [cloudera@quickstart bin]$  ./spark-shell --master yarn
2015-10-25 22:42:09,321 INFO  [main] spark.SecurityManager (Logging.scala:logInfo(59)) - Changing view acls to: cloudera
2015-10-25 22:42:09,332 INFO  [main] spark.SecurityManager (Logging.scala:logInfo(59)) - Changing modify acls to: cloudera
2015-10-25 22:42:09,333 INFO  [main] spark.SecurityManager (Logging.scala:logInfo(59)) - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
2015-10-25 22:42:09,333 INFO  [main] spark.HttpServer (Logging.scala:logInfo(59)) - Starting HTTP Server
2015-10-25 22:42:09,483 INFO  [main] server.Server (Server.java:doStart(272)) - jetty-8.y.z-SNAPSHOT
2015-10-25 22:42:09,520 INFO  [main] server.AbstractConnector (AbstractConnector.java:doStart(338)) - Started SocketConnector@0.0.0.0:40247
2015-10-25 22:42:09,521 INFO  [main] util.Utils (Logging.scala:logInfo(59)) - Successfully started service 'HTTP class server' on port 40247.
............................................
2015-10-25 22:42:42,999 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) -
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.cloudera
start time: 1445838161149
final status: UNDEFINED
tracking URL: http://quickstart.cloudera:8088/proxy/application_1436476349975_0011/
user: cloudera
2015-10-25 22:42:44,672 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:42:45,681 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:42:55,364 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:43:00,660 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:43:01,849 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:43:02,854 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: FAILED)
2015-10-25 22:43:02,854 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) -
client token: N/A
diagnostics: Application application_1436476349975_0011 failed 2 times due to AM Container for appattempt_1436476349975_0011_000002 exited with  exitCode: 1 due to: Exception from container-launch.
Container id: container_1436476349975_0011_02_000001
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:197)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1
.Failing this attempt.. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.cloudera
start time: 1445838161149
final status: FAILED
tracking URL: http://quickstart.cloudera:8088/cluster/app/application_1436476349975_0011
user: cloudera
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:102)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:58)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:335)
at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
at $iwC$$iwC.<init>(<console>:9)
at $iwC.<init>(<console>:18)
at <init>(<console>:20)
at .<init>(<console>:24)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:123)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:270)
at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:147)
at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:962)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

  • One thing was obvious after reading the above stacktrace: there was some exception in launching the container for the spark-shell application, application_1436476349975_0011. The application id can be retrieved from the spark-shell console logs above.
  • The next thing was to check yarn logs for this application.
[cloudera@quickstart bin]$ yarn logs -applicationId application_1436476349975_0011
15/10/25 22:44:39 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032


Container: container_1436476349975_0011_01_000001 on quickstart.cloudera_60545
================================================================================
LogType: stderr
LogLength: 1151
Log Contents:
Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder
at org.apache.spark.Logging$class.initializeLogging(Logging.scala:116)
at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
at org.apache.spark.Logging$class.log(Logging.scala:51)
at org.apache.spark.deploy.yarn.ApplicationMaster$.log(ApplicationMaster.scala:495)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:511)
at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:536)
at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.ClassNotFoundException: org.slf4j.impl.StaticLoggerBinder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 7 more

LogType: stdout
LogLength: 0
Log Contents:



Container: container_1436476349975_0011_02_000001 on quickstart.cloudera_60545
================================================================================
LogType: stderr
LogLength: 1151
Log Contents:
Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder
at org.apache.spark.Logging$class.initializeLogging(Logging.scala:116)
at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
at org.apache.spark.Logging$class.log(Logging.scala:51)
at org.apache.spark.deploy.yarn.ApplicationMaster$.log(ApplicationMaster.scala:495)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:511)
at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:536)
at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.ClassNotFoundException: org.slf4j.impl.StaticLoggerBinder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 7 more

LogType: stdout
LogLength: 0
Log Contents:


  • At first look, it seems to be a classpath issue, due to which the slf4j lib could not be found on the classpath.
  • The resolution lies in the configuration file /etc/spark/conf/spark-env.sh, where there is a minor typographical mistake:
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-etc/hadoop/conf}

In the above statement in spark-env.sh, a / is missing in front of etc/hadoop/conf, due to which slf4j goes missing from the classpath.

So, I changed it to

export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}

  • Now it is running fit and fine.
Happy Hacking !!!