If you run a Hadoop MapReduce job and open its status page on the JobTracker for that job ID, you will see performance counters like the ones below.
File System
FILE: Number of bytes
FILE: Number of bytes
FILE: Number of read
FILE: Number of large read
FILE: Number of write
HDFS: Number of bytes read=54583718086
HDFS: Number of bytes
HDFS: Number of read
HDFS: Number of large read
HDFS: Number of write
Job Counters
Launched map tasks=369
Launched reduce tasks=1
Data-local map tasks=369
Total time spent by all maps in occupied slots (ms)=34288552
Total time spent by all reduces in occupied slots (ms)=232084
Total time spent by all map tasks (ms)=8572138
Total time spent by all reduce tasks (ms)=58021
Total vcore-seconds taken by all map tasks=8572138
Total vcore-seconds taken by all reduce tasks=58021
Total megabyte-seconds taken by all map tasks=35111477248
Total megabyte-seconds taken by all reduce tasks=237654016
Map-Reduce Framework
Map input records=14753874
Map output records=666776
Map output bytes=4383426830
Map output materialized
Input split bytes=47970
Combine input records=0
Combine output records=0
Reduce input groups=1
Reduce shuffle bytes=4386098552
Reduce input records=666776
Reduce output records=666776
Spilled Records=1333552
Shuffled Maps =369
Failed Shuffles=0
Merged Map outputs=369
GC time elapsed (ms)=1121584
CPU time spent (ms)=23707900
Physical memory (bytes)
Virtual memory (bytes)
Total committed heap usage
Shuffle Errors
File Input Format Counters
Bytes Read=49449743227
File Output Format Counters
Bytes Written=4382090874
Counters are used to determine whether, and how often, a particular event occurred during job execution.
A task’s counters are sent in full every time, rather than as the counts since the last transmission, because this guards against errors due to lost messages. Furthermore, during a job run, counters may go down if a task fails. Counter values are definitive only once a job has successfully completed.
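The following Python sketch illustrates why sending full totals is more robust than sending increments. The message lists and the `lost` set are hypothetical, and this is only an illustration of the idea, not Hadoop's actual reporting protocol.

```python
def aggregate_deltas(deltas, lost):
    """Receiver adds up increments; a lost message is lost for good."""
    return sum(d for i, d in enumerate(deltas) if i not in lost)


def aggregate_totals(totals, lost):
    """Receiver keeps the latest full total it has seen."""
    latest = 0
    for i, total in enumerate(totals):
        if i not in lost:
            latest = total
    return latest


# A hypothetical task reports after processing 10, 25 and 40 records in total.
totals = [10, 25, 40]
deltas = [10, 15, 15]
lost = {1}  # the second message never arrives

print(aggregate_deltas(deltas, lost))  # 25: the receiver undercounts permanently
print(aggregate_totals(totals, lost))  # 40: the next message repairs the gap
```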
Counters serve three objectives:
Performance Monitoring: Tracking the counter values provides vital statistics about the MapReduce job. In addition to statistics on the job and its tasks, counters also provide statistics on the operations performed on the underlying filesystems.
Performance Testing: Knowing the expected values of counters provides the first line of performance validation for MapReduce jobs. It helps in ascertaining whether a job is behaving as the MapReduce paradigm predicts. For example, a simple check is that the total number of records produced by the mappers should equal the total number of records consumed by the reducers.
Performance Optimization: This forms the second line of performance validation for MapReduce jobs. Counter values give hints on what can be done to optimize the job further.
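As a rough illustration of the performance-testing idea, the Python sketch below checks that the records produced by the mappers match the records consumed by the reducers when no combiner runs. The `check_record_flow` helper and the dictionary layout are assumptions made for this example; the values are taken from the sample counter output at the top of this post.

```python
def check_record_flow(counters):
    """Return a list of problems found; an empty list means the check passed."""
    problems = []
    # The check only applies when no combiner ran between map and reduce.
    combiner_used = counters.get("Combine input records", 0) > 0
    if not combiner_used:
        produced = counters["Map output records"]
        consumed = counters["Reduce input records"]
        if produced != consumed:
            problems.append(
                f"mappers produced {produced} records "
                f"but reducers consumed {consumed}"
            )
    return problems


sample = {
    "Map output records": 666776,
    "Reduce input records": 666776,
    "Combine input records": 0,
}
print(check_record_flow(sample))  # []
```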
To describe in detail how these objectives of performance monitoring, testing, and optimization can be achieved using counters, I have listed all MapReduce counters in tabular format, with their descriptions, ideal expected values, and hints on how to optimize them.
There are two categories of counters in Hadoop: built-in (file system, job, framework) and custom.
In-built Counters
Source Code hadoop-mapreduce-client-core/2.6.0/org/apache/hadoop/mapreduce
Brief Description
Detailed Description
Expected value for ideally optimized MapReduce performance
How to optimize in case of an unexpected value / bad performance
Counter Group: FileSystem Counters
(FileName in Hadoop Code = FileSystemCounter.properties)
(CounterGroupName = File System Counters)
For each filesystem (HDFS, MAPRFS, S3):
Number of bytes read from the filesystem
The total number of bytes read from a specific filesystem.
No ideal expected value, as it depends on the volume of data being read.
It can be reduced by using efficient serialization formats (such as Avro or Parquet) and compression codecs (such as LZO, Snappy, or bzip2).
The choice depends purely on the use case. For example, Parquet is useful only if the read pattern is limited to a subset of columns.
The time taken to serialize/deserialize and compress/decompress is a tradeoff that should also be taken into consideration.
Number of bytes written to the filesystem
The total number of bytes written to a specific filesystem.
No ideal expected value, as it depends on the volume of data generated by the MapReduce algorithm.
total number of bytes read from mapr file
Off the shelf counter for HDFS_BYTES_READ.
total number of bytes written to mapr file
Off the shelf counter for
Total number of bytes read from the local file system
This counter is incremented for each byte read from the local file system.
These reads occur:
1) during the shuffle phase, when the map phase intermediate data is read by HTTP worker threads (“tasktracker.http.threads”) into reducer tasks.
2) during the reduce phase, when reducers read grouped and sorted data from the local filesystem.
No ideal expected value, as it depends on the shuffle phase and reducer logic.
It can be reduced by using efficient compression codecs (such as LZO, Snappy, or bzip2). The time taken to compress and decompress is a tradeoff that should also be taken into consideration.
Total number of bytes written to the local file system
This counter is incremented for each byte written to the local file system.
These writes occur:
1) during the map phase, when the mappers write their intermediate results to the local file system.
2) during the shuffle phase, when the reducers spill intermediate results to their local disks while sorting.
No ideal expected value, as it depends on mapper logic and the shuffle phase.
It can be reduced by using efficient compression codecs (such as LZO, Snappy, or bzip2). The time taken to compress and decompress is a tradeoff that should also be taken into consideration.
Number of read operations
It is accumulated at the client per file system. It is the number of read operations such as listStatus, getFileBlockLocations, open, etc.
This is useful in the interim to identify jobs that heavily load HDFS.
Number of large read operations
It is accumulated at the client per file system. On a file system, most operations are small, except listFiles for a large directory. Iterative listFiles was introduced in HDFS to break down a single large operation into smaller steps. This counter is incremented for every iteration of listFiles when listing files under a large directory.
This is useful in the interim to identify jobs that heavily load HDFS.
Number of write operations
It is accumulated at the client per file system. It is the number of write operations such as create, append, setPermission, etc.
This is useful in the interim to identify jobs that heavily load HDFS.
Counter Group: Job
(FileName in Hadoop Code = JobCounter.properties)
(CounterGroupName = Job Counters)
Total time spent by all maps in occupied slots (ms)
This value indicates the wall-clock time for the map tasks. It equals FALLOW_SLOTS_MILLIS_MAPS + MILLIS_MAPS.
Optimally, it should be equal to MILLIS_MAPS.
Reduce the time spent in FALLOW_SLOTS_MILLIS_MAPS as much as possible.
Total time spent by all maps waiting after
reserving slots (ms)
Indicates how much time map tasks wait in
the queue after the slots are reserved but before the map tasks execute. Slot
reservation is a capacity scheduler feature for memory-intensive jobs.
Not used by YARN-based mapreduce.
Optimally, should be
A high number indicates a possible mismatch between the number of slots configured for a task tracker and the resources that are actually available.
The way to reduce it can be one of the following:
1) Increase the number of nodes in the cluster, as this distributes the data and computing capacity further. Overall, the load on any particular node will decrease.
2) Reduce the mapper-task capacity of task-tracker nodes with a low level of resources by reducing the value of “mapred.tasktracker.map.tasks.maximum” in that task-tracker’s mapred-site.xml.
Total time spent by all map tasks (ms)
The total time taken in running all the
map tasks in ms, including speculative tasks
No ideal optimal value
as it depends on mapper logic.
Optimize the mapper code as much as possible by profiling it for memory leaks and following best coding practices.
Total time spent by all reduces in
occupied slots (ms)
This value indicates wall clock time for
Optimally, should be
Reduce time spent in
Total time spent by all reduces waiting
after reserving slots (ms)
Indicates how much time reduce tasks wait in the queue after the slots are reserved but before the reduce tasks execute. Slot reservation is a capacity scheduler feature for memory-intensive jobs.
Not used by YARN-based mapreduce.
Optimally, should be
A high number indicates a possible mismatch between the number of slots configured for a task tracker and the resources that are actually available.
The way to reduce it can be one of the following:
1) Increase the number of nodes in the cluster, as this distributes the data and computing capacity further. Overall, the load on any particular node will decrease.
2) Reduce the reducer-task capacity of task-tracker nodes with a low level of resources by reducing the value of “mapred.tasktracker.reduce.tasks.maximum” in that task-tracker’s mapred-site.xml.
Total time spent by all reduce tasks (ms)
The total time taken in running all the
reduce tasks in ms, including speculative tasks
No ideal optimal value
as it depends on reducer logic.
Optimize the reducer code as much as possible by profiling it for memory leaks and following best coding practices.
Total vcore-seconds taken by all map tasks
Vcore stands for virtual core, a share of a machine’s CPU.
This counter measures the CPU resources used by all the mappers.
It is the sum, over all mappers, of the number of vcores allocated to the mapper times the length of time that mapper ran.
The number of vcores allocated to a mapper is controlled by the property “mapreduce.map.cpu.vcores” (default 1).
In YARN, the total number of vcores that a node makes available to containers is controlled by “yarn.nodemanager.resource.cpu-vcores” (default 8).
In simple terms, it is equal to (“mapreduce.map.cpu.vcores” × MILLIS_MAPS).
It gives the amount of CPU resources that were blocked by mappers.
Ideally, the value of this counter should be as low as possible.
To reduce the value of this counter, reduce either the number of vcores allocated to each mapper or the time taken to execute a mapper instance.
In other words, make the mapper as optimized and as light on resources as possible, then reduce “mapreduce.map.cpu.vcores” to the value that just fits its requirement.
Total vcore-seconds taken by all reduce tasks
Vcore stands for virtual core, a share of a machine’s CPU.
This counter measures the CPU resources used by all the reducers.
It is the sum, over all reducers, of the number of vcores allocated to the reducer times the length of time that reducer ran.
The number of vcores allocated to a reducer is controlled by the property “mapreduce.reduce.cpu.vcores” (default 1).
In YARN, the total number of vcores that a node makes available to containers is controlled by “yarn.nodemanager.resource.cpu-vcores” (default 8).
In simple terms, it is equal to (“mapreduce.reduce.cpu.vcores” × MILLIS_REDUCES).
It gives the amount of CPU resources that were blocked by reducers.
Ideally, the value of this counter should be as low as possible.
To reduce the value of this counter, reduce either the number of vcores allocated to each reducer or the time taken to execute a reducer instance.
In other words, make the reducer as optimized and as light on resources as possible, then reduce “mapreduce.reduce.cpu.vcores” to the value that just fits its requirement.
Total megabyte-seconds taken by all map tasks
This counter measures the memory resources used by all the mappers.
It is the sum, over all mappers, of the amount of memory (in megabytes) allocated to the mapper times the length of time that mapper ran.
The amount of memory (MB) allocated to a mapper is controlled by the property “mapreduce.map.memory.mb” (default 1024).
In YARN, the total amount of memory that a node makes available to containers is controlled by “yarn.nodemanager.resource.memory-mb” (default 8192).
In simple terms, it is equal to (“mapreduce.map.memory.mb” × MILLIS_MAPS).
It gives the amount of memory resources that were blocked by mappers.
Ideally, the value of this counter should be as low as possible.
To reduce the value of this counter, reduce either the amount of memory allocated to each mapper or the time taken to execute a mapper instance.
In other words, make the mapper as optimized and as light on resources as possible, then reduce “mapreduce.map.memory.mb” to the value that just fits its requirement.
Total megabyte-seconds taken by all reduce tasks
This counter measures the memory resources used by all the reducers.
It is the sum, over all reducers, of the amount of memory (in megabytes) allocated to the reducer times the length of time that reducer ran.
The amount of memory (MB) allocated to a reducer is controlled by the property “mapreduce.reduce.memory.mb” (default 1024).
In YARN, the total amount of memory that a node makes available to containers is controlled by “yarn.nodemanager.resource.memory-mb” (default 8192).
In simple terms, it is equal to (“mapreduce.reduce.memory.mb” × MILLIS_REDUCES).
It gives the amount of memory resources that were blocked by reducers.
Ideally, the value of this counter should be as low as possible.
To reduce the value of this counter, reduce either the amount of memory allocated to each reducer or the time taken to execute a reducer instance.
In other words, make the reducer as optimized and as light on resources as possible, then reduce “mapreduce.reduce.memory.mb” to the value that just fits its requirement.
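The relation between these resource counters and the task run time can be turned around to recover the per-task allocation a job actually ran with. The Python sketch below does this for the sample counter output at the top of this post; the `implied_allocation` helper is a name invented for this example. Note that in that sample the numbers line up when the task time is taken in milliseconds, as reported by the "Total time spent by all map tasks (ms)" counter.

```python
def implied_allocation(resource_time, task_millis):
    """Per-task allocation implied by a resource-time counter and the task run time."""
    return resource_time / task_millis


# Values from the sample counter output.
map_millis = 8572138
reduce_millis = 58021

print(implied_allocation(8572138, map_millis))         # 1.0 vcore per mapper
print(implied_allocation(35111477248, map_millis))     # 4096.0 MB per mapper
print(implied_allocation(58021, reduce_millis))        # 1.0 vcore per reducer
print(implied_allocation(237654016, reduce_millis))    # 4096.0 MB per reducer
```

In this sample, each task appears to have been given 1 vcore and 4096 MB, so lowering the memory setting would be the first thing to try if the tasks do not need that much.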
Data-local map tasks
Total number of map tasks run on local data blocks (data locality).
It should be as close to the total number of maps as possible. Optimally, all the map tasks execute on local data to exploit locality of reference.
The only solution is to ensure maximum data locality.
The way to ensure it can be one of the following:
1) Increase the number of nodes in the cluster, as this distributes the data and computing capacity further. Overall, the load on any particular node will decrease and the probability of a map task getting its data locally will increase.
2) Increase the mapper-task capacity of task-tracker nodes with a high level of resources by increasing the value of “mapred.tasktracker.map.tasks.maximum” in that task-tracker’s mapred-site.xml.
Rack-local map tasks
Total number of map tasks for which the data blocks are on the same rack as the node on which the map task is executing.
Ideally should be zero.
Other local map tasks
Total number of map tasks for which the data blocks are on a different rack from the node on which the map task is executing.
Ideally should be zero.
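The three locality counters are easiest to read as fractions of the launched map tasks. The Python sketch below is one way to compute them; the `locality_fractions` helper is hypothetical, and the values come from the sample counter output, where the rack-local and other-local counters are not shown and are therefore assumed to be zero.

```python
def locality_fractions(launched_maps, data_local=0, rack_local=0, other_local=0):
    """Share of launched map tasks at each locality level."""
    if launched_maps == 0:
        return {}
    return {
        "data_local": data_local / launched_maps,
        "rack_local": rack_local / launched_maps,
        "other_local": other_local / launched_maps,
    }


# Sample output: Launched map tasks=369, Data-local map tasks=369.
fractions = locality_fractions(launched_maps=369, data_local=369)
print(fractions["data_local"])   # 1.0, every map task read its data locally
print(fractions["rack_local"])   # 0.0
```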
Launched map tasks
It defines how many map tasks were launched for the job, including failed tasks, killed tasks, and tasks that were started speculatively.
Optimally, this number is the same as the number of splits for the job.
If the value does not match the number of input splits, verify the InputFormat used.
Launched reduce tasks
It defines how many reduce tasks were
launched for the job, including failed tasks, killed tasks and tasks that
were started speculatively.
Optimally, should be
equal to the number of reducers configured for that mapreduce job (default is
2 reducers)
Failed map tasks
The number of map attempts/tasks that failed.
The reason for failure could be a runtime exception in the code or errors such as Out Of Memory, I/O errors, etc.
Ideally should be zero.
Failed reduce tasks
The number of reduce attempts/tasks that
were failed.
The reason of failure could be runtime
exception in code or errors like Out Of Memory, I/O errors, etc.
Ideally should be zero.
Killed map tasks
The number of map attempts/tasks that were killed.
The reason for killing could be speculative execution (in which the slower task is killed) or failure of the node on which the task is running.
Ideally should be zero.
A high number indicates too many tasks getting killed due to speculative execution or node failures.
The way to reduce the instances of speculative execution can be one of the following:
1) Increase the number of nodes in the cluster, as this distributes the data and computing capacity further. Overall, the load on any particular node will decrease.
2) Remove, or minimize the number of, nodes with a low level of resources in the cluster.
Killed reduce tasks
The number of reduce attempts/tasks that
were killed.
The reason of killing could be speculative
execution (in which slower task is killed) or failure of any node on which
task is running.
Ideally should be zero.
Counter Group: Map-Reduce Framework Counters
(FileName in Hadoop Code = TaskCounter.properties)
(CounterGroupName= Map-Reduce Framework)
Input split bytes
The total number of bytes of input-split objects read by maps. These objects represent the split metadata (offset and length within a file) rather than the split data itself.
No ideal optimal value
Map input bytes
The number of bytes of uncompressed input
read by all the maps in the job.
Must be equal to number
of bytes in input data
Map input records
The number of input records consumed by
all the maps in the job. It is incremented for every successful record read from
RecordReader and passed to the mapper’s map method. Records that the map
tasks failed to read are not included in these counters.
Must be equal to number
of records in input data
In case of non-matching value, check
RecordReader logic
Map skipped records.
The number of input records skipped by all the maps in the job.
Skipped records are bad or corrupt records that cause map task attempts to fail due to run-time exceptions in third-party libraries used in the mapper. Skipping mode is turned on for a task only after it has failed twice.
For a task consistently failing on a bad record, the tasktracker runs the following task attempts with these outcomes:
1. Task fails.
2. Task fails.
3. Skipping mode is enabled. Task fails, but the failed record is stored by the tasktracker.
4. Skipping mode is still enabled. Task succeeds by skipping the bad record that failed in the previous attempt.
Skipping mode can detect only one bad record per task attempt, so this mechanism is appropriate only for detecting occasional bad records.
To give skipping mode enough attempts to detect and skip all the bad records in an input split, increase the value of “mapred.map.max.attempts”. Bad records are saved as sequence files in the job’s output directory under _logs/skip.
Ideally, should be 0
Map output records
The number of map output records produced
by all the maps in the job. It is incremented for every successful record written
by the mappers. Records that the map tasks failed to write are not included
in these counters.
No ideal optimal value
as it depends on mapper logic.
Map output bytes
The number of bytes of uncompressed output
produced by all the maps in the job. Incremented every time collect() method
is called on a map’s Output Collector.
No ideal optimal value
as it depends on mapper logic.
To optimize this counter, use Hadoop
Serialization or Avro to serialize the Mapper output.
Map output materialized bytes
The number of bytes of map output actually
written to disk.
This is visible and relevant only if map
output compression is enabled. In that case, it is the number of bytes of
compressed output produced by all the maps in the job.
No ideal optimal value
as it depends on mapper logic.
Combine input records
It indicates the number of records that were
read by the optional combiner.
Incremented every time a value is read
from the combiner’s iterator over values
If you don’t specify a
combiner, these counters should be 0. Otherwise, should be equal to MAP_OUTPUT_RECORDS
In case of unexpected value, check
combiner logic
Combine output records
It indicates the number of records that were
written by the optional combiner.
Incremented every time the collect()
method is called on a combiner’s
If you don’t specify a
combiner, these counters should be 0. Otherwise, should be equal to
In case of unexpected value, check
combiner logic
Reduce input records
It indicates how many records were
successfully read by the reducers.
Incremented every time a value is read from the reducer’s
iterator over values.
If no combiner is used, this counter should be equal to the MAP_OUTPUT_RECORDS counter. Otherwise, it should be equal to COMBINE_OUTPUT_RECORDS.
Reduce skipped records
The number of input records skipped by all the reducers in the job.
Skipped records are bad or corrupt records that cause reduce task attempts to fail due to run-time exceptions in third-party libraries used in the reducer. Skipping mode is turned on for a task only after it has failed twice.
For a task consistently failing on a bad record, the tasktracker runs the following task attempts with these outcomes:
1. Task fails.
2. Task fails.
3. Skipping mode is enabled. Task fails, but the failed record is stored by the tasktracker.
4. Skipping mode is still enabled. Task succeeds by skipping the bad record that failed in the previous attempt.
Skipping mode can detect only one bad record per task attempt, so this mechanism is appropriate only for detecting occasional bad records.
To give skipping mode enough attempts to detect and skip all the bad records in an input split, increase the value of “mapred.reduce.max.attempts”. Bad records are saved as sequence files in the job’s output directory under _logs/skip.
Ideally should be zero.
Reduce input groups
The total number of distinct unique key
groups consumed by all the reducers in the job. It is incremented for every
unique key that the reducers process.
Incremented every time the reducer’s
reduce() method is called by the
This value should be
equal to the total number of different keys in the intermediate results from
the mappers.
Reduce skipped groups
The number of distinct key groups skipped by
all the reducers in the job.
This value should be zero
Reduce output records
It indicates how many records were
successfully written by all the reducers in the job.
Incremented every time the collect() method is called on a
reducer’s OutputCollector.
Ideally, the ratio (REDUCE_OUTPUT_RECORDS / MAP_INPUT_RECORDS) should be less than 1 and as close to 0 as possible.
A ratio greater than 1 is not invalid. It only violates the basic assumption of the MapReduce paradigm that the reduce phase aggregates the data considerably.
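A quick way to apply this rule is to compute the ratio directly from the counters. The Python sketch below uses the values from the sample counter output; the `reduction_ratio` helper is a name made up for this example.

```python
def reduction_ratio(reduce_output_records, map_input_records):
    """REDUCE_OUTPUT_RECORDS / MAP_INPUT_RECORDS, or None if there was no input."""
    if map_input_records == 0:
        return None
    return reduce_output_records / map_input_records


# Sample output: Reduce output records=666776, Map input records=14753874.
ratio = reduction_ratio(666776, 14753874)
print(f"{ratio:.3f}")  # roughly 0.045, so the job shrinks its input considerably
```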
Reduce shuffle bytes
It indicates how many bytes of the map
output were copied by the shuffle to the reducers.
Should be as low as possible.
Higher numbers here will make the job slower, as the shuffle process is the primary network consumer in a MapReduce job.
Compress and serialize the mapper output.
It can be reduced by using efficient serialization formats (such as Avro or Parquet) and compression codecs (such as LZO, Snappy, or bzip2).
The choice depends purely on the use case. For example, Parquet is useful only if the read pattern is limited to a subset of columns.
The time taken to serialize/deserialize and compress/decompress is a tradeoff that should also be taken into consideration.
Spilled Records
It indicates how much data the map and reduce
tasks wrote (spilled) to disk when processing the records.
Must be equal to sum of
REDUCE_INPUT_RECORDS and MAP_OUTPUT_RECORDS. In case, combiner is also
invoked, It must be equal to sum of REDUCE_INPUT_RECORDS and
Should be as low as possible, as spilling involves costly disk I/O.
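For a job without a combiner, the baseline described above can be checked with a few lines of Python. This is only a sketch: the `excess_spills` helper is hypothetical, and the values are those of the sample counter output.

```python
def excess_spills(spilled, map_output_records, reduce_input_records):
    """Records spilled beyond the baseline of one spill per map output
    record plus one per reduce input record (job without a combiner)."""
    return spilled - (map_output_records + reduce_input_records)


# Sample output: Spilled Records=1333552,
# Map output records=666776, Reduce input records=666776.
print(excess_spills(1333552, 666776, 666776))  # 0, no records were spilled more than once
```

A positive result would suggest that some records were written to disk more than once, which is where tuning the sort buffer or reducing map output is worth investigating.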
Shuffled Maps
The number of map output files transferred to reducers
by shuffle.
In shuffle phase, each mapper’s output is partitioned
and sorted into files, one for each reducer. There might be scenario that
keys are not uniformly distributed across mappers and some mappers don’t
generate partitions for all reducers.
Must be less than or equal to
product of number of successful mappers and successful reducers.
Mathematically, must be less than
Failed Shuffles
The number of map output copy failures during the
Should be zero.
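The shuffle expectations above can be combined into one small sanity check. In the Python sketch below, the `check_shuffle` helper is hypothetical, and the launched task counts from the sample counter output are used on the assumption that every launched task succeeded.

```python
def check_shuffle(shuffled_maps, failed_shuffles, mappers, reducers):
    """Return a list of violated shuffle expectations; empty means all good."""
    problems = []
    # At most one map output file per (mapper, reducer) pair.
    if shuffled_maps > mappers * reducers:
        problems.append("more map outputs shuffled than mappers x reducers")
    if failed_shuffles != 0:
        problems.append(f"{failed_shuffles} map output copies failed")
    return problems


# Sample output: Shuffled Maps=369, Failed Shuffles=0,
# Launched map tasks=369, Launched reduce tasks=1.
print(check_shuffle(369, 0, mappers=369, reducers=1))  # []
```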
Merged Map outputs
The number of map outputs that have been merged on the
reduce side of the shuffle.
Must be equal to the value of
CPU time spent (ms)
Total time spent by all tasks on CPU. It is gathered from /proc/cpuinfo and indicates how much total time was spent executing map and reduce tasks for a job.
Should be as low as possible.
GC time elapsed (ms)
The total time spent in milliseconds doing garbage
collection. The garbage collection counter is reported from
The Garbage Collection happens in Java to collect
unused objects (objects that are no longer referenced).
If garbage collection is frequent and represents a lot
of time, you may be allocating unnecessary objects.
It should be as close to 0 as possible.
If GC_TIME amounts to a considerable proportion of task run time, the following steps can help:
1) Add -verbose:gc -XX:+PrintGCDetails to “mapred.child.java.opts”. Then inspect the logs for some tasks (mappers or reducers). If new objects (like “new Text” or “new IntWritable”) are being created unnecessarily, or inside a loop or a frequently called function, try to reuse them instead. Hadoop Writable objects are mutable, so a single instance can be reused.
2) If heavy object creation is unavoidable, try to increase the heap size allocated to the task attempt JVM by adding the -Xmx parameter to “mapred.map.child.java.opts” for map tasks and “mapred.reduce.child.java.opts” for reduce tasks. For example, to set the maximum heap size to 4 GB for map tasks, use “-Xmx4096m”.
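To judge whether GC time is a "considerable proportion" of task run time, it helps to compute the fraction explicitly. The Python sketch below does so for the sample counter output; the `gc_fraction` helper is hypothetical, and what counts as too high is left to the reader, since the post does not give a threshold.

```python
def gc_fraction(gc_millis, map_millis, reduce_millis):
    """Share of total task run time that was spent in garbage collection."""
    task_millis = map_millis + reduce_millis
    if task_millis == 0:
        return 0.0
    return gc_millis / task_millis


# Sample output: GC time elapsed (ms)=1121584,
# map tasks (ms)=8572138, reduce tasks (ms)=58021.
fraction = gc_fraction(1121584, 8572138, 58021)
print(round(fraction, 2))  # 0.13, about 13% of task time went to GC
```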
Virtual memory (bytes) snapshot
Total number of Virtual
Memory Bytes (RAM+SWAP) consumed by all the tasks. It shows how much physical
memory plus swap space is consumed by all tasks in a job.
swap space or paged memory is used for a job, which should be close to 0 as
much as possible
The only reason for a high swap value is that the memory committed to the current tasks on a node far exceeds the memory available on that node. The cause could be other system processes or applications consuming memory. This results in paging, which shows up as a high swap value.
The way to ensure a low swap value can be one of the following:
1) Increase the number of nodes in the cluster, as this distributes the data and computing capacity further. Overall, the load on any particular node will decrease.
2) Decrease the mapper-task and reducer-task capacity of task-tracker nodes with a low level of resources by decreasing the values of “mapred.tasktracker.map.tasks.maximum” and “mapred.tasktracker.reduce.tasks.maximum” on those nodes.
Physical memory (bytes) snapshot
Total Physical Memory
consumed by all the tasks. These statistics are gathered from /proc/meminfo
and indicate how much RAM (not including swap space) was consumed by all the
Total committed heap usage (bytes)
The total amount of memory available in the JVM in
bytes, as reported by Runtime.getRuntime().totalMemory()
Should be close to total task