#!/usr/bin/bash
#############################################################################
#  Copyright (C) 2013-2015 Lawrence Livermore National Security, LLC.
#  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
#  Written by Albert Chu <chu11@llnl.gov>
#  LLNL-CODE-644248
#
#  This file is part of Magpie, scripts for running Hadoop on
#  traditional HPC systems.  For details, see https://github.com/llnl/magpie.
#
#  Magpie is free software; you can redistribute it and/or modify it
#  under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  Magpie is distributed in the hope that it will be useful, but
#  WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with Magpie.  If not, see <http://www.gnu.org/licenses/>.
#############################################################################

# This script is the core stormwordcount running script.  For the most part,
# it shouldn't be edited.  See the job submission files for configuration
# details.

source ${MAGPIE_SCRIPTS_HOME}/magpie/lib/magpie-lib-paths

# This is a job, no loading export files or libs except for minimal convenience ones

# Choose the storm-starter jar for the configured Storm version.  The
# default is the "storm-starter-topologies" jar directly under
# examples/storm-starter; versions matching 1.1.x / 1.2.x instead use the
# "storm-starter" jar under the target/ subdirectory.
targetjarversions='1\.[1-2]\.[0-9]'
stormstarterjarpath="${STORM_HOME}/examples/storm-starter/storm-starter-topologies-${STORM_VERSION}.jar"
if [[ ${STORM_VERSION} =~ ${targetjarversions} ]]
then
    stormstarterjarpath="${STORM_HOME}/examples/storm-starter/target/storm-starter-${STORM_VERSION}.jar"
fi

# Choose the WordCount topology class.  Versions matching 1.x.y use the
# org.apache.storm package prefix; all others use the bare storm.starter
# package.
apachepackageversions='1\.[0-9]\.[0-9]'
stormwordcountclass="storm.starter.WordCountTopology"
if [[ ${STORM_VERSION} =~ ${apachepackageversions} ]]
then
    stormwordcountclass="org.apache.storm.starter.WordCountTopology"
fi

# Every relative "bin/storm" invocation in this script depends on running
# from the Storm install directory, so stop right away if we cannot get
# there instead of failing later with confusing "not found" errors.
cd "${STORM_HOME}" || {
    echo "Unable to change directory to STORM_HOME '${STORM_HOME}'" >&2
    exit 1
}

# Look for a WordCount topology left over from an earlier run.
stormlistchecking=$(bin/storm list)

# The listing is quoted so each topology stays on its own line; that way
# the WordCount/ACTIVE filters only match when both appear on the same
# line, not when some other topology happens to be ACTIVE.
if printf '%s\n' "${stormlistchecking}" | grep "WordCount" | grep -q "ACTIVE"
then
    echo "*** Removing earlier run of WordCount"
    bin/storm kill WordCount
    echo "Sleeping a bit to let WordCount be killed"
    sleep 60
fi

echo "*** Submitting WordCount to Storm"

# Versions matching 1.x.y are pointed at nimbus through the list-valued
# nimbus.seeds setting; all other versions get the single-valued
# nimbus.host setting.
if echo "${STORM_VERSION}" | grep -q -E "1\.[0-9]\.[0-9]"
then
    stormnimbusopt="nimbus.seeds=[\"${STORM_NIMBUS_HOST}\"]"
else
    stormnimbusopt="nimbus.host=${STORM_NIMBUS_HOST}"
fi

# Build the argument vector as an array rather than a string that is
# re-split on expansion: the [ ] in the nimbus.seeds value would otherwise
# be subject to pathname expansion.  The topology class and name are passed
# exactly once, followed by a single "-c key=value" override (the earlier
# command line repeated "-c <class> WordCount", which is not a key=value
# setting).
stormsubmitcmd=(bin/storm jar "${stormstarterjarpath}" "${stormwordcountclass}" WordCount -c "${stormnimbusopt}")
echo "Running ${stormsubmitcmd[*]}" >&2
"${stormsubmitcmd[@]}"

echo "*** Sleeping a bit to let WordCount run a bit"
sleep 30

# Resolve the storm launcher once.  stormcmdprefix is not assigned in this
# script (presumably it is provided by the sourced magpie-lib-paths --
# TODO confirm).  When it is unset or empty, fall back to the relative
# bin/ directory used by the earlier steps, which works because the script
# has already changed into STORM_HOME.
stormcmd="${stormcmdprefix:-bin}/storm"

echo "*** Checking that Storm WordCount is running"
echo "Running ${stormcmd} list" >&2
stormlistrunning=$("${stormcmd}" list)
# Print and filter the listing quoted so every topology keeps its own
# line; the WordCount/ACTIVE check below then only succeeds when both
# words appear on the same line.
printf '%s\n' "${stormlistrunning}"

stormkilled=0
if printf '%s\n' "${stormlistrunning}" | grep "WordCount" | grep -q "ACTIVE"
then
    echo "*** Storm appears to be executing WordCount properly"

    echo "*** Killing Storm Wordcount "
    echo "Running ${stormcmd} kill WordCount" >&2
    "${stormcmd}" kill WordCount

    # Poll up to 5 times, 30 seconds apart (2.5 minutes max), until
    # WordCount no longer shows up in the topology listing at all.
    for ((attempt = 1; attempt <= 5; attempt++))
    do
        echo "*** Sleeping a bit to let WordCount be killed"
        sleep 30

        echo "*** Checking that Storm WordCount is no longer running"
        echo "Running ${stormcmd} list" >&2
        stormlistkilled=$("${stormcmd}" list)

        printf '%s\n' "${stormlistkilled}"

        if ! printf '%s\n' "${stormlistkilled}" | grep -q "WordCount"
        then
            echo "*** WordCount no longer active, appears to have been killed correctly."
            stormkilled=1
            break
        else
            echo "*** WordCount was not killed ... wait some more."
        fi
    done

    if [ "${stormkilled}" != "1" ]
    then
        echo "*** WordCount was not killed yet, giving up."
    fi
else
    echo "*** Storm appears to have failed to launch WordCount"
fi

# The exit status is always 0; the outcome is reported only through the
# messages printed above.
exit 0
