#!/usr/bin/bash
#############################################################################
#  Copyright (C) 2013-2015 Lawrence Livermore National Security, LLC.
#  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
#  Written by Albert Chu <chu11@llnl.gov>
#  LLNL-CODE-644248
#
#  This file is part of Magpie, scripts for running Hadoop on
#  traditional HPC systems.  For details, see https://github.com/llnl/magpie.
#
#  Magpie is free software; you can redistribute it and/or modify it
#  under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  Magpie is distributed in the hope that it will be useful, but
#  WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with Magpie.  If not, see <http://www.gnu.org/licenses/>.
#############################################################################

# This script sets up configuration files for jobs.  For the most
# part, it shouldn't be edited.  See job submission files for
# configuration details.

# Nothing to do unless Kafka setup was requested for this job.
[ "${KAFKA_SETUP}" = "yes" ] || exit 0

# Load the Magpie export and library files this script relies on.
for magpiekafkalib in \
    exports/magpie-exports-dirs \
    lib/magpie-lib-defaults \
    lib/magpie-lib-node-identification \
    lib/magpie-lib-setup
do
    source ${MAGPIE_SCRIPTS_HOME}/magpie/${magpiekafkalib}
done

# Only Kafka nodes are configured; kafkanoderank is set when the check succeeds.
Magpie_am_I_a_kafka_node || exit 0

# For rest of setup, we will use cluster specific paths
Magpie_make_all_local_dirs_node_specific

# Temporary directory for the Kafka JVM, inside the local scratch space.
# Computed after the call above so it sees whatever paths are in effect now.
kafkatmpdir="${KAFKA_LOCAL_SCRATCHSPACE_DIR}/tmp"

# If the user supplied KAFKA_JAVA_OPTS without a java.io.tmpdir setting,
# append one pointing at the scratch-space tmp directory.  An empty or
# unset KAFKA_JAVA_OPTS is left untouched.  The match is a literal
# substring test on the quoted value, so option strings containing glob
# characters are not expanded by the shell.
if [ -n "${KAFKA_JAVA_OPTS}" ]
then
    case "${KAFKA_JAVA_OPTS}" in
        *java.io.tmpdir*)
            ;;
        *)
            export KAFKA_JAVA_OPTS="${KAFKA_JAVA_OPTS} -Djava.io.tmpdir=${kafkatmpdir}"
            ;;
    esac
fi

# NOTE(review): extrakafkajavaopts is not referenced again in this
# script -- confirm whether a consumer elsewhere still expects it.
extrakafkajavaopts="${extrakafkajavaopts:+${extrakafkajavaopts} }-Djava.io.tmpdir=${kafkatmpdir}"

# The path is quoted for mkdir as well as for the -d test, so a scratch
# directory containing whitespace creates the intended directory.
if [ ! -d "${kafkatmpdir}" ]
then
    if ! mkdir -p "${kafkatmpdir}"
    then
        echo "mkdir failed making ${kafkatmpdir}"
        exit 1
    fi
fi

# Data directory written into the broker properties (substituted for the
# KAFKALOGDIRS placeholder later in this script).
KAFKA_DATA_DIR="${KAFKA_LOCAL_SCRATCHSPACE_DIR}/tmp/kafka-logs"

# Build a comma separated "host:port" list from the whitespace separated
# entries of the ZooKeeper workers file.
zookeepernodes=$(cat ${ZOOKEEPER_CONF_DIR}/workers)

kafkazookeeperconnect=""
for zkhost in ${zookeepernodes}
do
    kafkazookeeperconnect="${kafkazookeeperconnect:+${kafkazookeeperconnect},}${zkhost}:${default_zookeeper_client_port}"
done

# ZooKeeper connection timeout: the user's setting if non-empty, else 30000.
kafkazookeepertimeout="${KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT:-30000}"

#
# Get config files for setup
#
# pre_*  variables name the template files used as input.
# post_* variables name the generated files written into KAFKA_CONF_DIR.
#

# Magpie_find_conffile will set the 'pre' filenames (the variable to set
# is named by the last argument).
#
# ${KAFKA_CONF_FILES:-""} is left unquoted on purpose: when
# KAFKA_CONF_FILES is unset or empty, the quoted "" default still expands
# to one empty argument, so the helper receives the same number of
# arguments either way.
# NOTE(review): presumably the helper falls back to Magpie's stock
# templates when that argument is empty -- confirm in magpie-lib-setup.
Magpie_find_conffile "Kafka" ${KAFKA_CONF_FILES:-""} "kafka-env.sh" "pre_kafkaenvsh"
Magpie_find_conffile "Kafka" ${KAFKA_CONF_FILES:-""} "kafka.server.properties" "pre_kafkaproperties"
Magpie_find_conffile "Kafka" ${KAFKA_CONF_FILES:-""} "kafka.log4j.properties" "pre_log4jproperties"

# Output locations.  Note the templates "kafka.server.properties" and
# "kafka.log4j.properties" are installed without the "kafka." prefix.
post_kafkaenvsh=${KAFKA_CONF_DIR}/kafka-env.sh
post_kafkaproperties=${KAFKA_CONF_DIR}/server.properties
post_log4jproperties=${KAFKA_CONF_DIR}/log4j.properties

# Print $1 with every "/" escaped as "\/", so the value can be used as
# the replacement text of a sed "s/.../.../" command.
Magpie_kafka_escape_slashes () {
    printf '%s\n' "$1" | sed 's|/|\\/|g'
}

# sed-safe copies of the values substituted into the templates.
# kafkadatadirsubst is consumed by the server.properties substitution
# further down.
# NOTE(review): kafkahomesubst is not used by any sed command in this
# script; it is kept so the set of variables defined here is unchanged.
javahomesubst=$(Magpie_kafka_escape_slashes "${JAVA_HOME}")
kafkahomesubst=$(Magpie_kafka_escape_slashes "${KAFKA_HOME}")
kafkalogdirsubst=$(Magpie_kafka_escape_slashes "${KAFKA_LOG_DIR}")
kafkapiddirsubst=$(Magpie_kafka_escape_slashes "${KAFKA_PID_DIR}")
kafkaconfdirsubst=$(Magpie_kafka_escape_slashes "${KAFKA_CONF_DIR}")
kafkadatadirsubst=$(Magpie_kafka_escape_slashes "${KAFKA_DATA_DIR}")
kafkalog4joptssubst=$(Magpie_kafka_escape_slashes "${KAFKA_LOG4J_OPTS}")

#
# Setup Kafka configuration files and environment files
#
cp "${pre_kafkaenvsh}" "${post_kafkaenvsh}"

# Fill in the placeholders of the copied kafka-env.sh in place.
# KAFKALOG4JOPTS now receives the escaped KAFKA_LOG4J_OPTS value; it was
# previously replaced with an undefined (empty) variable.
sed -i \
    -e "s/KAFKA_JAVA_HOME/${javahomesubst}/g" \
    -e "s/KAFKALOGDIR/${kafkalogdirsubst}/g" \
    -e "s/KAFKAPIDDIR/${kafkapiddirsubst}/g" \
    -e "s/KAFKACONFDIR/${kafkaconfdirsubst}/g" \
    -e "s/KAFKALOG4JOPTS/${kafkalog4joptssubst}/g" \
    "${post_kafkaenvsh}"

# A remote command other than the default ssh is recorded in the
# generated kafka-env.sh as KAFKA_SSH_CMD.
case "${MAGPIE_REMOTE_CMD:-ssh}" in
    ssh)
        ;;
    *)
        printf 'export KAFKA_SSH_CMD="%s"\n' "${MAGPIE_REMOTE_CMD}" >> ${post_kafkaenvsh}
        ;;
esac

# Any remote command options are recorded as KAFKA_SSH_OPTS.
if [ -n "${MAGPIE_REMOTE_CMD_OPTS}" ]
then
    printf 'export KAFKA_SSH_OPTS="%s"\n' "${MAGPIE_REMOTE_CMD_OPTS}" >> ${post_kafkaenvsh}
fi

# Start from the server.properties template ...
cp ${pre_kafkaproperties} ${post_kafkaproperties}

# ... then fill in its placeholders in place: broker id (this node's
# Kafka rank), port, data directory and ZooKeeper connection settings.
kafkapropertiesedits=(
    -e "s/KAFKABROKERID/${kafkanoderank}/g"
    -e "s/KAFKAPORT/${default_kafka_port}/g"
    -e "s/KAFKALOGDIRS/${kafkadatadirsubst}/g"
    -e "s/KAFKAZOOKEEPERCNT/${kafkazookeeperconnect}/g"
    -e "s/KAFKAZOOKEEPERTIMEOUT/${kafkazookeepertimeout}/g"
)
sed -i "${kafkapropertiesedits[@]}" ${post_kafkaproperties}

# The log4j template is installed verbatim; this script performs no
# substitutions on it.
cp ${pre_log4jproperties} ${post_log4jproperties}

# Setup always reports success at this point; the copy and sed steps
# above are not checked for errors.
exit 0
