#!/usr/bin/bash
#############################################################################
#  Copyright (C) 2013-2015 Lawrence Livermore National Security, LLC.
#  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
#  Written by Albert Chu <chu11@llnl.gov>
#  LLNL-CODE-644248
#
#  This file is part of Magpie, scripts for running Hadoop on
#  traditional HPC systems.  For details, see https://github.com/llnl/magpie.
#
#  Magpie is free software; you can redistribute it and/or modify it
#  under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  Magpie is distributed in the hope that it will be useful, but
#  WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with Magpie.  If not, see <http://www.gnu.org/licenses/>.
#############################################################################

# This script is the core terasort running script.  For the most part,
# it shouldn't be editted.  See job submission files for configuration
# details.

source ${MAGPIE_SCRIPTS_HOME}/magpie/lib/magpie-lib-hadoop-helper
source ${MAGPIE_SCRIPTS_HOME}/magpie/lib/magpie-lib-log
source ${MAGPIE_SCRIPTS_HOME}/magpie/lib/magpie-lib-paths

# This is a job, no loading export files or libs except for minimal convenience ones

terasortexamples="share/hadoop/mapreduce/hadoop-mapreduce-examples-${HADOOP_VERSION}.jar"

if [ "${HADOOP_TERASORT_SIZE}X" == "X" ]
then
    terasortsize=50000000
else
    terasortsize=$HADOOP_TERASORT_SIZE
fi

if [ "${HADOOP_FILESYSTEM_MODE}" == "rawnetworkfs" ]
then
    pathprefix="${HADOOP_RAWNETWORKFS_PATH}/"
fi

if [ "${HADOOP_TERASORT_CLEAR_CACHE}X" != "X" ]
then
    if [ "${HADOOP_TERASORT_CLEAR_CACHE}" == "yes" ]
    then
        clearcache="-Ddfs.datanode.drop.cache.behind.reads=true -Ddfs.datanode.drop.cache.behind.writes=true"
    else
        clearcache=""
    fi
else
    clearcache="-Ddfs.datanode.drop.cache.behind.reads=true -Ddfs.datanode.drop.cache.behind.writes=true"
fi

cd ${HADOOP_HOME}

#
# Remove previous runs if they are lingering
#

if ${hadoopcmdprefix}/hadoop fs -ls ${pathprefix} | grep -q terasort-teragen
then
    command="${hadoopcmdprefix}/hadoop fs -rm -r ${pathprefix}terasort-teragen"
    $command
fi

if ${hadoopcmdprefix}/hadoop fs -ls ${pathprefix} | grep -q terasort-sort
then
    command="${hadoopcmdprefix}/hadoop fs -rm -r ${pathprefix}terasort-sort"
    $command
fi

if ${hadoopcmdprefix}/hadoop fs -ls ${pathprefix} | grep -q terasort-checksum
then
    command="${hadoopcmdprefix}/hadoop fs -rm -r ${pathprefix}terasort-checksum"
    $command
fi

if ${hadoopcmdprefix}/hadoop fs -ls ${pathprefix} | grep -q terasort-validate
then
    command="${hadoopcmdprefix}/hadoop fs -rm -r ${pathprefix}terasort-validate"
    $command
fi

if [ "${HADOOP_TERAGEN_MAP_COUNT:-0}" -ne "0" ]
then
    mtasks=$HADOOP_TERAGEN_MAP_COUNT
else
    # Calculate the sensible maximum number of map tasks there should be
    # Want to add +1 mtasks, make each block just below blocksize
    # instead of make each task make blocksize + scratch data.
    terasortfullsize=`expr $terasortsize \* 100`

    if Magpie_hadoop_filesystem_mode_is_hdfs_type
    then
        defaultblocksize=`${hadoopcmdprefix}/hdfs getconf -confKey dfs.blocksize 2> /dev/null`
    elif [ "${HADOOP_FILESYSTEM_MODE}" == "rawnetworkfs" ]
    then
        defaultblocksize=`${hadoopcmdprefix}/hdfs getconf -confKey fs.local.block.size 2> /dev/null`
    else
        Magpie_output_internal_error "Illegal HADOOP_FILESYSTEM_MODE \"${HADOOP_FILESYSTEM_MODE}\" specified"
        exit 1
    fi

    mtasks=`expr $terasortfullsize \/ ${defaultblocksize}`
    mtasks=`expr $mtasks + 1`

    # But use the default map task count if it is smaller than this max.

    defaultmaptasks=`${hadoopcmdprefix}/hdfs getconf -confKey mapreduce.job.maps 2> /dev/null`

    if [ "${defaultmaptasks}" -lt "${mtasks}" ]
    then
        mtasks=${defaultmaptasks}
    fi

    mapreducemaptasks="-Dmapreduce.job.maps=${mtasks}"
fi

echo "*******************************************************"
echo "* Executing TeraGen"
echo "*******************************************************"

command="${hadoopcmdprefix}/hadoop jar ${terasortexamples} teragen ${extralibjars} ${mapreducemaptasks} ${clearcache} $terasortsize ${pathprefix}terasort-teragen"
echo "Running $command" >&2
$command

if [ "${HADOOP_TERASORT_RUN_TERACHECKSUM}" == "yes" ]
then
    sleep 30

    echo "*******************************************************"
    echo "* Executing TeraChecksum"
    echo "*******************************************************"

    # I assume no user is concerned with performance here, we just go with this fixed number
    sumreducetasks=`expr $HADOOP_WORKER_COUNT`

    # To my surprise, TeraChecksum isn't in the mapreduce examples
    # driver.  I wrote a patch for this and submitted it in
    # MAPREDUCE-6624.  If it isn't available in a future version of
    # Hadoop, we can just use teravalidate.

    ${hadoopcmdprefix}/hadoop jar ${terasortexamples} terachecksum 2>&1 | grep -q "Unknown program"
    if [ $? -eq 0 ]
    then
        whichchecksumcommand="teravalidate"
        echo "TeraChecksum not available, using TeraValidate instead"
    else
        whichchecksumcommand="terachecksum"
    fi

    mapreducereducetasks="-Dmapreduce.job.reduces=$sumreducetasks"

    command="${hadoopcmdprefix}/hadoop jar ${terasortexamples} ${whichchecksumcommand} ${extralibjars} ${mapreducereducetasks} ${pathprefix}terasort-teragen ${pathprefix}terasort-checksum"

    echo "Running $command" >&2
    $command

    if [ "${whichchecksumcommand}" == "teravalidate" ]
    then
        inputchecksum=`${hadoopcmdprefix}/hadoop fs -cat ${pathprefix}terasort-checksum/part-r-00000 | head -n 1 | awk '{print $2}'`
    else
        inputchecksum=`${hadoopcmdprefix}/hadoop fs -cat ${pathprefix}terasort-checksum/part-r-00000`
    fi
    echo "TeraChecksum Checksum: ${inputchecksum}"
fi

sleep 30

if [ "${HADOOP_TERASORT_REDUCER_COUNT:-0}" -ne "0" ]
then
    sortreducetasks=$HADOOP_TERASORT_REDUCER_COUNT
else
    sortreducetasks=`expr $HADOOP_WORKER_COUNT \* 2`
fi

if [ "${HADOOP_TERASORT_OUTPUT_REPLICATION}X" != "X" ]
then
    replicationcount="-Dmapreduce.terasort.output.replication=${HADOOP_TERASORT_OUTPUT_REPLICATION}"
fi

echo "*******************************************************"
echo "* Executing TeraSort"
echo "*******************************************************"

mapreducereducetasks="-Dmapreduce.job.reduces=$sortreducetasks"

command="${hadoopcmdprefix}/hadoop jar ${terasortexamples} terasort ${extralibjars} ${mapreducereducetasks} ${replicationcount} ${clearcache} ${pathprefix}terasort-teragen ${pathprefix}terasort-sort"

echo "Running $command" >&2
$command

if [ "${HADOOP_TERASORT_RUN_TERAVALIDATE}" == "yes" ]
then
    sleep 30

    echo "*******************************************************"
    echo "* Executing TeraValidate"
    echo "*******************************************************"

    # I assume no user is concerned with performance here, we just go with this fixed number
    validatereducetasks=`expr $HADOOP_WORKER_COUNT`

    mapreducereducetasks="-Dmapreduce.job.reduces=$validatereducetasks"

    command="${hadoopcmdprefix}/hadoop jar ${terasortexamples} teravalidate ${extralibjars} ${mapreducereducetasks} ${pathprefix}terasort-sort ${pathprefix}terasort-validate"

    echo "Running $command" >&2
    $command

    validateerrors=`${hadoopcmdprefix}/hadoop fs -cat ${pathprefix}terasort-validate/part-r-00000 | grep error | wc -l`

    if [ "${validateerrors}" != "0" ]
    then
        echo "TeraValidate Result: Errors in terasort output"
    else
        echo "TeraValidate Result: No errors in terasort output"
    fi

    outputchecksum=`${hadoopcmdprefix}/hadoop fs -cat ${pathprefix}terasort-validate/part-r-00000 | head -n 1 | awk '{print $2}'`
    echo "TeraValidate Checksum: ${outputchecksum}"
fi

if [ "${HADOOP_TERASORT_RUN_TERACHECKSUM}" == "yes" ] && [ "${HADOOP_TERASORT_RUN_TERAVALIDATE}" == "yes" ]
then
    echo "*******************************************************"
    echo "* Comparing Checksums"
    echo "*******************************************************"

    if [ "${whichchecksumcommand}" == "teravalidate" ]
    then
        inputchecksum=`${hadoopcmdprefix}/hadoop fs -cat ${pathprefix}terasort-checksum/part-r-00000 | head -n 1 | awk '{print $2}'`
    else
        inputchecksum=`${hadoopcmdprefix}/hadoop fs -cat ${pathprefix}terasort-checksum/part-r-00000`
    fi
    outputchecksum=`${hadoopcmdprefix}/hadoop fs -cat ${pathprefix}terasort-validate/part-r-00000 | head -n 1 | awk '{print $2}'`

    if [ "${inputchecksum}" == "${outputchecksum}" ]
    then
        echo "TeraSort Checksum Comparison: Input and output checksums match - ${inputchecksum}"
    else
        echo "TeraSort Checksum Comparison: Input checksum ${inputchecksum} != output checksum ${outputchecksum}"
    fi
fi

command="${hadoopcmdprefix}/hadoop fs -rm -r ${pathprefix}terasort-teragen"
$command
command="${hadoopcmdprefix}/hadoop fs -rm -r ${pathprefix}terasort-sort"
$command
if [ "${HADOOP_TERASORT_RUN_TERACHECKSUM}" == "yes" ]
then
    command="${hadoopcmdprefix}/hadoop fs -rm -r ${pathprefix}terasort-checksum"
    $command
fi
if [ "${HADOOP_TERASORT_RUN_TERAVALIDATE}" == "yes" ]
then
    command="${hadoopcmdprefix}/hadoop fs -rm -r ${pathprefix}terasort-validate"
    $command
fi

exit 0
