Friday 18 December 2015

Spark's new Memory Manager, Unified Memory Manager

Starting with the 1.6 release, Spark moves to a new memory manager implementation, the Unified Memory Manager. Compared with the current implementation, it aims to use memory more efficiently for running programs and to require far fewer configuration parameters from the user.

Starting from the basics, the Memory Manager in Spark is responsible for dividing the executor's heap memory between the storage and shuffle fractions.

1) Storage Memory Fraction: This fraction holds the partitions of the RDDs being processed. It acts as an in-memory LRU cache for the data. This means data does not necessarily remain in memory for long: when the storage fraction is full and new data arrives, the least recently used data is dropped.
RDDs stored in storage memory serve several use cases:
1. Input RDD: The input of the program, which is processed lazily when an action is executed. Its partitions stay loaded only for the scope of execution of the first function.
2. Intermediate RDD: Created as the output of one function and consumed by the next function in the DAG. Its partitions stay loaded only for the scope of execution of the next function.
3. Output RDD: The output of the last function in the DAG. It is available for the scope of the program.
4. Persisted RDD: If an intermediate RDD has to be used again later, it is recommended to persist it in memory; otherwise the DAG is executed again to recompute it.
An RDD can be persisted using the persist() function of the Spark API.
The levels of RDD persistence are listed below (sourced from Spark's programming guide):
Storage Level: Meaning
MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER: Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY: Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental): Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off heap store, Spark is compatible with Tachyon out-of-the-box.
If your RDD is small and needs only a few computation steps, MEMORY_ONLY is recommended.
If your RDD is small and needs many computation steps, MEMORY_AND_DISK is recommended.
If your RDD is large and needs only a few computation steps, MEMORY_ONLY_SER is recommended.
If your RDD is large and needs many computation steps, MEMORY_AND_DISK_SER is recommended.
If you have multiple large RDDs, each needing many computation steps, and want to share them across multiple Spark applications, OFF_HEAP (Tachyon) is recommended.
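The rules of thumb above can be written down as a small decision function. This is only an illustrative sketch in plain Python: the function name and its boolean parameters are hypothetical, and what counts as "large" or "many steps" is left to the reader's judgment.

```python
def recommend_storage_level(is_large, many_steps, shared_across_apps=False):
    """Return the storage level name suggested by the rules of thumb above.

    All three parameters are hypothetical flags for this sketch; Spark does
    not expose such a helper.
    """
    if is_large and many_steps and shared_across_apps:
        return "OFF_HEAP"
    if is_large:
        return "MEMORY_AND_DISK_SER" if many_steps else "MEMORY_ONLY_SER"
    return "MEMORY_AND_DISK" if many_steps else "MEMORY_ONLY"


# Print the recommendation for each combination of size and step count.
for is_large in (False, True):
    for many_steps in (False, True):
        level = recommend_storage_level(is_large, many_steps)
        print(is_large, many_steps, "->", level)
```

The returned string matches the name of the storage level that would be passed to persist().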
Storage Memory Distribution among Tasks: The division of storage memory among tasks is enforced only logically, not physically by the JVM. Each task is allocated a minimum of 1/(2n) and a maximum of 1/n of the storage memory, where n is the number of tasks running in parallel. A task can still physically load more data than its allocation. That is why efficient execution of Spark jobs depends significantly on balancing the value of "spark.executor.memory" against the number of parallel tasks (spark.executor.cores / spark.task.cpus) running on each executor.
Thus, if the value of "spark.executor.memory" is low, the number of parallel tasks should be set to a lower value, so that each task's share does not become too small.
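To make the 1/(2n) and 1/n bounds concrete, here is a minimal sketch in plain Python. The function name is hypothetical, n is assumed to be the number of parallel tasks (spark.executor.cores / spark.task.cpus), and the pool size is an arbitrary example value rather than something read from Spark.

```python
def per_task_bounds_mb(pool_mb, executor_cores, task_cpus=1):
    """Return (minimum, maximum) memory share per task, in MB.

    Assumes n = executor_cores // task_cpus parallel tasks, each entitled to
    between 1/(2n) and 1/n of the pool.
    """
    n = max(1, executor_cores // task_cpus)
    return pool_mb / (2 * n), pool_mb / n


# Example: a 192 MB pool shared by 4 parallel tasks.
low, high = per_task_bounds_mb(192, executor_cores=4)
print(low, high)  # 24.0 48.0

# Halving the number of parallel tasks doubles each task's share.
print(per_task_bounds_mb(192, executor_cores=2))  # (48.0, 96.0)
```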

2) Shuffle Memory Fraction: When a dataset is aggregated or reduced by key, its data is shuffled to create a sorted dataset. This sorting needs a memory buffer to hold sorted chunks of data. The amount of memory used depends on the algorithm being used. The memory buffer used for sorting during the shuffle phase is called the shuffle memory fraction.

Executor JVM Heap Initialization
When a Spark application is submitted in Spark-on-YARN mode, the amount of memory for each executor (--executor-memory flag or spark.executor.memory parameter) is specified, along with the amount of memory for the driver application (--driver-memory flag or spark.driver.memory parameter).

During execution, there are spark.executor.instances Spark executors, each running as a Java process in an isolated YARN container. Each executor's Java process launches a JVM with a heap of spark.executor.memory MB. The YARN container for the executor, however, occupies more memory than spark.executor.memory MB, by max(384 MB, 0.10 * spark.executor.memory).
This difference is the memory overhead of launching the YARN container, as YARN needs some memory for internal execution and maintaining state. Without the overhead memory, the container will not launch, because YARN strictly enforces the policy that a request fails if it asks for more memory than is available in the container.
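The container size formula above can be checked with a few lines of plain Python. This is a sketch of the arithmetic only: the function name is hypothetical, fractional megabytes are truncated for simplicity, and any rounding YARN applies to its own allocation increments is not modeled.

```python
def yarn_container_mb(executor_memory_mb, min_overhead_mb=384, overhead_factor=0.10):
    """Return executor heap plus overhead, where overhead = max(384 MB, 10% of heap)."""
    overhead = max(min_overhead_mb, int(overhead_factor * executor_memory_mb))
    return executor_memory_mb + overhead


# For small executors the 384 MB floor dominates.
print(yarn_container_mb(1024))  # 1408

# For larger executors the 10% term dominates.
print(yarn_container_mb(8192))  # 9011
```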


Current Memory Manager Implementation - Static Memory Manager:
This is the only memory manager supported currently. In this implementation:
1) The ratio of the two fractions, storage and shuffle, is statically defined by setting the parameter "spark.storage.memoryFraction". Because the boundaries are static, neither fraction can use the other's space even when it is idle. Overall, this results in heavy under-utilization of heap space.
2) To optimize the utilization of heap space, the end user has to estimate the storage and shuffle memory requirements of the program and then set memory manager configuration parameters such as "spark.storage.memoryFraction" and "spark.shuffle.memoryFraction". This exercise has to be repeated for each application and each execution environment. Because it is largely a trial-and-error way of optimizing the application, it can be frustrating for developers.

For a more exhaustive read on the static memory manager, refer to http://0x0fff.com/spark-architecture/

New Memory Manager Implementation - Unified Memory Manager
This implementation aims to mitigate the two disadvantages of the static memory manager described above: under-utilization of Java heap memory and manual intervention in optimizing its usage.



It enforces a soft boundary between shuffle and storage memory such that either side can borrow memory from the other.
The region shared between shuffle and storage is a fraction of the total heap space, configurable through `spark.memory.fraction` (default 0.75). The position of the boundary within this space is further determined by `spark.memory.storageFraction` (default 0.5).
This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default. So, if heap size is 512 MB, the storage memory size will be 192 MB. But this is the minimum memory which can be used for storage purpose, as more memory can be borrowed from shuffle as per availability.
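The arithmetic above can be reproduced with a short plain-Python sketch. The function name is hypothetical; the two defaults are the values quoted above for `spark.memory.fraction` and `spark.memory.storageFraction`, and the calculation follows the text in applying them to the whole heap.

```python
def unified_regions_mb(heap_mb, memory_fraction=0.75, storage_fraction=0.5):
    """Return (unified region, storage region, execution region) in MB."""
    unified = heap_mb * memory_fraction
    storage = unified * storage_fraction
    return unified, storage, unified - storage


# The 512 MB example from the text: 192 MB of storage memory by default.
print(unified_regions_mb(512))  # (384.0, 192.0, 192.0)
```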
Storage can borrow as much shuffle/execution memory as is free until shuffle reclaims its space. When this happens, cached blocks will be evicted from memory until sufficient borrowed memory is released to satisfy the shuffle memory request.
Similarly, shuffle can borrow as much storage memory as is free. However, shuffle memory is never evicted by storage due to the complexities involved in implementing this. The implication is that attempts to cache blocks may fail if shuffle has already eaten up most of the storage space, in which case the new blocks will be evicted immediately according to their respective storage levels.
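The borrowing rules described above can be illustrated with a toy model in plain Python. This is not Spark's implementation: the class and method names are hypothetical, memory is tracked as plain numbers, and block-level details such as storage levels are ignored. It only shows the asymmetry: execution can evict what storage has borrowed, while storage can never evict execution.

```python
class UnifiedPool:
    """Toy model of a shared region with a soft storage/execution boundary."""

    def __init__(self, total_mb, storage_region_mb):
        self.total = total_mb
        self.storage_region = storage_region_mb
        self.storage_used = 0
        self.execution_used = 0

    def free(self):
        return self.total - self.storage_used - self.execution_used

    def acquire_storage(self, mb):
        """Cache a block only if enough memory is free; never evicts execution."""
        if mb > self.free():
            return False
        self.storage_used += mb
        return True

    def acquire_execution(self, mb):
        """Grant execution memory, evicting only what storage has borrowed."""
        shortfall = mb - self.free()
        if shortfall > 0:
            borrowed = max(0, self.storage_used - self.storage_region)
            self.storage_used -= min(shortfall, borrowed)
        granted = min(mb, self.free())
        self.execution_used += granted
        return granted


pool = UnifiedPool(total_mb=384, storage_region_mb=192)
print(pool.acquire_storage(300))    # True: storage borrows idle execution space
print(pool.acquire_execution(150))  # 150: 66 MB of cached data is evicted
print(pool.acquire_storage(50))     # False: storage cannot evict execution
print(pool.acquire_execution(100))  # 42: only the remaining borrowed part is reclaimed
```

After the last call, storage is back to exactly its 192 MB region, which execution cannot push below in this model.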

The Unified Memory Manager further helps reduce JVM heap space issues because heap space is not statically divided. Since the boundary is fluid, execution and storage can each use the other's idle space, which can be reclaimed when needed as described above. This results in better utilization of JVM heap space.

Wednesday 9 December 2015

container-launch exception in CDH 5.3 while launching spark-shell or submitting a Spark job in YARN deploy mode

This is a bug of sorts in the CDH 5.3 Spark configuration, but a couple of months back, while running my first Spark job, it was a big, mysterious hurdle. I am adding a post on it because I came across a couple of colleagues at my workplace facing the same issue. I hope it helps others stepping into the Spark world through CDH 5.3.

Below is the sequence of the issue's occurrence and resolution:


  • [cloudera@quickstart bin]$  ./spark-shell --master yarn
2015-10-25 22:42:09,321 INFO  [main] spark.SecurityManager (Logging.scala:logInfo(59)) - Changing view acls to: cloudera
2015-10-25 22:42:09,332 INFO  [main] spark.SecurityManager (Logging.scala:logInfo(59)) - Changing modify acls to: cloudera
2015-10-25 22:42:09,333 INFO  [main] spark.SecurityManager (Logging.scala:logInfo(59)) - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
2015-10-25 22:42:09,333 INFO  [main] spark.HttpServer (Logging.scala:logInfo(59)) - Starting HTTP Server
2015-10-25 22:42:09,483 INFO  [main] server.Server (Server.java:doStart(272)) - jetty-8.y.z-SNAPSHOT
2015-10-25 22:42:09,520 INFO  [main] server.AbstractConnector (AbstractConnector.java:doStart(338)) - Started SocketConnector@0.0.0.0:40247
2015-10-25 22:42:09,521 INFO  [main] util.Utils (Logging.scala:logInfo(59)) - Successfully started service 'HTTP class server' on port 40247.
............................................
2015-10-25 22:42:42,999 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) -
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.cloudera
start time: 1445838161149
final status: UNDEFINED
tracking URL: http://quickstart.cloudera:8088/proxy/application_1436476349975_0011/
user: cloudera
2015-10-25 22:42:44,672 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:42:45,681 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:42:55,364 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:43:00,660 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:43:01,849 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: ACCEPTED)
2015-10-25 22:43:02,854 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) - Application report for application_1436476349975_0011 (state: FAILED)
2015-10-25 22:43:02,854 INFO  [main] yarn.Client (Logging.scala:logInfo(59)) -
client token: N/A
diagnostics: Application application_1436476349975_0011 failed 2 times due to AM Container for appattempt_1436476349975_0011_000002 exited with  exitCode: 1 due to: Exception from container-launch.
Container id: container_1436476349975_0011_02_000001
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:197)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1
.Failing this attempt.. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.cloudera
start time: 1445838161149
final status: FAILED
tracking URL: http://quickstart.cloudera:8088/cluster/app/application_1436476349975_0011
user: cloudera
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:102)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:58)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:140)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:335)
at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
at $iwC$$iwC.<init>(<console>:9)
at $iwC.<init>(<console>:18)
at <init>(<console>:20)
at .<init>(<console>:24)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:123)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:270)
at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:147)
at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:60)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:962)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

  • One thing was obvious after reading the above stack trace: there was an exception while launching the container for the spark-shell application, application_1436476349975_0011. The application ID can be retrieved from the spark-shell console logs above.
  • The next thing was to check yarn logs for this application.
[cloudera@quickstart bin]$ yarn logs -applicationId application_1436476349975_0011
15/10/25 22:44:39 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032


Container: container_1436476349975_0011_01_000001 on quickstart.cloudera_60545
================================================================================
LogType: stderr
LogLength: 1151
Log Contents:
Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder
at org.apache.spark.Logging$class.initializeLogging(Logging.scala:116)
at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
at org.apache.spark.Logging$class.log(Logging.scala:51)
at org.apache.spark.deploy.yarn.ApplicationMaster$.log(ApplicationMaster.scala:495)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:511)
at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:536)
at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.ClassNotFoundException: org.slf4j.impl.StaticLoggerBinder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 7 more

LogType: stdout
LogLength: 0
Log Contents:



Container: container_1436476349975_0011_02_000001 on quickstart.cloudera_60545
================================================================================
LogType: stderr
LogLength: 1151
Log Contents:
Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder
at org.apache.spark.Logging$class.initializeLogging(Logging.scala:116)
at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
at org.apache.spark.Logging$class.log(Logging.scala:51)
at org.apache.spark.deploy.yarn.ApplicationMaster$.log(ApplicationMaster.scala:495)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:511)
at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:536)
at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.ClassNotFoundException: org.slf4j.impl.StaticLoggerBinder
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 7 more

LogType: stdout
LogLength: 0
Log Contents:


  • At first look, it seems to be a classpath issue, due to which the slf4j library could not be found on the classpath.
  • The resolution lies in the configuration file /etc/spark/conf/spark-env.sh, which contains a minor typographical mistake.
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-etc/hadoop/conf}

In the above statement in spark-env.sh, a / is missing in front of etc/hadoop/conf, due to which slf4j is not found.

So, I changed it to

export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}
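The effect of the missing slash can be illustrated outside the shell. The plain-Python sketch below mimics the `${VAR:-default}` expansion and shows that the broken default is a relative path, so where it points depends on the working directory. The helper name and the working directory used here are hypothetical, and the sketch makes no claim about how Spark itself consumes the variable.

```python
import posixpath


def expand_default(env, name, default):
    """Mimic shell ${NAME:-default}: use default when NAME is unset or empty."""
    value = env.get(name)
    return value if value else default


env = {}  # HADOOP_CONF_DIR is not set, so the default is used
broken = expand_default(env, "HADOOP_CONF_DIR", "etc/hadoop/conf")
fixed = expand_default(env, "HADOOP_CONF_DIR", "/etc/hadoop/conf")

print(posixpath.isabs(broken))  # False
print(posixpath.isabs(fixed))   # True

# A relative path is resolved against the current directory (hypothetical here),
# so it points somewhere other than /etc/hadoop/conf.
print(posixpath.join("/home/cloudera/bin", broken))  # /home/cloudera/bin/etc/hadoop/conf
```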

  • Now it is running fit and fine.
Happy Hacking !!!

Tuesday 8 December 2015

TRANSPOSE/PIVOT a Table in Hive

Transposing/pivoting a table means converting the values of one column into a set of new columns, and the values of another column into the corresponding values of those new columns.
For example, if the original table, "test_sample", is:

id      code    key     value
-------------------------------
1       A       p       e
2       B       q       f
3       B       p       f
3       B       q       h
3       B       r       j
3       C       t       k
3       C       t        k

the transpose/pivot of the table should be:

id      code    p       q       r       t
--------------------------------------
1       A       e
2       B               f
3       B       f       h       j
3       C                               k


Hive Implementation

1) When value is of string type
If "test_sample" is a Hive table with the following table definition:
create table test_sample(id string, code string, key string, value string) row format delimited fields terminated by ',' lines terminated by '\n';

hive> set hive.cli.print.header=true;
hive> select * from test_sample;
id      code    key     value
1       A       p       e
2       B       q       f
3       B       p       f
3       B       q       h
3       B       r       j
3       C       t       k

the query to create transpose of it is:

select b.id, b.code,
       concat_ws('', b.p) as p,
       concat_ws('', b.q) as q,
       concat_ws('', b.r) as r,
       concat_ws('', b.t) as t
from (
  select id, code,
         collect_list(a.group_map['p']) as p,
         collect_list(a.group_map['q']) as q,
         collect_list(a.group_map['r']) as r,
         collect_list(a.group_map['t']) as t
  from (
    select id, code, map(key, value) as group_map
    from test_sample
  ) a
  group by a.id, a.code
) b;

On execution of this query, the output will be:
id      code    p       q       r       t
--------------------------------------
1       A       e
2       B               f
3       B       f       h       j
3       C                               k

which is the expected output.

An important point to note is that this does not use any custom UDFs/UDAFs. It uses only built-in Hive functions, which saves a lot of hassle.
"concat_ws" and "map" are Hive UDFs, and "collect_list" is a Hive UDAF.

Working
  • The "map" function creates a map from the values of two columns, as key-value pairs.
  • Then, in the outer query, we group by the dimension columns (id and code) and aggregate all the values of a particular key using "collect_list".
id      code    p       q       r       t
1       A       ["e"]   []      []      []
2       B       []      ["f"]   []      []
3       B       ["f"]   ["h"]   ["j"]   []
3       C       []      []      []      ["k"]

  • Then, in the outermost query, we use the "concat_ws" function to get a single string value out of each array.
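The three steps can be mimicked in plain Python to see what each stage produces. This is only an emulation for illustration, not Hive code: the function name is hypothetical, dictionaries stand in for Hive maps, lists for the arrays built by "collect_list", and an empty-separator join for "concat_ws".

```python
from collections import defaultdict

# Same rows as test_sample: (id, code, key, value)
rows = [
    ("1", "A", "p", "e"),
    ("2", "B", "q", "f"),
    ("3", "B", "p", "f"),
    ("3", "B", "q", "h"),
    ("3", "B", "r", "j"),
    ("3", "C", "t", "k"),
]
PIVOT_KEYS = ["p", "q", "r", "t"]


def pivot_strings(rows, pivot_keys):
    # Step 1: map(key, value) builds a one-entry map per row.
    mapped = [(id_, code, {key: value}) for id_, code, key, value in rows]

    # Step 2: group by (id, code) and collect non-missing values per pivot key.
    groups = defaultdict(lambda: {k: [] for k in pivot_keys})
    for id_, code, group_map in mapped:
        for k in pivot_keys:
            value = group_map.get(k)
            if value is not None:
                groups[(id_, code)][k].append(value)

    # Step 3: concat_ws('', ...) turns each list into a single string.
    return {
        group: {k: "".join(values) for k, values in cols.items()}
        for group, cols in groups.items()
    }


for (id_, code), cols in pivot_strings(rows, PIVOT_KEYS).items():
    print(id_, code, cols)
```

An empty string in the result corresponds to a blank cell in the pivoted table.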

2) When value is of numeric (int/float/double) type
If "test_sample" is a Hive table with the following table definition:
create table test_sample(id string, code string, key string, value int) row format delimited fields terminated by ',' lines terminated by '\n';

hive> set hive.cli.print.header=true;
hive> select * from test_sample;
id      code    key     value
1       A       p       5
2       B       q       6
3       B       p       6
3       B       q       8
3       B       r       10
3       C       t       11

the query to create transpose of it is:

select id, code,
       sum(a.group_map['p']) as p,
       sum(a.group_map['q']) as q,
       sum(a.group_map['r']) as r,
       sum(a.group_map['t']) as t
from (
  select id, code, map(key, value) as group_map
  from test_sample
) a
group by a.id, a.code;


On execution of this query, the output will be:
id      code    p       q       r       t
-----------------------------------------------------------
1       A       5       NULL    NULL    NULL
2       B       NULL    6       NULL    NULL
3       B       6       8       10      NULL
3       C       NULL    NULL    NULL    11

which is the expected output.
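The numeric variant can be emulated in plain Python in the same spirit. This is an illustrative sketch, not Hive code: the function name is hypothetical, and None stands in for NULL, which is what a sum over no values yields in the output above.

```python
# Same rows as the numeric test_sample: (id, code, key, value)
rows = [
    ("1", "A", "p", 5),
    ("2", "B", "q", 6),
    ("3", "B", "p", 6),
    ("3", "B", "q", 8),
    ("3", "B", "r", 10),
    ("3", "C", "t", 11),
]
PIVOT_KEYS = ["p", "q", "r", "t"]


def pivot_sums(rows, pivot_keys):
    """Group by (id, code) and sum the values per pivot key; None means NULL."""
    totals = {}
    for id_, code, key, value in rows:
        cols = totals.setdefault((id_, code), {k: None for k in pivot_keys})
        if key in cols:
            cols[key] = value if cols[key] is None else cols[key] + value
    return totals


for (id_, code), cols in pivot_sums(rows, PIVOT_KEYS).items():
    print(id_, code, cols)
```

Because the aggregation is a sum, duplicate (id, code, key) rows would be added together rather than concatenated as in the string case.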